combined_stream.js (4687B)
1 var util = require('util'); 2 var Stream = require('stream').Stream; 3 var DelayedStream = require('delayed-stream'); 4 5 module.exports = CombinedStream; 6 function CombinedStream() { 7 this.writable = false; 8 this.readable = true; 9 this.dataSize = 0; 10 this.maxDataSize = 2 * 1024 * 1024; 11 this.pauseStreams = true; 12 13 this._released = false; 14 this._streams = []; 15 this._currentStream = null; 16 this._insideLoop = false; 17 this._pendingNext = false; 18 } 19 util.inherits(CombinedStream, Stream); 20 21 CombinedStream.create = function(options) { 22 var combinedStream = new this(); 23 24 options = options || {}; 25 for (var option in options) { 26 combinedStream[option] = options[option]; 27 } 28 29 return combinedStream; 30 }; 31 32 CombinedStream.isStreamLike = function(stream) { 33 return (typeof stream !== 'function') 34 && (typeof stream !== 'string') 35 && (typeof stream !== 'boolean') 36 && (typeof stream !== 'number') 37 && (!Buffer.isBuffer(stream)); 38 }; 39 40 CombinedStream.prototype.append = function(stream) { 41 var isStreamLike = CombinedStream.isStreamLike(stream); 42 43 if (isStreamLike) { 44 if (!(stream instanceof DelayedStream)) { 45 var newStream = DelayedStream.create(stream, { 46 maxDataSize: Infinity, 47 pauseStream: this.pauseStreams, 48 }); 49 stream.on('data', this._checkDataSize.bind(this)); 50 stream = newStream; 51 } 52 53 this._handleErrors(stream); 54 55 if (this.pauseStreams) { 56 stream.pause(); 57 } 58 } 59 60 this._streams.push(stream); 61 return this; 62 }; 63 64 CombinedStream.prototype.pipe = function(dest, options) { 65 Stream.prototype.pipe.call(this, dest, options); 66 this.resume(); 67 return dest; 68 }; 69 70 CombinedStream.prototype._getNext = function() { 71 this._currentStream = null; 72 73 if (this._insideLoop) { 74 this._pendingNext = true; 75 return; // defer call 76 } 77 78 this._insideLoop = true; 79 try { 80 do { 81 this._pendingNext = false; 82 this._realGetNext(); 83 } while (this._pendingNext); 84 } finally { 85 this._insideLoop = false; 86 } 87 }; 88 89 CombinedStream.prototype._realGetNext = function() { 90 var stream = this._streams.shift(); 91 92 93 if (typeof stream == 'undefined') { 94 this.end(); 95 return; 96 } 97 98 if (typeof stream !== 'function') { 99 this._pipeNext(stream); 100 return; 101 } 102 103 var getStream = stream; 104 getStream(function(stream) { 105 var isStreamLike = CombinedStream.isStreamLike(stream); 106 if (isStreamLike) { 107 stream.on('data', this._checkDataSize.bind(this)); 108 this._handleErrors(stream); 109 } 110 111 this._pipeNext(stream); 112 }.bind(this)); 113 }; 114 115 CombinedStream.prototype._pipeNext = function(stream) { 116 this._currentStream = stream; 117 118 var isStreamLike = CombinedStream.isStreamLike(stream); 119 if (isStreamLike) { 120 stream.on('end', this._getNext.bind(this)); 121 stream.pipe(this, {end: false}); 122 return; 123 } 124 125 var value = stream; 126 this.write(value); 127 this._getNext(); 128 }; 129 130 CombinedStream.prototype._handleErrors = function(stream) { 131 var self = this; 132 stream.on('error', function(err) { 133 self._emitError(err); 134 }); 135 }; 136 137 CombinedStream.prototype.write = function(data) { 138 this.emit('data', data); 139 }; 140 141 CombinedStream.prototype.pause = function() { 142 if (!this.pauseStreams) { 143 return; 144 } 145 146 if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause(); 147 this.emit('pause'); 148 }; 149 150 CombinedStream.prototype.resume = function() { 151 if (!this._released) { 152 this._released = true; 153 this.writable = true; 154 this._getNext(); 155 } 156 157 if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume(); 158 this.emit('resume'); 159 }; 160 161 CombinedStream.prototype.end = function() { 162 this._reset(); 163 this.emit('end'); 164 }; 165 166 CombinedStream.prototype.destroy = function() { 167 this._reset(); 168 this.emit('close'); 169 }; 170 171 CombinedStream.prototype._reset = function() { 172 this.writable = false; 173 this._streams = []; 174 this._currentStream = null; 175 }; 176 177 CombinedStream.prototype._checkDataSize = function() { 178 this._updateDataSize(); 179 if (this.dataSize <= this.maxDataSize) { 180 return; 181 } 182 183 var message = 184 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'; 185 this._emitError(new Error(message)); 186 }; 187 188 CombinedStream.prototype._updateDataSize = function() { 189 this.dataSize = 0; 190 191 var self = this; 192 this._streams.forEach(function(stream) { 193 if (!stream.dataSize) { 194 return; 195 } 196 197 self.dataSize += stream.dataSize; 198 }); 199 200 if (this._currentStream && this._currentStream.dataSize) { 201 this.dataSize += this._currentStream.dataSize; 202 } 203 }; 204 205 CombinedStream.prototype._emitError = function(err) { 206 this._reset(); 207 this.emit('error', err); 208 };