twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

stream.js (3900B)


      1 'use strict';
      2 
      3 const { Duplex } = require('stream');
      4 
      5 /**
      6  * Emits the `'close'` event on a stream.
      7  *
      8  * @param {stream.Duplex} The stream.
      9  * @private
     10  */
     11 function emitClose(stream) {
     12   stream.emit('close');
     13 }
     14 
     15 /**
     16  * The listener of the `'end'` event.
     17  *
     18  * @private
     19  */
     20 function duplexOnEnd() {
     21   if (!this.destroyed && this._writableState.finished) {
     22     this.destroy();
     23   }
     24 }
     25 
     26 /**
     27  * The listener of the `'error'` event.
     28  *
     29  * @private
     30  */
     31 function duplexOnError(err) {
     32   this.removeListener('error', duplexOnError);
     33   this.destroy();
     34   if (this.listenerCount('error') === 0) {
     35     // Do not suppress the throwing behavior.
     36     this.emit('error', err);
     37   }
     38 }
     39 
     40 /**
     41  * Wraps a `WebSocket` in a duplex stream.
     42  *
     43  * @param {WebSocket} ws The `WebSocket` to wrap
     44  * @param {Object} options The options for the `Duplex` constructor
     45  * @return {stream.Duplex} The duplex stream
     46  * @public
     47  */
     48 function createWebSocketStream(ws, options) {
     49   let resumeOnReceiverDrain = true;
     50 
     51   function receiverOnDrain() {
     52     if (resumeOnReceiverDrain) ws._socket.resume();
     53   }
     54 
     55   if (ws.readyState === ws.CONNECTING) {
     56     ws.once('open', function open() {
     57       ws._receiver.removeAllListeners('drain');
     58       ws._receiver.on('drain', receiverOnDrain);
     59     });
     60   } else {
     61     ws._receiver.removeAllListeners('drain');
     62     ws._receiver.on('drain', receiverOnDrain);
     63   }
     64 
     65   const duplex = new Duplex({
     66     ...options,
     67     autoDestroy: false,
     68     emitClose: false,
     69     objectMode: false,
     70     writableObjectMode: false
     71   });
     72 
     73   ws.on('message', function message(msg) {
     74     if (!duplex.push(msg)) {
     75       resumeOnReceiverDrain = false;
     76       ws._socket.pause();
     77     }
     78   });
     79 
     80   ws.once('error', function error(err) {
     81     if (duplex.destroyed) return;
     82 
     83     duplex.destroy(err);
     84   });
     85 
     86   ws.once('close', function close() {
     87     if (duplex.destroyed) return;
     88 
     89     duplex.push(null);
     90   });
     91 
     92   duplex._destroy = function(err, callback) {
     93     if (ws.readyState === ws.CLOSED) {
     94       callback(err);
     95       process.nextTick(emitClose, duplex);
     96       return;
     97     }
     98 
     99     let called = false;
    100 
    101     ws.once('error', function error(err) {
    102       called = true;
    103       callback(err);
    104     });
    105 
    106     ws.once('close', function close() {
    107       if (!called) callback(err);
    108       process.nextTick(emitClose, duplex);
    109     });
    110     ws.terminate();
    111   };
    112 
    113   duplex._final = function(callback) {
    114     if (ws.readyState === ws.CONNECTING) {
    115       ws.once('open', function open() {
    116         duplex._final(callback);
    117       });
    118       return;
    119     }
    120 
    121     // If the value of the `_socket` property is `null` it means that `ws` is a
    122     // client websocket and the handshake failed. In fact, when this happens, a
    123     // socket is never assigned to the websocket. Wait for the `'error'` event
    124     // that will be emitted by the websocket.
    125     if (ws._socket === null) return;
    126 
    127     if (ws._socket._writableState.finished) {
    128       if (duplex._readableState.endEmitted) duplex.destroy();
    129       callback();
    130     } else {
    131       ws._socket.once('finish', function finish() {
    132         // `duplex` is not destroyed here because the `'end'` event will be
    133         // emitted on `duplex` after this `'finish'` event. The EOF signaling
    134         // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
    135         callback();
    136       });
    137       ws.close();
    138     }
    139   };
    140 
    141   duplex._read = function() {
    142     if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
    143       resumeOnReceiverDrain = true;
    144       if (!ws._receiver._writableState.needDrain) ws._socket.resume();
    145     }
    146   };
    147 
    148   duplex._write = function(chunk, encoding, callback) {
    149     if (ws.readyState === ws.CONNECTING) {
    150       ws.once('open', function open() {
    151         duplex._write(chunk, encoding, callback);
    152       });
    153       return;
    154     }
    155 
    156     ws.send(chunk, callback);
    157   };
    158 
    159   duplex.on('end', duplexOnEnd);
    160   duplex.on('error', duplexOnError);
    161   return duplex;
    162 }
    163 
    164 module.exports = createWebSocketStream;