index.js (1234B)
'use strict';

/**
 * Job queue that starts callback-style jobs, optionally capping how many may
 * be in flight at the same time.
 *
 * A job is a function that receives a single `done` callback and must call it
 * exactly once when its work is finished; each call decrements `pending`.
 *
 * May be called with or without `new`.
 *
 * @param {Object} [options]
 * @param {number} [options.concurrency=Infinity] Maximum number of jobs in
 *   flight. Missing, non-numeric, zero or negative values mean "unlimited".
 * @returns {Queue}
 */
function Queue(options) {
  if (!(this instanceof Queue)) {
    return new Queue(options);
  }

  options = options || {};
  // Only a positive limit is meaningful; everything else means "no limit".
  this.concurrency = options.concurrency > 0 ? options.concurrency : Infinity;
  this.pending = 0; // jobs started whose `done` has not been called yet
  this.jobs = []; // jobs waiting to be started
  this.cbs = []; // callbacks registered through onDone()
  this._done = done.bind(this);
}

// Array methods exposed on the queue. Each one is applied to the internal
// `jobs` array and then triggers a scheduling pass.
var arrayAddMethods = [
  'push',
  'unshift',
  'splice'
];

arrayAddMethods.forEach(function(method) {
  /**
   * Same arguments and return value as the Array.prototype method of the same
   * name, applied to the list of waiting jobs. The return value is computed
   * before any newly startable job is started.
   */
  Queue.prototype[method] = function() {
    var methodResult = Array.prototype[method].apply(this.jobs, arguments);
    this._run();
    return methodResult;
  };
});

/**
 * Number of jobs that are either in flight or still waiting.
 */
Object.defineProperty(Queue.prototype, 'length', {
  get: function() {
    return this.pending + this.jobs.length;
  }
});

/**
 * Scheduling pass: starts waiting jobs while there is spare capacity, then,
 * if the queue is completely idle, schedules every registered completion
 * callback with process.nextTick.
 */
Queue.prototype._run = function() {
  // `>=` rather than `===`: the limit must still hold when `concurrency` has
  // been lowered below the number of jobs already in flight.
  if (this.pending >= this.concurrency) {
    return;
  }

  // A loop instead of one recursive call per started job, so a large backlog
  // cannot exhaust the call stack.
  while (this.jobs.length && this.pending < this.concurrency) {
    var job = this.jobs.shift();
    this.pending++;
    job(this._done);
  }

  if (this.pending === 0 && this.jobs.length === 0) {
    // Callbacks are taken from the end of the list, so callbacks that
    // accumulated while the queue was busy fire in reverse registration order.
    while (this.cbs.length !== 0) {
      var cb = this.cbs.pop();
      process.nextTick(cb);
    }
  }
};

/**
 * Registers a callback to be invoked (asynchronously, with no arguments) once
 * no job is in flight or waiting. If the queue is already idle the callback
 * is scheduled right away. Non-function arguments are ignored.
 *
 * @param {Function} cb
 */
Queue.prototype.onDone = function(cb) {
  if (typeof cb === 'function') {
    this.cbs.push(cb);
    this._run();
  }
};

/**
 * Completion callback handed to every job (bound to the queue instance):
 * marks one job as finished and runs another scheduling pass.
 */
function done() {
  this.pending--;
  this._run();
}

// Export for CommonJS consumers. The `typeof` guard keeps the file loadable in
// contexts where `module` is not defined (e.g. when evaluated as an ES module).
if (typeof module !== 'undefined' && module.exports) {
  module.exports = Queue;
}