index.js (1460B)
1 2 'use strict'; 3 4 var Stream = require('stream') 5 6 // from 7 // 8 // a stream that reads from an source. 9 // source may be an array, or a function. 10 // from handles pause behaviour for you. 11 12 module.exports = 13 function from (source) { 14 if(Array.isArray(source)) { 15 var source_index = 0, source_len = source.length; 16 return from (function (i) { 17 if(source_index < source_len) 18 this.emit('data', source[source_index++]) 19 else 20 this.emit('end') 21 return true 22 }) 23 } 24 var s = new Stream(), i = 0 25 s.ended = false 26 s.started = false 27 s.readable = true 28 s.writable = false 29 s.paused = false 30 s.ended = false 31 s.pause = function () { 32 s.started = true 33 s.paused = true 34 } 35 function next () { 36 s.started = true 37 if(s.ended) return 38 while(!s.ended && !s.paused && source.call(s, i++, function () { 39 if(!s.ended && !s.paused) 40 process.nextTick(next); 41 })) 42 ; 43 } 44 s.resume = function () { 45 s.started = true 46 s.paused = false 47 next() 48 } 49 s.on('end', function () { 50 s.ended = true 51 s.readable = false 52 process.nextTick(s.destroy) 53 }) 54 s.destroy = function () { 55 s.ended = true 56 s.emit('close') 57 } 58 /* 59 by default, the stream will start emitting at nextTick 60 if you want, you can pause it, after pipeing. 61 you can also resume before next tick, and that will also 62 work. 63 */ 64 process.nextTick(function () { 65 if(!s.started) s.resume() 66 }) 67 return s 68 }