|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifndef UTIL_STREAM_SORT_H |
|
#define UTIL_STREAM_SORT_H |
|
|
|
#include "util/stream/chain.hh" |
|
#include "util/stream/config.hh" |
|
#include "util/stream/io.hh" |
|
#include "util/stream/stream.hh" |
|
#include "util/stream/timer.hh" |
|
|
|
#include "util/file.hh" |
|
#include "util/scoped.hh" |
|
#include "util/sized_iterator.hh" |
|
|
|
#include <algorithm> |
|
#include <iostream> |
|
#include <queue> |
|
#include <string> |
|
|
|
namespace util { |
|
namespace stream { |
|
|
|
// Combine policy that never merges two entries: whatever pair of entries and
// comparator it is handed, it reports that no combination took place.
struct NeverCombine {
  template <class Compare>
  bool operator()(const void * /*first*/, const void * /*second*/, const Compare & /*compare*/) const {
    const bool combined = false;
    return combined;
  }
};
|
|
|
|
|
class Offsets { |
|
public: |
|
explicit Offsets(int fd) : log_(fd) { |
|
Reset(); |
|
} |
|
|
|
int File() const { return log_; } |
|
|
|
void Append(uint64_t length) { |
|
if (!length) return; |
|
++block_count_; |
|
if (length == cur_.length) { |
|
++cur_.run; |
|
return; |
|
} |
|
WriteOrThrow(log_, &cur_, sizeof(Entry)); |
|
cur_.length = length; |
|
cur_.run = 1; |
|
} |
|
|
|
void FinishedAppending() { |
|
WriteOrThrow(log_, &cur_, sizeof(Entry)); |
|
SeekOrThrow(log_, sizeof(Entry)); |
|
cur_.run = 0; |
|
if (block_count_) { |
|
ReadOrThrow(log_, &cur_, sizeof(Entry)); |
|
assert(cur_.length); |
|
assert(cur_.run); |
|
} |
|
} |
|
|
|
uint64_t RemainingBlocks() const { return block_count_; } |
|
|
|
uint64_t TotalOffset() const { return output_sum_; } |
|
|
|
uint64_t PeekSize() const { |
|
return cur_.length; |
|
} |
|
|
|
uint64_t NextSize() { |
|
assert(block_count_); |
|
uint64_t ret = cur_.length; |
|
output_sum_ += ret; |
|
|
|
--cur_.run; |
|
--block_count_; |
|
if (!cur_.run && block_count_) { |
|
ReadOrThrow(log_, &cur_, sizeof(Entry)); |
|
assert(cur_.length); |
|
assert(cur_.run); |
|
} |
|
return ret; |
|
} |
|
|
|
void Reset() { |
|
SeekOrThrow(log_, 0); |
|
ResizeOrThrow(log_, 0); |
|
cur_.length = 0; |
|
cur_.run = 0; |
|
block_count_ = 0; |
|
output_sum_ = 0; |
|
} |
|
|
|
private: |
|
int log_; |
|
|
|
struct Entry { |
|
uint64_t length; |
|
uint64_t run; |
|
}; |
|
Entry cur_; |
|
|
|
uint64_t block_count_; |
|
|
|
uint64_t output_sum_; |
|
}; |
|
|
|
|
|
template <class Compare> class MergeQueue { |
|
public: |
|
MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare) |
|
: queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {} |
|
|
|
void Push(void *base, uint64_t offset, uint64_t amount) { |
|
queue_.push(Entry(base, in_, offset, amount, buffer_size_)); |
|
} |
|
|
|
const void *Top() const { |
|
return queue_.top().Current(); |
|
} |
|
|
|
void Pop() { |
|
Entry top(queue_.top()); |
|
queue_.pop(); |
|
if (top.Increment(in_, buffer_size_, entry_size_)) |
|
queue_.push(top); |
|
} |
|
|
|
std::size_t Size() const { |
|
return queue_.size(); |
|
} |
|
|
|
bool Empty() const { |
|
return queue_.empty(); |
|
} |
|
|
|
private: |
|
|
|
class Entry { |
|
public: |
|
Entry() {} |
|
|
|
Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) { |
|
offset_ = offset; |
|
remaining_ = amount; |
|
buffer_end_ = static_cast<uint8_t*>(base) + buf_size; |
|
Read(fd, buf_size); |
|
} |
|
|
|
bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) { |
|
current_ += entry_size; |
|
if (current_ != buffer_end_) return true; |
|
return Read(fd, buf_size); |
|
} |
|
|
|
const void *Current() const { return current_; } |
|
|
|
private: |
|
bool Read(int fd, std::size_t buf_size) { |
|
current_ = buffer_end_ - buf_size; |
|
std::size_t amount; |
|
if (static_cast<uint64_t>(buf_size) < remaining_) { |
|
amount = buf_size; |
|
} else if (!remaining_) { |
|
return false; |
|
} else { |
|
amount = remaining_; |
|
buffer_end_ = current_ + remaining_; |
|
} |
|
ErsatzPRead(fd, current_, amount, offset_); |
|
offset_ += amount; |
|
assert(current_ <= buffer_end_); |
|
remaining_ -= amount; |
|
return true; |
|
} |
|
|
|
|
|
uint8_t *current_, *buffer_end_; |
|
|
|
uint64_t remaining_, offset_; |
|
}; |
|
|
|
|
|
class Greater : public std::binary_function<const Entry &, const Entry &, bool> { |
|
public: |
|
explicit Greater(const Compare &compare) : compare_(compare) {} |
|
|
|
bool operator()(const Entry &first, const Entry &second) const { |
|
return compare_(second.Current(), first.Current()); |
|
} |
|
|
|
private: |
|
const Compare compare_; |
|
}; |
|
|
|
typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue; |
|
Queue queue_; |
|
|
|
const int in_; |
|
const std::size_t buffer_size_; |
|
const std::size_t entry_size_; |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <class Compare, class Combine> class MergingReader { |
|
public: |
|
MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) : |
|
compare_(compare), combine_(combine), |
|
in_(in), |
|
in_offsets_(in_offsets), out_offsets_(out_offsets), |
|
buffer_size_(buffer_size), total_memory_(total_memory) {} |
|
|
|
void Run(const ChainPosition &position) { |
|
Run(position, false); |
|
} |
|
|
|
void Run(const ChainPosition &position, bool assert_one) { |
|
|
|
if (!in_offsets_->RemainingBlocks()) { |
|
Link l(position); |
|
l.Poison(); |
|
return; |
|
} |
|
|
|
if (in_offsets_->RemainingBlocks() == 1) { |
|
|
|
uint64_t offset = in_offsets_->TotalOffset(); |
|
uint64_t amount = in_offsets_->NextSize(); |
|
ReadSingle(offset, amount, position); |
|
if (out_offsets_) out_offsets_->Append(amount); |
|
return; |
|
} |
|
|
|
Stream str(position); |
|
scoped_malloc buffer(MallocOrThrow(total_memory_)); |
|
uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_; |
|
|
|
const std::size_t entry_size = position.GetChain().EntrySize(); |
|
|
|
while (in_offsets_->RemainingBlocks()) { |
|
|
|
uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>( |
|
buffer_size_, |
|
static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks())))); |
|
per_buffer -= per_buffer % entry_size; |
|
assert(per_buffer); |
|
|
|
|
|
MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_); |
|
for (uint8_t *buf = static_cast<uint8_t*>(buffer.get()); |
|
in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) { |
|
uint64_t offset = in_offsets_->TotalOffset(); |
|
uint64_t size = in_offsets_->NextSize(); |
|
queue.Push(buf, offset, size); |
|
buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer)); |
|
} |
|
|
|
if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) { |
|
std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl; |
|
abort(); |
|
} |
|
if (assert_one && in_offsets_->RemainingBlocks()) { |
|
std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl; |
|
abort(); |
|
} |
|
|
|
uint64_t written = 0; |
|
|
|
memcpy(str.Get(), queue.Top(), entry_size); |
|
for (queue.Pop(); !queue.Empty(); queue.Pop()) { |
|
if (!combine_(str.Get(), queue.Top(), compare_)) { |
|
++written; ++str; |
|
memcpy(str.Get(), queue.Top(), entry_size); |
|
} |
|
} |
|
++written; ++str; |
|
if (out_offsets_) |
|
out_offsets_->Append(written * entry_size); |
|
} |
|
str.Poison(); |
|
} |
|
|
|
private: |
|
void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) { |
|
|
|
const uint64_t end = offset + size; |
|
const uint64_t block_size = position.GetChain().BlockSize(); |
|
Link l(position); |
|
for (; offset + block_size < end; ++l, offset += block_size) { |
|
ErsatzPRead(in_, l->Get(), block_size, offset); |
|
l->SetValidSize(block_size); |
|
} |
|
ErsatzPRead(in_, l->Get(), end - offset, offset); |
|
l->SetValidSize(end - offset); |
|
(++l).Poison(); |
|
return; |
|
} |
|
|
|
Compare compare_; |
|
Combine combine_; |
|
|
|
int in_; |
|
|
|
protected: |
|
Offsets *in_offsets_; |
|
|
|
private: |
|
Offsets *out_offsets_; |
|
|
|
std::size_t buffer_size_; |
|
std::size_t total_memory_; |
|
}; |
|
|
|
|
|
/* MergingReader that carries its own copy of the Offsets and, while running,
 * holds the data and offsets descriptors in scoped_fd objects (presumably
 * closing both when Run() leaves; confirm against util/file.hh).
 *
 * The base class is constructed with NULL offsets pointers; the input pointer
 * is only pointed at this object's own offsets_ member inside Run().
 */
