WebSocketShard.js (22170B)
'use strict';

const EventEmitter = require('events');
const WebSocket = require('../../WebSocket');
const { browser, Status, Events, ShardEvents, OPCodes, WSEvents } = require('../../util/Constants');

// Lookup tables used to turn numeric status / readyState values into names for debug output
const STATUS_KEYS = Object.keys(Status);
const CONNECTION_STATE = Object.keys(WebSocket.WebSocket);

let zlib;

// Compression is optional: when zlib-sync cannot be loaded, `zlib` stays undefined
// and the shard connects without requesting a compressed stream
if (!browser) {
  try {
    zlib = require('zlib-sync');
  } catch {} // eslint-disable-line no-empty
}

/**
 * Represents a Shard's WebSocket connection
 */
class WebSocketShard extends EventEmitter {
  constructor(manager, id) {
    super();

    /**
     * The WebSocketManager of the shard
     * @type {WebSocketManager}
     */
    this.manager = manager;

    /**
     * The ID of the shard
     * @type {number}
     */
    this.id = id;

    /**
     * The current status of the shard
     * @type {Status}
     */
    this.status = Status.IDLE;

    /**
     * The current sequence of the shard
     * @type {number}
     * @private
     */
    this.sequence = -1;

    /**
     * The sequence of the shard after close
     * @type {number}
     * @private
     */
    this.closeSequence = 0;

    /**
     * The current session ID of the shard
     * @type {string}
     * @private
     */
    this.sessionID = undefined;

    /**
     * The previous heartbeat ping of the shard
     * @type {number}
     */
    this.ping = -1;

    /**
     * The last time a ping was sent (a timestamp)
     * @type {number}
     * @private
     */
    this.lastPingTimestamp = -1;

    /**
     * If we received a heartbeat ack back. Used to identify zombie connections
     * @type {boolean}
     * @private
     */
    this.lastHeartbeatAcked = true;

    /**
     * Contains the rate limit queue and metadata
     * @type {Object}
     * @private
     */
    Object.defineProperty(this, 'ratelimit', {
      value: {
        queue: [],
        total: 120,
        remaining: 120,
        time: 60e3,
        timer: null,
      },
    });

    /**
     * The WebSocket connection for the current shard
     * @type {?WebSocket}
     * @private
     */
    Object.defineProperty(this, 'connection', { value: null, writable: true });

    /**
     * @external Inflate
     * @see {@link https://www.npmjs.com/package/zlib-sync}
     */

    /**
     * The compression to use
     * @type {?Inflate}
     * @private
     */
    Object.defineProperty(this, 'inflate', { value: null, writable: true });

    /**
     * The HELLO timeout
     * @type {?NodeJS.Timer}
     * @private
     */
    Object.defineProperty(this, 'helloTimeout', { value: undefined, writable: true });

    /**
     * If the manager attached its event handlers on the shard
     * @type {boolean}
     * @private
     */
    Object.defineProperty(this, 'eventsAttached', { value: false, writable: true });

    /**
     * A set of guild IDs this shard expects to receive
     * @type {?Set<string>}
     * @private
     */
    Object.defineProperty(this, 'expectedGuilds', { value: undefined, writable: true });

    /**
     * The ready timeout
     * @type {?NodeJS.Timer}
     * @private
     */
    Object.defineProperty(this, 'readyTimeout', { value: undefined, writable: true });

    /**
     * Time when the WebSocket connection was opened
     * @type {number}
     * @private
     */
    Object.defineProperty(this, 'connectedAt', { value: 0, writable: true });
  }

  /**
   * Emits a debug event.
   * @param {string} message The debug message
   * @private
   */
  debug(message) {
    this.manager.debug(message, this);
  }

