index.js 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. var pump = require('pump')
  2. var inherits = require('inherits')
  3. var Duplexify = require('duplexify')
  4. var toArray = function(args) {
  5. if (!args.length) return []
  6. return Array.isArray(args[0]) ? args[0] : Array.prototype.slice.call(args)
  7. }
  8. var define = function(opts) {
  9. var Pumpify = function() {
  10. var streams = toArray(arguments)
  11. if (!(this instanceof Pumpify)) return new Pumpify(streams)
  12. Duplexify.call(this, null, null, opts)
  13. if (streams.length) this.setPipeline(streams)
  14. }
  15. inherits(Pumpify, Duplexify)
  16. Pumpify.prototype.setPipeline = function() {
  17. var streams = toArray(arguments)
  18. var self = this
  19. var ended = false
  20. var w = streams[0]
  21. var r = streams[streams.length-1]
  22. r = r.readable ? r : null
  23. w = w.writable ? w : null
  24. var onclose = function() {
  25. streams[0].emit('error', new Error('stream was destroyed'))
  26. }
  27. this.on('close', onclose)
  28. this.on('prefinish', function() {
  29. if (!ended) self.cork()
  30. })
  31. pump(streams, function(err) {
  32. self.removeListener('close', onclose)
  33. if (err) return self.destroy(err.message === 'premature close' ? null : err)
  34. ended = true
  35. // pump ends after the last stream is not writable *but*
  36. // pumpify still forwards the readable part so we need to catch errors
  37. // still, so reenable autoDestroy in this case
  38. if (self._autoDestroy === false) self._autoDestroy = true
  39. self.uncork()
  40. })
  41. if (this.destroyed) return onclose()
  42. this.setWritable(w)
  43. this.setReadable(r)
  44. }
  45. return Pumpify
  46. }
  47. module.exports = define({autoDestroy:false, destroy:false})
  48. module.exports.obj = define({autoDestroy: false, destroy:false, objectMode:true, highWaterMark:16})
  49. module.exports.ctor = define