receiver.js (12664B)
'use strict';

const stream = require('stream');

const PerMessageDeflate = require('./permessage-deflate');
const bufferUtil = require('./buffer-util');
const validation = require('./validation');
const constants = require('./constants');

//
// Parser states. `startLoop()` dispatches on `this._state`; every state other
// than `INFLATING` maps to one `get*()` method below.
//
const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
const GET_PAYLOAD_LENGTH_64 = 2;
const GET_MASK = 3;
const GET_DATA = 4;
const INFLATING = 5;

/**
 * HyBi Receiver implementation.
 *
 * Incoming chunks are buffered and parsed by a small state machine. Parsed
 * data messages are emitted as `'message'`, control frames as `'ping'`,
 * `'pong'` and `'conclude'` (close frame).
 *
 * @extends stream.Writable
 */
class Receiver extends stream.Writable {
  /**
   * Creates a Receiver instance.
   *
   * @param {String} binaryType The type for binary data
   * @param {Object} extensions An object containing the negotiated extensions
   * @param {Number} maxPayload The maximum allowed message length
   */
  constructor(binaryType, extensions, maxPayload) {
    super();

    this._binaryType = binaryType || constants.BINARY_TYPES[0];
    this[constants.kWebSocket] = undefined;
    this._extensions = extensions || {};
    //
    // `| 0` coerces to a signed 32-bit integer, so `undefined` becomes 0 and
    // values of 2^31 or more wrap around. The limit is only enforced when the
    // resulting value is greater than 0.
    //
    this._maxPayload = maxPayload | 0;

    // Chunks received but not yet consumed, and their total size in bytes.
    this._bufferedBytes = 0;
    this._buffers = [];

    // Fields describing the frame currently being parsed.
    this._compressed = false;
    this._payloadLength = 0;
    this._mask = undefined;
    // Opcode of the fragmented message in progress, or 0 when there is none.
    this._fragmented = 0;
    this._masked = false;
    this._fin = false;
    this._opcode = 0;

    // Accumulators for the message currently being assembled.
    this._totalPayloadLength = 0;
    this._messageLength = 0;
    this._fragments = [];

    this._state = GET_INFO;
    this._loop = false;
  }

  /**
   * Implements `Writable.prototype._write()`.
   *
   * @param {Buffer} chunk The chunk of data to write
   * @param {String} encoding The character encoding of `chunk`
   * @param {Function} cb Callback
   */
  _write(chunk, encoding, cb) {
    //
    // The last handled frame was a close frame and the parser is back at the
    // start of a frame: ignore anything that follows.
    //
    if (this._opcode === 0x08 && this._state === GET_INFO) return cb();

    this._bufferedBytes += chunk.length;
    this._buffers.push(chunk);
    this.startLoop(cb);
  }

  /**
   * Consumes `n` bytes from the buffered data.
   *
   * Callers must make sure that at least `n` bytes are buffered.
   *
   * @param {Number} n The number of bytes to consume
   * @return {Buffer} The consumed bytes
   * @private
   */
  consume(n) {
    this._bufferedBytes -= n;

    // Exactly the first chunk: hand it over without copying.
    if (n === this._buffers[0].length) return this._buffers.shift();

    // Less than the first chunk: split it without copying.
    if (n < this._buffers[0].length) {
      const buf = this._buffers[0];
      this._buffers[0] = buf.slice(n);
      return buf.slice(0, n);
    }

    // The requested range spans several chunks: copy them into a new buffer.
    const dst = Buffer.allocUnsafe(n);

    do {
      const buf = this._buffers[0];

      if (n >= buf.length) {
        this._buffers.shift().copy(dst, dst.length - n);
      } else {
        buf.copy(dst, dst.length - n, 0, n);
        this._buffers[0] = buf.slice(n);
      }

      n -= buf.length;
    } while (n > 0);

    return dst;
  }

  /**
   * Starts the parsing loop.
   *
   * The loop runs until a step clears `this._loop`, which happens when more
   * data is needed, when an error is produced or when a close frame has been
   * handled. `cb` is then called with the error, if any. When the state is
   * `INFLATING` the method returns without calling `cb`; the callback passed
   * to `decompress()` is responsible for it.
   *
   * @param {Function} cb Callback
   * @private
   */
  startLoop(cb) {
    let err;
    this._loop = true;

    do {
      switch (this._state) {
        case GET_INFO:
          err = this.getInfo();
          break;
        case GET_PAYLOAD_LENGTH_16:
          err = this.getPayloadLength16();
          break;
        case GET_PAYLOAD_LENGTH_64:
          err = this.getPayloadLength64();
          break;
        case GET_MASK:
          this.getMask();
          break;
        case GET_DATA:
          err = this.getData(cb);
          break;
        default:
          // `INFLATING`
          this._loop = false;
          return;
      }
    } while (this._loop);

    cb(err);
  }

