WebSocketManager.js (13393B)
'use strict';

const EventEmitter = require('events');
const WebSocketShard = require('./WebSocketShard');
const PacketHandlers = require('./handlers');
const { Error: DJSError } = require('../../errors');
const Collection = require('../../util/Collection');
const { Events, ShardEvents, Status, WSCodes, WSEvents } = require('../../util/Constants');
const Util = require('../../util/Util');

// Dispatch events that are handled immediately even while the manager is not yet READY;
// every other packet is queued by handlePacket() until the client becomes ready.
const BeforeReadyWhitelist = [
  WSEvents.READY,
  WSEvents.RESUMED,
  WSEvents.GUILD_CREATE,
  WSEvents.GUILD_DELETE,
  WSEvents.GUILD_MEMBERS_CHUNK,
  WSEvents.GUILD_MEMBER_ADD,
  WSEvents.GUILD_MEMBER_REMOVE,
];

// Every key of WSCodes except the first one, as numbers.
// NOTE(review): relies on the first key of WSCodes being the "normal closure" code (1000) - confirm in Constants.
const UNRECOVERABLE_CLOSE_CODES = Object.keys(WSCodes)
  .slice(1)
  .map(Number);
// Close codes after which the shard's session ID is discarded (see the CLOSE handler in createShards)
const UNRESUMABLE_CLOSE_CODES = [1000, 4006, 4007];

/**
 * The WebSocket manager for this client.
 * <info>This class forwards raw dispatch events,
 * read more about it here {@link https://discordapp.com/developers/docs/topics/gateway}</info>
 * @extends EventEmitter
 */
