Controller initial files
This commit is contained in:
commit
ebd6e190ec
84
src/controller/Controller.js
Normal file
84
src/controller/Controller.js
Normal file
@ -0,0 +1,84 @@
|
||||
// Native
|
||||
const { EventEmitter } = require('node:events');
|
||||
const { inspect } = require('node:util');
|
||||
const path = require('node:path');
|
||||
|
||||
// External
|
||||
const { MasterLogger } = require('@navy.gif/logger');
|
||||
const { Collection } = require('@discordjs/collection');
|
||||
|
||||
// Local
|
||||
const Shard = require('./Shard');
|
||||
|
||||
class Controller extends EventEmitter {
|
||||
|
||||
constructor (options = {}) {
|
||||
super();
|
||||
|
||||
this._options = options;
|
||||
this._built = false;
|
||||
this._debug = options.debug || false;
|
||||
|
||||
this.logger = new MasterLogger(options.logger);
|
||||
|
||||
// Path to the file that gets forked into shards -- relative to process.cwd() (e.g. the directory in which the process was started)
|
||||
this.filePath = path.resolve(options.filePath);
|
||||
|
||||
this.shards = new Collection();
|
||||
|
||||
process.on('warning', (warn) => {
|
||||
this.logger.warn(warn.stack);
|
||||
});
|
||||
|
||||
process.on('error', (error) => {
|
||||
this.logger.errror(error.stack);
|
||||
});
|
||||
|
||||
process.on('uncaughtException', (ex) => {
|
||||
this.logger.error(ex.stack);
|
||||
});
|
||||
|
||||
this.on('built', () => {
|
||||
this.logger.info(`API Controller built`);
|
||||
this._built = true;
|
||||
this.readyAt = Date.now();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
async init () {
|
||||
|
||||
this.logger.info(`Initialising Controller`);
|
||||
const { shardCount = 1, shardOptions = {}, serverOptions = {} } = this._options;
|
||||
this.logger.info(`Spawning ${shardCount} shards`);
|
||||
|
||||
const promises = [];
|
||||
for (let i = 0; i < shardCount; i++) {
|
||||
const shard = new Shard(this, i, { serverOptions, ...shardOptions, env: this._options.env, path: this.filePath });
|
||||
this._setListeners(shard);
|
||||
this.shards.set(i, shard);
|
||||
promises.push(shard.spawn());
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
this.emit('built');
|
||||
|
||||
}
|
||||
|
||||
_handleMessage (shard, msg) {
|
||||
this.logger.debug(`Message from ${shard.id}: ${inspect(msg)}`);
|
||||
}
|
||||
|
||||
_setListeners (shard) {
|
||||
shard.on('death', () => this.logger.info(`Shard ${shard.id} has died`));
|
||||
shard.on('shutdown', () => this.logger.info(`Shard ${shard.id} is shutting down gracefully`));
|
||||
shard.on('ready', () => this.logger.info(`Shard ${shard.id} is ready`));
|
||||
shard.on('disconnect', () => this.logger.warn(`Shard ${shard.id} has disconnected`));
|
||||
shard.on('spawn', () => this.logger.info(`Shard ${shard.id} spawned`));
|
||||
|
||||
shard.on('message', (msg) => this._handleMessage(shard, msg));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = Controller;
|
136
src/controller/Shard.js
Normal file
136
src/controller/Shard.js
Normal file
@ -0,0 +1,136 @@
|
||||
const ChildProcess = require('node:child_process');
|
||||
const { EventEmitter } = require('node:events');
|
||||
|
||||
const { Util } = require('../util');
|
||||
|
||||
class Shard extends EventEmitter {
|
||||
|
||||
constructor (controller, id, options = {}) {
|
||||
super();
|
||||
|
||||
this.controller = controller;
|
||||
if (typeof id !== 'number' || isNaN(id)) throw new Error('Missing ID');
|
||||
this.id = id;
|
||||
|
||||
if (!options.path) throw new Error('Missing path to file to fork');
|
||||
this.filePath = options.path;
|
||||
this.args = options.args || [];
|
||||
this.execArgv = options.execArgv || [];
|
||||
this.env = options.env || {};
|
||||
this._respawn = options.respawn || false;
|
||||
this.serverOptions = options.serverOptions || {};
|
||||
|
||||
this.ready = false;
|
||||
this.process = null;
|
||||
|
||||
}
|
||||
|
||||
async spawn (waitForReady = false) {
|
||||
|
||||
if (this.process) throw new Error(`[shard-${this.id}] A process for this shard already exists!`);
|
||||
|
||||
this.process = ChildProcess.fork(this.filePath, this.args, { env: { ...this.env, SHARD_ID: this.id }, execArgv: this.execArgv })
|
||||
.on('message', this._handleMessage.bind(this))
|
||||
.on('exit', () => this._handleExit())
|
||||
.on('disconnect', this._handleDisconnect.bind(this)); // Don't know if this is going to help, but monitoring whether this gets called whenever a process on its own closes the IPC channel
|
||||
|
||||
this.process.once('spawn', () => {
|
||||
this.emit('spawn');
|
||||
this.process.send({ _start: this.serverOptions });
|
||||
});
|
||||
if (!waitForReady) return;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.once('ready', resolve);
|
||||
this.once('disconnect', () => reject(new Error(`[shard-${this.id}] Shard disconnected while starting up`)));
|
||||
this.once('death', () => reject(new Error(`[shard-${this.id}] Shard died while starting`)));
|
||||
setTimeout(() => reject(new Error(`[shard-${this.id}] Shard timed out while starting`)), 30_000);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
async respawn (delay = 500) {
|
||||
await this.kill();
|
||||
if (delay) await Util.wait(delay);
|
||||
return this.spawn();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a shutdown command to the shard, if it doesn't respond within 5 seconds it gets killed
|
||||
* TODO: Add a check to see if the process actually ends and print out a warning if it hasn't
|
||||
*
|
||||
* @return {*}
|
||||
* @memberof Shard
|
||||
*/
|
||||
kill () {
|
||||
if (this.process) {
|
||||
return new Promise((resolve) => {
|
||||
// Clear out all other exit listeners so they don't accidentally start the process up again
|
||||
this.process.removeAllListeners('exit');
|
||||
// Set timeout for force kill
|
||||
const to = setTimeout(() => {
|
||||
this.process.kill();
|
||||
resolve();
|
||||
}, 5000);
|
||||
// Gracefully handle exit
|
||||
this.process.once('exit', () => {
|
||||
clearTimeout(to);
|
||||
this._handleExit(false);
|
||||
resolve();
|
||||
});
|
||||
// Clear the force kill timeout if the process responds with a shutdown echo, allowing it time to gracefully close all connections
|
||||
this.once('shutdown', () => {
|
||||
clearTimeout(to);
|
||||
});
|
||||
|
||||
this.process.send({ _shutdown: true });
|
||||
});
|
||||
}
|
||||
this._handleExit(false);
|
||||
}
|
||||
|
||||
send (message) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.ready && this.process) {
|
||||
this.process.send(message, err => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
} else reject(new Error(`[shard-${this.id}] Cannot send message to dead shard.`));
|
||||
});
|
||||
}
|
||||
|
||||
_handleMessage (message) {
|
||||
if (message) {
|
||||
if (message._ready) {
|
||||
this.ready = true;
|
||||
this.emit('ready');
|
||||
return;
|
||||
} else if (message._shutdown) {
|
||||
this.ready = false;
|
||||
this.emit('shutdown');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.emit('message', this, message);
|
||||
|
||||
}
|
||||
|
||||
_handleDisconnect () {
|
||||
this.emit('disconnect');
|
||||
}
|
||||
|
||||
_handleExit (respawn = this._respawn) {
|
||||
this.process.removeAllListeners();
|
||||
this.emit('death');
|
||||
|
||||
this.ready = false;
|
||||
this.process = null;
|
||||
|
||||
if (respawn) this.spawn().catch(err => this.emit('error', err));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = Shard;
|
3
src/controller/index.js
Normal file
3
src/controller/index.js
Normal file
@ -0,0 +1,3 @@
|
||||
module.exports = {
|
||||
Controller: require('./Controller'),
|
||||
};
|
Loading…
Reference in New Issue
Block a user