Spaces:
Running
Running
/* | |
* buffered-stream.js: A simple(r) Stream which is partially buffered into memory. | |
* | |
* (C) 2010, Mikeal Rogers | |
* | |
* Adapted for Flatiron | |
* (C) 2011, Charlie Robbins & the Contributors | |
* MIT LICENSE | |
* | |
*/ | |
var events = require('events'), | |
fs = require('fs'), | |
stream = require('stream'), | |
util = require('util'); | |
// | |
// ### function BufferedStream (limit) | |
// #### @limit {number} **Optional** Size of the buffer to limit | |
// Constructor function for the BufferedStream object responsible for | |
// maintaining a stream interface which can also persist to memory | |
// temporarily. | |
// | |
var BufferedStream = module.exports = function (limit) { | |
events.EventEmitter.call(this); | |
if (typeof limit === 'undefined') { | |
limit = Infinity; | |
} | |
this.limit = limit; | |
this.size = 0; | |
this.chunks = []; | |
this.writable = true; | |
this.readable = true; | |
this._buffer = true; | |
}; | |
util.inherits(BufferedStream, stream.Stream); | |
Object.defineProperty(BufferedStream.prototype, 'buffer', { | |
get: function () { | |
return this._buffer; | |
}, | |
set: function (value) { | |
if (!value && this.chunks) { | |
var self = this; | |
this.chunks.forEach(function (c) { self.emit('data', c) }); | |
if (this.ended) this.emit('end'); | |
this.size = 0; | |
delete this.chunks; | |
} | |
this._buffer = value; | |
} | |
}); | |
BufferedStream.prototype.pipe = function () { | |
var self = this, | |
dest; | |
if (self.resume) { | |
self.resume(); | |
} | |
dest = stream.Stream.prototype.pipe.apply(self, arguments); | |
// | |
// just incase you are piping to two streams, do not emit data twice. | |
// note: you can pipe twice, but you need to pipe both streams in the same tick. | |
// (this is normal for streams) | |
// | |
if (this.piped) { | |
return dest; | |
} | |
process.nextTick(function () { | |
if (self.chunks) { | |
self.chunks.forEach(function (c) { self.emit('data', c) }); | |
self.size = 0; | |
delete self.chunks; | |
} | |
if (!self.readable) { | |
if (self.ended) { | |
self.emit('end'); | |
} | |
else if (self.closed) { | |
self.emit('close'); | |
} | |
} | |
}); | |
this.piped = true; | |
return dest; | |
}; | |
BufferedStream.prototype.write = function (chunk) { | |
if (!this.chunks || this.piped) { | |
this.emit('data', chunk); | |
return; | |
} | |
this.chunks.push(chunk); | |
this.size += chunk.length; | |
if (this.limit < this.size) { | |
this.pause(); | |
} | |
}; | |
BufferedStream.prototype.end = function () { | |
this.readable = false; | |
this.ended = true; | |
this.emit('end'); | |
}; | |
BufferedStream.prototype.destroy = function () { | |
this.readable = false; | |
this.writable = false; | |
delete this.chunks; | |
}; | |
BufferedStream.prototype.close = function () { | |
this.readable = false; | |
this.closed = true; | |
}; | |
if (!stream.Stream.prototype.pause) { | |
BufferedStream.prototype.pause = function () { | |
this.emit('pause'); | |
}; | |
} | |
if (!stream.Stream.prototype.resume) { | |
BufferedStream.prototype.resume = function () { | |
this.emit('resume'); | |
}; | |
} | |