ShardClientUtil.js (7223B)
1 'use strict'; 2 3 const { Events } = require('../util/Constants'); 4 const Util = require('../util/Util'); 5 6 /** 7 * Helper class for sharded clients spawned as a child process/worker, such as from a {@link ShardingManager}. 8 * Utilises IPC to send and receive data to/from the master process and other shards. 9 */ 10 class ShardClientUtil { 11 /** 12 * @param {Client} client Client of the current shard 13 * @param {ShardingManagerMode} mode Mode the shard was spawned with 14 */ 15 constructor(client, mode) { 16 /** 17 * Client for the shard 18 * @type {Client} 19 */ 20 this.client = client; 21 22 /** 23 * Mode the shard was spawned with 24 * @type {ShardingManagerMode} 25 */ 26 this.mode = mode; 27 28 /** 29 * Message port for the master process (only when {@link ShardClientUtil#mode} is `worker`) 30 * @type {?MessagePort} 31 */ 32 this.parentPort = null; 33 34 if (mode === 'process') { 35 process.on('message', this._handleMessage.bind(this)); 36 client.on('ready', () => { 37 process.send({ _ready: true }); 38 }); 39 client.on('disconnect', () => { 40 process.send({ _disconnect: true }); 41 }); 42 client.on('reconnecting', () => { 43 process.send({ _reconnecting: true }); 44 }); 45 } else if (mode === 'worker') { 46 this.parentPort = require('worker_threads').parentPort; 47 this.parentPort.on('message', this._handleMessage.bind(this)); 48 client.on('ready', () => { 49 this.parentPort.postMessage({ _ready: true }); 50 }); 51 client.on('disconnect', () => { 52 this.parentPort.postMessage({ _disconnect: true }); 53 }); 54 client.on('reconnecting', () => { 55 this.parentPort.postMessage({ _reconnecting: true }); 56 }); 57 } 58 } 59 60 /** 61 * Array of shard IDs of this client 62 * @type {number[]} 63 * @readonly 64 */ 65 get ids() { 66 return this.client.options.shards; 67 } 68 69 /** 70 * Total number of shards 71 * @type {number} 72 * @readonly 73 */ 74 get count() { 75 return this.client.options.shardCount; 76 } 77 78 /** 79 * Sends a message to the master process. 80 * @param {*} message Message to send 81 * @returns {Promise<void>} 82 * @emits Shard#message 83 */ 84 send(message) { 85 return new Promise((resolve, reject) => { 86 if (this.mode === 'process') { 87 process.send(message, err => { 88 if (err) reject(err); 89 else resolve(); 90 }); 91 } else if (this.mode === 'worker') { 92 this.parentPort.postMessage(message); 93 resolve(); 94 } 95 }); 96 } 97 98 /** 99 * Fetches a client property value of each shard. 100 * @param {string} prop Name of the client property to get, using periods for nesting 101 * @returns {Promise<Array<*>>} 102 * @example 103 * client.shard.fetchClientValues('guilds.cache.size') 104 * .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`)) 105 * .catch(console.error); 106 * @see {@link ShardingManager#fetchClientValues} 107 */ 108 fetchClientValues(prop) { 109 return new Promise((resolve, reject) => { 110 const parent = this.parentPort || process; 111 112 const listener = message => { 113 if (!message || message._sFetchProp !== prop) return; 114 parent.removeListener('message', listener); 115 if (!message._error) resolve(message._result); 116 else reject(Util.makeError(message._error)); 117 }; 118 parent.on('message', listener); 119 120 this.send({ _sFetchProp: prop }).catch(err => { 121 parent.removeListener('message', listener); 122 reject(err); 123 }); 124 }); 125 } 126 127 /** 128 * Evaluates a script or function on all shards, in the context of the {@link Clients}. 129 * @param {string|Function} script JavaScript to run on each shard 130 * @returns {Promise<Array<*>>} Results of the script execution 131 * @example 132 * client.shard.broadcastEval('this.guilds.cache.size') 133 * .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`)) 134 * .catch(console.error); 135 * @see {@link ShardingManager#broadcastEval} 136 */ 137 broadcastEval(script) { 138 return new Promise((resolve, reject) => { 139 const parent = this.parentPort || process; 140 script = typeof script === 'function' ? `(${script})(this)` : script; 141 142 const listener = message => { 143 if (!message || message._sEval !== script) return; 144 parent.removeListener('message', listener); 145 if (!message._error) resolve(message._result); 146 else reject(Util.makeError(message._error)); 147 }; 148 parent.on('message', listener); 149 150 this.send({ _sEval: script }).catch(err => { 151 parent.removeListener('message', listener); 152 reject(err); 153 }); 154 }); 155 } 156 157 /** 158 * Requests a respawn of all shards. 159 * @param {number} [shardDelay=5000] How long to wait between shards (in milliseconds) 160 * @param {number} [respawnDelay=500] How long to wait between killing a shard's process/worker and restarting it 161 * (in milliseconds) 162 * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait for a shard to become ready before 163 * continuing to another. (-1 or Infinity for no wait) 164 * @returns {Promise<void>} Resolves upon the message being sent 165 * @see {@link ShardingManager#respawnAll} 166 */ 167 respawnAll(shardDelay = 5000, respawnDelay = 500, spawnTimeout = 30000) { 168 return this.send({ _sRespawnAll: { shardDelay, respawnDelay, spawnTimeout } }); 169 } 170 171 /** 172 * Handles an IPC message. 173 * @param {*} message Message received 174 * @private 175 */ 176 async _handleMessage(message) { 177 if (!message) return; 178 if (message._fetchProp) { 179 const props = message._fetchProp.split('.'); 180 let value = this.client; 181 for (const prop of props) value = value[prop]; 182 this._respond('fetchProp', { _fetchProp: message._fetchProp, _result: value }); 183 } else if (message._eval) { 184 try { 185 this._respond('eval', { _eval: message._eval, _result: await this.client._eval(message._eval) }); 186 } catch (err) { 187 this._respond('eval', { _eval: message._eval, _error: Util.makePlainError(err) }); 188 } 189 } 190 } 191 192 /** 193 * Sends a message to the master process, emitting an error from the client upon failure. 194 * @param {string} type Type of response to send 195 * @param {*} message Message to send 196 * @private 197 */ 198 _respond(type, message) { 199 this.send(message).catch(err => { 200 err.message = `Error when sending ${type} response to master process: ${err.message}`; 201 /** 202 * Emitted when the client encounters an error. 203 * @event Client#error 204 * @param {Error} error The error encountered 205 */ 206 this.client.emit(Events.ERROR, err); 207 }); 208 } 209 210 /** 211 * Creates/gets the singleton of this class. 212 * @param {Client} client The client to use 213 * @param {ShardingManagerMode} mode Mode the shard was spawned with 214 * @returns {ShardClientUtil} 215 */ 216 static singleton(client, mode) { 217 if (!this._singleton) { 218 this._singleton = new this(client, mode); 219 } else { 220 client.emit( 221 Events.WARN, 222 'Multiple clients created in child process/worker; only the first will handle sharding helpers.', 223 ); 224 } 225 return this._singleton; 226 } 227 } 228 229 module.exports = ShardClientUtil;