receiver.js (12390B)
'use strict';

const { Writable } = require('stream');

const PerMessageDeflate = require('./permessage-deflate');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  kStatusCode,
  kWebSocket
} = require('./constants');
const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

//
// Parser states. `startLoop()` dispatches on `this._state`; every value other
// than the first five is treated as `INFLATING`.
//
const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
const GET_PAYLOAD_LENGTH_64 = 2;
const GET_MASK = 3;
const GET_DATA = 4;
const INFLATING = 5;

/**
 * HyBi Receiver implementation.
 *
 * Incoming chunks are queued and parsed frame by frame by a small state
 * machine. Complete messages are reported through the `'message'`, `'ping'`,
 * `'pong'` and `'conclude'` events; protocol violations are passed to the
 * `_write()` callback as errors carrying a `kStatusCode` property.
 *
 * @extends stream.Writable
 */
class Receiver extends Writable {
  /**
   * Creates a Receiver instance.
   *
   * @param {String} binaryType The type for binary data
   * @param {Object} extensions An object containing the negotiated extensions
   * @param {Boolean} isServer Specifies whether to operate in client or server
   *     mode
   * @param {Number} maxPayload The maximum allowed message length. Values
   *     that are not greater than zero disable the check
   */
  constructor(binaryType, extensions, isServer, maxPayload) {
    super();

    this._binaryType = binaryType || BINARY_TYPES[0];
    this[kWebSocket] = undefined;
    this._extensions = extensions || {};
    this._isServer = !!isServer;

    //
    // Truncate to an integer without going through `ToInt32`. `maxPayload | 0`
    // wraps values at or above 2^31 (2^32 becomes 0, 2^32 + 1 becomes 1) and
    // would silently disable or shrink the limit. Non-numeric input still
    // collapses to 0.
    //
    this._maxPayload = Math.trunc(Number(maxPayload)) || 0;

    // Queue of received chunks that have not been consumed yet.
    this._bufferedBytes = 0;
    this._buffers = [];

    // State of the frame currently being parsed.
    this._compressed = false;
    this._payloadLength = 0;
    this._mask = undefined;
    this._fragmented = 0; // Opcode of the fragmented message in progress.
    this._masked = false;
    this._fin = false;
    this._opcode = 0;

    // State of the message currently being assembled.
    this._totalPayloadLength = 0;
    this._messageLength = 0;
    this._fragments = [];

    this._state = GET_INFO;
    this._loop = false;
  }

  /**
   * Implements `Writable.prototype._write()`.
   *
   * @param {Buffer} chunk The chunk of data to write
   * @param {String} encoding The character encoding of `chunk`
   * @param {Function} cb Callback
   */
  _write(chunk, encoding, cb) {
    //
    // The last frame handled was a close frame and the parser is idle: drop
    // whatever follows it.
    //
    if (this._opcode === 0x08 && this._state === GET_INFO) return cb();

    this._bufferedBytes += chunk.length;
    this._buffers.push(chunk);
    this.startLoop(cb);
  }

  /**
   * Consumes `n` bytes from the buffered data. The callers in this class
   * check `this._bufferedBytes` before calling, so at least `n` bytes are
   * queued.
   *
   * @param {Number} n The number of bytes to consume
   * @return {Buffer} The consumed bytes
   * @private
   */
  consume(n) {
    this._bufferedBytes -= n;

    // The first chunk is exactly what is needed: hand it over as is.
    if (n === this._buffers[0].length) return this._buffers.shift();

    // The first chunk holds more than needed: split it.
    if (n < this._buffers[0].length) {
      const buf = this._buffers[0];
      this._buffers[0] = buf.slice(n);
      return buf.slice(0, n);
    }

    // The requested bytes span several chunks: copy them into a new buffer.
    const dst = Buffer.allocUnsafe(n);

    do {
      const buf = this._buffers[0];
      const offset = dst.length - n;

      if (n >= buf.length) {
        dst.set(this._buffers.shift(), offset);
      } else {
        dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
        this._buffers[0] = buf.slice(n);
      }

      n -= buf.length;
    } while (n > 0);

    return dst;
  }

