test.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. var tape = require('tape')
  2. var through = require('through2')
  3. var pumpify = require('./')
  4. var stream = require('stream')
  5. var duplexify = require('duplexify')
  // A two-stage pipeline: the first transform upper-cases the chunk, the
  // second lower-cases it again, so the pipeline emits what was written.
  tape('basic', function (t) {
    t.plan(3)

    // Stage 1: expects the raw input and forwards it upper-cased.
    function upper (chunk, enc, done) {
      t.same(chunk.toString(), 'hello')
      done(null, chunk.toString().toUpperCase())
    }

    // Stage 2: expects stage 1's output and forwards it lower-cased.
    function lower (chunk, enc, done) {
      t.same(chunk.toString(), 'HELLO')
      done(null, chunk.toString().toLowerCase())
    }

    var pipeline = pumpify(through(upper), through(lower))

    pipeline.write('hello')
    pipeline.on('data', function (chunk) {
      t.same(chunk.toString(), 'hello')
      t.end()
    })
  })
  // A three-stage pipeline (upper -> lower -> upper): every stage must see
  // the previous stage's output, and the pipeline emits the last stage's.
  tape('3 times', function (t) {
    t.plan(4)

    // Used for the first and third stage: both receive 'hello' and
    // forward it upper-cased.
    function upper (chunk, enc, done) {
      t.same(chunk.toString(), 'hello')
      done(null, chunk.toString().toUpperCase())
    }

    // Middle stage: receives 'HELLO' and forwards it lower-cased.
    function lower (chunk, enc, done) {
      t.same(chunk.toString(), 'HELLO')
      done(null, chunk.toString().toLowerCase())
    }

    var pipeline = pumpify(through(upper), through(lower), through(upper))

    pipeline.write('hello')
    pipeline.on('data', function (chunk) {
      t.same(chunk.toString(), 'HELLO')
      t.end()
    })
  })
  // Destroying the pipeline must call destroy() on a stream inside it; the
  // test only finishes once the overridden destroy() below has run.
  tape('destroy', function (t) {
    var last = through()

    last.destroy = function () {
      t.ok(true)
      t.end()
    }

    pumpify(through(), last).destroy()
  })
  // An 'error' emitted by a stream inside the pipeline must surface as an
  // 'error' event on the pipeline itself, with the same message.
  tape('close', function (t) {
    var tail = through()
    var pipeline = pumpify(through(), tail)

    function onError (err) {
      t.same(err.message, 'lol')
      t.end()
    }

    pipeline.on('error', onError)
    tail.emit('error', new Error('lol'))
  })
  // The callback passed to pipeline.end() must not fire until the last
  // stream in the chain has finished its delayed transform.
  tape('end waits for last one', function (t) {
    var slowDone = false

    // Final stage: completes its transform 100ms later and records that it ran.
    function slowTransform (chunk, enc, done) {
      setTimeout(function () {
        slowDone = true
        done()
      }, 100)
    }

    var pipeline = pumpify(through(), through(), through(slowTransform))

    pipeline.write('foo')
    pipeline.end(function () {
      t.ok(slowDone)
      t.end()
    })

    // Checked synchronously: the slow stage cannot have completed yet.
    t.ok(!slowDone)
  })
  // With a Readable at the head of the pipeline, 'finish' must not be
  // emitted before that Readable has been ended via push(null).
  tape('always wait for finish', function (t) {
    var source = new stream.Readable()
    source._read = function () {}
    source.push('hello')

    var pipeline = pumpify(source, through(), through())
    var sourceEnded = false

    function onFinish () {
      t.ok(sourceEnded)
      t.end()
    }

    // End the source 100ms later and record that this happened first.
    function endSource () {
      sourceEnded = true
      source.push(null)
    }

    pipeline.on('finish', onFinish)
    setTimeout(endSource, 100)
  })
  // The pipeline is created empty and its streams are only attached 100ms
  // later via setPipeline(); data written beforehand is expected to flow
  // through all three stages once they are set.
  tape('async', function (t) {
    var pipeline = pumpify()

    // First and third stage: receive 'hello', forward it upper-cased.
    function upper (chunk, enc, done) {
      t.same(chunk.toString(), 'hello')
      done(null, chunk.toString().toUpperCase())
    }

    // Middle stage: receives 'HELLO', forwards it lower-cased.
    function lower (chunk, enc, done) {
      t.same(chunk.toString(), 'HELLO')
      done(null, chunk.toString().toLowerCase())
    }

    t.plan(4)

    pipeline.write('hello')
    pipeline.on('data', function (chunk) {
      t.same(chunk.toString(), 'HELLO')
      t.end()
    })

    setTimeout(function () {
      pipeline.setPipeline(through(upper), through(lower), through(upper))
    }, 100)
  })
  // The pipeline is destroyed while still empty; when streams are attached
  // afterwards, destroy() must still be called on them. The test only ends
  // from inside the middle stream's overridden destroy().
  tape('early destroy', function (t) {
    var head = through()
    var middle = through()
    var tail = through()

    middle.destroy = function () {
      t.ok(true)
      t.end()
    }

    var pipeline = pumpify()
    pipeline.destroy()

    function attachStreams () {
      pipeline.setPipeline(head, middle, tail)
    }

    setTimeout(attachStreams, 100)
  })
  // An error passed to a transform's callback in the middle of the chain
  // must reach the pipeline's 'error' listener with its original message.
  tape('preserves error', function (t) {
    // Middle stage: fails on the first chunk it receives.
    function failing (chunk, enc, done) {
      done(new Error('stop'))
    }

    var head = through()
    var middle = through(failing)
    var tail = through()
    var pipeline = pumpify()

    pipeline.on('error', function (err) {
      t.same(err.message, 'stop')
      t.end()
    })

    pipeline.setPipeline(head, middle, tail)
    pipeline.resume()
    pipeline.write('hi')
  })
  // A Readable that keeps producing data is piped into a pipeline whose
  // middle transform fails; the error seen on the pipeline must not be
  // 'premature close'.
  tape('preserves error again', function (t) {
    var sink = new stream.Writable()
    var source = new stream.Readable({highWaterMark: 16})

    // Sink accepts every chunk immediately.
    sink._write = function (chunk, enc, done) {
      done(null)
    }

    // Source pushes a chunk asynchronously each time it is asked to read.
    source._read = function () {
      process.nextTick(function () {
        source.push('hello world')
      })
    }

    // Middle stage: fails on the first chunk it receives.
    function failing (chunk, _, done) {
      done(new Error('test'))
    }

    var pumpifyErr = pumpify(through(), through(failing), sink)

    // pipe() hands back its destination; the listener goes on that value,
    // exactly as the chained call would.
    var piped = source.pipe(pumpifyErr)
    piped.on('error', function (err) {
      t.ok(err)
      t.ok(err.message !== 'premature close', 'does not close with premature close')
      t.end()
    })
  })
  // The last stream is a duplexify instance that gets its writable side
  // late and is then destroyed with an error; that error must be the one
  // reported on the pipeline.
  tape('returns error from duplexify', function (t) {
    var head = through()
    var lateDuplex = duplexify()
    var pipeline = pumpify()

    pipeline.setPipeline(head, lateDuplex)

    function onError (err) {
      t.same(err.message, 'stop')
      t.end()
    }
    pipeline.on('error', onError)

    pipeline.write('data')
    // Keep this `.end()` call: per the original note, the test passes if
    // `.end()` is not called, so it is presumably what triggers the case
    // under test.
    pipeline.end()

    lateDuplex.setWritable(through())

    setImmediate(function () {
      lateDuplex.destroy(new Error('stop'))
    })
  })