twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

readable_asynckit.js (1611B)


      1 var streamify = require('./streamify.js')
      2   , defer     = require('./defer.js')
      3   ;
      4 
      5 // API
      6 module.exports = ReadableAsyncKit;
      7 
      8 /**
      9  * Base constructor for all streams
     10  * used to hold properties/methods
     11  */
     12 function ReadableAsyncKit()
     13 {
     14   ReadableAsyncKit.super_.apply(this, arguments);
     15 
     16   // list of active jobs
     17   this.jobs = {};
     18 
     19   // add stream methods
     20   this.destroy = destroy;
     21   this._start  = _start;
     22   this._read   = _read;
     23 }
     24 
     25 /**
     26  * Destroys readable stream,
     27  * by aborting outstanding jobs
     28  *
     29  * @returns {void}
     30  */
     31 function destroy()
     32 {
     33   if (this.destroyed)
     34   {
     35     return;
     36   }
     37 
     38   this.destroyed = true;
     39 
     40   if (typeof this.terminator == 'function')
     41   {
     42     this.terminator();
     43   }
     44 }
     45 
     46 /**
     47  * Starts provided jobs in async manner
     48  *
     49  * @private
     50  */
     51 function _start()
     52 {
     53   // first argument – runner function
     54   var runner = arguments[0]
     55     // take away first argument
     56     , args   = Array.prototype.slice.call(arguments, 1)
     57       // second argument - input data
     58     , input  = args[0]
     59       // last argument - result callback
     60     , endCb  = streamify.callback.call(this, args[args.length - 1])
     61     ;
     62 
     63   args[args.length - 1] = endCb;
     64   // third argument - iterator
     65   args[1] = streamify.iterator.call(this, args[1]);
     66 
     67   // allow time for proper setup
     68   defer(function()
     69   {
     70     if (!this.destroyed)
     71     {
     72       this.terminator = runner.apply(null, args);
     73     }
     74     else
     75     {
     76       endCb(null, Array.isArray(input) ? [] : {});
     77     }
     78   }.bind(this));
     79 }
     80 
     81 
     82 /**
     83  * Implement _read to comply with Readable streams
     84  * Doesn't really make sense for flowing object mode
     85  *
     86  * @private
     87  */
     88 function _read()
     89 {
     90 
     91 }