buddy

node MVC discord bot
Log | Files | Refs | README

WebSocketManager.js (13393B)


      1 'use strict';
      2 
      3 const EventEmitter = require('events');
      4 const WebSocketShard = require('./WebSocketShard');
      5 const PacketHandlers = require('./handlers');
      6 const { Error: DJSError } = require('../../errors');
      7 const Collection = require('../../util/Collection');
      8 const { Events, ShardEvents, Status, WSCodes, WSEvents } = require('../../util/Constants');
      9 const Util = require('../../util/Util');
     10 
     11 const BeforeReadyWhitelist = [
     12   WSEvents.READY,
     13   WSEvents.RESUMED,
     14   WSEvents.GUILD_CREATE,
     15   WSEvents.GUILD_DELETE,
     16   WSEvents.GUILD_MEMBERS_CHUNK,
     17   WSEvents.GUILD_MEMBER_ADD,
     18   WSEvents.GUILD_MEMBER_REMOVE,
     19 ];
     20 
     21 const UNRECOVERABLE_CLOSE_CODES = Object.keys(WSCodes)
     22   .slice(1)
     23   .map(Number);
     24 const UNRESUMABLE_CLOSE_CODES = [1000, 4006, 4007];
     25 
     26 /**
     27  * The WebSocket manager for this client.
     28  * <info>This class forwards raw dispatch events,
     29  * read more about it here {@link https://discordapp.com/developers/docs/topics/gateway}</info>
     30  * @extends EventEmitter
     31  */
     32 class WebSocketManager extends EventEmitter {
     33   constructor(client) {
     34     super();
     35 
     36     /**
     37      * The client that instantiated this WebSocketManager
     38      * @type {Client}
     39      * @readonly
     40      * @name WebSocketManager#client
     41      */
     42     Object.defineProperty(this, 'client', { value: client });
     43 
     44     /**
     45      * The gateway this manager uses
     46      * @type {?string}
     47      */
     48     this.gateway = undefined;
     49 
     50     /**
     51      * The amount of shards this manager handles
     52      * @private
     53      * @type {number}
     54      */
     55     this.totalShards = this.client.options.shards.length;
     56 
     57     /**
     58      * A collection of all shards this manager handles
     59      * @type {Collection<number, WebSocketShard>}
     60      */
     61     this.shards = new Collection();
     62 
     63     /**
     64      * An array of shards to be connected or that need to reconnect
     65      * @type {Set<WebSocketShard>}
     66      * @private
     67      * @name WebSocketManager#shardQueue
     68      */
     69     Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true });
     70 
     71     /**
     72      * An array of queued events before this WebSocketManager became ready
     73      * @type {object[]}
     74      * @private
     75      * @name WebSocketManager#packetQueue
     76      */
     77     Object.defineProperty(this, 'packetQueue', { value: [] });
     78 
     79     /**
     80      * The current status of this WebSocketManager
     81      * @type {number}
     82      */
     83     this.status = Status.IDLE;
     84 
     85     /**
     86      * If this manager was destroyed. It will prevent shards from reconnecting
     87      * @type {boolean}
     88      * @private
     89      */
     90     this.destroyed = false;
     91 
     92     /**
     93      * If this manager is currently reconnecting one or multiple shards
     94      * @type {boolean}
     95      * @private
     96      */
     97     this.reconnecting = false;
     98 
     99     /**
    100      * The current session limit of the client
    101      * @private
    102      * @type {?Object}
    103      * @prop {number} total Total number of identifies available
    104      * @prop {number} remaining Number of identifies remaining
    105      * @prop {number} reset_after Number of milliseconds after which the limit resets
    106      */
    107     this.sessionStartLimit = undefined;
    108   }
    109 
    110   /**
    111    * The average ping of all WebSocketShards
    112    * @type {number}
    113    * @readonly
    114    */
    115   get ping() {
    116     const sum = this.shards.reduce((a, b) => a + b.ping, 0);
    117     return sum / this.shards.size;
    118   }
    119 
    120   /**
    121    * Emits a debug message.
    122    * @param {string} message The debug message
    123    * @param {?WebSocketShard} [shard] The shard that emitted this message, if any
    124    * @private
    125    */
    126   debug(message, shard) {
    127     this.client.emit(Events.DEBUG, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`);
    128   }
    129 
    130   /**
    131    * Connects this manager to the gateway.
    132    * @private
    133    */
    134   async connect() {
    135     const invalidToken = new DJSError(WSCodes[4004]);
    136     const {
    137       url: gatewayURL,
    138       shards: recommendedShards,
    139       session_start_limit: sessionStartLimit,
    140     } = await this.client.api.gateway.bot.get().catch(error => {
    141       throw error.httpStatus === 401 ? invalidToken : error;
    142     });
    143 
    144     this.sessionStartLimit = sessionStartLimit;
    145 
    146     const { total, remaining, reset_after } = sessionStartLimit;
    147 
    148     this.debug(`Fetched Gateway Information
    149     URL: ${gatewayURL}
    150     Recommended Shards: ${recommendedShards}`);
    151 
    152     this.debug(`Session Limit Information
    153     Total: ${total}
    154     Remaining: ${remaining}`);
    155 
    156     this.gateway = `${gatewayURL}/`;
    157 
    158     let { shards } = this.client.options;
    159 
    160     if (shards === 'auto') {
    161       this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`);
    162       this.totalShards = this.client.options.shardCount = recommendedShards;
    163       shards = this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i);
    164     }
    165 
    166     this.totalShards = shards.length;
    167     this.debug(`Spawning shards: ${shards.join(', ')}`);
    168     this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id)));
    169 
    170     await this._handleSessionLimit(remaining, reset_after);
    171 
    172     return this.createShards();
    173   }
    174 
    175   /**
    176    * Handles the creation of a shard.
    177    * @returns {Promise<boolean>}
    178    * @private
    179    */
    180   async createShards() {
    181     // If we don't have any shards to handle, return
    182     if (!this.shardQueue.size) return false;
    183 
    184     const [shard] = this.shardQueue;
    185 
    186     this.shardQueue.delete(shard);
    187 
    188     if (!shard.eventsAttached) {
    189       shard.on(ShardEvents.ALL_READY, unavailableGuilds => {
    190         /**
    191          * Emitted when a shard turns ready.
    192          * @event Client#shardReady
    193          * @param {number} id The shard ID that turned ready
    194          * @param {?Set<string>} unavailableGuilds Set of unavailable guild IDs, if any
    195          */
    196         this.client.emit(Events.SHARD_READY, shard.id, unavailableGuilds);
    197 
    198         if (!this.shardQueue.size) this.reconnecting = false;
    199         this.checkShardsReady();
    200       });
    201 
    202       shard.on(ShardEvents.CLOSE, event => {
    203         if (event.code === 1000 ? this.destroyed : UNRECOVERABLE_CLOSE_CODES.includes(event.code)) {
    204           /**
    205            * Emitted when a shard's WebSocket disconnects and will no longer reconnect.
    206            * @event Client#shardDisconnect
    207            * @param {CloseEvent} event The WebSocket close event
    208            * @param {number} id The shard ID that disconnected
    209            */
    210           this.client.emit(Events.SHARD_DISCONNECT, event, shard.id);
    211           this.debug(WSCodes[event.code], shard);
    212           return;
    213         }
    214 
    215         if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) {
    216           // These event codes cannot be resumed
    217           shard.sessionID = undefined;
    218         }
    219 
    220         /**
    221          * Emitted when a shard is attempting to reconnect or re-identify.
    222          * @event Client#shardReconnecting
    223          * @param {number} id The shard ID that is attempting to reconnect
    224          */
    225         this.client.emit(Events.SHARD_RECONNECTING, shard.id);
    226 
    227         this.shardQueue.add(shard);
    228 
    229         if (shard.sessionID) {
    230           this.debug(`Session ID is present, attempting an immediate reconnect...`, shard);
    231           this.reconnect(true);
    232         } else {
    233           shard.destroy({ reset: true, emit: false, log: false });
    234           this.reconnect();
    235         }
    236       });
    237 
    238       shard.on(ShardEvents.INVALID_SESSION, () => {
    239         this.client.emit(Events.SHARD_RECONNECTING, shard.id);
    240       });
    241 
    242       shard.on(ShardEvents.DESTROYED, () => {
    243         this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard);
    244 
    245         this.client.emit(Events.SHARD_RECONNECTING, shard.id);
    246 
    247         this.shardQueue.add(shard);
    248         this.reconnect();
    249       });
    250 
    251       shard.eventsAttached = true;
    252     }
    253 
    254     this.shards.set(shard.id, shard);
    255 
    256     try {
    257       await shard.connect();
    258     } catch (error) {
    259       if (error && error.code && UNRECOVERABLE_CLOSE_CODES.includes(error.code)) {
    260         throw new DJSError(WSCodes[error.code]);
    261         // Undefined if session is invalid, error event for regular closes
    262       } else if (!error || error.code) {
    263         this.debug('Failed to connect to the gateway, requeueing...', shard);
    264         this.shardQueue.add(shard);
    265       } else {
    266         throw error;
    267       }
    268     }
    269     // If we have more shards, add a 5s delay
    270     if (this.shardQueue.size) {
    271       this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`);
    272       await Util.delayFor(5000);
    273       await this._handleSessionLimit();
    274       return this.createShards();
    275     }
    276 
    277     return true;
    278   }
    279 
    280   /**
    281    * Handles reconnects for this manager.
    282    * @param {boolean} [skipLimit=false] IF this reconnect should skip checking the session limit
    283    * @private
    284    * @returns {Promise<boolean>}
    285    */
    286   async reconnect(skipLimit = false) {
    287     if (this.reconnecting || this.status !== Status.READY) return false;
    288     this.reconnecting = true;
    289     try {
    290       if (!skipLimit) await this._handleSessionLimit();
    291       await this.createShards();
    292     } catch (error) {
    293       this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`);
    294       if (error.httpStatus !== 401) {
    295         this.debug(`Possible network error occurred. Retrying in 5s...`);
    296         await Util.delayFor(5000);
    297         this.reconnecting = false;
    298         return this.reconnect();
    299       }
    300       // If we get an error at this point, it means we cannot reconnect anymore
    301       if (this.client.listenerCount(Events.INVALIDATED)) {
    302         /**
    303          * Emitted when the client's session becomes invalidated.
    304          * You are expected to handle closing the process gracefully and preventing a boot loop
    305          * if you are listening to this event.
    306          * @event Client#invalidated
    307          */
    308         this.client.emit(Events.INVALIDATED);
    309         // Destroy just the shards. This means you have to handle the cleanup yourself
    310         this.destroy();
    311       } else {
    312         this.client.destroy();
    313       }
    314     } finally {
    315       this.reconnecting = false;
    316     }
    317     return true;
    318   }
    319 
    320   /**
    321    * Broadcasts a packet to every shard this manager handles.
    322    * @param {Object} packet The packet to send
    323    * @private
    324    */
    325   broadcast(packet) {
    326     for (const shard of this.shards.values()) shard.send(packet);
    327   }
    328 
    329   /**
    330    * Destroys this manager and all its shards.
    331    * @private
    332    */
    333   destroy() {
    334     if (this.destroyed) return;
    335     this.debug(`Manager was destroyed. Called by:\n${new Error('MANAGER_DESTROYED').stack}`);
    336     this.destroyed = true;
    337     this.shardQueue.clear();
    338     for (const shard of this.shards.values()) shard.destroy({ closeCode: 1000, reset: true, emit: false, log: false });
    339   }
    340 
    341   /**
    342    * Handles the timeout required if we cannot identify anymore.
    343    * @param {number} [remaining] The amount of remaining identify sessions that can be done today
    344    * @param {number} [resetAfter] The amount of time in which the identify counter resets
    345    * @private
    346    */
    347   async _handleSessionLimit(remaining, resetAfter) {
    348     if (typeof remaining === 'undefined' && typeof resetAfter === 'undefined') {
    349       const { session_start_limit } = await this.client.api.gateway.bot.get();
    350       this.sessionStartLimit = session_start_limit;
    351       remaining = session_start_limit.remaining;
    352       resetAfter = session_start_limit.reset_after;
    353       this.debug(`Session Limit Information
    354     Total: ${session_start_limit.total}
    355     Remaining: ${remaining}`);
    356     }
    357     if (!remaining) {
    358       this.debug(`Exceeded identify threshold. Will attempt a connection in ${resetAfter}ms`);
    359       await Util.delayFor(resetAfter);
    360     }
    361   }
    362 
    363   /**
    364    * Processes a packet and queues it if this WebSocketManager is not ready.
    365    * @param {Object} [packet] The packet to be handled
    366    * @param {WebSocketShard} [shard] The shard that will handle this packet
    367    * @returns {boolean}
    368    * @private
    369    */
    370   handlePacket(packet, shard) {
    371     if (packet && this.status !== Status.READY) {
    372       if (!BeforeReadyWhitelist.includes(packet.t)) {
    373         this.packetQueue.push({ packet, shard });
    374         return false;
    375       }
    376     }
    377 
    378     if (this.packetQueue.length) {
    379       const item = this.packetQueue.shift();
    380       this.client.setImmediate(() => {
    381         this.handlePacket(item.packet, item.shard);
    382       });
    383     }
    384 
    385     if (packet && PacketHandlers[packet.t]) {
    386       PacketHandlers[packet.t](this.client, packet, shard);
    387     }
    388 
    389     return true;
    390   }
    391 
    392   /**
    393    * Checks whether the client is ready to be marked as ready.
    394    * @private
    395    */
    396   async checkShardsReady() {
    397     if (this.status === Status.READY) return;
    398     if (this.shards.size !== this.totalShards || this.shards.some(s => s.status !== Status.READY)) {
    399       return;
    400     }
    401 
    402     this.status = Status.NEARLY;
    403 
    404     if (this.client.options.fetchAllMembers) {
    405       try {
    406         const promises = this.client.guilds.cache.map(guild => {
    407           if (guild.available) return guild.members.fetch();
    408           // Return empty promise if guild is unavailable
    409           return Promise.resolve();
    410         });
    411         await Promise.all(promises);
    412       } catch (err) {
    413         this.debug(`Failed to fetch all members before ready! ${err}\n${err.stack}`);
    414       }
    415     }
    416 
    417     this.triggerClientReady();
    418   }
    419 
    420   /**
    421    * Causes the client to be marked as ready and emits the ready event.
    422    * @private
    423    */
    424   triggerClientReady() {
    425     this.status = Status.READY;
    426 
    427     this.client.readyAt = new Date();
    428 
    429     /**
    430      * Emitted when the client becomes ready to start working.
    431      * @event Client#ready
    432      */
    433     this.client.emit(Events.CLIENT_READY);
    434 
    435     this.handlePacket();
    436   }
    437 }
    438 
// The manager class is this module's export.
module.exports = WebSocketManager;