index.js 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. var once = require('once')
  2. var eos = require('end-of-stream')
  3. var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes
  4. var noop = function () {}
  5. var ancient = /^v?\.0/.test(process.version)
  6. var isFn = function (fn) {
  7. return typeof fn === 'function'
  8. }
  9. var isFS = function (stream) {
  10. if (!ancient) return false // newer node version do not need to care about fs is a special way
  11. if (!fs) return false // browser
  12. return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close)
  13. }
  14. var isRequest = function (stream) {
  15. return stream.setHeader && isFn(stream.abort)
  16. }
  17. var destroyer = function (stream, reading, writing, callback) {
  18. callback = once(callback)
  19. var closed = false
  20. stream.on('close', function () {
  21. closed = true
  22. })
  23. eos(stream, {readable: reading, writable: writing}, function (err) {
  24. if (err) return callback(err)
  25. closed = true
  26. callback()
  27. })
  28. var destroyed = false
  29. return function (err) {
  30. if (closed) return
  31. if (destroyed) return
  32. destroyed = true
  33. if (isFS(stream)) return stream.close(noop) // use close for fs streams to avoid fd leaks
  34. if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want
  35. if (isFn(stream.destroy)) return stream.destroy()
  36. callback(err || new Error('stream was destroyed'))
  37. }
  38. }
  39. var call = function (fn) {
  40. fn()
  41. }
  42. var pipe = function (from, to) {
  43. return from.pipe(to)
  44. }
  45. var pump = function () {
  46. var streams = Array.prototype.slice.call(arguments)
  47. var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop
  48. if (Array.isArray(streams[0])) streams = streams[0]
  49. if (streams.length < 2) throw new Error('pump requires two streams per minimum')
  50. var error
  51. var destroys = streams.map(function (stream, i) {
  52. var reading = i < streams.length - 1
  53. var writing = i > 0
  54. return destroyer(stream, reading, writing, function (err) {
  55. if (!error) error = err
  56. if (err) destroys.forEach(call)
  57. if (reading) return
  58. destroys.forEach(call)
  59. callback(error)
  60. })
  61. })
  62. return streams.reduce(pipe)
  63. }
  64. module.exports = pump