socket.js (19106B)
/**
 * Module dependencies.
 */

var transports = require('./transports/index');
var Emitter = require('component-emitter');
var debug = require('debug')('engine.io-client:socket');
var index = require('indexof');
var parser = require('engine.io-parser');
var parseuri = require('parseuri');
var parseqs = require('parseqs');

/**
 * Module exports.
 */

module.exports = Socket;

/**
 * Socket constructor.
 *
 * Normalises the connection options, copies them onto the instance and
 * immediately starts opening a transport via `open()`. May be called with
 * or without `new`.
 *
 * Note: the `opts` object (or `uri` when it is an object) is written to
 * (`hostname`, `secure`, `port`, `query`) while the options are normalised.
 *
 * @param {String|Object} uri or options
 * @param {Object} options
 * @api public
 */

function Socket (uri, opts) {
  if (!(this instanceof Socket)) return new Socket(uri, opts);

  opts = opts || {};

  // `Socket(opts)` form: the first argument carries the options
  if (uri && 'object' === typeof uri) {
    opts = uri;
    uri = null;
  }

  if (uri) {
    // values parsed from the uri take precedence over the matching options
    uri = parseuri(uri);
    opts.hostname = uri.host;
    opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
    opts.port = uri.port;
    if (uri.query) opts.query = uri.query;
  } else if (opts.host) {
    opts.hostname = parseuri(opts.host).host;
  }

  this.secure = null != opts.secure ? opts.secure
    : (typeof location !== 'undefined' && 'https:' === location.protocol);

  if (opts.hostname && !opts.port) {
    // if no port is specified manually, use the protocol default
    opts.port = this.secure ? '443' : '80';
  }

  this.agent = opts.agent || false;
  this.hostname = opts.hostname ||
    (typeof location !== 'undefined' ? location.hostname : 'localhost');
  this.port = opts.port || (typeof location !== 'undefined' && location.port
    ? location.port
    : (this.secure ? 443 : 80));
  this.query = opts.query || {};
  if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
  this.upgrade = false !== opts.upgrade;
  // normalise to exactly one trailing slash
  this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  this.forceJSONP = !!opts.forceJSONP;
  this.jsonp = false !== opts.jsonp;
  this.forceBase64 = !!opts.forceBase64;
  this.enablesXDR = !!opts.enablesXDR;
  this.withCredentials = false !== opts.withCredentials;
  this.timestampParam = opts.timestampParam || 't';
  this.timestampRequests = opts.timestampRequests;
  // `createTransport` forwards `this.requestTimeout`; without this assignment
  // a socket-level `requestTimeout` option never reached the transports
  this.requestTimeout = opts.requestTimeout;
  this.transports = opts.transports || ['polling', 'websocket'];
  this.transportOptions = opts.transportOptions || {};
  this.readyState = '';
  this.writeBuffer = [];
  this.prevBufferLen = 0;
  this.policyPort = opts.policyPort || 843;
  this.rememberUpgrade = opts.rememberUpgrade || false;
  this.binaryType = null;
  this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
  this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;

  if (true === this.perMessageDeflate) this.perMessageDeflate = {};
  if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
    this.perMessageDeflate.threshold = 1024;
  }

  // SSL options for Node.js client
  this.pfx = opts.pfx || null;
  this.key = opts.key || null;
  this.passphrase = opts.passphrase || null;
  this.cert = opts.cert || null;
  this.ca = opts.ca || null;
  this.ciphers = opts.ciphers || null;
  this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
  this.forceNode = !!opts.forceNode;

  // detect ReactNative environment
  this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');

  // other options for Node.js or ReactNative client
  if (typeof self === 'undefined' || this.isReactNative) {
    if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
      this.extraHeaders = opts.extraHeaders;
    }

    if (opts.localAddress) {
      this.localAddress = opts.localAddress;
    }
  }

  // set on handshake
  this.id = null;
  this.upgrades = null;
  this.pingInterval = null;
  this.pingTimeout = null;

  // set on heartbeat
  this.pingIntervalTimer = null;
  this.pingTimeoutTimer = null;

  this.open();
}

/**
 * Whether the most recent connection ended up on the websocket transport.
 * Shared by every socket; consulted by `open()` when `rememberUpgrade` is set.
 */

Socket.priorWebsocketSuccess = false;

/**
 * Mix in `Emitter`.
 */

Emitter(Socket.prototype);

/**
 * Protocol version.
 *
 * @api public
 */

Socket.protocol = parser.protocol; // this is an int

/**
 * Expose deps for legacy compatibility
 * and standalone browser access.
 */

