diff --git a/middleware/Shard.js b/middleware/Shard.js index 50e721a..5a73a33 100644 --- a/middleware/Shard.js +++ b/middleware/Shard.js @@ -1,7 +1,7 @@ /*Adopted from Discord.js */ -const path = require('path'); const EventEmitter = require('events'); +const path = require('path'); const { Util } = require('../util/'); @@ -9,22 +9,24 @@ let childProcess = null; let Worker = null; class Shard extends EventEmitter { - + constructor(manager, id) { - + super(); - if(manager.mode === 'process') childProcess = require('child_process'); - else if(manager.mode === 'worker') Worker = require('worker_threads').Worker; //eslint-disable-line prefer-destructuring + if (manager.mode === 'process') childProcess = require('child_process'); + else if (manager.mode === 'worker') Worker = require('worker_threads').Worker; //eslint-disable-line prefer-destructuring this.manager = manager; this.id = id; this.args = manager.shardArgs || []; this.execArgv = manager.execArgv; - this.env = { ...process.env, SHARDING_MANAGER: true, + this.env = { + ...process.env, + SHARDING_MANAGER: true, SHARDS: this.id, - TOTAL_SHARD_COUNT: this.manager.totalShards, - DISCORD_TOKEN: this.manager.token + SHARD_COUNT: this.manager.totalShards, + DISCORD_TOKEN: this.manager.token }; this.ready = false; @@ -38,39 +40,68 @@ class Shard extends EventEmitter { } - async spawn(waitForReady = true) { - if(this.process) throw new Error(`[shard${this.id}] Sharding process already exists.`); - if(this.worker) throw new Error(`[shard${this.id}] Sharding worker already exists.`); + async spawn(spawnTimeout = 30000) { + if (this.process) throw new Error(`[shard${this.id}] Sharding process already exists.`); + if (this.worker) throw new Error(`[shard${this.id}] Sharding worker already exists.`); - if(this.manager.mode === 'process') { - this.process = childProcess.fork(path.resolve(this.manager.file), this.args, { - env: this.env, - execArgv: this.execArgv - }). - on('message', this._handleMessage.bind(this)). - on('exit', this._exitListener); - } else if(this.manager.mode === 'worker') { - this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env }). - on('message', this._handleMessage.bind(this)). - on('exit', this._exitListener); + if (this.manager.mode === 'process') { + this.process = childProcess + .fork(path.resolve(this.manager.file), this.args, { + env: this.env, + execArgv: this.execArgv + }) + .on('message', this._handleMessage.bind(this)) + .on('exit', this._exitListener); + } else if (this.manager.mode === 'worker') { + this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env }) + .on('message', this._handleMessage.bind(this)) + .on('exit', this._exitListener); } + this._evals.clear(); + this._fetches.clear(); + this.emit('spawn', this.process || this.worker); - if(!waitForReady) return this.process || this.worker; + if (spawnTimeout === -1 || spawnTimeout === Infinity) return this.process || this.worker; await new Promise((resolve, reject) => { - this.once('ready', resolve); - this.once('disconnect', () => reject(new Error(`[shard${this.id}] Shard disconnected while readying.`))); - this.once('death', () => reject(new Error(`[shard${this.id}] Shard died while readying.`))); - setTimeout(() => reject(new Error(`[shard${this.id}] Shard timed out while readying.`)), 30000); + const cleanup = () => { + clearTimeout(spawnTimeoutTimer); + this.off('ready', onReady); + this.off('disconnect', onDisconnect); + this.off('death', onDeath); + }; + + const onReady = () => { + cleanup(); + resolve(); + }; + + const onDisconnect = () => { + cleanup(); + reject(new Error(`[shard${this.id}] Shard disconnected while readying.`)); + }; + + const onDeath = () => { + cleanup(); + reject(new Error(`[shard${this.id}] Shard died while readying.`)); + }; + + const onTimeout = () => { + cleanup(); + reject(new Error(`[shard${this.id}] Shard timed out while readying.`)); + }; + + const spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout); + this.once('ready', onReady); + this.once('disconnect', onDisconnect); + this.once('death', onDeath); }); - return this.process || this.worker; - } kill() { - if(this.process) { + if (this.process) { this.process.removeListener('exit', this._exitListener); this.process.kill(); } else { @@ -79,20 +110,20 @@ class Shard extends EventEmitter { } this._handleExit(false); - } - async respawn(delay = 500, waitForReady = true) { + async respawn(delay = 500, spawnTimeout) { this.kill(); - if(delay > 0) await Util.delayFor(delay); - return this.spawn(waitForReady); + if (delay > 0) await Util.delayFor(delay); + return this.spawn(spawnTimeout); } send(message) { return new Promise((resolve, reject) => { - if(this.process) { - this.process.send(message, (error) => { - if(error) reject(error); else resolve(this); + if (this.process) { + this.process.send(message, err => { + if (err) reject(err); + else resolve(this); }); } else { this.worker.postMessage(message); @@ -102,46 +133,53 @@ class Shard extends EventEmitter { } fetchClientValue(prop) { - if(this._fetches.has(prop)) return this._fetches.get(prop); + // Shard is dead (maybe respawning), don't cache anything and error immediately + if (!this.process && !this.worker) return Promise.reject(new Error(`[shard${this.id}] Shard process missing.`)); + + // Cached promise from previous call + if (this._fetches.has(prop)) return this._fetches.get(prop); const promise = new Promise((resolve, reject) => { const child = this.process || this.worker; - + const listener = (message) => { - if(!message || message._fetchProp !== prop) return; + if (!message || message._fetchProp !== prop) return; child.removeListener('message', listener); this._fetches.delete(prop); resolve(message._result); }; child.on('message', listener); - + this.send({ _fetchProp: prop }).catch((err) => { child.removeListener('message', listener); this._fetches.delete(prop); reject(err); }); }); - + this._fetches.set(prop, promise); return promise; - } eval(script) { + // Shard is dead (maybe respawning), don't cache anything and error immediately + if (!this.process && !this.worker) return Promise.reject(new Error(`[shard${this.id}] Shard process missing.`)); - if(this._evals.has(script)) return this._evals.get(script); + // Cached promise from previous call + if (this._evals.has(script)) return this._evals.get(script); const promise = new Promise((resolve, reject) => { const child = this.process || this.worker; - + const listener = (message) => { - if(!message || message._eval !== script) return; + if (!message || message._eval !== script) return; child.removeListener('message', listener); this._evals.delete(script); - if(!message._error) resolve(message._result); else reject(new Error(message._error.stack)); + if (!message._error) resolve(message._result); + else reject(Util.makePlainError(message._error)); }; child.on('message', listener); - + const _eval = typeof script === 'function' ? `(${script})(this)` : script; this.send({ _eval }).catch((err) => { child.removeListener('message', listener); @@ -149,68 +187,78 @@ class Shard extends EventEmitter { reject(err); }); }); - + this._evals.set(script, promise); return promise; - } _handleMessage(message) { - if(message) { - if(message._ready) { //Shard ready + if (message) { + // Shard is ready + if (message._ready) { this.ready = true; this.emit('ready'); return; } - if(message._disconnect) { //Shard disconnected + + // Shard has disconnected + if (message._disconnect) { this.ready = false; this.emit('disconnect'); return; } - if(message._reconnecting) { //Shard attempting to reconnect + + // Shard is attempting to reconnect + if (message._reconnecting) { this.ready = false; this.emit('reconnecting'); return; } - if(message._sFetchProp) { //Shard requesting property fetch - this.manager.fetchClientValues(message._sFetchProp).then( - (results) => this.send({ _sFetchProp: message._sFetchProp, _result: results }), - (err) => this.send({ _sFetchProp: message._sFetchProp, _error: Util.makePlainError(err) }) + + // Shard is requesting a property fetch + if (message._sFetchProp) { + const resp = { _sFetchProp: message._sFetchProp, _sFetchPropShard: message._sFetchPropShard }; + this.manager.fetchClientValues(message._sFetchProp, message._sFetchPropShard).then( + (results) => this.send({ ...resp, _result: results }), + (err) => this.send({ ...resp, _error: Util.makePlainError(err) }) ); return; } - if(message._sEval) { //Shard requesting eval broadcast - this.manager.broadcastEval(message._sEval).then( - (results) => this.send({ _sEval: message._sEval, _result: results }), - (err) => this.send({ _sEval: message._sEval, _error: Util.makePlainError(err) }) + + // Shard is requesting an eval broadcast + if (message._sEval) { + const resp = { _sEval: message._sEval, _sEvalShard: message._sEvalShard }; + this.manager.broadcastEval(message._sEval, message._sEvalShard).then( + (results) => this.send({ ...resp, _result: results }), + (err) => this.send({ ...resp, _error: Util.makePlainError(err) }) ); return; } - if(message._sRespawnAll) { //Shard requesting to respawn all shards. - const { shardDelay, respawnDelay, waitForReady } = message._sRespawnAll; - this.manager.respawnAll(shardDelay, respawnDelay, waitForReady).catch(() => { //eslint-disable-line no-empty-function + + // Shard is requesting a respawn of all shards + if (message._sRespawnAll) { + const { shardDelay, respawnDelay, spawnTimeout } = message._sRespawnAll; + this.manager.respawnAll(shardDelay, respawnDelay, spawnTimeout).catch(() => { + // Do nothing }); return; } } this.manager.emit('message', this, message); - } _handleExit(respawn = this.manager.respawn) { this.emit('death', this.process || this.worker); - + this.ready = false; this.process = null; this.worker = null; this._evals.clear(); this._fetches.clear(); - - if(respawn) this.spawn().catch((err) => this.emit('error', err)); + if (respawn) this.spawn().catch((err) => this.emit('error', err)); } - } module.exports = Shard; \ No newline at end of file diff --git a/middleware/ShardManager.js b/middleware/ShardManager.js index 9aea6fe..e23942a 100644 --- a/middleware/ShardManager.js +++ b/middleware/ShardManager.js @@ -1,13 +1,13 @@ /*Adopted from Discord.js */ -const path = require('path'); -const fs = require('fs'); const EventEmitter = require('events'); +const fs = require('fs'); +const path = require('path'); -const Shard = require('./Shard.js'); +const Shard = require('./Shard'); const { Util, Collection } = require('../util/'); -class ShardManager extends EventEmitter { +class ShardingManager extends EventEmitter { constructor(file, options = {}) { @@ -23,22 +23,21 @@ class ShardManager extends EventEmitter { }, options.shard); this.file = file; - if(!file) throw new Error('[shardmanager] File must be specified.'); - if(!path.isAbsolute(file)) this.file = path.resolve(process.cwd(), file); - + if (!file) throw new Error('[shardmanager] File must be specified.'); + if (!path.isAbsolute(file)) this.file = path.resolve(process.cwd(), file); const stats = fs.statSync(this.file); - if(!stats.isFile()) throw new Error('[shardmanager] File path does not point to a valid file.'); + if (!stats.isFile()) throw new Error('[shardmanager] File path does not point to a valid file.'); this.shardList = options.shardList || 'auto'; - if(this.shardList !== 'auto') { - if(!Array.isArray(this.shardList)) { + if (this.shardList !== 'auto') { + if (!Array.isArray(this.shardList)) { throw new TypeError('[shardmanager] ShardList must be an array.'); } this.shardList = [...new Set(this.shardList)]; - if(this.shardList.length < 1) throw new RangeError('[shardmanager] ShardList must have one ID.'); - if(this.shardList.some((shardID) => typeof shardID !== 'number' || - isNaN(shardID) || - !Number.isInteger(shardID) || + if (this.shardList.length < 1) throw new RangeError('[shardmanager] ShardList must have one ID.'); + if (this.shardList.some(shardID => typeof shardID !== 'number' || + isNaN(shardID) || + !Number.isInteger(shardID) || shardID < 0) ) { throw new TypeError('[shardmanager] ShardList must be an array of positive integers.'); @@ -46,31 +45,30 @@ class ShardManager extends EventEmitter { } this.totalShards = options.totalShards || 'auto'; - if(this.totalShards !== 'auto') { - if(typeof this.totalShards !== 'number' || isNaN(this.totalShards)) { + if (this.totalShards !== 'auto') { + if (typeof this.totalShards !== 'number' || isNaN(this.totalShards)) { throw new TypeError('[shardmanager] TotalShards must be an integer.'); } - if(this.totalShards < 1) throw new RangeError('[shardmanager] TotalShards must be at least one.'); - if(!Number.isInteger(this.totalShards)) { + if (this.totalShards < 1) throw new RangeError('[shardmanager] TotalShards must be at least one.'); + if (!Number.isInteger(this.totalShards)) { throw new RangeError('[shardmanager] TotalShards must be an integer.'); } } this.mode = options.mode; - if(this.mode !== 'process' && this.mode !== 'worker') { + if (this.mode !== 'process' && this.mode !== 'worker') { throw new RangeError('[shardmanager] Mode must be either \'worker\' or \'process\'.'); } this.respawn = options.respawn; this.shardArgs = options.shardArgs; this.execArgv = options.execArgv; - this.token = options.token; + this.token = options.token ? options.token.replace(/^Bot\s*/i, '') : null; this.shards = new Collection(); process.env.SHARDING_MANAGER = true; process.env.SHARDING_MANAGER_MODE = this.mode; process.env.DISCORD_TOKEN = this.token; - } createShard(id = this.shards.size) { @@ -81,73 +79,81 @@ class ShardManager extends EventEmitter { return shard; } - async spawn(amount = this.totalShards, delay = 5500, waitForReady = true) { - - if(amount === 'auto') { + async spawn(amount = this.totalShards, delay = 5500, spawnTimeout) { + if (amount === 'auto') { amount = await Util.fetchRecommendedShards(this.token); } else { - if(typeof amount !== 'number' || isNaN(amount)) { + if (typeof amount !== 'number' || isNaN(amount)) { throw new TypeError('[shardmanager] Amount of shards must be a number.'); } - if(amount < 1) throw new RangeError('[shardmanager] Amount of shards must be at least one.'); - if(!Number.isInteger(amount)) { + if (amount < 1) throw new RangeError('[shardmanager] Amount of shards must be at least one.'); + if (!Number.isInteger(amount)) { throw new TypeError('[shardmanager] Amount of shards must be an integer.'); } } - if(this.shards.size >= amount) throw new Error('[shardmanager] Already spawned all necessary shards.'); - if(this.shardList === 'auto' || this.totalShards === 'auto' || this.totalShards !== amount) { + // Make sure this many shards haven't already been spawned + if (this.shards.size >= amount) throw new Error('[shardmanager] Already spawned all necessary shards.'); + if (this.shardList === 'auto' || this.totalShards === 'auto' || this.totalShards !== amount) { this.shardList = [...Array(amount).keys()]; } - if(this.totalShards === 'auto' || this.totalShards !== amount) { + if (this.totalShards === 'auto' || this.totalShards !== amount) { this.totalShards = amount; } - if(this.shardList.some((id) => id >= amount)) { + + if (this.shardList.some((shardID) => shardID >= amount)) { throw new RangeError('[shardmanager] Amount of shards cannot be larger than the highest shard ID.'); } - for(const shardID of this.shardList) { + // Spawn the shards + for (const shardID of this.shardList) { const promises = []; const shard = this.createShard(shardID); - promises.push(shard.spawn(waitForReady)); - if(delay > 0 && this.shards.size !== this.shardList.length - 1) promises.push(Util.delayFor(delay)); - await Promise.all(promises); + promises.push(shard.spawn(spawnTimeout)); + if (delay > 0 && this.shards.size !== this.shardList.length) promises.push(Util.delayFor(delay)); + await Promise.all(promises); // eslint-disable-line no-await-in-loop } return this.shards; - } broadcast(message) { const promises = []; - for(const shard of this.shards.values()) promises.push(shard.send(message)); + for (const shard of this.shards.values()) promises.push(shard.send(message)); return Promise.all(promises); } - broadcastEval(script) { + broadcastEval(script, shard) { + return this._performOnShards('eval', [script], shard); + } + + fetchClientValues(prop, shard) { + return this._performOnShards('fetchClientValue', [prop], shard); + } + + _performOnShards(method, args, shard) { + if (this.shards.size === 0) return Promise.reject(new Error('[shardmanager] No shards available.')); + if (this.shards.size !== this.shardList.length) return Promise.reject(new Error('[shardmanager] Sharding in progress.')); + + if (typeof shard === 'number') { + if (this.shards.has(shard)) return this.shards.get(shard)[method](...args); + return Promise.reject(new Error(`[shardmanager] Shard ${shard} not found.`)); + } + const promises = []; - for(const shard of this.shards.values()) promises.push(shard.eval(script)); + for (const sh of this.shards.values()) promises.push(sh[method](...args)); return Promise.all(promises); } - fetchClientValues(prop) { - if(this.shards.size === 0) return Promise.reject(new Error('[shardmanager] No shards available.')); - if(this.shards.size !== this.totalShards) return Promise.reject(new Error('[shardmanager] Sharding in progress.')); - const promises = []; - for(const shard of this.shards.values()) promises.push(shard.fetchClientValue(prop)); - return Promise.all(promises); - } - - async respawnAll(shardDelay = 5000, respawnDelay = 500, waitForReady = true) { + async respawnAll(shardDelay = 5000, respawnDelay = 500, spawnTimeout) { let s = 0; - for(const shard of this.shards.values()) { - const promises = [shard.respawn(respawnDelay, waitForReady)]; - if(++s < this.shards.size && shardDelay > 0) promises.push(Util.delayFor(shardDelay)); - await Promise.all(promises); + for (const shard of this.shards.values()) { + const promises = [shard.respawn(respawnDelay, spawnTimeout)]; + if (++s < this.shards.size && shardDelay > 0) promises.push(Util.delayFor(shardDelay)); + await Promise.all(promises); // eslint-disable-line no-await-in-loop } return this.shards; } - } -module.exports = ShardManager; \ No newline at end of file +module.exports = ShardingManager; \ No newline at end of file