readable_asynckit.js (1611B)
var streamify = require('./streamify.js')
  , defer     = require('./defer.js')
  ;

// API
module.exports = ReadableAsyncKit;

/**
 * Base constructor for all streams,
 * used to hold shared properties/methods.
 *
 * All constructor arguments are forwarded to `ReadableAsyncKit.super_`.
 * NOTE(review): `super_` is not assigned in this file; presumably it is
 * set up elsewhere (e.g. via `util.inherits`) before instantiation - confirm.
 *
 * @constructor
 */
function ReadableAsyncKit()
{
  ReadableAsyncKit.super_.apply(this, arguments);

  // list of active jobs
  this.jobs = {};

  // add stream methods
  // (attached per instance, so they shadow same-named prototype methods)
  this.destroy = destroy;
  this._start  = _start;
  this._read   = _read;
}

/**
 * Destroys readable stream
 * by aborting outstanding jobs.
 *
 * Subsequent calls are no-ops. If `_start` has not yet stored
 * a terminator, only the `destroyed` flag is set and the deferred
 * part of `_start` takes care of finishing with an empty result.
 *
 * @this {ReadableAsyncKit}
 * @returns {void}
 */
function destroy()
{
  if (this.destroyed)
  {
    return;
  }

  this.destroyed = true;

  // `terminator` is whatever the runner returned in `_start`
  if (typeof this.terminator === 'function')
  {
    this.terminator();
  }
}

/**
 * Starts provided jobs in async manner.
 *
 * Expected arguments, in order:
 * runner function, input data, iterator, [...extras], result callback.
 * The iterator and the result callback get wrapped with
 * the `streamify` helpers (invoked with the stream as `this`)
 * before being handed over to the runner.
 *
 * @private
 * @this {ReadableAsyncKit}
 * @returns {void}
 */
function _start()
{
  // first argument - runner function
  var runner = arguments[0]
    // everything after the runner is passed on to it
    , args   = Array.prototype.slice.call(arguments, 1)
    // first of the remaining arguments - input data
    , input  = args[0]
    // last argument - result callback
    , endCb  = streamify.callback.call(this, args[args.length - 1])
    ;

  // order matters: the callback slot is replaced first,
  // the iterator slot second
  args[args.length - 1] = endCb;
  // second of the remaining arguments - iterator
  args[1] = streamify.iterator.call(this, args[1]);

  // allow time for proper setup
  defer(function()
  {
    if (!this.destroyed)
    {
      // keep runner's return value, `destroy` invokes it to abort
      this.terminator = runner.apply(null, args);
    }
    else
    {
      // destroyed before the jobs had a chance to start,
      // finish with an empty result matching the input's kind
      endCb(null, Array.isArray(input) ? [] : {});
    }
  }.bind(this));
}

/**
 * Implement _read to comply with Readable streams.
 * Doesn't really make sense for flowing object mode,
 * hence intentionally left empty.
 *
 * @private
 * @returns {void}
 */
function _read()
{

}