twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

sender.js (10290B)


      1 'use strict';
      2 
      3 const { randomFillSync } = require('crypto');
      4 
      5 const PerMessageDeflate = require('./permessage-deflate');
      6 const { EMPTY_BUFFER } = require('./constants');
      7 const { isValidStatusCode } = require('./validation');
      8 const { mask: applyMask, toBuffer } = require('./buffer-util');
      9 
     10 const mask = Buffer.alloc(4); // Shared 4-byte scratch buffer for the masking key; Sender.frame() refills it with random bytes for every masked frame.
     11 
     12 /**
     13  * HyBi Sender implementation.
     14  */
     15 class Sender {
     16   /**
     17    * Creates a Sender instance.
     18    *
     19    * @param {net.Socket} socket The connection socket
     20    * @param {Object} extensions An object containing the negotiated extensions
     21    */
     22   constructor(socket, extensions) {
     23     this._extensions = extensions || {};
     24     this._socket = socket;
     25 
     26     this._firstFragment = true;
     27     this._compress = false;
     28 
     29     this._bufferedBytes = 0;
     30     this._deflating = false;
     31     this._queue = [];
     32   }
     33 
     34   /**
     35    * Frames a piece of data according to the HyBi WebSocket protocol.
     36    *
     37    * @param {Buffer} data The data to frame
     38    * @param {Object} options Options object
     39    * @param {Number} options.opcode The opcode
     40    * @param {Boolean} options.readOnly Specifies whether `data` can be modified
     41    * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
     42    * @param {Boolean} options.mask Specifies whether or not to mask `data`
     43    * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
     44    * @return {Buffer[]} The framed data as a list of `Buffer` instances
     45    * @public
     46    */
     47   static frame(data, options) {
     48     const merge = options.mask && options.readOnly;
     49     let offset = options.mask ? 6 : 2;
     50     let payloadLength = data.length;
     51 
     52     if (data.length >= 65536) {
     53       offset += 8;
     54       payloadLength = 127;
     55     } else if (data.length > 125) {
     56       offset += 2;
     57       payloadLength = 126;
     58     }
     59 
     60     const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
     61 
     62     target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
     63     if (options.rsv1) target[0] |= 0x40;
     64 
     65     target[1] = payloadLength;
     66 
     67     if (payloadLength === 126) {
     68       target.writeUInt16BE(data.length, 2);
     69     } else if (payloadLength === 127) {
     70       target.writeUInt32BE(0, 2);
     71       target.writeUInt32BE(data.length, 6);
     72     }
     73 
     74     if (!options.mask) return [target, data];
     75 
     76     randomFillSync(mask, 0, 4);
     77 
     78     target[1] |= 0x80;
     79     target[offset - 4] = mask[0];
     80     target[offset - 3] = mask[1];
     81     target[offset - 2] = mask[2];
     82     target[offset - 1] = mask[3];
     83 
     84     if (merge) {
     85       applyMask(data, mask, target, offset, data.length);
     86       return [target];
     87     }
     88 
     89     applyMask(data, mask, data, 0, data.length);
     90     return [target, data];
     91   }
     92 
     93   /**
     94    * Sends a close message to the other peer.
     95    *
     96    * @param {(Number|undefined)} code The status code component of the body
     97    * @param {String} data The message component of the body
     98    * @param {Boolean} mask Specifies whether or not to mask the message
     99    * @param {Function} cb Callback
    100    * @public
    101    */
    102   close(code, data, mask, cb) {
    103     let buf;
    104 
    105     if (code === undefined) {
    106       buf = EMPTY_BUFFER;
    107     } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
    108       throw new TypeError('First argument must be a valid error code number');
    109     } else if (data === undefined || data === '') {
    110       buf = Buffer.allocUnsafe(2);
    111       buf.writeUInt16BE(code, 0);
    112     } else {
    113       const length = Buffer.byteLength(data);
    114 
    115       if (length > 123) {
    116         throw new RangeError('The message must not be greater than 123 bytes');
    117       }
    118 
    119       buf = Buffer.allocUnsafe(2 + length);
    120       buf.writeUInt16BE(code, 0);
    121       buf.write(data, 2);
    122     }
    123 
    124     if (this._deflating) {
    125       this.enqueue([this.doClose, buf, mask, cb]);
    126     } else {
    127       this.doClose(buf, mask, cb);
    128     }
    129   }
    130 
    131   /**
    132    * Frames and sends a close message.
    133    *
    134    * @param {Buffer} data The message to send
    135    * @param {Boolean} mask Specifies whether or not to mask `data`
    136    * @param {Function} cb Callback
    137    * @private
    138    */
    139   doClose(data, mask, cb) {
    140     this.sendFrame(
    141       Sender.frame(data, {
    142         fin: true,
    143         rsv1: false,
    144         opcode: 0x08,
    145         mask,
    146         readOnly: false
    147       }),
    148       cb
    149     );
    150   }
    151 
    152   /**
    153    * Sends a ping message to the other peer.
    154    *
    155    * @param {*} data The message to send
    156    * @param {Boolean} mask Specifies whether or not to mask `data`
    157    * @param {Function} cb Callback
    158    * @public
    159    */
    160   ping(data, mask, cb) {
    161     const buf = toBuffer(data);
    162 
    163     if (buf.length > 125) {
    164       throw new RangeError('The data size must not be greater than 125 bytes');
    165     }
    166 
    167     if (this._deflating) {
    168       this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
    169     } else {
    170       this.doPing(buf, mask, toBuffer.readOnly, cb);
    171     }
    172   }
    173 
    174   /**
    175    * Frames and sends a ping message.
    176    *
    177    * @param {*} data The message to send
    178    * @param {Boolean} mask Specifies whether or not to mask `data`
    179    * @param {Boolean} readOnly Specifies whether `data` can be modified
    180    * @param {Function} cb Callback
    181    * @private
    182    */
    183   doPing(data, mask, readOnly, cb) {
    184     this.sendFrame(
    185       Sender.frame(data, {
    186         fin: true,
    187         rsv1: false,
    188         opcode: 0x09,
    189         mask,
    190         readOnly
    191       }),
    192       cb
    193     );
    194   }
    195 
    196   /**
    197    * Sends a pong message to the other peer.
    198    *
    199    * @param {*} data The message to send
    200    * @param {Boolean} mask Specifies whether or not to mask `data`
    201    * @param {Function} cb Callback
    202    * @public
    203    */
    204   pong(data, mask, cb) {
    205     const buf = toBuffer(data);
    206 
    207     if (buf.length > 125) {
    208       throw new RangeError('The data size must not be greater than 125 bytes');
    209     }
    210 
    211     if (this._deflating) {
    212       this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
    213     } else {
    214       this.doPong(buf, mask, toBuffer.readOnly, cb);
    215     }
    216   }
    217 
    218   /**
    219    * Frames and sends a pong message.
    220    *
    221    * @param {*} data The message to send
    222    * @param {Boolean} mask Specifies whether or not to mask `data`
    223    * @param {Boolean} readOnly Specifies whether `data` can be modified
    224    * @param {Function} cb Callback
    225    * @private
    226    */
    227   doPong(data, mask, readOnly, cb) {
    228     this.sendFrame(
    229       Sender.frame(data, {
    230         fin: true,
    231         rsv1: false,
    232         opcode: 0x0a,
    233         mask,
    234         readOnly
    235       }),
    236       cb
    237     );
    238   }
    239 
    240   /**
    241    * Sends a data message to the other peer.
    242    *
    243    * @param {*} data The message to send
    244    * @param {Object} options Options object
    245    * @param {Boolean} options.compress Specifies whether or not to compress `data`
    246    * @param {Boolean} options.binary Specifies whether `data` is binary or text
    247    * @param {Boolean} options.fin Specifies whether the fragment is the last one
    248    * @param {Boolean} options.mask Specifies whether or not to mask `data`
    249    * @param {Function} cb Callback
    250    * @public
    251    */
    252   send(data, options, cb) {
    253     const buf = toBuffer(data);
    254     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    255     let opcode = options.binary ? 2 : 1;
    256     let rsv1 = options.compress;
    257 
    258     if (this._firstFragment) {
    259       this._firstFragment = false;
    260       if (rsv1 && perMessageDeflate) {
    261         rsv1 = buf.length >= perMessageDeflate._threshold;
    262       }
    263       this._compress = rsv1;
    264     } else {
    265       rsv1 = false;
    266       opcode = 0;
    267     }
    268 
    269     if (options.fin) this._firstFragment = true;
    270 
    271     if (perMessageDeflate) {
    272       const opts = {
    273         fin: options.fin,
    274         rsv1,
    275         opcode,
    276         mask: options.mask,
    277         readOnly: toBuffer.readOnly
    278       };
    279 
    280       if (this._deflating) {
    281         this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
    282       } else {
    283         this.dispatch(buf, this._compress, opts, cb);
    284       }
    285     } else {
    286       this.sendFrame(
    287         Sender.frame(buf, {
    288           fin: options.fin,
    289           rsv1: false,
    290           opcode,
    291           mask: options.mask,
    292           readOnly: toBuffer.readOnly
    293         }),
    294         cb
    295       );
    296     }
    297   }
    298 
    299   /**
    300    * Dispatches a data message.
    301    *
    302    * @param {Buffer} data The message to send
    303    * @param {Boolean} compress Specifies whether or not to compress `data`
    304    * @param {Object} options Options object
    305    * @param {Number} options.opcode The opcode
    306    * @param {Boolean} options.readOnly Specifies whether `data` can be modified
    307    * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
    308    * @param {Boolean} options.mask Specifies whether or not to mask `data`
    309    * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
    310    * @param {Function} cb Callback
    311    * @private
    312    */
    313   dispatch(data, compress, options, cb) {
    314     if (!compress) {
    315       this.sendFrame(Sender.frame(data, options), cb);
    316       return;
    317     }
    318 
    319     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    320 
    321     this._deflating = true;
    322     perMessageDeflate.compress(data, options.fin, (_, buf) => {
    323       if (this._socket.destroyed) {
    324         const err = new Error(
    325           'The socket was closed while data was being compressed'
    326         );
    327 
    328         if (typeof cb === 'function') cb(err);
    329 
    330         for (let i = 0; i < this._queue.length; i++) {
    331           const callback = this._queue[i][4];
    332 
    333           if (typeof callback === 'function') callback(err);
    334         }
    335 
    336         return;
    337       }
    338 
    339       this._deflating = false;
    340       options.readOnly = false;
    341       this.sendFrame(Sender.frame(buf, options), cb);
    342       this.dequeue();
    343     });
    344   }
    345 
    346   /**
    347    * Executes queued send operations.
    348    *
    349    * @private
    350    */
    351   dequeue() {
    352     while (!this._deflating && this._queue.length) {
    353       const params = this._queue.shift();
    354 
    355       this._bufferedBytes -= params[1].length;
    356       Reflect.apply(params[0], this, params.slice(1));
    357     }
    358   }
    359 
    360   /**
    361    * Enqueues a send operation.
    362    *
    363    * @param {Array} params Send operation parameters.
    364    * @private
    365    */
    366   enqueue(params) {
    367     this._bufferedBytes += params[1].length;
    368     this._queue.push(params);
    369   }
    370 
    371   /**
    372    * Sends a frame.
    373    *
    374    * @param {Buffer[]} list The frame to send
    375    * @param {Function} cb Callback
    376    * @private
    377    */
    378   sendFrame(list, cb) {
    379     if (list.length === 2) {
    380       this._socket.cork();
    381       this._socket.write(list[0]);
    382       this._socket.write(list[1], cb);
    383       this._socket.uncork();
    384     } else {
    385       this._socket.write(list[0], cb);
    386     }
    387   }
    388 }
    389 
    390 module.exports = Sender; // CommonJS export: the module's value is the Sender class itself.