twitst4tz

Twitter statistics web application
Log | Files | Refs | README | LICENSE

websocket.js (23245B)


      1 'use strict';
      2 
      3 const EventEmitter = require('events');
      4 const crypto = require('crypto');
      5 const https = require('https');
      6 const http = require('http');
      7 const net = require('net');
      8 const tls = require('tls');
      9 const url = require('url');
     10 
     11 const PerMessageDeflate = require('./permessage-deflate');
     12 const EventTarget = require('./event-target');
     13 const extension = require('./extension');
     14 const constants = require('./constants');
     15 const Receiver = require('./receiver');
     16 const Sender = require('./sender');
     17 
     18 const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
     19 const kWebSocket = constants.kWebSocket;
     20 const protocolVersions = [8, 13];
     21 const closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly.
     22 
     23 /**
     24  * Class representing a WebSocket.
     25  *
     26  * @extends EventEmitter
     27  */
     28 class WebSocket extends EventEmitter {
     29   /**
     30    * Create a new `WebSocket`.
     31    *
     32    * @param {(String|url.Url|url.URL)} address The URL to which to connect
     33    * @param {(String|String[])} protocols The subprotocols
     34    * @param {Object} options Connection options
     35    */
     36   constructor(address, protocols, options) {
     37     super();
     38 
     39     this.readyState = WebSocket.CONNECTING;
     40     this.protocol = '';
     41 
     42     this._binaryType = constants.BINARY_TYPES[0];
     43     this._closeFrameReceived = false;
     44     this._closeFrameSent = false;
     45     this._closeMessage = '';
     46     this._closeTimer = null;
     47     this._closeCode = 1006;
     48     this._extensions = {};
     49     this._isServer = true;
     50     this._receiver = null;
     51     this._sender = null;
     52     this._socket = null;
     53 
     54     if (address !== null) {
     55       if (Array.isArray(protocols)) {
     56         protocols = protocols.join(', ');
     57       } else if (typeof protocols === 'object' && protocols !== null) {
     58         options = protocols;
     59         protocols = undefined;
     60       }
     61 
     62       initAsClient.call(this, address, protocols, options);
     63     }
     64   }
     65 
     66   get CONNECTING() {
     67     return WebSocket.CONNECTING;
     68   }
     69   get CLOSING() {
     70     return WebSocket.CLOSING;
     71   }
     72   get CLOSED() {
     73     return WebSocket.CLOSED;
     74   }
     75   get OPEN() {
     76     return WebSocket.OPEN;
     77   }
     78 
     79   /**
     80    * This deviates from the WHATWG interface since ws doesn't support the required
     81    * default "blob" type (instead we define a custom "nodebuffer" type).
     82    *
     83    * @type {String}
     84    */
     85   get binaryType() {
     86     return this._binaryType;
     87   }
     88 
     89   set binaryType(type) {
     90     if (!constants.BINARY_TYPES.includes(type)) return;
     91 
     92     this._binaryType = type;
     93 
     94     //
     95     // Allow to change `binaryType` on the fly.
     96     //
     97     if (this._receiver) this._receiver._binaryType = type;
     98   }
     99 
    100   /**
    101    * @type {Number}
    102    */
    103   get bufferedAmount() {
    104     if (!this._socket) return 0;
    105 
    106     //
    107     // `socket.bufferSize` is `undefined` if the socket is closed.
    108     //
    109     return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
    110   }
    111 
    112   /**
    113    * @type {String}
    114    */
    115   get extensions() {
    116     return Object.keys(this._extensions).join();
    117   }
    118 
    119   /**
    120    * Set up the socket and the internal resources.
    121    *
    122    * @param {net.Socket} socket The network socket between the server and client
    123    * @param {Buffer} head The first packet of the upgraded stream
    124    * @param {Number} maxPayload The maximum allowed message size
    125    * @private
    126    */
    127   setSocket(socket, head, maxPayload) {
    128     const receiver = new Receiver(
    129       this._binaryType,
    130       this._extensions,
    131       maxPayload
    132     );
    133 
    134     this._sender = new Sender(socket, this._extensions);
    135     this._receiver = receiver;
    136     this._socket = socket;
    137 
    138     receiver[kWebSocket] = this;
    139     socket[kWebSocket] = this;
    140 
    141     receiver.on('conclude', receiverOnConclude);
    142     receiver.on('drain', receiverOnDrain);
    143     receiver.on('error', receiverOnError);
    144     receiver.on('message', receiverOnMessage);
    145     receiver.on('ping', receiverOnPing);
    146     receiver.on('pong', receiverOnPong);
    147 
    148     socket.setTimeout(0);
    149     socket.setNoDelay();
    150 
    151     if (head.length > 0) socket.unshift(head);
    152 
    153     socket.on('close', socketOnClose);
    154     socket.on('data', socketOnData);
    155     socket.on('end', socketOnEnd);
    156     socket.on('error', socketOnError);
    157 
    158     this.readyState = WebSocket.OPEN;
    159     this.emit('open');
    160   }
    161 
    162   /**
    163    * Emit the `'close'` event.
    164    *
    165    * @private
    166    */
    167   emitClose() {
    168     this.readyState = WebSocket.CLOSED;
    169 
    170     if (!this._socket) {
    171       this.emit('close', this._closeCode, this._closeMessage);
    172       return;
    173     }
    174 
    175     if (this._extensions[PerMessageDeflate.extensionName]) {
    176       this._extensions[PerMessageDeflate.extensionName].cleanup();
    177     }
    178 
    179     this._receiver.removeAllListeners();
    180     this.emit('close', this._closeCode, this._closeMessage);
    181   }
    182 
    183   /**
    184    * Start a closing handshake.
    185    *
    186    *          +----------+   +-----------+   +----------+
    187    *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
    188    *    |     +----------+   +-----------+   +----------+     |
    189    *          +----------+   +-----------+         |
    190    * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
    191    *          +----------+   +-----------+   |
    192    *    |           |                        |   +---+        |
    193    *                +------------------------+-->|fin| - - - -
    194    *    |         +---+                      |   +---+
    195    *     - - - - -|fin|<---------------------+
    196    *              +---+
    197    *
    198    * @param {Number} code Status code explaining why the connection is closing
    199    * @param {String} data A string explaining why the connection is closing
    200    * @public
    201    */
    202   close(code, data) {
    203     if (this.readyState === WebSocket.CLOSED) return;
    204     if (this.readyState === WebSocket.CONNECTING) {
    205       const msg = 'WebSocket was closed before the connection was established';
    206       return abortHandshake(this, this._req, msg);
    207     }
    208 
    209     if (this.readyState === WebSocket.CLOSING) {
    210       if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
    211       return;
    212     }
    213 
    214     this.readyState = WebSocket.CLOSING;
    215     this._sender.close(code, data, !this._isServer, (err) => {
    216       //
    217       // This error is handled by the `'error'` listener on the socket. We only
    218       // want to know if the close frame has been sent here.
    219       //
    220       if (err) return;
    221 
    222       this._closeFrameSent = true;
    223 
    224       if (this._socket.writable) {
    225         if (this._closeFrameReceived) this._socket.end();
    226 
    227         //
    228         // Ensure that the connection is closed even if the closing handshake
    229         // fails.
    230         //
    231         this._closeTimer = setTimeout(
    232           this._socket.destroy.bind(this._socket),
    233           closeTimeout
    234         );
    235       }
    236     });
    237   }
    238 
    239   /**
    240    * Send a ping.
    241    *
    242    * @param {*} data The data to send
    243    * @param {Boolean} mask Indicates whether or not to mask `data`
    244    * @param {Function} cb Callback which is executed when the ping is sent
    245    * @public
    246    */
    247   ping(data, mask, cb) {
    248     if (typeof data === 'function') {
    249       cb = data;
    250       data = mask = undefined;
    251     } else if (typeof mask === 'function') {
    252       cb = mask;
    253       mask = undefined;
    254     }
    255 
    256     if (this.readyState !== WebSocket.OPEN) {
    257       const err = new Error(
    258         `WebSocket is not open: readyState ${this.readyState} ` +
    259           `(${readyStates[this.readyState]})`
    260       );
    261 
    262       if (cb) return cb(err);
    263       throw err;
    264     }
    265 
    266     if (typeof data === 'number') data = data.toString();
    267     if (mask === undefined) mask = !this._isServer;
    268     this._sender.ping(data || constants.EMPTY_BUFFER, mask, cb);
    269   }
    270 
    271   /**
    272    * Send a pong.
    273    *
    274    * @param {*} data The data to send
    275    * @param {Boolean} mask Indicates whether or not to mask `data`
    276    * @param {Function} cb Callback which is executed when the pong is sent
    277    * @public
    278    */
    279   pong(data, mask, cb) {
    280     if (typeof data === 'function') {
    281       cb = data;
    282       data = mask = undefined;
    283     } else if (typeof mask === 'function') {
    284       cb = mask;
    285       mask = undefined;
    286     }
    287 
    288     if (this.readyState !== WebSocket.OPEN) {
    289       const err = new Error(
    290         `WebSocket is not open: readyState ${this.readyState} ` +
    291           `(${readyStates[this.readyState]})`
    292       );
    293 
    294       if (cb) return cb(err);
    295       throw err;
    296     }
    297 
    298     if (typeof data === 'number') data = data.toString();
    299     if (mask === undefined) mask = !this._isServer;
    300     this._sender.pong(data || constants.EMPTY_BUFFER, mask, cb);
    301   }
    302 
    303   /**
    304    * Send a data message.
    305    *
    306    * @param {*} data The message to send
    307    * @param {Object} options Options object
    308    * @param {Boolean} options.compress Specifies whether or not to compress `data`
    309    * @param {Boolean} options.binary Specifies whether `data` is binary or text
    310    * @param {Boolean} options.fin Specifies whether the fragment is the last one
    311    * @param {Boolean} options.mask Specifies whether or not to mask `data`
    312    * @param {Function} cb Callback which is executed when data is written out
    313    * @public
    314    */
    315   send(data, options, cb) {
    316     if (typeof options === 'function') {
    317       cb = options;
    318       options = {};
    319     }
    320 
    321     if (this.readyState !== WebSocket.OPEN) {
    322       const err = new Error(
    323         `WebSocket is not open: readyState ${this.readyState} ` +
    324           `(${readyStates[this.readyState]})`
    325       );
    326 
    327       if (cb) return cb(err);
    328       throw err;
    329     }
    330 
    331     if (typeof data === 'number') data = data.toString();
    332 
    333     const opts = Object.assign(
    334       {
    335         binary: typeof data !== 'string',
    336         mask: !this._isServer,
    337         compress: true,
    338         fin: true
    339       },
    340       options
    341     );
    342 
    343     if (!this._extensions[PerMessageDeflate.extensionName]) {
    344       opts.compress = false;
    345     }
    346 
    347     this._sender.send(data || constants.EMPTY_BUFFER, opts, cb);
    348   }
    349 
    350   /**
    351    * Forcibly close the connection.
    352    *
    353    * @public
    354    */
    355   terminate() {
    356     if (this.readyState === WebSocket.CLOSED) return;
    357     if (this.readyState === WebSocket.CONNECTING) {
    358       const msg = 'WebSocket was closed before the connection was established';
    359       return abortHandshake(this, this._req, msg);
    360     }
    361 
    362     if (this._socket) {
    363       this.readyState = WebSocket.CLOSING;
    364       this._socket.destroy();
    365     }
    366   }
    367 }
    368 
    369 readyStates.forEach((readyState, i) => {
    370   WebSocket[readyState] = i;
    371 });
    372 
    373 //
    374 // Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
    375 // See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
    376 //
    377 ['open', 'error', 'close', 'message'].forEach((method) => {
    378   Object.defineProperty(WebSocket.prototype, `on${method}`, {
    379     /**
    380      * Return the listener of the event.
    381      *
    382      * @return {(Function|undefined)} The event listener or `undefined`
    383      * @public
    384      */
    385     get() {
    386       const listeners = this.listeners(method);
    387       for (var i = 0; i < listeners.length; i++) {
    388         if (listeners[i]._listener) return listeners[i]._listener;
    389       }
    390 
    391       return undefined;
    392     },
    393     /**
    394      * Add a listener for the event.
    395      *
    396      * @param {Function} listener The listener to add
    397      * @public
    398      */
    399     set(listener) {
    400       const listeners = this.listeners(method);
    401       for (var i = 0; i < listeners.length; i++) {
    402         //
    403         // Remove only the listeners added via `addEventListener`.
    404         //
    405         if (listeners[i]._listener) this.removeListener(method, listeners[i]);
    406       }
    407       this.addEventListener(method, listener);
    408     }
    409   });
    410 });
    411 
    412 WebSocket.prototype.addEventListener = EventTarget.addEventListener;
    413 WebSocket.prototype.removeEventListener = EventTarget.removeEventListener;
    414 
    415 module.exports = WebSocket;
    416 
    417 /**
    418  * Initialize a WebSocket client.
    419  *
    420  * @param {(String|url.Url|url.URL)} address The URL to which to connect
    421  * @param {String} protocols The subprotocols
    422  * @param {Object} options Connection options
    423  * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable permessage-deflate
    424  * @param {Number} options.handshakeTimeout Timeout in milliseconds for the handshake request
    425  * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version` header
    426  * @param {String} options.origin Value of the `Origin` or `Sec-WebSocket-Origin` header
    427  * @param {Number} options.maxPayload The maximum allowed message size
    428  * @private
    429  */
    430 function initAsClient(address, protocols, options) {
    431   options = Object.assign(
    432     {
    433       protocolVersion: protocolVersions[1],
    434       perMessageDeflate: true,
    435       maxPayload: 100 * 1024 * 1024
    436     },
    437     options,
    438     {
    439       createConnection: undefined,
    440       socketPath: undefined,
    441       hostname: undefined,
    442       protocol: undefined,
    443       timeout: undefined,
    444       method: undefined,
    445       auth: undefined,
    446       host: undefined,
    447       path: undefined,
    448       port: undefined
    449     }
    450   );
    451 
    452   if (!protocolVersions.includes(options.protocolVersion)) {
    453     throw new RangeError(
    454       `Unsupported protocol version: ${options.protocolVersion} ` +
    455         `(supported versions: ${protocolVersions.join(', ')})`
    456     );
    457   }
    458 
    459   this._isServer = false;
    460 
    461   var parsedUrl;
    462 
    463   if (typeof address === 'object' && address.href !== undefined) {
    464     parsedUrl = address;
    465     this.url = address.href;
    466   } else {
    467     //
    468     // The WHATWG URL constructor is not available on Node.js < 6.13.0
    469     //
    470     parsedUrl = url.URL ? new url.URL(address) : url.parse(address);
    471     this.url = address;
    472   }
    473 
    474   const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
    475 
    476   if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
    477     throw new Error(`Invalid URL: ${this.url}`);
    478   }
    479 
    480   const isSecure =
    481     parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
    482   const defaultPort = isSecure ? 443 : 80;
    483   const key = crypto.randomBytes(16).toString('base64');
    484   const httpObj = isSecure ? https : http;
    485   const path = parsedUrl.search
    486     ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
    487     : parsedUrl.pathname || '/';
    488   var perMessageDeflate;
    489 
    490   options.createConnection = isSecure ? tlsConnect : netConnect;
    491   options.defaultPort = options.defaultPort || defaultPort;
    492   options.port = parsedUrl.port || defaultPort;
    493   options.host = parsedUrl.hostname.startsWith('[')
    494     ? parsedUrl.hostname.slice(1, -1)
    495     : parsedUrl.hostname;
    496   options.headers = Object.assign(
    497     {
    498       'Sec-WebSocket-Version': options.protocolVersion,
    499       'Sec-WebSocket-Key': key,
    500       Connection: 'Upgrade',
    501       Upgrade: 'websocket'
    502     },
    503     options.headers
    504   );
    505   options.path = path;
    506   options.timeout = options.handshakeTimeout;
    507 
    508   if (options.perMessageDeflate) {
    509     perMessageDeflate = new PerMessageDeflate(
    510       options.perMessageDeflate !== true ? options.perMessageDeflate : {},
    511       false,
    512       options.maxPayload
    513     );
    514     options.headers['Sec-WebSocket-Extensions'] = extension.format({
    515       [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    516     });
    517   }
    518   if (protocols) {
    519     options.headers['Sec-WebSocket-Protocol'] = protocols;
    520   }
    521   if (options.origin) {
    522     if (options.protocolVersion < 13) {
    523       options.headers['Sec-WebSocket-Origin'] = options.origin;
    524     } else {
    525       options.headers.Origin = options.origin;
    526     }
    527   }
    528   if (parsedUrl.auth) {
    529     options.auth = parsedUrl.auth;
    530   } else if (parsedUrl.username || parsedUrl.password) {
    531     options.auth = `${parsedUrl.username}:${parsedUrl.password}`;
    532   }
    533 
    534   if (isUnixSocket) {
    535     const parts = path.split(':');
    536 
    537     options.socketPath = parts[0];
    538     options.path = parts[1];
    539   }
    540 
    541   var req = (this._req = httpObj.get(options));
    542 
    543   if (options.handshakeTimeout) {
    544     req.on('timeout', () => {
    545       abortHandshake(this, req, 'Opening handshake has timed out');
    546     });
    547   }
    548 
    549   req.on('error', (err) => {
    550     if (this._req.aborted) return;
    551 
    552     req = this._req = null;
    553     this.readyState = WebSocket.CLOSING;
    554     this.emit('error', err);
    555     this.emitClose();
    556   });
    557 
    558   req.on('response', (res) => {
    559     if (this.emit('unexpected-response', req, res)) return;
    560 
    561     abortHandshake(this, req, `Unexpected server response: ${res.statusCode}`);
    562   });
    563 
    564   req.on('upgrade', (res, socket, head) => {
    565     this.emit('upgrade', res);
    566 
    567     //
    568     // The user may have closed the connection from a listener of the `upgrade`
    569     // event.
    570     //
    571     if (this.readyState !== WebSocket.CONNECTING) return;
    572 
    573     req = this._req = null;
    574 
    575     const digest = crypto
    576       .createHash('sha1')
    577       .update(key + constants.GUID, 'binary')
    578       .digest('base64');
    579 
    580     if (res.headers['sec-websocket-accept'] !== digest) {
    581       abortHandshake(this, socket, 'Invalid Sec-WebSocket-Accept header');
    582       return;
    583     }
    584 
    585     const serverProt = res.headers['sec-websocket-protocol'];
    586     const protList = (protocols || '').split(/, */);
    587     var protError;
    588 
    589     if (!protocols && serverProt) {
    590       protError = 'Server sent a subprotocol but none was requested';
    591     } else if (protocols && !serverProt) {
    592       protError = 'Server sent no subprotocol';
    593     } else if (serverProt && !protList.includes(serverProt)) {
    594       protError = 'Server sent an invalid subprotocol';
    595     }
    596 
    597     if (protError) {
    598       abortHandshake(this, socket, protError);
    599       return;
    600     }
    601 
    602     if (serverProt) this.protocol = serverProt;
    603 
    604     if (perMessageDeflate) {
    605       try {
    606         const extensions = extension.parse(
    607           res.headers['sec-websocket-extensions']
    608         );
    609 
    610         if (extensions[PerMessageDeflate.extensionName]) {
    611           perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
    612           this._extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
    613         }
    614       } catch (err) {
    615         abortHandshake(this, socket, 'Invalid Sec-WebSocket-Extensions header');
    616         return;
    617       }
    618     }
    619 
    620     this.setSocket(socket, head, options.maxPayload);
    621   });
    622 }
    623 
    624 /**
    625  * Create a `net.Socket` and initiate a connection.
    626  *
    627  * @param {Object} options Connection options
    628  * @return {net.Socket} The newly created socket used to start the connection
    629  * @private
    630  */
    631 function netConnect(options) {
    632   //
    633   // Override `options.path` only if `options` is a copy of the original options
    634   // object. This is always true on Node.js >= 8 but not on Node.js 6 where
    635   // `options.socketPath` might be `undefined` even if the `socketPath` option
    636   // was originally set.
    637   //
    638   if (options.protocolVersion) options.path = options.socketPath;
    639   return net.connect(options);
    640 }
    641 
    642 /**
    643  * Create a `tls.TLSSocket` and initiate a connection.
    644  *
    645  * @param {Object} options Connection options
    646  * @return {tls.TLSSocket} The newly created socket used to start the connection
    647  * @private
    648  */
    649 function tlsConnect(options) {
    650   options.path = undefined;
    651   options.servername = options.servername || options.host;
    652   return tls.connect(options);
    653 }
    654 
    655 /**
    656  * Abort the handshake and emit an error.
    657  *
    658  * @param {WebSocket} websocket The WebSocket instance
    659  * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
    660  *     socket to destroy
    661  * @param {String} message The error message
    662  * @private
    663  */
    664 function abortHandshake(websocket, stream, message) {
    665   websocket.readyState = WebSocket.CLOSING;
    666 
    667   const err = new Error(message);
    668   Error.captureStackTrace(err, abortHandshake);
    669 
    670   if (stream.setHeader) {
    671     stream.abort();
    672     stream.once('abort', websocket.emitClose.bind(websocket));
    673     websocket.emit('error', err);
    674   } else {
    675     stream.destroy(err);
    676     stream.once('error', websocket.emit.bind(websocket, 'error'));
    677     stream.once('close', websocket.emitClose.bind(websocket));
    678   }
    679 }
    680 
    681 /**
    682  * The listener of the `Receiver` `'conclude'` event.
    683  *
    684  * @param {Number} code The status code
    685  * @param {String} reason The reason for closing
    686  * @private
    687  */
    688 function receiverOnConclude(code, reason) {
    689   const websocket = this[kWebSocket];
    690 
    691   websocket._socket.removeListener('data', socketOnData);
    692   websocket._socket.resume();
    693 
    694   websocket._closeFrameReceived = true;
    695   websocket._closeMessage = reason;
    696   websocket._closeCode = code;
    697 
    698   if (code === 1005) websocket.close();
    699   else websocket.close(code, reason);
    700 }
    701 
    702 /**
    703  * The listener of the `Receiver` `'drain'` event.
    704  *
    705  * @private
    706  */
    707 function receiverOnDrain() {
    708   this[kWebSocket]._socket.resume();
    709 }
    710 
    711 /**
    712  * The listener of the `Receiver` `'error'` event.
    713  *
    714  * @param {(RangeError|Error)} err The emitted error
    715  * @private
    716  */
    717 function receiverOnError(err) {
    718   const websocket = this[kWebSocket];
    719 
    720   websocket._socket.removeListener('data', socketOnData);
    721 
    722   websocket.readyState = WebSocket.CLOSING;
    723   websocket._closeCode = err[constants.kStatusCode];
    724   websocket.emit('error', err);
    725   websocket._socket.destroy();
    726 }
    727 
    728 /**
    729  * The listener of the `Receiver` `'finish'` event.
    730  *
    731  * @private
    732  */
    733 function receiverOnFinish() {
    734   this[kWebSocket].emitClose();
    735 }
    736 
    737 /**
    738  * The listener of the `Receiver` `'message'` event.
    739  *
    740  * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
    741  * @private
    742  */
    743 function receiverOnMessage(data) {
    744   this[kWebSocket].emit('message', data);
    745 }
    746 
    747 /**
    748  * The listener of the `Receiver` `'ping'` event.
    749  *
    750  * @param {Buffer} data The data included in the ping frame
    751  * @private
    752  */
    753 function receiverOnPing(data) {
    754   const websocket = this[kWebSocket];
    755 
    756   websocket.pong(data, !websocket._isServer, constants.NOOP);
    757   websocket.emit('ping', data);
    758 }
    759 
    760 /**
    761  * The listener of the `Receiver` `'pong'` event.
    762  *
    763  * @param {Buffer} data The data included in the pong frame
    764  * @private
    765  */
    766 function receiverOnPong(data) {
    767   this[kWebSocket].emit('pong', data);
    768 }
    769 
    770 /**
    771  * The listener of the `net.Socket` `'close'` event.
    772  *
    773  * @private
    774  */
    775 function socketOnClose() {
    776   const websocket = this[kWebSocket];
    777 
    778   this.removeListener('close', socketOnClose);
    779   this.removeListener('end', socketOnEnd);
    780 
    781   websocket.readyState = WebSocket.CLOSING;
    782 
    783   //
    784   // The close frame might not have been received or the `'end'` event emitted,
    785   // for example, if the socket was destroyed due to an error. Ensure that the
    786   // `receiver` stream is closed after writing any remaining buffered data to
    787   // it. If the readable side of the socket is in flowing mode then there is no
    788   // buffered data as everything has been already written and `readable.read()`
    789   // will return `null`. If instead, the socket is paused, any possible buffered
    790   // data will be read as a single chunk and emitted synchronously in a single
    791   // `'data'` event.
    792   //
    793   websocket._socket.read();
    794   websocket._receiver.end();
    795 
    796   this.removeListener('data', socketOnData);
    797   this[kWebSocket] = undefined;
    798 
    799   clearTimeout(websocket._closeTimer);
    800 
    801   if (
    802     websocket._receiver._writableState.finished ||
    803     websocket._receiver._writableState.errorEmitted
    804   ) {
    805     websocket.emitClose();
    806   } else {
    807     websocket._receiver.on('error', receiverOnFinish);
    808     websocket._receiver.on('finish', receiverOnFinish);
    809   }
    810 }
    811 
    812 /**
    813  * The listener of the `net.Socket` `'data'` event.
    814  *
    815  * @param {Buffer} chunk A chunk of data
    816  * @private
    817  */
    818 function socketOnData(chunk) {
    819   if (!this[kWebSocket]._receiver.write(chunk)) {
    820     this.pause();
    821   }
    822 }
    823 
    824 /**
    825  * The listener of the `net.Socket` `'end'` event.
    826  *
    827  * @private
    828  */
    829 function socketOnEnd() {
    830   const websocket = this[kWebSocket];
    831 
    832   websocket.readyState = WebSocket.CLOSING;
    833   websocket._receiver.end();
    834   this.end();
    835 }
    836 
    837 /**
    838  * The listener of the `net.Socket` `'error'` event.
    839  *
    840  * @private
    841  */
    842 function socketOnError() {
    843   const websocket = this[kWebSocket];
    844 
    845   this.removeListener('error', socketOnError);
    846   this.on('error', constants.NOOP);
    847 
    848   if (websocket) {
    849     websocket.readyState = WebSocket.CLOSING;
    850     this.destroy();
    851   }
    852 }