Socket.Socket = Socket;
Socket.Transport = require('./transport');
Socket.transports = require('./transports/index');
Socket.parser = require('engine.io-parser');

/**
 * Returns the per-transport value for `key` when one was explicitly provided
 * (anything other than `undefined`/`null`), otherwise `fallback`.
 *
 * Unlike `overrides[key] || fallback`, an explicit `false`, `0` or `''`
 * is honoured.
 *
 * @param {Object} overrides per-transport options
 * @param {String} key option name
 * @param {*} fallback socket-level value
 * @return {*}
 * @api private
 */

function override (overrides, key, fallback) {
  var value = overrides[key];
  return (undefined === value || null === value) ? fallback : value;
}

/**
 * Creates transport of the given type.
 *
 * The transport receives a copy of the socket query extended with the
 * protocol version (`EIO`), the transport name and, once known, the session
 * id. Each remaining option comes from `transportOptions[name]` when set
 * there, and from the socket otherwise.
 *
 * @param {String} transport name
 * @return {Transport}
 * @api private
 */

Socket.prototype.createTransport = function (name) {
  debug('creating transport "%s"', name);
  var query = clone(this.query);

  // append engine.io protocol identifier
  query.EIO = parser.protocol;

  // transport name
  query.transport = name;

  // per-transport options
  var options = this.transportOptions[name] || {};

  // session id if we already have one
  if (this.id) query.sid = this.id;

  var self = this;

  // per-transport value when given (even if falsy), socket value otherwise
  function opt (key) {
    return override(options, key, self[key]);
  }

  var transport = new transports[name]({
    query: query,
    socket: this,
    agent: opt('agent'),
    hostname: opt('hostname'),
    port: opt('port'),
    secure: opt('secure'),
    path: opt('path'),
    forceJSONP: opt('forceJSONP'),
    jsonp: opt('jsonp'),
    forceBase64: opt('forceBase64'),
    enablesXDR: opt('enablesXDR'),
    withCredentials: opt('withCredentials'),
    timestampRequests: opt('timestampRequests'),
    timestampParam: opt('timestampParam'),
    policyPort: opt('policyPort'),
    pfx: opt('pfx'),
    key: opt('key'),
    passphrase: opt('passphrase'),
    cert: opt('cert'),
    ca: opt('ca'),
    ciphers: opt('ciphers'),
    rejectUnauthorized: opt('rejectUnauthorized'),
    perMessageDeflate: opt('perMessageDeflate'),
    extraHeaders: opt('extraHeaders'),
    forceNode: opt('forceNode'),
    localAddress: opt('localAddress'),
    requestTimeout: opt('requestTimeout'),
    protocols: options.protocols || void (0),
    isReactNative: this.isReactNative
  });

  return transport;
};

/**
 * Returns a shallow copy of the own enumerable properties of `obj`.
 *
 * @param {Object} obj
 * @return {Object}
 * @api private
 */

function clone (obj) {
  var o = {};
  for (var i in obj) {
    if (obj.hasOwnProperty(i)) {
      o[i] = obj[i];
    }
  }
  return o;
}

/**
 * Initializes transport to use and starts probe.
 *
 * Picks `websocket` directly when `rememberUpgrade` is set and a previous
 * connection succeeded over websocket; otherwise the first configured
 * transport. When the chosen transport cannot be created it is removed from
 * `this.transports` and the next candidate is tried.
 *
 * @api private
 */

Socket.prototype.open = function () {
  var name;
  if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
    name = 'websocket';
  } else if (0 === this.transports.length) {
    // Emit error on next tick so it can be listened to
    var self = this;
    setTimeout(function () {
      self.emit('error', 'No transports available');
    }, 0);
    return;
  } else {
    name = this.transports[0];
  }
  this.readyState = 'opening';

  // Retry with the next transport if the transport is disabled (jsonp: false)
  var transport;
  try {
    transport = this.createTransport(name);
  } catch (e) {
    debug('error while creating transport "%s": %s', name, e);

    // Drop the transport that actually failed. It is not necessarily the
    // first entry: the `rememberUpgrade` path may pick 'websocket' from
    // anywhere in the list.
    var failedAt = index(this.transports, name);
    if (~failedAt) {
      this.transports.splice(failedAt, 1);
    } else {
      // should not happen; still guarantees the retry loop makes progress
      this.transports.shift();
    }
    this.open();
    return;
  }

  transport.open();
  this.setTransport(transport);
};

