functor.js 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. 'use strict';
  2. var RingBuffer = require('./ring_buffer');
  3. var Functor = function(session, method) {
  4. this._session = session;
  5. this._method = method;
  6. this._queue = new RingBuffer(Functor.QUEUE_SIZE);
  7. this._stopped = false;
  8. this.pending = 0;
  9. };
  10. Functor.QUEUE_SIZE = 8;
  11. Functor.prototype.call = function(error, message, callback, context) {
  12. if (this._stopped) return;
  13. var record = { error: error, message: message, callback: callback, context: context, done: false },
  14. called = false,
  15. self = this;
  16. this._queue.push(record);
  17. if (record.error) {
  18. record.done = true;
  19. this._stop();
  20. return this._flushQueue();
  21. }
  22. var handler = function(err, msg) {
  23. if (!(called ^ (called = true))) return;
  24. if (err) {
  25. self._stop();
  26. record.error = err;
  27. record.message = null;
  28. } else {
  29. record.message = msg;
  30. }
  31. record.done = true;
  32. self._flushQueue();
  33. };
  34. try {
  35. this._session[this._method](message, handler);
  36. } catch (err) {
  37. handler(err);
  38. }
  39. };
  40. Functor.prototype._stop = function() {
  41. this.pending = this._queue.length;
  42. this._stopped = true;
  43. };
  44. Functor.prototype._flushQueue = function() {
  45. var queue = this._queue, record;
  46. while (queue.length > 0 && queue.peek().done) {
  47. record = queue.shift();
  48. if (record.error) {
  49. this.pending = 0;
  50. queue.clear();
  51. } else {
  52. this.pending -= 1;
  53. }
  54. record.callback.call(record.context, record.error, record.message);
  55. }
  56. };
  57. module.exports = Functor;