index.js 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. var eos = require('end-of-stream')
  2. var shift = require('stream-shift')
  3. module.exports = each
  /**
   * Iterate over the chunks of a readable stream, one chunk at a time.
   *
   * Each chunk obtained through `shift(stream)` is handed to `fn(data, next)`.
   * No further chunk is pulled until `next` has been invoked; `next` may be
   * called synchronously or asynchronously. Calling `next(err)` with a truthy
   * `err` stops the iteration and destroys the stream with that error.
   *
   * @param {object} stream Readable stream. It must support
   *   `on('readable', listener)` and `destroy(err)`.
   * @param {function(*, function(*=): void): void} fn Called once per chunk with
   *   the chunk and a completion callback.
   * @param {function(*=): void} [cb] Optional. Registered through `eos` and
   *   called once the stream has finished and no `fn` call is outstanding. It
   *   receives the first error recorded (from `fn` or from `eos`), if any.
   * @returns {object} The `stream` that was passed in.
   */
  function each (stream, fn, cb) {
    var want = true // true while we are waiting for a 'readable' event to resume pulling
    var error = null // first error seen, either from fn or from end-of-stream
    var ended = false // set once end-of-stream has reported completion
    var running = false // true while an fn call is waiting for its callback
    var calling = false // true only during the synchronous part of an fn call

    stream.on('readable', onreadable)
    // Kick off a first read in case data is already buffered.
    onreadable()

    if (cb) eos(stream, {readable: true, writable: false}, done)
    return stream

    // End-of-stream handler; only registered when cb was supplied.
    function done (err) {
      if (!error) error = err
      ended = true
      // If fn is still busy, afterRead delivers the callback instead.
      if (!running) cb(error)
    }

    function onreadable () {
      if (want) read()
    }

    // Completion callback handed to fn for every chunk.
    function afterRead (err) {
      running = false

      if (err) {
        error = err
        if (ended) return cb(error)
        stream.destroy(err)
        return
      }

      if (ended) return cb(error)
      // When fn completed synchronously, the loop in read() is still on the
      // stack and will continue by itself; re-entering it here would recurse.
      if (!calling) read()
    }

    function read () {
      // `!error` stops the loop when fn fails synchronously: in that case
      // `running` is already false again and `ended` is not yet set, so without
      // this check the next chunk would still be pulled and passed to fn.
      while (!running && !ended && !error) {
        want = false

        var data = shift(stream)
        // The stream may have finished while shifting.
        if (ended) return
        if (data === null) {
          // Nothing buffered right now; resume on the next 'readable' event.
          want = true
          return
        }

        running = true
        calling = true
        fn(data, afterRead)
        calling = false
      }
    }
  }