// webserver-framework/src/controller/Shard.ts

2023-04-17 13:10:45 +02:00
// const ChildProcess = require('node:child_process');
// const { EventEmitter } = require('node:events');
import { ChildProcess, fork } from 'node:child_process';
import EventEmitter from 'node:events';
// const { Util } = require('../util');
import { Util } from '../util/index.js';
import Controller from './Controller.js';
import { ShardOptions } from '../../@types/Controller.js';
import { IPCMessage } from '../../@types/Other.js';
import { ServerOptions } from '../../@types/Server.js';
import path from 'node:path';
2022-11-06 18:35:53 +01:00
2023-02-10 22:56:26 +01:00
// Grace period, in milliseconds (90 s), that a subprocess is given to shut down
// before it is forcibly killed. Used as the delay of the kill timers in Shard.
const KillTO = 90 * 1000;
2022-11-06 18:35:53 +01:00
/**
 * Manages a single forked child process ("shard") and the IPC traffic with it.
 *
 * Events emitted by an instance:
 *  - `spawn`      the child process has been spawned
 *  - `ready`      the child sent a `_ready` message
 *  - `shutdown`   the child acknowledged a shutdown with a `_shutdown` message
 *  - `fatal`      the child sent a `_fatal` message; the shard cannot be spawned again
 *  - `message`    any other IPC message that is not a reply to a pending `send`
 *  - `disconnect` the IPC channel of the child closed
 *  - `death`      the exit handler ran (process exited, was killed, or reported fatal)
 *  - `warn`       a crash loop was detected
 *  - `error`      an automatic respawn failed
 */
class Shard extends EventEmitter {

    // #_controller: Controller;
    #_id: number;
    #_filePath: string;
    #_args: string[];
    #_execArgv: string[];
    #_env: { [key: string]: string };
    #_respawn: boolean;
    #_serverOptions: ServerOptions;
    #_ready: boolean;
    #_process: ChildProcess | null;
    #_fatal: boolean;
    #_crashes: number[];
    #_spawnedAt: number;
    #_awaitingShutdown: (() => void) | null;
    #_awaitingResponse: Map<string, (args: IPCMessage) => void>;
    // Pending SIGKILL timer armed when the child acknowledges a shutdown
    #_shutdownKillTimer: ReturnType<typeof setTimeout> | null;

    /**
     * @param _controller Owning controller; currently unused.
     * @param id Numeric shard ID, handed to the child as the `SHARD_ID` environment variable.
     * @param options Shard options; `path` (the file to fork) is required.
     * @throws Error if `id` is not a valid number or `options.path` is missing.
     */
    constructor (_controller: Controller, id: number, options: ShardOptions) {
        super();

        // this.#_controller = controller;
        if (typeof id !== 'number' || isNaN(id))
            throw new Error('Missing ID');
        this.#_id = id;

        if (!options?.path)
            throw new Error('Missing path to file to fork');
        this.#_filePath = options.path;
        this.#_args = options.args || [];
        this.#_execArgv = options.execArgv || [];
        this.#_env = options.env || {};
        this.#_respawn = options.respawn || false;
        // NOTE: when serverOptions is supplied, `dir` is written onto the caller's object
        this.#_serverOptions = options.serverOptions || {} as ServerOptions;
        this.#_serverOptions.dir = path.resolve(options.path, '..');

        this.#_ready = false;
        this.#_process = null;
        this.#_fatal = false;

        // Keep track of crashes for preventing crash loops
        this.#_crashes = [];
        this.#_spawnedAt = Date.now(); // Gets re-set once actually spawned
        this.#_awaitingShutdown = null;
        this.#_awaitingResponse = new Map();
        this.#_shutdownKillTimer = null;
    }

    get id () {
        return this.#_id;
    }

    get fatal () {
        return this.#_fatal;
    }

    get process () {
        return this.#_process;
    }

    get ready () {
        return this.#_ready;
    }

    get spawnedAt () {
        return this.#_spawnedAt;
    }

