streaming.js (19231B)
1 var assert = require('assert') 2 , http = require('http') 3 , EventEmitter = require('events').EventEmitter 4 , rewire = require('rewire') 5 , sinon = require('sinon') 6 , Twit = require('../lib/twitter') 7 , config1 = require('../config1') 8 , config2 = require('../config2') 9 , colors = require('colors') 10 , helpers = require('./helpers') 11 , util = require('util') 12 , zlib = require('zlib') 13 , async = require('async') 14 , restTest = require('./rest'); 15 16 /** 17 * Stop the stream and check the tweet we got back. 18 * Call @done on completion. 19 * 20 * @param {object} stream object returned by twit.stream() 21 * @param {Function} done completion callback 22 */ 23 exports.checkStream = function (stream, done) { 24 stream.on('connected', function () { 25 console.log('\nconnected'.grey) 26 }); 27 28 stream.once('tweet', function (tweet) { 29 stream.stop() 30 assert.ok(tweet) 31 assert.equal('string', typeof tweet.text) 32 assert.equal('string', typeof tweet.id_str) 33 34 console.log(('\ntweet: '+tweet.text).grey) 35 36 done() 37 }); 38 39 stream.on('reconnecting', function (req, res, connectInterval) { 40 console.log('Got disconnected. Scheduling reconnect! statusCode:', res.statusCode, 'connectInterval', connectInterval) 41 }); 42 43 stream.on('error', function (err) { 44 console.log('Stream emitted an error', err) 45 return done(err) 46 }) 47 } 48 49 /** 50 * Check the stream state is correctly set for a stopped stream. 51 * 52 * @param {object} stream object returned by twit.stream() 53 */ 54 exports.checkStreamStopState = function (stream) { 55 assert.strictEqual(stream._connectInterval, 0) 56 assert.strictEqual(stream._usedFirstReconnect, false) 57 assert.strictEqual(stream._scheduledReconnect, undefined) 58 assert.strictEqual(stream._stallAbortTimeout, undefined) 59 } 60 61 describe('Streaming API', function () { 62 63 it('statuses/sample', function (done) { 64 var twit = new Twit(config1); 65 var stream = twit.stream('statuses/sample') 66 67 exports.checkStream(stream, done) 68 }) 69 70 it('statuses/filter using `track`', function (done) { 71 this.timeout(120000) 72 var twit = new Twit(config2); 73 var stream = twit.stream('statuses/filter', { track: 'fun' }) 74 75 exports.checkStream(stream, done) 76 }) 77 78 it('statuses/filter using `locations` string', function (done) { 79 var twit = new Twit(config1); 80 var world = '-180,-90,180,90'; 81 var stream = twit.stream('statuses/filter', { locations: world }) 82 83 exports.checkStream(stream, done) 84 }) 85 86 it('statuses/filter using `locations` array for San Francisco and New York', function (done) { 87 var twit = new Twit(config2); 88 var params = { 89 locations: [ '-122.75', '36.8', '121.75', '37.8', '-74', '40', '73', '41' ] 90 } 91 92 var stream = twit.stream('statuses/filter', params) 93 94 exports.checkStream(stream, done) 95 }) 96 97 it('statuses/filter using `track` array', function (done) { 98 var twit = new Twit(config1); 99 var params = { 100 track: [ 'twitter', ':)', 'fun' ] 101 } 102 103 var stream = twit.stream('statuses/filter', params) 104 105 exports.checkStream(stream, done) 106 }) 107 108 it('statuses/filter using `track` and `language`', function (done) { 109 var twit = new Twit(config1); 110 var params = { 111 track: [ 'twitter', '#apple', 'google', 'twitter', 'facebook', 'happy', 'party', ':)' ], 112 language: 'en' 113 } 114 115 var stream = twit.stream('statuses/filter', params) 116 117 exports.checkStream(stream, done) 118 }) 119 120 it('stopping & restarting the stream works', function (done) { 121 var twit = new Twit(config2); 122 var stream = twit.stream('statuses/sample') 123 124 //stop the stream after 2 seconds 125 setTimeout(function () { 126 stream.stop() 127 128 exports.checkStreamStopState(stream) 129 130 console.log('\nstopped stream') 131 }, 2000) 132 133 //after 3 seconds, start the stream, and stop after 'connect' 134 setTimeout(function () { 135 stream.once('connected', function (req) { 136 console.log('\nrestarted stream') 137 stream.stop() 138 139 exports.checkStreamStopState(stream) 140 141 console.log('\nstopped stream') 142 done() 143 }) 144 145 //restart the stream 146 stream.start() 147 }, 3000) 148 }) 149 150 it('stopping & restarting stream emits to previously assigned callbacks', function (done) { 151 var twit = new Twit(config1); 152 var stream = twit.stream('statuses/sample') 153 154 var started = false 155 var numTweets = 0 156 stream.on('tweet', function (tweet) { 157 process.stdout.write('.') 158 if (!started) { 159 started = true 160 numTweets++ 161 console.log('received tweet', numTweets) 162 163 console.log('stopping stream') 164 stream.stop() 165 166 exports.checkStreamStopState(stream) 167 168 // we've successfully received a new tweet after restarting, test successful 169 if (numTweets === 2) { 170 done() 171 } else { 172 started = false 173 console.log('restarting stream') 174 175 setTimeout(function () { 176 stream.start() 177 }, 1000) 178 } 179 } 180 }) 181 182 stream.on('limit', function (limitMsg) { 183 console.log('limit', limitMsg) 184 }) 185 186 stream.on('disconnect', function (disconnMsg) { 187 console.log('disconnect', disconnMsg) 188 }) 189 190 stream.on('reconnect', function (req, res, ival) { 191 console.log('reconnect. statusCode:', res.statusCode, 'interval:', ival) 192 }) 193 194 stream.on('connect', function (req) { 195 console.log('connect') 196 }) 197 198 }) 199 }) 200 201 describe('streaming API direct message events', function () { 202 var senderScreenName; 203 var receiverScreenName; 204 var twitSender; 205 var twitReceiver; 206 207 // before we send direct messages the user receiving the DM 208 // has to follow the sender. Make this so. 209 before(function (done) { 210 twitSender = new Twit(config1); 211 twitReceiver = new Twit(config2); 212 213 // get sender/receiver names in parallel, then make the receiver follow the sender 214 async.parallel({ 215 // get sender screen name and set it for tests to use 216 getSenderScreenName: function (parNext) { 217 console.log('getting sender user screen_name') 218 219 twitSender.get('account/verify_credentials', { twit_options: { retry: true } }, function (err, reply) { 220 assert(!err, err) 221 222 assert(reply) 223 assert(reply.screen_name) 224 225 senderScreenName = reply.screen_name 226 227 return parNext() 228 }) 229 }, 230 // get receiver screen name and set it for tests to use 231 getReceiverScreenName: function (parNext) { 232 console.log('getting receiver user screen_name') 233 twitReceiver.get('account/verify_credentials', { twit_options: { retry: true } }, function (err, reply) { 234 assert(!err, err) 235 236 assert(reply) 237 assert(reply.screen_name) 238 239 receiverScreenName = reply.screen_name 240 241 return parNext() 242 }) 243 } 244 }, function (err) { 245 assert(!err, err) 246 247 var followParams = { screen_name: senderScreenName } 248 console.log('making receiver user follow the sender user') 249 // make receiver follow sender 250 twitReceiver.post('friendships/create', followParams, function (err, reply) { 251 assert(!err, err) 252 assert(reply.following) 253 254 done() 255 }) 256 }) 257 }) 258 259 it('user_stream `direct_message` event', function (done) { 260 // User A follows User B 261 // User A connects to their user stream 262 // User B posts a DM to User A 263 // User A receives it in their user stream 264 this.timeout(0); 265 266 // build out DM params 267 function makeDmParams () { 268 return { 269 screen_name: receiverScreenName, 270 text: helpers.generateRandomString(10) + ' direct message streaming event test! :-) ' + helpers.generateRandomString(20), 271 twit_options: { 272 retry: true 273 } 274 } 275 } 276 277 var dmIdsReceived = [] 278 var dmIdsSent = [] 279 var sentDmFound = false 280 281 // start listening for user stream events 282 var receiverStream = twitReceiver.stream('user') 283 284 console.log('\nlistening for DMs') 285 // listen for direct_message event and check DM once it's received 286 receiverStream.on('direct_message', function (directMsg) { 287 if (sentDmFound) { 288 // don't call `done` more than once 289 return 290 } 291 292 console.log('got DM event. id:', directMsg.direct_message.id_str) 293 restTest.checkDm(directMsg.direct_message) 294 dmIdsReceived.push(directMsg.direct_message.id_str) 295 296 // make sure one of the DMs sent was found 297 // (we can send multiple DMs if our stream has to reconnect) 298 sentDmFound = dmIdsSent.some(function (dmId) { 299 return dmId == directMsg.direct_message.id_str 300 }) 301 302 if (!sentDmFound) { 303 console.log('this DM doesnt match our test DMs - still waiting for a matching one.') 304 console.log('dmIdsSent', dmIdsSent) 305 return 306 } 307 308 receiverStream.stop() 309 return done() 310 }) 311 312 var lastTimeSent = 0 313 var msToWait = 0 314 var postDmInterval = null 315 316 receiverStream.on('connected', function () { 317 var dmParams = makeDmParams() 318 319 console.log('sending a new DM:', dmParams.text) 320 twitSender.post('direct_messages/new', dmParams, function (err, reply) { 321 assert(!err, err) 322 assert(reply) 323 restTest.checkDm(reply) 324 assert(reply.id_str) 325 // we will check this dm against the reply recieved in the message event 326 dmIdsSent.push(reply.id_str) 327 328 console.log('successfully posted DM:', reply.text, reply.id_str) 329 if (dmIdsReceived.indexOf(reply.id_str) !== -1) { 330 // our response to the DM posting lost the race against the direct_message 331 // listener (we already got the event). So we can finish the test. 332 done() 333 } 334 }) 335 }) 336 337 after(function (done) { 338 console.log('cleaning up DMs:', dmIdsSent) 339 // delete the DMs we posted 340 var deleteDms = dmIdsSent.map(function (dmId) { 341 return function (next) { 342 assert.equal(typeof dmId, 'string') 343 console.log('\ndeleting DM', dmId) 344 var params = { id: dmId, twit_options: { retry: true } } 345 twitSender.post('direct_messages/destroy', params, function (err, reply) { 346 assert(!err, err) 347 restTest.checkDm(reply) 348 assert.equal(reply.id, dmId) 349 return next() 350 }) 351 } 352 }) 353 async.parallel(deleteDms, done) 354 }) 355 }) 356 }) 357 358 describe('streaming API friends preamble', function () { 359 it('returns an array of strings if stringify_friend_ids is true', function (done) { 360 var twit = new Twit(config1); 361 var stream = twit.stream('user', { stringify_friend_ids: true }); 362 stream.on('friends', function (friendsObj) { 363 assert(friendsObj) 364 assert(friendsObj.friends_str) 365 if (friendsObj.friends_str.length) { 366 assert.equal(typeof friendsObj.friends_str[0], 'string') 367 } else { 368 console.log('\nEmpty friends preamble:', friendsObj, '. Make some friends on Twitter! ^_^') 369 } 370 done() 371 }) 372 }) 373 }) 374 375 describe('streaming API bad request', function (done) { 376 it('emits an error for a 401 response', function (done) { 377 var badCredentials = { 378 consumer_key: 'a' 379 , consumer_secret: 'b' 380 , access_token: 'c' 381 , access_token_secret: 'd' 382 } 383 384 var twit = new Twit(badCredentials); 385 386 var stream = twit.stream('statuses/filter', { track : ['foo'] }); 387 388 stream.on('error', function (err) { 389 assert.equal(err.statusCode, 401) 390 assert(err.twitterReply) 391 392 return done() 393 }) 394 }) 395 }) 396 397 describe('streaming API `messages` event', function (done) { 398 var request = require('request'); 399 var originalPost = request.post; 400 var RewiredTwit = rewire('../lib/twitter'); 401 var RewiredStreamingApiConnection = rewire('../lib/streaming-api-connection'); 402 var revertParser, revertTwit; 403 404 var MockParser = function () { 405 var self = this; 406 EventEmitter.call(self); 407 process.nextTick(function () { 408 self.emit('element', {scrub_geo: 'bar'}) 409 self.emit('element', {limit: 'buzz'}) 410 }); 411 } 412 util.inherits(MockParser, EventEmitter); 413 414 before(function () { 415 revertTwit = RewiredTwit.__set__('StreamingAPIConnection', RewiredStreamingApiConnection); 416 revertParser = RewiredStreamingApiConnection.__set__('Parser', MockParser); 417 418 request.post = function () { return new helpers.FakeRequest() } 419 }) 420 421 after(function () { 422 request.post = originalPost; 423 revertTwit(); 424 revertParser(); 425 }) 426 427 it('is returned for 2 different event types', function (done) { 428 var twit = new RewiredTwit(config1); 429 var stream = twit.stream('statuses/sample'); 430 var gotScrubGeo = false; 431 var gotLimit = false; 432 var numMessages = 0; 433 434 var maybeDone = function () { 435 if (gotScrubGeo && gotLimit && numMessages == 2) { 436 done() 437 } 438 } 439 440 stream.on('limit', function () { 441 gotLimit = true; 442 maybeDone(); 443 }); 444 stream.on('scrub_geo', function () { 445 gotScrubGeo = true; 446 maybeDone(); 447 }) 448 449 stream.on('message', function (msg) { 450 numMessages++; 451 maybeDone(); 452 }) 453 }) 454 }) 455 456 describe('streaming reconnect', function (done) { 457 it('correctly implements connection closing backoff', function (done) { 458 var stubPost = function () { 459 var fakeRequest = new helpers.FakeRequest() 460 process.nextTick(function () { 461 fakeRequest.emit('close') 462 }) 463 return fakeRequest 464 } 465 466 var request = require('request') 467 var stubPost = sinon.stub(request, 'post', stubPost) 468 469 var twit = new Twit(config1); 470 var stream = twit.stream('statuses/filter', { track: [ 'fun', 'yolo']}); 471 472 var reconnects = [0, 250, 500, 750] 473 var reconnectCount = -1 474 475 var testDone = false 476 477 stream.on('reconnect', function () { 478 if (testDone) { 479 return 480 } 481 reconnectCount += 1 482 var expectedInterval = reconnects[reconnectCount] 483 484 // make sure our connect interval is correct 485 assert.equal(stream._connectInterval, expectedInterval); 486 487 // simulate immediate reconnect by forcing a new connection (`self._connectInterval` parameter unchanged) 488 stream._startPersistentConnection(); 489 490 if (reconnectCount === reconnects.length -1) { 491 // restore request.post 492 stubPost.restore() 493 testDone = true 494 return done(); 495 } 496 }); 497 }); 498 499 it('correctly implements 420 backoff', function (done) { 500 var stubPost = function () { 501 var fakeRequest = new helpers.FakeRequest() 502 process.nextTick(function () { 503 var fakeResponse = new helpers.FakeResponse(420) 504 fakeRequest.emit('response', fakeResponse) 505 fakeRequest.emit('close') 506 }) 507 return fakeRequest 508 } 509 510 var request = require('request') 511 var stubPost = sinon.stub(request, 'post', stubPost) 512 513 var twit = new Twit(config1); 514 var stream = twit.stream('statuses/filter', { track: [ 'fun', 'yolo']}); 515 516 var reconnects = [60000, 120000, 240000, 480000] 517 var reconnectCount = -1 518 var testComplete = false 519 520 stream.on('reconnect', function (req, res, connectInterval) { 521 if (testComplete) { 522 // prevent race between last connection attempt firing a reconnect and us validating the final 523 // reconnect value in `reconnects` 524 return 525 } 526 527 reconnectCount += 1 528 var expectedInterval = reconnects[reconnectCount] 529 530 // make sure our connect interval is correct 531 assert.equal(stream._connectInterval, connectInterval); 532 assert.equal(stream._connectInterval, expectedInterval); 533 // simulate immediate reconnect by forcing a new connection (`self._connectInterval` parameter unchanged) 534 stream._startPersistentConnection(); 535 536 if (reconnectCount === reconnects.length -1) { 537 // restore request.post 538 stubPost.restore() 539 testComplete = true 540 return done(); 541 } 542 }); 543 }); 544 }); 545 546 describe('Streaming API disconnect message', function (done) { 547 it.skip('results in stopping the stream', function (done) { 548 var stubPost = function () { 549 var fakeRequest = new helpers.FakeRequest() 550 process.nextTick(function () { 551 var body = zlib.gzipSync(JSON.stringify({disconnect: true}) + '\r\n') 552 var fakeResponse = new helpers.FakeResponse(200, body) 553 fakeRequest.emit('response', fakeResponse); 554 fakeResponse.emit('close') 555 }); 556 return fakeRequest 557 } 558 559 var request = require('request') 560 var origRequest = request.post 561 var stubs = sinon.collection 562 stubs.stub(request, 'post', stubPost) 563 564 var twit = new Twit(config1); 565 var stream = twit.stream('statuses/filter', { track: ['fun']}); 566 567 stream.on('disconnect', function (disconnMsg) { 568 stream.stop(); 569 // restore stub 570 request.post = origRequest 571 done(); 572 }) 573 }) 574 }); 575 576 describe.skip('Streaming API Connection limit exceeded message', function (done) { 577 it('results in an `error` event containing the message', function (done) { 578 var errMsg = 'Exceeded connection limit for user'; 579 580 var stubPost = function () { 581 var fakeRequest = new helpers.FakeRequest(); 582 process.nextTick(function () { 583 var body = zlib.gzipSync(errMsg + '\r\n'); 584 var fakeResponse = new helpers.FakeResponse(200, body); 585 fakeRequest.emit('response', fakeResponse); 586 fakeResponse.emit('close'); 587 }); 588 return fakeRequest 589 } 590 591 var request = require('request'); 592 var origRequest = request.post; 593 var stubs = sinon.collection; 594 stubs.stub(request, 'post', stubPost); 595 596 var twit = new Twit(config1); 597 var stream = twit.stream('statuses/filter'); 598 599 stream.on('error', function (err) { 600 assert(err.toString().indexOf(errMsg) !== -1, 'Unexpected error msg:' + errMsg + '.');; 601 stream.stop(); 602 // restore stub 603 request.post = origRequest; 604 done(); 605 }) 606 }) 607 }) 608 609 describe('Streaming API connection management', function () { 610 it('.stop() works in all states', function (done) { 611 var stubPost = function () { 612 var fakeRequest = new helpers.FakeRequest(); 613 process.nextTick(function () { 614 var body = zlib.gzipSync('Foobar\r\n'); 615 var fakeResponse = new helpers.FakeResponse(200, body); 616 fakeRequest.emit('response', fakeResponse); 617 }); 618 return fakeRequest 619 } 620 621 var request = require('request'); 622 var origRequest = request.post; 623 var stubs = sinon.collection; 624 stubs.stub(request, 'post', stubPost); 625 626 var twit = new Twit(config1); 627 628 var stream = twit.stream('statuses/sample'); 629 stream.stop(); 630 console.log('\nStopped. Restarting..'); 631 stream.start(); 632 stream.once('connect', function(request) { 633 console.log('Stream emitted `connect`. Stopping & starting stream..') 634 stream.stop(); 635 636 stream.once('connected', function () { 637 console.log('Stream emitted `connected`. Stopping stream.'); 638 stream.stop(); 639 640 stubs.restore(); 641 done(); 642 }); 643 stream.start(); 644 }); 645 }) 646 })