twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

streamify.js (2964B)


      1 var async = require('./async.js');
      2 
      3 // API
      4 module.exports = {
      5   iterator: wrapIterator,
      6   callback: wrapCallback
      7 };
      8 
      9 /**
     10  * Wraps iterators with long signature
     11  *
     12  * @this    ReadableAsyncKit#
     13  * @param   {function} iterator - function to wrap
     14  * @returns {function} - wrapped function
     15  */
     16 function wrapIterator(iterator)
     17 {
     18   var stream = this;
     19 
     20   return function(item, key, cb)
     21   {
     22     var aborter
     23       , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
     24       ;
     25 
     26     stream.jobs[key] = wrappedCb;
     27 
     28     // it's either shortcut (item, cb)
     29     if (iterator.length == 2)
     30     {
     31       aborter = iterator(item, wrappedCb);
     32     }
     33     // or long format (item, key, cb)
     34     else
     35     {
     36       aborter = iterator(item, key, wrappedCb);
     37     }
     38 
     39     return aborter;
     40   };
     41 }
     42 
     43 /**
     44  * Wraps provided callback function
     45  * allowing to execute snitch function before
     46  * real callback
     47  *
     48  * @this    ReadableAsyncKit#
     49  * @param   {function} callback - function to wrap
     50  * @returns {function} - wrapped function
     51  */
     52 function wrapCallback(callback)
     53 {
     54   var stream = this;
     55 
     56   var wrapped = function(error, result)
     57   {
     58     return finisher.call(stream, error, result, callback);
     59   };
     60 
     61   return wrapped;
     62 }
     63 
     64 /**
     65  * Wraps provided iterator callback function
     66  * makes sure snitch only called once,
     67  * but passes secondary calls to the original callback
     68  *
     69  * @this    ReadableAsyncKit#
     70  * @param   {function} callback - callback to wrap
     71  * @param   {number|string} key - iteration key
     72  * @returns {function} wrapped callback
     73  */
     74 function wrapIteratorCallback(callback, key)
     75 {
     76   var stream = this;
     77 
     78   return function(error, output)
     79   {
     80     // don't repeat yourself
     81     if (!(key in stream.jobs))
     82     {
     83       callback(error, output);
     84       return;
     85     }
     86 
     87     // clean up jobs
     88     delete stream.jobs[key];
     89 
     90     return streamer.call(stream, error, {key: key, value: output}, callback);
     91   };
     92 }
     93 
     94 /**
     95  * Stream wrapper for iterator callback
     96  *
     97  * @this  ReadableAsyncKit#
     98  * @param {mixed} error - error response
     99  * @param {mixed} output - iterator output
    100  * @param {function} callback - callback that expects iterator results
    101  */
    102 function streamer(error, output, callback)
    103 {
    104   if (error && !this.error)
    105   {
    106     this.error = error;
    107     this.pause();
    108     this.emit('error', error);
    109     // send back value only, as expected
    110     callback(error, output && output.value);
    111     return;
    112   }
    113 
    114   // stream stuff
    115   this.push(output);
    116 
    117   // back to original track
    118   // send back value only, as expected
    119   callback(error, output && output.value);
    120 }
    121 
    122 /**
    123  * Stream wrapper for finishing callback
    124  *
    125  * @this  ReadableAsyncKit#
    126  * @param {mixed} error - error response
    127  * @param {mixed} output - iterator output
    128  * @param {function} callback - callback that expects final results
    129  */
    130 function finisher(error, output, callback)
    131 {
    132   // signal end of the stream
    133   // only for successfully finished streams
    134   if (!error)
    135   {
    136     this.push(null);
    137   }
    138 
    139   // back to original track
    140   callback(error, output);
    141 }