shard stuff

This commit is contained in:
Erik 2021-06-09 02:45:50 +03:00
parent a584f38f38
commit 5bf30707f4
No known key found for this signature in database
GPG Key ID: 7E862371D3409F16
2 changed files with 178 additions and 124 deletions

View File

@ -1,7 +1,7 @@
/*Adopted from Discord.js */
const path = require('path');
const EventEmitter = require('events');
const path = require('path');
const { Util } = require('../util/');
@ -14,16 +14,18 @@ class Shard extends EventEmitter {
super();
if(manager.mode === 'process') childProcess = require('child_process');
else if(manager.mode === 'worker') Worker = require('worker_threads').Worker; //eslint-disable-line prefer-destructuring
if (manager.mode === 'process') childProcess = require('child_process');
else if (manager.mode === 'worker') Worker = require('worker_threads').Worker; //eslint-disable-line prefer-destructuring
this.manager = manager;
this.id = id;
this.args = manager.shardArgs || [];
this.execArgv = manager.execArgv;
this.env = { ...process.env, SHARDING_MANAGER: true,
this.env = {
...process.env,
SHARDING_MANAGER: true,
SHARDS: this.id,
TOTAL_SHARD_COUNT: this.manager.totalShards,
SHARD_COUNT: this.manager.totalShards,
DISCORD_TOKEN: this.manager.token
};
@ -38,39 +40,68 @@ class Shard extends EventEmitter {
}
async spawn(waitForReady = true) {
if(this.process) throw new Error(`[shard${this.id}] Sharding process already exists.`);
if(this.worker) throw new Error(`[shard${this.id}] Sharding worker already exists.`);
async spawn(spawnTimeout = 30000) {
if (this.process) throw new Error(`[shard${this.id}] Sharding process already exists.`);
if (this.worker) throw new Error(`[shard${this.id}] Sharding worker already exists.`);
if(this.manager.mode === 'process') {
this.process = childProcess.fork(path.resolve(this.manager.file), this.args, {
env: this.env,
execArgv: this.execArgv
}).
on('message', this._handleMessage.bind(this)).
on('exit', this._exitListener);
} else if(this.manager.mode === 'worker') {
this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env }).
on('message', this._handleMessage.bind(this)).
on('exit', this._exitListener);
if (this.manager.mode === 'process') {
this.process = childProcess
.fork(path.resolve(this.manager.file), this.args, {
env: this.env,
execArgv: this.execArgv
})
.on('message', this._handleMessage.bind(this))
.on('exit', this._exitListener);
} else if (this.manager.mode === 'worker') {
this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env })
.on('message', this._handleMessage.bind(this))
.on('exit', this._exitListener);
}
this._evals.clear();
this._fetches.clear();
this.emit('spawn', this.process || this.worker);
if(!waitForReady) return this.process || this.worker;
if (spawnTimeout === -1 || spawnTimeout === Infinity) return this.process || this.worker;
await new Promise((resolve, reject) => {
this.once('ready', resolve);
this.once('disconnect', () => reject(new Error(`[shard${this.id}] Shard disconnected while readying.`)));
this.once('death', () => reject(new Error(`[shard${this.id}] Shard died while readying.`)));
setTimeout(() => reject(new Error(`[shard${this.id}] Shard timed out while readying.`)), 30000);
const cleanup = () => {
clearTimeout(spawnTimeoutTimer);
this.off('ready', onReady);
this.off('disconnect', onDisconnect);
this.off('death', onDeath);
};
const onReady = () => {
cleanup();
resolve();
};
const onDisconnect = () => {
cleanup();
reject(new Error(`[shard${this.id}] Shard disconnected while readying.`));
};
const onDeath = () => {
cleanup();
reject(new Error(`[shard${this.id}] Shard died while readying.`));
};
const onTimeout = () => {
cleanup();
reject(new Error(`[shard${this.id}] Shard timed out while readying.`));
};
const spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout);
this.once('ready', onReady);
this.once('disconnect', onDisconnect);
this.once('death', onDeath);
});
return this.process || this.worker;
}
kill() {
if(this.process) {
if (this.process) {
this.process.removeListener('exit', this._exitListener);
this.process.kill();
} else {
@ -79,20 +110,20 @@ class Shard extends EventEmitter {
}
this._handleExit(false);
}
async respawn(delay = 500, waitForReady = true) {
async respawn(delay = 500, spawnTimeout) {
this.kill();
if(delay > 0) await Util.delayFor(delay);
return this.spawn(waitForReady);
if (delay > 0) await Util.delayFor(delay);
return this.spawn(spawnTimeout);
}
send(message) {
return new Promise((resolve, reject) => {
if(this.process) {
this.process.send(message, (error) => {
if(error) reject(error); else resolve(this);
if (this.process) {
this.process.send(message, err => {
if (err) reject(err);
else resolve(this);
});
} else {
this.worker.postMessage(message);
@ -102,13 +133,17 @@ class Shard extends EventEmitter {
}
fetchClientValue(prop) {
if(this._fetches.has(prop)) return this._fetches.get(prop);
// Shard is dead (maybe respawning), don't cache anything and error immediately
if (!this.process && !this.worker) return Promise.reject(new Error(`[shard${this.id}] Shard process missing.`));
// Cached promise from previous call
if (this._fetches.has(prop)) return this._fetches.get(prop);
const promise = new Promise((resolve, reject) => {
const child = this.process || this.worker;
const listener = (message) => {
if(!message || message._fetchProp !== prop) return;
if (!message || message._fetchProp !== prop) return;
child.removeListener('message', listener);
this._fetches.delete(prop);
resolve(message._result);
@ -124,21 +159,24 @@ class Shard extends EventEmitter {
this._fetches.set(prop, promise);
return promise;
}
eval(script) {
// Shard is dead (maybe respawning), don't cache anything and error immediately
if (!this.process && !this.worker) return Promise.reject(new Error(`[shard${this.id}] Shard process missing.`));
if(this._evals.has(script)) return this._evals.get(script);
// Cached promise from previous call
if (this._evals.has(script)) return this._evals.get(script);
const promise = new Promise((resolve, reject) => {
const child = this.process || this.worker;
const listener = (message) => {
if(!message || message._eval !== script) return;
if (!message || message._eval !== script) return;
child.removeListener('message', listener);
this._evals.delete(script);
if(!message._error) resolve(message._result); else reject(new Error(message._error.stack));
if (!message._error) resolve(message._result);
else reject(Util.makePlainError(message._error));
};
child.on('message', listener);
@ -152,50 +190,62 @@ class Shard extends EventEmitter {
this._evals.set(script, promise);
return promise;
}
_handleMessage(message) {
if(message) {
if(message._ready) { //Shard ready
if (message) {
// Shard is ready
if (message._ready) {
this.ready = true;
this.emit('ready');
return;
}
if(message._disconnect) { //Shard disconnected
// Shard has disconnected
if (message._disconnect) {
this.ready = false;
this.emit('disconnect');
return;
}
if(message._reconnecting) { //Shard attempting to reconnect
// Shard is attempting to reconnect
if (message._reconnecting) {
this.ready = false;
this.emit('reconnecting');
return;
}
if(message._sFetchProp) { //Shard requesting property fetch
this.manager.fetchClientValues(message._sFetchProp).then(
(results) => this.send({ _sFetchProp: message._sFetchProp, _result: results }),
(err) => this.send({ _sFetchProp: message._sFetchProp, _error: Util.makePlainError(err) })
// Shard is requesting a property fetch
if (message._sFetchProp) {
const resp = { _sFetchProp: message._sFetchProp, _sFetchPropShard: message._sFetchPropShard };
this.manager.fetchClientValues(message._sFetchProp, message._sFetchPropShard).then(
(results) => this.send({ ...resp, _result: results }),
(err) => this.send({ ...resp, _error: Util.makePlainError(err) })
);
return;
}
if(message._sEval) { //Shard requesting eval broadcast
this.manager.broadcastEval(message._sEval).then(
(results) => this.send({ _sEval: message._sEval, _result: results }),
(err) => this.send({ _sEval: message._sEval, _error: Util.makePlainError(err) })
// Shard is requesting an eval broadcast
if (message._sEval) {
const resp = { _sEval: message._sEval, _sEvalShard: message._sEvalShard };
this.manager.broadcastEval(message._sEval, message._sEvalShard).then(
(results) => this.send({ ...resp, _result: results }),
(err) => this.send({ ...resp, _error: Util.makePlainError(err) })
);
return;
}
if(message._sRespawnAll) { //Shard requesting to respawn all shards.
const { shardDelay, respawnDelay, waitForReady } = message._sRespawnAll;
this.manager.respawnAll(shardDelay, respawnDelay, waitForReady).catch(() => { //eslint-disable-line no-empty-function
// Shard is requesting a respawn of all shards
if (message._sRespawnAll) {
const { shardDelay, respawnDelay, spawnTimeout } = message._sRespawnAll;
this.manager.respawnAll(shardDelay, respawnDelay, spawnTimeout).catch(() => {
// Do nothing
});
return;
}
}
this.manager.emit('message', this, message);
}
_handleExit(respawn = this.manager.respawn) {
@ -207,10 +257,8 @@ class Shard extends EventEmitter {
this._evals.clear();
this._fetches.clear();
if(respawn) this.spawn().catch((err) => this.emit('error', err));
if (respawn) this.spawn().catch((err) => this.emit('error', err));
}
}
module.exports = Shard;

View File

@ -1,13 +1,13 @@
/*Adopted from Discord.js */
const path = require('path');
const fs = require('fs');
const EventEmitter = require('events');
const fs = require('fs');
const path = require('path');
const Shard = require('./Shard.js');
const Shard = require('./Shard');
const { Util, Collection } = require('../util/');
class ShardManager extends EventEmitter {
class ShardingManager extends EventEmitter {
constructor(file, options = {}) {
@ -23,20 +23,19 @@ class ShardManager extends EventEmitter {
}, options.shard);
this.file = file;
if(!file) throw new Error('[shardmanager] File must be specified.');
if(!path.isAbsolute(file)) this.file = path.resolve(process.cwd(), file);
if (!file) throw new Error('[shardmanager] File must be specified.');
if (!path.isAbsolute(file)) this.file = path.resolve(process.cwd(), file);
const stats = fs.statSync(this.file);
if(!stats.isFile()) throw new Error('[shardmanager] File path does not point to a valid file.');
if (!stats.isFile()) throw new Error('[shardmanager] File path does not point to a valid file.');
this.shardList = options.shardList || 'auto';
if(this.shardList !== 'auto') {
if(!Array.isArray(this.shardList)) {
if (this.shardList !== 'auto') {
if (!Array.isArray(this.shardList)) {
throw new TypeError('[shardmanager] ShardList must be an array.');
}
this.shardList = [...new Set(this.shardList)];
if(this.shardList.length < 1) throw new RangeError('[shardmanager] ShardList must have one ID.');
if(this.shardList.some((shardID) => typeof shardID !== 'number' ||
if (this.shardList.length < 1) throw new RangeError('[shardmanager] ShardList must have one ID.');
if (this.shardList.some(shardID => typeof shardID !== 'number' ||
isNaN(shardID) ||
!Number.isInteger(shardID) ||
shardID < 0)
@ -46,31 +45,30 @@ class ShardManager extends EventEmitter {
}
this.totalShards = options.totalShards || 'auto';
if(this.totalShards !== 'auto') {
if(typeof this.totalShards !== 'number' || isNaN(this.totalShards)) {
if (this.totalShards !== 'auto') {
if (typeof this.totalShards !== 'number' || isNaN(this.totalShards)) {
throw new TypeError('[shardmanager] TotalShards must be an integer.');
}
if(this.totalShards < 1) throw new RangeError('[shardmanager] TotalShards must be at least one.');
if(!Number.isInteger(this.totalShards)) {
if (this.totalShards < 1) throw new RangeError('[shardmanager] TotalShards must be at least one.');
if (!Number.isInteger(this.totalShards)) {
throw new RangeError('[shardmanager] TotalShards must be an integer.');
}
}
this.mode = options.mode;
if(this.mode !== 'process' && this.mode !== 'worker') {
if (this.mode !== 'process' && this.mode !== 'worker') {
throw new RangeError('[shardmanager] Mode must be either \'worker\' or \'process\'.');
}
this.respawn = options.respawn;
this.shardArgs = options.shardArgs;
this.execArgv = options.execArgv;
this.token = options.token;
this.token = options.token ? options.token.replace(/^Bot\s*/i, '') : null;
this.shards = new Collection();
process.env.SHARDING_MANAGER = true;
process.env.SHARDING_MANAGER_MODE = this.mode;
process.env.DISCORD_TOKEN = this.token;
}
createShard(id = this.shards.size) {
@ -81,73 +79,81 @@ class ShardManager extends EventEmitter {
return shard;
}
async spawn(amount = this.totalShards, delay = 5500, waitForReady = true) {
if(amount === 'auto') {
async spawn(amount = this.totalShards, delay = 5500, spawnTimeout) {
if (amount === 'auto') {
amount = await Util.fetchRecommendedShards(this.token);
} else {
if(typeof amount !== 'number' || isNaN(amount)) {
if (typeof amount !== 'number' || isNaN(amount)) {
throw new TypeError('[shardmanager] Amount of shards must be a number.');
}
if(amount < 1) throw new RangeError('[shardmanager] Amount of shards must be at least one.');
if(!Number.isInteger(amount)) {
if (amount < 1) throw new RangeError('[shardmanager] Amount of shards must be at least one.');
if (!Number.isInteger(amount)) {
throw new TypeError('[shardmanager] Amount of shards must be an integer.');
}
}
if(this.shards.size >= amount) throw new Error('[shardmanager] Already spawned all necessary shards.');
if(this.shardList === 'auto' || this.totalShards === 'auto' || this.totalShards !== amount) {
// Make sure this many shards haven't already been spawned
if (this.shards.size >= amount) throw new Error('[shardmanager] Already spawned all necessary shards.');
if (this.shardList === 'auto' || this.totalShards === 'auto' || this.totalShards !== amount) {
this.shardList = [...Array(amount).keys()];
}
if(this.totalShards === 'auto' || this.totalShards !== amount) {
if (this.totalShards === 'auto' || this.totalShards !== amount) {
this.totalShards = amount;
}
if(this.shardList.some((id) => id >= amount)) {
if (this.shardList.some((shardID) => shardID >= amount)) {
throw new RangeError('[shardmanager] Amount of shards cannot be larger than the highest shard ID.');
}
for(const shardID of this.shardList) {
// Spawn the shards
for (const shardID of this.shardList) {
const promises = [];
const shard = this.createShard(shardID);
promises.push(shard.spawn(waitForReady));
if(delay > 0 && this.shards.size !== this.shardList.length - 1) promises.push(Util.delayFor(delay));
await Promise.all(promises);
promises.push(shard.spawn(spawnTimeout));
if (delay > 0 && this.shards.size !== this.shardList.length) promises.push(Util.delayFor(delay));
await Promise.all(promises); // eslint-disable-line no-await-in-loop
}
return this.shards;
}
broadcast(message) {
const promises = [];
for(const shard of this.shards.values()) promises.push(shard.send(message));
for (const shard of this.shards.values()) promises.push(shard.send(message));
return Promise.all(promises);
}
broadcastEval(script) {
broadcastEval(script, shard) {
return this._performOnShards('eval', [script], shard);
}
fetchClientValues(prop, shard) {
return this._performOnShards('fetchClientValue', [prop], shard);
}
_performOnShards(method, args, shard) {
if (this.shards.size === 0) return Promise.reject(new Error('[shardmanager] No shards available.'));
if (this.shards.size !== this.shardList.length) return Promise.reject(new Error('[shardmanager] Sharding in progress.'));
if (typeof shard === 'number') {
if (this.shards.has(shard)) return this.shards.get(shard)[method](...args);
return Promise.reject(new Error(`[shardmanager] Shard ${shard} not found.`));
}
const promises = [];
for(const shard of this.shards.values()) promises.push(shard.eval(script));
for (const sh of this.shards.values()) promises.push(sh[method](...args));
return Promise.all(promises);
}
fetchClientValues(prop) {
if(this.shards.size === 0) return Promise.reject(new Error('[shardmanager] No shards available.'));
if(this.shards.size !== this.totalShards) return Promise.reject(new Error('[shardmanager] Sharding in progress.'));
const promises = [];
for(const shard of this.shards.values()) promises.push(shard.fetchClientValue(prop));
return Promise.all(promises);
}
async respawnAll(shardDelay = 5000, respawnDelay = 500, waitForReady = true) {
async respawnAll(shardDelay = 5000, respawnDelay = 500, spawnTimeout) {
let s = 0;
for(const shard of this.shards.values()) {
const promises = [shard.respawn(respawnDelay, waitForReady)];
if(++s < this.shards.size && shardDelay > 0) promises.push(Util.delayFor(shardDelay));
await Promise.all(promises);
for (const shard of this.shards.values()) {
const promises = [shard.respawn(respawnDelay, spawnTimeout)];
if (++s < this.shards.size && shardDelay > 0) promises.push(Util.delayFor(shardDelay));
await Promise.all(promises); // eslint-disable-line no-await-in-loop
}
return this.shards;
}
}
module.exports = ShardManager;
module.exports = ShardingManager;