  /**
   * Starts the parsing loop. The loop runs until a step clears `this._loop`
   * (not enough data, an error, or a close frame) or until the parser enters
   * the `INFLATING` state.
   *
   * @param {Function} cb Callback
   * @private
   */
  startLoop(cb) {
    let err;
    this._loop = true;

    do {
      switch (this._state) {
        case GET_INFO:
          err = this.getInfo();
          break;
        case GET_PAYLOAD_LENGTH_16:
          err = this.getPayloadLength16();
          break;
        case GET_PAYLOAD_LENGTH_64:
          err = this.getPayloadLength64();
          break;
        case GET_MASK:
          this.getMask();
          break;
        case GET_DATA:
          err = this.getData(cb);
          break;
        default:
          //
          // `INFLATING`: `cb` is deliberately not called here. The callback
          // passed to `decompress()` owns it and either reports an error or
          // restarts the loop.
          //
          this._loop = false;
          return;
      }
    } while (this._loop);

    cb(err);
  }

  /**
   * Reads the first two bytes of a frame: FIN, RSV1-3 and the opcode from the
   * first byte, the MASK bit and the 7-bit payload length from the second.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getInfo() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    const buf = this.consume(2);

    if ((buf[0] & 0x30) !== 0x00) {
      this._loop = false;
      return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
    }

    // RSV1 is only accepted when permessage-deflate has been negotiated.
    const compressed = (buf[0] & 0x40) === 0x40;

    if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
      this._loop = false;
      return error(RangeError, 'RSV1 must be clear', true, 1002);
    }

    this._fin = (buf[0] & 0x80) === 0x80;
    this._opcode = buf[0] & 0x0f;
    this._payloadLength = buf[1] & 0x7f;

    if (this._opcode === 0x00) {
      // Continuation frame.
      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (!this._fragmented) {
        this._loop = false;
        return error(RangeError, 'invalid opcode 0', true, 1002);
      }

      // Treat the frame as part of the message that was started earlier.
      this._opcode = this._fragmented;
    } else if (this._opcode === 0x01 || this._opcode === 0x02) {
      // Text or binary frame: not allowed while a fragmented message is open.
      if (this._fragmented) {
        this._loop = false;
        return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
      }

      this._compressed = compressed;
    } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
      // Control frame (close, ping, pong).
      if (!this._fin) {
        this._loop = false;
        return error(RangeError, 'FIN must be set', true, 1002);
      }

      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (this._payloadLength > 0x7d) {
        this._loop = false;
        return error(
          RangeError,
          `invalid payload length ${this._payloadLength}`,
          true,
          1002
        );
      }
    } else {
      this._loop = false;
      return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    }

    // First frame of a fragmented message: remember its opcode.
    if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    this._masked = (buf[1] & 0x80) === 0x80;

    // In server mode frames must be masked, in client mode they must not be.
    if (this._isServer) {
      if (!this._masked) {
        this._loop = false;
        return error(RangeError, 'MASK must be set', true, 1002);
      }
    } else if (this._masked) {
      this._loop = false;
      return error(RangeError, 'MASK must be clear', true, 1002);
    }

    if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    else return this.haveLength();
  }

  /**
   * Gets extended payload length (7+16).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength16() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    this._payloadLength = this.consume(2).readUInt16BE(0);
    return this.haveLength();
  }

  /**
   * Gets extended payload length (7+64).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength64() {
    if (this._bufferedBytes < 8) {
      this._loop = false;
      return;
    }

    const buf = this.consume(8);
    const num = buf.readUInt32BE(0);

    //
    // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    // if payload length is greater than this number.
    //
    if (num > Math.pow(2, 53 - 32) - 1) {
      this._loop = false;
      return error(
        RangeError,
        'Unsupported WebSocket frame: payload length > 2^53 - 1',
        false,
        1009
      );
    }

    this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    return this.haveLength();
  }

  /**
   * Payload length has been read. Enforces the maximum payload size for data
   * frames and selects the next state.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  haveLength() {
    // Only data frames (opcode below 0x08) count towards the message size.
    if (this._payloadLength && this._opcode < 0x08) {
      this._totalPayloadLength += this._payloadLength;
      if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
        this._loop = false;
        return error(RangeError, 'Max payload size exceeded', false, 1009);
      }
    }

    if (this._masked) this._state = GET_MASK;
    else this._state = GET_DATA;
  }

  /**
   * Reads mask bytes.
   *
   * @private
   */
  getMask() {
    if (this._bufferedBytes < 4) {
      this._loop = false;
      return;
    }

    this._mask = this.consume(4);
    this._state = GET_DATA;
  }

