sender.js (10290B)
1 'use strict'; 2 3 const { randomFillSync } = require('crypto'); 4 5 const PerMessageDeflate = require('./permessage-deflate'); 6 const { EMPTY_BUFFER } = require('./constants'); 7 const { isValidStatusCode } = require('./validation'); 8 const { mask: applyMask, toBuffer } = require('./buffer-util'); 9 10 const mask = Buffer.alloc(4); 11 12 /** 13 * HyBi Sender implementation. 14 */ 15 class Sender { 16 /** 17 * Creates a Sender instance. 18 * 19 * @param {net.Socket} socket The connection socket 20 * @param {Object} extensions An object containing the negotiated extensions 21 */ 22 constructor(socket, extensions) { 23 this._extensions = extensions || {}; 24 this._socket = socket; 25 26 this._firstFragment = true; 27 this._compress = false; 28 29 this._bufferedBytes = 0; 30 this._deflating = false; 31 this._queue = []; 32 } 33 34 /** 35 * Frames a piece of data according to the HyBi WebSocket protocol. 36 * 37 * @param {Buffer} data The data to frame 38 * @param {Object} options Options object 39 * @param {Number} options.opcode The opcode 40 * @param {Boolean} options.readOnly Specifies whether `data` can be modified 41 * @param {Boolean} options.fin Specifies whether or not to set the FIN bit 42 * @param {Boolean} options.mask Specifies whether or not to mask `data` 43 * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit 44 * @return {Buffer[]} The framed data as a list of `Buffer` instances 45 * @public 46 */ 47 static frame(data, options) { 48 const merge = options.mask && options.readOnly; 49 let offset = options.mask ? 6 : 2; 50 let payloadLength = data.length; 51 52 if (data.length >= 65536) { 53 offset += 8; 54 payloadLength = 127; 55 } else if (data.length > 125) { 56 offset += 2; 57 payloadLength = 126; 58 } 59 60 const target = Buffer.allocUnsafe(merge ? data.length + offset : offset); 61 62 target[0] = options.fin ? options.opcode | 0x80 : options.opcode; 63 if (options.rsv1) target[0] |= 0x40; 64 65 target[1] = payloadLength; 66 67 if (payloadLength === 126) { 68 target.writeUInt16BE(data.length, 2); 69 } else if (payloadLength === 127) { 70 target.writeUInt32BE(0, 2); 71 target.writeUInt32BE(data.length, 6); 72 } 73 74 if (!options.mask) return [target, data]; 75 76 randomFillSync(mask, 0, 4); 77 78 target[1] |= 0x80; 79 target[offset - 4] = mask[0]; 80 target[offset - 3] = mask[1]; 81 target[offset - 2] = mask[2]; 82 target[offset - 1] = mask[3]; 83 84 if (merge) { 85 applyMask(data, mask, target, offset, data.length); 86 return [target]; 87 } 88 89 applyMask(data, mask, data, 0, data.length); 90 return [target, data]; 91 } 92 93 /** 94 * Sends a close message to the other peer. 95 * 96 * @param {(Number|undefined)} code The status code component of the body 97 * @param {String} data The message component of the body 98 * @param {Boolean} mask Specifies whether or not to mask the message 99 * @param {Function} cb Callback 100 * @public 101 */ 102 close(code, data, mask, cb) { 103 let buf; 104 105 if (code === undefined) { 106 buf = EMPTY_BUFFER; 107 } else if (typeof code !== 'number' || !isValidStatusCode(code)) { 108 throw new TypeError('First argument must be a valid error code number'); 109 } else if (data === undefined || data === '') { 110 buf = Buffer.allocUnsafe(2); 111 buf.writeUInt16BE(code, 0); 112 } else { 113 const length = Buffer.byteLength(data); 114 115 if (length > 123) { 116 throw new RangeError('The message must not be greater than 123 bytes'); 117 } 118 119 buf = Buffer.allocUnsafe(2 + length); 120 buf.writeUInt16BE(code, 0); 121 buf.write(data, 2); 122 } 123 124 if (this._deflating) { 125 this.enqueue([this.doClose, buf, mask, cb]); 126 } else { 127 this.doClose(buf, mask, cb); 128 } 129 } 130 131 /** 132 * Frames and sends a close message. 133 * 134 * @param {Buffer} data The message to send 135 * @param {Boolean} mask Specifies whether or not to mask `data` 136 * @param {Function} cb Callback 137 * @private 138 */ 139 doClose(data, mask, cb) { 140 this.sendFrame( 141 Sender.frame(data, { 142 fin: true, 143 rsv1: false, 144 opcode: 0x08, 145 mask, 146 readOnly: false 147 }), 148 cb 149 ); 150 } 151 152 /** 153 * Sends a ping message to the other peer. 154 * 155 * @param {*} data The message to send 156 * @param {Boolean} mask Specifies whether or not to mask `data` 157 * @param {Function} cb Callback 158 * @public 159 */ 160 ping(data, mask, cb) { 161 const buf = toBuffer(data); 162 163 if (buf.length > 125) { 164 throw new RangeError('The data size must not be greater than 125 bytes'); 165 } 166 167 if (this._deflating) { 168 this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]); 169 } else { 170 this.doPing(buf, mask, toBuffer.readOnly, cb); 171 } 172 } 173 174 /** 175 * Frames and sends a ping message. 176 * 177 * @param {*} data The message to send 178 * @param {Boolean} mask Specifies whether or not to mask `data` 179 * @param {Boolean} readOnly Specifies whether `data` can be modified 180 * @param {Function} cb Callback 181 * @private 182 */ 183 doPing(data, mask, readOnly, cb) { 184 this.sendFrame( 185 Sender.frame(data, { 186 fin: true, 187 rsv1: false, 188 opcode: 0x09, 189 mask, 190 readOnly 191 }), 192 cb 193 ); 194 } 195 196 /** 197 * Sends a pong message to the other peer. 198 * 199 * @param {*} data The message to send 200 * @param {Boolean} mask Specifies whether or not to mask `data` 201 * @param {Function} cb Callback 202 * @public 203 */ 204 pong(data, mask, cb) { 205 const buf = toBuffer(data); 206 207 if (buf.length > 125) { 208 throw new RangeError('The data size must not be greater than 125 bytes'); 209 } 210 211 if (this._deflating) { 212 this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]); 213 } else { 214 this.doPong(buf, mask, toBuffer.readOnly, cb); 215 } 216 } 217 218 /** 219 * Frames and sends a pong message. 220 * 221 * @param {*} data The message to send 222 * @param {Boolean} mask Specifies whether or not to mask `data` 223 * @param {Boolean} readOnly Specifies whether `data` can be modified 224 * @param {Function} cb Callback 225 * @private 226 */ 227 doPong(data, mask, readOnly, cb) { 228 this.sendFrame( 229 Sender.frame(data, { 230 fin: true, 231 rsv1: false, 232 opcode: 0x0a, 233 mask, 234 readOnly 235 }), 236 cb 237 ); 238 } 239 240 /** 241 * Sends a data message to the other peer. 242 * 243 * @param {*} data The message to send 244 * @param {Object} options Options object 245 * @param {Boolean} options.compress Specifies whether or not to compress `data` 246 * @param {Boolean} options.binary Specifies whether `data` is binary or text 247 * @param {Boolean} options.fin Specifies whether the fragment is the last one 248 * @param {Boolean} options.mask Specifies whether or not to mask `data` 249 * @param {Function} cb Callback 250 * @public 251 */ 252 send(data, options, cb) { 253 const buf = toBuffer(data); 254 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 255 let opcode = options.binary ? 2 : 1; 256 let rsv1 = options.compress; 257 258 if (this._firstFragment) { 259 this._firstFragment = false; 260 if (rsv1 && perMessageDeflate) { 261 rsv1 = buf.length >= perMessageDeflate._threshold; 262 } 263 this._compress = rsv1; 264 } else { 265 rsv1 = false; 266 opcode = 0; 267 } 268 269 if (options.fin) this._firstFragment = true; 270 271 if (perMessageDeflate) { 272 const opts = { 273 fin: options.fin, 274 rsv1, 275 opcode, 276 mask: options.mask, 277 readOnly: toBuffer.readOnly 278 }; 279 280 if (this._deflating) { 281 this.enqueue([this.dispatch, buf, this._compress, opts, cb]); 282 } else { 283 this.dispatch(buf, this._compress, opts, cb); 284 } 285 } else { 286 this.sendFrame( 287 Sender.frame(buf, { 288 fin: options.fin, 289 rsv1: false, 290 opcode, 291 mask: options.mask, 292 readOnly: toBuffer.readOnly 293 }), 294 cb 295 ); 296 } 297 } 298 299 /** 300 * Dispatches a data message. 301 * 302 * @param {Buffer} data The message to send 303 * @param {Boolean} compress Specifies whether or not to compress `data` 304 * @param {Object} options Options object 305 * @param {Number} options.opcode The opcode 306 * @param {Boolean} options.readOnly Specifies whether `data` can be modified 307 * @param {Boolean} options.fin Specifies whether or not to set the FIN bit 308 * @param {Boolean} options.mask Specifies whether or not to mask `data` 309 * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit 310 * @param {Function} cb Callback 311 * @private 312 */ 313 dispatch(data, compress, options, cb) { 314 if (!compress) { 315 this.sendFrame(Sender.frame(data, options), cb); 316 return; 317 } 318 319 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 320 321 this._deflating = true; 322 perMessageDeflate.compress(data, options.fin, (_, buf) => { 323 if (this._socket.destroyed) { 324 const err = new Error( 325 'The socket was closed while data was being compressed' 326 ); 327 328 if (typeof cb === 'function') cb(err); 329 330 for (let i = 0; i < this._queue.length; i++) { 331 const callback = this._queue[i][4]; 332 333 if (typeof callback === 'function') callback(err); 334 } 335 336 return; 337 } 338 339 this._deflating = false; 340 options.readOnly = false; 341 this.sendFrame(Sender.frame(buf, options), cb); 342 this.dequeue(); 343 }); 344 } 345 346 /** 347 * Executes queued send operations. 348 * 349 * @private 350 */ 351 dequeue() { 352 while (!this._deflating && this._queue.length) { 353 const params = this._queue.shift(); 354 355 this._bufferedBytes -= params[1].length; 356 Reflect.apply(params[0], this, params.slice(1)); 357 } 358 } 359 360 /** 361 * Enqueues a send operation. 362 * 363 * @param {Array} params Send operation parameters. 364 * @private 365 */ 366 enqueue(params) { 367 this._bufferedBytes += params[1].length; 368 this._queue.push(params); 369 } 370 371 /** 372 * Sends a frame. 373 * 374 * @param {Buffer[]} list The frame to send 375 * @param {Function} cb Callback 376 * @private 377 */ 378 sendFrame(list, cb) { 379 if (list.length === 2) { 380 this._socket.cork(); 381 this._socket.write(list[0]); 382 this._socket.write(list[1], cb); 383 this._socket.uncork(); 384 } else { 385 this._socket.write(list[0], cb); 386 } 387 } 388 } 389 390 module.exports = Sender;