twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

socket.js (11597B)


      1 /**
      2  * Module dependencies.
      3  */
      4 
      5 var EventEmitter = require('events').EventEmitter;
      6 var util = require('util');
      7 var debug = require('debug')('engine:socket');
      8 
      9 /**
     10  * Module exports.
     11  */
     12 
     13 module.exports = Socket;
     14 
     15 /**
     16  * Client class (abstract).
     17  *
     18  * @api private
     19  */
     20 
     21 function Socket (id, server, transport, req) {
     22   this.id = id;
     23   this.server = server;
     24   this.upgrading = false;
     25   this.upgraded = false;
     26   this.readyState = 'opening';
     27   this.writeBuffer = [];
     28   this.packetsFn = [];
     29   this.sentCallbackFn = [];
     30   this.cleanupFn = [];
     31   this.request = req;
     32 
     33   // Cache IP since it might not be in the req later
     34   if (req.websocket && req.websocket._socket) {
     35     this.remoteAddress = req.websocket._socket.remoteAddress;
     36   } else {
     37     this.remoteAddress = req.connection.remoteAddress;
     38   }
     39 
     40   this.checkIntervalTimer = null;
     41   this.upgradeTimeoutTimer = null;
     42   this.pingTimeoutTimer = null;
     43 
     44   this.setTransport(transport);
     45   this.onOpen();
     46 }
     47 
     48 /**
     49  * Inherits from EventEmitter.
     50  */
     51 
     52 util.inherits(Socket, EventEmitter);
     53 
     54 /**
     55  * Called upon transport considered open.
     56  *
     57  * @api private
     58  */
     59 
     60 Socket.prototype.onOpen = function () {
     61   this.readyState = 'open';
     62 
     63   // sends an `open` packet
     64   this.transport.sid = this.id;
     65   this.sendPacket('open', JSON.stringify({
     66     sid: this.id,
     67     upgrades: this.getAvailableUpgrades(),
     68     pingInterval: this.server.pingInterval,
     69     pingTimeout: this.server.pingTimeout
     70   }));
     71 
     72   if (this.server.initialPacket) {
     73     this.sendPacket('message', this.server.initialPacket);
     74   }
     75 
     76   this.emit('open');
     77   this.setPingTimeout();
     78 };
     79 
     80 /**
     81  * Called upon transport packet.
     82  *
     83  * @param {Object} packet
     84  * @api private
     85  */
     86 
     87 Socket.prototype.onPacket = function (packet) {
     88   if ('open' === this.readyState) {
     89     // export packet event
     90     debug('packet');
     91     this.emit('packet', packet);
     92 
     93     // Reset ping timeout on any packet, incoming data is a good sign of
     94     // other side's liveness
     95     this.setPingTimeout();
     96 
     97     switch (packet.type) {
     98       case 'ping':
     99         debug('got ping');
    100         this.sendPacket('pong');
    101         this.emit('heartbeat');
    102         break;
    103 
    104       case 'error':
    105         this.onClose('parse error');
    106         break;
    107 
    108       case 'message':
    109         this.emit('data', packet.data);
    110         this.emit('message', packet.data);
    111         break;
    112     }
    113   } else {
    114     debug('packet received with closed socket');
    115   }
    116 };
    117 
    118 /**
    119  * Called upon transport error.
    120  *
    121  * @param {Error} error object
    122  * @api private
    123  */
    124 
    125 Socket.prototype.onError = function (err) {
    126   debug('transport error');
    127   this.onClose('transport error', err);
    128 };
    129 
    130 /**
    131  * Sets and resets ping timeout timer based on client pings.
    132  *
    133  * @api private
    134  */
    135 
    136 Socket.prototype.setPingTimeout = function () {
    137   var self = this;
    138   clearTimeout(self.pingTimeoutTimer);
    139   self.pingTimeoutTimer = setTimeout(function () {
    140     self.onClose('ping timeout');
    141   }, self.server.pingInterval + self.server.pingTimeout);
    142 };
    143 
    144 /**
    145  * Attaches handlers for the given transport.
    146  *
    147  * @param {Transport} transport
    148  * @api private
    149  */
    150 
    151 Socket.prototype.setTransport = function (transport) {
    152   var onError = this.onError.bind(this);
    153   var onPacket = this.onPacket.bind(this);
    154   var flush = this.flush.bind(this);
    155   var onClose = this.onClose.bind(this, 'transport close');
    156 
    157   this.transport = transport;
    158   this.transport.once('error', onError);
    159   this.transport.on('packet', onPacket);
    160   this.transport.on('drain', flush);
    161   this.transport.once('close', onClose);
    162   // this function will manage packet events (also message callbacks)
    163   this.setupSendCallback();
    164 
    165   this.cleanupFn.push(function () {
    166     transport.removeListener('error', onError);
    167     transport.removeListener('packet', onPacket);
    168     transport.removeListener('drain', flush);
    169     transport.removeListener('close', onClose);
    170   });
    171 };
    172 
    173 /**
    174  * Upgrades socket to the given transport
    175  *
    176  * @param {Transport} transport
    177  * @api private
    178  */
    179 
    180 Socket.prototype.maybeUpgrade = function (transport) {
    181   debug('might upgrade socket transport from "%s" to "%s"'
    182     , this.transport.name, transport.name);
    183 
    184   this.upgrading = true;
    185 
    186   var self = this;
    187 
    188   // set transport upgrade timer
    189   self.upgradeTimeoutTimer = setTimeout(function () {
    190     debug('client did not complete upgrade - closing transport');
    191     cleanup();
    192     if ('open' === transport.readyState) {
    193       transport.close();
    194     }
    195   }, this.server.upgradeTimeout);
    196 
    197   function onPacket (packet) {
    198     if ('ping' === packet.type && 'probe' === packet.data) {
    199       transport.send([{ type: 'pong', data: 'probe' }]);
    200       self.emit('upgrading', transport);
    201       clearInterval(self.checkIntervalTimer);
    202       self.checkIntervalTimer = setInterval(check, 100);
    203     } else if ('upgrade' === packet.type && self.readyState !== 'closed') {
    204       debug('got upgrade packet - upgrading');
    205       cleanup();
    206       self.transport.discard();
    207       self.upgraded = true;
    208       self.clearTransport();
    209       self.setTransport(transport);
    210       self.emit('upgrade', transport);
    211       self.setPingTimeout();
    212       self.flush();
    213       if (self.readyState === 'closing') {
    214         transport.close(function () {
    215           self.onClose('forced close');
    216         });
    217       }
    218     } else {
    219       cleanup();
    220       transport.close();
    221     }
    222   }
    223 
    224   // we force a polling cycle to ensure a fast upgrade
    225   function check () {
    226     if ('polling' === self.transport.name && self.transport.writable) {
    227       debug('writing a noop packet to polling for fast upgrade');
    228       self.transport.send([{ type: 'noop' }]);
    229     }
    230   }
    231 
    232   function cleanup () {
    233     self.upgrading = false;
    234 
    235     clearInterval(self.checkIntervalTimer);
    236     self.checkIntervalTimer = null;
    237 
    238     clearTimeout(self.upgradeTimeoutTimer);
    239     self.upgradeTimeoutTimer = null;
    240 
    241     transport.removeListener('packet', onPacket);
    242     transport.removeListener('close', onTransportClose);
    243     transport.removeListener('error', onError);
    244     self.removeListener('close', onClose);
    245   }
    246 
    247   function onError (err) {
    248     debug('client did not complete upgrade - %s', err);
    249     cleanup();
    250     transport.close();
    251     transport = null;
    252   }
    253 
    254   function onTransportClose () {
    255     onError('transport closed');
    256   }
    257 
    258   function onClose () {
    259     onError('socket closed');
    260   }
    261 
    262   transport.on('packet', onPacket);
    263   transport.once('close', onTransportClose);
    264   transport.once('error', onError);
    265 
    266   self.once('close', onClose);
    267 };
    268 
    269 /**
    270  * Clears listeners and timers associated with current transport.
    271  *
    272  * @api private
    273  */
    274 
    275 Socket.prototype.clearTransport = function () {
    276   var cleanup;
    277 
    278   var toCleanUp = this.cleanupFn.length;
    279 
    280   for (var i = 0; i < toCleanUp; i++) {
    281     cleanup = this.cleanupFn.shift();
    282     cleanup();
    283   }
    284 
    285   // silence further transport errors and prevent uncaught exceptions
    286   this.transport.on('error', function () {
    287     debug('error triggered by discarded transport');
    288   });
    289 
    290   // ensure transport won't stay open
    291   this.transport.close();
    292 
    293   clearTimeout(this.pingTimeoutTimer);
    294 };
    295 
    296 /**
    297  * Called upon transport considered closed.
    298  * Possible reasons: `ping timeout`, `client error`, `parse error`,
    299  * `transport error`, `server close`, `transport close`
    300  */
    301 
    302 Socket.prototype.onClose = function (reason, description) {
    303   if ('closed' !== this.readyState) {
    304     this.readyState = 'closed';
    305     clearTimeout(this.pingTimeoutTimer);
    306     clearInterval(this.checkIntervalTimer);
    307     this.checkIntervalTimer = null;
    308     clearTimeout(this.upgradeTimeoutTimer);
    309     var self = this;
    310     // clean writeBuffer in next tick, so developers can still
    311     // grab the writeBuffer on 'close' event
    312     process.nextTick(function () {
    313       self.writeBuffer = [];
    314     });
    315     this.packetsFn = [];
    316     this.sentCallbackFn = [];
    317     this.clearTransport();
    318     this.emit('close', reason, description);
    319   }
    320 };
    321 
    322 /**
    323  * Setup and manage send callback
    324  *
    325  * @api private
    326  */
    327 
    328 Socket.prototype.setupSendCallback = function () {
    329   var self = this;
    330   this.transport.on('drain', onDrain);
    331 
    332   this.cleanupFn.push(function () {
    333     self.transport.removeListener('drain', onDrain);
    334   });
    335 
    336   // the message was sent successfully, execute the callback
    337   function onDrain () {
    338     if (self.sentCallbackFn.length > 0) {
    339       var seqFn = self.sentCallbackFn.splice(0, 1)[0];
    340       if ('function' === typeof seqFn) {
    341         debug('executing send callback');
    342         seqFn(self.transport);
    343       } else if (Array.isArray(seqFn)) {
    344         debug('executing batch send callback');
    345         for (var l = seqFn.length, i = 0; i < l; i++) {
    346           if ('function' === typeof seqFn[i]) {
    347             seqFn[i](self.transport);
    348           }
    349         }
    350       }
    351     }
    352   }
    353 };
    354 
    355 /**
    356  * Sends a message packet.
    357  *
    358  * @param {String} message
    359  * @param {Object} options
    360  * @param {Function} callback
    361  * @return {Socket} for chaining
    362  * @api public
    363  */
    364 
    365 Socket.prototype.send =
    366 Socket.prototype.write = function (data, options, callback) {
    367   this.sendPacket('message', data, options, callback);
    368   return this;
    369 };
    370 
    371 /**
    372  * Sends a packet.
    373  *
    374  * @param {String} packet type
    375  * @param {String} optional, data
    376  * @param {Object} options
    377  * @api private
    378  */
    379 
    380 Socket.prototype.sendPacket = function (type, data, options, callback) {
    381   if ('function' === typeof options) {
    382     callback = options;
    383     options = null;
    384   }
    385 
    386   options = options || {};
    387   options.compress = false !== options.compress;
    388 
    389   if ('closing' !== this.readyState && 'closed' !== this.readyState) {
    390     debug('sending packet "%s" (%s)', type, data);
    391 
    392     var packet = {
    393       type: type,
    394       options: options
    395     };
    396     if (data) packet.data = data;
    397 
    398     // exports packetCreate event
    399     this.emit('packetCreate', packet);
    400 
    401     this.writeBuffer.push(packet);
    402 
    403     // add send callback to object, if defined
    404     if (callback) this.packetsFn.push(callback);
    405 
    406     this.flush();
    407   }
    408 };
    409 
    410 /**
    411  * Attempts to flush the packets buffer.
    412  *
    413  * @api private
    414  */
    415 
    416 Socket.prototype.flush = function () {
    417   if ('closed' !== this.readyState &&
    418                 this.transport.writable &&
    419                 this.writeBuffer.length) {
    420     debug('flushing buffer to transport');
    421     this.emit('flush', this.writeBuffer);
    422     this.server.emit('flush', this, this.writeBuffer);
    423     var wbuf = this.writeBuffer;
    424     this.writeBuffer = [];
    425     if (!this.transport.supportsFraming) {
    426       this.sentCallbackFn.push(this.packetsFn);
    427     } else {
    428       this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
    429     }
    430     this.packetsFn = [];
    431     this.transport.send(wbuf);
    432     this.emit('drain');
    433     this.server.emit('drain', this);
    434   }
    435 };
    436 
    437 /**
    438  * Get available upgrades for this socket.
    439  *
    440  * @api private
    441  */
    442 
    443 Socket.prototype.getAvailableUpgrades = function () {
    444   var availableUpgrades = [];
    445   var allUpgrades = this.server.upgrades(this.transport.name);
    446   for (var i = 0, l = allUpgrades.length; i < l; ++i) {
    447     var upg = allUpgrades[i];
    448     if (this.server.transports.indexOf(upg) !== -1) {
    449       availableUpgrades.push(upg);
    450     }
    451   }
    452   return availableUpgrades;
    453 };
    454 
    455 /**
    456  * Closes the socket and underlying transport.
    457  *
    458  * @param {Boolean} optional, discard
    459  * @return {Socket} for chaining
    460  * @api public
    461  */
    462 
    463 Socket.prototype.close = function (discard) {
    464   if ('open' !== this.readyState) return;
    465 
    466   this.readyState = 'closing';
    467 
    468   if (this.writeBuffer.length) {
    469     this.once('drain', this.closeTransport.bind(this, discard));
    470     return;
    471   }
    472 
    473   this.closeTransport(discard);
    474 };
    475 
    476 /**
    477  * Closes the underlying transport.
    478  *
    479  * @param {Boolean} discard
    480  * @api private
    481  */
    482 
    483 Socket.prototype.closeTransport = function (discard) {
    484   if (discard) this.transport.discard();
    485   this.transport.close(this.onClose.bind(this, 'forced close'));
    486 };