File size: 3,362 Bytes
1ce325b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
#ifndef UTIL_STREAM_MULTI_STREAM_H
#define UTIL_STREAM_MULTI_STREAM_H
#include "../fixed_array.hh"
#include "../scoped.hh"
#include "chain.hh"
#include "stream.hh"
#include <cstddef>
#include <new>
#include <cassert>
#include <cstdlib>
namespace util { namespace stream {
class Chains;
// Array of ChainPosition objects, built on util::FixedArray.  After Init has
// run against a Chains collection it holds one ChainPosition per chain, in
// the same order as the chains.
class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
  public:
    // Default-constructs the underlying array; populate it later with Init
    // (or with `chains >> positions`).
    ChainPositions() {}

    // Forwards bound to the FixedArray constructor.  No positions are
    // created by this class here.
    explicit ChainPositions(std::size_t bound)
      : util::FixedArray<util::stream::ChainPosition>(bound) {}

    // Shorthand for default construction immediately followed by
    // Init(chains).
    explicit ChainPositions(Chains &chains) { Init(chains); }

    // Sizes this array to chains.size() and appends the result of Add() for
    // every chain.  The definition appears after the Chains class because it
    // needs Chains to be a complete type.
    void Init(Chains &chains);
};
// A fixed-size array of Chain objects that are driven together.  Workers
// attached with >> receive a ChainPositions spanning every chain; recyclers
// attached with >> are forwarded to each chain individually.
class Chains : public util::FixedArray<util::stream::Chain> {
  private:
    // Compile-time gate for the worker overloads of operator>>.
    // CheckForRun<T>::type can only be formed when T has a member function
    //   void Run(const ChainPositions &)
    // so types lacking it are meant to be rejected by those overloads.
    template <class T, void (T::*run_method)(const ChainPositions &) = &T::Run>
    struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Must call Init.
    Chains() {}

    // Forwards limit to the FixedArray constructor.
    explicit Chains(std::size_t limit)
      : util::FixedArray<util::stream::Chain>(limit) {}

    // Attach a worker.  A new util::stream::Thread is built from a fresh
    // ChainPositions over all of these chains plus the worker, and ownership
    // of it passes to threads_.
    // NOTE(review): Thread presumably runs worker.Run on its own thread;
    // confirm against its declaration.
    template <class Worker>
    typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    // Same as above, for a worker wrapped in boost::reference_wrapper.
    // NOTE(review): presumably lets callers pass boost::ref(worker) so the
    // worker itself is not copied; confirm against Thread.
    template <class Worker>
    typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    // Attach the recycler to every chain in the array, in order.
    Chains &operator>>(const util::stream::Recycler &recycler) {
      util::stream::Chain *const stop = end();
      for (util::stream::Chain *chain = begin(); chain != stop; ++chain) {
        *chain >> recycler;
      }
      return *this;
    }

    // Destroys the Thread objects owned by this collection, then calls
    // Wait(release_memory) on each chain in order.
    // NOTE(review): the Thread destructor presumably joins; confirm.
    void Wait(bool release_memory = true) {
      threads_.clear();
      util::stream::Chain *const stop = end();
      for (util::stream::Chain *chain = begin(); chain != stop; ++chain) {
        chain->Wait(release_memory);
      }
    }

  private:
    // Threads created by the worker overloads of operator>>.
    boost::ptr_vector<util::stream::Thread> threads_;

    // Non-copyable: declared but intentionally never defined.
    Chains(const Chains &);
    void operator=(const Chains &);
};
// Sizes this array to hold one entry per chain, then fills it with the
// position returned by Add() on each chain, preserving the chains' order.
inline void ChainPositions::Init(Chains &chains) {
  typedef util::FixedArray<util::stream::ChainPosition> Base;
  Base::Init(chains.size());

  util::stream::Chain *const stop = chains.end();
  for (util::stream::Chain *chain = chains.begin(); chain != stop; ++chain) {
    // Placement new: construct the ChainPosition directly in the next slot
    // of the already-allocated storage, then tell the array that slot is now
    // a constructed element.
    new (end()) util::stream::ChainPosition(chain->Add());
    Constructed();
  }
}
// Stream-style spelling of positions.Init(chains).  Returns the chains so
// that further >> operations can be appended to the same expression.
inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}
// Fixed-size array of stream objects of type T.  Elements are appended by
// handing ChainPosition objects to the base array's push_back.
// NOTE(review): this presumably constructs a T from each ChainPosition;
// confirm against FixedArray::push_back.
template <class T> class GenericStreams : public util::FixedArray<T> {
  private:
    typedef util::FixedArray<T> P;

  public:
    // Creates an empty collection; call one of the Init overloads before use.
    GenericStreams() {}

    // Builds one stream for every entry in positions.
    GenericStreams(const ChainPositions &positions) {
      Init(positions);
    }

    // Builds streams from positions[0, limit) only.
    // Precondition: limit <= positions.size().  A larger limit would read
    // past the constructed positions, so it is checked in debug builds.
    void Init(const ChainPositions &positions, std::size_t limit) {
      assert(limit <= positions.size());
      P::Init(limit);
      // Loop-invariant end of the range, computed once.
      const util::stream::ChainPosition *const stop = positions.begin() + limit;
      for (const util::stream::ChainPosition *i = positions.begin(); i != stop; ++i) {
        P::push_back(*i);
      }
    }

    // Builds one stream for every entry in positions.
    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }

    // Forwards amount to the base array's Init without adding any streams;
    // the caller is expected to append them afterwards.
    void Init(std::size_t amount) {
      P::Init(amount);
    }
};
// Attaches a stream collection to the chains: a position is added to every
// chain and streams is initialized from the resulting positions.  Returns
// the chains so that further >> operations can follow.
template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
  // Constructing from chains runs ChainPositions::Init(chains).
  ChainPositions positions(chains);
  streams.Init(positions);
  return chains;
}
// Convenience name for GenericStreams instantiated with Stream.
// NOTE(review): Stream is not declared in this file; presumably it comes
// from stream.hh.
typedef GenericStreams<Stream> Streams;
}} // namespaces
#endif // UTIL_STREAM_MULTI_STREAM_H
|