streamify.js (2964B)
1 var async = require('./async.js'); 2 3 // API 4 module.exports = { 5 iterator: wrapIterator, 6 callback: wrapCallback 7 }; 8 9 /** 10 * Wraps iterators with long signature 11 * 12 * @this ReadableAsyncKit# 13 * @param {function} iterator - function to wrap 14 * @returns {function} - wrapped function 15 */ 16 function wrapIterator(iterator) 17 { 18 var stream = this; 19 20 return function(item, key, cb) 21 { 22 var aborter 23 , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key)) 24 ; 25 26 stream.jobs[key] = wrappedCb; 27 28 // it's either shortcut (item, cb) 29 if (iterator.length == 2) 30 { 31 aborter = iterator(item, wrappedCb); 32 } 33 // or long format (item, key, cb) 34 else 35 { 36 aborter = iterator(item, key, wrappedCb); 37 } 38 39 return aborter; 40 }; 41 } 42 43 /** 44 * Wraps provided callback function 45 * allowing to execute snitch function before 46 * real callback 47 * 48 * @this ReadableAsyncKit# 49 * @param {function} callback - function to wrap 50 * @returns {function} - wrapped function 51 */ 52 function wrapCallback(callback) 53 { 54 var stream = this; 55 56 var wrapped = function(error, result) 57 { 58 return finisher.call(stream, error, result, callback); 59 }; 60 61 return wrapped; 62 } 63 64 /** 65 * Wraps provided iterator callback function 66 * makes sure snitch only called once, 67 * but passes secondary calls to the original callback 68 * 69 * @this ReadableAsyncKit# 70 * @param {function} callback - callback to wrap 71 * @param {number|string} key - iteration key 72 * @returns {function} wrapped callback 73 */ 74 function wrapIteratorCallback(callback, key) 75 { 76 var stream = this; 77 78 return function(error, output) 79 { 80 // don't repeat yourself 81 if (!(key in stream.jobs)) 82 { 83 callback(error, output); 84 return; 85 } 86 87 // clean up jobs 88 delete stream.jobs[key]; 89 90 return streamer.call(stream, error, {key: key, value: output}, callback); 91 }; 92 } 93 94 /** 95 * Stream wrapper for iterator callback 96 * 97 * @this ReadableAsyncKit# 98 * @param {mixed} error - error response 99 * @param {mixed} output - iterator output 100 * @param {function} callback - callback that expects iterator results 101 */ 102 function streamer(error, output, callback) 103 { 104 if (error && !this.error) 105 { 106 this.error = error; 107 this.pause(); 108 this.emit('error', error); 109 // send back value only, as expected 110 callback(error, output && output.value); 111 return; 112 } 113 114 // stream stuff 115 this.push(output); 116 117 // back to original track 118 // send back value only, as expected 119 callback(error, output && output.value); 120 } 121 122 /** 123 * Stream wrapper for finishing callback 124 * 125 * @this ReadableAsyncKit# 126 * @param {mixed} error - error response 127 * @param {mixed} output - iterator output 128 * @param {function} callback - callback that expects final results 129 */ 130 function finisher(error, output, callback) 131 { 132 // signal end of the stream 133 // only for successfully finished streams 134 if (!error) 135 { 136 this.push(null); 137 } 138 139 // back to original track 140 callback(error, output); 141 }