index.mjs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. 'use strict'
  2. /*
  3. * merge2
  4. * https://github.com/teambition/merge2
  5. *
  6. * Copyright (c) 2014-2016 Teambition
  7. * Licensed under the MIT license.
  8. */
  9. import Stream from 'stream'
  10. const PassThrough = Stream.PassThrough
  11. const slice = Array.prototype.slice
  /**
   * Merge readable streams into a single PassThrough stream, one group at a time.
   *
   * Every argument is one queue entry: either a single readable stream or an
   * array of streams. Entries are processed in the order given. All streams of
   * an entry are piped into the merged stream together, and the next entry is
   * started only after each of them has emitted 'end' or has been unpiped.
   *
   * The last argument is taken as the options object when it is truthy, is not
   * an array and has no `pipe` property. The options are also handed to the
   * PassThrough constructor. The caller's object is copied, never modified.
   *
   * Events and extras on the returned stream:
   * - `add(...streams)`: queue more entries; returns the object it was called on.
   * - 'queueDrain': emitted (on a later tick) whenever the queue runs empty.
   *
   * @param {...(object|Array|object)} args - Streams, arrays of streams and an
   *   optional trailing options object.
   *   options.end: when not `false`, the merged stream is ended once the queue
   *   has drained. options.objectMode defaults to `true`,
   *   options.highWaterMark defaults to 64 * 1024.
   * @returns {object} The merged PassThrough stream.
   * @throws {Error} Whatever pauseStreams throws for an entry it rejects.
   */
  function merge2 (...args) {
    const streamsQueue = []
    let merging = false

    const lastArg = args[args.length - 1]
    const hasOptions = Boolean(lastArg) && !Array.isArray(lastArg) && lastArg.pipe == null
    if (hasOptions) args.pop()

    // Shallow copy so that filling in the defaults below never leaks into the
    // object owned by the caller.
    const options = hasOptions ? Object.assign({}, lastArg) : {}
    const doEnd = options.end !== false
    if (options.objectMode == null) options.objectMode = true
    if (options.highWaterMark == null) options.highWaterMark = 64 * 1024

    const mergedStream = new PassThrough(options)

    // Queue every argument as its own entry, then (re)start merging.
    function addStream (...entries) {
      for (const entry of entries) {
        streamsQueue.push(pauseStreams(entry, options))
      }
      mergeStream()
      return this
    }

    // Take the next entry off the queue and pipe its streams into mergedStream.
    function mergeStream () {
      if (merging) return
      merging = true

      let streams = streamsQueue.shift()
      if (!streams) {
        // Queue is empty: finish asynchronously.
        process.nextTick(endStream)
        return
      }
      if (!Array.isArray(streams)) streams = [streams]

      // One extra count is held until every stream of the entry has been
      // piped; it is released by the trailing next() call. This keeps the
      // entry from completing while the loop below is still running.
      let pipesCount = streams.length + 1

      function next () {
        if (--pipesCount > 0) return
        merging = false
        mergeStream()
      }

      function pipe (stream) {
        function onend () {
          stream.removeListener('merge2UnpipeEnd', onend)
          stream.removeListener('end', onend)
          next()
        }
        // skip ended stream
        if (stream._readableState.endEmitted) return next()

        stream.on('merge2UnpipeEnd', onend)
        stream.on('end', onend)
        // The merged stream is ended by endStream, never by a source stream.
        stream.pipe(mergedStream, { end: false })
        // compatible for old stream
        stream.resume()
      }

      for (let i = 0; i < streams.length; i++) pipe(streams[i])

      next()
    }

    function endStream () {
      merging = false
      // emit 'queueDrain' when all streams merged.
      mergedStream.emit('queueDrain')
      return doEnd && mergedStream.end()
    }

    mergedStream.setMaxListeners(0)
    mergedStream.add = addStream
    // A source that gets unpiped counts as finished for the current entry.
    mergedStream.on('unpipe', (stream) => {
      stream.emit('merge2UnpipeEnd')
    })

    if (args.length) addStream(...args)
    return mergedStream
  }
  /**
   * Check that a value can be merged and pause it so it is ready for piping.
   *
   * Arrays are handled recursively and updated in place: each element is
   * replaced by the result of pausing it, and the same array is returned.
   *
   * An old-style stream (has `pipe` but no `_readableState`) is piped into a
   * new PassThrough built from `options`, and that wrapper is used instead.
   *
   * @param {object|Array} streams - A readable stream or an array of them.
   * @param {object} options - Options for the PassThrough wrapper.
   * @returns {object|Array} The paused stream (or its wrapper), or the array.
   * @throws {Error} When a value is not a readable stream, including
   *   `null`/`undefined`.
   */
  function pauseStreams (streams, options) {
    if (Array.isArray(streams)) {
      for (let i = 0, len = streams.length; i < len; i++) {
        streams[i] = pauseStreams(streams[i], options)
      }
      return streams
    }

    let stream = streams
    // Backwards-compat with old-style streams
    if (stream != null && !stream._readableState && stream.pipe) {
      stream = stream.pipe(new PassThrough(options))
    }
    // The nullish checks make missing values (and a pipe() that returns
    // nothing) fail with the descriptive error below rather than a TypeError.
    if (stream == null || !stream._readableState || !stream.pause || !stream.pipe) {
      throw new Error('Only readable stream can be merged.')
    }
    stream.pause()
    return stream
  }
  90. export default merge2
  91. export { merge2 }