websocket.js (24338B)
'use strict';

const EventEmitter = require('events');
const https = require('https');
const http = require('http');
const net = require('net');
const tls = require('tls');
const { randomBytes, createHash } = require('crypto');
const { URL } = require('url');

const PerMessageDeflate = require('./permessage-deflate');
const Receiver = require('./receiver');
const Sender = require('./sender');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  GUID,
  kStatusCode,
  kWebSocket,
  NOOP
} = require('./constants');
const { addEventListener, removeEventListener } = require('./event-target');
const { format, parse } = require('./extension');
const { toBuffer } = require('./buffer-util');

//
// Names of the ready states. The index of each name is the numeric value that
// is exposed as `WebSocket.<NAME>` (see the `forEach` below the class).
//
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];

//
// Accepted values for the `protocolVersion` option; the second entry is the
// default used by `initAsClient()`.
//
const protocolVersions = [8, 13];

//
// Time in milliseconds that `close()` waits for the closing handshake to
// complete before the socket is destroyed.
//
const closeTimeout = 30 * 1000;

/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * When `address` is `null` the instance is flagged as a server-side socket
   * and no connection is started; `setSocket()` has to be called to attach
   * the network socket.
   *
   * @param {(String|url.URL)} address The URL to which to connect
   * @param {(String|String[])} protocols The subprotocols
   * @param {Object} options Connection options
   */
  constructor(address, protocols, options) {
    super();

    this.readyState = WebSocket.CONNECTING;
    this.protocol = '';

    this._binaryType = BINARY_TYPES[0];
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = '';
    this._closeTimer = null;
    this._closeCode = 1006;
    this._extensions = {};
    this._receiver = null;
    this._sender = null;
    this._socket = null;

    if (address !== null) {
      this._bufferedAmount = 0;
      this._isServer = false;
      this._redirects = 0;

      if (Array.isArray(protocols)) {
        protocols = protocols.join(', ');
      } else if (typeof protocols === 'object' && protocols !== null) {
        //
        // Called as `new WebSocket(address, options)`.
        //
        options = protocols;
        protocols = undefined;
      }

      initAsClient(this, address, protocols, options);
    } else {
      this._isServer = true;
    }
  }

  get CONNECTING() {
    return WebSocket.CONNECTING;
  }
  get CLOSING() {
    return WebSocket.CLOSING;
  }
  get CLOSED() {
    return WebSocket.CLOSED;
  }
  get OPEN() {
    return WebSocket.OPEN;
  }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the
   * required default "blob" type (instead we define a custom "nodebuffer"
   * type).
   *
   * @type {String}
   */
  get binaryType() {
    return this._binaryType;
  }

  set binaryType(type) {
    // Values that are not listed in `BINARY_TYPES` are silently ignored.
    if (!BINARY_TYPES.includes(type)) return;

    this._binaryType = type;

    //
    // Allow to change `binaryType` on the fly.
    //
    if (this._receiver) this._receiver._binaryType = type;
  }

  /**
   * The number of bytes that have been queued but not yet written out.
   *
   * @type {Number}
   */
  get bufferedAmount() {
    if (!this._socket) return this._bufferedAmount;

    //
    // `socket.bufferSize` is `undefined` if the socket is closed.
    //
    return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
  }

  /**
   * The names of the negotiated extensions, joined by commas.
   *
   * @type {String}
   */
  get extensions() {
    return Object.keys(this._extensions).join();
  }

  /**
   * Set up the socket and the internal resources.
   *
   * @param {net.Socket} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Number} maxPayload The maximum allowed message size
   * @private
   */
  setSocket(socket, head, maxPayload) {
    const receiver = new Receiver(
      this._binaryType,
      this._extensions,
      this._isServer,
      maxPayload
    );

    this._sender = new Sender(socket, this._extensions);
    this._receiver = receiver;
    this._socket = socket;

    //
    // The listeners below are shared functions; they find their way back to
    // this instance through the `kWebSocket` property.
    //
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;

    receiver.on('conclude', receiverOnConclude);
    receiver.on('drain', receiverOnDrain);
    receiver.on('error', receiverOnError);
    receiver.on('message', receiverOnMessage);
    receiver.on('ping', receiverOnPing);
    receiver.on('pong', receiverOnPong);

    socket.setTimeout(0);
    socket.setNoDelay();

    // Put `head` back in front of the stream so that it is read first.
    if (head.length > 0) socket.unshift(head);

    socket.on('close', socketOnClose);
    socket.on('data', socketOnData);
    socket.on('end', socketOnEnd);
    socket.on('error', socketOnError);

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Emit the `'close'` event.
   *
   * @private
   */
  emitClose() {
    if (!this._socket) {
      //
      // `setSocket()` was never called, so there is nothing to clean up.
      //
      this.readyState = WebSocket.CLOSED;
      this.emit('close', this._closeCode, this._closeMessage);
      return;
    }

    if (this._extensions[PerMessageDeflate.extensionName]) {
      this._extensions[PerMessageDeflate.extensionName].cleanup();
    }

    this._receiver.removeAllListeners();
    this.readyState = WebSocket.CLOSED;
    this.emit('close', this._closeCode, this._closeMessage);
  }

  /**
   * Start a closing handshake.
   *
   *          +----------+   +-----------+   +----------+
   *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
   *    |     +----------+   +-----------+   +----------+     |
   *          +----------+   +-----------+         |
   * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
   *          +----------+   +-----------+   |
   *    |           |                        |   +---+        |
   *                +------------------------+-->|fin| - - - -
   *    |         +---+                      |   +---+
   *     - - - - -|fin|<---------------------+
   *              +---+
   *
   * @param {Number} code Status code explaining why the connection is closing
   * @param {String} data A string explaining why the connection is closing
   * @public
   */
  close(code, data) {
    if (this.readyState === WebSocket.CLOSED) return;
    if (this.readyState === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    if (this.readyState === WebSocket.CLOSING) {
      if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
      return;
    }

    this.readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      if (err) return;

      this._closeFrameSent = true;
      if (this._closeFrameReceived) this._socket.end();
    });

    //
    // Specify a timeout for the closing handshake to complete.
    //
    this._closeTimer = setTimeout(
      this._socket.destroy.bind(this._socket),
      closeTimeout
    );
  }

  /**
   * Send a ping.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the ping is sent
   * @throws {Error} If the connection is still being established
   * @public
   */
  ping(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    // Support the `ping(cb)` and `ping(data, cb)` call forms.
    if (typeof data === 'function') {
      cb = data;
      data = mask = undefined;
    } else if (typeof mask === 'function') {
      cb = mask;
      mask = undefined;
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    if (mask === undefined) mask = !this._isServer;
    this._sender.ping(data || EMPTY_BUFFER, mask, cb);
  }

  /**
   * Send a pong.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the pong is sent
   * @throws {Error} If the connection is still being established
   * @public
   */
  pong(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    // Support the `pong(cb)` and `pong(data, cb)` call forms.
    if (typeof data === 'function') {
      cb = data;
      data = mask = undefined;
    } else if (typeof mask === 'function') {
      cb = mask;
      mask = undefined;
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    if (mask === undefined) mask = !this._isServer;
    this._sender.pong(data || EMPTY_BUFFER, mask, cb);
  }

  /**
   * Send a data message.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} options.compress Specifies whether or not to compress
   *     `data`
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback which is executed when data is written out
   * @throws {Error} If the connection is still being established
   * @public
   */
  send(data, options, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    // Support the `send(data, cb)` call form.
    if (typeof options === 'function') {
      cb = options;
      options = {};
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    const opts = {
      binary: typeof data !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true,
      ...options
    };

    // Compression is only possible if permessage-deflate was negotiated.
    if (!this._extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }

    this._sender.send(data || EMPTY_BUFFER, opts, cb);
  }

  /**
   * Forcibly close the connection.
   *
   * @public
   */
  terminate() {
    if (this.readyState === WebSocket.CLOSED) return;
    if (this.readyState === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    if (this._socket) {
      this.readyState = WebSocket.CLOSING;
      this._socket.destroy();
    }
  }
}

//
// Expose the ready states as static numeric constants
// (`WebSocket.CONNECTING === 0`, ..., `WebSocket.CLOSED === 3`).
//
readyStates.forEach((readyState, i) => {
  WebSocket[readyState] = i;
});

//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
['open', 'error', 'close', 'message'].forEach((method) => {
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
    /**
     * Return the listener of the event.
     *
     * @return {(Function|undefined)} The event listener or `undefined`
     * @public
     */
    get() {
      const listeners = this.listeners(method);
      for (let i = 0; i < listeners.length; i++) {
        if (listeners[i]._listener) return listeners[i]._listener;
      }

      return undefined;
    },
    /**
     * Add a listener for the event.
     *
     * @param {Function} listener The listener to add
     * @public
     */
    set(listener) {
      const listeners = this.listeners(method);
      for (let i = 0; i < listeners.length; i++) {
        //
        // Remove only the listeners added via `addEventListener`.
        //
        if (listeners[i]._listener) this.removeListener(method, listeners[i]);
      }
      this.addEventListener(method, listener);
    }
  });
});

WebSocket.prototype.addEventListener = addEventListener;
WebSocket.prototype.removeEventListener = removeEventListener;

module.exports = WebSocket;

/**
 * Initialize a WebSocket client.
 *
 * @param {WebSocket} websocket The client to initialize
 * @param {(String|url.URL)} address The URL to which to connect
 * @param {String} protocols The subprotocols
 * @param {Object} options Connection options
 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
 *     permessage-deflate
 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the
 *     handshake request
 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version`
 *     header
 * @param {String} options.origin Value of the `Origin` or
 *     `Sec-WebSocket-Origin` header
 * @param {Number} options.maxPayload The maximum allowed message size
 * @param {Boolean} options.followRedirects Whether or not to follow redirects
 * @param {Number} options.maxRedirects The maximum number of redirects allowed
 * @throws {RangeError} If `options.protocolVersion` is not supported
 * @throws {Error} If `address` does not identify a host (or a socket path for
 *     `ws+unix:` URLs)
 * @private
 */
function initAsClient(websocket, address, protocols, options) {
  const opts = {
    protocolVersion: protocolVersions[1],
    maxPayload: 100 * 1024 * 1024,
    perMessageDeflate: true,
    followRedirects: false,
    maxRedirects: 10,
    ...options,
    //
    // The request options below are computed from `address` further down, so
    // any value supplied by the user is discarded here.
    //
    createConnection: undefined,
    socketPath: undefined,
    hostname: undefined,
    protocol: undefined,
    timeout: undefined,
    method: undefined,
    auth: undefined,
    host: undefined,
    path: undefined,
    port: undefined
  };

  if (!protocolVersions.includes(opts.protocolVersion)) {
    throw new RangeError(
      `Unsupported protocol version: ${opts.protocolVersion} ` +
        `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  let parsedUrl;

  if (address instanceof URL) {
    parsedUrl = address;
    websocket.url = address.href;
  } else {
    parsedUrl = new URL(address);
    websocket.url = address;
  }

  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';

  if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
    throw new Error(`Invalid URL: ${websocket.url}`);
  }

  const isSecure =
    parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  const defaultPort = isSecure ? 443 : 80;
  const key = randomBytes(16).toString('base64');
  const get = isSecure ? https.get : http.get;
  let perMessageDeflate;

  opts.createConnection = isSecure ? tlsConnect : netConnect;
  opts.defaultPort = opts.defaultPort || defaultPort;
  opts.port = parsedUrl.port || defaultPort;
  // Strip the square brackets that surround an IPv6 literal.
  opts.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  opts.headers = {
    'Sec-WebSocket-Version': opts.protocolVersion,
    'Sec-WebSocket-Key': key,
    Connection: 'Upgrade',
    Upgrade: 'websocket',
    ...opts.headers
  };
  opts.path = parsedUrl.pathname + parsedUrl.search;
  opts.timeout = opts.handshakeTimeout;

  if (opts.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
      false,
      opts.maxPayload
    );
    opts.headers['Sec-WebSocket-Extensions'] = format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }
  if (protocols) {
    opts.headers['Sec-WebSocket-Protocol'] = protocols;
  }
  if (opts.origin) {
    if (opts.protocolVersion < 13) {
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
    } else {
      opts.headers.Origin = opts.origin;
    }
  }
  if (parsedUrl.username || parsedUrl.password) {
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }

  if (isUnixSocket) {
    //
    // For `ws+unix:` URLs the path has the form `<socket path>:<request path>`.
    //
    const parts = opts.path.split(':');

    opts.socketPath = parts[0];
    opts.path = parts[1];
  }

  let req = (websocket._req = get(opts));

  if (opts.timeout) {
    req.on('timeout', () => {
      abortHandshake(websocket, req, 'Opening handshake has timed out');
    });
  }

  req.on('error', (err) => {
    //
    // Check the request this listener belongs to rather than
    // `websocket._req`: the latter is `null` once the handshake has completed
    // or failed, and refers to a different request after a redirect.
    //
    if (req === null || req.aborted) return;

    req = websocket._req = null;
    websocket.readyState = WebSocket.CLOSING;
    websocket.emit('error', err);
    websocket.emitClose();
  });

  req.on('response', (res) => {
    const location = res.headers.location;
    const statusCode = res.statusCode;

    if (
      location &&
      opts.followRedirects &&
      statusCode >= 300 &&
      statusCode < 400
    ) {
      if (++websocket._redirects > opts.maxRedirects) {
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
        return;
      }

      //
      // The `Location` header is controlled by the server. Parse it before
      // aborting the request so that a malformed value is reported through
      // the `'error'` and `'close'` events instead of being thrown from this
      // listener.
      //
      let addr;

      try {
        addr = new URL(location, address);
      } catch (err) {
        abortHandshake(websocket, req, `Invalid redirect URL: ${location}`);
        return;
      }

      req.abort();

      initAsClient(websocket, addr, protocols, options);
    } else if (!websocket.emit('unexpected-response', req, res)) {
      abortHandshake(
        websocket,
        req,
        `Unexpected server response: ${res.statusCode}`
      );
    }
  });

  req.on('upgrade', (res, socket, head) => {
    websocket.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the `upgrade`
    // event.
    //
    if (websocket.readyState !== WebSocket.CONNECTING) return;

    req = websocket._req = null;

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    const protList = (protocols || '').split(/, */);
    let protError;

    if (!protocols && serverProt) {
      protError = 'Server sent a subprotocol but none was requested';
    } else if (protocols && !serverProt) {
      protError = 'Server sent no subprotocol';
    } else if (serverProt && !protList.includes(serverProt)) {
      protError = 'Server sent an invalid subprotocol';
    }

    if (protError) {
      abortHandshake(websocket, socket, protError);
      return;
    }

    if (serverProt) websocket.protocol = serverProt;

    if (perMessageDeflate) {
      try {
        const extensions = parse(res.headers['sec-websocket-extensions']);

        if (extensions[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
          websocket._extensions[
            PerMessageDeflate.extensionName
          ] = perMessageDeflate;
        }
      } catch (err) {
        abortHandshake(
          websocket,
          socket,
          'Invalid Sec-WebSocket-Extensions header'
        );
        return;
      }
    }

    websocket.setSocket(socket, head, opts.maxPayload);
  });
}

/**
 * Create a `net.Socket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect(options) {
  //
  // `options.path` holds the HTTP request path at this point; replace it with
  // the socket path (`undefined` unless a `ws+unix:` URL was used) before the
  // options are handed to `net.connect()`.
  //
  options.path = options.socketPath;
  return net.connect(options);
}

/**
 * Create a `tls.TLSSocket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect(options) {
  // Do not pass the HTTP request path on to `tls.connect()`.
  options.path = undefined;

  //
  // Default `servername` to the host unless the user set it explicitly; an
  // empty string is kept as is.
  //
  if (!options.servername && options.servername !== '') {
    options.servername = options.host;
  }

  return tls.connect(options);
}

/**
 * Abort the handshake and emit an error.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
 *     socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake(websocket, stream, message) {
  websocket.readyState = WebSocket.CLOSING;

  const err = new Error(message);
  Error.captureStackTrace(err, abortHandshake);

  if (stream.setHeader) {
    //
    // `stream` has a `setHeader()` method, so it is the request and not the
    // socket.
    //
    stream.abort();
    stream.once('abort', websocket.emitClose.bind(websocket));
    websocket.emit('error', err);
  } else {
    stream.destroy(err);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
  }
}

/**
 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
 * when the `readyState` attribute is `CLOSING` or `CLOSED`.
 *
 * The size of `data` is added to the buffered amount and, if `cb` is
 * provided, it is called with an error describing the current state.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {*} data The data to send
 * @param {Function} cb Callback
 * @private
 */
function sendAfterClose(websocket, data, cb) {
  if (data) {
    const length = toBuffer(data).length;

    //
    // The `_bufferedAmount` property is used only when the peer is a client and
    // the opening handshake fails. Under these circumstances, in fact, the
    // `setSocket()` method is not called, so the `_socket` and `_sender`
    // properties are set to `null`.
    //
    if (websocket._socket) websocket._sender._bufferedBytes += length;
    else websocket._bufferedAmount += length;
  }

  if (cb) {
    const err = new Error(
      `WebSocket is not open: readyState ${websocket.readyState} ` +
        `(${readyStates[websocket.readyState]})`
    );
    cb(err);
  }
}

/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * @param {Number} code The status code
 * @param {String} reason The reason for closing
 * @private
 */
function receiverOnConclude(code, reason) {
  const websocket = this[kWebSocket];

  //
  // Stop feeding the receiver but keep the socket flowing.
  //
  websocket._socket.removeListener('data', socketOnData);
  websocket._socket.resume();

  websocket._closeFrameReceived = true;
  websocket._closeMessage = reason;
  websocket._closeCode = code;

  // The 1005 code is not passed on to `close()`.
  if (code === 1005) websocket.close();
  else websocket.close(code, reason);
}

/**
 * The listener of the `Receiver` `'drain'` event.
 *
 * @private
 */
function receiverOnDrain() {
  this[kWebSocket]._socket.resume();
}

/**
 * The listener of the `Receiver` `'error'` event.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError(err) {
  const websocket = this[kWebSocket];

  websocket._socket.removeListener('data', socketOnData);

  websocket.readyState = WebSocket.CLOSING;
  websocket._closeCode = err[kStatusCode];
  websocket.emit('error', err);
  websocket._socket.destroy();
}

/**
 * The listener of the `Receiver` `'finish'` event.
 *
 * @private
 */
function receiverOnFinish() {
  this[kWebSocket].emitClose();
}

/**
 * The listener of the `Receiver` `'message'` event.
 *
 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
 * @private
 */
function receiverOnMessage(data) {
  this[kWebSocket].emit('message', data);
}

/**
 * The listener of the `Receiver` `'ping'` event.
 *
 * @param {Buffer} data The data included in the ping frame
 * @private
 */
function receiverOnPing(data) {
  const websocket = this[kWebSocket];

  // Reply with a pong that carries the same data before notifying the user.
  websocket.pong(data, !websocket._isServer, NOOP);
  websocket.emit('ping', data);
}

/**
 * The listener of the `Receiver` `'pong'` event.
 *
 * @param {Buffer} data The data included in the pong frame
 * @private
 */
function receiverOnPong(data) {
  this[kWebSocket].emit('pong', data);
}

/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * @private
 */
function socketOnClose() {
  const websocket = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('end', socketOnEnd);

  websocket.readyState = WebSocket.CLOSING;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk and emitted synchronously in a single
  // `'data'` event.
  //
  websocket._socket.read();
  websocket._receiver.end();

  this.removeListener('data', socketOnData);
  this[kWebSocket] = undefined;

  clearTimeout(websocket._closeTimer);

  if (
    websocket._receiver._writableState.finished ||
    websocket._receiver._writableState.errorEmitted
  ) {
    websocket.emitClose();
  } else {
    //
    // Defer the `'close'` event until the receiver has finished or failed.
    //
    websocket._receiver.on('error', receiverOnFinish);
    websocket._receiver.on('finish', receiverOnFinish);
  }
}

/**
 * The listener of the `net.Socket` `'data'` event.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function socketOnData(chunk) {
  //
  // Pause the socket when the receiver signals backpressure; it is resumed by
  // `receiverOnDrain()`.
  //
  if (!this[kWebSocket]._receiver.write(chunk)) {
    this.pause();
  }
}

/**
 * The listener of the `net.Socket` `'end'` event.
 *
 * @private
 */
function socketOnEnd() {
  const websocket = this[kWebSocket];

  websocket.readyState = WebSocket.CLOSING;
  websocket._receiver.end();
  this.end();
}

/**
 * The listener of the `net.Socket` `'error'` event.
 *
 * @private
 */
function socketOnError() {
  const websocket = this[kWebSocket];

  //
  // Replace this listener with a no-op so that later errors on the socket are
  // ignored.
  //
  this.removeListener('error', socketOnError);
  this.on('error', NOOP);

  //
  // `kWebSocket` is cleared by `socketOnClose()`, in which case there is
  // nothing left to do.
  //
  if (websocket) {
    websocket.readyState = WebSocket.CLOSING;
    this.destroy();
  }
}