delayed_stream.js (2319B)
1 var Stream = require('stream').Stream; 2 var util = require('util'); 3 4 module.exports = DelayedStream; 5 function DelayedStream() { 6 this.source = null; 7 this.dataSize = 0; 8 this.maxDataSize = 1024 * 1024; 9 this.pauseStream = true; 10 11 this._maxDataSizeExceeded = false; 12 this._released = false; 13 this._bufferedEvents = []; 14 } 15 util.inherits(DelayedStream, Stream); 16 17 DelayedStream.create = function(source, options) { 18 var delayedStream = new this(); 19 20 options = options || {}; 21 for (var option in options) { 22 delayedStream[option] = options[option]; 23 } 24 25 delayedStream.source = source; 26 27 var realEmit = source.emit; 28 source.emit = function() { 29 delayedStream._handleEmit(arguments); 30 return realEmit.apply(source, arguments); 31 }; 32 33 source.on('error', function() {}); 34 if (delayedStream.pauseStream) { 35 source.pause(); 36 } 37 38 return delayedStream; 39 }; 40 41 Object.defineProperty(DelayedStream.prototype, 'readable', { 42 configurable: true, 43 enumerable: true, 44 get: function() { 45 return this.source.readable; 46 } 47 }); 48 49 DelayedStream.prototype.setEncoding = function() { 50 return this.source.setEncoding.apply(this.source, arguments); 51 }; 52 53 DelayedStream.prototype.resume = function() { 54 if (!this._released) { 55 this.release(); 56 } 57 58 this.source.resume(); 59 }; 60 61 DelayedStream.prototype.pause = function() { 62 this.source.pause(); 63 }; 64 65 DelayedStream.prototype.release = function() { 66 this._released = true; 67 68 this._bufferedEvents.forEach(function(args) { 69 this.emit.apply(this, args); 70 }.bind(this)); 71 this._bufferedEvents = []; 72 }; 73 74 DelayedStream.prototype.pipe = function() { 75 var r = Stream.prototype.pipe.apply(this, arguments); 76 this.resume(); 77 return r; 78 }; 79 80 DelayedStream.prototype._handleEmit = function(args) { 81 if (this._released) { 82 this.emit.apply(this, args); 83 return; 84 } 85 86 if (args[0] === 'data') { 87 this.dataSize += args[1].length; 88 this._checkIfMaxDataSizeExceeded(); 89 } 90 91 this._bufferedEvents.push(args); 92 }; 93 94 DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() { 95 if (this._maxDataSizeExceeded) { 96 return; 97 } 98 99 if (this.dataSize <= this.maxDataSize) { 100 return; 101 } 102 103 this._maxDataSizeExceeded = true; 104 var message = 105 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.' 106 this.emit('error', new Error(message)); 107 };