streaming-api-connection.js (12850B)
1 2 var EventEmitter = require('events').EventEmitter; 3 var util = require('util'); 4 5 var helpers = require('./helpers') 6 var Parser = require('./parser'); 7 var request = require('request'); 8 9 var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON 10 11 var StreamingAPIConnection = function (reqOpts, twitOptions) { 12 this.reqOpts = reqOpts 13 this.twitOptions = twitOptions 14 this._twitter_time_minus_local_time_ms = 0 15 EventEmitter.call(this) 16 } 17 18 util.inherits(StreamingAPIConnection, EventEmitter) 19 20 /** 21 * Resets the connection. 22 * - clears request, response, parser 23 * - removes scheduled reconnect handle (if one was scheduled) 24 * - stops the stall abort timeout handle (if one was scheduled) 25 */ 26 StreamingAPIConnection.prototype._resetConnection = function () { 27 if (this.request) { 28 // clear our reference to the `request` instance 29 this.request.removeAllListeners(); 30 this.request.destroy(); 31 } 32 33 if (this.response) { 34 // clear our reference to the http.IncomingMessage instance 35 this.response.removeAllListeners(); 36 this.response.destroy(); 37 } 38 39 if (this.parser) { 40 this.parser.removeAllListeners() 41 } 42 43 // ensure a scheduled reconnect does not occur (if one was scheduled) 44 // this can happen if we get a close event before .stop() is called 45 clearTimeout(this._scheduledReconnect) 46 delete this._scheduledReconnect 47 48 // clear our stall abort timeout 49 this._stopStallAbortTimeout() 50 } 51 52 /** 53 * Resets the parameters used in determining the next reconnect time 54 */ 55 StreamingAPIConnection.prototype._resetRetryParams = function () { 56 // delay for next reconnection attempt 57 this._connectInterval = 0 58 // flag indicating whether we used a 0-delay reconnect 59 this._usedFirstReconnect = false 60 } 61 62 StreamingAPIConnection.prototype._startPersistentConnection = function () { 63 var self = this; 64 self._resetConnection(); 65 self._setupParser(); 66 self._resetStallAbortTimeout(); 67 self._setOauthTimestamp(); 68 this.reqOpts.encoding = 'utf8' 69 self.request = request.post(this.reqOpts); 70 self.emit('connect', self.request); 71 self.request.on('response', function (response) { 72 self._updateOauthTimestampOffsetFromResponse(response) 73 // reset our reconnection attempt flag so next attempt goes through with 0 delay 74 // if we get a transport-level error 75 self._usedFirstReconnect = false; 76 // start a stall abort timeout handle 77 self._resetStallAbortTimeout(); 78 self.response = response 79 if (STATUS_CODES_TO_ABORT_ON.indexOf(self.response.statusCode) !== -1) { 80 // We got a status code telling us we should abort the connection. 81 // Read the body from the response and return an error to the user. 82 var body = ''; 83 84 self.request.on('data', function (chunk) { 85 body += chunk; 86 }) 87 88 self.request.on('end', function () { 89 try { 90 body = JSON.parse(body) 91 } catch (jsonDecodeError) { 92 // Twitter may send an HTML body 93 // if non-JSON text was returned, we'll just attach it to the error as-is 94 } 95 // surface the error to the user 96 var error = helpers.makeTwitError('Bad Twitter streaming request: ' + self.response.statusCode) 97 error.statusCode = response ? response.statusCode: null; 98 helpers.attachBodyInfoToError(error, body) 99 self.emit('error', error); 100 // stop the stream explicitly so we don't reconnect 101 self.stop() 102 body = null; 103 }); 104 self.request.on('error', function (err) { 105 var twitErr = helpers.makeTwitError(err.message); 106 twitErr.statusCode = self.response.statusCode; 107 helpers.attachBodyInfoToError(twitErr, body); 108 self.emit('parser-error', twitErr); 109 }); 110 } else if (self.response.statusCode === 420) { 111 // close the connection forcibly so a reconnect is scheduled by `self.onClose()` 112 self._scheduleReconnect(); 113 } else { 114 // We got an OK status code - the response should be valid. 115 // Read the body from the response and return to the user. 116 //pass all response data to parser 117 self.request.on('data', function(data) { 118 self._connectInterval = 0 119 self._resetStallAbortTimeout(); 120 self.parser.parse(data); 121 }) 122 123 self.response.on('close', self._onClose.bind(self)) 124 self.response.on('error', function (err) { 125 // expose response errors on twit instance 126 self.emit('error', err); 127 }) 128 129 // connected without an error response from Twitter, emit `connected` event 130 // this must be emitted after all its event handlers are bound 131 // so the reference to `self.response` is not interfered-with by the user until it is emitted 132 self.emit('connected', self.response); 133 } 134 }); 135 self.request.on('close', self._onClose.bind(self)); 136 self.request.on('error', function (err) { self._scheduleReconnect.bind(self) }); 137 return self; 138 } 139 140 /** 141 * Handle when the request or response closes. 142 * Schedule a reconnect according to Twitter's reconnect guidelines 143 * 144 */ 145 StreamingAPIConnection.prototype._onClose = function () { 146 var self = this; 147 self._stopStallAbortTimeout(); 148 if (self._scheduledReconnect) { 149 // if we already have a reconnect scheduled, don't schedule another one. 150 // this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close` 151 return 152 } 153 154 self._scheduleReconnect(); 155 } 156 157 /** 158 * Kick off the http request, and persist the connection 159 * 160 */ 161 StreamingAPIConnection.prototype.start = function () { 162 this._resetRetryParams(); 163 this._startPersistentConnection(); 164 return this; 165 } 166 167 /** 168 * Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state 169 * 170 */ 171 StreamingAPIConnection.prototype.stop = function () { 172 // clear connection variables and timeout handles 173 this._resetConnection(); 174 this._resetRetryParams(); 175 return this; 176 } 177 178 /** 179 * Stop and restart the stall abort timer (called when new data is received) 180 * 181 * If we go 90s without receiving data from twitter, we abort the request & reconnect. 182 */ 183 StreamingAPIConnection.prototype._resetStallAbortTimeout = function () { 184 var self = this; 185 // stop the previous stall abort timer 186 self._stopStallAbortTimeout(); 187 //start a new 90s timeout to trigger a close & reconnect if no data received 188 self._stallAbortTimeout = setTimeout(function () { 189 self._scheduleReconnect() 190 }, 90000); 191 return this; 192 } 193 194 /** 195 * Stop stall timeout 196 * 197 */ 198 StreamingAPIConnection.prototype._stopStallAbortTimeout = function () { 199 clearTimeout(this._stallAbortTimeout); 200 // mark the timer as `null` so it is clear via introspection that the timeout is not scheduled 201 delete this._stallAbortTimeout; 202 return this; 203 } 204 205 /** 206 * Computes the next time a reconnect should occur (based on the last HTTP response received) 207 * and starts a timeout handle to begin reconnecting after `self._connectInterval` passes. 208 * 209 * @return {Undefined} 210 */ 211 StreamingAPIConnection.prototype._scheduleReconnect = function () { 212 var self = this; 213 if (self.response && self.response.statusCode === 420) { 214 // we are being rate limited 215 // start with a 1 minute wait and double each attempt 216 if (!self._connectInterval) { 217 self._connectInterval = 60000; 218 } else { 219 self._connectInterval *= 2; 220 } 221 } else if (self.response && String(self.response.statusCode).charAt(0) === '5') { 222 // twitter 5xx errors 223 // start with a 5s wait, double each attempt up to 320s 224 if (!self._connectInterval) { 225 self._connectInterval = 5000; 226 } else if (self._connectInterval < 320000) { 227 self._connectInterval *= 2; 228 } else { 229 self._connectInterval = 320000; 230 } 231 } else { 232 // we did not get an HTTP response from our last connection attempt. 233 // DNS/TCP error, or a stall in the stream (and stall timer closed the connection) 234 if (!self._usedFirstReconnect) { 235 // first reconnection attempt on a valid connection should occur immediately 236 self._connectInterval = 0; 237 self._usedFirstReconnect = true; 238 } else if (self._connectInterval < 16000) { 239 // linearly increase delay by 250ms up to 16s 240 self._connectInterval += 250; 241 } else { 242 // cap out reconnect interval at 16s 243 self._connectInterval = 16000; 244 } 245 } 246 247 // schedule the reconnect 248 self._scheduledReconnect = setTimeout(function () { 249 self._startPersistentConnection(); 250 }, self._connectInterval); 251 self.emit('reconnect', self.request, self.response, self._connectInterval); 252 } 253 254 StreamingAPIConnection.prototype._setupParser = function () { 255 var self = this 256 self.parser = new Parser() 257 258 // handle twitter objects as they come in - emit the generic `message` event 259 // along with the specific event corresponding to the message 260 self.parser.on('element', function (msg) { 261 self.emit('message', msg) 262 263 if (msg.delete) { self.emit('delete', msg) } 264 else if (msg.disconnect) { self._handleDisconnect(msg) } 265 else if (msg.limit) { self.emit('limit', msg) } 266 else if (msg.scrub_geo) { self.emit('scrub_geo', msg) } 267 else if (msg.warning) { self.emit('warning', msg) } 268 else if (msg.status_withheld) { self.emit('status_withheld', msg) } 269 else if (msg.user_withheld) { self.emit('user_withheld', msg) } 270 else if (msg.friends || msg.friends_str) { self.emit('friends', msg) } 271 else if (msg.direct_message) { self.emit('direct_message', msg) } 272 else if (msg.event) { 273 self.emit('user_event', msg) 274 // reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages 275 var ev = msg.event 276 277 if (ev === 'blocked') { self.emit('blocked', msg) } 278 else if (ev === 'unblocked') { self.emit('unblocked', msg) } 279 else if (ev === 'favorite') { self.emit('favorite', msg) } 280 else if (ev === 'unfavorite') { self.emit('unfavorite', msg) } 281 else if (ev === 'follow') { self.emit('follow', msg) } 282 else if (ev === 'unfollow') { self.emit('unfollow', msg) } 283 else if (ev === 'mute') { self.emit('mute', msg) } 284 else if (ev === 'unmute') { self.emit('unmute', msg) } 285 else if (ev === 'user_update') { self.emit('user_update', msg) } 286 else if (ev === 'list_created') { self.emit('list_created', msg) } 287 else if (ev === 'list_destroyed') { self.emit('list_destroyed', msg) } 288 else if (ev === 'list_updated') { self.emit('list_updated', msg) } 289 else if (ev === 'list_member_added') { self.emit('list_member_added', msg) } 290 else if (ev === 'list_member_removed') { self.emit('list_member_removed', msg) } 291 else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) } 292 else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) } 293 else if (ev === 'quoted_tweet') { self.emit('quoted_tweet', msg) } 294 else if (ev === 'favorited_retweet') { self.emit('favorited_retweet', msg) } 295 else if (ev === 'retweeted_retweet') { self.emit('retweeted_retweet', msg) } 296 else { self.emit('unknown_user_event', msg) } 297 } else { self.emit('tweet', msg) } 298 }) 299 300 self.parser.on('error', function (err) { 301 self.emit('parser-error', err) 302 }); 303 self.parser.on('connection-limit-exceeded', function (err) { 304 self.emit('error', err); 305 }) 306 } 307 308 StreamingAPIConnection.prototype._handleDisconnect = function (twitterMsg) { 309 this.emit('disconnect', twitterMsg); 310 this.stop(); 311 } 312 313 /** 314 * Call whenever an http request is about to be made to update 315 * our local timestamp (used for Oauth) to be Twitter's server time. 316 * 317 */ 318 StreamingAPIConnection.prototype._setOauthTimestamp = function () { 319 var self = this; 320 if (self.reqOpts.oauth) { 321 var oauth_ts = Date.now() + self._twitter_time_minus_local_time_ms; 322 self.reqOpts.oauth.timestamp = Math.floor(oauth_ts/1000).toString(); 323 } 324 } 325 326 /** 327 * Call whenever an http response is received from Twitter, 328 * to set our local timestamp offset from Twitter's server time. 329 * This is used to set the Oauth timestamp for our next http request 330 * to Twitter (by calling _setOauthTimestamp). 331 * 332 * @param {http.IncomingResponse} resp http response received from Twitter. 333 */ 334 StreamingAPIConnection.prototype._updateOauthTimestampOffsetFromResponse = function (resp) { 335 if (resp && resp.headers && resp.headers.date && 336 new Date(resp.headers.date).toString() !== 'Invalid Date' 337 ) { 338 var twitterTimeMs = new Date(resp.headers.date).getTime() 339 this._twitter_time_minus_local_time_ms = twitterTimeMs - Date.now(); 340 } 341 } 342 343 module.exports = StreamingAPIConnection