manager.js (12520B)
1 2 /** 3 * Module dependencies. 4 */ 5 6 var eio = require('engine.io-client'); 7 var Socket = require('./socket'); 8 var Emitter = require('component-emitter'); 9 var parser = require('socket.io-parser'); 10 var on = require('./on'); 11 var bind = require('component-bind'); 12 var debug = require('debug')('socket.io-client:manager'); 13 var indexOf = require('indexof'); 14 var Backoff = require('backo2'); 15 16 /** 17 * IE6+ hasOwnProperty 18 */ 19 20 var has = Object.prototype.hasOwnProperty; 21 22 /** 23 * Module exports 24 */ 25 26 module.exports = Manager; 27 28 /** 29 * `Manager` constructor. 30 * 31 * @param {String} engine instance or engine uri/opts 32 * @param {Object} options 33 * @api public 34 */ 35 36 function Manager (uri, opts) { 37 if (!(this instanceof Manager)) return new Manager(uri, opts); 38 if (uri && ('object' === typeof uri)) { 39 opts = uri; 40 uri = undefined; 41 } 42 opts = opts || {}; 43 44 opts.path = opts.path || '/socket.io'; 45 this.nsps = {}; 46 this.subs = []; 47 this.opts = opts; 48 this.reconnection(opts.reconnection !== false); 49 this.reconnectionAttempts(opts.reconnectionAttempts || Infinity); 50 this.reconnectionDelay(opts.reconnectionDelay || 1000); 51 this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000); 52 this.randomizationFactor(opts.randomizationFactor || 0.5); 53 this.backoff = new Backoff({ 54 min: this.reconnectionDelay(), 55 max: this.reconnectionDelayMax(), 56 jitter: this.randomizationFactor() 57 }); 58 this.timeout(null == opts.timeout ? 20000 : opts.timeout); 59 this.readyState = 'closed'; 60 this.uri = uri; 61 this.connecting = []; 62 this.lastPing = null; 63 this.encoding = false; 64 this.packetBuffer = []; 65 var _parser = opts.parser || parser; 66 this.encoder = new _parser.Encoder(); 67 this.decoder = new _parser.Decoder(); 68 this.autoConnect = opts.autoConnect !== false; 69 if (this.autoConnect) this.open(); 70 } 71 72 /** 73 * Propagate given event to sockets and emit on `this` 74 * 75 * @api private 76 */ 77 78 Manager.prototype.emitAll = function () { 79 this.emit.apply(this, arguments); 80 for (var nsp in this.nsps) { 81 if (has.call(this.nsps, nsp)) { 82 this.nsps[nsp].emit.apply(this.nsps[nsp], arguments); 83 } 84 } 85 }; 86 87 /** 88 * Update `socket.id` of all sockets 89 * 90 * @api private 91 */ 92 93 Manager.prototype.updateSocketIds = function () { 94 for (var nsp in this.nsps) { 95 if (has.call(this.nsps, nsp)) { 96 this.nsps[nsp].id = this.generateId(nsp); 97 } 98 } 99 }; 100 101 /** 102 * generate `socket.id` for the given `nsp` 103 * 104 * @param {String} nsp 105 * @return {String} 106 * @api private 107 */ 108 109 Manager.prototype.generateId = function (nsp) { 110 return (nsp === '/' ? '' : (nsp + '#')) + this.engine.id; 111 }; 112 113 /** 114 * Mix in `Emitter`. 115 */ 116 117 Emitter(Manager.prototype); 118 119 /** 120 * Sets the `reconnection` config. 121 * 122 * @param {Boolean} true/false if it should automatically reconnect 123 * @return {Manager} self or value 124 * @api public 125 */ 126 127 Manager.prototype.reconnection = function (v) { 128 if (!arguments.length) return this._reconnection; 129 this._reconnection = !!v; 130 return this; 131 }; 132 133 /** 134 * Sets the reconnection attempts config. 135 * 136 * @param {Number} max reconnection attempts before giving up 137 * @return {Manager} self or value 138 * @api public 139 */ 140 141 Manager.prototype.reconnectionAttempts = function (v) { 142 if (!arguments.length) return this._reconnectionAttempts; 143 this._reconnectionAttempts = v; 144 return this; 145 }; 146 147 /** 148 * Sets the delay between reconnections. 149 * 150 * @param {Number} delay 151 * @return {Manager} self or value 152 * @api public 153 */ 154 155 Manager.prototype.reconnectionDelay = function (v) { 156 if (!arguments.length) return this._reconnectionDelay; 157 this._reconnectionDelay = v; 158 this.backoff && this.backoff.setMin(v); 159 return this; 160 }; 161 162 Manager.prototype.randomizationFactor = function (v) { 163 if (!arguments.length) return this._randomizationFactor; 164 this._randomizationFactor = v; 165 this.backoff && this.backoff.setJitter(v); 166 return this; 167 }; 168 169 /** 170 * Sets the maximum delay between reconnections. 171 * 172 * @param {Number} delay 173 * @return {Manager} self or value 174 * @api public 175 */ 176 177 Manager.prototype.reconnectionDelayMax = function (v) { 178 if (!arguments.length) return this._reconnectionDelayMax; 179 this._reconnectionDelayMax = v; 180 this.backoff && this.backoff.setMax(v); 181 return this; 182 }; 183 184 /** 185 * Sets the connection timeout. `false` to disable 186 * 187 * @return {Manager} self or value 188 * @api public 189 */ 190 191 Manager.prototype.timeout = function (v) { 192 if (!arguments.length) return this._timeout; 193 this._timeout = v; 194 return this; 195 }; 196 197 /** 198 * Starts trying to reconnect if reconnection is enabled and we have not 199 * started reconnecting yet 200 * 201 * @api private 202 */ 203 204 Manager.prototype.maybeReconnectOnOpen = function () { 205 // Only try to reconnect if it's the first time we're connecting 206 if (!this.reconnecting && this._reconnection && this.backoff.attempts === 0) { 207 // keeps reconnection from firing twice for the same reconnection loop 208 this.reconnect(); 209 } 210 }; 211 212 /** 213 * Sets the current transport `socket`. 214 * 215 * @param {Function} optional, callback 216 * @return {Manager} self 217 * @api public 218 */ 219 220 Manager.prototype.open = 221 Manager.prototype.connect = function (fn, opts) { 222 debug('readyState %s', this.readyState); 223 if (~this.readyState.indexOf('open')) return this; 224 225 debug('opening %s', this.uri); 226 this.engine = eio(this.uri, this.opts); 227 var socket = this.engine; 228 var self = this; 229 this.readyState = 'opening'; 230 this.skipReconnect = false; 231 232 // emit `open` 233 var openSub = on(socket, 'open', function () { 234 self.onopen(); 235 fn && fn(); 236 }); 237 238 // emit `connect_error` 239 var errorSub = on(socket, 'error', function (data) { 240 debug('connect_error'); 241 self.cleanup(); 242 self.readyState = 'closed'; 243 self.emitAll('connect_error', data); 244 if (fn) { 245 var err = new Error('Connection error'); 246 err.data = data; 247 fn(err); 248 } else { 249 // Only do this if there is no fn to handle the error 250 self.maybeReconnectOnOpen(); 251 } 252 }); 253 254 // emit `connect_timeout` 255 if (false !== this._timeout) { 256 var timeout = this._timeout; 257 debug('connect attempt will timeout after %d', timeout); 258 259 // set timer 260 var timer = setTimeout(function () { 261 debug('connect attempt timed out after %d', timeout); 262 openSub.destroy(); 263 socket.close(); 264 socket.emit('error', 'timeout'); 265 self.emitAll('connect_timeout', timeout); 266 }, timeout); 267 268 this.subs.push({ 269 destroy: function () { 270 clearTimeout(timer); 271 } 272 }); 273 } 274 275 this.subs.push(openSub); 276 this.subs.push(errorSub); 277 278 return this; 279 }; 280 281 /** 282 * Called upon transport open. 283 * 284 * @api private 285 */ 286 287 Manager.prototype.onopen = function () { 288 debug('open'); 289 290 // clear old subs 291 this.cleanup(); 292 293 // mark as open 294 this.readyState = 'open'; 295 this.emit('open'); 296 297 // add new subs 298 var socket = this.engine; 299 this.subs.push(on(socket, 'data', bind(this, 'ondata'))); 300 this.subs.push(on(socket, 'ping', bind(this, 'onping'))); 301 this.subs.push(on(socket, 'pong', bind(this, 'onpong'))); 302 this.subs.push(on(socket, 'error', bind(this, 'onerror'))); 303 this.subs.push(on(socket, 'close', bind(this, 'onclose'))); 304 this.subs.push(on(this.decoder, 'decoded', bind(this, 'ondecoded'))); 305 }; 306 307 /** 308 * Called upon a ping. 309 * 310 * @api private 311 */ 312 313 Manager.prototype.onping = function () { 314 this.lastPing = new Date(); 315 this.emitAll('ping'); 316 }; 317 318 /** 319 * Called upon a packet. 320 * 321 * @api private 322 */ 323 324 Manager.prototype.onpong = function () { 325 this.emitAll('pong', new Date() - this.lastPing); 326 }; 327 328 /** 329 * Called with data. 330 * 331 * @api private 332 */ 333 334 Manager.prototype.ondata = function (data) { 335 this.decoder.add(data); 336 }; 337 338 /** 339 * Called when parser fully decodes a packet. 340 * 341 * @api private 342 */ 343 344 Manager.prototype.ondecoded = function (packet) { 345 this.emit('packet', packet); 346 }; 347 348 /** 349 * Called upon socket error. 350 * 351 * @api private 352 */ 353 354 Manager.prototype.onerror = function (err) { 355 debug('error', err); 356 this.emitAll('error', err); 357 }; 358 359 /** 360 * Creates a new socket for the given `nsp`. 361 * 362 * @return {Socket} 363 * @api public 364 */ 365 366 Manager.prototype.socket = function (nsp, opts) { 367 var socket = this.nsps[nsp]; 368 if (!socket) { 369 socket = new Socket(this, nsp, opts); 370 this.nsps[nsp] = socket; 371 var self = this; 372 socket.on('connecting', onConnecting); 373 socket.on('connect', function () { 374 socket.id = self.generateId(nsp); 375 }); 376 377 if (this.autoConnect) { 378 // manually call here since connecting event is fired before listening 379 onConnecting(); 380 } 381 } 382 383 function onConnecting () { 384 if (!~indexOf(self.connecting, socket)) { 385 self.connecting.push(socket); 386 } 387 } 388 389 return socket; 390 }; 391 392 /** 393 * Called upon a socket close. 394 * 395 * @param {Socket} socket 396 */ 397 398 Manager.prototype.destroy = function (socket) { 399 var index = indexOf(this.connecting, socket); 400 if (~index) this.connecting.splice(index, 1); 401 if (this.connecting.length) return; 402 403 this.close(); 404 }; 405 406 /** 407 * Writes a packet. 408 * 409 * @param {Object} packet 410 * @api private 411 */ 412 413 Manager.prototype.packet = function (packet) { 414 debug('writing packet %j', packet); 415 var self = this; 416 if (packet.query && packet.type === 0) packet.nsp += '?' + packet.query; 417 418 if (!self.encoding) { 419 // encode, then write to engine with result 420 self.encoding = true; 421 this.encoder.encode(packet, function (encodedPackets) { 422 for (var i = 0; i < encodedPackets.length; i++) { 423 self.engine.write(encodedPackets[i], packet.options); 424 } 425 self.encoding = false; 426 self.processPacketQueue(); 427 }); 428 } else { // add packet to the queue 429 self.packetBuffer.push(packet); 430 } 431 }; 432 433 /** 434 * If packet buffer is non-empty, begins encoding the 435 * next packet in line. 436 * 437 * @api private 438 */ 439 440 Manager.prototype.processPacketQueue = function () { 441 if (this.packetBuffer.length > 0 && !this.encoding) { 442 var pack = this.packetBuffer.shift(); 443 this.packet(pack); 444 } 445 }; 446 447 /** 448 * Clean up transport subscriptions and packet buffer. 449 * 450 * @api private 451 */ 452 453 Manager.prototype.cleanup = function () { 454 debug('cleanup'); 455 456 var subsLength = this.subs.length; 457 for (var i = 0; i < subsLength; i++) { 458 var sub = this.subs.shift(); 459 sub.destroy(); 460 } 461 462 this.packetBuffer = []; 463 this.encoding = false; 464 this.lastPing = null; 465 466 this.decoder.destroy(); 467 }; 468 469 /** 470 * Close the current socket. 471 * 472 * @api private 473 */ 474 475 Manager.prototype.close = 476 Manager.prototype.disconnect = function () { 477 debug('disconnect'); 478 this.skipReconnect = true; 479 this.reconnecting = false; 480 if ('opening' === this.readyState) { 481 // `onclose` will not fire because 482 // an open event never happened 483 this.cleanup(); 484 } 485 this.backoff.reset(); 486 this.readyState = 'closed'; 487 if (this.engine) this.engine.close(); 488 }; 489 490 /** 491 * Called upon engine close. 492 * 493 * @api private 494 */ 495 496 Manager.prototype.onclose = function (reason) { 497 debug('onclose'); 498 499 this.cleanup(); 500 this.backoff.reset(); 501 this.readyState = 'closed'; 502 this.emit('close', reason); 503 504 if (this._reconnection && !this.skipReconnect) { 505 this.reconnect(); 506 } 507 }; 508 509 /** 510 * Attempt a reconnection. 511 * 512 * @api private 513 */ 514 515 Manager.prototype.reconnect = function () { 516 if (this.reconnecting || this.skipReconnect) return this; 517 518 var self = this; 519 520 if (this.backoff.attempts >= this._reconnectionAttempts) { 521 debug('reconnect failed'); 522 this.backoff.reset(); 523 this.emitAll('reconnect_failed'); 524 this.reconnecting = false; 525 } else { 526 var delay = this.backoff.duration(); 527 debug('will wait %dms before reconnect attempt', delay); 528 529 this.reconnecting = true; 530 var timer = setTimeout(function () { 531 if (self.skipReconnect) return; 532 533 debug('attempting reconnect'); 534 self.emitAll('reconnect_attempt', self.backoff.attempts); 535 self.emitAll('reconnecting', self.backoff.attempts); 536 537 // check again for the case socket closed in above events 538 if (self.skipReconnect) return; 539 540 self.open(function (err) { 541 if (err) { 542 debug('reconnect attempt error'); 543 self.reconnecting = false; 544 self.reconnect(); 545 self.emitAll('reconnect_error', err.data); 546 } else { 547 debug('reconnect success'); 548 self.onreconnect(); 549 } 550 }); 551 }, delay); 552 553 this.subs.push({ 554 destroy: function () { 555 clearTimeout(timer); 556 } 557 }); 558 } 559 }; 560 561 /** 562 * Called upon successful reconnect. 563 * 564 * @api private 565 */ 566 567 Manager.prototype.onreconnect = function () { 568 var attempt = this.backoff.attempts; 569 this.reconnecting = false; 570 this.backoff.reset(); 571 this.updateSocketIds(); 572 this.emitAll('reconnect', attempt); 573 };