Shard.js (11527B)
1 'use strict'; 2 3 const EventEmitter = require('events'); 4 const path = require('path'); 5 const { Error } = require('../errors'); 6 const Util = require('../util/Util'); 7 let childProcess = null; 8 let Worker = null; 9 10 /** 11 * A self-contained shard created by the {@link ShardingManager}. Each one has a {@link ChildProcess} that contains 12 * an instance of the bot and its {@link Client}. When its child process/worker exits for any reason, the shard will 13 * spawn a new one to replace it as necessary. 14 * @extends EventEmitter 15 */ 16 class Shard extends EventEmitter { 17 /** 18 * @param {ShardingManager} manager Manager that is creating this shard 19 * @param {number} id ID of this shard 20 */ 21 constructor(manager, id) { 22 super(); 23 24 if (manager.mode === 'process') childProcess = require('child_process'); 25 else if (manager.mode === 'worker') Worker = require('worker_threads').Worker; 26 27 /** 28 * Manager that created the shard 29 * @type {ShardingManager} 30 */ 31 this.manager = manager; 32 33 /** 34 * ID of the shard in the manager 35 * @type {number} 36 */ 37 this.id = id; 38 39 /** 40 * Arguments for the shard's process (only when {@link ShardingManager#mode} is `process`) 41 * @type {string[]} 42 */ 43 this.args = manager.shardArgs || []; 44 45 /** 46 * Arguments for the shard's process executable (only when {@link ShardingManager#mode} is `process`) 47 * @type {?string[]} 48 */ 49 this.execArgv = manager.execArgv; 50 51 /** 52 * Environment variables for the shard's process, or workerData for the shard's worker 53 * @type {Object} 54 */ 55 this.env = Object.assign({}, process.env, { 56 SHARDING_MANAGER: true, 57 SHARDS: this.id, 58 SHARD_COUNT: this.manager.totalShards, 59 DISCORD_TOKEN: this.manager.token, 60 }); 61 62 /** 63 * Whether the shard's {@link Client} is ready 64 * @type {boolean} 65 */ 66 this.ready = false; 67 68 /** 69 * Process of the shard (if {@link ShardingManager#mode} is `process`) 70 * @type {?ChildProcess} 71 */ 72 this.process = null; 73 74 /** 75 * Worker of the shard (if {@link ShardingManager#mode} is `worker`) 76 * @type {?Worker} 77 */ 78 this.worker = null; 79 80 /** 81 * Ongoing promises for calls to {@link Shard#eval}, mapped by the `script` they were called with 82 * @type {Map<string, Promise>} 83 * @private 84 */ 85 this._evals = new Map(); 86 87 /** 88 * Ongoing promises for calls to {@link Shard#fetchClientValue}, mapped by the `prop` they were called with 89 * @type {Map<string, Promise>} 90 * @private 91 */ 92 this._fetches = new Map(); 93 94 /** 95 * Listener function for the {@link ChildProcess}' `exit` event 96 * @type {Function} 97 * @private 98 */ 99 this._exitListener = this._handleExit.bind(this, undefined); 100 } 101 102 /** 103 * Forks a child process or creates a worker thread for the shard. 104 * <warn>You should not need to call this manually.</warn> 105 * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready 106 * before resolving. (-1 or Infinity for no wait) 107 * @returns {Promise<ChildProcess>} 108 */ 109 async spawn(spawnTimeout = 30000) { 110 if (this.process) throw new Error('SHARDING_PROCESS_EXISTS', this.id); 111 if (this.worker) throw new Error('SHARDING_WORKER_EXISTS', this.id); 112 113 if (this.manager.mode === 'process') { 114 this.process = childProcess 115 .fork(path.resolve(this.manager.file), this.args, { 116 env: this.env, 117 execArgv: this.execArgv, 118 }) 119 .on('message', this._handleMessage.bind(this)) 120 .on('exit', this._exitListener); 121 } else if (this.manager.mode === 'worker') { 122 this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env }) 123 .on('message', this._handleMessage.bind(this)) 124 .on('exit', this._exitListener); 125 } 126 127 /** 128 * Emitted upon the creation of the shard's child process/worker. 129 * @event Shard#spawn 130 * @param {ChildProcess|Worker} process Child process/worker that was created 131 */ 132 this.emit('spawn', this.process || this.worker); 133 134 if (spawnTimeout === -1 || spawnTimeout === Infinity) return this.process || this.worker; 135 await new Promise((resolve, reject) => { 136 const cleanup = () => { 137 clearTimeout(spawnTimeoutTimer); 138 this.off('ready', onReady); 139 this.off('disconnect', onDisconnect); 140 this.off('death', onDeath); 141 }; 142 143 const onReady = () => { 144 cleanup(); 145 resolve(); 146 }; 147 148 const onDisconnect = () => { 149 cleanup(); 150 reject(new Error('SHARDING_READY_DISCONNECTED', this.id)); 151 }; 152 153 const onDeath = () => { 154 cleanup(); 155 reject(new Error('SHARDING_READY_DIED', this.id)); 156 }; 157 158 const onTimeout = () => { 159 cleanup(); 160 reject(new Error('SHARDING_READY_TIMEOUT', this.id)); 161 }; 162 163 const spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout); 164 this.once('ready', onReady); 165 this.once('disconnect', onDisconnect); 166 this.once('death', onDeath); 167 }); 168 return this.process || this.worker; 169 } 170 171 /** 172 * Immediately kills the shard's process/worker and does not restart it. 173 */ 174 kill() { 175 if (this.process) { 176 this.process.removeListener('exit', this._exitListener); 177 this.process.kill(); 178 } else { 179 this.worker.removeListener('exit', this._exitListener); 180 this.worker.terminate(); 181 } 182 183 this._handleExit(false); 184 } 185 186 /** 187 * Kills and restarts the shard's process/worker. 188 * @param {number} [delay=500] How long to wait between killing the process/worker and restarting it (in milliseconds) 189 * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready 190 * before resolving. (-1 or Infinity for no wait) 191 * @returns {Promise<ChildProcess>} 192 */ 193 async respawn(delay = 500, spawnTimeout) { 194 this.kill(); 195 if (delay > 0) await Util.delayFor(delay); 196 return this.spawn(spawnTimeout); 197 } 198 199 /** 200 * Sends a message to the shard's process/worker. 201 * @param {*} message Message to send to the shard 202 * @returns {Promise<Shard>} 203 */ 204 send(message) { 205 return new Promise((resolve, reject) => { 206 if (this.process) { 207 this.process.send(message, err => { 208 if (err) reject(err); 209 else resolve(this); 210 }); 211 } else { 212 this.worker.postMessage(message); 213 resolve(this); 214 } 215 }); 216 } 217 218 /** 219 * Fetches a client property value of the shard. 220 * @param {string} prop Name of the client property to get, using periods for nesting 221 * @returns {Promise<*>} 222 * @example 223 * shard.fetchClientValue('guilds.cache.size') 224 * .then(count => console.log(`${count} guilds in shard ${shard.id}`)) 225 * .catch(console.error); 226 */ 227 fetchClientValue(prop) { 228 if (this._fetches.has(prop)) return this._fetches.get(prop); 229 230 const promise = new Promise((resolve, reject) => { 231 const child = this.process || this.worker; 232 233 const listener = message => { 234 if (!message || message._fetchProp !== prop) return; 235 child.removeListener('message', listener); 236 this._fetches.delete(prop); 237 resolve(message._result); 238 }; 239 child.on('message', listener); 240 241 this.send({ _fetchProp: prop }).catch(err => { 242 child.removeListener('message', listener); 243 this._fetches.delete(prop); 244 reject(err); 245 }); 246 }); 247 248 this._fetches.set(prop, promise); 249 return promise; 250 } 251 252 /** 253 * Evaluates a script or function on the shard, in the context of the {@link Client}. 254 * @param {string|Function} script JavaScript to run on the shard 255 * @returns {Promise<*>} Result of the script execution 256 */ 257 eval(script) { 258 if (this._evals.has(script)) return this._evals.get(script); 259 260 const promise = new Promise((resolve, reject) => { 261 const child = this.process || this.worker; 262 263 const listener = message => { 264 if (!message || message._eval !== script) return; 265 child.removeListener('message', listener); 266 this._evals.delete(script); 267 if (!message._error) resolve(message._result); 268 else reject(Util.makeError(message._error)); 269 }; 270 child.on('message', listener); 271 272 const _eval = typeof script === 'function' ? `(${script})(this)` : script; 273 this.send({ _eval }).catch(err => { 274 child.removeListener('message', listener); 275 this._evals.delete(script); 276 reject(err); 277 }); 278 }); 279 280 this._evals.set(script, promise); 281 return promise; 282 } 283 284 /** 285 * Handles a message received from the child process/worker. 286 * @param {*} message Message received 287 * @private 288 */ 289 _handleMessage(message) { 290 if (message) { 291 // Shard is ready 292 if (message._ready) { 293 this.ready = true; 294 /** 295 * Emitted upon the shard's {@link Client#ready} event. 296 * @event Shard#ready 297 */ 298 this.emit('ready'); 299 return; 300 } 301 302 // Shard has disconnected 303 if (message._disconnect) { 304 this.ready = false; 305 /** 306 * Emitted upon the shard's {@link Client#disconnect} event. 307 * @event Shard#disconnect 308 */ 309 this.emit('disconnect'); 310 return; 311 } 312 313 // Shard is attempting to reconnect 314 if (message._reconnecting) { 315 this.ready = false; 316 /** 317 * Emitted upon the shard's {@link Client#reconnecting} event. 318 * @event Shard#reconnecting 319 */ 320 this.emit('reconnecting'); 321 return; 322 } 323 324 // Shard is requesting a property fetch 325 if (message._sFetchProp) { 326 this.manager.fetchClientValues(message._sFetchProp).then( 327 results => this.send({ _sFetchProp: message._sFetchProp, _result: results }), 328 err => this.send({ _sFetchProp: message._sFetchProp, _error: Util.makePlainError(err) }), 329 ); 330 return; 331 } 332 333 // Shard is requesting an eval broadcast 334 if (message._sEval) { 335 this.manager.broadcastEval(message._sEval).then( 336 results => this.send({ _sEval: message._sEval, _result: results }), 337 err => this.send({ _sEval: message._sEval, _error: Util.makePlainError(err) }), 338 ); 339 return; 340 } 341 342 // Shard is requesting a respawn of all shards 343 if (message._sRespawnAll) { 344 const { shardDelay, respawnDelay, spawnTimeout } = message._sRespawnAll; 345 this.manager.respawnAll(shardDelay, respawnDelay, spawnTimeout).catch(() => { 346 // Do nothing 347 }); 348 return; 349 } 350 } 351 352 /** 353 * Emitted upon receiving a message from the child process/worker. 354 * @event Shard#message 355 * @param {*} message Message that was received 356 */ 357 this.emit('message', message); 358 } 359 360 /** 361 * Handles the shard's process/worker exiting. 362 * @param {boolean} [respawn=this.manager.respawn] Whether to spawn the shard again 363 * @private 364 */ 365 _handleExit(respawn = this.manager.respawn) { 366 /** 367 * Emitted upon the shard's child process/worker exiting. 368 * @event Shard#death 369 * @param {ChildProcess|Worker} process Child process/worker that exited 370 */ 371 this.emit('death', this.process || this.worker); 372 373 this.ready = false; 374 this.process = null; 375 this.worker = null; 376 this._evals.clear(); 377 this._fetches.clear(); 378 379 if (respawn) this.spawn().catch(err => this.emit('error', err)); 380 } 381 } 382 383 module.exports = Shard;