twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

streaming.js (19231B)


      1 var assert = require('assert')
      2   , http = require('http')
      3   , EventEmitter = require('events').EventEmitter
      4   , rewire = require('rewire')
      5   , sinon = require('sinon')
      6   , Twit = require('../lib/twitter')
      7   , config1 = require('../config1')
      8   , config2 = require('../config2')
      9   , colors = require('colors')
     10   , helpers = require('./helpers')
     11   , util = require('util')
     12   , zlib = require('zlib')
     13   , async = require('async')
     14   , restTest = require('./rest');
     15 
     16 /**
     17  * Stop the stream and check the tweet we got back.
     18  * Call @done on completion.
     19  *
     20  * @param  {object}   stream object returned by twit.stream()
     21  * @param  {Function} done   completion callback
     22  */
     23 exports.checkStream = function (stream, done) {
     24   stream.on('connected', function () {
     25     console.log('\nconnected'.grey)
     26   });
     27 
     28   stream.once('tweet', function (tweet) {
     29     stream.stop()
     30     assert.ok(tweet)
     31     assert.equal('string', typeof tweet.text)
     32     assert.equal('string', typeof tweet.id_str)
     33 
     34     console.log(('\ntweet: '+tweet.text).grey)
     35 
     36     done()
     37   });
     38 
     39   stream.on('reconnecting', function (req, res, connectInterval) {
     40     console.log('Got disconnected. Scheduling reconnect! statusCode:', res.statusCode, 'connectInterval', connectInterval)
     41   });
     42 
     43   stream.on('error', function (err) {
     44     console.log('Stream emitted an error', err)
     45     return done(err)
     46   })
     47 }
     48 
     49 /**
     50  * Check the stream state is correctly set for a stopped stream.
     51  *
     52  * @param  {object}   stream object returned by twit.stream()
     53  */
     54 exports.checkStreamStopState = function (stream) {
     55   assert.strictEqual(stream._connectInterval, 0)
     56   assert.strictEqual(stream._usedFirstReconnect, false)
     57   assert.strictEqual(stream._scheduledReconnect, undefined)
     58   assert.strictEqual(stream._stallAbortTimeout, undefined)
     59 }
     60 
     61 describe('Streaming API', function () {
     62 
     63   it('statuses/sample', function (done) {
     64     var twit = new Twit(config1);
     65     var stream = twit.stream('statuses/sample')
     66 
     67     exports.checkStream(stream, done)
     68   })
     69 
     70   it('statuses/filter using `track`', function (done) {
     71     this.timeout(120000)
     72     var twit = new Twit(config2);
     73     var stream = twit.stream('statuses/filter', { track: 'fun' })
     74 
     75     exports.checkStream(stream, done)
     76   })
     77 
     78   it('statuses/filter using `locations` string', function (done) {
     79     var twit = new Twit(config1);
     80     var world = '-180,-90,180,90';
     81     var stream = twit.stream('statuses/filter', { locations: world })
     82 
     83     exports.checkStream(stream, done)
     84   })
     85 
     86   it('statuses/filter using `locations` array for San Francisco and New York', function (done) {
     87     var twit = new Twit(config2);
     88     var params = {
     89       locations: [ '-122.75', '36.8', '121.75', '37.8', '-74', '40', '73', '41' ]
     90     }
     91 
     92     var stream = twit.stream('statuses/filter', params)
     93 
     94     exports.checkStream(stream, done)
     95   })
     96 
     97   it('statuses/filter using `track` array', function (done) {
     98     var twit = new Twit(config1);
     99     var params = {
    100       track: [ 'twitter', ':)', 'fun' ]
    101     }
    102 
    103     var stream = twit.stream('statuses/filter', params)
    104 
    105     exports.checkStream(stream, done)
    106   })
    107 
    108   it('statuses/filter using `track` and `language`', function (done) {
    109     var twit = new Twit(config1);
    110     var params = {
    111       track: [ 'twitter', '#apple', 'google', 'twitter', 'facebook', 'happy', 'party', ':)' ],
    112       language: 'en'
    113     }
    114 
    115     var stream = twit.stream('statuses/filter', params)
    116 
    117     exports.checkStream(stream, done)
    118   })
    119 
    120   it('stopping & restarting the stream works', function (done) {
    121     var twit = new Twit(config2);
    122     var stream = twit.stream('statuses/sample')
    123 
    124     //stop the stream after 2 seconds
    125     setTimeout(function () {
    126       stream.stop()
    127 
    128       exports.checkStreamStopState(stream)
    129 
    130       console.log('\nstopped stream')
    131     }, 2000)
    132 
    133     //after 3 seconds, start the stream, and stop after 'connect'
    134     setTimeout(function () {
    135       stream.once('connected', function (req) {
    136         console.log('\nrestarted stream')
    137         stream.stop()
    138 
    139         exports.checkStreamStopState(stream)
    140 
    141         console.log('\nstopped stream')
    142         done()
    143       })
    144 
    145       //restart the stream
    146       stream.start()
    147     }, 3000)
    148   })
    149 
    150   it('stopping & restarting stream emits to previously assigned callbacks', function (done) {
    151     var twit = new Twit(config1);
    152     var stream = twit.stream('statuses/sample')
    153 
    154     var started = false
    155     var numTweets = 0
    156     stream.on('tweet', function (tweet) {
    157       process.stdout.write('.')
    158       if (!started) {
    159         started = true
    160         numTweets++
    161         console.log('received tweet', numTweets)
    162 
    163         console.log('stopping stream')
    164         stream.stop()
    165 
    166         exports.checkStreamStopState(stream)
    167 
    168         // we've successfully received a new tweet after restarting, test successful
    169         if (numTweets === 2) {
    170           done()
    171         } else {
    172           started = false
    173           console.log('restarting stream')
    174 
    175           setTimeout(function () {
    176             stream.start()
    177           }, 1000)
    178         }
    179       }
    180     })
    181 
    182     stream.on('limit', function (limitMsg) {
    183       console.log('limit', limitMsg)
    184     })
    185 
    186     stream.on('disconnect', function (disconnMsg) {
    187       console.log('disconnect', disconnMsg)
    188     })
    189 
    190     stream.on('reconnect', function (req, res, ival) {
    191       console.log('reconnect. statusCode:', res.statusCode, 'interval:', ival)
    192     })
    193 
    194     stream.on('connect', function (req) {
    195       console.log('connect')
    196     })
    197 
    198   })
    199 })
    200 
    201 describe('streaming API direct message events', function () {
    202     var senderScreenName;
    203     var receiverScreenName;
    204     var twitSender;
    205     var twitReceiver;
    206 
    207     // before we send direct messages the user receiving the DM
    208     // has to follow the sender. Make this so.
    209     before(function (done) {
    210       twitSender = new Twit(config1);
    211       twitReceiver = new Twit(config2);
    212 
    213       // get sender/receiver names in parallel, then make the receiver follow the sender
    214       async.parallel({
    215         // get sender screen name and set it for tests to use
    216         getSenderScreenName: function (parNext) {
    217           console.log('getting sender user screen_name')
    218 
    219           twitSender.get('account/verify_credentials', { twit_options: { retry: true } }, function (err, reply) {
    220             assert(!err, err)
    221 
    222             assert(reply)
    223             assert(reply.screen_name)
    224 
    225             senderScreenName = reply.screen_name
    226 
    227             return parNext()
    228           })
    229         },
    230         // get receiver screen name and set it for tests to use
    231         getReceiverScreenName: function (parNext) {
    232           console.log('getting receiver user screen_name')
    233           twitReceiver.get('account/verify_credentials', { twit_options: { retry: true } }, function (err, reply) {
    234             assert(!err, err)
    235 
    236             assert(reply)
    237             assert(reply.screen_name)
    238 
    239             receiverScreenName = reply.screen_name
    240 
    241             return parNext()
    242           })
    243         }
    244       }, function (err) {
    245         assert(!err, err)
    246 
    247         var followParams = { screen_name: senderScreenName }
    248         console.log('making receiver user follow the sender user')
    249         // make receiver follow sender
    250         twitReceiver.post('friendships/create', followParams, function (err, reply) {
    251           assert(!err, err)
    252           assert(reply.following)
    253 
    254           done()
    255         })
    256       })
    257     })
    258 
    259     it('user_stream `direct_message` event', function (done) {
    260       // User A follows User B
    261       // User A connects to their user stream
    262       // User B posts a DM to User A
    263       // User A receives it in their user stream
    264       this.timeout(0);
    265 
    266       // build out DM params
    267       function makeDmParams () {
    268         return {
    269           screen_name: receiverScreenName,
    270           text: helpers.generateRandomString(10) + ' direct message streaming event test! :-) ' + helpers.generateRandomString(20),
    271           twit_options: {
    272             retry: true
    273           }
    274         }
    275       }
    276 
    277       var dmIdsReceived = []
    278       var dmIdsSent = []
    279       var sentDmFound = false
    280 
    281       // start listening for user stream events
    282       var receiverStream = twitReceiver.stream('user')
    283 
    284       console.log('\nlistening for DMs')
    285       // listen for direct_message event and check DM once it's received
    286       receiverStream.on('direct_message', function (directMsg) {
    287         if (sentDmFound) {
    288           // don't call `done` more than once
    289           return
    290         }
    291 
    292         console.log('got DM event. id:', directMsg.direct_message.id_str)
    293         restTest.checkDm(directMsg.direct_message)
    294         dmIdsReceived.push(directMsg.direct_message.id_str)
    295 
    296         // make sure one of the DMs sent was found
    297         // (we can send multiple DMs if our stream has to reconnect)
    298         sentDmFound = dmIdsSent.some(function (dmId) {
    299           return dmId == directMsg.direct_message.id_str
    300         })
    301 
    302         if (!sentDmFound) {
    303           console.log('this DM doesnt match our test DMs - still waiting for a matching one.')
    304           console.log('dmIdsSent', dmIdsSent)
    305           return
    306         }
    307 
    308         receiverStream.stop()
    309         return done()
    310       })
    311 
    312       var lastTimeSent = 0
    313       var msToWait = 0
    314       var postDmInterval = null
    315 
    316       receiverStream.on('connected', function () {
    317         var dmParams = makeDmParams()
    318 
    319         console.log('sending a new DM:', dmParams.text)
    320         twitSender.post('direct_messages/new', dmParams, function (err, reply) {
    321           assert(!err, err)
    322           assert(reply)
    323           restTest.checkDm(reply)
    324           assert(reply.id_str)
    325           // we will check this dm against the reply recieved in the message event
    326           dmIdsSent.push(reply.id_str)
    327 
    328           console.log('successfully posted DM:', reply.text, reply.id_str)
    329           if (dmIdsReceived.indexOf(reply.id_str) !== -1) {
    330             // our response to the DM posting lost the race against the direct_message
    331             // listener (we already got the event). So we can finish the test.
    332             done()
    333           }
    334         })
    335       })
    336 
    337       after(function (done) {
    338         console.log('cleaning up DMs:', dmIdsSent)
    339         // delete the DMs we posted
    340         var deleteDms = dmIdsSent.map(function (dmId) {
    341           return function (next) {
    342             assert.equal(typeof dmId, 'string')
    343             console.log('\ndeleting DM', dmId)
    344             var params = { id: dmId, twit_options: { retry: true } }
    345             twitSender.post('direct_messages/destroy', params, function (err, reply) {
    346               assert(!err, err)
    347               restTest.checkDm(reply)
    348               assert.equal(reply.id, dmId)
    349               return next()
    350             })
    351           }
    352         })
    353         async.parallel(deleteDms, done)
    354       })
    355     })
    356 })
    357 
    358 describe('streaming API friends preamble', function () {
    359   it('returns an array of strings if stringify_friend_ids is true', function (done) {
    360     var twit = new Twit(config1);
    361     var stream = twit.stream('user', { stringify_friend_ids: true });
    362     stream.on('friends', function (friendsObj) {
    363       assert(friendsObj)
    364       assert(friendsObj.friends_str)
    365       if (friendsObj.friends_str.length) {
    366         assert.equal(typeof friendsObj.friends_str[0], 'string')
    367       } else {
    368         console.log('\nEmpty friends preamble:', friendsObj, '. Make some friends on Twitter! ^_^')
    369       }
    370       done()
    371     })
    372   })
    373 })
    374 
    375 describe('streaming API bad request', function (done) {
    376   it('emits an error for a 401 response', function (done) {
    377     var badCredentials = {
    378         consumer_key: 'a'
    379       , consumer_secret: 'b'
    380       , access_token: 'c'
    381       , access_token_secret: 'd'
    382     }
    383 
    384     var twit = new Twit(badCredentials);
    385 
    386     var stream = twit.stream('statuses/filter', { track : ['foo'] });
    387 
    388     stream.on('error', function (err) {
    389       assert.equal(err.statusCode, 401)
    390       assert(err.twitterReply)
    391 
    392       return done()
    393     })
    394   })
    395 })
    396 
    397 describe('streaming API `messages` event', function (done) {
    398   var request = require('request');
    399   var originalPost = request.post;
    400   var RewiredTwit = rewire('../lib/twitter');
    401   var RewiredStreamingApiConnection = rewire('../lib/streaming-api-connection');
    402   var revertParser, revertTwit;
    403 
    404   var MockParser = function () {
    405     var self = this;
    406     EventEmitter.call(self);
    407     process.nextTick(function () {
    408       self.emit('element', {scrub_geo: 'bar'})
    409       self.emit('element', {limit: 'buzz'})
    410     });
    411   }
    412   util.inherits(MockParser, EventEmitter);
    413 
    414   before(function () {
    415     revertTwit = RewiredTwit.__set__('StreamingAPIConnection', RewiredStreamingApiConnection);
    416     revertParser = RewiredStreamingApiConnection.__set__('Parser', MockParser);
    417 
    418     request.post = function () { return new helpers.FakeRequest() }
    419   })
    420 
    421   after(function () {
    422     request.post = originalPost;
    423     revertTwit();
    424     revertParser();
    425   })
    426 
    427   it('is returned for 2 different event types', function (done) {
    428     var twit = new RewiredTwit(config1);
    429     var stream = twit.stream('statuses/sample');
    430     var gotScrubGeo = false;
    431     var gotLimit = false;
    432     var numMessages = 0;
    433 
    434     var maybeDone = function () {
    435       if (gotScrubGeo && gotLimit && numMessages == 2) {
    436         done()
    437       }
    438     }
    439 
    440     stream.on('limit', function () {
    441       gotLimit = true;
    442       maybeDone();
    443     });
    444     stream.on('scrub_geo', function () {
    445       gotScrubGeo = true;
    446       maybeDone();
    447     })
    448 
    449     stream.on('message', function (msg) {
    450       numMessages++;
    451       maybeDone();
    452     })
    453   })
    454 })
    455 
    456 describe('streaming reconnect', function (done) {
    457   it('correctly implements connection closing backoff', function (done) {
    458     var stubPost = function () {
    459       var fakeRequest = new helpers.FakeRequest()
    460       process.nextTick(function () {
    461         fakeRequest.emit('close')
    462       })
    463       return fakeRequest
    464     }
    465 
    466     var request = require('request')
    467     var stubPost = sinon.stub(request, 'post', stubPost)
    468 
    469     var twit = new Twit(config1);
    470     var stream = twit.stream('statuses/filter', { track: [ 'fun', 'yolo']});
    471 
    472     var reconnects = [0, 250, 500, 750]
    473     var reconnectCount = -1
    474 
    475     var testDone = false
    476 
    477     stream.on('reconnect', function () {
    478       if (testDone) {
    479         return
    480       }
    481       reconnectCount += 1
    482       var expectedInterval = reconnects[reconnectCount]
    483 
    484       // make sure our connect interval is correct
    485       assert.equal(stream._connectInterval, expectedInterval);
    486 
    487       // simulate immediate reconnect by forcing a new connection (`self._connectInterval` parameter unchanged)
    488       stream._startPersistentConnection();
    489 
    490       if (reconnectCount === reconnects.length -1) {
    491         // restore request.post
    492         stubPost.restore()
    493         testDone = true
    494         return done();
    495       }
    496     });
    497   });
    498 
    499   it('correctly implements 420 backoff', function (done) {
    500     var stubPost = function () {
    501       var fakeRequest = new helpers.FakeRequest()
    502       process.nextTick(function () {
    503         var fakeResponse = new helpers.FakeResponse(420)
    504         fakeRequest.emit('response', fakeResponse)
    505         fakeRequest.emit('close')
    506       })
    507       return fakeRequest
    508     }
    509 
    510     var request = require('request')
    511     var stubPost = sinon.stub(request, 'post', stubPost)
    512 
    513     var twit = new Twit(config1);
    514     var stream = twit.stream('statuses/filter', { track: [ 'fun', 'yolo']});
    515 
    516     var reconnects = [60000, 120000, 240000, 480000]
    517     var reconnectCount = -1
    518     var testComplete = false
    519 
    520     stream.on('reconnect', function (req, res, connectInterval) {
    521       if (testComplete) {
    522         // prevent race between last connection attempt firing a reconnect and us validating the final
    523         // reconnect value in `reconnects`
    524         return
    525       }
    526 
    527       reconnectCount += 1
    528        var expectedInterval = reconnects[reconnectCount]
    529 
    530       // make sure our connect interval is correct
    531       assert.equal(stream._connectInterval, connectInterval);
    532       assert.equal(stream._connectInterval, expectedInterval);
    533       // simulate immediate reconnect by forcing a new connection (`self._connectInterval` parameter unchanged)
    534       stream._startPersistentConnection();
    535 
    536       if (reconnectCount === reconnects.length -1) {
    537         // restore request.post
    538         stubPost.restore()
    539         testComplete = true
    540         return done();
    541       }
    542     });
    543   });
    544 });
    545 
    546 describe('Streaming API disconnect message', function (done) {
    547   it.skip('results in stopping the stream', function (done) {
    548     var stubPost = function () {
    549       var fakeRequest = new helpers.FakeRequest()
    550       process.nextTick(function () {
    551         var body = zlib.gzipSync(JSON.stringify({disconnect: true}) + '\r\n')
    552         var fakeResponse = new helpers.FakeResponse(200, body)
    553         fakeRequest.emit('response', fakeResponse);
    554         fakeResponse.emit('close')
    555       });
    556       return fakeRequest
    557     }
    558 
    559     var request = require('request')
    560     var origRequest = request.post
    561     var stubs = sinon.collection
    562     stubs.stub(request, 'post', stubPost)
    563 
    564     var twit = new Twit(config1);
    565     var stream = twit.stream('statuses/filter', { track: ['fun']});
    566 
    567     stream.on('disconnect', function (disconnMsg) {
    568       stream.stop();
    569       // restore stub
    570       request.post = origRequest
    571       done();
    572     })
    573   })
    574 });
    575 
    576 describe.skip('Streaming API Connection limit exceeded message', function (done) {
    577   it('results in an `error` event containing the message', function (done) {
    578     var errMsg = 'Exceeded connection limit for user';
    579 
    580     var stubPost = function () {
    581       var fakeRequest = new helpers.FakeRequest();
    582       process.nextTick(function () {
    583         var body = zlib.gzipSync(errMsg + '\r\n');
    584         var fakeResponse = new helpers.FakeResponse(200, body);
    585         fakeRequest.emit('response', fakeResponse);
    586         fakeResponse.emit('close');
    587       });
    588       return fakeRequest
    589     }
    590 
    591     var request = require('request');
    592     var origRequest = request.post;
    593     var stubs = sinon.collection;
    594     stubs.stub(request, 'post', stubPost);
    595 
    596     var twit = new Twit(config1);
    597     var stream = twit.stream('statuses/filter');
    598 
    599     stream.on('error', function (err) {
    600       assert(err.toString().indexOf(errMsg) !== -1, 'Unexpected error msg:' + errMsg + '.');;
    601       stream.stop();
    602       // restore stub
    603       request.post = origRequest;
    604       done();
    605     })
    606   })
    607 })
    608 
    609 describe('Streaming API connection management', function () {
    610   it('.stop() works in all states', function (done) {
    611     var stubPost = function () {
    612       var fakeRequest = new helpers.FakeRequest();
    613       process.nextTick(function () {
    614         var body = zlib.gzipSync('Foobar\r\n');
    615         var fakeResponse = new helpers.FakeResponse(200, body);
    616         fakeRequest.emit('response', fakeResponse);
    617       });
    618       return fakeRequest
    619     }
    620 
    621     var request = require('request');
    622     var origRequest = request.post;
    623     var stubs = sinon.collection;
    624     stubs.stub(request, 'post', stubPost);
    625 
    626     var twit = new Twit(config1);
    627 
    628     var stream = twit.stream('statuses/sample');
    629     stream.stop();
    630     console.log('\nStopped. Restarting..');
    631     stream.start();
    632     stream.once('connect', function(request) {
    633       console.log('Stream emitted `connect`. Stopping & starting stream..')
    634       stream.stop();
    635 
    636       stream.once('connected', function () {
    637         console.log('Stream emitted `connected`. Stopping stream.');
    638         stream.stop();
    639 
    640         stubs.restore();
    641         done();
    642       });
    643       stream.start();
    644     });
    645   })
    646 })