commit ebd6e190ec9d7b82cda98942d4a7a162cfe22bae Author: Navy.gif Date: Sun Nov 6 19:35:53 2022 +0200 Controller initial files diff --git a/src/controller/Controller.js b/src/controller/Controller.js new file mode 100644 index 0000000..1342fd0 --- /dev/null +++ b/src/controller/Controller.js @@ -0,0 +1,84 @@ +// Native +const { EventEmitter } = require('node:events'); +const { inspect } = require('node:util'); +const path = require('node:path'); + +// External +const { MasterLogger } = require('@navy.gif/logger'); +const { Collection } = require('@discordjs/collection'); + +// Local +const Shard = require('./Shard'); + +class Controller extends EventEmitter { + + constructor (options = {}) { + super(); + + this._options = options; + this._built = false; + this._debug = options.debug || false; + + this.logger = new MasterLogger(options.logger); + + // Path to the file that gets forked into shards -- relative to process.cwd() (e.g. the directory in which the process was started) + this.filePath = path.resolve(options.filePath); + + this.shards = new Collection(); + + process.on('warning', (warn) => { + this.logger.warn(warn.stack); + }); + + process.on('error', (error) => { + this.logger.errror(error.stack); + }); + + process.on('uncaughtException', (ex) => { + this.logger.error(ex.stack); + }); + + this.on('built', () => { + this.logger.info(`API Controller built`); + this._built = true; + this.readyAt = Date.now(); + }); + + } + + async init () { + + this.logger.info(`Initialising Controller`); + const { shardCount = 1, shardOptions = {}, serverOptions = {} } = this._options; + this.logger.info(`Spawning ${shardCount} shards`); + + const promises = []; + for (let i = 0; i < shardCount; i++) { + const shard = new Shard(this, i, { serverOptions, ...shardOptions, env: this._options.env, path: this.filePath }); + this._setListeners(shard); + this.shards.set(i, shard); + promises.push(shard.spawn()); + } + + await Promise.all(promises); + this.emit('built'); + + } + + _handleMessage (shard, msg) { + this.logger.debug(`Message from ${shard.id}: ${inspect(msg)}`); + } + + _setListeners (shard) { + shard.on('death', () => this.logger.info(`Shard ${shard.id} has died`)); + shard.on('shutdown', () => this.logger.info(`Shard ${shard.id} is shutting down gracefully`)); + shard.on('ready', () => this.logger.info(`Shard ${shard.id} is ready`)); + shard.on('disconnect', () => this.logger.warn(`Shard ${shard.id} has disconnected`)); + shard.on('spawn', () => this.logger.info(`Shard ${shard.id} spawned`)); + + shard.on('message', (msg) => this._handleMessage(shard, msg)); + } + +} + +module.exports = Controller; \ No newline at end of file diff --git a/src/controller/Shard.js b/src/controller/Shard.js new file mode 100644 index 0000000..bbe626a --- /dev/null +++ b/src/controller/Shard.js @@ -0,0 +1,136 @@ +const ChildProcess = require('node:child_process'); +const { EventEmitter } = require('node:events'); + +const { Util } = require('../util'); + +class Shard extends EventEmitter { + + constructor (controller, id, options = {}) { + super(); + + this.controller = controller; + if (typeof id !== 'number' || isNaN(id)) throw new Error('Missing ID'); + this.id = id; + + if (!options.path) throw new Error('Missing path to file to fork'); + this.filePath = options.path; + this.args = options.args || []; + this.execArgv = options.execArgv || []; + this.env = options.env || {}; + this._respawn = options.respawn || false; + this.serverOptions = options.serverOptions || {}; + + this.ready = false; + this.process = null; + + } + + async spawn (waitForReady = false) { + + if (this.process) throw new Error(`[shard-${this.id}] A process for this shard already exists!`); + + this.process = ChildProcess.fork(this.filePath, this.args, { env: { ...this.env, SHARD_ID: this.id }, execArgv: this.execArgv }) + .on('message', this._handleMessage.bind(this)) + .on('exit', () => this._handleExit()) + .on('disconnect', this._handleDisconnect.bind(this)); // Don't know if this is going to help, but monitoring whether this gets called whenever a process on its own closes the IPC channel + + this.process.once('spawn', () => { + this.emit('spawn'); + this.process.send({ _start: this.serverOptions }); + }); + if (!waitForReady) return; + + return new Promise((resolve, reject) => { + this.once('ready', resolve); + this.once('disconnect', () => reject(new Error(`[shard-${this.id}] Shard disconnected while starting up`))); + this.once('death', () => reject(new Error(`[shard-${this.id}] Shard died while starting`))); + setTimeout(() => reject(new Error(`[shard-${this.id}] Shard timed out while starting`)), 30_000); + }); + + } + + async respawn (delay = 500) { + await this.kill(); + if (delay) await Util.wait(delay); + return this.spawn(); + } + + /** + * Sends a shutdown command to the shard, if it doesn't respond within 5 seconds it gets killed + * TODO: Add a check to see if the process actually ends and print out a warning if it hasn't + * + * @return {*} + * @memberof Shard + */ + kill () { + if (this.process) { + return new Promise((resolve) => { + // Clear out all other exit listeners so they don't accidentally start the process up again + this.process.removeAllListeners('exit'); + // Set timeout for force kill + const to = setTimeout(() => { + this.process.kill(); + resolve(); + }, 5000); + // Gracefully handle exit + this.process.once('exit', () => { + clearTimeout(to); + this._handleExit(false); + resolve(); + }); + // Clear the force kill timeout if the process responds with a shutdown echo, allowing it time to gracefully close all connections + this.once('shutdown', () => { + clearTimeout(to); + }); + + this.process.send({ _shutdown: true }); + }); + } + this._handleExit(false); + } + + send (message) { + return new Promise((resolve, reject) => { + if (this.ready && this.process) { + this.process.send(message, err => { + if (err) reject(err); + else resolve(); + }); + } else reject(new Error(`[shard-${this.id}] Cannot send message to dead shard.`)); + }); + } + + _handleMessage (message) { + if (message) { + if (message._ready) { + this.ready = true; + this.emit('ready'); + return; + } else if (message._shutdown) { + this.ready = false; + this.emit('shutdown'); + return; + } + } + + this.emit('message', this, message); + + } + + _handleDisconnect () { + this.emit('disconnect'); + } + + _handleExit (respawn = this._respawn) { + this.process.removeAllListeners(); + this.emit('death'); + + this.ready = false; + this.process = null; + + if (respawn) this.spawn().catch(err => this.emit('error', err)); + } + +} + +module.exports = Shard; \ No newline at end of file diff --git a/src/controller/index.js b/src/controller/index.js new file mode 100644 index 0000000..6f94c55 --- /dev/null +++ b/src/controller/index.js @@ -0,0 +1,3 @@ +module.exports = { + Controller: require('./Controller'), +}; \ No newline at end of file