index.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. var Transform = require('readable-stream').Transform;
  2. var inherits = require('inherits');
  3. var cyclist = require('cyclist');
  4. var util = require('util');
  /**
   * Transform stream that keeps up to `maxParallel` calls to `ontransform`
   * in flight at the same time.
   *
   * Accepted call shapes (with or without `new`):
   *   ParallelTransform(ontransform)                    // maxParallel = 1
   *   ParallelTransform(maxParallel, ontransform)
   *   ParallelTransform(maxParallel, opts, ontransform)
   *
   * @param {number} maxParallel - Upper bound on concurrently running transforms.
   * @param {Object} [opts] - Options forwarded to the Transform constructor.
   *   `highWaterMark` defaults to `Math.max(maxParallel, 16)` when falsy,
   *   `objectMode` is forced to `true` unless it is explicitly `false`, and
   *   `ordered` (default `true`) selects whether results are buffered by
   *   input position or simply in the order the callbacks fire.
   * @param {function(*, function(Error=, *=))} ontransform - Worker invoked as
   *   `ontransform(chunk, callback)`.
   */
  var ParallelTransform = function(maxParallel, opts, ontransform) {
    if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform);

    if (typeof maxParallel === 'function') {
      ontransform = maxParallel;
      opts = null;
      maxParallel = 1;
    }
    if (typeof opts === 'function') {
      ontransform = opts;
      opts = null;
    }

    // Work on a shallow copy so the caller's options object is never mutated.
    // Writing the defaults back into a shared object would make a second
    // stream built from it inherit the first stream's highWaterMark.
    var options = {};
    if (opts) {
      for (var key in opts) options[key] = opts[key];
    }
    if (!options.highWaterMark) options.highWaterMark = Math.max(maxParallel, 16);
    if (options.objectMode !== false) options.objectMode = true;

    Transform.call(this, options);

    this._maxParallel = maxParallel;
    this._ontransform = ontransform;
    this._destroyed = false;
    this._flushed = false;
    this._ordered = options.ordered !== false;
    // Ordered mode stores results by input position in a cyclist buffer;
    // unordered mode only needs a plain queue.
    this._buffer = this._ordered ? cyclist(maxParallel) : [];
    this._top = 0;      // number of chunks handed to ontransform so far
    this._bottom = 0;   // number of results already drained
    this._ondrain = null; // parked callback waiting for capacity (or for flush)
  };
  30. inherits(ParallelTransform, Transform);
  /**
   * Marks the stream as destroyed and emits 'close'.
   * Only the first call has any effect; later calls are ignored.
   */
  ParallelTransform.prototype.destroy = function() {
    if (!this._destroyed) {
      this._destroyed = true;
      this.emit('close');
    }
  };
  /**
   * Transform hook: hands `chunk` to the user's `ontransform` worker and
   * buffers its result until `_drain` can emit it.
   *
   * The write callback is invoked immediately while fewer than
   * `_maxParallel` results are outstanding; otherwise it is parked in
   * `_ondrain` so that `_drain` can release it once capacity frees up.
   *
   * @param {*} chunk - The value written to the stream.
   * @param {string} enc - Encoding supplied by the stream machinery (unused).
   * @param {function()} callback - Signals readiness for the next chunk.
   */
  ParallelTransform.prototype._transform = function(chunk, enc, callback) {
    var self = this;
    var pos = this._top++;

    this._ontransform(chunk, function(err, data) {
      // Results arriving after destroy() are silently dropped.
      if (self._destroyed) return;
      if (err) {
        self.emit('error', err);
        self.push(null);
        self.destroy();
        return;
      }

      // "No result" (undefined or null) is stored as null in BOTH modes so
      // that _drain skips it; otherwise an unordered stream would forward
      // a raw undefined to push().
      var result = (data === undefined || data === null) ? null : data;
      if (self._ordered) {
        self._buffer.put(pos, result);
      }
      else {
        self._buffer.push(result);
      }
      self._drain();
    });

    if (this._top - this._bottom < this._maxParallel) return callback();
    // At capacity: hold the callback until _drain reports free slots.
    this._ondrain = callback;
  };
  /**
   * Flush hook: records that no more input will arrive and parks `callback`
   * as the pending drain callback, then attempts a drain so it can fire as
   * soon as `_drained()` reports that nothing is outstanding.
   *
   * @param {function()} callback - Invoked by `_drain` once everything is emitted.
   */
  ParallelTransform.prototype._flush = function(callback) {
    this._ondrain = callback;
    this._flushed = true;
    this._drain();
  };
  /**
   * Emits every buffered result that is ready, then releases the parked
   * `_ondrain` callback if `_drained()` reports free capacity.
   *
   * Ordered mode walks the buffer from `_bottom` and stops at the first
   * position whose result has not arrived yet. Unordered mode empties the
   * queue in the order the results were buffered. Entries representing
   * "no result" (null, and undefined in unordered mode) advance `_bottom`
   * but are not pushed downstream.
   */
  ParallelTransform.prototype._drain = function() {
    var data;

    if (this._ordered) {
      while (this._buffer.get(this._bottom) !== undefined) {
        data = this._buffer.del(this._bottom++);
        if (data === null) continue;
        this.push(data);
      }
    }
    else {
      while (this._buffer.length > 0) {
        // shift() keeps completion order when several results are queued.
        data = this._buffer.shift();
        this._bottom++;
        // Skip both "no result" markers so undefined never reaches push().
        if (data === null || data === undefined) continue;
        this.push(data);
      }
    }

    if (!this._drained() || !this._ondrain) return;

    // Clear the slot before calling: the callback may synchronously trigger
    // another _transform that parks a new callback.
    var ondrain = this._ondrain;
    this._ondrain = null;
    ondrain();
  };
  /**
   * Reports whether the parked drain callback may be released.
   *
   * After flush this requires that no results are outstanding; before
   * flush it only requires that fewer than `_maxParallel` are outstanding.
   *
   * @returns {boolean}
   */
  ParallelTransform.prototype._drained = function() {
    var outstanding = this._top - this._bottom;
    if (this._flushed) return !outstanding;
    return outstanding < this._maxParallel;
  };
  88. module.exports = ParallelTransform;