|
#include "chain.hh" |
|
|
|
#include "io.hh" |
|
|
|
#include "../exception.hh" |
|
#include "../pcqueue.hh" |
|
|
|
#include <cstdlib> |
|
#include <new> |
|
#include <iostream> |
|
#include <stdint.h> |
|
|
|
namespace util { |
|
namespace stream { |
|
|
|
ChainConfigException::ChainConfigException() throw() { *this << "Chain configured with "; } |
|
ChainConfigException::~ChainConfigException() throw() {} |
|
|
|
Thread::~Thread() { |
|
thread_.join(); |
|
} |
|
|
|
void Thread::UnhandledException(const std::exception &e) { |
|
std::cerr << e.what() << std::endl; |
|
abort(); |
|
} |
|
|
|
void Recycler::Run(const ChainPosition &position) { |
|
for (Link l(position); l; ++l) { |
|
l->SetValidSize(position.GetChain().BlockSize()); |
|
} |
|
} |
|
|
|
const Recycler kRecycle = Recycler(); |
|
|
|
Chain::Chain(const ChainConfig &config) : config_(config), complete_called_(false) { |
|
UTIL_THROW_IF(!config.entry_size, ChainConfigException, "zero-size entries."); |
|
UTIL_THROW_IF(!config.block_count, ChainConfigException, "block count zero"); |
|
UTIL_THROW_IF(config.total_memory < config.entry_size * config.block_count, ChainConfigException, config.total_memory << " total memory, too small for " << config.block_count << " blocks of containing entries of size " << config.entry_size); |
|
|
|
block_size_ = config.total_memory / (config.block_count * config.entry_size) * config.entry_size; |
|
} |
|
|
|
Chain::~Chain() { |
|
Wait(); |
|
} |
|
|
|
ChainPosition Chain::Add() { |
|
if (!Running()) Start(); |
|
PCQueue<Block> &in = queues_.back(); |
|
queues_.push_back(new PCQueue<Block>(config_.block_count)); |
|
return ChainPosition(in, queues_.back(), this, progress_); |
|
} |
|
|
|
Chain &Chain::operator>>(const WriteAndRecycle &writer) { |
|
threads_.push_back(new Thread(Complete(), writer)); |
|
return *this; |
|
} |
|
|
|
void Chain::Wait(bool release_memory) { |
|
if (queues_.empty()) { |
|
assert(threads_.empty()); |
|
return; |
|
} |
|
if (!complete_called_) CompleteLoop(); |
|
threads_.clear(); |
|
for (std::size_t i = 0; queues_.front().Consume(); ++i) { |
|
if (i == config_.block_count) { |
|
std::cerr << "Chain ending without poison." << std::endl; |
|
abort(); |
|
} |
|
} |
|
queues_.clear(); |
|
progress_.Finished(); |
|
complete_called_ = false; |
|
if (release_memory) memory_.reset(); |
|
} |
|
|
|
void Chain::Start() { |
|
Wait(false); |
|
if (!memory_.get()) { |
|
|
|
assert(threads_.empty()); |
|
assert(queues_.empty()); |
|
std::size_t malloc_size = block_size_ * config_.block_count; |
|
memory_.reset(MallocOrThrow(malloc_size)); |
|
} |
|
|
|
queues_.push_back(new PCQueue<Block>(config_.block_count)); |
|
|
|
uint8_t *base = static_cast<uint8_t*>(memory_.get()); |
|
for (std::size_t i = 0; i < config_.block_count; ++i) { |
|
queues_.front().Produce(Block(base, block_size_)); |
|
base += block_size_; |
|
} |
|
} |
|
|
|
ChainPosition Chain::Complete() { |
|
assert(Running()); |
|
UTIL_THROW_IF(complete_called_, util::Exception, "CompleteLoop() called twice"); |
|
complete_called_ = true; |
|
return ChainPosition(queues_.back(), queues_.front(), this, progress_); |
|
} |
|
|
|
Link::Link() : in_(NULL), out_(NULL), poisoned_(true) {} |
|
|
|
void Link::Init(const ChainPosition &position) { |
|
UTIL_THROW_IF(in_, util::Exception, "Link::Init twice"); |
|
in_ = position.in_; |
|
out_ = position.out_; |
|
poisoned_ = false; |
|
progress_ = position.progress_; |
|
in_->Consume(current_); |
|
} |
|
|
|
Link::Link(const ChainPosition &position) : in_(NULL) { |
|
Init(position); |
|
} |
|
|
|
Link::~Link() { |
|
if (current_) { |
|
|
|
std::cerr << "Last input should have been poison. The program should end soon with an error. If it doesn't, there's a bug." << std::endl; |
|
|
|
} else { |
|
if (!poisoned_) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
out_->Produce(current_); |
|
} |
|
} |
|
} |
|
|
|
Link &Link::operator++() { |
|
assert(current_); |
|
progress_ += current_.ValidSize(); |
|
out_->Produce(current_); |
|
in_->Consume(current_); |
|
if (!current_) { |
|
poisoned_ = true; |
|
out_->Produce(current_); |
|
} |
|
return *this; |
|
} |
|
|
|
void Link::Poison() { |
|
assert(!poisoned_); |
|
current_.SetToPoison(); |
|
out_->Produce(current_); |
|
poisoned_ = true; |
|
} |
|
|
|
} |
|
} |
|
|