  /**
   * Connects the shard to the gateway.
   * @private
   * @returns {Promise<void>} A promise that will resolve if the shard turns ready successfully,
   * or reject if we couldn't connect
   */
  connect() {
    const { gateway, client } = this.manager;

    // Already connected and ready: nothing to do
    if (this.connection && this.connection.readyState === WebSocket.OPEN && this.status === Status.READY) {
      return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
      // Whichever of the five events fires first settles the promise; drop the other listeners
      const cleanup = () => {
        this.removeListener(ShardEvents.CLOSE, onClose);
        this.removeListener(ShardEvents.READY, onReady);
        this.removeListener(ShardEvents.RESUMED, onResumed);
        this.removeListener(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
        this.removeListener(ShardEvents.DESTROYED, onInvalidOrDestroyed);
      };

      const onReady = () => {
        cleanup();
        resolve();
      };

      const onResumed = () => {
        cleanup();
        resolve();
      };

      const onClose = event => {
        cleanup();
        reject(event);
      };

      const onInvalidOrDestroyed = () => {
        cleanup();
        // eslint-disable-next-line prefer-promise-reject-errors
        reject();
      };

      this.once(ShardEvents.READY, onReady);
      this.once(ShardEvents.RESUMED, onResumed);
      this.once(ShardEvents.CLOSE, onClose);
      this.once(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
      this.once(ShardEvents.DESTROYED, onInvalidOrDestroyed);

      if (this.connection && this.connection.readyState === WebSocket.OPEN) {
        this.debug('An open connection was found, attempting an immediate identify.');
        this.identify();
        return;
      }

      if (this.connection) {
        this.debug(`A connection object was found. Cleaning up before continuing.
    State: ${CONNECTION_STATE[this.connection.readyState]}`);
        // `emit: false` so the DESTROYED listener registered above does not reject this promise
        this.destroy({ emit: false });
      }

      const wsQuery = { v: client.options.ws.version };

      if (zlib) {
        this.inflate = new zlib.Inflate({
          chunkSize: 65535,
          flush: zlib.Z_SYNC_FLUSH,
          to: WebSocket.encoding === 'json' ? 'string' : '',
        });
        wsQuery.compress = 'zlib-stream';
      }

      this.debug(
        `[CONNECT]
    Gateway    : ${gateway}
    Version    : ${client.options.ws.version}
    Encoding   : ${WebSocket.encoding}
    Compression: ${zlib ? 'zlib-stream' : 'none'}`,
      );

      this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING;
      this.setHelloTimeout();

      this.connectedAt = Date.now();

      const ws = (this.connection = WebSocket.create(gateway, wsQuery));
      ws.onopen = this.onOpen.bind(this);
      ws.onmessage = this.onMessage.bind(this);
      ws.onerror = this.onError.bind(this);
      ws.onclose = this.onClose.bind(this);
    });
  }

  /**
   * Called whenever a connection is opened to the gateway.
   * @private
   */
  onOpen() {
    this.debug(`[CONNECTED] ${this.connection.url} in ${Date.now() - this.connectedAt}ms`);
    this.status = Status.NEARLY;
  }

  /**
   * Called whenever a message is received.
   * @param {MessageEvent} event Event received
   * @private
   */
  onMessage({ data }) {
    let raw;
    if (data instanceof ArrayBuffer) data = new Uint8Array(data);
    if (zlib) {
      const l = data.length;
      // A chunk ending in 00 00 FF FF is treated as the end of a message;
      // anything else is buffered in the inflater until that suffix arrives
      const flush =
        l >= 4 && data[l - 4] === 0x00 && data[l - 3] === 0x00 && data[l - 2] === 0xff && data[l - 1] === 0xff;

      this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
      if (!flush) return;
      raw = this.inflate.result;
    } else {
      raw = data;
    }
    let packet;
    try {
      packet = WebSocket.unpack(raw);
      this.manager.client.emit(Events.RAW, packet, this.id);
      if (packet.op === OPCodes.DISPATCH) this.manager.emit(packet.t, packet.d, this.id);
    } catch (err) {
      this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
      return;
    }
    this.onPacket(packet);
  }

  /**
   * Called whenever an error occurs with the WebSocket.
   * @param {ErrorEvent} event The error that occurred
   * @private
   */
  onError(event) {
    // Accept either an ErrorEvent wrapping the error or the error itself
    const error = event && event.error ? event.error : event;
    if (!error) return;

    /**
     * Emitted whenever a shard's WebSocket encounters a connection error.
     * @event Client#shardError
     * @param {Error} error The encountered error
     * @param {number} shardID The shard that encountered this error
     */
    this.manager.client.emit(Events.SHARD_ERROR, error, this.id);
  }

  /**
   * @external CloseEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
   */

  /**
   * @external ErrorEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/ErrorEvent}
   */

  /**
   * @external MessageEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent}
   */

  /**
   * Called whenever a connection to the gateway is closed.
   * @param {CloseEvent} event Close event that was received
   * @private
   */
  onClose(event) {
    // Remember the last sequence before resetting it; identifyResume() sends closeSequence
    if (this.sequence !== -1) this.closeSequence = this.sequence;
    this.sequence = -1;

    this.debug(`[CLOSE]
    Event Code: ${event.code}
    Clean     : ${event.wasClean}
    Reason    : ${event.reason || 'No reason received'}`);

    this.setHeartbeatTimer(-1);
    this.setHelloTimeout(-1);
    // If we still have a connection object, clean up its listeners
    if (this.connection) this._cleanupConnection();

    this.status = Status.DISCONNECTED;

    /**
     * Emitted when a shard's WebSocket closes.
     * @private
     * @event WebSocketShard#close
     * @param {CloseEvent} event The received event
     */
    this.emit(ShardEvents.CLOSE, event);
  }

  /**
   * Called whenever a packet is received.
   * @param {Object} packet The received packet
   * @private
   */
  onPacket(packet) {
    if (!packet) {
      this.debug(`Received broken packet: '${packet}'.`);
      return;
    }

    switch (packet.t) {
      case WSEvents.READY:
        /**
         * Emitted when the shard receives the READY payload and is now waiting for guilds
         * @event WebSocketShard#ready
         */
        this.emit(ShardEvents.READY);

        this.sessionID = packet.d.session_id;
        this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id));
        this.status = Status.WAITING_FOR_GUILDS;
        this.debug(`[READY] Session ${this.sessionID}.`);
        this.lastHeartbeatAcked = true;
        this.sendHeartbeat('ReadyHeartbeat');
        break;
      case WSEvents.RESUMED: {
        /**
         * Emitted when the shard resumes successfully
         * @event WebSocketShard#resumed
         */
        this.emit(ShardEvents.RESUMED);

        this.status = Status.READY;
        const replayed = packet.s - this.closeSequence;
        this.debug(`[RESUMED] Session ${this.sessionID} | Replayed ${replayed} events.`);
        this.lastHeartbeatAcked = true;
        this.sendHeartbeat('ResumeHeartbeat');
        break;
      }
    }

