#ifndef UTIL_STREAM_MULTI_STREAM_H #define UTIL_STREAM_MULTI_STREAM_H #include "../fixed_array.hh" #include "../scoped.hh" #include "chain.hh" #include "stream.hh" #include #include #include #include namespace util { namespace stream { class Chains; class ChainPositions : public util::FixedArray { public: ChainPositions() {} explicit ChainPositions(std::size_t bound) : util::FixedArray(bound) {} void Init(Chains &chains); explicit ChainPositions(Chains &chains) { Init(chains); } }; class Chains : public util::FixedArray { private: template struct CheckForRun { typedef Chains type; }; public: // Must call Init. Chains() {} explicit Chains(std::size_t limit) : util::FixedArray(limit) {} template typename CheckForRun::type &operator>>(const Worker &worker) { threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); return *this; } template typename CheckForRun::type &operator>>(const boost::reference_wrapper &worker) { threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); return *this; } Chains &operator>>(const util::stream::Recycler &recycler) { for (util::stream::Chain *i = begin(); i != end(); ++i) *i >> recycler; return *this; } void Wait(bool release_memory = true) { threads_.clear(); for (util::stream::Chain *i = begin(); i != end(); ++i) { i->Wait(release_memory); } } private: boost::ptr_vector threads_; Chains(const Chains &); void operator=(const Chains &); }; inline void ChainPositions::Init(Chains &chains) { util::FixedArray::Init(chains.size()); for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location new (end()) util::stream::ChainPosition(i->Add()); Constructed(); } } inline Chains &operator>>(Chains &chains, ChainPositions &positions) { positions.Init(chains); return chains; } template class GenericStreams : public util::FixedArray { private: typedef util::FixedArray P; public: GenericStreams() {} // Limit restricts to positions[0,limit) void Init(const ChainPositions &positions, std::size_t limit) { P::Init(limit); for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { P::push_back(*i); } } void Init(const ChainPositions &positions) { Init(positions, positions.size()); } GenericStreams(const ChainPositions &positions) { Init(positions); } void Init(std::size_t amount) { P::Init(amount); } }; template inline Chains &operator>>(Chains &chains, GenericStreams &streams) { ChainPositions positions; chains >> positions; streams.Init(positions); return chains; } typedef GenericStreams Streams; }} // namespaces #endif // UTIL_STREAM_MULTI_STREAM_H