twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

socket.js (19106B)


      1 /**
      2  * Module dependencies.
      3  */
      4 
      5 var transports = require('./transports/index');
      6 var Emitter = require('component-emitter');
      7 var debug = require('debug')('engine.io-client:socket');
      8 var index = require('indexof');
      9 var parser = require('engine.io-parser');
     10 var parseuri = require('parseuri');
     11 var parseqs = require('parseqs');
     12 
     13 /**
     14  * Module exports.
     15  */
     16 
     17 module.exports = Socket;
     18 
     19 /**
     20  * Socket constructor.
     21  *
     22  * @param {String|Object} uri or options
     23  * @param {Object} options
     24  * @api public
     25  */
     26 
     27 function Socket (uri, opts) {
     28   if (!(this instanceof Socket)) return new Socket(uri, opts);
     29 
     30   opts = opts || {};
     31 
     32   if (uri && 'object' === typeof uri) {
     33     opts = uri;
     34     uri = null;
     35   }
     36 
     37   if (uri) {
     38     uri = parseuri(uri);
     39     opts.hostname = uri.host;
     40     opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
     41     opts.port = uri.port;
     42     if (uri.query) opts.query = uri.query;
     43   } else if (opts.host) {
     44     opts.hostname = parseuri(opts.host).host;
     45   }
     46 
     47   this.secure = null != opts.secure ? opts.secure
     48     : (typeof location !== 'undefined' && 'https:' === location.protocol);
     49 
     50   if (opts.hostname && !opts.port) {
     51     // if no port is specified manually, use the protocol default
     52     opts.port = this.secure ? '443' : '80';
     53   }
     54 
     55   this.agent = opts.agent || false;
     56   this.hostname = opts.hostname ||
     57     (typeof location !== 'undefined' ? location.hostname : 'localhost');
     58   this.port = opts.port || (typeof location !== 'undefined' && location.port
     59       ? location.port
     60       : (this.secure ? 443 : 80));
     61   this.query = opts.query || {};
     62   if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
     63   this.upgrade = false !== opts.upgrade;
     64   this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
     65   this.forceJSONP = !!opts.forceJSONP;
     66   this.jsonp = false !== opts.jsonp;
     67   this.forceBase64 = !!opts.forceBase64;
     68   this.enablesXDR = !!opts.enablesXDR;
     69   this.withCredentials = false !== opts.withCredentials;
     70   this.timestampParam = opts.timestampParam || 't';
     71   this.timestampRequests = opts.timestampRequests;
     72   this.transports = opts.transports || ['polling', 'websocket'];
     73   this.transportOptions = opts.transportOptions || {};
     74   this.readyState = '';
     75   this.writeBuffer = [];
     76   this.prevBufferLen = 0;
     77   this.policyPort = opts.policyPort || 843;
     78   this.rememberUpgrade = opts.rememberUpgrade || false;
     79   this.binaryType = null;
     80   this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
     81   this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
     82 
     83   if (true === this.perMessageDeflate) this.perMessageDeflate = {};
     84   if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
     85     this.perMessageDeflate.threshold = 1024;
     86   }
     87 
     88   // SSL options for Node.js client
     89   this.pfx = opts.pfx || null;
     90   this.key = opts.key || null;
     91   this.passphrase = opts.passphrase || null;
     92   this.cert = opts.cert || null;
     93   this.ca = opts.ca || null;
     94   this.ciphers = opts.ciphers || null;
     95   this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
     96   this.forceNode = !!opts.forceNode;
     97 
     98   // detect ReactNative environment
     99   this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');
    100 
    101   // other options for Node.js or ReactNative client
    102   if (typeof self === 'undefined' || this.isReactNative) {
    103     if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
    104       this.extraHeaders = opts.extraHeaders;
    105     }
    106 
    107     if (opts.localAddress) {
    108       this.localAddress = opts.localAddress;
    109     }
    110   }
    111 
    112   // set on handshake
    113   this.id = null;
    114   this.upgrades = null;
    115   this.pingInterval = null;
    116   this.pingTimeout = null;
    117 
    118   // set on heartbeat
    119   this.pingIntervalTimer = null;
    120   this.pingTimeoutTimer = null;
    121 
    122   this.open();
    123 }
    124 
    125 Socket.priorWebsocketSuccess = false;
    126 
    127 /**
    128  * Mix in `Emitter`.
    129  */
    130 
    131 Emitter(Socket.prototype);
    132 
    133 /**
    134  * Protocol version.
    135  *
    136  * @api public
    137  */
    138 
    139 Socket.protocol = parser.protocol; // this is an int
    140 
    141 /**
    142  * Expose deps for legacy compatibility
    143  * and standalone browser access.
    144  */
    145 
    146 Socket.Socket = Socket;
    147 Socket.Transport = require('./transport');
    148 Socket.transports = require('./transports/index');
    149 Socket.parser = require('engine.io-parser');
    150 
    151 /**
    152  * Creates transport of the given type.
    153  *
    154  * @param {String} transport name
    155  * @return {Transport}
    156  * @api private
    157  */
    158 
    159 Socket.prototype.createTransport = function (name) {
    160   debug('creating transport "%s"', name);
    161   var query = clone(this.query);
    162 
    163   // append engine.io protocol identifier
    164   query.EIO = parser.protocol;
    165 
    166   // transport name
    167   query.transport = name;
    168 
    169   // per-transport options
    170   var options = this.transportOptions[name] || {};
    171 
    172   // session id if we already have one
    173   if (this.id) query.sid = this.id;
    174 
    175   var transport = new transports[name]({
    176     query: query,
    177     socket: this,
    178     agent: options.agent || this.agent,
    179     hostname: options.hostname || this.hostname,
    180     port: options.port || this.port,
    181     secure: options.secure || this.secure,
    182     path: options.path || this.path,
    183     forceJSONP: options.forceJSONP || this.forceJSONP,
    184     jsonp: options.jsonp || this.jsonp,
    185     forceBase64: options.forceBase64 || this.forceBase64,
    186     enablesXDR: options.enablesXDR || this.enablesXDR,
    187     withCredentials: options.withCredentials || this.withCredentials,
    188     timestampRequests: options.timestampRequests || this.timestampRequests,
    189     timestampParam: options.timestampParam || this.timestampParam,
    190     policyPort: options.policyPort || this.policyPort,
    191     pfx: options.pfx || this.pfx,
    192     key: options.key || this.key,
    193     passphrase: options.passphrase || this.passphrase,
    194     cert: options.cert || this.cert,
    195     ca: options.ca || this.ca,
    196     ciphers: options.ciphers || this.ciphers,
    197     rejectUnauthorized: options.rejectUnauthorized || this.rejectUnauthorized,
    198     perMessageDeflate: options.perMessageDeflate || this.perMessageDeflate,
    199     extraHeaders: options.extraHeaders || this.extraHeaders,
    200     forceNode: options.forceNode || this.forceNode,
    201     localAddress: options.localAddress || this.localAddress,
    202     requestTimeout: options.requestTimeout || this.requestTimeout,
    203     protocols: options.protocols || void (0),
    204     isReactNative: this.isReactNative
    205   });
    206 
    207   return transport;
    208 };
    209 
    210 function clone (obj) {
    211   var o = {};
    212   for (var i in obj) {
    213     if (obj.hasOwnProperty(i)) {
    214       o[i] = obj[i];
    215     }
    216   }
    217   return o;
    218 }
    219 
    220 /**
    221  * Initializes transport to use and starts probe.
    222  *
    223  * @api private
    224  */
    225 Socket.prototype.open = function () {
    226   var transport;
    227   if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
    228     transport = 'websocket';
    229   } else if (0 === this.transports.length) {
    230     // Emit error on next tick so it can be listened to
    231     var self = this;
    232     setTimeout(function () {
    233       self.emit('error', 'No transports available');
    234     }, 0);
    235     return;
    236   } else {
    237     transport = this.transports[0];
    238   }
    239   this.readyState = 'opening';
    240 
    241   // Retry with the next transport if the transport is disabled (jsonp: false)
    242   try {
    243     transport = this.createTransport(transport);
    244   } catch (e) {
    245     this.transports.shift();
    246     this.open();
    247     return;
    248   }
    249 
    250   transport.open();
    251   this.setTransport(transport);
    252 };
    253 
    254 /**
    255  * Sets the current transport. Disables the existing one (if any).
    256  *
    257  * @api private
    258  */
    259 
    260 Socket.prototype.setTransport = function (transport) {
    261   debug('setting transport %s', transport.name);
    262   var self = this;
    263 
    264   if (this.transport) {
    265     debug('clearing existing transport %s', this.transport.name);
    266     this.transport.removeAllListeners();
    267   }
    268 
    269   // set up transport
    270   this.transport = transport;
    271 
    272   // set up transport listeners
    273   transport
    274   .on('drain', function () {
    275     self.onDrain();
    276   })
    277   .on('packet', function (packet) {
    278     self.onPacket(packet);
    279   })
    280   .on('error', function (e) {
    281     self.onError(e);
    282   })
    283   .on('close', function () {
    284     self.onClose('transport close');
    285   });
    286 };
    287 
    288 /**
    289  * Probes a transport.
    290  *
    291  * @param {String} transport name
    292  * @api private
    293  */
    294 
    295 Socket.prototype.probe = function (name) {
    296   debug('probing transport "%s"', name);
    297   var transport = this.createTransport(name, { probe: 1 });
    298   var failed = false;
    299   var self = this;
    300 
    301   Socket.priorWebsocketSuccess = false;
    302 
    303   function onTransportOpen () {
    304     if (self.onlyBinaryUpgrades) {
    305       var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
    306       failed = failed || upgradeLosesBinary;
    307     }
    308     if (failed) return;
    309 
    310     debug('probe transport "%s" opened', name);
    311     transport.send([{ type: 'ping', data: 'probe' }]);
    312     transport.once('packet', function (msg) {
    313       if (failed) return;
    314       if ('pong' === msg.type && 'probe' === msg.data) {
    315         debug('probe transport "%s" pong', name);
    316         self.upgrading = true;
    317         self.emit('upgrading', transport);
    318         if (!transport) return;
    319         Socket.priorWebsocketSuccess = 'websocket' === transport.name;
    320 
    321         debug('pausing current transport "%s"', self.transport.name);
    322         self.transport.pause(function () {
    323           if (failed) return;
    324           if ('closed' === self.readyState) return;
    325           debug('changing transport and sending upgrade packet');
    326 
    327           cleanup();
    328 
    329           self.setTransport(transport);
    330           transport.send([{ type: 'upgrade' }]);
    331           self.emit('upgrade', transport);
    332           transport = null;
    333           self.upgrading = false;
    334           self.flush();
    335         });
    336       } else {
    337         debug('probe transport "%s" failed', name);
    338         var err = new Error('probe error');
    339         err.transport = transport.name;
    340         self.emit('upgradeError', err);
    341       }
    342     });
    343   }
    344 
    345   function freezeTransport () {
    346     if (failed) return;
    347 
    348     // Any callback called by transport should be ignored since now
    349     failed = true;
    350 
    351     cleanup();
    352 
    353     transport.close();
    354     transport = null;
    355   }
    356 
    357   // Handle any error that happens while probing
    358   function onerror (err) {
    359     var error = new Error('probe error: ' + err);
    360     error.transport = transport.name;
    361 
    362     freezeTransport();
    363 
    364     debug('probe transport "%s" failed because of error: %s', name, err);
    365 
    366     self.emit('upgradeError', error);
    367   }
    368 
    369   function onTransportClose () {
    370     onerror('transport closed');
    371   }
    372 
    373   // When the socket is closed while we're probing
    374   function onclose () {
    375     onerror('socket closed');
    376   }
    377 
    378   // When the socket is upgraded while we're probing
    379   function onupgrade (to) {
    380     if (transport && to.name !== transport.name) {
    381       debug('"%s" works - aborting "%s"', to.name, transport.name);
    382       freezeTransport();
    383     }
    384   }
    385 
    386   // Remove all listeners on the transport and on self
    387   function cleanup () {
    388     transport.removeListener('open', onTransportOpen);
    389     transport.removeListener('error', onerror);
    390     transport.removeListener('close', onTransportClose);
    391     self.removeListener('close', onclose);
    392     self.removeListener('upgrading', onupgrade);
    393   }
    394 
    395   transport.once('open', onTransportOpen);
    396   transport.once('error', onerror);
    397   transport.once('close', onTransportClose);
    398 
    399   this.once('close', onclose);
    400   this.once('upgrading', onupgrade);
    401 
    402   transport.open();
    403 };
    404 
    405 /**
    406  * Called when connection is deemed open.
    407  *
    408  * @api public
    409  */
    410 
    411 Socket.prototype.onOpen = function () {
    412   debug('socket open');
    413   this.readyState = 'open';
    414   Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
    415   this.emit('open');
    416   this.flush();
    417 
    418   // we check for `readyState` in case an `open`
    419   // listener already closed the socket
    420   if ('open' === this.readyState && this.upgrade && this.transport.pause) {
    421     debug('starting upgrade probes');
    422     for (var i = 0, l = this.upgrades.length; i < l; i++) {
    423       this.probe(this.upgrades[i]);
    424     }
    425   }
    426 };
    427 
    428 /**
    429  * Handles a packet.
    430  *
    431  * @api private
    432  */
    433 
    434 Socket.prototype.onPacket = function (packet) {
    435   if ('opening' === this.readyState || 'open' === this.readyState ||
    436       'closing' === this.readyState) {
    437     debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
    438 
    439     this.emit('packet', packet);
    440 
    441     // Socket is live - any packet counts
    442     this.emit('heartbeat');
    443 
    444     switch (packet.type) {
    445       case 'open':
    446         this.onHandshake(JSON.parse(packet.data));
    447         break;
    448 
    449       case 'pong':
    450         this.setPing();
    451         this.emit('pong');
    452         break;
    453 
    454       case 'error':
    455         var err = new Error('server error');
    456         err.code = packet.data;
    457         this.onError(err);
    458         break;
    459 
    460       case 'message':
    461         this.emit('data', packet.data);
    462         this.emit('message', packet.data);
    463         break;
    464     }
    465   } else {
    466     debug('packet received with socket readyState "%s"', this.readyState);
    467   }
    468 };
    469 
    470 /**
    471  * Called upon handshake completion.
    472  *
    473  * @param {Object} handshake obj
    474  * @api private
    475  */
    476 
    477 Socket.prototype.onHandshake = function (data) {
    478   this.emit('handshake', data);
    479   this.id = data.sid;
    480   this.transport.query.sid = data.sid;
    481   this.upgrades = this.filterUpgrades(data.upgrades);
    482   this.pingInterval = data.pingInterval;
    483   this.pingTimeout = data.pingTimeout;
    484   this.onOpen();
    485   // In case open handler closes socket
    486   if ('closed' === this.readyState) return;
    487   this.setPing();
    488 
    489   // Prolong liveness of socket on heartbeat
    490   this.removeListener('heartbeat', this.onHeartbeat);
    491   this.on('heartbeat', this.onHeartbeat);
    492 };
    493 
    494 /**
    495  * Resets ping timeout.
    496  *
    497  * @api private
    498  */
    499 
    500 Socket.prototype.onHeartbeat = function (timeout) {
    501   clearTimeout(this.pingTimeoutTimer);
    502   var self = this;
    503   self.pingTimeoutTimer = setTimeout(function () {
    504     if ('closed' === self.readyState) return;
    505     self.onClose('ping timeout');
    506   }, timeout || (self.pingInterval + self.pingTimeout));
    507 };
    508 
    509 /**
    510  * Pings server every `this.pingInterval` and expects response
    511  * within `this.pingTimeout` or closes connection.
    512  *
    513  * @api private
    514  */
    515 
    516 Socket.prototype.setPing = function () {
    517   var self = this;
    518   clearTimeout(self.pingIntervalTimer);
    519   self.pingIntervalTimer = setTimeout(function () {
    520     debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
    521     self.ping();
    522     self.onHeartbeat(self.pingTimeout);
    523   }, self.pingInterval);
    524 };
    525 
    526 /**
    527 * Sends a ping packet.
    528 *
    529 * @api private
    530 */
    531 
    532 Socket.prototype.ping = function () {
    533   var self = this;
    534   this.sendPacket('ping', function () {
    535     self.emit('ping');
    536   });
    537 };
    538 
    539 /**
    540  * Called on `drain` event
    541  *
    542  * @api private
    543  */
    544 
    545 Socket.prototype.onDrain = function () {
    546   this.writeBuffer.splice(0, this.prevBufferLen);
    547 
    548   // setting prevBufferLen = 0 is very important
    549   // for example, when upgrading, upgrade packet is sent over,
    550   // and a nonzero prevBufferLen could cause problems on `drain`
    551   this.prevBufferLen = 0;
    552 
    553   if (0 === this.writeBuffer.length) {
    554     this.emit('drain');
    555   } else {
    556     this.flush();
    557   }
    558 };
    559 
    560 /**
    561  * Flush write buffers.
    562  *
    563  * @api private
    564  */
    565 
    566 Socket.prototype.flush = function () {
    567   if ('closed' !== this.readyState && this.transport.writable &&
    568     !this.upgrading && this.writeBuffer.length) {
    569     debug('flushing %d packets in socket', this.writeBuffer.length);
    570     this.transport.send(this.writeBuffer);
    571     // keep track of current length of writeBuffer
    572     // splice writeBuffer and callbackBuffer on `drain`
    573     this.prevBufferLen = this.writeBuffer.length;
    574     this.emit('flush');
    575   }
    576 };
    577 
    578 /**
    579  * Sends a message.
    580  *
    581  * @param {String} message.
    582  * @param {Function} callback function.
    583  * @param {Object} options.
    584  * @return {Socket} for chaining.
    585  * @api public
    586  */
    587 
    588 Socket.prototype.write =
    589 Socket.prototype.send = function (msg, options, fn) {
    590   this.sendPacket('message', msg, options, fn);
    591   return this;
    592 };
    593 
    594 /**
    595  * Sends a packet.
    596  *
    597  * @param {String} packet type.
    598  * @param {String} data.
    599  * @param {Object} options.
    600  * @param {Function} callback function.
    601  * @api private
    602  */
    603 
    604 Socket.prototype.sendPacket = function (type, data, options, fn) {
    605   if ('function' === typeof data) {
    606     fn = data;
    607     data = undefined;
    608   }
    609 
    610   if ('function' === typeof options) {
    611     fn = options;
    612     options = null;
    613   }
    614 
    615   if ('closing' === this.readyState || 'closed' === this.readyState) {
    616     return;
    617   }
    618 
    619   options = options || {};
    620   options.compress = false !== options.compress;
    621 
    622   var packet = {
    623     type: type,
    624     data: data,
    625     options: options
    626   };
    627   this.emit('packetCreate', packet);
    628   this.writeBuffer.push(packet);
    629   if (fn) this.once('flush', fn);
    630   this.flush();
    631 };
    632 
    633 /**
    634  * Closes the connection.
    635  *
    636  * @api private
    637  */
    638 
    639 Socket.prototype.close = function () {
    640   if ('opening' === this.readyState || 'open' === this.readyState) {
    641     this.readyState = 'closing';
    642 
    643     var self = this;
    644 
    645     if (this.writeBuffer.length) {
    646       this.once('drain', function () {
    647         if (this.upgrading) {
    648           waitForUpgrade();
    649         } else {
    650           close();
    651         }
    652       });
    653     } else if (this.upgrading) {
    654       waitForUpgrade();
    655     } else {
    656       close();
    657     }
    658   }
    659 
    660   function close () {
    661     self.onClose('forced close');
    662     debug('socket closing - telling transport to close');
    663     self.transport.close();
    664   }
    665 
    666   function cleanupAndClose () {
    667     self.removeListener('upgrade', cleanupAndClose);
    668     self.removeListener('upgradeError', cleanupAndClose);
    669     close();
    670   }
    671 
    672   function waitForUpgrade () {
    673     // wait for upgrade to finish since we can't send packets while pausing a transport
    674     self.once('upgrade', cleanupAndClose);
    675     self.once('upgradeError', cleanupAndClose);
    676   }
    677 
    678   return this;
    679 };
    680 
    681 /**
    682  * Called upon transport error
    683  *
    684  * @api private
    685  */
    686 
    687 Socket.prototype.onError = function (err) {
    688   debug('socket error %j', err);
    689   Socket.priorWebsocketSuccess = false;
    690   this.emit('error', err);
    691   this.onClose('transport error', err);
    692 };
    693 
    694 /**
    695  * Called upon transport close.
    696  *
    697  * @api private
    698  */
    699 
    700 Socket.prototype.onClose = function (reason, desc) {
    701   if ('opening' === this.readyState || 'open' === this.readyState || 'closing' === this.readyState) {
    702     debug('socket close with reason: "%s"', reason);
    703     var self = this;
    704 
    705     // clear timers
    706     clearTimeout(this.pingIntervalTimer);
    707     clearTimeout(this.pingTimeoutTimer);
    708 
    709     // stop event from firing again for transport
    710     this.transport.removeAllListeners('close');
    711 
    712     // ensure transport won't stay open
    713     this.transport.close();
    714 
    715     // ignore further transport communication
    716     this.transport.removeAllListeners();
    717 
    718     // set ready state
    719     this.readyState = 'closed';
    720 
    721     // clear session id
    722     this.id = null;
    723 
    724     // emit close event
    725     this.emit('close', reason, desc);
    726 
    727     // clean buffers after, so users can still
    728     // grab the buffers on `close` event
    729     self.writeBuffer = [];
    730     self.prevBufferLen = 0;
    731   }
    732 };
    733 
    734 /**
    735  * Filters upgrades, returning only those matching client transports.
    736  *
    737  * @param {Array} server upgrades
    738  * @api private
    739  *
    740  */
    741 
    742 Socket.prototype.filterUpgrades = function (upgrades) {
    743   var filteredUpgrades = [];
    744   for (var i = 0, j = upgrades.length; i < j; i++) {
    745     if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
    746   }
    747   return filteredUpgrades;
    748 };