twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

streaming-api-connection.js (12850B)


      1 
      2 var EventEmitter = require('events').EventEmitter;
      3 var util = require('util');
      4 
      5 var helpers = require('./helpers')
      6 var Parser = require('./parser');
      7 var request = require('request');
      8 
      9 var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON
     10 
     11 var StreamingAPIConnection = function (reqOpts, twitOptions) {
     12   this.reqOpts = reqOpts
     13   this.twitOptions = twitOptions
     14   this._twitter_time_minus_local_time_ms = 0
     15   EventEmitter.call(this)
     16 }
     17 
     18 util.inherits(StreamingAPIConnection, EventEmitter)
     19 
     20 /**
     21  * Resets the connection.
     22  * - clears request, response, parser
     23  * - removes scheduled reconnect handle (if one was scheduled)
     24  * - stops the stall abort timeout handle (if one was scheduled)
     25  */
     26 StreamingAPIConnection.prototype._resetConnection = function () {
     27   if (this.request) {
     28     // clear our reference to the `request` instance
     29     this.request.removeAllListeners();
     30     this.request.destroy();
     31   }
     32 
     33   if (this.response) {
     34     // clear our reference to the http.IncomingMessage instance
     35     this.response.removeAllListeners();
     36     this.response.destroy();
     37   }
     38 
     39   if (this.parser) {
     40     this.parser.removeAllListeners()
     41   }
     42 
     43   // ensure a scheduled reconnect does not occur (if one was scheduled)
     44   // this can happen if we get a close event before .stop() is called
     45   clearTimeout(this._scheduledReconnect)
     46   delete this._scheduledReconnect
     47 
     48   // clear our stall abort timeout
     49   this._stopStallAbortTimeout()
     50 }
     51 
     52 /**
     53  * Resets the parameters used in determining the next reconnect time
     54  */
     55 StreamingAPIConnection.prototype._resetRetryParams = function () {
     56   // delay for next reconnection attempt
     57   this._connectInterval = 0
     58   // flag indicating whether we used a 0-delay reconnect
     59   this._usedFirstReconnect = false
     60 }
     61 
     62 StreamingAPIConnection.prototype._startPersistentConnection = function () {
     63   var self = this;
     64   self._resetConnection();
     65   self._setupParser();
     66   self._resetStallAbortTimeout();
     67   self._setOauthTimestamp();
     68   this.reqOpts.encoding = 'utf8'
     69   self.request = request.post(this.reqOpts);
     70   self.emit('connect', self.request);
     71   self.request.on('response', function (response) {
     72     self._updateOauthTimestampOffsetFromResponse(response)
     73     // reset our reconnection attempt flag so next attempt goes through with 0 delay
     74     // if we get a transport-level error
     75     self._usedFirstReconnect = false;
     76     // start a stall abort timeout handle
     77     self._resetStallAbortTimeout();
     78     self.response = response
     79     if (STATUS_CODES_TO_ABORT_ON.indexOf(self.response.statusCode) !== -1) {
     80       // We got a status code telling us we should abort the connection.
     81       // Read the body from the response and return an error to the user.
     82       var body = '';
     83 
     84       self.request.on('data', function (chunk) {
     85         body += chunk;
     86       })
     87 
     88       self.request.on('end', function () {
     89         try {
     90           body = JSON.parse(body)
     91         } catch (jsonDecodeError) {
     92           // Twitter may send an HTML body
     93           // if non-JSON text was returned, we'll just attach it to the error as-is
     94         }
     95         // surface the error to the user
     96         var error = helpers.makeTwitError('Bad Twitter streaming request: ' + self.response.statusCode)
     97         error.statusCode = response ? response.statusCode: null;
     98         helpers.attachBodyInfoToError(error, body)
     99         self.emit('error', error);
    100         // stop the stream explicitly so we don't reconnect
    101         self.stop()
    102         body = null;
    103       });
    104       self.request.on('error', function (err) {
    105         var twitErr = helpers.makeTwitError(err.message);
    106         twitErr.statusCode = self.response.statusCode;
    107         helpers.attachBodyInfoToError(twitErr, body);
    108         self.emit('parser-error', twitErr);
    109       });
    110     } else if (self.response.statusCode === 420) {
    111       // close the connection forcibly so a reconnect is scheduled by `self.onClose()`
    112       self._scheduleReconnect();
    113     } else {
    114       // We got an OK status code - the response should be valid.
    115       // Read the body from the response and return to the user.
    116       //pass all response data to parser
    117       self.request.on('data', function(data) {
    118         self._connectInterval = 0
    119         self._resetStallAbortTimeout();
    120         self.parser.parse(data);
    121       })
    122 
    123       self.response.on('close', self._onClose.bind(self))
    124       self.response.on('error', function (err) {
    125         // expose response errors on twit instance
    126         self.emit('error', err);
    127       })
    128 
    129       // connected without an error response from Twitter, emit `connected` event
    130       // this must be emitted after all its event handlers are bound
    131       // so the reference to `self.response` is not interfered-with by the user until it is emitted
    132       self.emit('connected', self.response);
    133     }
    134   });
    135   self.request.on('close', self._onClose.bind(self));
    136   self.request.on('error', function (err) { self._scheduleReconnect.bind(self) });
    137   return self;
    138 }
    139 
    140 /**
    141  * Handle when the request or response closes.
    142  * Schedule a reconnect according to Twitter's reconnect guidelines
    143  *
    144  */
    145 StreamingAPIConnection.prototype._onClose = function () {
    146   var self = this;
    147   self._stopStallAbortTimeout();
    148   if (self._scheduledReconnect) {
    149     // if we already have a reconnect scheduled, don't schedule another one.
    150     // this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close`
    151     return
    152   }
    153 
    154   self._scheduleReconnect();
    155 }
    156 
    157 /**
    158  * Kick off the http request, and persist the connection
    159  *
    160  */
    161 StreamingAPIConnection.prototype.start = function () {
    162   this._resetRetryParams();
    163   this._startPersistentConnection();
    164   return this;
    165 }
    166 
    167 /**
    168  * Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state
    169  *
    170  */
    171 StreamingAPIConnection.prototype.stop = function () {
    172   // clear connection variables and timeout handles
    173   this._resetConnection();
    174   this._resetRetryParams();
    175   return this;
    176 }
    177 
    178 /**
    179  * Stop and restart the stall abort timer (called when new data is received)
    180  *
    181  * If we go 90s without receiving data from twitter, we abort the request & reconnect.
    182  */
    183 StreamingAPIConnection.prototype._resetStallAbortTimeout = function () {
    184   var self = this;
    185   // stop the previous stall abort timer
    186   self._stopStallAbortTimeout();
    187   //start a new 90s timeout to trigger a close & reconnect if no data received
    188   self._stallAbortTimeout = setTimeout(function () {
    189     self._scheduleReconnect()
    190   }, 90000);
    191   return this;
    192 }
    193 
    194 /**
    195  * Stop stall timeout
    196  *
    197  */
    198 StreamingAPIConnection.prototype._stopStallAbortTimeout = function () {
    199   clearTimeout(this._stallAbortTimeout);
    200   // mark the timer as `null` so it is clear via introspection that the timeout is not scheduled
    201   delete this._stallAbortTimeout;
    202   return this;
    203 }
    204 
    205 /**
    206  * Computes the next time a reconnect should occur (based on the last HTTP response received)
    207  * and starts a timeout handle to begin reconnecting after `self._connectInterval` passes.
    208  *
    209  * @return {Undefined}
    210  */
    211 StreamingAPIConnection.prototype._scheduleReconnect = function () {
    212   var self = this;
    213   if (self.response && self.response.statusCode === 420) {
    214     // we are being rate limited
    215     // start with a 1 minute wait and double each attempt
    216     if (!self._connectInterval) {
    217       self._connectInterval = 60000;
    218     } else {
    219       self._connectInterval *= 2;
    220     }
    221   } else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
    222     // twitter 5xx errors
    223     // start with a 5s wait, double each attempt up to 320s
    224     if (!self._connectInterval) {
    225       self._connectInterval = 5000;
    226     } else if (self._connectInterval < 320000) {
    227       self._connectInterval *= 2;
    228     } else {
    229       self._connectInterval = 320000;
    230     }
    231   } else {
    232     // we did not get an HTTP response from our last connection attempt.
    233     // DNS/TCP error, or a stall in the stream (and stall timer closed the connection)
    234     if (!self._usedFirstReconnect) {
    235       // first reconnection attempt on a valid connection should occur immediately
    236       self._connectInterval = 0;
    237       self._usedFirstReconnect = true;
    238     } else if (self._connectInterval < 16000) {
    239       // linearly increase delay by 250ms up to 16s
    240       self._connectInterval += 250;
    241     } else {
    242       // cap out reconnect interval at 16s
    243       self._connectInterval = 16000;
    244     }
    245   }
    246 
    247   // schedule the reconnect
    248   self._scheduledReconnect = setTimeout(function () {
    249     self._startPersistentConnection();
    250   }, self._connectInterval);
    251   self.emit('reconnect', self.request, self.response, self._connectInterval);
    252 }
    253 
    254 StreamingAPIConnection.prototype._setupParser = function () {
    255   var self = this
    256   self.parser = new Parser()
    257 
    258   // handle twitter objects as they come in - emit the generic `message` event
    259   // along with the specific event corresponding to the message
    260   self.parser.on('element', function (msg) {
    261     self.emit('message', msg)
    262 
    263     if      (msg.delete)          { self.emit('delete', msg) }
    264     else if (msg.disconnect)      { self._handleDisconnect(msg) }
    265     else if (msg.limit)           { self.emit('limit', msg) }
    266     else if (msg.scrub_geo)       { self.emit('scrub_geo', msg) }
    267     else if (msg.warning)         { self.emit('warning', msg) }
    268     else if (msg.status_withheld) { self.emit('status_withheld', msg) }
    269     else if (msg.user_withheld)   { self.emit('user_withheld', msg) }
    270     else if (msg.friends || msg.friends_str) { self.emit('friends', msg) }
    271     else if (msg.direct_message)  { self.emit('direct_message', msg) }
    272     else if (msg.event)           {
    273       self.emit('user_event', msg)
    274       // reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
    275       var ev = msg.event
    276 
    277       if      (ev === 'blocked')                { self.emit('blocked', msg) }
    278       else if (ev === 'unblocked')              { self.emit('unblocked', msg) }
    279       else if (ev === 'favorite')               { self.emit('favorite', msg) }
    280       else if (ev === 'unfavorite')             { self.emit('unfavorite', msg) }
    281       else if (ev === 'follow')                 { self.emit('follow', msg) }
    282       else if (ev === 'unfollow')               { self.emit('unfollow', msg) }
    283       else if (ev === 'mute')                   { self.emit('mute', msg) }
    284       else if (ev === 'unmute')                 { self.emit('unmute', msg) }
    285       else if (ev === 'user_update')            { self.emit('user_update', msg) }
    286       else if (ev === 'list_created')           { self.emit('list_created', msg) }
    287       else if (ev === 'list_destroyed')         { self.emit('list_destroyed', msg) }
    288       else if (ev === 'list_updated')           { self.emit('list_updated', msg) }
    289       else if (ev === 'list_member_added')      { self.emit('list_member_added', msg) }
    290       else if (ev === 'list_member_removed')    { self.emit('list_member_removed', msg) }
    291       else if (ev === 'list_user_subscribed')   { self.emit('list_user_subscribed', msg) }
    292       else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
    293       else if (ev === 'quoted_tweet')           { self.emit('quoted_tweet', msg) }
    294       else if (ev === 'favorited_retweet')      { self.emit('favorited_retweet', msg) }
    295       else if (ev === 'retweeted_retweet')      { self.emit('retweeted_retweet', msg) }
    296       else                                      { self.emit('unknown_user_event', msg) }
    297     } else                                      { self.emit('tweet', msg) }
    298   })
    299 
    300   self.parser.on('error', function (err) {
    301     self.emit('parser-error', err)
    302   });
    303   self.parser.on('connection-limit-exceeded', function (err) {
    304     self.emit('error', err);
    305   })
    306 }
    307 
    308 StreamingAPIConnection.prototype._handleDisconnect = function (twitterMsg) {
    309   this.emit('disconnect', twitterMsg);
    310   this.stop();
    311 }
    312 
    313 /**
    314  * Call whenever an http request is about to be made to update
    315  * our local timestamp (used for Oauth) to be Twitter's server time.
    316  *
    317  */
    318 StreamingAPIConnection.prototype._setOauthTimestamp = function () {
    319   var self = this;
    320   if (self.reqOpts.oauth) {
    321     var oauth_ts = Date.now() + self._twitter_time_minus_local_time_ms;
    322     self.reqOpts.oauth.timestamp = Math.floor(oauth_ts/1000).toString();
    323   }
    324 }
    325 
    326 /**
    327  * Call whenever an http response is received from Twitter,
    328  * to set our local timestamp offset from Twitter's server time.
    329  * This is used to set the Oauth timestamp for our next http request
    330  * to Twitter (by calling _setOauthTimestamp).
    331  *
    332  * @param  {http.IncomingResponse} resp   http response received from Twitter.
    333  */
    334 StreamingAPIConnection.prototype._updateOauthTimestampOffsetFromResponse = function (resp) {
    335   if (resp && resp.headers && resp.headers.date &&
    336       new Date(resp.headers.date).toString() !== 'Invalid Date'
    337   ) {
    338     var twitterTimeMs = new Date(resp.headers.date).getTime()
    339     this._twitter_time_minus_local_time_ms = twitterTimeMs - Date.now();
    340   }
    341 }
    342 
// The module's single export: the StreamingAPIConnection constructor.
module.exports = StreamingAPIConnection