namespace util { | |
namespace stream { | |
/** | |
* A RewindableStream is like a Stream (but one that is only used for | |
* creating input at the start of a chain) except that it can be rewound to | |
* be able to re-write a part of the stream before it is sent. Rewinding | |
* has a limit of 2 * block_size_ - 1 in distance (it does *not* buffer an | |
* entire stream into memory, only a maximum of 2 * block_size_). | |
*/ | |
class RewindableStream : boost::noncopyable { | |
public: | |
/** | |
* Creates an uninitialized RewindableStream. You **must** call Init() | |
* on it later! | |
*/ | |
RewindableStream(); | |
~RewindableStream() { | |
Poison(); | |
} | |
/** | |
* Initializes an existing RewindableStream at a specific position in | |
* a Chain. | |
* | |
* @param position The position in the chain to get input from and | |
* produce output on | |
*/ | |
void Init(const ChainPosition &position); | |
/** | |
* Constructs a RewindableStream at a specific position in a Chain all | |
* in one step. | |
* | |
* Equivalent to RewindableStream a(); a.Init(....); | |
*/ | |
explicit RewindableStream(const ChainPosition &position) | |
: in_(NULL) { | |
Init(position); | |
} | |
/** | |
* Gets the record at the current stream position. Const version. | |
*/ | |
const void *Get() const { | |
assert(!poisoned_); | |
assert(current_); | |
return current_; | |
} | |
/** | |
* Gets the record at the current stream position. | |
*/ | |
void *Get() { | |
assert(!poisoned_); | |
assert(current_); | |
return current_; | |
} | |
operator bool() const { return !poisoned_; } | |
bool operator!() const { return poisoned_; } | |
/** | |
* Marks the current position in the stream to be rewound to later. | |
* Note that you can only rewind back as far as 2 * block_size_ - 1! | |
*/ | |
void Mark(); | |
/** | |
* Rewinds the stream back to the marked position. This will throw an | |
* exception if the marked position is too far away. | |
*/ | |
void Rewind(); | |
/** | |
* Moves the stream forward to the next record. This internally may | |
* buffer a block for the purposes of rewinding. | |
*/ | |
RewindableStream& operator++(); | |
/** | |
* Poisons the stream. This sends any buffered blocks down the chain | |
* and sends a poison block as well (sending at most 2 non-poison and 1 | |
* poison block). | |
*/ | |
void Poison(); | |
private: | |
void AppendBlock(); | |
void Flush(std::deque<Block>::iterator to); | |
std::deque<Block> blocks_; | |
// current_ is in blocks_[blocks_it_] unless poisoned_. | |
std::size_t blocks_it_; | |
std::size_t entry_size_; | |
std::size_t block_size_; | |
std::size_t block_count_; | |
uint8_t *marked_, *current_; | |
const uint8_t *block_end_; | |
PCQueue<Block> *in_, *out_; | |
// Have we hit poison at the end of the stream, even if rewinding? | |
bool hit_poison_; | |
// Is the curren position poison? | |
bool poisoned_; | |
WorkerProgress progress_; | |
}; | |
inline Chain &operator>>(Chain &chain, RewindableStream &stream) { | |
stream.Init(chain.Add()); | |
return chain; | |
} | |
} | |
} | |