permessage-deflate.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. 'use strict';
  2. const Limiter = require('async-limiter');
  3. const zlib = require('zlib');
  4. const bufferUtil = require('./buffer-util');
  5. const constants = require('./constants');
  6. const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
  7. const EMPTY_BLOCK = Buffer.from([0x00]);
  8. const kPerMessageDeflate = Symbol('permessage-deflate');
  9. const kWriteInProgress = Symbol('write-in-progress');
  10. const kPendingClose = Symbol('pending-close');
  11. const kTotalLength = Symbol('total-length');
  12. const kCallback = Symbol('callback');
  13. const kBuffers = Symbol('buffers');
  14. const kError = Symbol('error');
  15. //
  16. // We limit zlib concurrency, which prevents severe memory fragmentation
  17. // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
  18. // and https://github.com/websockets/ws/issues/1202
  19. //
  20. // Intentionally global; it's the global thread pool that's an issue.
  21. //
  22. let zlibLimiter;
  23. /**
  24. * permessage-deflate implementation.
  25. */
  26. class PerMessageDeflate {
  27. /**
  28. * Creates a PerMessageDeflate instance.
  29. *
  30. * @param {Object} options Configuration options
  31. * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
  32. * of server context takeover
  33. * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
  34. * disabling of client context takeover
  35. * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
  36. * use of a custom server window size
  37. * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
  38. * for, or request, a custom client window size
  39. * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
  40. * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
  41. * @param {Number} options.threshold Size (in bytes) below which messages
  42. * should not be compressed
  43. * @param {Number} options.concurrencyLimit The number of concurrent calls to
  44. * zlib
  45. * @param {Boolean} isServer Create the instance in either server or client
  46. * mode
  47. * @param {Number} maxPayload The maximum allowed message length
  48. */
  49. constructor (options, isServer, maxPayload) {
  50. this._maxPayload = maxPayload | 0;
  51. this._options = options || {};
  52. this._threshold = this._options.threshold !== undefined
  53. ? this._options.threshold
  54. : 1024;
  55. this._isServer = !!isServer;
  56. this._deflate = null;
  57. this._inflate = null;
  58. this.params = null;
  59. if (!zlibLimiter) {
  60. const concurrency = this._options.concurrencyLimit !== undefined
  61. ? this._options.concurrencyLimit
  62. : 10;
  63. zlibLimiter = new Limiter({ concurrency });
  64. }
  65. }
  66. /**
  67. * @type {String}
  68. */
  69. static get extensionName () {
  70. return 'permessage-deflate';
  71. }
  72. /**
  73. * Create an extension negotiation offer.
  74. *
  75. * @return {Object} Extension parameters
  76. * @public
  77. */
  78. offer () {
  79. const params = {};
  80. if (this._options.serverNoContextTakeover) {
  81. params.server_no_context_takeover = true;
  82. }
  83. if (this._options.clientNoContextTakeover) {
  84. params.client_no_context_takeover = true;
  85. }
  86. if (this._options.serverMaxWindowBits) {
  87. params.server_max_window_bits = this._options.serverMaxWindowBits;
  88. }
  89. if (this._options.clientMaxWindowBits) {
  90. params.client_max_window_bits = this._options.clientMaxWindowBits;
  91. } else if (this._options.clientMaxWindowBits == null) {
  92. params.client_max_window_bits = true;
  93. }
  94. return params;
  95. }
  96. /**
  97. * Accept an extension negotiation offer/response.
  98. *
  99. * @param {Array} configurations The extension negotiation offers/reponse
  100. * @return {Object} Accepted configuration
  101. * @public
  102. */
  103. accept (configurations) {
  104. configurations = this.normalizeParams(configurations);
  105. this.params = this._isServer
  106. ? this.acceptAsServer(configurations)
  107. : this.acceptAsClient(configurations);
  108. return this.params;
  109. }
  110. /**
  111. * Releases all resources used by the extension.
  112. *
  113. * @public
  114. */
  115. cleanup () {
  116. if (this._inflate) {
  117. if (this._inflate[kWriteInProgress]) {
  118. this._inflate[kPendingClose] = true;
  119. } else {
  120. this._inflate.close();
  121. this._inflate = null;
  122. }
  123. }
  124. if (this._deflate) {
  125. if (this._deflate[kWriteInProgress]) {
  126. this._deflate[kPendingClose] = true;
  127. } else {
  128. this._deflate.close();
  129. this._deflate = null;
  130. }
  131. }
  132. }
  133. /**
  134. * Accept an extension negotiation offer.
  135. *
  136. * @param {Array} offers The extension negotiation offers
  137. * @return {Object} Accepted configuration
  138. * @private
  139. */
  140. acceptAsServer (offers) {
  141. const opts = this._options;
  142. const accepted = offers.find((params) => {
  143. if (
  144. (opts.serverNoContextTakeover === false &&
  145. params.server_no_context_takeover) ||
  146. (params.server_max_window_bits &&
  147. (opts.serverMaxWindowBits === false ||
  148. (typeof opts.serverMaxWindowBits === 'number' &&
  149. opts.serverMaxWindowBits > params.server_max_window_bits))) ||
  150. (typeof opts.clientMaxWindowBits === 'number' &&
  151. !params.client_max_window_bits)
  152. ) {
  153. return false;
  154. }
  155. return true;
  156. });
  157. if (!accepted) {
  158. throw new Error('None of the extension offers can be accepted');
  159. }
  160. if (opts.serverNoContextTakeover) {
  161. accepted.server_no_context_takeover = true;
  162. }
  163. if (opts.clientNoContextTakeover) {
  164. accepted.client_no_context_takeover = true;
  165. }
  166. if (typeof opts.serverMaxWindowBits === 'number') {
  167. accepted.server_max_window_bits = opts.serverMaxWindowBits;
  168. }
  169. if (typeof opts.clientMaxWindowBits === 'number') {
  170. accepted.client_max_window_bits = opts.clientMaxWindowBits;
  171. } else if (
  172. accepted.client_max_window_bits === true ||
  173. opts.clientMaxWindowBits === false
  174. ) {
  175. delete accepted.client_max_window_bits;
  176. }
  177. return accepted;
  178. }
  179. /**
  180. * Accept the extension negotiation response.
  181. *
  182. * @param {Array} response The extension negotiation response
  183. * @return {Object} Accepted configuration
  184. * @private
  185. */
  186. acceptAsClient (response) {
  187. const params = response[0];
  188. if (
  189. this._options.clientNoContextTakeover === false &&
  190. params.client_no_context_takeover
  191. ) {
  192. throw new Error('Unexpected parameter "client_no_context_takeover"');
  193. }
  194. if (!params.client_max_window_bits) {
  195. if (typeof this._options.clientMaxWindowBits === 'number') {
  196. params.client_max_window_bits = this._options.clientMaxWindowBits;
  197. }
  198. } else if (
  199. this._options.clientMaxWindowBits === false ||
  200. (typeof this._options.clientMaxWindowBits === 'number' &&
  201. params.client_max_window_bits > this._options.clientMaxWindowBits)
  202. ) {
  203. throw new Error(
  204. 'Unexpected or invalid parameter "client_max_window_bits"'
  205. );
  206. }
  207. return params;
  208. }
  209. /**
  210. * Normalize parameters.
  211. *
  212. * @param {Array} configurations The extension negotiation offers/reponse
  213. * @return {Array} The offers/response with normalized parameters
  214. * @private
  215. */
  216. normalizeParams (configurations) {
  217. configurations.forEach((params) => {
  218. Object.keys(params).forEach((key) => {
  219. var value = params[key];
  220. if (value.length > 1) {
  221. throw new Error(`Parameter "${key}" must have only a single value`);
  222. }
  223. value = value[0];
  224. if (key === 'client_max_window_bits') {
  225. if (value !== true) {
  226. const num = +value;
  227. if (!Number.isInteger(num) || num < 8 || num > 15) {
  228. throw new TypeError(
  229. `Invalid value for parameter "${key}": ${value}`
  230. );
  231. }
  232. value = num;
  233. } else if (!this._isServer) {
  234. throw new TypeError(
  235. `Invalid value for parameter "${key}": ${value}`
  236. );
  237. }
  238. } else if (key === 'server_max_window_bits') {
  239. const num = +value;
  240. if (!Number.isInteger(num) || num < 8 || num > 15) {
  241. throw new TypeError(
  242. `Invalid value for parameter "${key}": ${value}`
  243. );
  244. }
  245. value = num;
  246. } else if (
  247. key === 'client_no_context_takeover' ||
  248. key === 'server_no_context_takeover'
  249. ) {
  250. if (value !== true) {
  251. throw new TypeError(
  252. `Invalid value for parameter "${key}": ${value}`
  253. );
  254. }
  255. } else {
  256. throw new Error(`Unknown parameter "${key}"`);
  257. }
  258. params[key] = value;
  259. });
  260. });
  261. return configurations;
  262. }
  263. /**
  264. * Decompress data. Concurrency limited by async-limiter.
  265. *
  266. * @param {Buffer} data Compressed data
  267. * @param {Boolean} fin Specifies whether or not this is the last fragment
  268. * @param {Function} callback Callback
  269. * @public
  270. */
  271. decompress (data, fin, callback) {
  272. zlibLimiter.push((done) => {
  273. this._decompress(data, fin, (err, result) => {
  274. done();
  275. callback(err, result);
  276. });
  277. });
  278. }
  279. /**
  280. * Compress data. Concurrency limited by async-limiter.
  281. *
  282. * @param {Buffer} data Data to compress
  283. * @param {Boolean} fin Specifies whether or not this is the last fragment
  284. * @param {Function} callback Callback
  285. * @public
  286. */
  287. compress (data, fin, callback) {
  288. zlibLimiter.push((done) => {
  289. this._compress(data, fin, (err, result) => {
  290. done();
  291. callback(err, result);
  292. });
  293. });
  294. }
  295. /**
  296. * Decompress data.
  297. *
  298. * @param {Buffer} data Compressed data
  299. * @param {Boolean} fin Specifies whether or not this is the last fragment
  300. * @param {Function} callback Callback
  301. * @private
  302. */
  303. _decompress (data, fin, callback) {
  304. const endpoint = this._isServer ? 'client' : 'server';
  305. if (!this._inflate) {
  306. const key = `${endpoint}_max_window_bits`;
  307. const windowBits = typeof this.params[key] !== 'number'
  308. ? zlib.Z_DEFAULT_WINDOWBITS
  309. : this.params[key];
  310. this._inflate = zlib.createInflateRaw(
  311. Object.assign({}, this._options.zlibInflateOptions, { windowBits })
  312. );
  313. this._inflate[kPerMessageDeflate] = this;
  314. this._inflate[kTotalLength] = 0;
  315. this._inflate[kBuffers] = [];
  316. this._inflate.on('error', inflateOnError);
  317. this._inflate.on('data', inflateOnData);
  318. }
  319. this._inflate[kCallback] = callback;
  320. this._inflate[kWriteInProgress] = true;
  321. this._inflate.write(data);
  322. if (fin) this._inflate.write(TRAILER);
  323. this._inflate.flush(() => {
  324. const err = this._inflate[kError];
  325. if (err) {
  326. this._inflate.close();
  327. this._inflate = null;
  328. callback(err);
  329. return;
  330. }
  331. const data = bufferUtil.concat(
  332. this._inflate[kBuffers],
  333. this._inflate[kTotalLength]
  334. );
  335. if (
  336. (fin && this.params[`${endpoint}_no_context_takeover`]) ||
  337. this._inflate[kPendingClose]
  338. ) {
  339. this._inflate.close();
  340. this._inflate = null;
  341. } else {
  342. this._inflate[kWriteInProgress] = false;
  343. this._inflate[kTotalLength] = 0;
  344. this._inflate[kBuffers] = [];
  345. }
  346. callback(null, data);
  347. });
  348. }
  349. /**
  350. * Compress data.
  351. *
  352. * @param {Buffer} data Data to compress
  353. * @param {Boolean} fin Specifies whether or not this is the last fragment
  354. * @param {Function} callback Callback
  355. * @private
  356. */
  357. _compress (data, fin, callback) {
  358. if (!data || data.length === 0) {
  359. process.nextTick(callback, null, EMPTY_BLOCK);
  360. return;
  361. }
  362. const endpoint = this._isServer ? 'server' : 'client';
  363. if (!this._deflate) {
  364. const key = `${endpoint}_max_window_bits`;
  365. const windowBits = typeof this.params[key] !== 'number'
  366. ? zlib.Z_DEFAULT_WINDOWBITS
  367. : this.params[key];
  368. this._deflate = zlib.createDeflateRaw(
  369. Object.assign(
  370. // TODO deprecate memLevel/level and recommend zlibDeflateOptions instead
  371. {
  372. memLevel: this._options.memLevel,
  373. level: this._options.level
  374. },
  375. this._options.zlibDeflateOptions,
  376. { windowBits }
  377. )
  378. );
  379. this._deflate[kTotalLength] = 0;
  380. this._deflate[kBuffers] = [];
  381. //
  382. // `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use
  383. // it is made after it has already been closed. This cannot happen here,
  384. // so we only add a listener for the `'data'` event.
  385. //
  386. this._deflate.on('data', deflateOnData);
  387. }
  388. this._deflate[kWriteInProgress] = true;
  389. this._deflate.write(data);
  390. this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
  391. var data = bufferUtil.concat(
  392. this._deflate[kBuffers],
  393. this._deflate[kTotalLength]
  394. );
  395. if (fin) data = data.slice(0, data.length - 4);
  396. if (
  397. (fin && this.params[`${endpoint}_no_context_takeover`]) ||
  398. this._deflate[kPendingClose]
  399. ) {
  400. this._deflate.close();
  401. this._deflate = null;
  402. } else {
  403. this._deflate[kWriteInProgress] = false;
  404. this._deflate[kTotalLength] = 0;
  405. this._deflate[kBuffers] = [];
  406. }
  407. callback(null, data);
  408. });
  409. }
  410. }
  411. module.exports = PerMessageDeflate;
  412. /**
  413. * The listener of the `zlib.DeflateRaw` stream `'data'` event.
  414. *
  415. * @param {Buffer} chunk A chunk of data
  416. * @private
  417. */
  418. function deflateOnData (chunk) {
  419. this[kBuffers].push(chunk);
  420. this[kTotalLength] += chunk.length;
  421. }
  422. /**
  423. * The listener of the `zlib.InflateRaw` stream `'data'` event.
  424. *
  425. * @param {Buffer} chunk A chunk of data
  426. * @private
  427. */
  428. function inflateOnData (chunk) {
  429. this[kTotalLength] += chunk.length;
  430. if (
  431. this[kPerMessageDeflate]._maxPayload < 1 ||
  432. this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  433. ) {
  434. this[kBuffers].push(chunk);
  435. return;
  436. }
  437. this[kError] = new RangeError('Max payload size exceeded');
  438. this[kError][constants.kStatusCode] = 1009;
  439. this.removeListener('data', inflateOnData);
  440. this.reset();
  441. }
  442. /**
  443. * The listener of the `zlib.InflateRaw` stream `'error'` event.
  444. *
  445. * @param {Error} err The emitted error
  446. * @private
  447. */
  448. function inflateOnError (err) {
  449. //
  450. // There is no need to call `Zlib#close()` as the handle is automatically
  451. // closed when an error is emitted.
  452. //
  453. this[kPerMessageDeflate]._inflate = null;
  454. err[constants.kStatusCode] = 1007;
  455. this[kCallback](err);
  456. }