  /**
   * Reads the first two bytes of a frame.
   *
   * First byte: FIN (0x80), RSV1 (0x40), RSV2 and RSV3 (0x30), opcode (0x0f).
   * Second byte: MASK (0x80), 7-bit payload length (0x7f).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getInfo() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    const buf = this.consume(2);

    if ((buf[0] & 0x30) !== 0x00) {
      this._loop = false;
      return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
    }

    const compressed = (buf[0] & 0x40) === 0x40;

    // RSV1 is only accepted when permessage-deflate has been negotiated.
    if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
      this._loop = false;
      return error(RangeError, 'RSV1 must be clear', true, 1002);
    }

    this._fin = (buf[0] & 0x80) === 0x80;
    this._opcode = buf[0] & 0x0f;
    this._payloadLength = buf[1] & 0x7f;

    if (this._opcode === 0x00) {
      // Continuation frame.
      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (!this._fragmented) {
        this._loop = false;
        return error(RangeError, 'invalid opcode 0', true, 1002);
      }

      // Treat the frame as having the opcode of the message it continues.
      this._opcode = this._fragmented;
    } else if (this._opcode === 0x01 || this._opcode === 0x02) {
      // Text or binary frame: not allowed while a fragmented message is open.
      if (this._fragmented) {
        this._loop = false;
        return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
      }

      this._compressed = compressed;
    } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
      // Control frame (close, ping or pong).
      if (!this._fin) {
        this._loop = false;
        return error(RangeError, 'FIN must be set', true, 1002);
      }

      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (this._payloadLength > 0x7d) {
        this._loop = false;
        return error(
          RangeError,
          `invalid payload length ${this._payloadLength}`,
          true,
          1002
        );
      }
    } else {
      this._loop = false;
      return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    }

    // First frame of a fragmented message: remember its opcode.
    if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    this._masked = (buf[1] & 0x80) === 0x80;

    if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    else return this.haveLength();
  }

  /**
   * Gets extended payload length (7+16).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength16() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    this._payloadLength = this.consume(2).readUInt16BE(0);
    return this.haveLength();
  }

  /**
   * Gets extended payload length (7+64).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength64() {
    if (this._bufferedBytes < 8) {
      this._loop = false;
      return;
    }

    const buf = this.consume(8);
    const num = buf.readUInt32BE(0);

    //
    // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    // if payload length is greater than this number.
    //
    if (num > Math.pow(2, 53 - 32) - 1) {
      this._loop = false;
      return error(
        RangeError,
        'Unsupported WebSocket frame: payload length > 2^53 - 1',
        false,
        1009
      );
    }

    this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    return this.haveLength();
  }

  /**
   * Payload length has been read.
   *
   * Adds the payload length of data frames to the running total, enforces
   * `maxPayload` on it and selects the next state.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  haveLength() {
    if (this._payloadLength && this._opcode < 0x08) {
      this._totalPayloadLength += this._payloadLength;
      if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
        this._loop = false;
        return error(RangeError, 'Max payload size exceeded', false, 1009);
      }
    }

    if (this._masked) this._state = GET_MASK;
    else this._state = GET_DATA;
  }

  /**
   * Reads mask bytes.
   *
   * @private
   */
  getMask() {
    if (this._bufferedBytes < 4) {
      this._loop = false;
      return;
    }

    this._mask = this.consume(4);
    this._state = GET_DATA;
  }