    /**
     * Forks the shard's file and wires up the IPC handlers. Once the process has
     * spawned, the server options are sent to it in a `_start` message.
     *
     * @param waitForReady When true, the returned promise settles only once the shard
     * reports ready, or rejects on disconnect, death, or after a 30 s timeout.
     * @throws Error if the shard died fatally or a process already exists.
     */
    async spawn (waitForReady = false) {
        if (this.fatal)
            throw new Error(`[shard-${this.id}] Process died fatally and cannot be restarted. Fix the issue before trying again.`);
        if (this.process)
            throw new Error(`[shard-${this.id}] A process for this shard already exists!`);

        const child = fork(this.#_filePath, this.#_args, {
            env: {
                ...this.#_env,
                SHARD_ID: this.id.toString()
            },
            execArgv: this.#_execArgv
        });
        this.#_process = child;

        child
            .on('message', this._handleMessage.bind(this))
            .on('exit', this._handleExit.bind(this))
            .on('disconnect', this._handleDisconnect.bind(this)); // Don't know if this is going to help, but monitoring whether this gets called whenever a process on its own closes the IPC channel

        child.once('spawn', () => {
            this.emit('spawn');
            child.send({ _start: this.#_serverOptions });
            this.#_spawnedAt = Date.now();
        });

        if (!waitForReady)
            return;

        return new Promise<void>((resolve, reject) => {
            // Whichever outcome happens first wins; the timer and the other listeners are then removed
            const cleanup = () => {
                clearTimeout(timer);
                this.off('ready', onReady);
                this.off('disconnect', onDisconnect);
                this.off('death', onDeath);
            };
            const onReady = () => {
                cleanup();
                resolve();
            };
            const onDisconnect = () => {
                cleanup();
                reject(new Error(`[shard-${this.id}] Shard disconnected while starting up`));
            };
            const onDeath = () => {
                cleanup();
                reject(new Error(`[shard-${this.id}] Shard died while starting`));
            };
            const timer = setTimeout(() => {
                cleanup();
                reject(new Error(`[shard-${this.id}] Shard timed out while starting`));
            }, 30_000);

            this.once('ready', onReady);
            this.once('disconnect', onDisconnect);
            this.once('death', onDeath);
        });
    }

    /**
     * Kills the current process (if any), optionally waits, then spawns a new one.
     *
     * @param delay Milliseconds to wait between the kill and the new spawn.
     */
    async respawn (delay = 500) {
        await this.kill();
        if (delay)
            await Util.wait(delay);
        return this.spawn();
    }

    /**
     * Sends a shutdown command to the shard. If the shard neither exits nor echoes the
     * shutdown within `KillTO` milliseconds, it is killed. A shutdown echo cancels that
     * timer so the shard has time to close its connections gracefully.
     * TODO: Add a check to see if the process actually ends and print out a warning if it hasn't
     *
     * @returns A promise that resolves once the process exited (or the kill timeout
     * fired), or nothing when there is no process to kill.
     */
    kill (): Promise<void> | void {
        const child = this.#_process;
        if (!child) {
            this._handleExit(null, null, false);
            return;
        }

        return new Promise<void>((resolve) => {
            // Clear out all other exit listeners so they don't accidentally start the process up again
            child.removeAllListeners('exit');

            // Clear the force kill timeout if the process responds with a shutdown echo, allowing it time to gracefully close all connections
            const onShutdownEcho = () => clearTimeout(forceKill);

            // Set timeout for force kill
            const forceKill = setTimeout(() => {
                this.off('shutdown', onShutdownEcho);
                if (this.#_process === child)
                    child.kill();
                resolve();
            }, KillTO);

            // Gracefully handle exit
            child.once('exit', (code, signal) => {
                clearTimeout(forceKill);
                this.off('shutdown', onShutdownEcho);
                this._handleExit(code, signal, false);
                resolve();
            });
            this.once('shutdown', onShutdownEcho);

            // Without an open IPC channel the shutdown message cannot be delivered, fall back to a signal
            if (child.connected)
                child.send({ _shutdown: true });
            else
                child.kill();
        });
    }

    /**
     * Sends an IPC message to the shard.
     *
     * @param message Message to send; when a response is expected, a fresh `_id` is written onto it.
     * @param expectResponse When true, the promise resolves with the shard's reply carrying
     * the same `_id`, or rejects with a timeout error after 10 s.
     * @returns Resolves once the message was sent, or with the reply when one is expected.
     * Rejects if the shard is not ready, has no process, or sending fails.
     */
    send (message: IPCMessage, expectResponse = false): Promise<IPCMessage | void> {
        const child = this.#_process;
        if (!this.ready || !child)
            return Promise.reject(new Error(`[shard-${this.id}] Cannot send message to dead shard.`));

        return new Promise<IPCMessage | void>((resolve, reject) => {
            // Drops the pending-response bookkeeping; replaced below when a response is expected
            let abandon = () => { /* nothing pending */ };

            if (expectResponse) {
                message._id = Util.randomUUID();
                const id = message._id;
                const timer = setTimeout(() => {
                    this.#_awaitingResponse.delete(id);
                    reject(new Error('Message timeout'));
                }, 10_000);
                abandon = () => {
                    clearTimeout(timer);
                    this.#_awaitingResponse.delete(id);
                };
                this.#_awaitingResponse.set(id, (args: IPCMessage) => {
                    abandon();
                    resolve(args);
                });
            }

            child.send(message, err => {
                if (err) {
                    abandon();
                    return reject(err);
                }
                if (!expectResponse)
                    resolve();
            });
        });
    }

