permessage-deflate.js (14135B)
'use strict';

const zlib = require('zlib');

const bufferUtil = require('./buffer-util');
const Limiter = require('./limiter');
const { kStatusCode, NOOP } = require('./constants');

// Bytes appended to the compressed data of a final fragment before inflating
// (see `_decompress()`); `_compress()` strips the last four bytes of a final
// fragment on the way out.
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);

// Symbols used to attach per-stream state to the zlib stream instances.
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
let zlibLimiter;

/**
 * permessage-deflate implementation.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} options Configuration options
   * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
   *     of server context takeover
   * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
   *     disabling of client context takeover
   * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
   *     use of a custom server window size
   * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
   *     for, or request, a custom client window size
   * @param {Object} options.zlibDeflateOptions Options to pass to zlib on
   *     deflate
   * @param {Object} options.zlibInflateOptions Options to pass to zlib on
   *     inflate
   * @param {Number} options.threshold Size (in bytes) below which messages
   *     should not be compressed. Defaults to 1024
   * @param {Number} options.concurrencyLimit The number of concurrent calls to
   *     zlib. Defaults to 10. Only the value seen by the first instance
   *     created in the process is used, because the limiter is shared
   * @param {Boolean} isServer Create the instance in either server or client
   *     mode
   * @param {Number} maxPayload The maximum allowed message length
   */
  constructor(options, isServer, maxPayload) {
    // `| 0` turns `undefined`/`NaN` into 0; values below 1 disable the size
    // check in `inflateOnData()`.
    this._maxPayload = maxPayload | 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    // Negotiated parameters, set by `accept()`.
    this.params = null;

    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      // Option not specified (`null`/`undefined`): offer the parameter
      // without a value.
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response. The accepted
   * configuration is also stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If a parameter is unknown, invalid or unacceptable
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      // Non-null only while a `_compress()` call is still in flight.
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers is compatible with the options
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;

    // Pick the first offer that does not conflict with the local options.
    const accepted = offers.find((params) => {
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    // Overlay the local options on the accepted offer (mutated in place).
    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response contains a parameter that conflicts with
   *     the local options
   * @private
   */
  acceptAsClient(response) {
    // Only the first element of the response is considered.
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters. Every parameter is expected to hold a list with a
   * single value; the list is replaced, in place, by the validated value.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter has more than one value or is unknown
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // In client mode the parameter must carry an explicit value.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Signal the limiter before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Signal the limiter before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the remote peer, so its parameters
    // apply.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      // Set by `inflateOnData()` when the maximum payload size is exceeded.
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        // No context takeover: discard the stream after every message.
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed by this endpoint, so its own parameters
    // apply.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      //
      // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
      // `zlib.DeflateRaw` instance is closed while data is being processed.
      // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
      // time due to an abnormal WebSocket closure.
      //
      this._deflate.on('error', NOOP);
      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      //
      // Strip the last four bytes of a final fragment. `subarray()` is used
      // instead of the deprecated `Buffer#slice()`; both return a view over
      // the same memory.
      //
      if (fin) data = data.subarray(0, data.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        // No context takeover: discard the stream after every message.
        this._deflate.close();
        this._deflate = null;
      } else {
        this._deflate[kTotalLength] = 0;
        this._deflate[kBuffers] = [];
      }

      callback(null, data);
    });
  }
}

module.exports = PerMessageDeflate;

/**
 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  this[kBuffers].push(chunk);
  this[kTotalLength] += chunk.length;
}

/**
 * The listener of the `zlib.InflateRaw` stream `'data'` event. Buffers the
 * chunk unless the accumulated length exceeds the maximum payload size, in
 * which case a `RangeError` is recorded on the stream and the stream is reset.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  if (
    this[kPerMessageDeflate]._maxPayload < 1 ||
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  ) {
    this[kBuffers].push(chunk);
    return;
  }

  //
  // Limit exceeded: record the error, stop collecting output and reset the
  // stream. The error is reported either by the pending `flush()` callback in
  // `_decompress()` or by `inflateOnError()`.
  //
  this[kError] = new RangeError('Max payload size exceeded');
  this[kError][kStatusCode] = 1009;
  this.removeListener('data', inflateOnData);
  this.reset();
}

/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  //
  // If `inflateOnData()` already recorded a "max payload size exceeded" error,
  // report that one so its 1009 status code is not masked by the 1007 used
  // for zlib errors.
  //
  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}