twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

receiver.js (12664B)


      1 'use strict';
      2 
      3 const stream = require('stream');
      4 
      5 const PerMessageDeflate = require('./permessage-deflate');
      6 const bufferUtil = require('./buffer-util');
      7 const validation = require('./validation');
      8 const constants = require('./constants');
      9 
//
// States of the frame parser. `Receiver#startLoop()` dispatches on
// `this._state` and keeps calling the matching reader until one of them clears
// `this._loop`.
//
const GET_INFO = 0; // Waiting for the first two bytes of a frame
const GET_PAYLOAD_LENGTH_16 = 1; // Waiting for the 2-byte extended length
const GET_PAYLOAD_LENGTH_64 = 2; // Waiting for the 8-byte extended length
const GET_MASK = 3; // Waiting for the 4 mask bytes
const GET_DATA = 4; // Waiting for the payload bytes
const INFLATING = 5; // A compressed payload is being decompressed
     16 
     17 /**
     18  * HyBi Receiver implementation.
     19  *
     20  * @extends stream.Writable
     21  */
     22 class Receiver extends stream.Writable {
     23   /**
     24    * Creates a Receiver instance.
     25    *
     26    * @param {String} binaryType The type for binary data
     27    * @param {Object} extensions An object containing the negotiated extensions
     28    * @param {Number} maxPayload The maximum allowed message length
     29    */
     30   constructor(binaryType, extensions, maxPayload) {
     31     super();
     32 
     33     this._binaryType = binaryType || constants.BINARY_TYPES[0];
     34     this[constants.kWebSocket] = undefined;
     35     this._extensions = extensions || {};
     36     this._maxPayload = maxPayload | 0;
     37 
     38     this._bufferedBytes = 0;
     39     this._buffers = [];
     40 
     41     this._compressed = false;
     42     this._payloadLength = 0;
     43     this._mask = undefined;
     44     this._fragmented = 0;
     45     this._masked = false;
     46     this._fin = false;
     47     this._opcode = 0;
     48 
     49     this._totalPayloadLength = 0;
     50     this._messageLength = 0;
     51     this._fragments = [];
     52 
     53     this._state = GET_INFO;
     54     this._loop = false;
     55   }
     56 
     57   /**
     58    * Implements `Writable.prototype._write()`.
     59    *
     60    * @param {Buffer} chunk The chunk of data to write
     61    * @param {String} encoding The character encoding of `chunk`
     62    * @param {Function} cb Callback
     63    */
     64   _write(chunk, encoding, cb) {
     65     if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
     66 
     67     this._bufferedBytes += chunk.length;
     68     this._buffers.push(chunk);
     69     this.startLoop(cb);
     70   }
     71 
     72   /**
     73    * Consumes `n` bytes from the buffered data.
     74    *
     75    * @param {Number} n The number of bytes to consume
     76    * @return {Buffer} The consumed bytes
     77    * @private
     78    */
     79   consume(n) {
     80     this._bufferedBytes -= n;
     81 
     82     if (n === this._buffers[0].length) return this._buffers.shift();
     83 
     84     if (n < this._buffers[0].length) {
     85       const buf = this._buffers[0];
     86       this._buffers[0] = buf.slice(n);
     87       return buf.slice(0, n);
     88     }
     89 
     90     const dst = Buffer.allocUnsafe(n);
     91 
     92     do {
     93       const buf = this._buffers[0];
     94 
     95       if (n >= buf.length) {
     96         this._buffers.shift().copy(dst, dst.length - n);
     97       } else {
     98         buf.copy(dst, dst.length - n, 0, n);
     99         this._buffers[0] = buf.slice(n);
    100       }
    101 
    102       n -= buf.length;
    103     } while (n > 0);
    104 
    105     return dst;
    106   }
    107 
    108   /**
    109    * Starts the parsing loop.
    110    *
    111    * @param {Function} cb Callback
    112    * @private
    113    */
    114   startLoop(cb) {
    115     var err;
    116     this._loop = true;
    117 
    118     do {
    119       switch (this._state) {
    120         case GET_INFO:
    121           err = this.getInfo();
    122           break;
    123         case GET_PAYLOAD_LENGTH_16:
    124           err = this.getPayloadLength16();
    125           break;
    126         case GET_PAYLOAD_LENGTH_64:
    127           err = this.getPayloadLength64();
    128           break;
    129         case GET_MASK:
    130           this.getMask();
    131           break;
    132         case GET_DATA:
    133           err = this.getData(cb);
    134           break;
    135         default:
    136           // `INFLATING`
    137           this._loop = false;
    138           return;
    139       }
    140     } while (this._loop);
    141 
    142     cb(err);
    143   }
    144 
    145   /**
    146    * Reads the first two bytes of a frame.
    147    *
    148    * @return {(RangeError|undefined)} A possible error
    149    * @private
    150    */
    151   getInfo() {
    152     if (this._bufferedBytes < 2) {
    153       this._loop = false;
    154       return;
    155     }
    156 
    157     const buf = this.consume(2);
    158 
    159     if ((buf[0] & 0x30) !== 0x00) {
    160       this._loop = false;
    161       return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
    162     }
    163 
    164     const compressed = (buf[0] & 0x40) === 0x40;
    165 
    166     if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
    167       this._loop = false;
    168       return error(RangeError, 'RSV1 must be clear', true, 1002);
    169     }
    170 
    171     this._fin = (buf[0] & 0x80) === 0x80;
    172     this._opcode = buf[0] & 0x0f;
    173     this._payloadLength = buf[1] & 0x7f;
    174 
    175     if (this._opcode === 0x00) {
    176       if (compressed) {
    177         this._loop = false;
    178         return error(RangeError, 'RSV1 must be clear', true, 1002);
    179       }
    180 
    181       if (!this._fragmented) {
    182         this._loop = false;
    183         return error(RangeError, 'invalid opcode 0', true, 1002);
    184       }
    185 
    186       this._opcode = this._fragmented;
    187     } else if (this._opcode === 0x01 || this._opcode === 0x02) {
    188       if (this._fragmented) {
    189         this._loop = false;
    190         return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    191       }
    192 
    193       this._compressed = compressed;
    194     } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
    195       if (!this._fin) {
    196         this._loop = false;
    197         return error(RangeError, 'FIN must be set', true, 1002);
    198       }
    199 
    200       if (compressed) {
    201         this._loop = false;
    202         return error(RangeError, 'RSV1 must be clear', true, 1002);
    203       }
    204 
    205       if (this._payloadLength > 0x7d) {
    206         this._loop = false;
    207         return error(
    208           RangeError,
    209           `invalid payload length ${this._payloadLength}`,
    210           true,
    211           1002
    212         );
    213       }
    214     } else {
    215       this._loop = false;
    216       return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    217     }
    218 
    219     if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    220     this._masked = (buf[1] & 0x80) === 0x80;
    221 
    222     if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    223     else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    224     else return this.haveLength();
    225   }
    226 
    227   /**
    228    * Gets extended payload length (7+16).
    229    *
    230    * @return {(RangeError|undefined)} A possible error
    231    * @private
    232    */
    233   getPayloadLength16() {
    234     if (this._bufferedBytes < 2) {
    235       this._loop = false;
    236       return;
    237     }
    238 
    239     this._payloadLength = this.consume(2).readUInt16BE(0);
    240     return this.haveLength();
    241   }
    242 
    243   /**
    244    * Gets extended payload length (7+64).
    245    *
    246    * @return {(RangeError|undefined)} A possible error
    247    * @private
    248    */
    249   getPayloadLength64() {
    250     if (this._bufferedBytes < 8) {
    251       this._loop = false;
    252       return;
    253     }
    254 
    255     const buf = this.consume(8);
    256     const num = buf.readUInt32BE(0);
    257 
    258     //
    259     // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    260     // if payload length is greater than this number.
    261     //
    262     if (num > Math.pow(2, 53 - 32) - 1) {
    263       this._loop = false;
    264       return error(
    265         RangeError,
    266         'Unsupported WebSocket frame: payload length > 2^53 - 1',
    267         false,
    268         1009
    269       );
    270     }
    271 
    272     this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    273     return this.haveLength();
    274   }
    275 
    276   /**
    277    * Payload length has been read.
    278    *
    279    * @return {(RangeError|undefined)} A possible error
    280    * @private
    281    */
    282   haveLength() {
    283     if (this._payloadLength && this._opcode < 0x08) {
    284       this._totalPayloadLength += this._payloadLength;
    285       if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
    286         this._loop = false;
    287         return error(RangeError, 'Max payload size exceeded', false, 1009);
    288       }
    289     }
    290 
    291     if (this._masked) this._state = GET_MASK;
    292     else this._state = GET_DATA;
    293   }
    294 
    295   /**
    296    * Reads mask bytes.
    297    *
    298    * @private
    299    */
    300   getMask() {
    301     if (this._bufferedBytes < 4) {
    302       this._loop = false;
    303       return;
    304     }
    305 
    306     this._mask = this.consume(4);
    307     this._state = GET_DATA;
    308   }
    309 
    310   /**
    311    * Reads data bytes.
    312    *
    313    * @param {Function} cb Callback
    314    * @return {(Error|RangeError|undefined)} A possible error
    315    * @private
    316    */
    317   getData(cb) {
    318     var data = constants.EMPTY_BUFFER;
    319 
    320     if (this._payloadLength) {
    321       if (this._bufferedBytes < this._payloadLength) {
    322         this._loop = false;
    323         return;
    324       }
    325 
    326       data = this.consume(this._payloadLength);
    327       if (this._masked) bufferUtil.unmask(data, this._mask);
    328     }
    329 
    330     if (this._opcode > 0x07) return this.controlMessage(data);
    331 
    332     if (this._compressed) {
    333       this._state = INFLATING;
    334       this.decompress(data, cb);
    335       return;
    336     }
    337 
    338     if (data.length) {
    339       //
    340       // This message is not compressed so its lenght is the sum of the payload
    341       // length of all fragments.
    342       //
    343       this._messageLength = this._totalPayloadLength;
    344       this._fragments.push(data);
    345     }
    346 
    347     return this.dataMessage();
    348   }
    349 
    350   /**
    351    * Decompresses data.
    352    *
    353    * @param {Buffer} data Compressed data
    354    * @param {Function} cb Callback
    355    * @private
    356    */
    357   decompress(data, cb) {
    358     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    359 
    360     perMessageDeflate.decompress(data, this._fin, (err, buf) => {
    361       if (err) return cb(err);
    362 
    363       if (buf.length) {
    364         this._messageLength += buf.length;
    365         if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
    366           return cb(
    367             error(RangeError, 'Max payload size exceeded', false, 1009)
    368           );
    369         }
    370 
    371         this._fragments.push(buf);
    372       }
    373 
    374       const er = this.dataMessage();
    375       if (er) return cb(er);
    376 
    377       this.startLoop(cb);
    378     });
    379   }
    380 
    381   /**
    382    * Handles a data message.
    383    *
    384    * @return {(Error|undefined)} A possible error
    385    * @private
    386    */
    387   dataMessage() {
    388     if (this._fin) {
    389       const messageLength = this._messageLength;
    390       const fragments = this._fragments;
    391 
    392       this._totalPayloadLength = 0;
    393       this._messageLength = 0;
    394       this._fragmented = 0;
    395       this._fragments = [];
    396 
    397       if (this._opcode === 2) {
    398         var data;
    399 
    400         if (this._binaryType === 'nodebuffer') {
    401           data = toBuffer(fragments, messageLength);
    402         } else if (this._binaryType === 'arraybuffer') {
    403           data = toArrayBuffer(toBuffer(fragments, messageLength));
    404         } else {
    405           data = fragments;
    406         }
    407 
    408         this.emit('message', data);
    409       } else {
    410         const buf = toBuffer(fragments, messageLength);
    411 
    412         if (!validation.isValidUTF8(buf)) {
    413           this._loop = false;
    414           return error(Error, 'invalid UTF-8 sequence', true, 1007);
    415         }
    416 
    417         this.emit('message', buf.toString());
    418       }
    419     }
    420 
    421     this._state = GET_INFO;
    422   }
    423 
    424   /**
    425    * Handles a control message.
    426    *
    427    * @param {Buffer} data Data to handle
    428    * @return {(Error|RangeError|undefined)} A possible error
    429    * @private
    430    */
    431   controlMessage(data) {
    432     if (this._opcode === 0x08) {
    433       this._loop = false;
    434 
    435       if (data.length === 0) {
    436         this.emit('conclude', 1005, '');
    437         this.end();
    438       } else if (data.length === 1) {
    439         return error(RangeError, 'invalid payload length 1', true, 1002);
    440       } else {
    441         const code = data.readUInt16BE(0);
    442 
    443         if (!validation.isValidStatusCode(code)) {
    444           return error(RangeError, `invalid status code ${code}`, true, 1002);
    445         }
    446 
    447         const buf = data.slice(2);
    448 
    449         if (!validation.isValidUTF8(buf)) {
    450           return error(Error, 'invalid UTF-8 sequence', true, 1007);
    451         }
    452 
    453         this.emit('conclude', code, buf.toString());
    454         this.end();
    455       }
    456     } else if (this._opcode === 0x09) {
    457       this.emit('ping', data);
    458     } else {
    459       this.emit('pong', data);
    460     }
    461 
    462     this._state = GET_INFO;
    463   }
    464 }
    465 
    466 module.exports = Receiver;
    467 
    468 /**
    469  * Builds an error object.
    470  *
    471  * @param {(Error|RangeError)} ErrorCtor The error constructor
    472  * @param {String} message The error message
    473  * @param {Boolean} prefix Specifies whether or not to add a default prefix to
    474  *     `message`
    475  * @param {Number} statusCode The status code
    476  * @return {(Error|RangeError)} The error
    477  * @private
    478  */
    479 function error(ErrorCtor, message, prefix, statusCode) {
    480   const err = new ErrorCtor(
    481     prefix ? `Invalid WebSocket frame: ${message}` : message
    482   );
    483 
    484   Error.captureStackTrace(err, error);
    485   err[constants.kStatusCode] = statusCode;
    486   return err;
    487 }
    488 
    489 /**
    490  * Makes a buffer from a list of fragments.
    491  *
    492  * @param {Buffer[]} fragments The list of fragments composing the message
    493  * @param {Number} messageLength The length of the message
    494  * @return {Buffer}
    495  * @private
    496  */
    497 function toBuffer(fragments, messageLength) {
    498   if (fragments.length === 1) return fragments[0];
    499   if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
    500   return constants.EMPTY_BUFFER;
    501 }
    502 
    503 /**
    504  * Converts a buffer to an `ArrayBuffer`.
    505  *
    506  * @param {Buffer} buf The buffer to convert
    507  * @return {ArrayBuffer} Converted buffer
    508  */
    509 function toArrayBuffer(buf) {
    510   if (buf.byteLength === buf.buffer.byteLength) {
    511     return buf.buffer;
    512   }
    513 
    514   return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
    515 }