    /**
     * Disables automatic respawning and returns a promise that resolves the next
     * time the exit handler runs.
     */
    awaitShutdown () {
        this.#_respawn = false;
        return new Promise<void>((resolve) => {
            this.#_awaitingShutdown = resolve;
        });
    }

    /**
     * Handles an IPC message from the child. Control messages (`_ready`, `_shutdown`,
     * `_fatal`) and replies to pending `send` calls are consumed here; everything else
     * is re-emitted as a `message` event.
     */
    _handleMessage (message: IPCMessage) {
        if (message) {
            if (message._ready) {
                this.#_ready = true;
                this.emit('ready');
                return;
            } else if (message._shutdown) {
                // The timer is bound to the process that sent the echo, so a process spawned
                // later is never hit by a stale timer. The exit handler cancels it.
                const child = this.#_process;
                if (child) {
                    if (this.#_shutdownKillTimer)
                        clearTimeout(this.#_shutdownKillTimer);
                    this.#_shutdownKillTimer = setTimeout(() => {
                        this.#_shutdownKillTimer = null;
                        if (this.#_process === child)
                            child.kill('SIGKILL');
                    }, KillTO);
                }
                this.#_ready = false;
                this.emit('shutdown');
                return;
            } else if (message._fatal) {
                this.#_process?.removeAllListeners();
                this.#_ready = false;
                this.#_fatal = true;
                this._handleExit(null, null, false);
                return this.emit('fatal', message);
            } else if (message._id) {
                const promise = this.#_awaitingResponse.get(message._id);
                if (promise)
                    return promise(message);
            }
        }
        this.emit('message', message);
    }

    /** Re-emits the closing of the child's IPC channel as a `disconnect` event. */
    _handleDisconnect () {
        this.emit('disconnect');
    }

    /**
     * Cleans up after the process is gone: emits `death`, resolves a pending
     * `awaitShutdown`, records the run time of non-zero exits and respawns if allowed.
     *
     * @param code Exit code of the process; anything other than 0 is recorded as a crash.
     * @param _signal Signal that terminated the process; unused.
     * @param respawn Whether to spawn a new process; defaults to the shard's respawn option.
     */
    _handleExit (code: number | null, _signal: string | null, respawn = this.#_respawn) {
        if (this.#_shutdownKillTimer) {
            clearTimeout(this.#_shutdownKillTimer);
            this.#_shutdownKillTimer = null;
        }

        if (this.process)
            this.process.removeAllListeners();
        this.emit('death');

        if (this.#_awaitingShutdown) {
            const done = this.#_awaitingShutdown;
            this.#_awaitingShutdown = null;
            done();
        }

        if (code !== 0) {
            this.#_crashes.push(Date.now() - this.spawnedAt);
            // Only the last 3 run times are ever inspected
            if (this.#_crashes.length > 3)
                this.#_crashes.shift();
        }

        this.#_ready = false;
        this.#_process = null;

        if (this.#_crashes.length > 2) {
            const last3 = this.#_crashes.slice(-3);
            const avg = last3.reduce((sum, runTime) => sum + runTime, 0) / 3;
            // If average run duration is below 60 mins send a notification about detected crash loop and stop the respawn
            if (avg < 60 * 60 * 1000) {
                this.emit('warn', `Crash loop detected, average run time for the last 3 spawns: ${avg}`);
                respawn = false;
            }
        }

        if (respawn)
            this.spawn().catch(err => this.emit('error', err));
    }

}
2023-04-17 13:10:45 +02:00
// The Shard class is exposed as this module's default export
export default Shard;