socket.js (11301B)
1 2 /** 3 * Module dependencies. 4 */ 5 6 var Emitter = require('events').EventEmitter; 7 var parser = require('socket.io-parser'); 8 var hasBin = require('has-binary2'); 9 var url = require('url'); 10 var debug = require('debug')('socket.io:socket'); 11 12 /** 13 * Module exports. 14 */ 15 16 module.exports = exports = Socket; 17 18 /** 19 * Blacklisted events. 20 * 21 * @api public 22 */ 23 24 exports.events = [ 25 'error', 26 'connect', 27 'disconnect', 28 'disconnecting', 29 'newListener', 30 'removeListener' 31 ]; 32 33 /** 34 * Flags. 35 * 36 * @api private 37 */ 38 39 var flags = [ 40 'json', 41 'volatile', 42 'broadcast', 43 'local' 44 ]; 45 46 /** 47 * `EventEmitter#emit` reference. 48 */ 49 50 var emit = Emitter.prototype.emit; 51 52 /** 53 * Interface to a `Client` for a given `Namespace`. 54 * 55 * @param {Namespace} nsp 56 * @param {Client} client 57 * @api public 58 */ 59 60 function Socket(nsp, client, query){ 61 this.nsp = nsp; 62 this.server = nsp.server; 63 this.adapter = this.nsp.adapter; 64 this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id; 65 this.client = client; 66 this.conn = client.conn; 67 this.rooms = {}; 68 this.acks = {}; 69 this.connected = true; 70 this.disconnected = false; 71 this.handshake = this.buildHandshake(query); 72 this.fns = []; 73 this.flags = {}; 74 this._rooms = []; 75 } 76 77 /** 78 * Inherits from `EventEmitter`. 79 */ 80 81 Socket.prototype.__proto__ = Emitter.prototype; 82 83 /** 84 * Apply flags from `Socket`. 85 */ 86 87 flags.forEach(function(flag){ 88 Object.defineProperty(Socket.prototype, flag, { 89 get: function() { 90 this.flags[flag] = true; 91 return this; 92 } 93 }); 94 }); 95 96 /** 97 * `request` engine.io shortcut. 98 * 99 * @api public 100 */ 101 102 Object.defineProperty(Socket.prototype, 'request', { 103 get: function() { 104 return this.conn.request; 105 } 106 }); 107 108 /** 109 * Builds the `handshake` BC object 110 * 111 * @api private 112 */ 113 114 Socket.prototype.buildHandshake = function(query){ 115 var self = this; 116 function buildQuery(){ 117 var requestQuery = url.parse(self.request.url, true).query; 118 //if socket-specific query exist, replace query strings in requestQuery 119 return Object.assign({}, query, requestQuery); 120 } 121 return { 122 headers: this.request.headers, 123 time: (new Date) + '', 124 address: this.conn.remoteAddress, 125 xdomain: !!this.request.headers.origin, 126 secure: !!this.request.connection.encrypted, 127 issued: +(new Date), 128 url: this.request.url, 129 query: buildQuery() 130 }; 131 }; 132 133 /** 134 * Emits to this client. 135 * 136 * @return {Socket} self 137 * @api public 138 */ 139 140 Socket.prototype.emit = function(ev){ 141 if (~exports.events.indexOf(ev)) { 142 emit.apply(this, arguments); 143 return this; 144 } 145 146 var args = Array.prototype.slice.call(arguments); 147 var packet = { 148 type: (this.flags.binary !== undefined ? this.flags.binary : hasBin(args)) ? parser.BINARY_EVENT : parser.EVENT, 149 data: args 150 }; 151 152 // access last argument to see if it's an ACK callback 153 if (typeof args[args.length - 1] === 'function') { 154 if (this._rooms.length || this.flags.broadcast) { 155 throw new Error('Callbacks are not supported when broadcasting'); 156 } 157 158 debug('emitting packet with ack id %d', this.nsp.ids); 159 this.acks[this.nsp.ids] = args.pop(); 160 packet.id = this.nsp.ids++; 161 } 162 163 var rooms = this._rooms.slice(0); 164 var flags = Object.assign({}, this.flags); 165 166 // reset flags 167 this._rooms = []; 168 this.flags = {}; 169 170 if (rooms.length || flags.broadcast) { 171 this.adapter.broadcast(packet, { 172 except: [this.id], 173 rooms: rooms, 174 flags: flags 175 }); 176 } else { 177 // dispatch packet 178 this.packet(packet, flags); 179 } 180 return this; 181 }; 182 183 /** 184 * Targets a room when broadcasting. 185 * 186 * @param {String} name 187 * @return {Socket} self 188 * @api public 189 */ 190 191 Socket.prototype.to = 192 Socket.prototype.in = function(name){ 193 if (!~this._rooms.indexOf(name)) this._rooms.push(name); 194 return this; 195 }; 196 197 /** 198 * Sends a `message` event. 199 * 200 * @return {Socket} self 201 * @api public 202 */ 203 204 Socket.prototype.send = 205 Socket.prototype.write = function(){ 206 var args = Array.prototype.slice.call(arguments); 207 args.unshift('message'); 208 this.emit.apply(this, args); 209 return this; 210 }; 211 212 /** 213 * Writes a packet. 214 * 215 * @param {Object} packet object 216 * @param {Object} opts options 217 * @api private 218 */ 219 220 Socket.prototype.packet = function(packet, opts){ 221 packet.nsp = this.nsp.name; 222 opts = opts || {}; 223 opts.compress = false !== opts.compress; 224 this.client.packet(packet, opts); 225 }; 226 227 /** 228 * Joins a room. 229 * 230 * @param {String|Array} room or array of rooms 231 * @param {Function} fn optional, callback 232 * @return {Socket} self 233 * @api private 234 */ 235 236 Socket.prototype.join = function(rooms, fn){ 237 debug('joining room %s', rooms); 238 var self = this; 239 if (!Array.isArray(rooms)) { 240 rooms = [rooms]; 241 } 242 rooms = rooms.filter(function (room) { 243 return !self.rooms.hasOwnProperty(room); 244 }); 245 if (!rooms.length) { 246 fn && fn(null); 247 return this; 248 } 249 this.adapter.addAll(this.id, rooms, function(err){ 250 if (err) return fn && fn(err); 251 debug('joined room %s', rooms); 252 rooms.forEach(function (room) { 253 self.rooms[room] = room; 254 }); 255 fn && fn(null); 256 }); 257 return this; 258 }; 259 260 /** 261 * Leaves a room. 262 * 263 * @param {String} room 264 * @param {Function} fn optional, callback 265 * @return {Socket} self 266 * @api private 267 */ 268 269 Socket.prototype.leave = function(room, fn){ 270 debug('leave room %s', room); 271 var self = this; 272 this.adapter.del(this.id, room, function(err){ 273 if (err) return fn && fn(err); 274 debug('left room %s', room); 275 delete self.rooms[room]; 276 fn && fn(null); 277 }); 278 return this; 279 }; 280 281 /** 282 * Leave all rooms. 283 * 284 * @api private 285 */ 286 287 Socket.prototype.leaveAll = function(){ 288 this.adapter.delAll(this.id); 289 this.rooms = {}; 290 }; 291 292 /** 293 * Called by `Namespace` upon successful 294 * middleware execution (ie: authorization). 295 * Socket is added to namespace array before 296 * call to join, so adapters can access it. 297 * 298 * @api private 299 */ 300 301 Socket.prototype.onconnect = function(){ 302 debug('socket connected - writing packet'); 303 this.nsp.connected[this.id] = this; 304 this.join(this.id); 305 var skip = this.nsp.name === '/' && this.nsp.fns.length === 0; 306 if (skip) { 307 debug('packet already sent in initial handshake'); 308 } else { 309 this.packet({ type: parser.CONNECT }); 310 } 311 }; 312 313 /** 314 * Called with each packet. Called by `Client`. 315 * 316 * @param {Object} packet 317 * @api private 318 */ 319 320 Socket.prototype.onpacket = function(packet){ 321 debug('got packet %j', packet); 322 switch (packet.type) { 323 case parser.EVENT: 324 this.onevent(packet); 325 break; 326 327 case parser.BINARY_EVENT: 328 this.onevent(packet); 329 break; 330 331 case parser.ACK: 332 this.onack(packet); 333 break; 334 335 case parser.BINARY_ACK: 336 this.onack(packet); 337 break; 338 339 case parser.DISCONNECT: 340 this.ondisconnect(); 341 break; 342 343 case parser.ERROR: 344 this.onerror(new Error(packet.data)); 345 } 346 }; 347 348 /** 349 * Called upon event packet. 350 * 351 * @param {Object} packet object 352 * @api private 353 */ 354 355 Socket.prototype.onevent = function(packet){ 356 var args = packet.data || []; 357 debug('emitting event %j', args); 358 359 if (null != packet.id) { 360 debug('attaching ack callback to event'); 361 args.push(this.ack(packet.id)); 362 } 363 364 this.dispatch(args); 365 }; 366 367 /** 368 * Produces an ack callback to emit with an event. 369 * 370 * @param {Number} id packet id 371 * @api private 372 */ 373 374 Socket.prototype.ack = function(id){ 375 var self = this; 376 var sent = false; 377 return function(){ 378 // prevent double callbacks 379 if (sent) return; 380 var args = Array.prototype.slice.call(arguments); 381 debug('sending ack %j', args); 382 383 self.packet({ 384 id: id, 385 type: hasBin(args) ? parser.BINARY_ACK : parser.ACK, 386 data: args 387 }); 388 389 sent = true; 390 }; 391 }; 392 393 /** 394 * Called upon ack packet. 395 * 396 * @api private 397 */ 398 399 Socket.prototype.onack = function(packet){ 400 var ack = this.acks[packet.id]; 401 if ('function' == typeof ack) { 402 debug('calling ack %s with %j', packet.id, packet.data); 403 ack.apply(this, packet.data); 404 delete this.acks[packet.id]; 405 } else { 406 debug('bad ack %s', packet.id); 407 } 408 }; 409 410 /** 411 * Called upon client disconnect packet. 412 * 413 * @api private 414 */ 415 416 Socket.prototype.ondisconnect = function(){ 417 debug('got disconnect packet'); 418 this.onclose('client namespace disconnect'); 419 }; 420 421 /** 422 * Handles a client error. 423 * 424 * @api private 425 */ 426 427 Socket.prototype.onerror = function(err){ 428 if (this.listeners('error').length) { 429 this.emit('error', err); 430 } else { 431 console.error('Missing error handler on `socket`.'); 432 console.error(err.stack); 433 } 434 }; 435 436 /** 437 * Called upon closing. Called by `Client`. 438 * 439 * @param {String} reason 440 * @throw {Error} optional error object 441 * @api private 442 */ 443 444 Socket.prototype.onclose = function(reason){ 445 if (!this.connected) return this; 446 debug('closing socket - reason %s', reason); 447 this.emit('disconnecting', reason); 448 this.leaveAll(); 449 this.nsp.remove(this); 450 this.client.remove(this); 451 this.connected = false; 452 this.disconnected = true; 453 delete this.nsp.connected[this.id]; 454 this.emit('disconnect', reason); 455 }; 456 457 /** 458 * Produces an `error` packet. 459 * 460 * @param {Object} err error object 461 * @api private 462 */ 463 464 Socket.prototype.error = function(err){ 465 this.packet({ type: parser.ERROR, data: err }); 466 }; 467 468 /** 469 * Disconnects this client. 470 * 471 * @param {Boolean} close if `true`, closes the underlying connection 472 * @return {Socket} self 473 * @api public 474 */ 475 476 Socket.prototype.disconnect = function(close){ 477 if (!this.connected) return this; 478 if (close) { 479 this.client.disconnect(); 480 } else { 481 this.packet({ type: parser.DISCONNECT }); 482 this.onclose('server namespace disconnect'); 483 } 484 return this; 485 }; 486 487 /** 488 * Sets the compress flag. 489 * 490 * @param {Boolean} compress if `true`, compresses the sending data 491 * @return {Socket} self 492 * @api public 493 */ 494 495 Socket.prototype.compress = function(compress){ 496 this.flags.compress = compress; 497 return this; 498 }; 499 500 /** 501 * Sets the binary flag 502 * 503 * @param {Boolean} Encode as if it has binary data if `true`, Encode as if it doesnt have binary data if `false` 504 * @return {Socket} self 505 * @api public 506 */ 507 508 Socket.prototype.binary = function (binary) { 509 this.flags.binary = binary; 510 return this; 511 }; 512 513 /** 514 * Dispatch incoming event to socket listeners. 515 * 516 * @param {Array} event that will get emitted 517 * @api private 518 */ 519 520 Socket.prototype.dispatch = function(event){ 521 debug('dispatching an event %j', event); 522 var self = this; 523 function dispatchSocket(err) { 524 process.nextTick(function(){ 525 if (err) { 526 return self.error(err.data || err.message); 527 } 528 emit.apply(self, event); 529 }); 530 } 531 this.run(event, dispatchSocket); 532 }; 533 534 /** 535 * Sets up socket middleware. 536 * 537 * @param {Function} middleware function (event, next) 538 * @return {Socket} self 539 * @api public 540 */ 541 542 Socket.prototype.use = function(fn){ 543 this.fns.push(fn); 544 return this; 545 }; 546 547 /** 548 * Executes the middleware for an incoming event. 549 * 550 * @param {Array} event that will get emitted 551 * @param {Function} last fn call in the middleware 552 * @api private 553 */ 554 Socket.prototype.run = function(event, fn){ 555 var fns = this.fns.slice(0); 556 if (!fns.length) return fn(null); 557 558 function run(i){ 559 fns[i](event, function(err){ 560 // upon error, short-circuit 561 if (err) return fn(err); 562 563 // if no middleware left, summon callback 564 if (!fns[i + 1]) return fn(null); 565 566 // go on to next 567 run(i + 1); 568 }); 569 } 570 571 run(0); 572 };