websocket.js (24317B)
1 'use strict'; 2 3 const EventEmitter = require('events'); 4 const https = require('https'); 5 const http = require('http'); 6 const net = require('net'); 7 const tls = require('tls'); 8 const { randomBytes, createHash } = require('crypto'); 9 const { URL } = require('url'); 10 11 const PerMessageDeflate = require('./permessage-deflate'); 12 const Receiver = require('./receiver'); 13 const Sender = require('./sender'); 14 const { 15 BINARY_TYPES, 16 EMPTY_BUFFER, 17 GUID, 18 kStatusCode, 19 kWebSocket, 20 NOOP 21 } = require('./constants'); 22 const { addEventListener, removeEventListener } = require('./event-target'); 23 const { format, parse } = require('./extension'); 24 const { toBuffer } = require('./buffer-util'); 25 26 const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED']; 27 const protocolVersions = [8, 13]; 28 const closeTimeout = 30 * 1000; 29 30 /** 31 * Class representing a WebSocket. 32 * 33 * @extends EventEmitter 34 */ 35 class WebSocket extends EventEmitter { 36 /** 37 * Create a new `WebSocket`. 38 * 39 * @param {(String|url.URL)} address The URL to which to connect 40 * @param {(String|String[])} protocols The subprotocols 41 * @param {Object} options Connection options 42 */ 43 constructor(address, protocols, options) { 44 super(); 45 46 this.readyState = WebSocket.CONNECTING; 47 this.protocol = ''; 48 49 this._binaryType = BINARY_TYPES[0]; 50 this._closeFrameReceived = false; 51 this._closeFrameSent = false; 52 this._closeMessage = ''; 53 this._closeTimer = null; 54 this._closeCode = 1006; 55 this._extensions = {}; 56 this._receiver = null; 57 this._sender = null; 58 this._socket = null; 59 60 if (address !== null) { 61 this._bufferedAmount = 0; 62 this._isServer = false; 63 this._redirects = 0; 64 65 if (Array.isArray(protocols)) { 66 protocols = protocols.join(', '); 67 } else if (typeof protocols === 'object' && protocols !== null) { 68 options = protocols; 69 protocols = undefined; 70 } 71 72 initAsClient(this, address, protocols, options); 73 } else { 74 this._isServer = true; 75 } 76 } 77 78 get CONNECTING() { 79 return WebSocket.CONNECTING; 80 } 81 get CLOSING() { 82 return WebSocket.CLOSING; 83 } 84 get CLOSED() { 85 return WebSocket.CLOSED; 86 } 87 get OPEN() { 88 return WebSocket.OPEN; 89 } 90 91 /** 92 * This deviates from the WHATWG interface since ws doesn't support the 93 * required default "blob" type (instead we define a custom "nodebuffer" 94 * type). 95 * 96 * @type {String} 97 */ 98 get binaryType() { 99 return this._binaryType; 100 } 101 102 set binaryType(type) { 103 if (!BINARY_TYPES.includes(type)) return; 104 105 this._binaryType = type; 106 107 // 108 // Allow to change `binaryType` on the fly. 109 // 110 if (this._receiver) this._receiver._binaryType = type; 111 } 112 113 /** 114 * @type {Number} 115 */ 116 get bufferedAmount() { 117 if (!this._socket) return this._bufferedAmount; 118 119 // 120 // `socket.bufferSize` is `undefined` if the socket is closed. 121 // 122 return (this._socket.bufferSize || 0) + this._sender._bufferedBytes; 123 } 124 125 /** 126 * @type {String} 127 */ 128 get extensions() { 129 return Object.keys(this._extensions).join(); 130 } 131 132 /** 133 * Set up the socket and the internal resources. 134 * 135 * @param {net.Socket} socket The network socket between the server and client 136 * @param {Buffer} head The first packet of the upgraded stream 137 * @param {Number} maxPayload The maximum allowed message size 138 * @private 139 */ 140 setSocket(socket, head, maxPayload) { 141 const receiver = new Receiver( 142 this._binaryType, 143 this._extensions, 144 this._isServer, 145 maxPayload 146 ); 147 148 this._sender = new Sender(socket, this._extensions); 149 this._receiver = receiver; 150 this._socket = socket; 151 152 receiver[kWebSocket] = this; 153 socket[kWebSocket] = this; 154 155 receiver.on('conclude', receiverOnConclude); 156 receiver.on('drain', receiverOnDrain); 157 receiver.on('error', receiverOnError); 158 receiver.on('message', receiverOnMessage); 159 receiver.on('ping', receiverOnPing); 160 receiver.on('pong', receiverOnPong); 161 162 socket.setTimeout(0); 163 socket.setNoDelay(); 164 165 if (head.length > 0) socket.unshift(head); 166 167 socket.on('close', socketOnClose); 168 socket.on('data', socketOnData); 169 socket.on('end', socketOnEnd); 170 socket.on('error', socketOnError); 171 172 this.readyState = WebSocket.OPEN; 173 this.emit('open'); 174 } 175 176 /** 177 * Emit the `'close'` event. 178 * 179 * @private 180 */ 181 emitClose() { 182 if (!this._socket) { 183 this.readyState = WebSocket.CLOSED; 184 this.emit('close', this._closeCode, this._closeMessage); 185 return; 186 } 187 188 if (this._extensions[PerMessageDeflate.extensionName]) { 189 this._extensions[PerMessageDeflate.extensionName].cleanup(); 190 } 191 192 this._receiver.removeAllListeners(); 193 this.readyState = WebSocket.CLOSED; 194 this.emit('close', this._closeCode, this._closeMessage); 195 } 196 197 /** 198 * Start a closing handshake. 199 * 200 * +----------+ +-----------+ +----------+ 201 * - - -|ws.close()|-->|close frame|-->|ws.close()|- - - 202 * | +----------+ +-----------+ +----------+ | 203 * +----------+ +-----------+ | 204 * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING 205 * +----------+ +-----------+ | 206 * | | | +---+ | 207 * +------------------------+-->|fin| - - - - 208 * | +---+ | +---+ 209 * - - - - -|fin|<---------------------+ 210 * +---+ 211 * 212 * @param {Number} code Status code explaining why the connection is closing 213 * @param {String} data A string explaining why the connection is closing 214 * @public 215 */ 216 close(code, data) { 217 if (this.readyState === WebSocket.CLOSED) return; 218 if (this.readyState === WebSocket.CONNECTING) { 219 const msg = 'WebSocket was closed before the connection was established'; 220 return abortHandshake(this, this._req, msg); 221 } 222 223 if (this.readyState === WebSocket.CLOSING) { 224 if (this._closeFrameSent && this._closeFrameReceived) this._socket.end(); 225 return; 226 } 227 228 this.readyState = WebSocket.CLOSING; 229 this._sender.close(code, data, !this._isServer, (err) => { 230 // 231 // This error is handled by the `'error'` listener on the socket. We only 232 // want to know if the close frame has been sent here. 233 // 234 if (err) return; 235 236 this._closeFrameSent = true; 237 if (this._closeFrameReceived) this._socket.end(); 238 }); 239 240 // 241 // Specify a timeout for the closing handshake to complete. 242 // 243 this._closeTimer = setTimeout( 244 this._socket.destroy.bind(this._socket), 245 closeTimeout 246 ); 247 } 248 249 /** 250 * Send a ping. 251 * 252 * @param {*} data The data to send 253 * @param {Boolean} mask Indicates whether or not to mask `data` 254 * @param {Function} cb Callback which is executed when the ping is sent 255 * @public 256 */ 257 ping(data, mask, cb) { 258 if (this.readyState === WebSocket.CONNECTING) { 259 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); 260 } 261 262 if (typeof data === 'function') { 263 cb = data; 264 data = mask = undefined; 265 } else if (typeof mask === 'function') { 266 cb = mask; 267 mask = undefined; 268 } 269 270 if (typeof data === 'number') data = data.toString(); 271 272 if (this.readyState !== WebSocket.OPEN) { 273 sendAfterClose(this, data, cb); 274 return; 275 } 276 277 if (mask === undefined) mask = !this._isServer; 278 this._sender.ping(data || EMPTY_BUFFER, mask, cb); 279 } 280 281 /** 282 * Send a pong. 283 * 284 * @param {*} data The data to send 285 * @param {Boolean} mask Indicates whether or not to mask `data` 286 * @param {Function} cb Callback which is executed when the pong is sent 287 * @public 288 */ 289 pong(data, mask, cb) { 290 if (this.readyState === WebSocket.CONNECTING) { 291 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); 292 } 293 294 if (typeof data === 'function') { 295 cb = data; 296 data = mask = undefined; 297 } else if (typeof mask === 'function') { 298 cb = mask; 299 mask = undefined; 300 } 301 302 if (typeof data === 'number') data = data.toString(); 303 304 if (this.readyState !== WebSocket.OPEN) { 305 sendAfterClose(this, data, cb); 306 return; 307 } 308 309 if (mask === undefined) mask = !this._isServer; 310 this._sender.pong(data || EMPTY_BUFFER, mask, cb); 311 } 312 313 /** 314 * Send a data message. 315 * 316 * @param {*} data The message to send 317 * @param {Object} options Options object 318 * @param {Boolean} options.compress Specifies whether or not to compress 319 * `data` 320 * @param {Boolean} options.binary Specifies whether `data` is binary or text 321 * @param {Boolean} options.fin Specifies whether the fragment is the last one 322 * @param {Boolean} options.mask Specifies whether or not to mask `data` 323 * @param {Function} cb Callback which is executed when data is written out 324 * @public 325 */ 326 send(data, options, cb) { 327 if (this.readyState === WebSocket.CONNECTING) { 328 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); 329 } 330 331 if (typeof options === 'function') { 332 cb = options; 333 options = {}; 334 } 335 336 if (typeof data === 'number') data = data.toString(); 337 338 if (this.readyState !== WebSocket.OPEN) { 339 sendAfterClose(this, data, cb); 340 return; 341 } 342 343 const opts = { 344 binary: typeof data !== 'string', 345 mask: !this._isServer, 346 compress: true, 347 fin: true, 348 ...options 349 }; 350 351 if (!this._extensions[PerMessageDeflate.extensionName]) { 352 opts.compress = false; 353 } 354 355 this._sender.send(data || EMPTY_BUFFER, opts, cb); 356 } 357 358 /** 359 * Forcibly close the connection. 360 * 361 * @public 362 */ 363 terminate() { 364 if (this.readyState === WebSocket.CLOSED) return; 365 if (this.readyState === WebSocket.CONNECTING) { 366 const msg = 'WebSocket was closed before the connection was established'; 367 return abortHandshake(this, this._req, msg); 368 } 369 370 if (this._socket) { 371 this.readyState = WebSocket.CLOSING; 372 this._socket.destroy(); 373 } 374 } 375 } 376 377 readyStates.forEach((readyState, i) => { 378 WebSocket[readyState] = i; 379 }); 380 381 // 382 // Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes. 383 // See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface 384 // 385 ['open', 'error', 'close', 'message'].forEach((method) => { 386 Object.defineProperty(WebSocket.prototype, `on${method}`, { 387 /** 388 * Return the listener of the event. 389 * 390 * @return {(Function|undefined)} The event listener or `undefined` 391 * @public 392 */ 393 get() { 394 const listeners = this.listeners(method); 395 for (let i = 0; i < listeners.length; i++) { 396 if (listeners[i]._listener) return listeners[i]._listener; 397 } 398 399 return undefined; 400 }, 401 /** 402 * Add a listener for the event. 403 * 404 * @param {Function} listener The listener to add 405 * @public 406 */ 407 set(listener) { 408 const listeners = this.listeners(method); 409 for (let i = 0; i < listeners.length; i++) { 410 // 411 // Remove only the listeners added via `addEventListener`. 412 // 413 if (listeners[i]._listener) this.removeListener(method, listeners[i]); 414 } 415 this.addEventListener(method, listener); 416 } 417 }); 418 }); 419 420 WebSocket.prototype.addEventListener = addEventListener; 421 WebSocket.prototype.removeEventListener = removeEventListener; 422 423 module.exports = WebSocket; 424 425 /** 426 * Initialize a WebSocket client. 427 * 428 * @param {WebSocket} websocket The client to initialize 429 * @param {(String|url.URL)} address The URL to which to connect 430 * @param {String} protocols The subprotocols 431 * @param {Object} options Connection options 432 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable 433 * permessage-deflate 434 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the 435 * handshake request 436 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version` 437 * header 438 * @param {String} options.origin Value of the `Origin` or 439 * `Sec-WebSocket-Origin` header 440 * @param {Number} options.maxPayload The maximum allowed message size 441 * @param {Boolean} options.followRedirects Whether or not to follow redirects 442 * @param {Number} options.maxRedirects The maximum number of redirects allowed 443 * @private 444 */ 445 function initAsClient(websocket, address, protocols, options) { 446 const opts = { 447 protocolVersion: protocolVersions[1], 448 maxPayload: 100 * 1024 * 1024, 449 perMessageDeflate: true, 450 followRedirects: false, 451 maxRedirects: 10, 452 ...options, 453 createConnection: undefined, 454 socketPath: undefined, 455 hostname: undefined, 456 protocol: undefined, 457 timeout: undefined, 458 method: undefined, 459 host: undefined, 460 path: undefined, 461 port: undefined 462 }; 463 464 if (!protocolVersions.includes(opts.protocolVersion)) { 465 throw new RangeError( 466 `Unsupported protocol version: ${opts.protocolVersion} ` + 467 `(supported versions: ${protocolVersions.join(', ')})` 468 ); 469 } 470 471 let parsedUrl; 472 473 if (address instanceof URL) { 474 parsedUrl = address; 475 websocket.url = address.href; 476 } else { 477 parsedUrl = new URL(address); 478 websocket.url = address; 479 } 480 481 const isUnixSocket = parsedUrl.protocol === 'ws+unix:'; 482 483 if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) { 484 throw new Error(`Invalid URL: ${websocket.url}`); 485 } 486 487 const isSecure = 488 parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:'; 489 const defaultPort = isSecure ? 443 : 80; 490 const key = randomBytes(16).toString('base64'); 491 const get = isSecure ? https.get : http.get; 492 let perMessageDeflate; 493 494 opts.createConnection = isSecure ? tlsConnect : netConnect; 495 opts.defaultPort = opts.defaultPort || defaultPort; 496 opts.port = parsedUrl.port || defaultPort; 497 opts.host = parsedUrl.hostname.startsWith('[') 498 ? parsedUrl.hostname.slice(1, -1) 499 : parsedUrl.hostname; 500 opts.headers = { 501 'Sec-WebSocket-Version': opts.protocolVersion, 502 'Sec-WebSocket-Key': key, 503 Connection: 'Upgrade', 504 Upgrade: 'websocket', 505 ...opts.headers 506 }; 507 opts.path = parsedUrl.pathname + parsedUrl.search; 508 opts.timeout = opts.handshakeTimeout; 509 510 if (opts.perMessageDeflate) { 511 perMessageDeflate = new PerMessageDeflate( 512 opts.perMessageDeflate !== true ? opts.perMessageDeflate : {}, 513 false, 514 opts.maxPayload 515 ); 516 opts.headers['Sec-WebSocket-Extensions'] = format({ 517 [PerMessageDeflate.extensionName]: perMessageDeflate.offer() 518 }); 519 } 520 if (protocols) { 521 opts.headers['Sec-WebSocket-Protocol'] = protocols; 522 } 523 if (opts.origin) { 524 if (opts.protocolVersion < 13) { 525 opts.headers['Sec-WebSocket-Origin'] = opts.origin; 526 } else { 527 opts.headers.Origin = opts.origin; 528 } 529 } 530 if (parsedUrl.username || parsedUrl.password) { 531 opts.auth = `${parsedUrl.username}:${parsedUrl.password}`; 532 } 533 534 if (isUnixSocket) { 535 const parts = opts.path.split(':'); 536 537 opts.socketPath = parts[0]; 538 opts.path = parts[1]; 539 } 540 541 let req = (websocket._req = get(opts)); 542 543 if (opts.timeout) { 544 req.on('timeout', () => { 545 abortHandshake(websocket, req, 'Opening handshake has timed out'); 546 }); 547 } 548 549 req.on('error', (err) => { 550 if (websocket._req.aborted) return; 551 552 req = websocket._req = null; 553 websocket.readyState = WebSocket.CLOSING; 554 websocket.emit('error', err); 555 websocket.emitClose(); 556 }); 557 558 req.on('response', (res) => { 559 const location = res.headers.location; 560 const statusCode = res.statusCode; 561 562 if ( 563 location && 564 opts.followRedirects && 565 statusCode >= 300 && 566 statusCode < 400 567 ) { 568 if (++websocket._redirects > opts.maxRedirects) { 569 abortHandshake(websocket, req, 'Maximum redirects exceeded'); 570 return; 571 } 572 573 req.abort(); 574 575 const addr = new URL(location, address); 576 577 initAsClient(websocket, addr, protocols, options); 578 } else if (!websocket.emit('unexpected-response', req, res)) { 579 abortHandshake( 580 websocket, 581 req, 582 `Unexpected server response: ${res.statusCode}` 583 ); 584 } 585 }); 586 587 req.on('upgrade', (res, socket, head) => { 588 websocket.emit('upgrade', res); 589 590 // 591 // The user may have closed the connection from a listener of the `upgrade` 592 // event. 593 // 594 if (websocket.readyState !== WebSocket.CONNECTING) return; 595 596 req = websocket._req = null; 597 598 const digest = createHash('sha1') 599 .update(key + GUID) 600 .digest('base64'); 601 602 if (res.headers['sec-websocket-accept'] !== digest) { 603 abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header'); 604 return; 605 } 606 607 const serverProt = res.headers['sec-websocket-protocol']; 608 const protList = (protocols || '').split(/, */); 609 let protError; 610 611 if (!protocols && serverProt) { 612 protError = 'Server sent a subprotocol but none was requested'; 613 } else if (protocols && !serverProt) { 614 protError = 'Server sent no subprotocol'; 615 } else if (serverProt && !protList.includes(serverProt)) { 616 protError = 'Server sent an invalid subprotocol'; 617 } 618 619 if (protError) { 620 abortHandshake(websocket, socket, protError); 621 return; 622 } 623 624 if (serverProt) websocket.protocol = serverProt; 625 626 if (perMessageDeflate) { 627 try { 628 const extensions = parse(res.headers['sec-websocket-extensions']); 629 630 if (extensions[PerMessageDeflate.extensionName]) { 631 perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]); 632 websocket._extensions[ 633 PerMessageDeflate.extensionName 634 ] = perMessageDeflate; 635 } 636 } catch (err) { 637 abortHandshake( 638 websocket, 639 socket, 640 'Invalid Sec-WebSocket-Extensions header' 641 ); 642 return; 643 } 644 } 645 646 websocket.setSocket(socket, head, opts.maxPayload); 647 }); 648 } 649 650 /** 651 * Create a `net.Socket` and initiate a connection. 652 * 653 * @param {Object} options Connection options 654 * @return {net.Socket} The newly created socket used to start the connection 655 * @private 656 */ 657 function netConnect(options) { 658 options.path = options.socketPath; 659 return net.connect(options); 660 } 661 662 /** 663 * Create a `tls.TLSSocket` and initiate a connection. 664 * 665 * @param {Object} options Connection options 666 * @return {tls.TLSSocket} The newly created socket used to start the connection 667 * @private 668 */ 669 function tlsConnect(options) { 670 options.path = undefined; 671 672 if (!options.servername && options.servername !== '') { 673 options.servername = options.host; 674 } 675 676 return tls.connect(options); 677 } 678 679 /** 680 * Abort the handshake and emit an error. 681 * 682 * @param {WebSocket} websocket The WebSocket instance 683 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the 684 * socket to destroy 685 * @param {String} message The error message 686 * @private 687 */ 688 function abortHandshake(websocket, stream, message) { 689 websocket.readyState = WebSocket.CLOSING; 690 691 const err = new Error(message); 692 Error.captureStackTrace(err, abortHandshake); 693 694 if (stream.setHeader) { 695 stream.abort(); 696 stream.once('abort', websocket.emitClose.bind(websocket)); 697 websocket.emit('error', err); 698 } else { 699 stream.destroy(err); 700 stream.once('error', websocket.emit.bind(websocket, 'error')); 701 stream.once('close', websocket.emitClose.bind(websocket)); 702 } 703 } 704 705 /** 706 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called 707 * when the `readyState` attribute is `CLOSING` or `CLOSED`. 708 * 709 * @param {WebSocket} websocket The WebSocket instance 710 * @param {*} data The data to send 711 * @param {Function} cb Callback 712 * @private 713 */ 714 function sendAfterClose(websocket, data, cb) { 715 if (data) { 716 const length = toBuffer(data).length; 717 718 // 719 // The `_bufferedAmount` property is used only when the peer is a client and 720 // the opening handshake fails. Under these circumstances, in fact, the 721 // `setSocket()` method is not called, so the `_socket` and `_sender` 722 // properties are set to `null`. 723 // 724 if (websocket._socket) websocket._sender._bufferedBytes += length; 725 else websocket._bufferedAmount += length; 726 } 727 728 if (cb) { 729 const err = new Error( 730 `WebSocket is not open: readyState ${websocket.readyState} ` + 731 `(${readyStates[websocket.readyState]})` 732 ); 733 cb(err); 734 } 735 } 736 737 /** 738 * The listener of the `Receiver` `'conclude'` event. 739 * 740 * @param {Number} code The status code 741 * @param {String} reason The reason for closing 742 * @private 743 */ 744 function receiverOnConclude(code, reason) { 745 const websocket = this[kWebSocket]; 746 747 websocket._socket.removeListener('data', socketOnData); 748 websocket._socket.resume(); 749 750 websocket._closeFrameReceived = true; 751 websocket._closeMessage = reason; 752 websocket._closeCode = code; 753 754 if (code === 1005) websocket.close(); 755 else websocket.close(code, reason); 756 } 757 758 /** 759 * The listener of the `Receiver` `'drain'` event. 760 * 761 * @private 762 */ 763 function receiverOnDrain() { 764 this[kWebSocket]._socket.resume(); 765 } 766 767 /** 768 * The listener of the `Receiver` `'error'` event. 769 * 770 * @param {(RangeError|Error)} err The emitted error 771 * @private 772 */ 773 function receiverOnError(err) { 774 const websocket = this[kWebSocket]; 775 776 websocket._socket.removeListener('data', socketOnData); 777 778 websocket.readyState = WebSocket.CLOSING; 779 websocket._closeCode = err[kStatusCode]; 780 websocket.emit('error', err); 781 websocket._socket.destroy(); 782 } 783 784 /** 785 * The listener of the `Receiver` `'finish'` event. 786 * 787 * @private 788 */ 789 function receiverOnFinish() { 790 this[kWebSocket].emitClose(); 791 } 792 793 /** 794 * The listener of the `Receiver` `'message'` event. 795 * 796 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message 797 * @private 798 */ 799 function receiverOnMessage(data) { 800 this[kWebSocket].emit('message', data); 801 } 802 803 /** 804 * The listener of the `Receiver` `'ping'` event. 805 * 806 * @param {Buffer} data The data included in the ping frame 807 * @private 808 */ 809 function receiverOnPing(data) { 810 const websocket = this[kWebSocket]; 811 812 websocket.pong(data, !websocket._isServer, NOOP); 813 websocket.emit('ping', data); 814 } 815 816 /** 817 * The listener of the `Receiver` `'pong'` event. 818 * 819 * @param {Buffer} data The data included in the pong frame 820 * @private 821 */ 822 function receiverOnPong(data) { 823 this[kWebSocket].emit('pong', data); 824 } 825 826 /** 827 * The listener of the `net.Socket` `'close'` event. 828 * 829 * @private 830 */ 831 function socketOnClose() { 832 const websocket = this[kWebSocket]; 833 834 this.removeListener('close', socketOnClose); 835 this.removeListener('end', socketOnEnd); 836 837 websocket.readyState = WebSocket.CLOSING; 838 839 // 840 // The close frame might not have been received or the `'end'` event emitted, 841 // for example, if the socket was destroyed due to an error. Ensure that the 842 // `receiver` stream is closed after writing any remaining buffered data to 843 // it. If the readable side of the socket is in flowing mode then there is no 844 // buffered data as everything has been already written and `readable.read()` 845 // will return `null`. If instead, the socket is paused, any possible buffered 846 // data will be read as a single chunk and emitted synchronously in a single 847 // `'data'` event. 848 // 849 websocket._socket.read(); 850 websocket._receiver.end(); 851 852 this.removeListener('data', socketOnData); 853 this[kWebSocket] = undefined; 854 855 clearTimeout(websocket._closeTimer); 856 857 if ( 858 websocket._receiver._writableState.finished || 859 websocket._receiver._writableState.errorEmitted 860 ) { 861 websocket.emitClose(); 862 } else { 863 websocket._receiver.on('error', receiverOnFinish); 864 websocket._receiver.on('finish', receiverOnFinish); 865 } 866 } 867 868 /** 869 * The listener of the `net.Socket` `'data'` event. 870 * 871 * @param {Buffer} chunk A chunk of data 872 * @private 873 */ 874 function socketOnData(chunk) { 875 if (!this[kWebSocket]._receiver.write(chunk)) { 876 this.pause(); 877 } 878 } 879 880 /** 881 * The listener of the `net.Socket` `'end'` event. 882 * 883 * @private 884 */ 885 function socketOnEnd() { 886 const websocket = this[kWebSocket]; 887 888 websocket.readyState = WebSocket.CLOSING; 889 websocket._receiver.end(); 890 this.end(); 891 } 892 893 /** 894 * The listener of the `net.Socket` `'error'` event. 895 * 896 * @private 897 */ 898 function socketOnError() { 899 const websocket = this[kWebSocket]; 900 901 this.removeListener('error', socketOnError); 902 this.on('error', NOOP); 903 904 if (websocket) { 905 websocket.readyState = WebSocket.CLOSING; 906 this.destroy(); 907 } 908 }