Spaces:
Running
Running
File size: 2,956 Bytes
05a77ff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
/*
* buffered-stream.js: A simple(r) Stream which is partially buffered into memory.
*
* (C) 2010, Mikeal Rogers
*
* Adapted for Flatiron
* (C) 2011, Charlie Robbins & the Contributors
* MIT LICENSE
*
*/
var events = require('events'),
fs = require('fs'),
stream = require('stream'),
util = require('util');
//
// ### function BufferedStream (limit)
// #### @limit {number} **Optional** Buffered size above which `write`
// calls `pause()`. Unbounded when omitted.
// Constructor for a stream that starts out holding written chunks in
// memory (`this.chunks`) and tracking their combined length (`this.size`).
//
var BufferedStream = module.exports = function (limit) {
  events.EventEmitter.call(this);

  // An omitted limit means "never ask the source to pause".
  this.limit = limit === undefined ? Infinity : limit;
  this.size = 0;
  this.chunks = [];
  this.writable = true;
  this.readable = true;
  this._buffer = true;
};

// Pick up `pipe` and the emitter API from the core Stream base class.
util.inherits(BufferedStream, stream.Stream);
//
// ### @buffer {boolean}
// Accessor for the buffering flag stored in `_buffer`. Assigning a falsy
// value while chunks are still held replays them as `data` events
// (followed by `end` if the stream has already ended), then discards the
// in-memory buffer and resets `size`.
//
Object.defineProperty(BufferedStream.prototype, 'buffer', {
  get: function () {
    return this._buffer;
  },
  set: function (value) {
    var shouldFlush = !value && this.chunks;

    if (shouldFlush) {
      this.chunks.forEach(function (chunk) {
        this.emit('data', chunk);
      }, this);

      if (this.ended) {
        this.emit('end');
      }

      this.size = 0;
      delete this.chunks;
    }

    this._buffer = value;
  }
});
//
// ### function pipe (dest, options)
// Hands the arguments to the core `Stream.prototype.pipe` and returns its
// result. The first call also schedules, for the next tick, a replay of
// whatever is still buffered plus a re-emission of `end` / `close` when
// the stream has already finished.
//
BufferedStream.prototype.pipe = function () {
  var source = this;

  // Deferred so every destination piped in the current tick is attached
  // before any buffered data is replayed.
  var replay = function () {
    if (source.chunks) {
      source.chunks.forEach(function (chunk) {
        source.emit('data', chunk);
      });
      source.size = 0;
      delete source.chunks;
    }

    if (source.readable) {
      return;
    }

    if (source.ended) {
      source.emit('end');
    } else if (source.closed) {
      source.emit('close');
    }
  };

  if (source.resume) {
    source.resume();
  }

  var target = stream.Stream.prototype.pipe.apply(source, arguments);

  //
  // Piping to a second destination must not replay the buffer twice, so
  // only the first pipe() schedules the flush. Additional destinations
  // have to be piped within the same tick to see the buffered data.
  //
  if (!source.piped) {
    process.nextTick(replay);
    source.piped = true;
  }

  return target;
};
//
// ### function write (chunk)
// #### @chunk {*} Data to accept; its `length` is read when it is buffered
// While the in-memory buffer exists and nothing has been piped yet, the
// chunk is stored and counted against `limit`, calling `pause()` once the
// limit is exceeded. Otherwise the chunk is passed straight through as a
// `data` event. Returns nothing.
//
BufferedStream.prototype.write = function (chunk) {
  var buffering = Boolean(this.chunks) && !this.piped;

  if (buffering) {
    this.chunks.push(chunk);
    this.size += chunk.length;

    if (this.size > this.limit) {
      this.pause();
    }
  } else {
    this.emit('data', chunk);
  }
};
//
// ### function end (chunk)
// #### @chunk {*} **Optional** Final chunk, handed to `write` before ending
// Marks the stream as ended and no longer readable, then emits `end`.
//
BufferedStream.prototype.end = function (chunk) {
  //
  // Writable streams accept `end(chunk)`; forward that last chunk instead
  // of silently dropping it. A function argument is the callback form of
  // `end` and carries no data, so it is ignored here as it always was.
  //
  if (chunk !== undefined && chunk !== null && typeof chunk !== 'function') {
    this.write(chunk);
  }

  this.readable = false;
  this.ended = true;
  this.emit('end');
};
//
// ### function destroy ()
// Marks the stream as neither readable nor writable and throws away any
// chunks still held in memory. No event is emitted.
//
BufferedStream.prototype.destroy = function () {
  this.readable = false;
  this.writable = false;

  // Keep `size` in step with the buffer, as every other path that
  // discards `chunks` does; otherwise it reports a stale count.
  this.size = 0;
  delete this.chunks;
};
//
// ### function close ()
// Flags the stream as closed and no longer readable. Nothing is emitted
// here; the deferred flush scheduled by `pipe` emits `close` when it finds
// the stream closed but not ended.
//
BufferedStream.prototype.close = function () {
  this.closed = true;
  this.readable = false;
};
//
// ### function pause () / function resume ()
// Fallbacks installed only when the core Stream prototype does not already
// provide the method. Each one simply emits the event named after itself.
//
['pause', 'resume'].forEach(function (method) {
  if (!stream.Stream.prototype[method]) {
    BufferedStream.prototype[method] = function () {
      this.emit(method);
    };
  }
});
|