index.js 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. 'use strict';
  2. function Queue(options) {
  3. if (!(this instanceof Queue)) {
  4. return new Queue(options);
  5. }
  6. options = options || {};
  7. this.concurrency = options.concurrency || Infinity;
  8. this.pending = 0;
  9. this.jobs = [];
  10. this.cbs = [];
  11. this._done = done.bind(this);
  12. }
  13. var arrayAddMethods = [
  14. 'push',
  15. 'unshift',
  16. 'splice'
  17. ];
  18. arrayAddMethods.forEach(function(method) {
  19. Queue.prototype[method] = function() {
  20. var methodResult = Array.prototype[method].apply(this.jobs, arguments);
  21. this._run();
  22. return methodResult;
  23. };
  24. });
  25. Object.defineProperty(Queue.prototype, 'length', {
  26. get: function() {
  27. return this.pending + this.jobs.length;
  28. }
  29. });
  30. Queue.prototype._run = function() {
  31. if (this.pending === this.concurrency) {
  32. return;
  33. }
  34. if (this.jobs.length) {
  35. var job = this.jobs.shift();
  36. this.pending++;
  37. job(this._done);
  38. this._run();
  39. }
  40. if (this.pending === 0) {
  41. while (this.cbs.length !== 0) {
  42. var cb = this.cbs.pop();
  43. process.nextTick(cb);
  44. }
  45. }
  46. };
  47. Queue.prototype.onDone = function(cb) {
  48. if (typeof cb === 'function') {
  49. this.cbs.push(cb);
  50. this._run();
  51. }
  52. };
  53. function done() {
  54. this.pending--;
  55. this._run();
  56. }
  57. module.exports = Queue;