twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

manager.js (12520B)


      1 
      2 /**
      3  * Module dependencies.
      4  */
      5 
      6 var eio = require('engine.io-client');
      7 var Socket = require('./socket');
      8 var Emitter = require('component-emitter');
      9 var parser = require('socket.io-parser');
     10 var on = require('./on');
     11 var bind = require('component-bind');
     12 var debug = require('debug')('socket.io-client:manager');
     13 var indexOf = require('indexof');
     14 var Backoff = require('backo2');
     15 
     16 /**
     17  * IE6+ hasOwnProperty
     18  */
     19 
     20 var has = Object.prototype.hasOwnProperty;
     21 
     22 /**
     23  * Module exports
     24  */
     25 
     26 module.exports = Manager;
     27 
     28 /**
     29  * `Manager` constructor.
     30  *
     31  * @param {String} engine instance or engine uri/opts
     32  * @param {Object} options
     33  * @api public
     34  */
     35 
     36 function Manager (uri, opts) {
     37   if (!(this instanceof Manager)) return new Manager(uri, opts);
     38   if (uri && ('object' === typeof uri)) {
     39     opts = uri;
     40     uri = undefined;
     41   }
     42   opts = opts || {};
     43 
     44   opts.path = opts.path || '/socket.io';
     45   this.nsps = {};
     46   this.subs = [];
     47   this.opts = opts;
     48   this.reconnection(opts.reconnection !== false);
     49   this.reconnectionAttempts(opts.reconnectionAttempts || Infinity);
     50   this.reconnectionDelay(opts.reconnectionDelay || 1000);
     51   this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000);
     52   this.randomizationFactor(opts.randomizationFactor || 0.5);
     53   this.backoff = new Backoff({
     54     min: this.reconnectionDelay(),
     55     max: this.reconnectionDelayMax(),
     56     jitter: this.randomizationFactor()
     57   });
     58   this.timeout(null == opts.timeout ? 20000 : opts.timeout);
     59   this.readyState = 'closed';
     60   this.uri = uri;
     61   this.connecting = [];
     62   this.lastPing = null;
     63   this.encoding = false;
     64   this.packetBuffer = [];
     65   var _parser = opts.parser || parser;
     66   this.encoder = new _parser.Encoder();
     67   this.decoder = new _parser.Decoder();
     68   this.autoConnect = opts.autoConnect !== false;
     69   if (this.autoConnect) this.open();
     70 }
     71 
     72 /**
     73  * Propagate given event to sockets and emit on `this`
     74  *
     75  * @api private
     76  */
     77 
     78 Manager.prototype.emitAll = function () {
     79   this.emit.apply(this, arguments);
     80   for (var nsp in this.nsps) {
     81     if (has.call(this.nsps, nsp)) {
     82       this.nsps[nsp].emit.apply(this.nsps[nsp], arguments);
     83     }
     84   }
     85 };
     86 
     87 /**
     88  * Update `socket.id` of all sockets
     89  *
     90  * @api private
     91  */
     92 
     93 Manager.prototype.updateSocketIds = function () {
     94   for (var nsp in this.nsps) {
     95     if (has.call(this.nsps, nsp)) {
     96       this.nsps[nsp].id = this.generateId(nsp);
     97     }
     98   }
     99 };
    100 
    101 /**
    102  * generate `socket.id` for the given `nsp`
    103  *
    104  * @param {String} nsp
    105  * @return {String}
    106  * @api private
    107  */
    108 
    109 Manager.prototype.generateId = function (nsp) {
    110   return (nsp === '/' ? '' : (nsp + '#')) + this.engine.id;
    111 };
    112 
    113 /**
    114  * Mix in `Emitter`.
    115  */
    116 
    117 Emitter(Manager.prototype);
    118 
    119 /**
    120  * Sets the `reconnection` config.
    121  *
    122  * @param {Boolean} true/false if it should automatically reconnect
    123  * @return {Manager} self or value
    124  * @api public
    125  */
    126 
    127 Manager.prototype.reconnection = function (v) {
    128   if (!arguments.length) return this._reconnection;
    129   this._reconnection = !!v;
    130   return this;
    131 };
    132 
    133 /**
    134  * Sets the reconnection attempts config.
    135  *
    136  * @param {Number} max reconnection attempts before giving up
    137  * @return {Manager} self or value
    138  * @api public
    139  */
    140 
    141 Manager.prototype.reconnectionAttempts = function (v) {
    142   if (!arguments.length) return this._reconnectionAttempts;
    143   this._reconnectionAttempts = v;
    144   return this;
    145 };
    146 
    147 /**
    148  * Sets the delay between reconnections.
    149  *
    150  * @param {Number} delay
    151  * @return {Manager} self or value
    152  * @api public
    153  */
    154 
    155 Manager.prototype.reconnectionDelay = function (v) {
    156   if (!arguments.length) return this._reconnectionDelay;
    157   this._reconnectionDelay = v;
    158   this.backoff && this.backoff.setMin(v);
    159   return this;
    160 };
    161 
    162 Manager.prototype.randomizationFactor = function (v) {
    163   if (!arguments.length) return this._randomizationFactor;
    164   this._randomizationFactor = v;
    165   this.backoff && this.backoff.setJitter(v);
    166   return this;
    167 };
    168 
    169 /**
    170  * Sets the maximum delay between reconnections.
    171  *
    172  * @param {Number} delay
    173  * @return {Manager} self or value
    174  * @api public
    175  */
    176 
    177 Manager.prototype.reconnectionDelayMax = function (v) {
    178   if (!arguments.length) return this._reconnectionDelayMax;
    179   this._reconnectionDelayMax = v;
    180   this.backoff && this.backoff.setMax(v);
    181   return this;
    182 };
    183 
    184 /**
    185  * Sets the connection timeout. `false` to disable
    186  *
    187  * @return {Manager} self or value
    188  * @api public
    189  */
    190 
    191 Manager.prototype.timeout = function (v) {
    192   if (!arguments.length) return this._timeout;
    193   this._timeout = v;
    194   return this;
    195 };
    196 
    197 /**
    198  * Starts trying to reconnect if reconnection is enabled and we have not
    199  * started reconnecting yet
    200  *
    201  * @api private
    202  */
    203 
    204 Manager.prototype.maybeReconnectOnOpen = function () {
    205   // Only try to reconnect if it's the first time we're connecting
    206   if (!this.reconnecting && this._reconnection && this.backoff.attempts === 0) {
    207     // keeps reconnection from firing twice for the same reconnection loop
    208     this.reconnect();
    209   }
    210 };
    211 
    212 /**
    213  * Sets the current transport `socket`.
    214  *
    215  * @param {Function} optional, callback
    216  * @return {Manager} self
    217  * @api public
    218  */
    219 
    220 Manager.prototype.open =
    221 Manager.prototype.connect = function (fn, opts) {
    222   debug('readyState %s', this.readyState);
    223   if (~this.readyState.indexOf('open')) return this;
    224 
    225   debug('opening %s', this.uri);
    226   this.engine = eio(this.uri, this.opts);
    227   var socket = this.engine;
    228   var self = this;
    229   this.readyState = 'opening';
    230   this.skipReconnect = false;
    231 
    232   // emit `open`
    233   var openSub = on(socket, 'open', function () {
    234     self.onopen();
    235     fn && fn();
    236   });
    237 
    238   // emit `connect_error`
    239   var errorSub = on(socket, 'error', function (data) {
    240     debug('connect_error');
    241     self.cleanup();
    242     self.readyState = 'closed';
    243     self.emitAll('connect_error', data);
    244     if (fn) {
    245       var err = new Error('Connection error');
    246       err.data = data;
    247       fn(err);
    248     } else {
    249       // Only do this if there is no fn to handle the error
    250       self.maybeReconnectOnOpen();
    251     }
    252   });
    253 
    254   // emit `connect_timeout`
    255   if (false !== this._timeout) {
    256     var timeout = this._timeout;
    257     debug('connect attempt will timeout after %d', timeout);
    258 
    259     // set timer
    260     var timer = setTimeout(function () {
    261       debug('connect attempt timed out after %d', timeout);
    262       openSub.destroy();
    263       socket.close();
    264       socket.emit('error', 'timeout');
    265       self.emitAll('connect_timeout', timeout);
    266     }, timeout);
    267 
    268     this.subs.push({
    269       destroy: function () {
    270         clearTimeout(timer);
    271       }
    272     });
    273   }
    274 
    275   this.subs.push(openSub);
    276   this.subs.push(errorSub);
    277 
    278   return this;
    279 };
    280 
    281 /**
    282  * Called upon transport open.
    283  *
    284  * @api private
    285  */
    286 
    287 Manager.prototype.onopen = function () {
    288   debug('open');
    289 
    290   // clear old subs
    291   this.cleanup();
    292 
    293   // mark as open
    294   this.readyState = 'open';
    295   this.emit('open');
    296 
    297   // add new subs
    298   var socket = this.engine;
    299   this.subs.push(on(socket, 'data', bind(this, 'ondata')));
    300   this.subs.push(on(socket, 'ping', bind(this, 'onping')));
    301   this.subs.push(on(socket, 'pong', bind(this, 'onpong')));
    302   this.subs.push(on(socket, 'error', bind(this, 'onerror')));
    303   this.subs.push(on(socket, 'close', bind(this, 'onclose')));
    304   this.subs.push(on(this.decoder, 'decoded', bind(this, 'ondecoded')));
    305 };
    306 
    307 /**
    308  * Called upon a ping.
    309  *
    310  * @api private
    311  */
    312 
    313 Manager.prototype.onping = function () {
    314   this.lastPing = new Date();
    315   this.emitAll('ping');
    316 };
    317 
    318 /**
    319  * Called upon a packet.
    320  *
    321  * @api private
    322  */
    323 
    324 Manager.prototype.onpong = function () {
    325   this.emitAll('pong', new Date() - this.lastPing);
    326 };
    327 
    328 /**
    329  * Called with data.
    330  *
    331  * @api private
    332  */
    333 
    334 Manager.prototype.ondata = function (data) {
    335   this.decoder.add(data);
    336 };
    337 
    338 /**
    339  * Called when parser fully decodes a packet.
    340  *
    341  * @api private
    342  */
    343 
    344 Manager.prototype.ondecoded = function (packet) {
    345   this.emit('packet', packet);
    346 };
    347 
    348 /**
    349  * Called upon socket error.
    350  *
    351  * @api private
    352  */
    353 
    354 Manager.prototype.onerror = function (err) {
    355   debug('error', err);
    356   this.emitAll('error', err);
    357 };
    358 
    359 /**
    360  * Creates a new socket for the given `nsp`.
    361  *
    362  * @return {Socket}
    363  * @api public
    364  */
    365 
    366 Manager.prototype.socket = function (nsp, opts) {
    367   var socket = this.nsps[nsp];
    368   if (!socket) {
    369     socket = new Socket(this, nsp, opts);
    370     this.nsps[nsp] = socket;
    371     var self = this;
    372     socket.on('connecting', onConnecting);
    373     socket.on('connect', function () {
    374       socket.id = self.generateId(nsp);
    375     });
    376 
    377     if (this.autoConnect) {
    378       // manually call here since connecting event is fired before listening
    379       onConnecting();
    380     }
    381   }
    382 
    383   function onConnecting () {
    384     if (!~indexOf(self.connecting, socket)) {
    385       self.connecting.push(socket);
    386     }
    387   }
    388 
    389   return socket;
    390 };
    391 
    392 /**
    393  * Called upon a socket close.
    394  *
    395  * @param {Socket} socket
    396  */
    397 
    398 Manager.prototype.destroy = function (socket) {
    399   var index = indexOf(this.connecting, socket);
    400   if (~index) this.connecting.splice(index, 1);
    401   if (this.connecting.length) return;
    402 
    403   this.close();
    404 };
    405 
    406 /**
    407  * Writes a packet.
    408  *
    409  * @param {Object} packet
    410  * @api private
    411  */
    412 
    413 Manager.prototype.packet = function (packet) {
    414   debug('writing packet %j', packet);
    415   var self = this;
    416   if (packet.query && packet.type === 0) packet.nsp += '?' + packet.query;
    417 
    418   if (!self.encoding) {
    419     // encode, then write to engine with result
    420     self.encoding = true;
    421     this.encoder.encode(packet, function (encodedPackets) {
    422       for (var i = 0; i < encodedPackets.length; i++) {
    423         self.engine.write(encodedPackets[i], packet.options);
    424       }
    425       self.encoding = false;
    426       self.processPacketQueue();
    427     });
    428   } else { // add packet to the queue
    429     self.packetBuffer.push(packet);
    430   }
    431 };
    432 
    433 /**
    434  * If packet buffer is non-empty, begins encoding the
    435  * next packet in line.
    436  *
    437  * @api private
    438  */
    439 
    440 Manager.prototype.processPacketQueue = function () {
    441   if (this.packetBuffer.length > 0 && !this.encoding) {
    442     var pack = this.packetBuffer.shift();
    443     this.packet(pack);
    444   }
    445 };
    446 
    447 /**
    448  * Clean up transport subscriptions and packet buffer.
    449  *
    450  * @api private
    451  */
    452 
    453 Manager.prototype.cleanup = function () {
    454   debug('cleanup');
    455 
    456   var subsLength = this.subs.length;
    457   for (var i = 0; i < subsLength; i++) {
    458     var sub = this.subs.shift();
    459     sub.destroy();
    460   }
    461 
    462   this.packetBuffer = [];
    463   this.encoding = false;
    464   this.lastPing = null;
    465 
    466   this.decoder.destroy();
    467 };
    468 
    469 /**
    470  * Close the current socket.
    471  *
    472  * @api private
    473  */
    474 
    475 Manager.prototype.close =
    476 Manager.prototype.disconnect = function () {
    477   debug('disconnect');
    478   this.skipReconnect = true;
    479   this.reconnecting = false;
    480   if ('opening' === this.readyState) {
    481     // `onclose` will not fire because
    482     // an open event never happened
    483     this.cleanup();
    484   }
    485   this.backoff.reset();
    486   this.readyState = 'closed';
    487   if (this.engine) this.engine.close();
    488 };
    489 
    490 /**
    491  * Called upon engine close.
    492  *
    493  * @api private
    494  */
    495 
    496 Manager.prototype.onclose = function (reason) {
    497   debug('onclose');
    498 
    499   this.cleanup();
    500   this.backoff.reset();
    501   this.readyState = 'closed';
    502   this.emit('close', reason);
    503 
    504   if (this._reconnection && !this.skipReconnect) {
    505     this.reconnect();
    506   }
    507 };
    508 
    509 /**
    510  * Attempt a reconnection.
    511  *
    512  * @api private
    513  */
    514 
    515 Manager.prototype.reconnect = function () {
    516   if (this.reconnecting || this.skipReconnect) return this;
    517 
    518   var self = this;
    519 
    520   if (this.backoff.attempts >= this._reconnectionAttempts) {
    521     debug('reconnect failed');
    522     this.backoff.reset();
    523     this.emitAll('reconnect_failed');
    524     this.reconnecting = false;
    525   } else {
    526     var delay = this.backoff.duration();
    527     debug('will wait %dms before reconnect attempt', delay);
    528 
    529     this.reconnecting = true;
    530     var timer = setTimeout(function () {
    531       if (self.skipReconnect) return;
    532 
    533       debug('attempting reconnect');
    534       self.emitAll('reconnect_attempt', self.backoff.attempts);
    535       self.emitAll('reconnecting', self.backoff.attempts);
    536 
    537       // check again for the case socket closed in above events
    538       if (self.skipReconnect) return;
    539 
    540       self.open(function (err) {
    541         if (err) {
    542           debug('reconnect attempt error');
    543           self.reconnecting = false;
    544           self.reconnect();
    545           self.emitAll('reconnect_error', err.data);
    546         } else {
    547           debug('reconnect success');
    548           self.onreconnect();
    549         }
    550       });
    551     }, delay);
    552 
    553     this.subs.push({
    554       destroy: function () {
    555         clearTimeout(timer);
    556       }
    557     });
    558   }
    559 };
    560 
    561 /**
    562  * Called upon successful reconnect.
    563  *
    564  * @api private
    565  */
    566 
    567 Manager.prototype.onreconnect = function () {
    568   var attempt = this.backoff.attempts;
    569   this.reconnecting = false;
    570   this.backoff.reset();
    571   this.updateSocketIds();
    572   this.emitAll('reconnect', attempt);
    573 };