twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

permessage-deflate.js (14135B)


      1 'use strict';
      2 
      3 const zlib = require('zlib');
      4 
      5 const bufferUtil = require('./buffer-util');
      6 const Limiter = require('./limiter');
      7 const { kStatusCode, NOOP } = require('./constants');
      8 
      9 const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]); // 4 bytes written to the inflate stream after the final fragment; `_compress()` strips 4 bytes from the final deflated fragment
     10 const kPerMessageDeflate = Symbol('permessage-deflate'); // inflate stream property: the owning `PerMessageDeflate` instance
     11 const kTotalLength = Symbol('total-length'); // stream property: running byte count of the chunks emitted by the stream
     12 const kCallback = Symbol('callback'); // stream property: callback of the operation currently in progress
     13 const kBuffers = Symbol('buffers'); // stream property: array of output chunks collected so far
     14 const kError = Symbol('error'); // inflate stream property: error recorded by the `'data'` listener (max payload exceeded)
     15 
     16 //
     17 // We limit zlib concurrency, which prevents severe memory fragmentation
     18 // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
     19 // and https://github.com/websockets/ws/issues/1202
     20 //
     21 // Intentionally global; it's the global thread pool that's an issue.
        //
        // The limiter is created lazily by the first `PerMessageDeflate` instance
        // that is constructed, so the `concurrencyLimit` option of any later
        // instance has no effect.
     22 //
     23 let zlibLimiter;
     24 
     25 /**
     26  * permessage-deflate implementation.
     27  */
     28 class PerMessageDeflate {
     29   /**
     30    * Creates a PerMessageDeflate instance.
     31    *
     32    * @param {Object} options Configuration options
     33    * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
     34    *     of server context takeover
     35    * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
     36    *     disabling of client context takeover
     37    * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
     38    *     use of a custom server window size
     39    * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
     40    *     for, or request, a custom client window size
     41    * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
     42    * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
     43    * @param {Number} options.threshold Size (in bytes) below which messages
     44    *     should not be compressed
     45    * @param {Number} options.concurrencyLimit The number of concurrent calls to
     46    *     zlib
     47    * @param {Boolean} isServer Create the instance in either server or client
     48    *     mode
     49    * @param {Number} maxPayload The maximum allowed message length
     50    */
     51   constructor(options, isServer, maxPayload) {
     52     this._maxPayload = maxPayload | 0;
     53     this._options = options || {};
     54     this._threshold =
     55       this._options.threshold !== undefined ? this._options.threshold : 1024;
     56     this._isServer = !!isServer;
     57     this._deflate = null;
     58     this._inflate = null;
     59 
     60     this.params = null;
     61 
     62     if (!zlibLimiter) {
     63       const concurrency =
     64         this._options.concurrencyLimit !== undefined
     65           ? this._options.concurrencyLimit
     66           : 10;
     67       zlibLimiter = new Limiter(concurrency);
     68     }
     69   }
     70 
     71   /**
     72    * @type {String}
     73    */
     74   static get extensionName() {
     75     return 'permessage-deflate';
     76   }
     77 
     78   /**
     79    * Create an extension negotiation offer.
     80    *
     81    * @return {Object} Extension parameters
     82    * @public
     83    */
     84   offer() {
     85     const params = {};
     86 
     87     if (this._options.serverNoContextTakeover) {
     88       params.server_no_context_takeover = true;
     89     }
     90     if (this._options.clientNoContextTakeover) {
     91       params.client_no_context_takeover = true;
     92     }
     93     if (this._options.serverMaxWindowBits) {
     94       params.server_max_window_bits = this._options.serverMaxWindowBits;
     95     }
     96     if (this._options.clientMaxWindowBits) {
     97       params.client_max_window_bits = this._options.clientMaxWindowBits;
     98     } else if (this._options.clientMaxWindowBits == null) {
     99       params.client_max_window_bits = true;
    100     }
    101 
    102     return params;
    103   }
    104 
    105   /**
    106    * Accept an extension negotiation offer/response.
    107    *
    108    * @param {Array} configurations The extension negotiation offers/reponse
    109    * @return {Object} Accepted configuration
    110    * @public
    111    */
    112   accept(configurations) {
    113     configurations = this.normalizeParams(configurations);
    114 
    115     this.params = this._isServer
    116       ? this.acceptAsServer(configurations)
    117       : this.acceptAsClient(configurations);
    118 
    119     return this.params;
    120   }
    121 
    122   /**
    123    * Releases all resources used by the extension.
    124    *
    125    * @public
    126    */
    127   cleanup() {
    128     if (this._inflate) {
    129       this._inflate.close();
    130       this._inflate = null;
    131     }
    132 
    133     if (this._deflate) {
    134       const callback = this._deflate[kCallback];
    135 
    136       this._deflate.close();
    137       this._deflate = null;
    138 
    139       if (callback) {
    140         callback(
    141           new Error(
    142             'The deflate stream was closed while data was being processed'
    143           )
    144         );
    145       }
    146     }
    147   }
    148 
    149   /**
    150    *  Accept an extension negotiation offer.
    151    *
    152    * @param {Array} offers The extension negotiation offers
    153    * @return {Object} Accepted configuration
    154    * @private
    155    */
    156   acceptAsServer(offers) {
    157     const opts = this._options;
    158     const accepted = offers.find((params) => {
    159       if (
    160         (opts.serverNoContextTakeover === false &&
    161           params.server_no_context_takeover) ||
    162         (params.server_max_window_bits &&
    163           (opts.serverMaxWindowBits === false ||
    164             (typeof opts.serverMaxWindowBits === 'number' &&
    165               opts.serverMaxWindowBits > params.server_max_window_bits))) ||
    166         (typeof opts.clientMaxWindowBits === 'number' &&
    167           !params.client_max_window_bits)
    168       ) {
    169         return false;
    170       }
    171 
    172       return true;
    173     });
    174 
    175     if (!accepted) {
    176       throw new Error('None of the extension offers can be accepted');
    177     }
    178 
    179     if (opts.serverNoContextTakeover) {
    180       accepted.server_no_context_takeover = true;
    181     }
    182     if (opts.clientNoContextTakeover) {
    183       accepted.client_no_context_takeover = true;
    184     }
    185     if (typeof opts.serverMaxWindowBits === 'number') {
    186       accepted.server_max_window_bits = opts.serverMaxWindowBits;
    187     }
    188     if (typeof opts.clientMaxWindowBits === 'number') {
    189       accepted.client_max_window_bits = opts.clientMaxWindowBits;
    190     } else if (
    191       accepted.client_max_window_bits === true ||
    192       opts.clientMaxWindowBits === false
    193     ) {
    194       delete accepted.client_max_window_bits;
    195     }
    196 
    197     return accepted;
    198   }
    199 
    200   /**
    201    * Accept the extension negotiation response.
    202    *
    203    * @param {Array} response The extension negotiation response
    204    * @return {Object} Accepted configuration
    205    * @private
    206    */
    207   acceptAsClient(response) {
    208     const params = response[0];
    209 
    210     if (
    211       this._options.clientNoContextTakeover === false &&
    212       params.client_no_context_takeover
    213     ) {
    214       throw new Error('Unexpected parameter "client_no_context_takeover"');
    215     }
    216 
    217     if (!params.client_max_window_bits) {
    218       if (typeof this._options.clientMaxWindowBits === 'number') {
    219         params.client_max_window_bits = this._options.clientMaxWindowBits;
    220       }
    221     } else if (
    222       this._options.clientMaxWindowBits === false ||
    223       (typeof this._options.clientMaxWindowBits === 'number' &&
    224         params.client_max_window_bits > this._options.clientMaxWindowBits)
    225     ) {
    226       throw new Error(
    227         'Unexpected or invalid parameter "client_max_window_bits"'
    228       );
    229     }
    230 
    231     return params;
    232   }
    233 
    234   /**
    235    * Normalize parameters.
    236    *
    237    * @param {Array} configurations The extension negotiation offers/reponse
    238    * @return {Array} The offers/response with normalized parameters
    239    * @private
    240    */
    241   normalizeParams(configurations) {
    242     configurations.forEach((params) => {
    243       Object.keys(params).forEach((key) => {
    244         let value = params[key];
    245 
    246         if (value.length > 1) {
    247           throw new Error(`Parameter "${key}" must have only a single value`);
    248         }
    249 
    250         value = value[0];
    251 
    252         if (key === 'client_max_window_bits') {
    253           if (value !== true) {
    254             const num = +value;
    255             if (!Number.isInteger(num) || num < 8 || num > 15) {
    256               throw new TypeError(
    257                 `Invalid value for parameter "${key}": ${value}`
    258               );
    259             }
    260             value = num;
    261           } else if (!this._isServer) {
    262             throw new TypeError(
    263               `Invalid value for parameter "${key}": ${value}`
    264             );
    265           }
    266         } else if (key === 'server_max_window_bits') {
    267           const num = +value;
    268           if (!Number.isInteger(num) || num < 8 || num > 15) {
    269             throw new TypeError(
    270               `Invalid value for parameter "${key}": ${value}`
    271             );
    272           }
    273           value = num;
    274         } else if (
    275           key === 'client_no_context_takeover' ||
    276           key === 'server_no_context_takeover'
    277         ) {
    278           if (value !== true) {
    279             throw new TypeError(
    280               `Invalid value for parameter "${key}": ${value}`
    281             );
    282           }
    283         } else {
    284           throw new Error(`Unknown parameter "${key}"`);
    285         }
    286 
    287         params[key] = value;
    288       });
    289     });
    290 
    291     return configurations;
    292   }
    293 
    294   /**
    295    * Decompress data. Concurrency limited.
    296    *
    297    * @param {Buffer} data Compressed data
    298    * @param {Boolean} fin Specifies whether or not this is the last fragment
    299    * @param {Function} callback Callback
    300    * @public
    301    */
    302   decompress(data, fin, callback) {
    303     zlibLimiter.add((done) => {
    304       this._decompress(data, fin, (err, result) => {
    305         done();
    306         callback(err, result);
    307       });
    308     });
    309   }
    310 
    311   /**
    312    * Compress data. Concurrency limited.
    313    *
    314    * @param {Buffer} data Data to compress
    315    * @param {Boolean} fin Specifies whether or not this is the last fragment
    316    * @param {Function} callback Callback
    317    * @public
    318    */
    319   compress(data, fin, callback) {
    320     zlibLimiter.add((done) => {
    321       this._compress(data, fin, (err, result) => {
    322         done();
    323         callback(err, result);
    324       });
    325     });
    326   }
    327 
    328   /**
    329    * Decompress data.
    330    *
    331    * @param {Buffer} data Compressed data
    332    * @param {Boolean} fin Specifies whether or not this is the last fragment
    333    * @param {Function} callback Callback
    334    * @private
    335    */
    336   _decompress(data, fin, callback) {
    337     const endpoint = this._isServer ? 'client' : 'server';
    338 
    339     if (!this._inflate) {
    340       const key = `${endpoint}_max_window_bits`;
    341       const windowBits =
    342         typeof this.params[key] !== 'number'
    343           ? zlib.Z_DEFAULT_WINDOWBITS
    344           : this.params[key];
    345 
    346       this._inflate = zlib.createInflateRaw({
    347         ...this._options.zlibInflateOptions,
    348         windowBits
    349       });
    350       this._inflate[kPerMessageDeflate] = this;
    351       this._inflate[kTotalLength] = 0;
    352       this._inflate[kBuffers] = [];
    353       this._inflate.on('error', inflateOnError);
    354       this._inflate.on('data', inflateOnData);
    355     }
    356 
    357     this._inflate[kCallback] = callback;
    358 
    359     this._inflate.write(data);
    360     if (fin) this._inflate.write(TRAILER);
    361 
    362     this._inflate.flush(() => {
    363       const err = this._inflate[kError];
    364 
    365       if (err) {
    366         this._inflate.close();
    367         this._inflate = null;
    368         callback(err);
    369         return;
    370       }
    371 
    372       const data = bufferUtil.concat(
    373         this._inflate[kBuffers],
    374         this._inflate[kTotalLength]
    375       );
    376 
    377       if (fin && this.params[`${endpoint}_no_context_takeover`]) {
    378         this._inflate.close();
    379         this._inflate = null;
    380       } else {
    381         this._inflate[kTotalLength] = 0;
    382         this._inflate[kBuffers] = [];
    383       }
    384 
    385       callback(null, data);
    386     });
    387   }
    388 
    389   /**
    390    * Compress data.
    391    *
    392    * @param {Buffer} data Data to compress
    393    * @param {Boolean} fin Specifies whether or not this is the last fragment
    394    * @param {Function} callback Callback
    395    * @private
    396    */
    397   _compress(data, fin, callback) {
    398     const endpoint = this._isServer ? 'server' : 'client';
    399 
    400     if (!this._deflate) {
    401       const key = `${endpoint}_max_window_bits`;
    402       const windowBits =
    403         typeof this.params[key] !== 'number'
    404           ? zlib.Z_DEFAULT_WINDOWBITS
    405           : this.params[key];
    406 
    407       this._deflate = zlib.createDeflateRaw({
    408         ...this._options.zlibDeflateOptions,
    409         windowBits
    410       });
    411 
    412       this._deflate[kTotalLength] = 0;
    413       this._deflate[kBuffers] = [];
    414 
    415       //
    416       // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
    417       // `zlib.DeflateRaw` instance is closed while data is being processed.
    418       // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
    419       // time due to an abnormal WebSocket closure.
    420       //
    421       this._deflate.on('error', NOOP);
    422       this._deflate.on('data', deflateOnData);
    423     }
    424 
    425     this._deflate[kCallback] = callback;
    426 
    427     this._deflate.write(data);
    428     this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
    429       if (!this._deflate) {
    430         //
    431         // The deflate stream was closed while data was being processed.
    432         //
    433         return;
    434       }
    435 
    436       let data = bufferUtil.concat(
    437         this._deflate[kBuffers],
    438         this._deflate[kTotalLength]
    439       );
    440 
    441       if (fin) data = data.slice(0, data.length - 4);
    442 
    443       //
    444       // Ensure that the callback will not be called again in
    445       // `PerMessageDeflate#cleanup()`.
    446       //
    447       this._deflate[kCallback] = null;
    448 
    449       if (fin && this.params[`${endpoint}_no_context_takeover`]) {
    450         this._deflate.close();
    451         this._deflate = null;
    452       } else {
    453         this._deflate[kTotalLength] = 0;
    454         this._deflate[kBuffers] = [];
    455       }
    456 
    457       callback(null, data);
    458     });
    459   }
    460 }
    461 
    462 module.exports = PerMessageDeflate;
    463 
    464 /**
    465  * The listener of the `zlib.DeflateRaw` stream `'data'` event.
    466  *
    467  * @param {Buffer} chunk A chunk of data
    468  * @private
    469  */
    470 function deflateOnData(chunk) {
    471   this[kBuffers].push(chunk);
    472   this[kTotalLength] += chunk.length;
    473 }
    474 
    475 /**
    476  * The listener of the `zlib.InflateRaw` stream `'data'` event.
    477  *
    478  * @param {Buffer} chunk A chunk of data
    479  * @private
    480  */
    481 function inflateOnData(chunk) {
    482   this[kTotalLength] += chunk.length;
    483 
    484   if (
    485     this[kPerMessageDeflate]._maxPayload < 1 ||
    486     this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
    487   ) {
    488     this[kBuffers].push(chunk);
    489     return;
    490   }
    491 
    492   this[kError] = new RangeError('Max payload size exceeded');
    493   this[kError][kStatusCode] = 1009;
    494   this.removeListener('data', inflateOnData);
    495   this.reset();
    496 }
    497 
    498 /**
    499  * The listener of the `zlib.InflateRaw` stream `'error'` event.
    500  *
    501  * @param {Error} err The emitted error
    502  * @private
    503  */
    504 function inflateOnError(err) {
    505   //
    506   // There is no need to call `Zlib#close()` as the handle is automatically
    507   // closed when an error is emitted.
    508   //
    509   this[kPerMessageDeflate]._inflate = null;
    510   err[kStatusCode] = 1007;
    511   this[kCallback](err);
    512 }