index.js 899 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. 'use strict';
  2. var PassThrough = require('readable-stream/passthrough')
  3. module.exports = function (/*streams...*/) {
  4. var sources = []
  5. var output = new PassThrough({objectMode: true})
  6. output.setMaxListeners(0)
  7. output.add = add
  8. output.isEmpty = isEmpty
  9. output.on('unpipe', remove)
  10. Array.prototype.slice.call(arguments).forEach(add)
  11. return output
  12. function add (source) {
  13. if (Array.isArray(source)) {
  14. source.forEach(add)
  15. return this
  16. }
  17. sources.push(source);
  18. source.once('end', remove.bind(null, source))
  19. source.once('error', output.emit.bind(output, 'error'))
  20. source.pipe(output, {end: false})
  21. return this
  22. }
  23. function isEmpty () {
  24. return sources.length == 0;
  25. }
  26. function remove (source) {
  27. sources = sources.filter(function (it) { return it !== source })
  28. if (!sources.length && output.readable) { output.end() }
  29. }
  30. }