/**
 * Sets the current transport. Disables the existing one (if any).
 *
 * All listeners are removed from the previous transport, and the new one is
 * wired to the socket's `onDrain`, `onPacket`, `onError` and `onClose`.
 *
 * @param {Transport} transport
 * @api private
 */

Socket.prototype.setTransport = function (transport) {
  debug('setting transport %s', transport.name);
  var self = this;

  if (this.transport) {
    debug('clearing existing transport %s', this.transport.name);
    this.transport.removeAllListeners();
  }

  // set up transport
  this.transport = transport;

  // set up transport listeners
  transport
    .on('drain', function () {
      self.onDrain();
    })
    .on('packet', function (packet) {
      self.onPacket(packet);
    })
    .on('error', function (e) {
      self.onError(e);
    })
    .on('close', function () {
      self.onClose('transport close');
    });
};

/**
 * Probes a transport.
 *
 * Opens a second transport next to the current one and sends it a
 * `ping`/`probe` packet. On a matching `pong` the current transport is
 * paused, the new one takes over, and `upgrade` is emitted. Any failure
 * emits `upgradeError` and leaves the current transport in place.
 *
 * @param {String} transport name
 * @api private
 */

Socket.prototype.probe = function (name) {
  debug('probing transport "%s"', name);
  var transport = this.createTransport(name);
  var failed = false;
  var self = this;

  Socket.priorWebsocketSuccess = false;

  function onTransportOpen () {
    if (self.onlyBinaryUpgrades) {
      // NOTE(review): `this` is whatever the emitter binds for listeners -
      // presumably the probing transport; confirm against component-emitter.
      var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
      failed = failed || upgradeLosesBinary;
    }
    if (failed) return;

    debug('probe transport "%s" opened', name);
    transport.send([{ type: 'ping', data: 'probe' }]);
    transport.once('packet', function (msg) {
      if (failed) return;
      if ('pong' === msg.type && 'probe' === msg.data) {
        debug('probe transport "%s" pong', name);
        self.upgrading = true;
        self.emit('upgrading', transport);
        // an 'upgrading' listener may have frozen (and nulled) this probe
        if (!transport) return;
        Socket.priorWebsocketSuccess = 'websocket' === transport.name;

        debug('pausing current transport "%s"', self.transport.name);
        self.transport.pause(function () {
          if (failed) return;
          if ('closed' === self.readyState) return;
          debug('changing transport and sending upgrade packet');

          cleanup();

          self.setTransport(transport);
          transport.send([{ type: 'upgrade' }]);
          self.emit('upgrade', transport);
          transport = null;
          self.upgrading = false;
          self.flush();
        });
      } else {
        debug('probe transport "%s" failed', name);
        var err = new Error('probe error');
        err.transport = transport.name;
        self.emit('upgradeError', err);
      }
    });
  }

  function freezeTransport () {
    if (failed) return;

    // Any callback called by transport should be ignored since now
    failed = true;

    cleanup();

    transport.close();
    transport = null;
  }

  // Handle any error that happens while probing
  function onerror (err) {
    var error = new Error('probe error: ' + err);
    // read the name before freezeTransport() nulls `transport`
    error.transport = transport.name;

    freezeTransport();

    debug('probe transport "%s" failed because of error: %s', name, err);

    self.emit('upgradeError', error);
  }

  function onTransportClose () {
    onerror('transport closed');
  }

  // When the socket is closed while we're probing
  function onclose () {
    onerror('socket closed');
  }

  // When the socket is upgraded while we're probing
  function onupgrade (to) {
    if (transport && to.name !== transport.name) {
      debug('"%s" works - aborting "%s"', to.name, transport.name);
      freezeTransport();
    }
  }

  // Remove all listeners on the transport and on self
  function cleanup () {
    transport.removeListener('open', onTransportOpen);
    transport.removeListener('error', onerror);
    transport.removeListener('close', onTransportClose);
    self.removeListener('close', onclose);
    self.removeListener('upgrading', onupgrade);
  }

  transport.once('open', onTransportOpen);
  transport.once('error', onerror);
  transport.once('close', onTransportClose);

  this.once('close', onclose);
  this.once('upgrading', onupgrade);

  transport.open();
};

/**
 * Called when connection is deemed open.
 *
 * Marks the socket open, emits `open`, flushes buffered packets and, when
 * upgrades are enabled and the current transport can be paused, probes every
 * upgrade in `this.upgrades`.
 *
 * @api public
 */