template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> {
  private:
    typedef MergingReader<Compare, Combine> Base;

  public:
    // data     descriptor of the sorted data file
    // offsets  block lengths for that file (copied)
    // buffer   per-block read buffer size forwarded to the base class
    // lazy     total memory forwarded to the base class
    OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine)
      : Base(data, NULL, NULL, buffer, lazy, compare, combine),
        data_(data),
        offsets_(offsets) {}

    // Merge everything in one group (the base class aborts otherwise).
    void Run(const ChainPosition &position) {
      // Point the base class at the member of *this* object.
      this->in_offsets_ = &offsets_;
      // Guards are declared in this order so the offsets file guard is
      // destroyed first and the data guard second.
      scoped_fd data_guard(data_);
      scoped_fd offsets_guard(offsets_.File());
      Base::Run(position, true);
    }

  private:
    int data_;
    Offsets offsets_;
};
|
|
|
|
|
template <class Compare> class BlockSorter { |
|
public: |
|
BlockSorter(Offsets &offsets, const Compare &compare) : |
|
offsets_(&offsets), compare_(compare) {} |
|
|
|
void Run(const ChainPosition &position) { |
|
const std::size_t entry_size = position.GetChain().EntrySize(); |
|
for (Link link(position); link; ++link) { |
|
|
|
offsets_->Append(link->ValidSize()); |
|
void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize(); |
|
#if defined(_WIN32) || defined(_WIN64) |
|
std::stable_sort |
|
#else |
|
std::sort |
|
#endif |
|
(SizedIt(link->Get(), entry_size), |
|
SizedIt(end, entry_size), |
|
compare_); |
|
} |
|
offsets_->FinishedAppending(); |
|
} |
|
|
|
private: |
|
Offsets *offsets_; |
|
SizedCompare<Compare> compare_; |
|
}; |
|
|
|
// Thrown by Sort's constructor when the configuration cannot work: entries
// of size zero, a buffer smaller than one entry, or too little total memory.
class BadSortConfig : public Exception {
  public:
    BadSortConfig() throw() {
    }