  /**
   * Reads data bytes.
   *
   * @param {Function} cb Callback
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  getData(cb) {
    let data = EMPTY_BUFFER;

    if (this._payloadLength) {
      if (this._bufferedBytes < this._payloadLength) {
        this._loop = false;
        return;
      }

      data = this.consume(this._payloadLength);
      if (this._masked) unmask(data, this._mask);
    }

    if (this._opcode > 0x07) return this.controlMessage(data);

    if (this._compressed) {
      // `startLoop()` stops on this state; `decompress()` takes over `cb`.
      this._state = INFLATING;
      this.decompress(data, cb);
      return;
    }

    if (data.length) {
      //
      // This message is not compressed so its length is the sum of the payload
      // length of all fragments.
      //
      this._messageLength = this._totalPayloadLength;
      this._fragments.push(data);
    }

    return this.dataMessage();
  }

  /**
   * Decompresses data. When decompression succeeds the parsing loop is
   * restarted with the same callback; otherwise the callback receives the
   * error.
   *
   * @param {Buffer} data Compressed data
   * @param {Function} cb Callback
   * @private
   */
  decompress(data, cb) {
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    perMessageDeflate.decompress(data, this._fin, (err, buf) => {
      if (err) return cb(err);

      if (buf.length) {
        // For compressed messages the limit applies to the inflated size.
        this._messageLength += buf.length;
        if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
          return cb(
            error(RangeError, 'Max payload size exceeded', false, 1009)
          );
        }

        this._fragments.push(buf);
      }

      const er = this.dataMessage();
      if (er) return cb(er);

      this.startLoop(cb);
    });
  }

  /**
   * Handles a data message. When the current frame is final, the collected
   * fragments are assembled and a `'message'` event is emitted.
   *
   * @return {(Error|undefined)} A possible error
   * @private
   */
  dataMessage() {
    if (this._fin) {
      const messageLength = this._messageLength;
      const fragments = this._fragments;

      // Reset the per-message state before emitting.
      this._totalPayloadLength = 0;
      this._messageLength = 0;
      this._fragmented = 0;
      this._fragments = [];

      if (this._opcode === 2) {
        let data;

        if (this._binaryType === 'nodebuffer') {
          data = concat(fragments, messageLength);
        } else if (this._binaryType === 'arraybuffer') {
          data = toArrayBuffer(concat(fragments, messageLength));
        } else {
          // Any other binary type: hand over the list of fragments as is.
          data = fragments;
        }

        this.emit('message', data);
      } else {
        const buf = concat(fragments, messageLength);

        if (!isValidUTF8(buf)) {
          this._loop = false;
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('message', buf.toString());
      }
    }

    this._state = GET_INFO;
  }

  /**
   * Handles a control message.
   *
   * @param {Buffer} data Data to handle
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  controlMessage(data) {
    if (this._opcode === 0x08) {
      // Close frame: parsing stops here whatever the outcome.
      this._loop = false;

      if (data.length === 0) {
        // No status code was sent.
        this.emit('conclude', 1005, '');
        this.end();
      } else if (data.length === 1) {
        return error(RangeError, 'invalid payload length 1', true, 1002);
      } else {
        // Two-byte status code followed by an optional UTF-8 reason.
        const code = data.readUInt16BE(0);

        if (!isValidStatusCode(code)) {
          return error(RangeError, `invalid status code ${code}`, true, 1002);
        }

        const buf = data.slice(2);

        if (!isValidUTF8(buf)) {
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('conclude', code, buf.toString());
        this.end();
      }
    } else if (this._opcode === 0x09) {
      this.emit('ping', data);
    } else {
      this.emit('pong', data);
    }

    this._state = GET_INFO;
  }
}

module.exports = Receiver;

/**
 * Builds an error object.
 *
 * @param {(Error|RangeError)} ErrorCtor The error constructor
 * @param {String} message The error message
 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
 *     `message`
 * @param {Number} statusCode The status code, stored on the error under the
 *     `kStatusCode` key
 * @return {(Error|RangeError)} The error
 * @private
 */
function error(ErrorCtor, message, prefix, statusCode) {
  const err = new ErrorCtor(
    prefix ? `Invalid WebSocket frame: ${message}` : message
  );

  // Leave this helper out of the reported stack trace.
  Error.captureStackTrace(err, error);
  err[kStatusCode] = statusCode;
  return err;
}