Socket.prototype.onOpen = function () {
  debug('socket open');
  this.readyState = 'open';
  Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  this.emit('open');
  this.flush();

  // we check for `readyState` in case an `open`
  // listener already closed the socket
  if ('open' === this.readyState && this.upgrade && this.transport.pause) {
    debug('starting upgrade probes');
    for (var i = 0, l = this.upgrades.length; i < l; i++) {
      this.probe(this.upgrades[i]);
    }
  }
};

/**
 * Handles a packet.
 *
 * Packets are processed only while the socket is `opening`, `open` or
 * `closing`. Every accepted packet emits `packet` and `heartbeat` before
 * being dispatched on its type.
 *
 * @param {Object} packet with `type` and `data`
 * @api private
 */

Socket.prototype.onPacket = function (packet) {
  if ('opening' === this.readyState || 'open' === this.readyState ||
      'closing' === this.readyState) {
    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit('packet', packet);

    // Socket is live - any packet counts
    this.emit('heartbeat');

    switch (packet.type) {
      case 'open':
        this.onHandshake(JSON.parse(packet.data));
        break;

      case 'pong':
        this.setPing();
        this.emit('pong');
        break;

      case 'error':
        var err = new Error('server error');
        err.code = packet.data;
        this.onError(err);
        break;

      case 'message':
        this.emit('data', packet.data);
        this.emit('message', packet.data);
        break;
    }
  } else {
    debug('packet received with socket readyState "%s"', this.readyState);
  }
};

/**
 * Called upon handshake completion.
 *
 * Stores the session id, the usable upgrades and the ping settings from the
 * handshake, opens the socket and starts the ping cycle.
 *
 * @param {Object} handshake obj
 * @api private
 */

Socket.prototype.onHandshake = function (data) {
  this.emit('handshake', data);
  this.id = data.sid;
  this.transport.query.sid = data.sid;
  this.upgrades = this.filterUpgrades(data.upgrades);
  this.pingInterval = data.pingInterval;
  this.pingTimeout = data.pingTimeout;
  this.onOpen();
  // In case open handler closes socket
  if ('closed' === this.readyState) return;
  this.setPing();

  // Prolong liveness of socket on heartbeat
  // (remove first so repeated handshakes never register the listener twice)
  this.removeListener('heartbeat', this.onHeartbeat);
  this.on('heartbeat', this.onHeartbeat);
};

/**
 * Resets ping timeout.
 *
 * Closes the socket with reason `ping timeout` unless called again before
 * the delay elapses.
 *
 * @param {Number} timeout delay in ms; defaults to `pingInterval + pingTimeout`
 * @api private
 */

Socket.prototype.onHeartbeat = function (timeout) {
  clearTimeout(this.pingTimeoutTimer);
  var self = this;
  self.pingTimeoutTimer = setTimeout(function () {
    if ('closed' === self.readyState) return;
    self.onClose('ping timeout');
  }, timeout || (self.pingInterval + self.pingTimeout));
};

/**
 * Pings server every `this.pingInterval` and expects response
 * within `this.pingTimeout` or closes connection.
 *
 * @api private
 */

Socket.prototype.setPing = function () {
  var self = this;
  clearTimeout(self.pingIntervalTimer);
  self.pingIntervalTimer = setTimeout(function () {
    debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
    self.ping();
    self.onHeartbeat(self.pingTimeout);
  }, self.pingInterval);
};

/**
 * Sends a ping packet.
 *
 * Emits `ping` once the packet has been flushed.
 *
 * @api private
 */

Socket.prototype.ping = function () {
  var self = this;
  this.sendPacket('ping', function () {
    self.emit('ping');
  });
};

/**
 * Called on `drain` event
 *
 * Drops the packets handed over by the last flush, then either emits
 * `drain` (buffer empty) or flushes what was queued in the meantime.
 *
 * @api private
 */

Socket.prototype.onDrain = function () {
  this.writeBuffer.splice(0, this.prevBufferLen);

  // setting prevBufferLen = 0 is very important
  // for example, when upgrading, upgrade packet is sent over,
  // and a nonzero prevBufferLen could cause problems on `drain`
  this.prevBufferLen = 0;

  if (0 === this.writeBuffer.length) {
    this.emit('drain');
  } else {
    this.flush();
  }
};

/**
 * Flush write buffers.
 *
 * Does nothing while the socket is closed, the transport is not writable,
 * an upgrade is in progress or the buffer is empty.
 *
 * @api private
 */

Socket.prototype.flush = function () {
  if ('closed' !== this.readyState && this.transport.writable &&
    !this.upgrading && this.writeBuffer.length) {
    debug('flushing %d packets in socket', this.writeBuffer.length);
    this.transport.send(this.writeBuffer);
    // keep track of current length of writeBuffer
    // splice writeBuffer and callbackBuffer on `drain`
    this.prevBufferLen = this.writeBuffer.length;
    this.emit('flush');
  }
};

