twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

permessage-deflate.js (14131B)


      1 'use strict';
      2 
      3 const Limiter = require('async-limiter');
      4 const zlib = require('zlib');
      5 
      6 const bufferUtil = require('./buffer-util');
      7 const { kStatusCode, NOOP } = require('./constants');
      8 
// Four bytes written to the inflate stream after the final fragment of a
// message (see `_decompress`); `_compress` strips the same number of bytes from
// the end of its output when `fin` is set.
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
// Payload handed back by `_compress` when there is no data to compress.
const EMPTY_BLOCK = Buffer.from([0x00]);

// Property keys attached to the zlib stream instances:
// - kPerMessageDeflate: the `PerMessageDeflate` instance owning an inflate stream
// - kTotalLength: running byte count of the chunks emitted for the current call
// - kCallback: the callback of the decompression currently in progress
// - kBuffers: the chunks emitted for the current call
// - kError: error recorded by the inflate `'data'` listener
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
// Created lazily by the first `PerMessageDeflate` constructed, so only that
// instance's `concurrencyLimit` option takes effect.
//
let zlibLimiter;
     26 
     27 /**
     28  * permessage-deflate implementation.
     29  */
     30 class PerMessageDeflate {
     31   /**
     32    * Creates a PerMessageDeflate instance.
     33    *
     34    * @param {Object} options Configuration options
     35    * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
     36    *     of server context takeover
     37    * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
     38    *     disabling of client context takeover
     39    * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
     40    *     use of a custom server window size
     41    * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
     42    *     for, or request, a custom client window size
     43    * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
     44    * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
     45    * @param {Number} options.threshold Size (in bytes) below which messages
     46    *     should not be compressed
     47    * @param {Number} options.concurrencyLimit The number of concurrent calls to
     48    *     zlib
     49    * @param {Boolean} isServer Create the instance in either server or client
     50    *     mode
     51    * @param {Number} maxPayload The maximum allowed message length
     52    */
     53   constructor(options, isServer, maxPayload) {
     54     this._maxPayload = maxPayload | 0;
     55     this._options = options || {};
     56     this._threshold =
     57       this._options.threshold !== undefined ? this._options.threshold : 1024;
     58     this._isServer = !!isServer;
     59     this._deflate = null;
     60     this._inflate = null;
     61 
     62     this.params = null;
     63 
     64     if (!zlibLimiter) {
     65       const concurrency =
     66         this._options.concurrencyLimit !== undefined
     67           ? this._options.concurrencyLimit
     68           : 10;
     69       zlibLimiter = new Limiter({ concurrency });
     70     }
     71   }
     72 
     73   /**
     74    * @type {String}
     75    */
     76   static get extensionName() {
     77     return 'permessage-deflate';
     78   }
     79 
     80   /**
     81    * Create an extension negotiation offer.
     82    *
     83    * @return {Object} Extension parameters
     84    * @public
     85    */
     86   offer() {
     87     const params = {};
     88 
     89     if (this._options.serverNoContextTakeover) {
     90       params.server_no_context_takeover = true;
     91     }
     92     if (this._options.clientNoContextTakeover) {
     93       params.client_no_context_takeover = true;
     94     }
     95     if (this._options.serverMaxWindowBits) {
     96       params.server_max_window_bits = this._options.serverMaxWindowBits;
     97     }
     98     if (this._options.clientMaxWindowBits) {
     99       params.client_max_window_bits = this._options.clientMaxWindowBits;
    100     } else if (this._options.clientMaxWindowBits == null) {
    101       params.client_max_window_bits = true;
    102     }
    103 
    104     return params;
    105   }
    106 
    107   /**
    108    * Accept an extension negotiation offer/response.
    109    *
    110    * @param {Array} configurations The extension negotiation offers/reponse
    111    * @return {Object} Accepted configuration
    112    * @public
    113    */
    114   accept(configurations) {
    115     configurations = this.normalizeParams(configurations);
    116 
    117     this.params = this._isServer
    118       ? this.acceptAsServer(configurations)
    119       : this.acceptAsClient(configurations);
    120 
    121     return this.params;
    122   }
    123 
    124   /**
    125    * Releases all resources used by the extension.
    126    *
    127    * @public
    128    */
    129   cleanup() {
    130     if (this._inflate) {
    131       this._inflate.close();
    132       this._inflate = null;
    133     }
    134 
    135     if (this._deflate) {
    136       this._deflate.close();
    137       this._deflate = null;
    138     }
    139   }
    140 
    141   /**
    142    *  Accept an extension negotiation offer.
    143    *
    144    * @param {Array} offers The extension negotiation offers
    145    * @return {Object} Accepted configuration
    146    * @private
    147    */
    148   acceptAsServer(offers) {
    149     const opts = this._options;
    150     const accepted = offers.find((params) => {
    151       if (
    152         (opts.serverNoContextTakeover === false &&
    153           params.server_no_context_takeover) ||
    154         (params.server_max_window_bits &&
    155           (opts.serverMaxWindowBits === false ||
    156             (typeof opts.serverMaxWindowBits === 'number' &&
    157               opts.serverMaxWindowBits > params.server_max_window_bits))) ||
    158         (typeof opts.clientMaxWindowBits === 'number' &&
    159           !params.client_max_window_bits)
    160       ) {
    161         return false;
    162       }
    163 
    164       return true;
    165     });
    166 
    167     if (!accepted) {
    168       throw new Error('None of the extension offers can be accepted');
    169     }
    170 
    171     if (opts.serverNoContextTakeover) {
    172       accepted.server_no_context_takeover = true;
    173     }
    174     if (opts.clientNoContextTakeover) {
    175       accepted.client_no_context_takeover = true;
    176     }
    177     if (typeof opts.serverMaxWindowBits === 'number') {
    178       accepted.server_max_window_bits = opts.serverMaxWindowBits;
    179     }
    180     if (typeof opts.clientMaxWindowBits === 'number') {
    181       accepted.client_max_window_bits = opts.clientMaxWindowBits;
    182     } else if (
    183       accepted.client_max_window_bits === true ||
    184       opts.clientMaxWindowBits === false
    185     ) {
    186       delete accepted.client_max_window_bits;
    187     }
    188 
    189     return accepted;
    190   }
    191 
    192   /**
    193    * Accept the extension negotiation response.
    194    *
    195    * @param {Array} response The extension negotiation response
    196    * @return {Object} Accepted configuration
    197    * @private
    198    */
    199   acceptAsClient(response) {
    200     const params = response[0];
    201 
    202     if (
    203       this._options.clientNoContextTakeover === false &&
    204       params.client_no_context_takeover
    205     ) {
    206       throw new Error('Unexpected parameter "client_no_context_takeover"');
    207     }
    208 
    209     if (!params.client_max_window_bits) {
    210       if (typeof this._options.clientMaxWindowBits === 'number') {
    211         params.client_max_window_bits = this._options.clientMaxWindowBits;
    212       }
    213     } else if (
    214       this._options.clientMaxWindowBits === false ||
    215       (typeof this._options.clientMaxWindowBits === 'number' &&
    216         params.client_max_window_bits > this._options.clientMaxWindowBits)
    217     ) {
    218       throw new Error(
    219         'Unexpected or invalid parameter "client_max_window_bits"'
    220       );
    221     }
    222 
    223     return params;
    224   }
    225 
    226   /**
    227    * Normalize parameters.
    228    *
    229    * @param {Array} configurations The extension negotiation offers/reponse
    230    * @return {Array} The offers/response with normalized parameters
    231    * @private
    232    */
    233   normalizeParams(configurations) {
    234     configurations.forEach((params) => {
    235       Object.keys(params).forEach((key) => {
    236         var value = params[key];
    237 
    238         if (value.length > 1) {
    239           throw new Error(`Parameter "${key}" must have only a single value`);
    240         }
    241 
    242         value = value[0];
    243 
    244         if (key === 'client_max_window_bits') {
    245           if (value !== true) {
    246             const num = +value;
    247             if (!Number.isInteger(num) || num < 8 || num > 15) {
    248               throw new TypeError(
    249                 `Invalid value for parameter "${key}": ${value}`
    250               );
    251             }
    252             value = num;
    253           } else if (!this._isServer) {
    254             throw new TypeError(
    255               `Invalid value for parameter "${key}": ${value}`
    256             );
    257           }
    258         } else if (key === 'server_max_window_bits') {
    259           const num = +value;
    260           if (!Number.isInteger(num) || num < 8 || num > 15) {
    261             throw new TypeError(
    262               `Invalid value for parameter "${key}": ${value}`
    263             );
    264           }
    265           value = num;
    266         } else if (
    267           key === 'client_no_context_takeover' ||
    268           key === 'server_no_context_takeover'
    269         ) {
    270           if (value !== true) {
    271             throw new TypeError(
    272               `Invalid value for parameter "${key}": ${value}`
    273             );
    274           }
    275         } else {
    276           throw new Error(`Unknown parameter "${key}"`);
    277         }
    278 
    279         params[key] = value;
    280       });
    281     });
    282 
    283     return configurations;
    284   }
    285 
    286   /**
    287    * Decompress data. Concurrency limited by async-limiter.
    288    *
    289    * @param {Buffer} data Compressed data
    290    * @param {Boolean} fin Specifies whether or not this is the last fragment
    291    * @param {Function} callback Callback
    292    * @public
    293    */
    294   decompress(data, fin, callback) {
    295     zlibLimiter.push((done) => {
    296       this._decompress(data, fin, (err, result) => {
    297         done();
    298         callback(err, result);
    299       });
    300     });
    301   }
    302 
    303   /**
    304    * Compress data. Concurrency limited by async-limiter.
    305    *
    306    * @param {Buffer} data Data to compress
    307    * @param {Boolean} fin Specifies whether or not this is the last fragment
    308    * @param {Function} callback Callback
    309    * @public
    310    */
    311   compress(data, fin, callback) {
    312     zlibLimiter.push((done) => {
    313       this._compress(data, fin, (err, result) => {
    314         done();
    315         callback(err, result);
    316       });
    317     });
    318   }
    319 
    320   /**
    321    * Decompress data.
    322    *
    323    * @param {Buffer} data Compressed data
    324    * @param {Boolean} fin Specifies whether or not this is the last fragment
    325    * @param {Function} callback Callback
    326    * @private
    327    */
    328   _decompress(data, fin, callback) {
    329     const endpoint = this._isServer ? 'client' : 'server';
    330 
    331     if (!this._inflate) {
    332       const key = `${endpoint}_max_window_bits`;
    333       const windowBits =
    334         typeof this.params[key] !== 'number'
    335           ? zlib.Z_DEFAULT_WINDOWBITS
    336           : this.params[key];
    337 
    338       this._inflate = zlib.createInflateRaw(
    339         Object.assign({}, this._options.zlibInflateOptions, { windowBits })
    340       );
    341       this._inflate[kPerMessageDeflate] = this;
    342       this._inflate[kTotalLength] = 0;
    343       this._inflate[kBuffers] = [];
    344       this._inflate.on('error', inflateOnError);
    345       this._inflate.on('data', inflateOnData);
    346     }
    347 
    348     this._inflate[kCallback] = callback;
    349 
    350     this._inflate.write(data);
    351     if (fin) this._inflate.write(TRAILER);
    352 
    353     this._inflate.flush(() => {
    354       const err = this._inflate[kError];
    355 
    356       if (err) {
    357         this._inflate.close();
    358         this._inflate = null;
    359         callback(err);
    360         return;
    361       }
    362 
    363       const data = bufferUtil.concat(
    364         this._inflate[kBuffers],
    365         this._inflate[kTotalLength]
    366       );
    367 
    368       if (fin && this.params[`${endpoint}_no_context_takeover`]) {
    369         this._inflate.close();
    370         this._inflate = null;
    371       } else {
    372         this._inflate[kTotalLength] = 0;
    373         this._inflate[kBuffers] = [];
    374       }
    375 
    376       callback(null, data);
    377     });
    378   }
    379 
    380   /**
    381    * Compress data.
    382    *
    383    * @param {Buffer} data Data to compress
    384    * @param {Boolean} fin Specifies whether or not this is the last fragment
    385    * @param {Function} callback Callback
    386    * @private
    387    */
    388   _compress(data, fin, callback) {
    389     if (!data || data.length === 0) {
    390       process.nextTick(callback, null, EMPTY_BLOCK);
    391       return;
    392     }
    393 
    394     const endpoint = this._isServer ? 'server' : 'client';
    395 
    396     if (!this._deflate) {
    397       const key = `${endpoint}_max_window_bits`;
    398       const windowBits =
    399         typeof this.params[key] !== 'number'
    400           ? zlib.Z_DEFAULT_WINDOWBITS
    401           : this.params[key];
    402 
    403       this._deflate = zlib.createDeflateRaw(
    404         Object.assign({}, this._options.zlibDeflateOptions, { windowBits })
    405       );
    406 
    407       this._deflate[kTotalLength] = 0;
    408       this._deflate[kBuffers] = [];
    409 
    410       //
    411       // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
    412       // `zlib.DeflateRaw` instance is closed while data is being processed.
    413       // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
    414       // time due to an abnormal WebSocket closure.
    415       //
    416       this._deflate.on('error', NOOP);
    417       this._deflate.on('data', deflateOnData);
    418     }
    419 
    420     this._deflate.write(data);
    421     this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
    422       if (!this._deflate) {
    423         //
    424         // This `if` statement is only needed for Node.js < 10.0.0 because as of
    425         // commit https://github.com/nodejs/node/commit/5e3f5164, the flush
    426         // callback is no longer called if the deflate stream is closed while
    427         // data is being processed.
    428         //
    429         return;
    430       }
    431 
    432       var data = bufferUtil.concat(
    433         this._deflate[kBuffers],
    434         this._deflate[kTotalLength]
    435       );
    436 
    437       if (fin) data = data.slice(0, data.length - 4);
    438 
    439       if (fin && this.params[`${endpoint}_no_context_takeover`]) {
    440         this._deflate.close();
    441         this._deflate = null;
    442       } else {
    443         this._deflate[kTotalLength] = 0;
    444         this._deflate[kBuffers] = [];
    445       }
    446 
    447       callback(null, data);
    448     });
    449   }
    450 }
    451 
    452 module.exports = PerMessageDeflate;
    453 
    454 /**
    455  * The listener of the `zlib.DeflateRaw` stream `'data'` event.
    456  *
    457  * @param {Buffer} chunk A chunk of data
    458  * @private
    459  */
    460 function deflateOnData(chunk) {
    461   this[kBuffers].push(chunk);
    462   this[kTotalLength] += chunk.length;
    463 }
    464 
    465 /**
    466  * The listener of the `zlib.InflateRaw` stream `'data'` event.
    467  *
    468  * @param {Buffer} chunk A chunk of data
    469  * @private
    470  */
    471 function inflateOnData(chunk) {
    472   this[kTotalLength] += chunk.length;
    473 
    474   if (
    475     this[kPerMessageDeflate]._maxPayload < 1 ||
    476     this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
    477   ) {
    478     this[kBuffers].push(chunk);
    479     return;
    480   }
    481 
    482   this[kError] = new RangeError('Max payload size exceeded');
    483   this[kError][kStatusCode] = 1009;
    484   this.removeListener('data', inflateOnData);
    485   this.reset();
    486 }
    487 
    488 /**
    489  * The listener of the `zlib.InflateRaw` stream `'error'` event.
    490  *
    491  * @param {Error} err The emitted error
    492  * @private
    493  */
    494 function inflateOnError(err) {
    495   //
    496   // There is no need to call `Zlib#close()` as the handle is automatically
    497   // closed when an error is emitted.
    498   //
    499   this[kPerMessageDeflate]._inflate = null;
    500   err[kStatusCode] = 1007;
    501   this[kCallback](err);
    502 }