twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

receiver.js (12390B)


      1 'use strict';
      2 
      3 const { Writable } = require('stream');
      4 
      5 const PerMessageDeflate = require('./permessage-deflate');
      6 const {
      7   BINARY_TYPES,
      8   EMPTY_BUFFER,
      9   kStatusCode,
     10   kWebSocket
     11 } = require('./constants');
     12 const { concat, toArrayBuffer, unmask } = require('./buffer-util');
     13 const { isValidStatusCode, isValidUTF8 } = require('./validation');
     14 
     15 const GET_INFO = 0;
     16 const GET_PAYLOAD_LENGTH_16 = 1;
     17 const GET_PAYLOAD_LENGTH_64 = 2;
     18 const GET_MASK = 3;
     19 const GET_DATA = 4;
     20 const INFLATING = 5;
     21 
     22 /**
     23  * HyBi Receiver implementation.
     24  *
     25  * @extends stream.Writable
     26  */
     27 class Receiver extends Writable {
     28   /**
     29    * Creates a Receiver instance.
     30    *
     31    * @param {String} binaryType The type for binary data
     32    * @param {Object} extensions An object containing the negotiated extensions
     33    * @param {Boolean} isServer Specifies whether to operate in client or server
     34    *     mode
     35    * @param {Number} maxPayload The maximum allowed message length
     36    */
     37   constructor(binaryType, extensions, isServer, maxPayload) {
     38     super();
     39 
     40     this._binaryType = binaryType || BINARY_TYPES[0];
     41     this[kWebSocket] = undefined;
     42     this._extensions = extensions || {};
     43     this._isServer = !!isServer;
     44     this._maxPayload = maxPayload | 0;
     45 
     46     this._bufferedBytes = 0;
     47     this._buffers = [];
     48 
     49     this._compressed = false;
     50     this._payloadLength = 0;
     51     this._mask = undefined;
     52     this._fragmented = 0;
     53     this._masked = false;
     54     this._fin = false;
     55     this._opcode = 0;
     56 
     57     this._totalPayloadLength = 0;
     58     this._messageLength = 0;
     59     this._fragments = [];
     60 
     61     this._state = GET_INFO;
     62     this._loop = false;
     63   }
     64 
     65   /**
     66    * Implements `Writable.prototype._write()`.
     67    *
     68    * @param {Buffer} chunk The chunk of data to write
     69    * @param {String} encoding The character encoding of `chunk`
     70    * @param {Function} cb Callback
     71    */
     72   _write(chunk, encoding, cb) {
     73     if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
     74 
     75     this._bufferedBytes += chunk.length;
     76     this._buffers.push(chunk);
     77     this.startLoop(cb);
     78   }
     79 
     80   /**
     81    * Consumes `n` bytes from the buffered data.
     82    *
     83    * @param {Number} n The number of bytes to consume
     84    * @return {Buffer} The consumed bytes
     85    * @private
     86    */
     87   consume(n) {
     88     this._bufferedBytes -= n;
     89 
     90     if (n === this._buffers[0].length) return this._buffers.shift();
     91 
     92     if (n < this._buffers[0].length) {
     93       const buf = this._buffers[0];
     94       this._buffers[0] = buf.slice(n);
     95       return buf.slice(0, n);
     96     }
     97 
     98     const dst = Buffer.allocUnsafe(n);
     99 
    100     do {
    101       const buf = this._buffers[0];
    102       const offset = dst.length - n;
    103 
    104       if (n >= buf.length) {
    105         dst.set(this._buffers.shift(), offset);
    106       } else {
    107         dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
    108         this._buffers[0] = buf.slice(n);
    109       }
    110 
    111       n -= buf.length;
    112     } while (n > 0);
    113 
    114     return dst;
    115   }
    116 
    117   /**
    118    * Starts the parsing loop.
    119    *
    120    * @param {Function} cb Callback
    121    * @private
    122    */
    123   startLoop(cb) {
    124     let err;
    125     this._loop = true;
    126 
    127     do {
    128       switch (this._state) {
    129         case GET_INFO:
    130           err = this.getInfo();
    131           break;
    132         case GET_PAYLOAD_LENGTH_16:
    133           err = this.getPayloadLength16();
    134           break;
    135         case GET_PAYLOAD_LENGTH_64:
    136           err = this.getPayloadLength64();
    137           break;
    138         case GET_MASK:
    139           this.getMask();
    140           break;
    141         case GET_DATA:
    142           err = this.getData(cb);
    143           break;
    144         default:
    145           // `INFLATING`
    146           this._loop = false;
    147           return;
    148       }
    149     } while (this._loop);
    150 
    151     cb(err);
    152   }
    153 
    154   /**
    155    * Reads the first two bytes of a frame.
    156    *
    157    * @return {(RangeError|undefined)} A possible error
    158    * @private
    159    */
    160   getInfo() {
    161     if (this._bufferedBytes < 2) {
    162       this._loop = false;
    163       return;
    164     }
    165 
    166     const buf = this.consume(2);
    167 
    168     if ((buf[0] & 0x30) !== 0x00) {
    169       this._loop = false;
    170       return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
    171     }
    172 
    173     const compressed = (buf[0] & 0x40) === 0x40;
    174 
    175     if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
    176       this._loop = false;
    177       return error(RangeError, 'RSV1 must be clear', true, 1002);
    178     }
    179 
    180     this._fin = (buf[0] & 0x80) === 0x80;
    181     this._opcode = buf[0] & 0x0f;
    182     this._payloadLength = buf[1] & 0x7f;
    183 
    184     if (this._opcode === 0x00) {
    185       if (compressed) {
    186         this._loop = false;
    187         return error(RangeError, 'RSV1 must be clear', true, 1002);
    188       }
    189 
    190       if (!this._fragmented) {
    191         this._loop = false;
    192         return error(RangeError, 'invalid opcode 0', true, 1002);
    193       }
    194 
    195       this._opcode = this._fragmented;
    196     } else if (this._opcode === 0x01 || this._opcode === 0x02) {
    197       if (this._fragmented) {
    198         this._loop = false;
    199         return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    200       }
    201 
    202       this._compressed = compressed;
    203     } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
    204       if (!this._fin) {
    205         this._loop = false;
    206         return error(RangeError, 'FIN must be set', true, 1002);
    207       }
    208 
    209       if (compressed) {
    210         this._loop = false;
    211         return error(RangeError, 'RSV1 must be clear', true, 1002);
    212       }
    213 
    214       if (this._payloadLength > 0x7d) {
    215         this._loop = false;
    216         return error(
    217           RangeError,
    218           `invalid payload length ${this._payloadLength}`,
    219           true,
    220           1002
    221         );
    222       }
    223     } else {
    224       this._loop = false;
    225       return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    226     }
    227 
    228     if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    229     this._masked = (buf[1] & 0x80) === 0x80;
    230 
    231     if (this._isServer) {
    232       if (!this._masked) {
    233         this._loop = false;
    234         return error(RangeError, 'MASK must be set', true, 1002);
    235       }
    236     } else if (this._masked) {
    237       this._loop = false;
    238       return error(RangeError, 'MASK must be clear', true, 1002);
    239     }
    240 
    241     if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    242     else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    243     else return this.haveLength();
    244   }
    245 
    246   /**
    247    * Gets extended payload length (7+16).
    248    *
    249    * @return {(RangeError|undefined)} A possible error
    250    * @private
    251    */
    252   getPayloadLength16() {
    253     if (this._bufferedBytes < 2) {
    254       this._loop = false;
    255       return;
    256     }
    257 
    258     this._payloadLength = this.consume(2).readUInt16BE(0);
    259     return this.haveLength();
    260   }
    261 
    262   /**
    263    * Gets extended payload length (7+64).
    264    *
    265    * @return {(RangeError|undefined)} A possible error
    266    * @private
    267    */
    268   getPayloadLength64() {
    269     if (this._bufferedBytes < 8) {
    270       this._loop = false;
    271       return;
    272     }
    273 
    274     const buf = this.consume(8);
    275     const num = buf.readUInt32BE(0);
    276 
    277     //
    278     // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    279     // if payload length is greater than this number.
    280     //
    281     if (num > Math.pow(2, 53 - 32) - 1) {
    282       this._loop = false;
    283       return error(
    284         RangeError,
    285         'Unsupported WebSocket frame: payload length > 2^53 - 1',
    286         false,
    287         1009
    288       );
    289     }
    290 
    291     this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    292     return this.haveLength();
    293   }
    294 
    295   /**
    296    * Payload length has been read.
    297    *
    298    * @return {(RangeError|undefined)} A possible error
    299    * @private
    300    */
    301   haveLength() {
    302     if (this._payloadLength && this._opcode < 0x08) {
    303       this._totalPayloadLength += this._payloadLength;
    304       if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
    305         this._loop = false;
    306         return error(RangeError, 'Max payload size exceeded', false, 1009);
    307       }
    308     }
    309 
    310     if (this._masked) this._state = GET_MASK;
    311     else this._state = GET_DATA;
    312   }
    313 
    314   /**
    315    * Reads mask bytes.
    316    *
    317    * @private
    318    */
    319   getMask() {
    320     if (this._bufferedBytes < 4) {
    321       this._loop = false;
    322       return;
    323     }
    324 
    325     this._mask = this.consume(4);
    326     this._state = GET_DATA;
    327   }
    328 
    329   /**
    330    * Reads data bytes.
    331    *
    332    * @param {Function} cb Callback
    333    * @return {(Error|RangeError|undefined)} A possible error
    334    * @private
    335    */
    336   getData(cb) {
    337     let data = EMPTY_BUFFER;
    338 
    339     if (this._payloadLength) {
    340       if (this._bufferedBytes < this._payloadLength) {
    341         this._loop = false;
    342         return;
    343       }
    344 
    345       data = this.consume(this._payloadLength);
    346       if (this._masked) unmask(data, this._mask);
    347     }
    348 
    349     if (this._opcode > 0x07) return this.controlMessage(data);
    350 
    351     if (this._compressed) {
    352       this._state = INFLATING;
    353       this.decompress(data, cb);
    354       return;
    355     }
    356 
    357     if (data.length) {
    358       //
    359       // This message is not compressed so its lenght is the sum of the payload
    360       // length of all fragments.
    361       //
    362       this._messageLength = this._totalPayloadLength;
    363       this._fragments.push(data);
    364     }
    365 
    366     return this.dataMessage();
    367   }
    368 
    369   /**
    370    * Decompresses data.
    371    *
    372    * @param {Buffer} data Compressed data
    373    * @param {Function} cb Callback
    374    * @private
    375    */
    376   decompress(data, cb) {
    377     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    378 
    379     perMessageDeflate.decompress(data, this._fin, (err, buf) => {
    380       if (err) return cb(err);
    381 
    382       if (buf.length) {
    383         this._messageLength += buf.length;
    384         if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
    385           return cb(
    386             error(RangeError, 'Max payload size exceeded', false, 1009)
    387           );
    388         }
    389 
    390         this._fragments.push(buf);
    391       }
    392 
    393       const er = this.dataMessage();
    394       if (er) return cb(er);
    395 
    396       this.startLoop(cb);
    397     });
    398   }
    399 
    400   /**
    401    * Handles a data message.
    402    *
    403    * @return {(Error|undefined)} A possible error
    404    * @private
    405    */
    406   dataMessage() {
    407     if (this._fin) {
    408       const messageLength = this._messageLength;
    409       const fragments = this._fragments;
    410 
    411       this._totalPayloadLength = 0;
    412       this._messageLength = 0;
    413       this._fragmented = 0;
    414       this._fragments = [];
    415 
    416       if (this._opcode === 2) {
    417         let data;
    418 
    419         if (this._binaryType === 'nodebuffer') {
    420           data = concat(fragments, messageLength);
    421         } else if (this._binaryType === 'arraybuffer') {
    422           data = toArrayBuffer(concat(fragments, messageLength));
    423         } else {
    424           data = fragments;
    425         }
    426 
    427         this.emit('message', data);
    428       } else {
    429         const buf = concat(fragments, messageLength);
    430 
    431         if (!isValidUTF8(buf)) {
    432           this._loop = false;
    433           return error(Error, 'invalid UTF-8 sequence', true, 1007);
    434         }
    435 
    436         this.emit('message', buf.toString());
    437       }
    438     }
    439 
    440     this._state = GET_INFO;
    441   }
    442 
    443   /**
    444    * Handles a control message.
    445    *
    446    * @param {Buffer} data Data to handle
    447    * @return {(Error|RangeError|undefined)} A possible error
    448    * @private
    449    */
    450   controlMessage(data) {
    451     if (this._opcode === 0x08) {
    452       this._loop = false;
    453 
    454       if (data.length === 0) {
    455         this.emit('conclude', 1005, '');
    456         this.end();
    457       } else if (data.length === 1) {
    458         return error(RangeError, 'invalid payload length 1', true, 1002);
    459       } else {
    460         const code = data.readUInt16BE(0);
    461 
    462         if (!isValidStatusCode(code)) {
    463           return error(RangeError, `invalid status code ${code}`, true, 1002);
    464         }
    465 
    466         const buf = data.slice(2);
    467 
    468         if (!isValidUTF8(buf)) {
    469           return error(Error, 'invalid UTF-8 sequence', true, 1007);
    470         }
    471 
    472         this.emit('conclude', code, buf.toString());
    473         this.end();
    474       }
    475     } else if (this._opcode === 0x09) {
    476       this.emit('ping', data);
    477     } else {
    478       this.emit('pong', data);
    479     }
    480 
    481     this._state = GET_INFO;
    482   }
    483 }
    484 
    485 module.exports = Receiver;
    486 
    487 /**
    488  * Builds an error object.
    489  *
    490  * @param {(Error|RangeError)} ErrorCtor The error constructor
    491  * @param {String} message The error message
    492  * @param {Boolean} prefix Specifies whether or not to add a default prefix to
    493  *     `message`
    494  * @param {Number} statusCode The status code
    495  * @return {(Error|RangeError)} The error
    496  * @private
    497  */
    498 function error(ErrorCtor, message, prefix, statusCode) {
    499   const err = new ErrorCtor(
    500     prefix ? `Invalid WebSocket frame: ${message}` : message
    501   );
    502 
    503   Error.captureStackTrace(err, error);
    504   err[kStatusCode] = statusCode;
    505   return err;
    506 }