  /**
   * Reads data bytes.
   *
   * @param {Function} cb Callback
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  getData(cb) {
    let data = constants.EMPTY_BUFFER;

    if (this._payloadLength) {
      if (this._bufferedBytes < this._payloadLength) {
        this._loop = false;
        return;
      }

      data = this.consume(this._payloadLength);
      if (this._masked) bufferUtil.unmask(data, this._mask);
    }

    if (this._opcode > 0x07) return this.controlMessage(data);

    if (this._compressed) {
      //
      // Switching to `INFLATING` makes `startLoop()` return without calling
      // `cb`; `decompress()` calls it or restarts the loop when done.
      //
      this._state = INFLATING;
      this.decompress(data, cb);
      return;
    }

    if (data.length) {
      //
      // This message is not compressed so its length is the sum of the payload
      // length of all fragments.
      //
      this._messageLength = this._totalPayloadLength;
      this._fragments.push(data);
    }

    return this.dataMessage();
  }

  /**
   * Decompresses data.
   *
   * On success the inflated fragment is stored, the message is completed if
   * this was the final frame and the parsing loop is restarted. On failure
   * `cb` is called with the error.
   *
   * @param {Buffer} data Compressed data
   * @param {Function} cb Callback
   * @private
   */
  decompress(data, cb) {
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    perMessageDeflate.decompress(data, this._fin, (err, buf) => {
      if (err) return cb(err);

      if (buf.length) {
        // For compressed messages the limit applies to the inflated size.
        this._messageLength += buf.length;
        if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
          return cb(
            error(RangeError, 'Max payload size exceeded', false, 1009)
          );
        }

        this._fragments.push(buf);
      }

      const er = this.dataMessage();
      if (er) return cb(er);

      this.startLoop(cb);
    });
  }

  /**
   * Handles a data message.
   *
   * When the current frame is final, the collected fragments are emitted as
   * a `'message'` event and the per-message accumulators are reset. In all
   * non-error cases the state goes back to `GET_INFO`.
   *
   * @return {(Error|undefined)} A possible error
   * @private
   */
  dataMessage() {
    if (this._fin) {
      const messageLength = this._messageLength;
      const fragments = this._fragments;

      this._totalPayloadLength = 0;
      this._messageLength = 0;
      this._fragmented = 0;
      this._fragments = [];

      if (this._opcode === 2) {
        let data;

        if (this._binaryType === 'nodebuffer') {
          data = toBuffer(fragments, messageLength);
        } else if (this._binaryType === 'arraybuffer') {
          data = toArrayBuffer(toBuffer(fragments, messageLength));
        } else {
          // Any other binary type receives the raw list of fragments.
          data = fragments;
        }

        this.emit('message', data);
      } else {
        const buf = toBuffer(fragments, messageLength);

        if (!validation.isValidUTF8(buf)) {
          this._loop = false;
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('message', buf.toString());
      }
    }

    this._state = GET_INFO;
  }

  /**
   * Handles a control message.
   *
   * Close frames (0x08) stop the parsing loop, emit `'conclude'` with the
   * status code and reason, and end the stream. Opcode 0x09 emits `'ping'`;
   * the remaining control opcode emits `'pong'`.
   *
   * @param {Buffer} data Data to handle
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  controlMessage(data) {
    if (this._opcode === 0x08) {
      this._loop = false;

      if (data.length === 0) {
        this.emit('conclude', 1005, '');
        this.end();
      } else if (data.length === 1) {
        return error(RangeError, 'invalid payload length 1', true, 1002);
      } else {
        // The first two bytes are the status code, the rest is the reason.
        const code = data.readUInt16BE(0);

        if (!validation.isValidStatusCode(code)) {
          return error(RangeError, `invalid status code ${code}`, true, 1002);
        }

        const buf = data.slice(2);

        if (!validation.isValidUTF8(buf)) {
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('conclude', code, buf.toString());
        this.end();
      }
    } else if (this._opcode === 0x09) {
      this.emit('ping', data);
    } else {
      this.emit('pong', data);
    }

    this._state = GET_INFO;
  }
}

module.exports = Receiver;

/**
 * Builds an error object.
 *
 * The status code is stored on the error under `constants.kStatusCode`.
 *
 * @param {(Error|RangeError)} ErrorCtor The error constructor
 * @param {String} message The error message
 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
 *     `message`
 * @param {Number} statusCode The status code
 * @return {(Error|RangeError)} The error
 * @private
 */
function error(ErrorCtor, message, prefix, statusCode) {
  const err = new ErrorCtor(
    prefix ? `Invalid WebSocket frame: ${message}` : message
  );

  // Keep this helper out of the reported stack trace.
  Error.captureStackTrace(err, error);
  err[constants.kStatusCode] = statusCode;
  return err;
}

/**
 * Makes a buffer from a list of fragments.
 *
 * A single fragment is returned as is, without copying.
 *
 * @param {Buffer[]} fragments The list of fragments composing the message
 * @param {Number} messageLength The length of the message
 * @return {Buffer}
 * @private
 */
function toBuffer(fragments, messageLength) {
  if (fragments.length === 1) return fragments[0];
  if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
  return constants.EMPTY_BUFFER;
}

/**
 * Converts a buffer to an `ArrayBuffer`.
 *
 * The underlying `ArrayBuffer` is returned directly when the buffer spans all
 * of it; otherwise the relevant range is copied.
 *
 * @param {Buffer} buf The buffer to convert
 * @return {ArrayBuffer} Converted buffer
 */
function toArrayBuffer(buf) {
  if (buf.byteLength === buf.buffer.byteLength) {
    return buf.buffer;
  }

  return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
}