// index.js (1.8 KB)

  // Base constructor used by duplex() to create the combined stream object.
  var Stream = require("stream")

  // Method names that duplex() forwards from the combined stream to `writer`.
  var writeMethods = ["write", "end", "destroy"]

  // Method names that duplex() forwards from the combined stream to `reader`.
  var readMethods = ["resume", "pause"]

  // Event names that duplex() re-emits from `reader` on the combined stream.
  // ("end" and "error" are wired up separately inside duplex().)
  var readEvents = ["data", "close"]

  // Used to copy `arguments` objects into real arrays.
  var slice = Array.prototype.slice

  // The module's only export is the duplex() factory (a hoisted declaration).
  module.exports = duplex
  /**
   * Call `fn` once for each element of `arr`.
   *
   * Delegates to `arr.forEach` when the collection provides one; otherwise
   * walks indices 0..length-1 so plain array-likes are supported as well.
   *
   * @param {Array|{length: number}} arr - Collection to iterate.
   * @param {function(*, number, *): void} fn - Called as fn(value, index, arr).
   * @returns {*} Whatever `arr.forEach` returns when it exists, else undefined.
   */
  function forEach(arr, fn) {
    if (arr.forEach) {
      return arr.forEach(fn)
    }

    for (var i = 0; i < arr.length; i++) {
      // Pass the collection as the third argument so the fallback matches the
      // (value, index, array) callback signature of the native forEach path.
      fn(arr[i], i, arr)
    }
  }
  /**
   * Combine a writable side and a readable side into a single stream object.
   *
   * The returned object is a fresh `Stream` instance that:
   *   - forwards each method in `writeMethods` to `writer`;
   *   - forwards each method in `readMethods` to `reader` (emitting an event
   *     of the same name on the combined stream first);
   *   - re-emits each `readEvents` event, plus "end", coming from `reader`;
   *   - re-emits "drain" from `writer` and "error" from either side.
   *
   * @param {Object} writer - Must provide `on` and the methods listed in
   *   `writeMethods`; its `writable` flag is copied onto the result.
   * @param {Object} reader - Must provide `on` and `emit`; the methods listed
   *   in `readMethods` are optional. Its `readable` flag is copied onto the
   *   result.
   * @returns {Stream} The combined stream.
   */
  function duplex(writer, reader) {
    var stream = new Stream()
    // Guards handleEnd so "end" reaches the combined stream at most once.
    var ended = false

    forEach(writeMethods, proxyWriter)
    forEach(readMethods, proxyReader)
    forEach(readEvents, proxyStream)

    reader.on("end", handleEnd)

    writer.on("drain", function() {
      stream.emit("drain")
    })

    writer.on("error", reemit)
    reader.on("error", reemit)

    // These flags are snapshots taken at creation time; later changes on
    // `writer` / `reader` are not mirrored onto the combined stream.
    stream.writable = writer.writable
    stream.readable = reader.readable

    return stream

    // Install stream[methodName] as a pass-through to writer[methodName].
    function proxyWriter(methodName) {
      stream[methodName] = method

      function method() {
        return writer[methodName].apply(writer, arguments)
      }
    }

    // Install stream[methodName]: announce the call as an event on the
    // combined stream, then call the reader's method if it has one, or
    // otherwise emit the same event name on the reader.
    function proxyReader(methodName) {
      stream[methodName] = method

      function method() {
        stream.emit(methodName)
        var func = reader[methodName]
        if (func) {
          return func.apply(reader, arguments)
        }
        reader.emit(methodName)
      }
    }

    // Re-emit the reader's `eventName` events, with their arguments, on the
    // combined stream.
    function proxyStream(eventName) {
      reader.on(eventName, forwardEvent)

      // Named differently from the outer `reemit` (the error handler) so the
      // two are not confused.
      function forwardEvent() {
        var args = slice.call(arguments)
        args.unshift(eventName)
        stream.emit.apply(stream, args)
      }
    }

    // Forward the reader's "end" (with its arguments) exactly once.
    function handleEnd() {
      if (ended) {
        return
      }
      ended = true
      var args = slice.call(arguments)
      args.unshift("end")
      stream.emit.apply(stream, args)
    }

    // Surface an error from either side on the combined stream.
    function reemit(err) {
      stream.emit("error", err)
    }
  }