sender.js (10715B)
1 'use strict'; 2 3 const crypto = require('crypto'); 4 5 const PerMessageDeflate = require('./permessage-deflate'); 6 const bufferUtil = require('./buffer-util'); 7 const validation = require('./validation'); 8 const constants = require('./constants'); 9 10 /** 11 * HyBi Sender implementation. 12 */ 13 class Sender { 14 /** 15 * Creates a Sender instance. 16 * 17 * @param {net.Socket} socket The connection socket 18 * @param {Object} extensions An object containing the negotiated extensions 19 */ 20 constructor(socket, extensions) { 21 this._extensions = extensions || {}; 22 this._socket = socket; 23 24 this._firstFragment = true; 25 this._compress = false; 26 27 this._bufferedBytes = 0; 28 this._deflating = false; 29 this._queue = []; 30 } 31 32 /** 33 * Frames a piece of data according to the HyBi WebSocket protocol. 34 * 35 * @param {Buffer} data The data to frame 36 * @param {Object} options Options object 37 * @param {Number} options.opcode The opcode 38 * @param {Boolean} options.readOnly Specifies whether `data` can be modified 39 * @param {Boolean} options.fin Specifies whether or not to set the FIN bit 40 * @param {Boolean} options.mask Specifies whether or not to mask `data` 41 * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit 42 * @return {Buffer[]} The framed data as a list of `Buffer` instances 43 * @public 44 */ 45 static frame(data, options) { 46 const merge = data.length < 1024 || (options.mask && options.readOnly); 47 var offset = options.mask ? 6 : 2; 48 var payloadLength = data.length; 49 50 if (data.length >= 65536) { 51 offset += 8; 52 payloadLength = 127; 53 } else if (data.length > 125) { 54 offset += 2; 55 payloadLength = 126; 56 } 57 58 const target = Buffer.allocUnsafe(merge ? data.length + offset : offset); 59 60 target[0] = options.fin ? options.opcode | 0x80 : options.opcode; 61 if (options.rsv1) target[0] |= 0x40; 62 63 if (payloadLength === 126) { 64 target.writeUInt16BE(data.length, 2); 65 } else if (payloadLength === 127) { 66 target.writeUInt32BE(0, 2); 67 target.writeUInt32BE(data.length, 6); 68 } 69 70 if (!options.mask) { 71 target[1] = payloadLength; 72 if (merge) { 73 data.copy(target, offset); 74 return [target]; 75 } 76 77 return [target, data]; 78 } 79 80 const mask = crypto.randomBytes(4); 81 82 target[1] = payloadLength | 0x80; 83 target[offset - 4] = mask[0]; 84 target[offset - 3] = mask[1]; 85 target[offset - 2] = mask[2]; 86 target[offset - 1] = mask[3]; 87 88 if (merge) { 89 bufferUtil.mask(data, mask, target, offset, data.length); 90 return [target]; 91 } 92 93 bufferUtil.mask(data, mask, data, 0, data.length); 94 return [target, data]; 95 } 96 97 /** 98 * Sends a close message to the other peer. 99 * 100 * @param {(Number|undefined)} code The status code component of the body 101 * @param {String} data The message component of the body 102 * @param {Boolean} mask Specifies whether or not to mask the message 103 * @param {Function} cb Callback 104 * @public 105 */ 106 close(code, data, mask, cb) { 107 var buf; 108 109 if (code === undefined) { 110 buf = constants.EMPTY_BUFFER; 111 } else if ( 112 typeof code !== 'number' || 113 !validation.isValidStatusCode(code) 114 ) { 115 throw new TypeError('First argument must be a valid error code number'); 116 } else if (data === undefined || data === '') { 117 buf = Buffer.allocUnsafe(2); 118 buf.writeUInt16BE(code, 0); 119 } else { 120 buf = Buffer.allocUnsafe(2 + Buffer.byteLength(data)); 121 buf.writeUInt16BE(code, 0); 122 buf.write(data, 2); 123 } 124 125 if (this._deflating) { 126 this.enqueue([this.doClose, buf, mask, cb]); 127 } else { 128 this.doClose(buf, mask, cb); 129 } 130 } 131 132 /** 133 * Frames and sends a close message. 134 * 135 * @param {Buffer} data The message to send 136 * @param {Boolean} mask Specifies whether or not to mask `data` 137 * @param {Function} cb Callback 138 * @private 139 */ 140 doClose(data, mask, cb) { 141 this.sendFrame( 142 Sender.frame(data, { 143 fin: true, 144 rsv1: false, 145 opcode: 0x08, 146 mask, 147 readOnly: false 148 }), 149 cb 150 ); 151 } 152 153 /** 154 * Sends a ping message to the other peer. 155 * 156 * @param {*} data The message to send 157 * @param {Boolean} mask Specifies whether or not to mask `data` 158 * @param {Function} cb Callback 159 * @public 160 */ 161 ping(data, mask, cb) { 162 var readOnly = true; 163 164 if (!Buffer.isBuffer(data)) { 165 if (data instanceof ArrayBuffer) { 166 data = Buffer.from(data); 167 } else if (ArrayBuffer.isView(data)) { 168 data = viewToBuffer(data); 169 } else { 170 data = Buffer.from(data); 171 readOnly = false; 172 } 173 } 174 175 if (this._deflating) { 176 this.enqueue([this.doPing, data, mask, readOnly, cb]); 177 } else { 178 this.doPing(data, mask, readOnly, cb); 179 } 180 } 181 182 /** 183 * Frames and sends a ping message. 184 * 185 * @param {*} data The message to send 186 * @param {Boolean} mask Specifies whether or not to mask `data` 187 * @param {Boolean} readOnly Specifies whether `data` can be modified 188 * @param {Function} cb Callback 189 * @private 190 */ 191 doPing(data, mask, readOnly, cb) { 192 this.sendFrame( 193 Sender.frame(data, { 194 fin: true, 195 rsv1: false, 196 opcode: 0x09, 197 mask, 198 readOnly 199 }), 200 cb 201 ); 202 } 203 204 /** 205 * Sends a pong message to the other peer. 206 * 207 * @param {*} data The message to send 208 * @param {Boolean} mask Specifies whether or not to mask `data` 209 * @param {Function} cb Callback 210 * @public 211 */ 212 pong(data, mask, cb) { 213 var readOnly = true; 214 215 if (!Buffer.isBuffer(data)) { 216 if (data instanceof ArrayBuffer) { 217 data = Buffer.from(data); 218 } else if (ArrayBuffer.isView(data)) { 219 data = viewToBuffer(data); 220 } else { 221 data = Buffer.from(data); 222 readOnly = false; 223 } 224 } 225 226 if (this._deflating) { 227 this.enqueue([this.doPong, data, mask, readOnly, cb]); 228 } else { 229 this.doPong(data, mask, readOnly, cb); 230 } 231 } 232 233 /** 234 * Frames and sends a pong message. 235 * 236 * @param {*} data The message to send 237 * @param {Boolean} mask Specifies whether or not to mask `data` 238 * @param {Boolean} readOnly Specifies whether `data` can be modified 239 * @param {Function} cb Callback 240 * @private 241 */ 242 doPong(data, mask, readOnly, cb) { 243 this.sendFrame( 244 Sender.frame(data, { 245 fin: true, 246 rsv1: false, 247 opcode: 0x0a, 248 mask, 249 readOnly 250 }), 251 cb 252 ); 253 } 254 255 /** 256 * Sends a data message to the other peer. 257 * 258 * @param {*} data The message to send 259 * @param {Object} options Options object 260 * @param {Boolean} options.compress Specifies whether or not to compress `data` 261 * @param {Boolean} options.binary Specifies whether `data` is binary or text 262 * @param {Boolean} options.fin Specifies whether the fragment is the last one 263 * @param {Boolean} options.mask Specifies whether or not to mask `data` 264 * @param {Function} cb Callback 265 * @public 266 */ 267 send(data, options, cb) { 268 var opcode = options.binary ? 2 : 1; 269 var rsv1 = options.compress; 270 var readOnly = true; 271 272 if (!Buffer.isBuffer(data)) { 273 if (data instanceof ArrayBuffer) { 274 data = Buffer.from(data); 275 } else if (ArrayBuffer.isView(data)) { 276 data = viewToBuffer(data); 277 } else { 278 data = Buffer.from(data); 279 readOnly = false; 280 } 281 } 282 283 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 284 285 if (this._firstFragment) { 286 this._firstFragment = false; 287 if (rsv1 && perMessageDeflate) { 288 rsv1 = data.length >= perMessageDeflate._threshold; 289 } 290 this._compress = rsv1; 291 } else { 292 rsv1 = false; 293 opcode = 0; 294 } 295 296 if (options.fin) this._firstFragment = true; 297 298 if (perMessageDeflate) { 299 const opts = { 300 fin: options.fin, 301 rsv1, 302 opcode, 303 mask: options.mask, 304 readOnly 305 }; 306 307 if (this._deflating) { 308 this.enqueue([this.dispatch, data, this._compress, opts, cb]); 309 } else { 310 this.dispatch(data, this._compress, opts, cb); 311 } 312 } else { 313 this.sendFrame( 314 Sender.frame(data, { 315 fin: options.fin, 316 rsv1: false, 317 opcode, 318 mask: options.mask, 319 readOnly 320 }), 321 cb 322 ); 323 } 324 } 325 326 /** 327 * Dispatches a data message. 328 * 329 * @param {Buffer} data The message to send 330 * @param {Boolean} compress Specifies whether or not to compress `data` 331 * @param {Object} options Options object 332 * @param {Number} options.opcode The opcode 333 * @param {Boolean} options.readOnly Specifies whether `data` can be modified 334 * @param {Boolean} options.fin Specifies whether or not to set the FIN bit 335 * @param {Boolean} options.mask Specifies whether or not to mask `data` 336 * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit 337 * @param {Function} cb Callback 338 * @private 339 */ 340 dispatch(data, compress, options, cb) { 341 if (!compress) { 342 this.sendFrame(Sender.frame(data, options), cb); 343 return; 344 } 345 346 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 347 348 this._deflating = true; 349 perMessageDeflate.compress(data, options.fin, (_, buf) => { 350 this._deflating = false; 351 options.readOnly = false; 352 this.sendFrame(Sender.frame(buf, options), cb); 353 this.dequeue(); 354 }); 355 } 356 357 /** 358 * Executes queued send operations. 359 * 360 * @private 361 */ 362 dequeue() { 363 while (!this._deflating && this._queue.length) { 364 const params = this._queue.shift(); 365 366 this._bufferedBytes -= params[1].length; 367 params[0].apply(this, params.slice(1)); 368 } 369 } 370 371 /** 372 * Enqueues a send operation. 373 * 374 * @param {Array} params Send operation parameters. 375 * @private 376 */ 377 enqueue(params) { 378 this._bufferedBytes += params[1].length; 379 this._queue.push(params); 380 } 381 382 /** 383 * Sends a frame. 384 * 385 * @param {Buffer[]} list The frame to send 386 * @param {Function} cb Callback 387 * @private 388 */ 389 sendFrame(list, cb) { 390 if (list.length === 2) { 391 this._socket.write(list[0]); 392 this._socket.write(list[1], cb); 393 } else { 394 this._socket.write(list[0], cb); 395 } 396 } 397 } 398 399 module.exports = Sender; 400 401 /** 402 * Converts an `ArrayBuffer` view into a buffer. 403 * 404 * @param {(DataView|TypedArray)} view The view to convert 405 * @return {Buffer} Converted view 406 * @private 407 */ 408 function viewToBuffer(view) { 409 const buf = Buffer.from(view.buffer); 410 411 if (view.byteLength !== view.buffer.byteLength) { 412 return buf.slice(view.byteOffset, view.byteOffset + view.byteLength); 413 } 414 415 return buf; 416 }