class WebSocketManager extends EventEmitter {
  /**
   * @param {Client} client The client that instantiated this WebSocketManager
   */
  constructor(client) {
    super();

    /**
     * The client that instantiated this WebSocketManager
     * @type {Client}
     * @readonly
     * @name WebSocketManager#client
     */
    Object.defineProperty(this, 'client', { value: client });

    /**
     * The gateway this manager uses
     * @type {?string}
     */
    this.gateway = undefined;

    /**
     * The amount of shards this manager handles
     * @private
     * @type {number}
     */
    this.totalShards = this.client.options.shards.length;

    /**
     * A collection of all shards this manager handles
     * @type {Collection<number, WebSocketShard>}
     */
    this.shards = new Collection();

    /**
     * A set of shards to be connected or that need to reconnect
     * @type {Set<WebSocketShard>}
     * @private
     * @name WebSocketManager#shardQueue
     */
    Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true });

    /**
     * An array of queued events before this WebSocketManager became ready
     * @type {object[]}
     * @private
     * @name WebSocketManager#packetQueue
     */
    Object.defineProperty(this, 'packetQueue', { value: [] });

    /**
     * The current status of this WebSocketManager
     * @type {number}
     */
    this.status = Status.IDLE;

    /**
     * If this manager was destroyed. It will prevent shards from reconnecting
     * @type {boolean}
     * @private
     */
    this.destroyed = false;

    /**
     * If this manager is currently reconnecting one or multiple shards
     * @type {boolean}
     * @private
     */
    this.reconnecting = false;

    /**
     * The current session limit of the client
     * @private
     * @type {?Object}
     * @prop {number} total Total number of identifies available
     * @prop {number} remaining Number of identifies remaining
     * @prop {number} reset_after Number of milliseconds after which the limit resets
     */
    this.sessionStartLimit = undefined;
  }

  /**
   * The average ping of all WebSocketShards
   * @type {number}
   * @readonly
   */
  get ping() {
    const sum = this.shards.reduce((a, b) => a + b.ping, 0);
    return sum / this.shards.size;
  }

  /**
   * Emits a debug message.
   * @param {string} message The debug message
   * @param {?WebSocketShard} [shard] The shard that emitted this message, if any
   * @private
   */
  debug(message, shard) {
    this.client.emit(Events.DEBUG, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`);
  }

  /**
   * Connects this manager to the gateway.
   * Fetches the gateway information, builds the shard queue, waits out the identify limit if it is
   * exhausted and then starts spawning the shards.
   * @returns {Promise<boolean>} The result of {@link WebSocketManager#createShards}
   * @private
   */
  async connect() {
    // Created up front so it can be thrown from the catch handler below on a 401 response
    const invalidToken = new DJSError(WSCodes[4004]);
    const {
      url: gatewayURL,
      shards: recommendedShards,
      session_start_limit: sessionStartLimit,
    } = await this.client.api.gateway.bot.get().catch(error => {
      throw error.httpStatus === 401 ? invalidToken : error;
    });

    this.sessionStartLimit = sessionStartLimit;

    const { total, remaining, reset_after } = sessionStartLimit;

    this.debug(`Fetched Gateway Information
    URL: ${gatewayURL}
    Recommended Shards: ${recommendedShards}`);

    this.debug(`Session Limit Information
    Total: ${total}
    Remaining: ${remaining}`);

    this.gateway = `${gatewayURL}/`;

    let { shards } = this.client.options;

    if (shards === 'auto') {
      this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`);
      this.totalShards = this.client.options.shardCount = recommendedShards;
      // Expand the recommended count into the explicit list of shard IDs [0, 1, ..., n - 1]
      shards = this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i);
    }

    this.totalShards = shards.length;
    this.debug(`Spawning shards: ${shards.join(', ')}`);
    this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id)));

    await this._handleSessionLimit(remaining, reset_after);

    return this.createShards();
  }

  /**
   * Handles the creation of a shard.
   * Takes the first shard out of the queue, attaches the manager's listeners to it once, connects it
   * and, while the queue is not empty, continues with the next shard after a 5 second delay.
   * @returns {Promise<boolean>} `false` if the queue was empty, `true` once the queue has been drained
   * @private
   */
  async createShards() {
    // If we don't have any shards to handle, return
    if (!this.shardQueue.size) return false;

    const [shard] = this.shardQueue;

    this.shardQueue.delete(shard);

    // A shard may pass through the queue several times; only attach the listeners once
    if (!shard.eventsAttached) {
      shard.on(ShardEvents.ALL_READY, unavailableGuilds => {
        /**
         * Emitted when a shard turns ready.
         * @event Client#shardReady
         * @param {number} id The shard ID that turned ready
         * @param {?Set<string>} unavailableGuilds Set of unavailable guild IDs, if any
         */
        this.client.emit(Events.SHARD_READY, shard.id, unavailableGuilds);

        if (!this.shardQueue.size) this.reconnecting = false;
        this.checkShardsReady();
      });

      shard.on(ShardEvents.CLOSE, event => {
        // A 1000 close is only final if the manager itself was destroyed;
        // any other code is final when it is one of the unrecoverable codes
        if (event.code === 1000 ? this.destroyed : UNRECOVERABLE_CLOSE_CODES.includes(event.code)) {
          /**
           * Emitted when a shard's WebSocket disconnects and will no longer reconnect.
           * @event Client#shardDisconnect
           * @param {CloseEvent} event The WebSocket close event
           * @param {number} id The shard ID that disconnected
           */
          this.client.emit(Events.SHARD_DISCONNECT, event, shard.id);
          this.debug(WSCodes[event.code], shard);
          return;
        }

        if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) {
          // These event codes cannot be resumed
          shard.sessionID = undefined;
        }

        /**
         * Emitted when a shard is attempting to reconnect or re-identify.
         * @event Client#shardReconnecting
         * @param {number} id The shard ID that is attempting to reconnect
         */
        this.client.emit(Events.SHARD_RECONNECTING, shard.id);

        this.shardQueue.add(shard);

        if (shard.sessionID) {
          this.debug(`Session ID is present, attempting an immediate reconnect...`, shard);
          this.reconnect(true);
        } else {
          shard.destroy({ reset: true, emit: false, log: false });
          this.reconnect();
        }
      });

      shard.on(ShardEvents.INVALID_SESSION, () => {
        this.client.emit(Events.SHARD_RECONNECTING, shard.id);
      });

      shard.on(ShardEvents.DESTROYED, () => {
        this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard);

        this.client.emit(Events.SHARD_RECONNECTING, shard.id);

        this.shardQueue.add(shard);
        this.reconnect();
      });

      shard.eventsAttached = true;
    }

    this.shards.set(shard.id, shard);

    try {
      await shard.connect();
    } catch (error) {
      if (error && error.code && UNRECOVERABLE_CLOSE_CODES.includes(error.code)) {
        throw new DJSError(WSCodes[error.code]);
        // Undefined if session is invalid, error event for regular closes
      } else if (!error || error.code) {
        this.debug('Failed to connect to the gateway, requeueing...', shard);
        this.shardQueue.add(shard);
      } else {
        throw error;
      }
    }
    // If we have more shards, add a 5s delay
    if (this.shardQueue.size) {
      this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`);
      await Util.delayFor(5000);
      await this._handleSessionLimit();
      return this.createShards();
    }

    return true;
  }

  /**
   * Handles reconnects for this manager.
   * Only one reconnect runs at a time; calls made while one is in progress, or while the manager
   * is not in the READY status, return `false` immediately.
   * @param {boolean} [skipLimit=false] If this reconnect should skip checking the session limit
   * @private
   * @returns {Promise<boolean>}
   */
  async reconnect(skipLimit = false) {
    if (this.reconnecting || this.status !== Status.READY) return false;
    this.reconnecting = true;
    let shouldRetry = false;
    try {
      if (!skipLimit) await this._handleSessionLimit();
      await this.createShards();
    } catch (error) {
      this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`);
      if (error.httpStatus !== 401) {
        this.debug(`Possible network error occurred. Retrying in 5s...`);
        await Util.delayFor(5000);
        // Do not recurse from here: the `finally` below would reset `reconnecting` while the
        // nested attempt is still running, allowing concurrent reconnects. Retry after it instead.
        shouldRetry = true;
      } else if (this.client.listenerCount(Events.INVALIDATED)) {
        // If we get an error at this point, it means we cannot reconnect anymore
        /**
         * Emitted when the client's session becomes invalidated.
         * You are expected to handle closing the process gracefully and preventing a boot loop
         * if you are listening to this event.
         * @event Client#invalidated
         */
        this.client.emit(Events.INVALIDATED);
        // Destroy just the shards. This means you have to handle the cleanup yourself
        this.destroy();
      } else {
        this.client.destroy();
      }
    } finally {
      this.reconnecting = false;
    }
    // The flag has been released above, so the retry owns it for its whole duration
    if (shouldRetry) return this.reconnect();
    return true;
  }

  /**
   * Broadcasts a packet to every shard this manager handles.
   * @param {Object} packet The packet to send
   * @private
   */
  broadcast(packet) {
    for (const shard of this.shards.values()) shard.send(packet);
  }

  /**
   * Destroys this manager and all its shards.
   * Does nothing if the manager was already destroyed.
   * @private
   */
  destroy() {
    if (this.destroyed) return;
    // The stack of a fresh Error is logged to show where the destroy call came from
    this.debug(`Manager was destroyed. Called by:\n${new Error('MANAGER_DESTROYED').stack}`);
    this.destroyed = true;
    this.shardQueue.clear();
    for (const shard of this.shards.values()) shard.destroy({ closeCode: 1000, reset: true, emit: false, log: false });
  }

  /**
   * Handles the timeout required if we cannot identify anymore.
   * When called without arguments, the session limit is fetched from the API first.
   * @param {number} [remaining] The amount of remaining identify sessions that can be done today
   * @param {number} [resetAfter] The amount of time in which the identify counter resets
   * @returns {Promise<void>}
   * @private
   */
  async _handleSessionLimit(remaining, resetAfter) {
    if (typeof remaining === 'undefined' && typeof resetAfter === 'undefined') {
      const { session_start_limit } = await this.client.api.gateway.bot.get();
      this.sessionStartLimit = session_start_limit;
      remaining = session_start_limit.remaining;
      resetAfter = session_start_limit.reset_after;
      this.debug(`Session Limit Information
    Total: ${session_start_limit.total}
    Remaining: ${remaining}`);
    }
    if (!remaining) {
      this.debug(`Exceeded identify threshold. Will attempt a connection in ${resetAfter}ms`);
      await Util.delayFor(resetAfter);
    }
  }

  /**
   * Processes a packet and queues it if this WebSocketManager is not ready.
   * Calling it without a packet only drains the next queued packet, if any.
   * @param {Object} [packet] The packet to be handled
   * @param {WebSocketShard} [shard] The shard that will handle this packet
   * @returns {boolean} `false` if the packet was queued, `true` otherwise
   * @private
   */
  handlePacket(packet, shard) {
    if (packet && this.status !== Status.READY) {
      if (!BeforeReadyWhitelist.includes(packet.t)) {
        this.packetQueue.push({ packet, shard });
        return false;
      }
    }

    // Drain the queue one packet per call; each drained packet schedules the next one
    if (this.packetQueue.length) {
      const item = this.packetQueue.shift();
      this.client.setImmediate(() => {
        this.handlePacket(item.packet, item.shard);
      });
    }

    if (packet && PacketHandlers[packet.t]) {
      PacketHandlers[packet.t](this.client, packet, shard);
    }

    return true;
  }

  /**
   * Checks whether the client is ready to be marked as ready.
   * Does nothing until every expected shard exists and reports the READY status.
   * @returns {Promise<void>}
   * @private
   */
  async checkShardsReady() {
    if (this.status === Status.READY) return;
    if (this.shards.size !== this.totalShards || this.shards.some(s => s.status !== Status.READY)) {
      return;
    }

    this.status = Status.NEARLY;

    if (this.client.options.fetchAllMembers) {
      try {
        const promises = this.client.guilds.cache.map(guild => {
          if (guild.available) return guild.members.fetch();
          // Return empty promise if guild is unavailable
          return Promise.resolve();
        });
        await Promise.all(promises);
      } catch (err) {
        // A failed member fetch is only logged; the client is still marked as ready below
        this.debug(`Failed to fetch all members before ready! ${err}\n${err.stack}`);
      }
    }

    this.triggerClientReady();
  }

  /**
   * Causes the client to be marked as ready and emits the ready event.
   * @private
   */
  triggerClientReady() {
    this.status = Status.READY;

    this.client.readyAt = new Date();

    /**
     * Emitted when the client becomes ready to start working.
     * @event Client#ready
     */
    this.client.emit(Events.CLIENT_READY);

    // Start draining the packets that were queued before the client was ready
    this.handlePacket();
  }
}

module.exports = WebSocketManager;