twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

socket.js (11301B)


      1 
      2 /**
      3  * Module dependencies.
      4  */
      5 
      6 var Emitter = require('events').EventEmitter;
      7 var parser = require('socket.io-parser');
      8 var hasBin = require('has-binary2');
      9 var url = require('url');
     10 var debug = require('debug')('socket.io:socket');
     11 
     12 /**
     13  * Module exports.
     14  */
     15 
     16 module.exports = exports = Socket;
     17 
     18 /**
     19  * Blacklisted events.
     20  *
     21  * @api public
     22  */
     23 
     24 exports.events = [
     25   'error',
     26   'connect',
     27   'disconnect',
     28   'disconnecting',
     29   'newListener',
     30   'removeListener'
     31 ];
     32 
     33 /**
     34  * Flags.
     35  *
     36  * @api private
     37  */
     38 
     39 var flags = [
     40   'json',
     41   'volatile',
     42   'broadcast',
     43   'local'
     44 ];
     45 
     46 /**
     47  * `EventEmitter#emit` reference.
     48  */
     49 
     50 var emit = Emitter.prototype.emit;
     51 
     52 /**
     53  * Interface to a `Client` for a given `Namespace`.
     54  *
     55  * @param {Namespace} nsp
     56  * @param {Client} client
     57  * @api public
     58  */
     59 
     60 function Socket(nsp, client, query){
     61   this.nsp = nsp;
     62   this.server = nsp.server;
     63   this.adapter = this.nsp.adapter;
     64   this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id;
     65   this.client = client;
     66   this.conn = client.conn;
     67   this.rooms = {};
     68   this.acks = {};
     69   this.connected = true;
     70   this.disconnected = false;
     71   this.handshake = this.buildHandshake(query);
     72   this.fns = [];
     73   this.flags = {};
     74   this._rooms = [];
     75 }
     76 
     77 /**
     78  * Inherits from `EventEmitter`.
     79  */
     80 
     81 Socket.prototype.__proto__ = Emitter.prototype;
     82 
     83 /**
     84  * Apply flags from `Socket`.
     85  */
     86 
     87 flags.forEach(function(flag){
     88   Object.defineProperty(Socket.prototype, flag, {
     89     get: function() {
     90       this.flags[flag] = true;
     91       return this;
     92     }
     93   });
     94 });
     95 
     96 /**
     97  * `request` engine.io shortcut.
     98  *
     99  * @api public
    100  */
    101 
    102 Object.defineProperty(Socket.prototype, 'request', {
    103   get: function() {
    104     return this.conn.request;
    105   }
    106 });
    107 
    108 /**
    109  * Builds the `handshake` BC object
    110  *
    111  * @api private
    112  */
    113 
    114 Socket.prototype.buildHandshake = function(query){
    115   var self = this;
    116   function buildQuery(){
    117     var requestQuery = url.parse(self.request.url, true).query;
    118     //if socket-specific query exist, replace query strings in requestQuery
    119     return Object.assign({}, query, requestQuery);
    120   }
    121   return {
    122     headers: this.request.headers,
    123     time: (new Date) + '',
    124     address: this.conn.remoteAddress,
    125     xdomain: !!this.request.headers.origin,
    126     secure: !!this.request.connection.encrypted,
    127     issued: +(new Date),
    128     url: this.request.url,
    129     query: buildQuery()
    130   };
    131 };
    132 
    133 /**
    134  * Emits to this client.
    135  *
    136  * @return {Socket} self
    137  * @api public
    138  */
    139 
    140 Socket.prototype.emit = function(ev){
    141   if (~exports.events.indexOf(ev)) {
    142     emit.apply(this, arguments);
    143     return this;
    144   }
    145 
    146   var args = Array.prototype.slice.call(arguments);
    147   var packet = {
    148     type: (this.flags.binary !== undefined ? this.flags.binary : hasBin(args)) ? parser.BINARY_EVENT : parser.EVENT,
    149     data: args
    150   };
    151 
    152   // access last argument to see if it's an ACK callback
    153   if (typeof args[args.length - 1] === 'function') {
    154     if (this._rooms.length || this.flags.broadcast) {
    155       throw new Error('Callbacks are not supported when broadcasting');
    156     }
    157 
    158     debug('emitting packet with ack id %d', this.nsp.ids);
    159     this.acks[this.nsp.ids] = args.pop();
    160     packet.id = this.nsp.ids++;
    161   }
    162 
    163   var rooms = this._rooms.slice(0);
    164   var flags = Object.assign({}, this.flags);
    165 
    166   // reset flags
    167   this._rooms = [];
    168   this.flags = {};
    169 
    170   if (rooms.length || flags.broadcast) {
    171     this.adapter.broadcast(packet, {
    172       except: [this.id],
    173       rooms: rooms,
    174       flags: flags
    175     });
    176   } else {
    177     // dispatch packet
    178     this.packet(packet, flags);
    179   }
    180   return this;
    181 };
    182 
    183 /**
    184  * Targets a room when broadcasting.
    185  *
    186  * @param {String} name
    187  * @return {Socket} self
    188  * @api public
    189  */
    190 
    191 Socket.prototype.to =
    192 Socket.prototype.in = function(name){
    193   if (!~this._rooms.indexOf(name)) this._rooms.push(name);
    194   return this;
    195 };
    196 
    197 /**
    198  * Sends a `message` event.
    199  *
    200  * @return {Socket} self
    201  * @api public
    202  */
    203 
    204 Socket.prototype.send =
    205 Socket.prototype.write = function(){
    206   var args = Array.prototype.slice.call(arguments);
    207   args.unshift('message');
    208   this.emit.apply(this, args);
    209   return this;
    210 };
    211 
    212 /**
    213  * Writes a packet.
    214  *
    215  * @param {Object} packet object
    216  * @param {Object} opts options
    217  * @api private
    218  */
    219 
    220 Socket.prototype.packet = function(packet, opts){
    221   packet.nsp = this.nsp.name;
    222   opts = opts || {};
    223   opts.compress = false !== opts.compress;
    224   this.client.packet(packet, opts);
    225 };
    226 
    227 /**
    228  * Joins a room.
    229  *
    230  * @param {String|Array} room or array of rooms
    231  * @param {Function} fn optional, callback
    232  * @return {Socket} self
    233  * @api private
    234  */
    235 
    236 Socket.prototype.join = function(rooms, fn){
    237   debug('joining room %s', rooms);
    238   var self = this;
    239   if (!Array.isArray(rooms)) {
    240     rooms = [rooms];
    241   }
    242   rooms = rooms.filter(function (room) {
    243     return !self.rooms.hasOwnProperty(room);
    244   });
    245   if (!rooms.length) {
    246     fn && fn(null);
    247     return this;
    248   }
    249   this.adapter.addAll(this.id, rooms, function(err){
    250     if (err) return fn && fn(err);
    251     debug('joined room %s', rooms);
    252     rooms.forEach(function (room) {
    253       self.rooms[room] = room;
    254     });
    255     fn && fn(null);
    256   });
    257   return this;
    258 };
    259 
    260 /**
    261  * Leaves a room.
    262  *
    263  * @param {String} room
    264  * @param {Function} fn optional, callback
    265  * @return {Socket} self
    266  * @api private
    267  */
    268 
    269 Socket.prototype.leave = function(room, fn){
    270   debug('leave room %s', room);
    271   var self = this;
    272   this.adapter.del(this.id, room, function(err){
    273     if (err) return fn && fn(err);
    274     debug('left room %s', room);
    275     delete self.rooms[room];
    276     fn && fn(null);
    277   });
    278   return this;
    279 };
    280 
    281 /**
    282  * Leave all rooms.
    283  *
    284  * @api private
    285  */
    286 
    287 Socket.prototype.leaveAll = function(){
    288   this.adapter.delAll(this.id);
    289   this.rooms = {};
    290 };
    291 
    292 /**
    293  * Called by `Namespace` upon successful
    294  * middleware execution (ie: authorization).
    295  * Socket is added to namespace array before
    296  * call to join, so adapters can access it.
    297  *
    298  * @api private
    299  */
    300 
    301 Socket.prototype.onconnect = function(){
    302   debug('socket connected - writing packet');
    303   this.nsp.connected[this.id] = this;
    304   this.join(this.id);
    305   var skip = this.nsp.name === '/' && this.nsp.fns.length === 0;
    306   if (skip) {
    307     debug('packet already sent in initial handshake');
    308   } else {
    309     this.packet({ type: parser.CONNECT });
    310   }
    311 };
    312 
    313 /**
    314  * Called with each packet. Called by `Client`.
    315  *
    316  * @param {Object} packet
    317  * @api private
    318  */
    319 
    320 Socket.prototype.onpacket = function(packet){
    321   debug('got packet %j', packet);
    322   switch (packet.type) {
    323     case parser.EVENT:
    324       this.onevent(packet);
    325       break;
    326 
    327     case parser.BINARY_EVENT:
    328       this.onevent(packet);
    329       break;
    330 
    331     case parser.ACK:
    332       this.onack(packet);
    333       break;
    334 
    335     case parser.BINARY_ACK:
    336       this.onack(packet);
    337       break;
    338 
    339     case parser.DISCONNECT:
    340       this.ondisconnect();
    341       break;
    342 
    343     case parser.ERROR:
    344       this.onerror(new Error(packet.data));
    345   }
    346 };
    347 
    348 /**
    349  * Called upon event packet.
    350  *
    351  * @param {Object} packet object
    352  * @api private
    353  */
    354 
    355 Socket.prototype.onevent = function(packet){
    356   var args = packet.data || [];
    357   debug('emitting event %j', args);
    358 
    359   if (null != packet.id) {
    360     debug('attaching ack callback to event');
    361     args.push(this.ack(packet.id));
    362   }
    363 
    364   this.dispatch(args);
    365 };
    366 
    367 /**
    368  * Produces an ack callback to emit with an event.
    369  *
    370  * @param {Number} id packet id
    371  * @api private
    372  */
    373 
    374 Socket.prototype.ack = function(id){
    375   var self = this;
    376   var sent = false;
    377   return function(){
    378     // prevent double callbacks
    379     if (sent) return;
    380     var args = Array.prototype.slice.call(arguments);
    381     debug('sending ack %j', args);
    382 
    383     self.packet({
    384       id: id,
    385       type: hasBin(args) ? parser.BINARY_ACK : parser.ACK,
    386       data: args
    387     });
    388 
    389     sent = true;
    390   };
    391 };
    392 
    393 /**
    394  * Called upon ack packet.
    395  *
    396  * @api private
    397  */
    398 
    399 Socket.prototype.onack = function(packet){
    400   var ack = this.acks[packet.id];
    401   if ('function' == typeof ack) {
    402     debug('calling ack %s with %j', packet.id, packet.data);
    403     ack.apply(this, packet.data);
    404     delete this.acks[packet.id];
    405   } else {
    406     debug('bad ack %s', packet.id);
    407   }
    408 };
    409 
    410 /**
    411  * Called upon client disconnect packet.
    412  *
    413  * @api private
    414  */
    415 
    416 Socket.prototype.ondisconnect = function(){
    417   debug('got disconnect packet');
    418   this.onclose('client namespace disconnect');
    419 };
    420 
    421 /**
    422  * Handles a client error.
    423  *
    424  * @api private
    425  */
    426 
    427 Socket.prototype.onerror = function(err){
    428   if (this.listeners('error').length) {
    429     this.emit('error', err);
    430   } else {
    431     console.error('Missing error handler on `socket`.');
    432     console.error(err.stack);
    433   }
    434 };
    435 
    436 /**
    437  * Called upon closing. Called by `Client`.
    438  *
    439  * @param {String} reason
    440  * @throw {Error} optional error object
    441  * @api private
    442  */
    443 
    444 Socket.prototype.onclose = function(reason){
    445   if (!this.connected) return this;
    446   debug('closing socket - reason %s', reason);
    447   this.emit('disconnecting', reason);
    448   this.leaveAll();
    449   this.nsp.remove(this);
    450   this.client.remove(this);
    451   this.connected = false;
    452   this.disconnected = true;
    453   delete this.nsp.connected[this.id];
    454   this.emit('disconnect', reason);
    455 };
    456 
    457 /**
    458  * Produces an `error` packet.
    459  *
    460  * @param {Object} err error object
    461  * @api private
    462  */
    463 
    464 Socket.prototype.error = function(err){
    465   this.packet({ type: parser.ERROR, data: err });
    466 };
    467 
    468 /**
    469  * Disconnects this client.
    470  *
    471  * @param {Boolean} close if `true`, closes the underlying connection
    472  * @return {Socket} self
    473  * @api public
    474  */
    475 
    476 Socket.prototype.disconnect = function(close){
    477   if (!this.connected) return this;
    478   if (close) {
    479     this.client.disconnect();
    480   } else {
    481     this.packet({ type: parser.DISCONNECT });
    482     this.onclose('server namespace disconnect');
    483   }
    484   return this;
    485 };
    486 
    487 /**
    488  * Sets the compress flag.
    489  *
    490  * @param {Boolean} compress if `true`, compresses the sending data
    491  * @return {Socket} self
    492  * @api public
    493  */
    494 
    495 Socket.prototype.compress = function(compress){
    496   this.flags.compress = compress;
    497   return this;
    498 };
    499 
    500 /**
    501  * Sets the binary flag
    502  *
    503  * @param {Boolean} Encode as if it has binary data if `true`, Encode as if it doesnt have binary data if `false`
    504  * @return {Socket} self
    505  * @api public
    506  */
    507 
    508  Socket.prototype.binary = function (binary) {
    509    this.flags.binary = binary;
    510    return this;
    511  };
    512 
    513 /**
    514  * Dispatch incoming event to socket listeners.
    515  *
    516  * @param {Array} event that will get emitted
    517  * @api private
    518  */
    519 
    520 Socket.prototype.dispatch = function(event){
    521   debug('dispatching an event %j', event);
    522   var self = this;
    523   function dispatchSocket(err) {
    524     process.nextTick(function(){
    525       if (err) {
    526         return self.error(err.data || err.message);
    527       }
    528       emit.apply(self, event);
    529     });
    530   }
    531   this.run(event, dispatchSocket);
    532 };
    533 
    534 /**
    535  * Sets up socket middleware.
    536  *
    537  * @param {Function} middleware function (event, next)
    538  * @return {Socket} self
    539  * @api public
    540  */
    541 
    542 Socket.prototype.use = function(fn){
    543   this.fns.push(fn);
    544   return this;
    545 };
    546 
    547 /**
    548  * Executes the middleware for an incoming event.
    549  *
    550  * @param {Array} event that will get emitted
    551  * @param {Function} last fn call in the middleware
    552  * @api private
    553  */
    554 Socket.prototype.run = function(event, fn){
    555   var fns = this.fns.slice(0);
    556   if (!fns.length) return fn(null);
    557 
    558   function run(i){
    559     fns[i](event, function(err){
    560       // upon error, short-circuit
    561       if (err) return fn(err);
    562 
    563       // if no middleware left, summon callback
    564       if (!fns[i + 1]) return fn(null);
    565 
    566       // go on to next
    567       run(i + 1);
    568     });
    569   }
    570 
    571   run(0);
    572 };