    // Only ever move the sequence forward
    if (packet.s > this.sequence) this.sequence = packet.s;

    switch (packet.op) {
      case OPCodes.HELLO:
        this.setHelloTimeout(-1);
        this.setHeartbeatTimer(packet.d.heartbeat_interval);
        this.identify();
        break;
      case OPCodes.RECONNECT:
        this.debug('[RECONNECT] Discord asked us to reconnect');
        this.destroy({ closeCode: 4000 });
        break;
      case OPCodes.INVALID_SESSION:
        this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`);
        // If we can resume the session, do so immediately
        if (packet.d) {
          this.identifyResume();
          return;
        }
        // Reset the sequence
        this.sequence = -1;
        // Reset the session ID as it's invalid
        this.sessionID = undefined;
        // Set the status to reconnecting
        this.status = Status.RECONNECTING;
        // Finally, emit the INVALID_SESSION event
        this.emit(ShardEvents.INVALID_SESSION);
        break;
      case OPCodes.HEARTBEAT_ACK:
        this.ackHeartbeat();
        break;
      case OPCodes.HEARTBEAT:
        this.sendHeartbeat('HeartbeatRequest', true);
        break;
      default:
        this.manager.handlePacket(packet, this);
        if (this.status === Status.WAITING_FOR_GUILDS && packet.t === WSEvents.GUILD_CREATE) {
          this.expectedGuilds.delete(packet.d.id);
          this.checkReady();
        }
    }
  }

  /**
   * Checks if the shard can be marked as ready
   * @private
   */
  checkReady() {
    // Step 0. Clear the ready timeout, if it exists
    if (this.readyTimeout) {
      this.manager.client.clearTimeout(this.readyTimeout);
      this.readyTimeout = undefined;
    }
    // Step 1. If we don't have any other guilds pending, we are ready
    if (!this.expectedGuilds.size) {
      this.debug('Shard received all its guilds. Marking as fully ready.');
      this.status = Status.READY;

      /**
       * Emitted when the shard is fully ready.
       * This event is emitted if:
       * * all guilds were received by this shard
       * * the ready timeout expired, and some guilds are unavailable
       * @event WebSocketShard#allReady
       * @param {?Set<string>} unavailableGuilds Set of unavailable guilds, if any
       */
      this.emit(ShardEvents.ALL_READY);
      return;
    }
    // Step 2. Create a 15s timeout that will mark the shard as ready if there are still unavailable guilds
    this.readyTimeout = this.manager.client.setTimeout(() => {
      this.debug(`Shard did not receive any more guild packets in 15 seconds.
  Unavailable guild count: ${this.expectedGuilds.size}`);

      this.readyTimeout = undefined;

      this.status = Status.READY;

      this.emit(ShardEvents.ALL_READY, this.expectedGuilds);
    }, 15000);
  }

  /**
   * Sets the HELLO packet timeout.
   * @param {number} [time] If set to -1, it will clear the hello timeout timeout
   * @private
   */
  setHelloTimeout(time) {
    if (time === -1) {
      if (this.helloTimeout) {
        this.debug('Clearing the HELLO timeout.');
        this.manager.client.clearTimeout(this.helloTimeout);
        this.helloTimeout = undefined;
      }
      return;
    }
    this.debug('Setting a HELLO timeout for 20s.');
    this.helloTimeout = this.manager.client.setTimeout(() => {
      this.debug('Did not receive HELLO in time. Destroying and connecting again.');
      this.destroy({ reset: true, closeCode: 4009 });
    }, 20000);
  }

  /**
   * Sets the heartbeat timer for this shard.
   * @param {number} time If -1, clears the interval, any other number sets an interval
   * @private
   */
  setHeartbeatTimer(time) {
    if (time === -1) {
      if (this.heartbeatInterval) {
        this.debug('Clearing the heartbeat interval.');
        this.manager.client.clearInterval(this.heartbeatInterval);
        this.heartbeatInterval = undefined;
      }
      return;
    }
    this.debug(`Setting a heartbeat interval for ${time}ms.`);
    // Sanity checks
    if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval);
    this.heartbeatInterval = this.manager.client.setInterval(() => this.sendHeartbeat(), time);
  }

  /**
   * Sends a heartbeat to the WebSocket.
   * If this shard didn't receive a heartbeat last time, it will destroy it and reconnect
   * @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent
   * @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully.
   * @private
   */
  sendHeartbeat(
    tag = 'HeartbeatTimer',
    // By default a missing ack is tolerated while the shard is still identifying, resuming or waiting for guilds
    ignoreHeartbeatAck = [Status.WAITING_FOR_GUILDS, Status.IDENTIFYING, Status.RESUMING].includes(this.status),
  ) {
    if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) {
      this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`);
    } else if (!this.lastHeartbeatAcked) {
      this.debug(
        `[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting.
    Status          : ${STATUS_KEYS[this.status]}
    Sequence        : ${this.sequence}
    Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`,
      );

      this.destroy({ closeCode: 4009, reset: true });
      return;
    }

    this.debug(`[${tag}] Sending a heartbeat.`);
    // Stays false until ackHeartbeat() runs; the next sendHeartbeat() checks it
    this.lastHeartbeatAcked = false;
    this.lastPingTimestamp = Date.now();
    this.send({ op: OPCodes.HEARTBEAT, d: this.sequence }, true);
  }

  /**
   * Acknowledges a heartbeat.
   * @private
   */
  ackHeartbeat() {
    this.lastHeartbeatAcked = true;
    const latency = Date.now() - this.lastPingTimestamp;
    this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`);
    this.ping = latency;
  }

  /**
   * Identifies the client on the connection.
   * @private
   * @returns {void}
   */
  identify() {
    return this.sessionID ? this.identifyResume() : this.identifyNew();
  }

  /**
   * Identifies as a new connection on the gateway.
   * @private
   */
  identifyNew() {
    const { client } = this.manager;
    if (!client.token) {
      this.debug('[IDENTIFY] No token available to identify a new session.');
      return;
    }

    this.status = Status.IDENTIFYING;

    // Clone the identify payload and assign the token and shard info
    const d = {
      ...client.options.ws,
      token: client.token,
      shard: [this.id, Number(client.options.shardCount)],
    };

    this.debug(`[IDENTIFY] Shard ${this.id}/${client.options.shardCount}`);
    this.send({ op: OPCodes.IDENTIFY, d }, true);
  }

  /**
   * Resumes a session on the gateway.
   * @private
   */
  identifyResume() {
    if (!this.sessionID) {
      this.debug('[RESUME] No session ID was present; identifying as a new session.');
      this.identifyNew();
      return;
    }

    this.status = Status.RESUMING;

    this.debug(`[RESUME] Session ${this.sessionID}, sequence ${this.closeSequence}`);

    const d = {
      token: this.manager.client.token,
      session_id: this.sessionID,
      seq: this.closeSequence,
    };

    this.send({ op: OPCodes.RESUME, d }, true);
  }

  /**
   * Adds a packet to the queue to be sent to the gateway.
   * <warn>If you use this method, make sure you understand that you need to provide
   * a full [Payload](https://discordapp.com/developers/docs/topics/gateway#commands-and-events-gateway-commands).
   * Do not use this method if you don't know what you're doing.</warn>
   * @param {Object} data The full packet to send
   * @param {boolean} [important=false] If this packet should be added first in queue
   */
  send(data, important = false) {
    this.ratelimit.queue[important ? 'unshift' : 'push'](data);
    this.processQueue();
  }

  /**
   * Sends data, bypassing the queue.
   * If no open WebSocket is available, the packet is dropped and the shard is destroyed.
   * @param {Object} data Packet to send
   * @returns {void}
   * @private
   */
  _send(data) {
    if (!this.connection || this.connection.readyState !== WebSocket.OPEN) {
      this.debug(`Tried to send packet '${JSON.stringify(data)}' but no WebSocket is available!`);
      // `destroy` reads the `closeCode` option; a `close` key would be ignored
      // and the shard would be destroyed with the default code 1000 instead
      this.destroy({ closeCode: 4000 });
      return;
    }

    this.connection.send(WebSocket.pack(data), err => {
      if (err) this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
    });
  }

  /**
   * Processes the current WebSocket queue.
   * @returns {void}
   * @private
   */
  processQueue() {
    if (this.ratelimit.remaining === 0) return;
    if (this.ratelimit.queue.length === 0) return;
    // First send of a fresh window: schedule the reset that refills the budget and drains the queue again
    if (this.ratelimit.remaining === this.ratelimit.total) {
      this.ratelimit.timer = this.manager.client.setTimeout(() => {
        this.ratelimit.remaining = this.ratelimit.total;
        this.processQueue();
      }, this.ratelimit.time);
    }
    while (this.ratelimit.remaining > 0) {
      const item = this.ratelimit.queue.shift();
      if (!item) return;
      this._send(item);
      this.ratelimit.remaining--;
    }
  }

  /**
   * Destroys this shard and closes its WebSocket connection.
   * @param {Object} [options={ closeCode: 1000, reset: false, emit: true, log: true }] Options for destroying the shard
   * @private
   */
  destroy({ closeCode = 1000, reset = false, emit = true, log = true } = {}) {
    if (log) {
      this.debug(`[DESTROY]
    Close Code    : ${closeCode}
    Reset         : ${reset}
    Emit DESTROYED: ${emit}`);
    }

    // Step 0: Remove all timers
    this.setHeartbeatTimer(-1);
    this.setHelloTimeout(-1);

    // Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED
    if (this.connection) {
      // If the connection is currently opened, we will (hopefully) receive close
      if (this.connection.readyState === WebSocket.OPEN) {
        this.connection.close(closeCode);
      } else {
        // Connection is not OPEN
        this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`);
        // Remove listeners from the connection
        this._cleanupConnection();
        // Attempt to close the connection just in case
        try {
          this.connection.close(closeCode);
        } catch {
          // No-op
        }
        // Emit the destroyed event if needed
        if (emit) this._emitDestroyed();
      }
    } else if (emit) {
      // We requested a destroy, but we had no connection. Emit destroyed
      this._emitDestroyed();
    }

    // Step 2: Null the connection object
    this.connection = null;

    // Step 3: Set the shard status to DISCONNECTED
    this.status = Status.DISCONNECTED;

    // Step 4: Cache the old sequence (use to attempt a resume)
    if (this.sequence !== -1) this.closeSequence = this.sequence;

    // Step 5: Reset the sequence and session ID if requested
    if (reset) {
      this.sequence = -1;
      this.sessionID = undefined;
    }

    // Step 6: reset the ratelimit data
    this.ratelimit.remaining = this.ratelimit.total;
    this.ratelimit.queue.length = 0;
    if (this.ratelimit.timer) {
      this.manager.client.clearTimeout(this.ratelimit.timer);
      this.ratelimit.timer = null;
    }
  }

  /**
   * Cleans up the WebSocket connection listeners.
   * @private
   */
  _cleanupConnection() {
    this.connection.onopen = this.connection.onclose = this.connection.onerror = this.connection.onmessage = null;
  }

  /**
   * Emits the DESTROYED event on the shard
   * @private
   */
  _emitDestroyed() {
    /**
     * Emitted when a shard is destroyed, but no WebSocket connection was present.
     * @private
     * @event WebSocketShard#destroyed
     */
    this.emit(ShardEvents.DESTROYED);
  }
}

module.exports = WebSocketShard;