buddy

node MVC discord bot
Log | Files | Refs | README

delayed_stream.js (2319B)


      1 var Stream = require('stream').Stream;
      2 var util = require('util');
      3 
      4 module.exports = DelayedStream;
      5 function DelayedStream() {
      6   this.source = null;
      7   this.dataSize = 0;
      8   this.maxDataSize = 1024 * 1024;
      9   this.pauseStream = true;
     10 
     11   this._maxDataSizeExceeded = false;
     12   this._released = false;
     13   this._bufferedEvents = [];
     14 }
     15 util.inherits(DelayedStream, Stream);
     16 
     17 DelayedStream.create = function(source, options) {
     18   var delayedStream = new this();
     19 
     20   options = options || {};
     21   for (var option in options) {
     22     delayedStream[option] = options[option];
     23   }
     24 
     25   delayedStream.source = source;
     26 
     27   var realEmit = source.emit;
     28   source.emit = function() {
     29     delayedStream._handleEmit(arguments);
     30     return realEmit.apply(source, arguments);
     31   };
     32 
     33   source.on('error', function() {});
     34   if (delayedStream.pauseStream) {
     35     source.pause();
     36   }
     37 
     38   return delayedStream;
     39 };
     40 
     41 Object.defineProperty(DelayedStream.prototype, 'readable', {
     42   configurable: true,
     43   enumerable: true,
     44   get: function() {
     45     return this.source.readable;
     46   }
     47 });
     48 
     49 DelayedStream.prototype.setEncoding = function() {
     50   return this.source.setEncoding.apply(this.source, arguments);
     51 };
     52 
     53 DelayedStream.prototype.resume = function() {
     54   if (!this._released) {
     55     this.release();
     56   }
     57 
     58   this.source.resume();
     59 };
     60 
     61 DelayedStream.prototype.pause = function() {
     62   this.source.pause();
     63 };
     64 
     65 DelayedStream.prototype.release = function() {
     66   this._released = true;
     67 
     68   this._bufferedEvents.forEach(function(args) {
     69     this.emit.apply(this, args);
     70   }.bind(this));
     71   this._bufferedEvents = [];
     72 };
     73 
     74 DelayedStream.prototype.pipe = function() {
     75   var r = Stream.prototype.pipe.apply(this, arguments);
     76   this.resume();
     77   return r;
     78 };
     79 
     80 DelayedStream.prototype._handleEmit = function(args) {
     81   if (this._released) {
     82     this.emit.apply(this, args);
     83     return;
     84   }
     85 
     86   if (args[0] === 'data') {
     87     this.dataSize += args[1].length;
     88     this._checkIfMaxDataSizeExceeded();
     89   }
     90 
     91   this._bufferedEvents.push(args);
     92 };
     93 
     94 DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() {
     95   if (this._maxDataSizeExceeded) {
     96     return;
     97   }
     98 
     99   if (this.dataSize <= this.maxDataSize) {
    100     return;
    101   }
    102 
    103   this._maxDataSizeExceeded = true;
    104   var message =
    105     'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'
    106   this.emit('error', new Error(message));
    107 };