// index.js 1.4 KB

  'use strict';

  // Cell is instantiated once per session by the Pipeline constructor;
  // Pledge supplies `Pledge.all()`, which close() uses to wait on the values
  // returned by every cell's close().
  var Cell   = require('./cell'),
      Pledge = require('./pledge');
  /**
   * Runs messages through an ordered chain of cells, one cell per session.
   *
   * Outgoing messages visit the cells from first to last; incoming messages
   * visit them from last to first. After a cell reports an error for a
   * direction, or after close() has been called, later calls for that
   * direction return immediately without invoking their callback.
   *
   * @constructor
   * @param {Array} sessions - Sessions to wrap; each one is passed to
   *   `new Cell(session)`, in order.
   */
  var Pipeline = function(sessions) {
    this._cells   = sessions.map(function(session) { return new Cell(session) });
    this._stopped = { incoming: false, outgoing: false };
  };

  /**
   * Passes an incoming message through the cells from last to first.
   *
   * Does nothing (and never calls `callback`) if the incoming direction has
   * been stopped.
   *
   * @param {*} message - Message handed to the last cell.
   * @param {Function} callback - Called as `callback(error, message)` once the
   *   first cell has finished.
   * @param {Object} [context] - `this` value for `callback`.
   */
  Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
    if (this._stopped.incoming) return;
    this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
  };

  /**
   * Passes an outgoing message through the cells from first to last.
   *
   * Does nothing (and never calls `callback`) if the outgoing direction has
   * been stopped.
   *
   * @param {*} message - Message handed to the first cell.
   * @param {Function} callback - Called as `callback(error, message)` once the
   *   last cell has finished.
   * @param {Object} [context] - `this` value for `callback`.
   */
  Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
    if (this._stopped.outgoing) return;
    this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
  };

  /**
   * Stops both directions and closes every cell.
   *
   * @param {Function} [callback] - If given, called with no arguments after
   *   `Pledge.all()` over the cells' close() results has settled its `then`.
   *   (Presumably each close() returns a Pledge; confirm against ./cell.)
   * @param {Object} [context] - `this` value for `callback`.
   */
  Pipeline.prototype.close = function(callback, context) {
    this._stopped = { incoming: true, outgoing: true };

    var closed = this._cells.map(function(a) { return a.close() });
    if (callback)
      Pledge.all(closed).then(function() { callback.call(context) });
  };

  /**
   * Drives one message through the cells in a single direction.
   *
   * @private
   * @param {string} direction - `'incoming'` or `'outgoing'`; also the name of
   *   the cell method that is invoked.
   * @param {number} start - Index of the first cell to visit.
   * @param {number} end - Index at which traversal stops (exclusive).
   * @param {number} step - `1` or `-1`; added to the index after each cell.
   * @param {*} message - Message given to the first visited cell.
   * @param {Function} callback - Called as `callback(error, message)` when the
   *   index reaches `end`.
   * @param {Object} [context] - `this` value for `callback`.
   */
  Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
    var cells = this._cells,
        n     = cells.length,
        self  = this;

    // Every cell is told about the message (last cell first) before any cell
    // starts working on it.
    while (n--) cells[n].pending(direction);

    var pipe = function(index, error, msg) {
      if (index === end) return callback.call(context, error, msg);

      cells[index][direction](error, msg, function(err, m) {
        // An error latches the direction as stopped for later messages, but the
        // current message (and the error) still travels on to the remaining
        // cells and finally to the callback.
        if (err) self._stopped[direction] = true;
        pipe(index + step, err, m);
      });
    };
    pipe(start, null, message);
  };
  // The Pipeline constructor is this module's sole export.
  module.exports = Pipeline;