buddy

node MVC discord bot
Log | Files | Refs | README

Shard.js (11527B)


      1 'use strict';
      2 
      3 const EventEmitter = require('events');
      4 const path = require('path');
      5 const { Error } = require('../errors');
      6 const Util = require('../util/Util');
      7 let childProcess = null;
      8 let Worker = null;
      9 
     /**
      * A self-contained shard created by the {@link ShardingManager}. Each one has a {@link ChildProcess} or
      * {@link Worker} that contains an instance of the bot and its {@link Client}. When its child process/worker
      * exits for any reason, the shard will spawn a new one to replace it as necessary.
      * @extends EventEmitter
      */
     class Shard extends EventEmitter {
       /**
        * @param {ShardingManager} manager Manager that is creating this shard
        * @param {number} id ID of this shard
        */
       constructor(manager, id) {
         super();

         // Only load the module that the manager's mode actually needs; the result is stored in the
         // module-level `childProcess` / `Worker` bindings so every shard shares it.
         if (manager.mode === 'process') childProcess = require('child_process');
         else if (manager.mode === 'worker') Worker = require('worker_threads').Worker;

         /**
          * Manager that created the shard
          * @type {ShardingManager}
          */
         this.manager = manager;

         /**
          * ID of the shard in the manager
          * @type {number}
          */
         this.id = id;

         /**
          * Arguments for the shard's process (only when {@link ShardingManager#mode} is `process`)
          * @type {string[]}
          */
         this.args = manager.shardArgs || [];

         /**
          * Arguments for the shard's process executable (only when {@link ShardingManager#mode} is `process`)
          * @type {?string[]}
          */
         this.execArgv = manager.execArgv;

         /**
          * Environment variables for the shard's process, or workerData for the shard's worker
          * @type {Object}
          */
         this.env = Object.assign({}, process.env, {
           SHARDING_MANAGER: true,
           SHARDS: this.id,
           SHARD_COUNT: this.manager.totalShards,
           DISCORD_TOKEN: this.manager.token,
         });

         /**
          * Whether the shard's {@link Client} is ready
          * @type {boolean}
          */
         this.ready = false;

         /**
          * Process of the shard (if {@link ShardingManager#mode} is `process`)
          * @type {?ChildProcess}
          */
         this.process = null;

         /**
          * Worker of the shard (if {@link ShardingManager#mode} is `worker`)
          * @type {?Worker}
          */
         this.worker = null;

         /**
          * Ongoing promises for calls to {@link Shard#eval}, mapped by the stringified script sent to the shard
          * @type {Map<string, Promise>}
          * @private
          */
         this._evals = new Map();

         /**
          * Ongoing promises for calls to {@link Shard#fetchClientValue}, mapped by the `prop` they were called with
          * @type {Map<string, Promise>}
          * @private
          */
         this._fetches = new Map();

         /**
          * Listener function for the {@link ChildProcess}' `exit` event
          * @type {Function}
          * @private
          */
         // The first argument is pinned to `undefined` so that whatever the `exit` event passes along is not
         // mistaken for the `respawn` flag; `_handleExit` then falls back to its default (`manager.respawn`).
         this._exitListener = this._handleExit.bind(this, undefined);
       }

       /**
        * Forks a child process or creates a worker thread for the shard.
        * <warn>You should not need to call this manually.</warn>
        * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready
        * before resolving. (-1 or Infinity for no wait)
        * @returns {Promise<ChildProcess|Worker>}
        */
       async spawn(spawnTimeout = 30000) {
         if (this.process) throw new Error('SHARDING_PROCESS_EXISTS', this.id);
         if (this.worker) throw new Error('SHARDING_WORKER_EXISTS', this.id);

         if (this.manager.mode === 'process') {
           this.process = childProcess
             .fork(path.resolve(this.manager.file), this.args, {
               env: this.env,
               execArgv: this.execArgv,
             })
             .on('message', this._handleMessage.bind(this))
             .on('exit', this._exitListener);
         } else if (this.manager.mode === 'worker') {
           this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env })
             .on('message', this._handleMessage.bind(this))
             .on('exit', this._exitListener);
         }

         /**
          * Emitted upon the creation of the shard's child process/worker.
          * @event Shard#spawn
          * @param {ChildProcess|Worker} process Child process/worker that was created
          */
         this.emit('spawn', this.process || this.worker);

         if (spawnTimeout === -1 || spawnTimeout === Infinity) return this.process || this.worker;

         // Wait for whichever comes first: ready, disconnect, death or the timeout. Every outcome removes the
         // other listeners and the timer so nothing lingers on the shard afterwards.
         await new Promise((resolve, reject) => {
           const cleanup = () => {
             clearTimeout(spawnTimeoutTimer);
             this.off('ready', onReady);
             this.off('disconnect', onDisconnect);
             this.off('death', onDeath);
           };

           const onReady = () => {
             cleanup();
             resolve();
           };

           const onDisconnect = () => {
             cleanup();
             reject(new Error('SHARDING_READY_DISCONNECTED', this.id));
           };

           const onDeath = () => {
             cleanup();
             reject(new Error('SHARDING_READY_DIED', this.id));
           };

           const onTimeout = () => {
             cleanup();
             reject(new Error('SHARDING_READY_TIMEOUT', this.id));
           };

           const spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout);
           this.once('ready', onReady);
           this.once('disconnect', onDisconnect);
           this.once('death', onDeath);
         });
         return this.process || this.worker;
       }

       /**
        * Immediately kills the shard's process/worker and does not restart it.
        * Does nothing when the shard currently has no process/worker.
        */
       kill() {
         if (this.process) {
           // Detach the exit listener first so the kill is not treated as an unexpected exit (which could respawn)
           this.process.removeListener('exit', this._exitListener);
           this.process.kill();
         } else if (this.worker) {
           this.worker.removeListener('exit', this._exitListener);
           this.worker.terminate();
         } else {
           // Nothing is running (never spawned, or already dead), so there is nothing to kill or announce
           return;
         }

         this._handleExit(false);
       }

       /**
        * Kills and restarts the shard's process/worker.
        * @param {number} [delay=500] How long to wait between killing the process/worker and restarting it (in milliseconds)
        * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready
        * before resolving. (-1 or Infinity for no wait)
        * @returns {Promise<ChildProcess|Worker>}
        */
       async respawn(delay = 500, spawnTimeout) {
         this.kill();
         if (delay > 0) await Util.delayFor(delay);
         return this.spawn(spawnTimeout);
       }

       /**
        * Sends a message to the shard's process/worker.
        * Rejects with a `TypeError` when the shard currently has no process/worker.
        * @param {*} message Message to send to the shard
        * @returns {Promise<Shard>}
        */
       send(message) {
         return new Promise((resolve, reject) => {
           if (this.process) {
             this.process.send(message, err => {
               if (err) reject(err);
               else resolve(this);
             });
           } else if (this.worker) {
             this.worker.postMessage(message);
             resolve(this);
           } else {
             reject(new TypeError(`Shard ${this.id} has no child process or worker to send a message to.`));
           }
         });
       }

       /**
        * Fetches a client property value of the shard.
        * Concurrent calls for the same `prop` share one pending promise.
        * Rejects with a `TypeError` when the shard currently has no process/worker.
        * @param {string} prop Name of the client property to get, using periods for nesting
        * @returns {Promise<*>}
        * @example
        * shard.fetchClientValue('guilds.cache.size')
        *   .then(count => console.log(`${count} guilds in shard ${shard.id}`))
        *   .catch(console.error);
        */
       fetchClientValue(prop) {
         const child = this.process || this.worker;

         // Fail fast and do NOT cache: a cached rejection would be handed back to later callers even after
         // the shard has been spawned.
         if (!child) {
           return Promise.reject(new TypeError(`Shard ${this.id} has no child process or worker to fetch from.`));
         }

         if (this._fetches.has(prop)) return this._fetches.get(prop);

         const promise = new Promise((resolve, reject) => {
           const listener = message => {
             if (!message || message._fetchProp !== prop) return;
             child.removeListener('message', listener);
             this._fetches.delete(prop);
             resolve(message._result);
           };
           child.on('message', listener);

           this.send({ _fetchProp: prop }).catch(err => {
             child.removeListener('message', listener);
             this._fetches.delete(prop);
             reject(err);
           });
         });

         this._fetches.set(prop, promise);
         return promise;
       }

       /**
        * Evaluates a script or function on the shard, in the context of the {@link Client}.
        * Concurrent calls with the same (stringified) script share one pending promise.
        * Rejects with a `TypeError` when the shard currently has no process/worker.
        * @param {string|Function} script JavaScript to run on the shard
        * @returns {Promise<*>} Result of the script execution
        */
       eval(script) {
         // A function is shipped as a self-invoking source string. The reply is matched on its `_eval` field,
         // so the string that is actually sent (not the function object) must be used as the key everywhere.
         const _eval = typeof script === 'function' ? `(${script})(this)` : script;

         const child = this.process || this.worker;

         // Fail fast and do NOT cache: a cached rejection would be handed back to later callers even after
         // the shard has been spawned.
         if (!child) {
           return Promise.reject(new TypeError(`Shard ${this.id} has no child process or worker to evaluate on.`));
         }

         if (this._evals.has(_eval)) return this._evals.get(_eval);

         const promise = new Promise((resolve, reject) => {
           const listener = message => {
             if (!message || message._eval !== _eval) return;
             child.removeListener('message', listener);
             this._evals.delete(_eval);
             if (!message._error) resolve(message._result);
             else reject(Util.makeError(message._error));
           };
           child.on('message', listener);

           this.send({ _eval }).catch(err => {
             child.removeListener('message', listener);
             this._evals.delete(_eval);
             reject(err);
           });
         });

         this._evals.set(_eval, promise);
         return promise;
       }

       /**
        * Handles a message received from the child process/worker.
        * Internal control messages are consumed here; anything else is re-emitted as a `message` event.
        * @param {*} message Message received
        * @private
        */
       _handleMessage(message) {
         if (message) {
           // Replying to the shard can fail (e.g. it died in the meantime); surface that on the shard
           // instead of leaving a floating rejected promise.
           const onSendFailure = err => this.emit('error', err);

           // Shard is ready
           if (message._ready) {
             this.ready = true;
             /**
              * Emitted upon the shard's {@link Client#ready} event.
              * @event Shard#ready
              */
             this.emit('ready');
             return;
           }

           // Shard has disconnected
           if (message._disconnect) {
             this.ready = false;
             /**
              * Emitted upon the shard's {@link Client#disconnect} event.
              * @event Shard#disconnect
              */
             this.emit('disconnect');
             return;
           }

           // Shard is attempting to reconnect
           if (message._reconnecting) {
             this.ready = false;
             /**
              * Emitted upon the shard's {@link Client#reconnecting} event.
              * @event Shard#reconnecting
              */
             this.emit('reconnecting');
             return;
           }

           // Shard is requesting a property fetch
           if (message._sFetchProp) {
             this.manager
               .fetchClientValues(message._sFetchProp)
               .then(
                 results => this.send({ _sFetchProp: message._sFetchProp, _result: results }),
                 err => this.send({ _sFetchProp: message._sFetchProp, _error: Util.makePlainError(err) }),
               )
               .catch(onSendFailure);
             return;
           }

           // Shard is requesting an eval broadcast
           if (message._sEval) {
             this.manager
               .broadcastEval(message._sEval)
               .then(
                 results => this.send({ _sEval: message._sEval, _result: results }),
                 err => this.send({ _sEval: message._sEval, _error: Util.makePlainError(err) }),
               )
               .catch(onSendFailure);
             return;
           }

           // Shard is requesting a respawn of all shards
           if (message._sRespawnAll) {
             const { shardDelay, respawnDelay, spawnTimeout } = message._sRespawnAll;
             this.manager.respawnAll(shardDelay, respawnDelay, spawnTimeout).catch(() => {
               // Do nothing
             });
             return;
           }
         }

         /**
          * Emitted upon receiving a message from the child process/worker.
          * @event Shard#message
          * @param {*} message Message that was received
          */
         this.emit('message', message);
       }

       /**
        * Handles the shard's process/worker exiting.
        * @param {boolean} [respawn=this.manager.respawn] Whether to spawn the shard again
        * @private
        */
       _handleExit(respawn = this.manager.respawn) {
         /**
          * Emitted upon the shard's child process/worker exiting.
          * @event Shard#death
          * @param {ChildProcess|Worker} process Child process/worker that exited
          */
         this.emit('death', this.process || this.worker);

         // Reset all per-child state; the cached eval/fetch promises belonged to the child that just exited
         this.ready = false;
         this.process = null;
         this.worker = null;
         this._evals.clear();
         this._fetches.clear();

         if (respawn) this.spawn().catch(err => this.emit('error', err));
       }
     }
    382 
    383 module.exports = Shard;