/**
 * Sends a message.
 *
 * @param {String} message.
 * @param {Object} options.
 * @param {Function} callback function.
 * @return {Socket} for chaining.
 * @api public
 */

Socket.prototype.write =
Socket.prototype.send = function (msg, options, fn) {
  this.sendPacket('message', msg, options, fn);
  return this;
};

/**
 * Sends a packet.
 *
 * `data` and `options` are optional: a function passed in either position
 * is taken as the callback. Nothing is sent once the socket is `closing`
 * or `closed`.
 *
 * @param {String} packet type.
 * @param {String} data.
 * @param {Object} options.
 * @param {Function} callback function.
 * @api private
 */

Socket.prototype.sendPacket = function (type, data, options, fn) {
  if ('function' === typeof data) {
    fn = data;
    data = undefined;
  }

  if ('function' === typeof options) {
    fn = options;
    options = null;
  }

  if ('closing' === this.readyState || 'closed' === this.readyState) {
    return;
  }

  options = options || {};
  // compression is on unless explicitly disabled
  options.compress = false !== options.compress;

  var packet = {
    type: type,
    data: data,
    options: options
  };
  this.emit('packetCreate', packet);
  this.writeBuffer.push(packet);
  if (fn) this.once('flush', fn);
  this.flush();
};

/**
 * Closes the connection.
 *
 * Only acts while the socket is `opening` or `open`. Waits for buffered
 * packets to drain and for a running upgrade to settle before closing.
 *
 * @return {Socket} for chaining.
 * @api public
 */

Socket.prototype.close = function () {
  if ('opening' === this.readyState || 'open' === this.readyState) {
    this.readyState = 'closing';

    var self = this;

    if (this.writeBuffer.length) {
      this.once('drain', function () {
        if (this.upgrading) {
          waitForUpgrade();
        } else {
          close();
        }
      });
    } else if (this.upgrading) {
      waitForUpgrade();
    } else {
      close();
    }
  }

  function close () {
    self.onClose('forced close');
    debug('socket closing - telling transport to close');
    self.transport.close();
  }

  function cleanupAndClose () {
    self.removeListener('upgrade', cleanupAndClose);
    self.removeListener('upgradeError', cleanupAndClose);
    close();
  }

  function waitForUpgrade () {
    // wait for upgrade to finish since we can't send packets while pausing a transport
    self.once('upgrade', cleanupAndClose);
    self.once('upgradeError', cleanupAndClose);
  }

  return this;
};

/**
 * Called upon transport error
 *
 * Emits `error` and then closes the socket with reason `transport error`.
 *
 * @param {Error} err
 * @api private
 */

Socket.prototype.onError = function (err) {
  debug('socket error %j', err);
  Socket.priorWebsocketSuccess = false;
  this.emit('error', err);
  this.onClose('transport error', err);
};

/**
 * Called upon transport close.
 *
 * No-op unless the socket is `opening`, `open` or `closing`.
 *
 * @param {String} reason
 * @param {*} desc optional detail forwarded to `close` listeners
 * @api private
 */

Socket.prototype.onClose = function (reason, desc) {
  if ('opening' === this.readyState || 'open' === this.readyState || 'closing' === this.readyState) {
    debug('socket close with reason: "%s"', reason);
    var self = this;

    // clear timers
    clearTimeout(this.pingIntervalTimer);
    clearTimeout(this.pingTimeoutTimer);

    // stop event from firing again for transport
    this.transport.removeAllListeners('close');

    // ensure transport won't stay open
    this.transport.close();

    // ignore further transport communication
    this.transport.removeAllListeners();

    // set ready state
    this.readyState = 'closed';

    // clear session id
    this.id = null;

    // emit close event
    this.emit('close', reason, desc);

    // clean buffers after, so users can still
    // grab the buffers on `close` event
    self.writeBuffer = [];
    self.prevBufferLen = 0;
  }
};

/**
 * Filters upgrades, returning only those matching client transports.
 *
 * A missing `upgrades` list is treated as empty.
 *
 * @param {Array} server upgrades
 * @return {Array} upgrades also present in `this.transports`
 * @api private
 *
 */

Socket.prototype.filterUpgrades = function (upgrades) {
  var filteredUpgrades = [];
  upgrades = upgrades || [];
  for (var i = 0, j = upgrades.length; i < j; i++) {
    if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
  }
  return filteredUpgrades;
};