File size: 3,241 Bytes
1ce325b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
#ifndef UTIL_STREAM_REWINDABLE_STREAM_H
#define UTIL_STREAM_REWINDABLE_STREAM_H
#include "chain.hh"
#include <boost/noncopyable.hpp>
#include <deque>
namespace util {
namespace stream {
/**
* A RewindableStream is like a Stream (but one that is only used for
* creating input at the start of a chain) except that it can be rewound to
* be able to re-write a part of the stream before it is sent. Rewinding
* has a limit of 2 * block_size_ - 1 in distance (it does *not* buffer an
* entire stream into memory, only a maximum of 2 * block_size_).
*/
class RewindableStream : boost::noncopyable {
public:
/**
* Creates an uninitialized RewindableStream. You **must** call Init()
* on it later!
*/
RewindableStream();
~RewindableStream() {
Poison();
}
/**
* Initializes an existing RewindableStream at a specific position in
* a Chain.
*
* @param position The position in the chain to get input from and
* produce output on
*/
void Init(const ChainPosition &position);
/**
* Constructs a RewindableStream at a specific position in a Chain all
* in one step.
*
* Equivalent to RewindableStream a(); a.Init(....);
*/
explicit RewindableStream(const ChainPosition &position)
: in_(NULL) {
Init(position);
}
/**
* Gets the record at the current stream position. Const version.
*/
const void *Get() const {
assert(!poisoned_);
assert(current_);
return current_;
}
/**
* Gets the record at the current stream position.
*/
void *Get() {
assert(!poisoned_);
assert(current_);
return current_;
}
operator bool() const { return !poisoned_; }
bool operator!() const { return poisoned_; }
/**
* Marks the current position in the stream to be rewound to later.
* Note that you can only rewind back as far as 2 * block_size_ - 1!
*/
void Mark();
/**
* Rewinds the stream back to the marked position. This will throw an
* exception if the marked position is too far away.
*/
void Rewind();
/**
* Moves the stream forward to the next record. This internally may
* buffer a block for the purposes of rewinding.
*/
RewindableStream& operator++();
/**
* Poisons the stream. This sends any buffered blocks down the chain
* and sends a poison block as well (sending at most 2 non-poison and 1
* poison block).
*/
void Poison();
private:
void AppendBlock();
void Flush(std::deque<Block>::iterator to);
std::deque<Block> blocks_;
// current_ is in blocks_[blocks_it_] unless poisoned_.
std::size_t blocks_it_;
std::size_t entry_size_;
std::size_t block_size_;
std::size_t block_count_;
uint8_t *marked_, *current_;
const uint8_t *block_end_;
PCQueue<Block> *in_, *out_;
// Have we hit poison at the end of the stream, even if rewinding?
bool hit_poison_;
// Is the curren position poison?
bool poisoned_;
WorkerProgress progress_;
};
inline Chain &operator>>(Chain &chain, RewindableStream &stream) {
stream.Init(chain.Add());
return chain;
}
}
}
#endif
|