    ~BadSortConfig() throw() {
    }
};
|
|
|
|
|
template <class Compare, class Combine = NeverCombine> class Sort { |
|
public: |
|
|
|
Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) |
|
: config_(config), |
|
data_(MakeTemp(config.temp_prefix)), |
|
offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()), |
|
compare_(compare), combine_(combine), |
|
entry_size_(in.EntrySize()) { |
|
UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0"); |
|
|
|
config_.buffer_size -= config_.buffer_size % entry_size_; |
|
UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small"); |
|
UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write)."); |
|
in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get()); |
|
} |
|
|
|
uint64_t Size() const { |
|
return SizeOrThrow(data_.get()); |
|
} |
|
|
|
|
|
|
|
std::size_t Merge(std::size_t lazy_memory) { |
|
if (offsets_.RemainingBlocks() <= 1) return 0; |
|
const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size); |
|
uint64_t size = Size(); |
|
|
|
|
|
|
|
|
|
if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory)) |
|
return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size); |
|
|
|
scoped_fd data2(MakeTemp(config_.temp_prefix)); |
|
int fd_in = data_.get(), fd_out = data2.get(); |
|
scoped_fd offsets2_file(MakeTemp(config_.temp_prefix)); |
|
Offsets offsets2(offsets2_file.get()); |
|
Offsets *offsets_in = &offsets_, *offsets_out = &offsets2; |
|
|
|
|
|
ChainConfig chain_config; |
|
chain_config.entry_size = entry_size_; |
|
chain_config.block_count = 2; |
|
chain_config.total_memory = config_.buffer_size * 2; |
|
Chain chain(chain_config); |
|
|
|
while (offsets_in->RemainingBlocks() > lazy_arity) { |
|
if (size <= static_cast<uint64_t>(lazy_memory)) break; |
|
std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size; |
|
if (size < static_cast<uint64_t>(reading_memory)) { |
|
reading_memory = static_cast<std::size_t>(size); |
|
} |
|
SeekOrThrow(fd_in, 0); |
|
chain >> |
|
MergingReader<Compare, Combine>( |
|
fd_in, |
|
offsets_in, offsets_out, |
|
config_.buffer_size, |
|
reading_memory, |
|
compare_, combine_) >> |
|
WriteAndRecycle(fd_out); |
|
chain.Wait(); |
|
offsets_out->FinishedAppending(); |
|
ResizeOrThrow(fd_in, 0); |
|
offsets_in->Reset(); |
|
std::swap(fd_in, fd_out); |
|
std::swap(offsets_in, offsets_out); |
|
size = SizeOrThrow(fd_in); |
|
} |
|
|
|
SeekOrThrow(fd_in, 0); |
|
if (fd_in == data2.get()) { |
|
data_.reset(data2.release()); |
|
offsets_file_.reset(offsets2_file.release()); |
|
offsets_ = offsets2; |
|
} |
|
if (offsets_.RemainingBlocks() <= 1) return 0; |
|
|
|
return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size)); |
|
} |
|
|
|
|
|
|
|
void Output(Chain &out, std::size_t lazy_memory) { |
|
Merge(lazy_memory); |
|
out.SetProgressTarget(Size()); |
|
out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_); |
|
data_.release(); |
|
offsets_file_.release(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
std::size_t DefaultLazy() { |
|
float arity = static_cast<float>(config_.total_memory / config_.buffer_size); |
|
return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity); |
|
} |
|
|
|
|
|
void Output(Chain &out) { |
|
Output(out, DefaultLazy()); |
|
} |
|
|
|
|
|
int StealCompleted() { |
|
|
|
Merge(0); |
|
SeekOrThrow(data_.get(), 0); |
|
offsets_file_.reset(); |
|
return data_.release(); |
|
} |
|
|
|
private: |
|
SortConfig config_; |
|
|
|
scoped_fd data_; |
|
|
|
scoped_fd offsets_file_; |
|
Offsets offsets_; |
|
|
|
const Compare compare_; |
|
const Combine combine_; |
|
const std::size_t entry_size_; |
|
}; |
|
|
|
|
|
template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) { |
|
Sort<Compare, Combine> sorter(chain, config, compare, combine); |
|
chain.Wait(true); |
|
uint64_t size = sorter.Size(); |
|
sorter.Output(chain); |
|
return size; |
|
} |
|
|
|
} |
|
} |
|
|
|
#endif |
|
|