index.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. 'use strict'
  2. /*
  3. * merge2
  4. * https://github.com/teambition/merge2
  5. *
  6. * Copyright (c) 2014-2016 Teambition
  7. * Licensed under the MIT license.
  8. */
  9. const Stream = require('stream')
  10. const PassThrough = Stream.PassThrough
  11. const slice = Array.prototype.slice
  12. module.exports = merge2
  13. function merge2 () {
  14. const streamsQueue = []
  15. let merging = false
  16. const args = slice.call(arguments)
  17. let options = args[args.length - 1]
  18. if (options && !Array.isArray(options) && options.pipe == null) args.pop()
  19. else options = {}
  20. const doEnd = options.end !== false
  21. if (options.objectMode == null) options.objectMode = true
  22. if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
  23. const mergedStream = PassThrough(options)
  24. function addStream () {
  25. for (let i = 0, len = arguments.length; i < len; i++) {
  26. streamsQueue.push(pauseStreams(arguments[i], options))
  27. }
  28. mergeStream()
  29. return this
  30. }
  31. function mergeStream () {
  32. if (merging) return
  33. merging = true
  34. let streams = streamsQueue.shift()
  35. if (!streams) {
  36. process.nextTick(endStream)
  37. return
  38. }
  39. if (!Array.isArray(streams)) streams = [streams]
  40. let pipesCount = streams.length + 1
  41. function next () {
  42. if (--pipesCount > 0) return
  43. merging = false
  44. mergeStream()
  45. }
  46. function pipe (stream) {
  47. function onend () {
  48. stream.removeListener('merge2UnpipeEnd', onend)
  49. stream.removeListener('end', onend)
  50. next()
  51. }
  52. // skip ended stream
  53. if (stream._readableState.endEmitted) return next()
  54. stream.on('merge2UnpipeEnd', onend)
  55. stream.on('end', onend)
  56. stream.pipe(mergedStream, { end: false })
  57. // compatible for old stream
  58. stream.resume()
  59. }
  60. for (let i = 0; i < streams.length; i++) pipe(streams[i])
  61. next()
  62. }
  63. function endStream () {
  64. merging = false
  65. // emit 'queueDrain' when all streams merged.
  66. mergedStream.emit('queueDrain')
  67. return doEnd && mergedStream.end()
  68. }
  69. mergedStream.setMaxListeners(0)
  70. mergedStream.add = addStream
  71. mergedStream.on('unpipe', function (stream) {
  72. stream.emit('merge2UnpipeEnd')
  73. })
  74. if (args.length) addStream.apply(null, args)
  75. return mergedStream
  76. }
  77. // check and pause streams for pipe.
  78. function pauseStreams (streams, options) {
  79. if (!Array.isArray(streams)) {
  80. // Backwards-compat with old-style streams
  81. if (!streams._readableState && streams.pipe) streams = streams.pipe(PassThrough(options))
  82. if (!streams._readableState || !streams.pause || !streams.pipe) {
  83. throw new Error('Only readable stream can be merged.')
  84. }
  85. streams.pause()
  86. } else {
  87. for (let i = 0, len = streams.length; i < len; i++) streams[i] = pauseStreams(streams[i], options)
  88. }
  89. return streams
  90. }