permessage-deflate.js (14131B)
1 'use strict'; 2 3 const Limiter = require('async-limiter'); 4 const zlib = require('zlib'); 5 6 const bufferUtil = require('./buffer-util'); 7 const { kStatusCode, NOOP } = require('./constants'); 8 9 const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]); 10 const EMPTY_BLOCK = Buffer.from([0x00]); 11 12 const kPerMessageDeflate = Symbol('permessage-deflate'); 13 const kTotalLength = Symbol('total-length'); 14 const kCallback = Symbol('callback'); 15 const kBuffers = Symbol('buffers'); 16 const kError = Symbol('error'); 17 18 // 19 // We limit zlib concurrency, which prevents severe memory fragmentation 20 // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913 21 // and https://github.com/websockets/ws/issues/1202 22 // 23 // Intentionally global; it's the global thread pool that's an issue. 24 // 25 let zlibLimiter; 26 27 /** 28 * permessage-deflate implementation. 29 */ 30 class PerMessageDeflate { 31 /** 32 * Creates a PerMessageDeflate instance. 33 * 34 * @param {Object} options Configuration options 35 * @param {Boolean} options.serverNoContextTakeover Request/accept disabling 36 * of server context takeover 37 * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge 38 * disabling of client context takeover 39 * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the 40 * use of a custom server window size 41 * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support 42 * for, or request, a custom client window size 43 * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate 44 * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate 45 * @param {Number} options.threshold Size (in bytes) below which messages 46 * should not be compressed 47 * @param {Number} options.concurrencyLimit The number of concurrent calls to 48 * zlib 49 * @param {Boolean} isServer Create the instance in either server or client 50 * mode 51 * @param {Number} maxPayload The maximum allowed message length 52 */ 53 constructor(options, isServer, maxPayload) { 54 this._maxPayload = maxPayload | 0; 55 this._options = options || {}; 56 this._threshold = 57 this._options.threshold !== undefined ? this._options.threshold : 1024; 58 this._isServer = !!isServer; 59 this._deflate = null; 60 this._inflate = null; 61 62 this.params = null; 63 64 if (!zlibLimiter) { 65 const concurrency = 66 this._options.concurrencyLimit !== undefined 67 ? this._options.concurrencyLimit 68 : 10; 69 zlibLimiter = new Limiter({ concurrency }); 70 } 71 } 72 73 /** 74 * @type {String} 75 */ 76 static get extensionName() { 77 return 'permessage-deflate'; 78 } 79 80 /** 81 * Create an extension negotiation offer. 82 * 83 * @return {Object} Extension parameters 84 * @public 85 */ 86 offer() { 87 const params = {}; 88 89 if (this._options.serverNoContextTakeover) { 90 params.server_no_context_takeover = true; 91 } 92 if (this._options.clientNoContextTakeover) { 93 params.client_no_context_takeover = true; 94 } 95 if (this._options.serverMaxWindowBits) { 96 params.server_max_window_bits = this._options.serverMaxWindowBits; 97 } 98 if (this._options.clientMaxWindowBits) { 99 params.client_max_window_bits = this._options.clientMaxWindowBits; 100 } else if (this._options.clientMaxWindowBits == null) { 101 params.client_max_window_bits = true; 102 } 103 104 return params; 105 } 106 107 /** 108 * Accept an extension negotiation offer/response. 109 * 110 * @param {Array} configurations The extension negotiation offers/reponse 111 * @return {Object} Accepted configuration 112 * @public 113 */ 114 accept(configurations) { 115 configurations = this.normalizeParams(configurations); 116 117 this.params = this._isServer 118 ? this.acceptAsServer(configurations) 119 : this.acceptAsClient(configurations); 120 121 return this.params; 122 } 123 124 /** 125 * Releases all resources used by the extension. 126 * 127 * @public 128 */ 129 cleanup() { 130 if (this._inflate) { 131 this._inflate.close(); 132 this._inflate = null; 133 } 134 135 if (this._deflate) { 136 this._deflate.close(); 137 this._deflate = null; 138 } 139 } 140 141 /** 142 * Accept an extension negotiation offer. 143 * 144 * @param {Array} offers The extension negotiation offers 145 * @return {Object} Accepted configuration 146 * @private 147 */ 148 acceptAsServer(offers) { 149 const opts = this._options; 150 const accepted = offers.find((params) => { 151 if ( 152 (opts.serverNoContextTakeover === false && 153 params.server_no_context_takeover) || 154 (params.server_max_window_bits && 155 (opts.serverMaxWindowBits === false || 156 (typeof opts.serverMaxWindowBits === 'number' && 157 opts.serverMaxWindowBits > params.server_max_window_bits))) || 158 (typeof opts.clientMaxWindowBits === 'number' && 159 !params.client_max_window_bits) 160 ) { 161 return false; 162 } 163 164 return true; 165 }); 166 167 if (!accepted) { 168 throw new Error('None of the extension offers can be accepted'); 169 } 170 171 if (opts.serverNoContextTakeover) { 172 accepted.server_no_context_takeover = true; 173 } 174 if (opts.clientNoContextTakeover) { 175 accepted.client_no_context_takeover = true; 176 } 177 if (typeof opts.serverMaxWindowBits === 'number') { 178 accepted.server_max_window_bits = opts.serverMaxWindowBits; 179 } 180 if (typeof opts.clientMaxWindowBits === 'number') { 181 accepted.client_max_window_bits = opts.clientMaxWindowBits; 182 } else if ( 183 accepted.client_max_window_bits === true || 184 opts.clientMaxWindowBits === false 185 ) { 186 delete accepted.client_max_window_bits; 187 } 188 189 return accepted; 190 } 191 192 /** 193 * Accept the extension negotiation response. 194 * 195 * @param {Array} response The extension negotiation response 196 * @return {Object} Accepted configuration 197 * @private 198 */ 199 acceptAsClient(response) { 200 const params = response[0]; 201 202 if ( 203 this._options.clientNoContextTakeover === false && 204 params.client_no_context_takeover 205 ) { 206 throw new Error('Unexpected parameter "client_no_context_takeover"'); 207 } 208 209 if (!params.client_max_window_bits) { 210 if (typeof this._options.clientMaxWindowBits === 'number') { 211 params.client_max_window_bits = this._options.clientMaxWindowBits; 212 } 213 } else if ( 214 this._options.clientMaxWindowBits === false || 215 (typeof this._options.clientMaxWindowBits === 'number' && 216 params.client_max_window_bits > this._options.clientMaxWindowBits) 217 ) { 218 throw new Error( 219 'Unexpected or invalid parameter "client_max_window_bits"' 220 ); 221 } 222 223 return params; 224 } 225 226 /** 227 * Normalize parameters. 228 * 229 * @param {Array} configurations The extension negotiation offers/reponse 230 * @return {Array} The offers/response with normalized parameters 231 * @private 232 */ 233 normalizeParams(configurations) { 234 configurations.forEach((params) => { 235 Object.keys(params).forEach((key) => { 236 var value = params[key]; 237 238 if (value.length > 1) { 239 throw new Error(`Parameter "${key}" must have only a single value`); 240 } 241 242 value = value[0]; 243 244 if (key === 'client_max_window_bits') { 245 if (value !== true) { 246 const num = +value; 247 if (!Number.isInteger(num) || num < 8 || num > 15) { 248 throw new TypeError( 249 `Invalid value for parameter "${key}": ${value}` 250 ); 251 } 252 value = num; 253 } else if (!this._isServer) { 254 throw new TypeError( 255 `Invalid value for parameter "${key}": ${value}` 256 ); 257 } 258 } else if (key === 'server_max_window_bits') { 259 const num = +value; 260 if (!Number.isInteger(num) || num < 8 || num > 15) { 261 throw new TypeError( 262 `Invalid value for parameter "${key}": ${value}` 263 ); 264 } 265 value = num; 266 } else if ( 267 key === 'client_no_context_takeover' || 268 key === 'server_no_context_takeover' 269 ) { 270 if (value !== true) { 271 throw new TypeError( 272 `Invalid value for parameter "${key}": ${value}` 273 ); 274 } 275 } else { 276 throw new Error(`Unknown parameter "${key}"`); 277 } 278 279 params[key] = value; 280 }); 281 }); 282 283 return configurations; 284 } 285 286 /** 287 * Decompress data. Concurrency limited by async-limiter. 288 * 289 * @param {Buffer} data Compressed data 290 * @param {Boolean} fin Specifies whether or not this is the last fragment 291 * @param {Function} callback Callback 292 * @public 293 */ 294 decompress(data, fin, callback) { 295 zlibLimiter.push((done) => { 296 this._decompress(data, fin, (err, result) => { 297 done(); 298 callback(err, result); 299 }); 300 }); 301 } 302 303 /** 304 * Compress data. Concurrency limited by async-limiter. 305 * 306 * @param {Buffer} data Data to compress 307 * @param {Boolean} fin Specifies whether or not this is the last fragment 308 * @param {Function} callback Callback 309 * @public 310 */ 311 compress(data, fin, callback) { 312 zlibLimiter.push((done) => { 313 this._compress(data, fin, (err, result) => { 314 done(); 315 callback(err, result); 316 }); 317 }); 318 } 319 320 /** 321 * Decompress data. 322 * 323 * @param {Buffer} data Compressed data 324 * @param {Boolean} fin Specifies whether or not this is the last fragment 325 * @param {Function} callback Callback 326 * @private 327 */ 328 _decompress(data, fin, callback) { 329 const endpoint = this._isServer ? 'client' : 'server'; 330 331 if (!this._inflate) { 332 const key = `${endpoint}_max_window_bits`; 333 const windowBits = 334 typeof this.params[key] !== 'number' 335 ? zlib.Z_DEFAULT_WINDOWBITS 336 : this.params[key]; 337 338 this._inflate = zlib.createInflateRaw( 339 Object.assign({}, this._options.zlibInflateOptions, { windowBits }) 340 ); 341 this._inflate[kPerMessageDeflate] = this; 342 this._inflate[kTotalLength] = 0; 343 this._inflate[kBuffers] = []; 344 this._inflate.on('error', inflateOnError); 345 this._inflate.on('data', inflateOnData); 346 } 347 348 this._inflate[kCallback] = callback; 349 350 this._inflate.write(data); 351 if (fin) this._inflate.write(TRAILER); 352 353 this._inflate.flush(() => { 354 const err = this._inflate[kError]; 355 356 if (err) { 357 this._inflate.close(); 358 this._inflate = null; 359 callback(err); 360 return; 361 } 362 363 const data = bufferUtil.concat( 364 this._inflate[kBuffers], 365 this._inflate[kTotalLength] 366 ); 367 368 if (fin && this.params[`${endpoint}_no_context_takeover`]) { 369 this._inflate.close(); 370 this._inflate = null; 371 } else { 372 this._inflate[kTotalLength] = 0; 373 this._inflate[kBuffers] = []; 374 } 375 376 callback(null, data); 377 }); 378 } 379 380 /** 381 * Compress data. 382 * 383 * @param {Buffer} data Data to compress 384 * @param {Boolean} fin Specifies whether or not this is the last fragment 385 * @param {Function} callback Callback 386 * @private 387 */ 388 _compress(data, fin, callback) { 389 if (!data || data.length === 0) { 390 process.nextTick(callback, null, EMPTY_BLOCK); 391 return; 392 } 393 394 const endpoint = this._isServer ? 'server' : 'client'; 395 396 if (!this._deflate) { 397 const key = `${endpoint}_max_window_bits`; 398 const windowBits = 399 typeof this.params[key] !== 'number' 400 ? zlib.Z_DEFAULT_WINDOWBITS 401 : this.params[key]; 402 403 this._deflate = zlib.createDeflateRaw( 404 Object.assign({}, this._options.zlibDeflateOptions, { windowBits }) 405 ); 406 407 this._deflate[kTotalLength] = 0; 408 this._deflate[kBuffers] = []; 409 410 // 411 // An `'error'` event is emitted, only on Node.js < 10.0.0, if the 412 // `zlib.DeflateRaw` instance is closed while data is being processed. 413 // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong 414 // time due to an abnormal WebSocket closure. 415 // 416 this._deflate.on('error', NOOP); 417 this._deflate.on('data', deflateOnData); 418 } 419 420 this._deflate.write(data); 421 this._deflate.flush(zlib.Z_SYNC_FLUSH, () => { 422 if (!this._deflate) { 423 // 424 // This `if` statement is only needed for Node.js < 10.0.0 because as of 425 // commit https://github.com/nodejs/node/commit/5e3f5164, the flush 426 // callback is no longer called if the deflate stream is closed while 427 // data is being processed. 428 // 429 return; 430 } 431 432 var data = bufferUtil.concat( 433 this._deflate[kBuffers], 434 this._deflate[kTotalLength] 435 ); 436 437 if (fin) data = data.slice(0, data.length - 4); 438 439 if (fin && this.params[`${endpoint}_no_context_takeover`]) { 440 this._deflate.close(); 441 this._deflate = null; 442 } else { 443 this._deflate[kTotalLength] = 0; 444 this._deflate[kBuffers] = []; 445 } 446 447 callback(null, data); 448 }); 449 } 450 } 451 452 module.exports = PerMessageDeflate; 453 454 /** 455 * The listener of the `zlib.DeflateRaw` stream `'data'` event. 456 * 457 * @param {Buffer} chunk A chunk of data 458 * @private 459 */ 460 function deflateOnData(chunk) { 461 this[kBuffers].push(chunk); 462 this[kTotalLength] += chunk.length; 463 } 464 465 /** 466 * The listener of the `zlib.InflateRaw` stream `'data'` event. 467 * 468 * @param {Buffer} chunk A chunk of data 469 * @private 470 */ 471 function inflateOnData(chunk) { 472 this[kTotalLength] += chunk.length; 473 474 if ( 475 this[kPerMessageDeflate]._maxPayload < 1 || 476 this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload 477 ) { 478 this[kBuffers].push(chunk); 479 return; 480 } 481 482 this[kError] = new RangeError('Max payload size exceeded'); 483 this[kError][kStatusCode] = 1009; 484 this.removeListener('data', inflateOnData); 485 this.reset(); 486 } 487 488 /** 489 * The listener of the `zlib.InflateRaw` stream `'error'` event. 490 * 491 * @param {Error} err The emitted error 492 * @private 493 */ 494 function inflateOnError(err) { 495 // 496 // There is no need to call `Zlib#close()` as the handle is automatically 497 // closed when an error is emitted. 498 // 499 this[kPerMessageDeflate]._inflate = null; 500 err[kStatusCode] = 1007; 501 this[kCallback](err); 502 }