twitst4tz

twitter statistics web application
Log | Files | Refs | README | LICENSE

websocket-server.js (11460B)


      1 'use strict';
      2 
      3 const EventEmitter = require('events');
      4 const { createHash } = require('crypto');
      5 const { createServer, STATUS_CODES } = require('http');
      6 
      7 const PerMessageDeflate = require('./permessage-deflate');
      8 const WebSocket = require('./websocket');
      9 const { format, parse } = require('./extension');
     10 const { GUID } = require('./constants');
     11 
     12 const keyRegex = /^[+/0-9A-Za-z]{22}==$/;
     13 const kUsedByWebSocketServer = Symbol('kUsedByWebSocketServer');
     14 
     15 /**
     16  * Class representing a WebSocket server.
     17  *
     18  * @extends EventEmitter
     19  */
     20 class WebSocketServer extends EventEmitter {
     21   /**
     22    * Create a `WebSocketServer` instance.
     23    *
     24    * @param {Object} options Configuration options
     25    * @param {Number} options.backlog The maximum length of the queue of pending
     26    *     connections
     27    * @param {Boolean} options.clientTracking Specifies whether or not to track
     28    *     clients
     29    * @param {Function} options.handleProtocols A hook to handle protocols
     30    * @param {String} options.host The hostname where to bind the server
     31    * @param {Number} options.maxPayload The maximum allowed message size
     32    * @param {Boolean} options.noServer Enable no server mode
     33    * @param {String} options.path Accept only connections matching this path
     34    * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
     35    *     permessage-deflate
     36    * @param {Number} options.port The port where to bind the server
     37    * @param {http.Server} options.server A pre-created HTTP/S server to use
     38    * @param {Function} options.verifyClient A hook to reject connections
     39    * @param {Function} callback A listener for the `listening` event
     40    */
     41   constructor(options, callback) {
     42     super();
     43 
     44     options = {
     45       maxPayload: 100 * 1024 * 1024,
     46       perMessageDeflate: false,
     47       handleProtocols: null,
     48       clientTracking: true,
     49       verifyClient: null,
     50       noServer: false,
     51       backlog: null, // use default (511 as implemented in net.js)
     52       server: null,
     53       host: null,
     54       path: null,
     55       port: null,
     56       ...options
     57     };
     58 
     59     if (options.port == null && !options.server && !options.noServer) {
     60       throw new TypeError(
     61         'One of the "port", "server", or "noServer" options must be specified'
     62       );
     63     }
     64 
     65     if (options.port != null) {
     66       this._server = createServer((req, res) => {
     67         const body = STATUS_CODES[426];
     68 
     69         res.writeHead(426, {
     70           'Content-Length': body.length,
     71           'Content-Type': 'text/plain'
     72         });
     73         res.end(body);
     74       });
     75       this._server.listen(
     76         options.port,
     77         options.host,
     78         options.backlog,
     79         callback
     80       );
     81     } else if (options.server) {
     82       if (options.server[kUsedByWebSocketServer]) {
     83         throw new Error(
     84           'The HTTP/S server is already being used by another WebSocket server'
     85         );
     86       }
     87 
     88       options.server[kUsedByWebSocketServer] = true;
     89       this._server = options.server;
     90     }
     91 
     92     if (this._server) {
     93       this._removeListeners = addListeners(this._server, {
     94         listening: this.emit.bind(this, 'listening'),
     95         error: this.emit.bind(this, 'error'),
     96         upgrade: (req, socket, head) => {
     97           this.handleUpgrade(req, socket, head, (ws) => {
     98             this.emit('connection', ws, req);
     99           });
    100         }
    101       });
    102     }
    103 
    104     if (options.perMessageDeflate === true) options.perMessageDeflate = {};
    105     if (options.clientTracking) this.clients = new Set();
    106     this.options = options;
    107   }
    108 
    109   /**
    110    * Returns the bound address, the address family name, and port of the server
    111    * as reported by the operating system if listening on an IP socket.
    112    * If the server is listening on a pipe or UNIX domain socket, the name is
    113    * returned as a string.
    114    *
    115    * @return {(Object|String|null)} The address of the server
    116    * @public
    117    */
    118   address() {
    119     if (this.options.noServer) {
    120       throw new Error('The server is operating in "noServer" mode');
    121     }
    122 
    123     if (!this._server) return null;
    124     return this._server.address();
    125   }
    126 
    127   /**
    128    * Close the server.
    129    *
    130    * @param {Function} cb Callback
    131    * @public
    132    */
    133   close(cb) {
    134     if (cb) this.once('close', cb);
    135 
    136     //
    137     // Terminate all associated clients.
    138     //
    139     if (this.clients) {
    140       for (const client of this.clients) client.terminate();
    141     }
    142 
    143     const server = this._server;
    144 
    145     if (server) {
    146       this._removeListeners();
    147       this._removeListeners = this._server = null;
    148 
    149       //
    150       // Close the http server if it was internally created.
    151       //
    152       if (this.options.port != null) {
    153         server.close(() => this.emit('close'));
    154         return;
    155       }
    156 
    157       delete server[kUsedByWebSocketServer];
    158     }
    159 
    160     process.nextTick(emitClose, this);
    161   }
    162 
    163   /**
    164    * See if a given request should be handled by this server instance.
    165    *
    166    * @param {http.IncomingMessage} req Request object to inspect
    167    * @return {Boolean} `true` if the request is valid, else `false`
    168    * @public
    169    */
    170   shouldHandle(req) {
    171     if (this.options.path) {
    172       const index = req.url.indexOf('?');
    173       const pathname = index !== -1 ? req.url.slice(0, index) : req.url;
    174 
    175       if (pathname !== this.options.path) return false;
    176     }
    177 
    178     return true;
    179   }
    180 
    181   /**
    182    * Handle a HTTP Upgrade request.
    183    *
    184    * @param {http.IncomingMessage} req The request object
    185    * @param {net.Socket} socket The network socket between the server and client
    186    * @param {Buffer} head The first packet of the upgraded stream
    187    * @param {Function} cb Callback
    188    * @public
    189    */
    190   handleUpgrade(req, socket, head, cb) {
    191     socket.on('error', socketOnError);
    192 
    193     const key =
    194       req.headers['sec-websocket-key'] !== undefined
    195         ? req.headers['sec-websocket-key'].trim()
    196         : false;
    197     const version = +req.headers['sec-websocket-version'];
    198     const extensions = {};
    199 
    200     if (
    201       req.method !== 'GET' ||
    202       req.headers.upgrade.toLowerCase() !== 'websocket' ||
    203       !key ||
    204       !keyRegex.test(key) ||
    205       (version !== 8 && version !== 13) ||
    206       !this.shouldHandle(req)
    207     ) {
    208       return abortHandshake(socket, 400);
    209     }
    210 
    211     if (this.options.perMessageDeflate) {
    212       const perMessageDeflate = new PerMessageDeflate(
    213         this.options.perMessageDeflate,
    214         true,
    215         this.options.maxPayload
    216       );
    217 
    218       try {
    219         const offers = parse(req.headers['sec-websocket-extensions']);
    220 
    221         if (offers[PerMessageDeflate.extensionName]) {
    222           perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]);
    223           extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
    224         }
    225       } catch (err) {
    226         return abortHandshake(socket, 400);
    227       }
    228     }
    229 
    230     //
    231     // Optionally call external client verification handler.
    232     //
    233     if (this.options.verifyClient) {
    234       const info = {
    235         origin:
    236           req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`],
    237         secure: !!(req.connection.authorized || req.connection.encrypted),
    238         req
    239       };
    240 
    241       if (this.options.verifyClient.length === 2) {
    242         this.options.verifyClient(info, (verified, code, message, headers) => {
    243           if (!verified) {
    244             return abortHandshake(socket, code || 401, message, headers);
    245           }
    246 
    247           this.completeUpgrade(key, extensions, req, socket, head, cb);
    248         });
    249         return;
    250       }
    251 
    252       if (!this.options.verifyClient(info)) return abortHandshake(socket, 401);
    253     }
    254 
    255     this.completeUpgrade(key, extensions, req, socket, head, cb);
    256   }
    257 
    258   /**
    259    * Upgrade the connection to WebSocket.
    260    *
    261    * @param {String} key The value of the `Sec-WebSocket-Key` header
    262    * @param {Object} extensions The accepted extensions
    263    * @param {http.IncomingMessage} req The request object
    264    * @param {net.Socket} socket The network socket between the server and client
    265    * @param {Buffer} head The first packet of the upgraded stream
    266    * @param {Function} cb Callback
    267    * @private
    268    */
    269   completeUpgrade(key, extensions, req, socket, head, cb) {
    270     //
    271     // Destroy the socket if the client has already sent a FIN packet.
    272     //
    273     if (!socket.readable || !socket.writable) return socket.destroy();
    274 
    275     const digest = createHash('sha1')
    276       .update(key + GUID)
    277       .digest('base64');
    278 
    279     const headers = [
    280       'HTTP/1.1 101 Switching Protocols',
    281       'Upgrade: websocket',
    282       'Connection: Upgrade',
    283       `Sec-WebSocket-Accept: ${digest}`
    284     ];
    285 
    286     const ws = new WebSocket(null);
    287     let protocol = req.headers['sec-websocket-protocol'];
    288 
    289     if (protocol) {
    290       protocol = protocol.trim().split(/ *, */);
    291 
    292       //
    293       // Optionally call external protocol selection handler.
    294       //
    295       if (this.options.handleProtocols) {
    296         protocol = this.options.handleProtocols(protocol, req);
    297       } else {
    298         protocol = protocol[0];
    299       }
    300 
    301       if (protocol) {
    302         headers.push(`Sec-WebSocket-Protocol: ${protocol}`);
    303         ws.protocol = protocol;
    304       }
    305     }
    306 
    307     if (extensions[PerMessageDeflate.extensionName]) {
    308       const params = extensions[PerMessageDeflate.extensionName].params;
    309       const value = format({
    310         [PerMessageDeflate.extensionName]: [params]
    311       });
    312       headers.push(`Sec-WebSocket-Extensions: ${value}`);
    313       ws._extensions = extensions;
    314     }
    315 
    316     //
    317     // Allow external modification/inspection of handshake headers.
    318     //
    319     this.emit('headers', headers, req);
    320 
    321     socket.write(headers.concat('\r\n').join('\r\n'));
    322     socket.removeListener('error', socketOnError);
    323 
    324     ws.setSocket(socket, head, this.options.maxPayload);
    325 
    326     if (this.clients) {
    327       this.clients.add(ws);
    328       ws.on('close', () => this.clients.delete(ws));
    329     }
    330 
    331     cb(ws);
    332   }
    333 }
    334 
    335 module.exports = WebSocketServer;
    336 
    337 /**
    338  * Add event listeners on an `EventEmitter` using a map of <event, listener>
    339  * pairs.
    340  *
    341  * @param {EventEmitter} server The event emitter
    342  * @param {Object.<String, Function>} map The listeners to add
    343  * @return {Function} A function that will remove the added listeners when called
    344  * @private
    345  */
    346 function addListeners(server, map) {
    347   for (const event of Object.keys(map)) server.on(event, map[event]);
    348 
    349   return function removeListeners() {
    350     for (const event of Object.keys(map)) {
    351       server.removeListener(event, map[event]);
    352     }
    353   };
    354 }
    355 
    356 /**
    357  * Emit a `'close'` event on an `EventEmitter`.
    358  *
    359  * @param {EventEmitter} server The event emitter
    360  * @private
    361  */
    362 function emitClose(server) {
    363   server.emit('close');
    364 }
    365 
    366 /**
    367  * Handle premature socket errors.
    368  *
    369  * @private
    370  */
    371 function socketOnError() {
    372   this.destroy();
    373 }
    374 
    375 /**
    376  * Close the connection when preconditions are not fulfilled.
    377  *
    378  * @param {net.Socket} socket The socket of the upgrade request
    379  * @param {Number} code The HTTP response status code
    380  * @param {String} [message] The HTTP response body
    381  * @param {Object} [headers] Additional HTTP response headers
    382  * @private
    383  */
    384 function abortHandshake(socket, code, message, headers) {
    385   if (socket.writable) {
    386     message = message || STATUS_CODES[code];
    387     headers = {
    388       Connection: 'close',
    389       'Content-type': 'text/html',
    390       'Content-Length': Buffer.byteLength(message),
    391       ...headers
    392     };
    393 
    394     socket.write(
    395       `HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n` +
    396         Object.keys(headers)
    397           .map((h) => `${h}: ${headers[h]}`)
    398           .join('\r\n') +
    399         '\r\n\r\n' +
    400         message
    401     );
    402   }
    403 
    404   socket.removeListener('error', socketOnError);
    405   socket.destroy();
    406 }