socket.js (11597B)
1 /** 2 * Module dependencies. 3 */ 4 5 var EventEmitter = require('events').EventEmitter; 6 var util = require('util'); 7 var debug = require('debug')('engine:socket'); 8 9 /** 10 * Module exports. 11 */ 12 13 module.exports = Socket; 14 15 /** 16 * Client class (abstract). 17 * 18 * @api private 19 */ 20 21 function Socket (id, server, transport, req) { 22 this.id = id; 23 this.server = server; 24 this.upgrading = false; 25 this.upgraded = false; 26 this.readyState = 'opening'; 27 this.writeBuffer = []; 28 this.packetsFn = []; 29 this.sentCallbackFn = []; 30 this.cleanupFn = []; 31 this.request = req; 32 33 // Cache IP since it might not be in the req later 34 if (req.websocket && req.websocket._socket) { 35 this.remoteAddress = req.websocket._socket.remoteAddress; 36 } else { 37 this.remoteAddress = req.connection.remoteAddress; 38 } 39 40 this.checkIntervalTimer = null; 41 this.upgradeTimeoutTimer = null; 42 this.pingTimeoutTimer = null; 43 44 this.setTransport(transport); 45 this.onOpen(); 46 } 47 48 /** 49 * Inherits from EventEmitter. 50 */ 51 52 util.inherits(Socket, EventEmitter); 53 54 /** 55 * Called upon transport considered open. 56 * 57 * @api private 58 */ 59 60 Socket.prototype.onOpen = function () { 61 this.readyState = 'open'; 62 63 // sends an `open` packet 64 this.transport.sid = this.id; 65 this.sendPacket('open', JSON.stringify({ 66 sid: this.id, 67 upgrades: this.getAvailableUpgrades(), 68 pingInterval: this.server.pingInterval, 69 pingTimeout: this.server.pingTimeout 70 })); 71 72 if (this.server.initialPacket) { 73 this.sendPacket('message', this.server.initialPacket); 74 } 75 76 this.emit('open'); 77 this.setPingTimeout(); 78 }; 79 80 /** 81 * Called upon transport packet. 
/**
 * Handles a packet delivered by the current transport.
 *
 * Packets are ignored unless the socket is open. Every accepted packet is
 * re-emitted as `packet` and restarts the ping timeout; `ping` is answered
 * with `pong`, `error` closes the socket and `message` payloads are emitted
 * as both `data` and `message`.
 *
 * @param {Object} packet
 * @api private
 */

Socket.prototype.onPacket = function (packet) {
  if ('open' !== this.readyState) {
    debug('packet received with closed socket');
    return;
  }

  // export packet event
  debug('packet');
  this.emit('packet', packet);

  // any incoming traffic counts as proof that the other side is alive
  this.setPingTimeout();

  var kind = packet.type;
  if ('ping' === kind) {
    debug('got ping');
    this.sendPacket('pong');
    this.emit('heartbeat');
  } else if ('error' === kind) {
    this.onClose('parse error');
  } else if ('message' === kind) {
    this.emit('data', packet.data);
    this.emit('message', packet.data);
  }
};

/**
 * Handles a transport error by closing the socket with reason
 * `transport error`.
 *
 * @param {Error} err error reported by the transport
 * @api private
 */

Socket.prototype.onError = function (err) {
  debug('transport error');
  this.onClose('transport error', err);
};

/**
 * (Re)arms the ping timeout timer. If it is not re-armed within
 * `pingInterval + pingTimeout` milliseconds the socket is closed with
 * reason `ping timeout`.
 *
 * @api private
 */

Socket.prototype.setPingTimeout = function () {
  var socket = this;

  clearTimeout(socket.pingTimeoutTimer);

  function expire () {
    socket.onClose('ping timeout');
  }

  var settings = socket.server;
  socket.pingTimeoutTimer = setTimeout(expire, settings.pingInterval + settings.pingTimeout);
};
/**
 * Makes `transport` the socket's current transport and wires its events to
 * the socket. A matching teardown function is queued on `cleanupFn` so the
 * listeners can be removed again when the transport is cleared.
 *
 * @param {Transport} transport
 * @api private
 */

Socket.prototype.setTransport = function (transport) {
  var handleError = this.onError.bind(this);
  var handlePacket = this.onPacket.bind(this);
  var handleDrain = this.flush.bind(this);
  var handleClose = this.onClose.bind(this, 'transport close');

  this.transport = transport;
  transport.once('error', handleError);
  transport.on('packet', handlePacket);
  transport.on('drain', handleDrain);
  transport.once('close', handleClose);

  // send callbacks are driven by their own `drain` listener
  this.setupSendCallback();

  this.cleanupFn.push(function detach () {
    transport.removeListener('error', handleError);
    transport.removeListener('packet', handlePacket);
    transport.removeListener('drain', handleDrain);
    transport.removeListener('close', handleClose);
  });
};

/**
 * Starts a tentative upgrade of the socket to `transport`.
 *
 * The candidate transport is probed (`ping`/`probe` answered with
 * `pong`/`probe`) and becomes the socket's transport once it delivers an
 * `upgrade` packet. Any other packet, a transport error/close, the socket
 * closing, or the server's `upgradeTimeout` elapsing aborts the attempt.
 *
 * @param {Transport} transport candidate transport
 * @api private
 */

Socket.prototype.maybeUpgrade = function (transport) {
  debug('might upgrade socket transport from "%s" to "%s"'
    , this.transport.name, transport.name);

  var self = this;

  self.upgrading = true;

  // give the client a limited amount of time to complete the upgrade
  self.upgradeTimeoutTimer = setTimeout(function onUpgradeTimeout () {
    debug('client did not complete upgrade - closing transport');
    teardown();
    if ('open' === transport.readyState) {
      transport.close();
    }
  }, self.server.upgradeTimeout);

  transport.on('packet', onUpgradePacket);
  transport.once('close', onTransportClose);
  transport.once('error', abortUpgrade);

  self.once('close', onSocketClose);

  // Drives the probe/upgrade handshake on the candidate transport.
  function onUpgradePacket (packet) {
    if ('ping' === packet.type && 'probe' === packet.data) {
      transport.send([{ type: 'pong', data: 'probe' }]);
      self.emit('upgrading', transport);
      clearInterval(self.checkIntervalTimer);
      self.checkIntervalTimer = setInterval(forcePollingCycle, 100);
      return;
    }

    if ('upgrade' !== packet.type || self.readyState === 'closed') {
      // unexpected packet, or the socket died meanwhile: give up
      teardown();
      transport.close();
      return;
    }

    debug('got upgrade packet - upgrading');
    teardown();
    self.transport.discard();
    self.upgraded = true;
    self.clearTransport();
    self.setTransport(transport);
    self.emit('upgrade', transport);
    self.setPingTimeout();
    self.flush();
    if (self.readyState === 'closing') {
      transport.close(function () {
        self.onClose('forced close');
      });
    }
  }

  // we force a polling cycle to ensure a fast upgrade
  function forcePollingCycle () {
    if ('polling' === self.transport.name && self.transport.writable) {
      debug('writing a noop packet to polling for fast upgrade');
      self.transport.send([{ type: 'noop' }]);
    }
  }

  // Stops both upgrade timers and detaches every listener added above.
  function teardown () {
    self.upgrading = false;

    clearInterval(self.checkIntervalTimer);
    self.checkIntervalTimer = null;

    clearTimeout(self.upgradeTimeoutTimer);
    self.upgradeTimeoutTimer = null;

    transport.removeListener('packet', onUpgradePacket);
    transport.removeListener('close', onTransportClose);
    transport.removeListener('error', abortUpgrade);
    self.removeListener('close', onSocketClose);
  }

  // Aborts the attempt and drops the reference to the candidate transport.
  function abortUpgrade (err) {
    debug('client did not complete upgrade - %s', err);
    teardown();
    transport.close();
    transport = null;
  }

  function onTransportClose () {
    abortUpgrade('transport closed');
  }

  function onSocketClose () {
    abortUpgrade('socket closed');
  }
};
/**
 * Detaches the socket from its current transport: runs every queued cleanup
 * function, installs a no-op `error` listener on the transport, closes it
 * and stops the ping timeout timer.
 *
 * @api private
 */

Socket.prototype.clearTransport = function () {
  // only run the cleanups that were queued when we started
  var remaining = this.cleanupFn.length;

  while (remaining-- > 0) {
    var cleanup = this.cleanupFn.shift();
    cleanup();
  }

  // silence further transport errors and prevent uncaught exceptions
  this.transport.on('error', function () {
    debug('error triggered by discarded transport');
  });

  // ensure transport won't stay open
  this.transport.close();

  clearTimeout(this.pingTimeoutTimer);
};

/**
 * Called upon transport considered closed. Does nothing when the socket is
 * already closed; otherwise stops all timers, drops pending callbacks,
 * clears the transport and emits `close`.
 *
 * Reasons passed by this file: `ping timeout`, `parse error`,
 * `transport error`, `transport close`, `forced close`.
 *
 * @param {String} reason
 * @param {*} description optional detail, forwarded to `close` listeners
 */

Socket.prototype.onClose = function (reason, description) {
  if ('closed' === this.readyState) return;

  this.readyState = 'closed';

  clearTimeout(this.pingTimeoutTimer);
  clearInterval(this.checkIntervalTimer);
  this.checkIntervalTimer = null;
  clearTimeout(this.upgradeTimeoutTimer);

  // clean writeBuffer in next tick, so developers can still
  // grab the writeBuffer on 'close' event
  var socket = this;
  process.nextTick(function () {
    socket.writeBuffer = [];
  });

  this.packetsFn = [];
  this.sentCallbackFn = [];
  this.clearTransport();
  this.emit('close', reason, description);
};

/**
 * Registers a `drain` listener on the current transport that runs the next
 * pending send callback (or batch of callbacks), and queues the listener's
 * removal on `cleanupFn`.
 *
 * @api private
 */

Socket.prototype.setupSendCallback = function () {
  var self = this;

  // the message was sent successfully, execute the callback
  function onDrain () {
    if (self.sentCallbackFn.length === 0) return;

    var pending = self.sentCallbackFn.shift();

    if ('function' === typeof pending) {
      debug('executing send callback');
      pending(self.transport);
      return;
    }

    if (!Array.isArray(pending)) return;

    debug('executing batch send callback');
    var count = pending.length;
    for (var idx = 0; idx < count; idx++) {
      if ('function' === typeof pending[idx]) {
        pending[idx](self.transport);
      }
    }
  }

  this.transport.on('drain', onDrain);

  this.cleanupFn.push(function () {
    self.transport.removeListener('drain', onDrain);
  });
};

/**
 * Sends a message packet. Also available as `write`.
 *
 * @param {String} data message payload
 * @param {Object} options
 * @param {Function} callback
 * @return {Socket} for chaining
 * @api public
 */

Socket.prototype.send =
Socket.prototype.write = function (data, options, callback) {
  this.sendPacket('message', data, options, callback);
  return this;
};

/**
 * Queues a packet on the write buffer and attempts to flush it. Nothing is
 * queued while the socket is closing or closed.
 *
 * @param {String} type packet type
 * @param {String} data optional payload; only attached when truthy
 * @param {Object} options optional; `compress` is normalised to a boolean
 *   and defaults to true (the passed object itself is updated)
 * @param {Function} callback optional; may be given in place of `options`
 * @api private
 */

Socket.prototype.sendPacket = function (type, data, options, callback) {
  // (type, data, callback) call shape
  if ('function' === typeof options) {
    callback = options;
    options = null;
  }

  options = options || {};
  options.compress = false !== options.compress;

  if ('closing' === this.readyState || 'closed' === this.readyState) return;

  debug('sending packet "%s" (%s)', type, data);

  var packet = { type: type, options: options };
  if (data) {
    packet.data = data;
  }

  // exports packetCreate event
  this.emit('packetCreate', packet);

  this.writeBuffer.push(packet);

  // remember the send callback, if one was given
  if (callback) {
    this.packetsFn.push(callback);
  }

  this.flush();
};
/**
 * Attempts to flush the packets buffer.
 *
 * Nothing happens unless the socket is not closed, the transport is
 * writable and there is something buffered. Otherwise `flush` is emitted on
 * the socket and the server, the whole buffer is handed to the transport in
 * one `send` call, and `drain` is emitted on the socket and the server.
 *
 * @api private
 */

Socket.prototype.flush = function () {
  if ('closed' !== this.readyState &&
                this.transport.writable &&
                this.writeBuffer.length) {
    debug('flushing buffer to transport');
    this.emit('flush', this.writeBuffer);
    this.server.emit('flush', this, this.writeBuffer);

    // swap the buffer out before sending so packets queued from now on
    // land in a fresh array
    var wbuf = this.writeBuffer;
    this.writeBuffer = [];

    if (!this.transport.supportsFraming) {
      // callbacks of the whole batch are stored as one array entry
      this.sentCallbackFn.push(this.packetsFn);
    } else {
      // each callback is stored as its own entry
      this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
    }
    this.packetsFn = [];

    this.transport.send(wbuf);
    this.emit('drain');
    this.server.emit('drain', this);
  }
};

/**
 * Get available upgrades for this socket: the upgrades the server reports
 * for the current transport, restricted to transports the server has
 * enabled.
 *
 * @return {Array} upgrade transport names
 * @api private
 */

Socket.prototype.getAvailableUpgrades = function () {
  var server = this.server;
  return server.upgrades(this.transport.name).filter(function (upg) {
    return server.transports.indexOf(upg) !== -1;
  });
};

/**
 * Closes the socket and underlying transport.
 *
 * Only an open socket can be closed; calling this in any other state is a
 * no-op. If packets are still buffered, the transport is closed after the
 * next `drain` event instead of immediately.
 *
 * @param {Boolean} optional, discard
 * @return {Socket} for chaining
 * @api public
 */

Socket.prototype.close = function (discard) {
  // always hand back the socket, as documented, so calls can be chained
  if ('open' !== this.readyState) return this;

  this.readyState = 'closing';

  if (this.writeBuffer.length) {
    // let the pending packets go out first
    this.once('drain', this.closeTransport.bind(this, discard));
    return this;
  }

  this.closeTransport(discard);
  return this;
};

/**
 * Closes the underlying transport and, once it reports back, closes the
 * socket with reason `forced close`.
 *
 * @param {Boolean} discard when truthy, `transport.discard()` is called
 *   before closing
 * @api private
 */

Socket.prototype.closeTransport = function (discard) {
  if (discard) this.transport.discard();
  this.transport.close(this.onClose.bind(this, 'forced close'));
};