File size: 3,241 Bytes
1ce325b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#ifndef UTIL_STREAM_REWINDABLE_STREAM_H
#define UTIL_STREAM_REWINDABLE_STREAM_H

#include "chain.hh"

#include <boost/noncopyable.hpp>

#include <deque>

namespace util {
namespace stream {

/**
 * A RewindableStream is like a Stream (but one that is only used for
 * creating input at the start of a chain) except that it can be rewound to
 * be able to re-write a part of the stream before it is sent. Rewinding
 * has a limit of 2 * block_size_ - 1 in distance (it does *not* buffer an
 * entire stream into memory, only a maximum of 2 * block_size_).
 */
class RewindableStream : boost::noncopyable {
  public:
    /**
     * Creates an uninitialized RewindableStream. You **must** call Init()
     * on it later!
     */
    RewindableStream();

    ~RewindableStream() {
      Poison();
    }

    /**
     * Initializes an existing RewindableStream at a specific position in
     * a Chain.
     *
     * @param position The position in the chain to get input from and
     *  produce output on
     */
    void Init(const ChainPosition &position);

    /**
     * Constructs a RewindableStream at a specific position in a Chain all
     * in one step.
     *
     * Equivalent to RewindableStream a(); a.Init(....);
     */
    explicit RewindableStream(const ChainPosition &position)
      : in_(NULL) {
      Init(position);
    }

    /**
     * Gets the record at the current stream position. Const version.
     */
    const void *Get() const {
      assert(!poisoned_);
      assert(current_);
      return current_;
    }

    /**
     * Gets the record at the current stream position.
     */
    void *Get() {
      assert(!poisoned_);
      assert(current_);
      return current_;
    }

    operator bool() const { return !poisoned_; }

    bool operator!() const { return poisoned_; }

    /**
     * Marks the current position in the stream to be rewound to later.
     * Note that you can only rewind back as far as 2 * block_size_ - 1!
     */
    void Mark();

    /**
     * Rewinds the stream back to the marked position. This will throw an
     * exception if the marked position is too far away.
     */
    void Rewind();

    /**
     * Moves the stream forward to the next record. This internally may
     * buffer a block for the purposes of rewinding.
     */
    RewindableStream& operator++();

    /**
     * Poisons the stream. This sends any buffered blocks down the chain
     * and sends a poison block as well (sending at most 2 non-poison and 1
     * poison block).
     */
    void Poison();

  private:
    void AppendBlock();

    void Flush(std::deque<Block>::iterator to);

    std::deque<Block> blocks_;
    // current_ is in blocks_[blocks_it_] unless poisoned_.
    std::size_t blocks_it_;

    std::size_t entry_size_;
    std::size_t block_size_;
    std::size_t block_count_;

    uint8_t *marked_, *current_;
    const uint8_t *block_end_;

    PCQueue<Block> *in_, *out_;

    // Have we hit poison at the end of the stream, even if rewinding?
    bool hit_poison_;
    // Is the curren position poison?
    bool poisoned_;

    WorkerProgress progress_;
};

/**
 * Attaches a RewindableStream to a chain: a new position is requested from
 * the chain via Add() and handed to stream.Init().
 *
 * @param chain The chain to add a position to
 * @param stream The stream to initialize at that position
 * @return chain, so that further operator>> calls can be appended
 */
inline Chain &operator>>(Chain &chain, RewindableStream &stream) {
  const ChainPosition &position = chain.Add();
  stream.Init(position);
  return chain;
}

}
}
#endif