311 lines
9.2 KiB
TypeScript
311 lines
9.2 KiB
TypeScript
// const ChildProcess = require('node:child_process');
|
|
// const { EventEmitter } = require('node:events');
|
|
import { ChildProcess, fork } from 'node:child_process';
|
|
import EventEmitter from 'node:events';
|
|
|
|
// const { Util } = require('../util');
|
|
import { Util } from '../util/index.js';
|
|
import { ShardOptions } from '../../@types/Controller.js';
|
|
import { IPCMessage } from '../../@types/Other.js';
|
|
import { ServerOptions } from '../../@types/Server.js';
|
|
import path from 'node:path';
|
|
|
|
// Grace period, in milliseconds, that a subprocess is given to shut down before it is forcibly killed (90 seconds)
const KillTO = 90_000;
|
|
/**
 * Manages a single forked child process ("shard") and the IPC traffic with it.
 *
 * Events emitted by this class:
 * - `spawn`      the child process has been spawned
 * - `ready`      the child sent a `_ready` message
 * - `shutdown`   the child sent a `_shutdown` message
 * - `fatal`      the child sent a `_fatal` message (the message is passed along)
 * - `message`    any other message received from the child
 * - `disconnect` the child's `disconnect` event fired
 * - `death`      the exit handler ran (process exited, was killed, or reported fatal)
 * - `warn`       non-zero exit code or detected crash loop (string payload)
 * - `error`      error reported by the child process object or by an automatic respawn
 */
class Shard extends EventEmitter
{

    // #_controller: Controller;
    #id: number;
    #filePath: string;
    #args: string[];
    #execArgv: string[];
    #env: { [key: string]: string };
    #respawn: boolean;
    #serverOptions: ServerOptions;

    #ready: boolean;
    #process: ChildProcess | null;
    #fatal: boolean;

    #crashes: number[];
    #spawnedAt: number;
    #awaitingShutdown: (() => void) | null;
    #awaitingResponse: Map<string, (args: IPCMessage) => void>;

    /**
     * @param id Numeric shard ID, exposed to the child through the `SHARD_ID` environment variable
     * @param options Fork settings; `options.path` (the file to fork) is required
     * @throws Error if `id` is not a valid number or `options.path` is missing
     */
    constructor (id: number, options: ShardOptions)
    {
        super();

        if (typeof id !== 'number' || isNaN(id))
            throw new Error('Missing ID');
        this.#id = id;

        if (!options?.path)
            throw new Error('Missing path to file to fork');
        this.#filePath = options.path;
        this.#args = options.args ?? [];
        this.#execArgv = options.execArgv ?? [];
        this.#env = options.env ?? {};
        this.#respawn = options.respawn ?? false;
        // Work on a shallow copy so the caller's serverOptions object is not mutated when `dir` is set
        this.#serverOptions = {
            ...options.serverOptions,
            dir: path.resolve(options.path, '..')
        } as ServerOptions;

        this.#ready = false;
        this.#process = null;
        this.#fatal = false;

        // Keep track of crashes (run durations in ms) for preventing crash loops
        this.#crashes = [];
        // Placeholder, gets re-set once the process has actually spawned
        this.#spawnedAt = Date.now();
        this.#awaitingShutdown = null;
        // Pending request handlers keyed by message `_id`
        this.#awaitingResponse = new Map();
    }

    get id ()
    {
        return this.#id;
    }

    get fatal ()
    {
        return this.#fatal;
    }

    get process ()
    {
        return this.#process;
    }

    get ready ()
    {
        return this.#ready;
    }

    get spawnedAt ()
    {
        return this.#spawnedAt;
    }

    /**
     * Forks the shard's child process and wires up its event handlers.
     * Once the child has spawned it is sent a `_start` message carrying the server options.
     *
     * @param waitForReady When true, the returned promise settles only once the shard reports ready
     * (or rejects if it disconnects or dies first); otherwise it resolves right after forking
     * @throws Error if the shard previously died fatally or already has a process
     */
    async spawn (waitForReady = false)
    {
        if (this.fatal)
            throw new Error(`[shard-${this.id}] Process died fatally and cannot be restarted. Fix the issue before trying again.`);
        if (this.process)
            throw new Error(`[shard-${this.id}] A process for this shard already exists!`);

        const child = fork(this.#filePath, this.#args, {
            // NOTE(review): an explicit `env` replaces the default (process.env) for the child - confirm this is intended
            env: {
                ...this.#env,
                SHARD_ID: this.id.toString()
            },
            execArgv: this.#execArgv
        })
            .on('message', this.#handleMessage.bind(this))
            .on('exit', this.#handleExit.bind(this))
            .on('error', this.#handleError.bind(this))
            .on('disconnect', this.#handleDisconnect.bind(this)); // Don't know if this is going to help, but monitoring whether this gets called whenever a process on its own closes the IPC channel
        this.#process = child;

        child.once('spawn', () =>
        {
            this.emit('spawn');
            child.send({ _start: this.#serverOptions });
            this.#spawnedAt = Date.now();
        });

        if (!waitForReady)
            return;

        return new Promise<void>((resolve, reject) =>
        {
            // Whichever event fires first wins; the other listeners are removed so they do not pile up across respawns
            const cleanup = () =>
            {
                this.off('ready', onReady);
                this.off('disconnect', onDisconnect);
                this.off('death', onDeath);
            };
            const onReady = () =>
            {
                cleanup();
                resolve();
            };
            const onDisconnect = () =>
            {
                cleanup();
                reject(new Error(`[shard-${this.id}] Shard disconnected while starting up`));
            };
            const onDeath = () =>
            {
                cleanup();
                reject(new Error(`[shard-${this.id}] Shard died while starting`));
            };

            this.once('ready', onReady);
            this.once('disconnect', onDisconnect);
            this.once('death', onDeath);
            // setTimeout(() => reject(new Error(`[shard-${this.id}] Shard timed out while starting`)), 30_000);
        });
    }

    /**
     * Kills the current process (if any), optionally waits, then spawns a new one and waits for it to become ready.
     *
     * @param delay Milliseconds to wait between the kill and the new spawn; 0 skips the wait
     */
    async respawn (delay = 500)
    {
        await this.kill();
        if (delay)
            await Util.wait(delay);
        return this.spawn(true);
    }

    /**
     * Sends a shutdown command to the shard. If the process neither exits nor echoes the
     * shutdown within `KillTO` it gets killed. A shutdown echo cancels that timeout so the
     * process has time to close its connections gracefully.
     *
     * @returns A promise that resolves once the process has exited or the kill timeout fired;
     * `undefined` when there is no process (the exit handler is still run in that case)
     */
    kill (): Promise<void> | void
    {
        const child = this.#process;
        if (!child)
        {
            this.#handleExit(null, null, false);
            return;
        }

        return new Promise<void>((resolve) =>
        {
            // Clear out all other exit listeners so they don't accidentally start the process up again
            child.removeAllListeners('exit');

            // Set timeout for force kill
            const to = setTimeout(() =>
            {
                this.off('shutdown', onShutdown);
                this.#process?.kill();
                resolve();
            }, KillTO);

            // Clear the force kill timeout if the process responds with a shutdown echo, allowing it time to gracefully close all connections
            const onShutdown = () => clearTimeout(to);
            this.once('shutdown', onShutdown);

            // Gracefully handle exit, never respawning after an explicit kill
            child.once('exit', (code, signal) =>
            {
                clearTimeout(to);
                this.off('shutdown', onShutdown);
                this.#handleExit(code, signal, false);
                resolve();
            });

            child.send({ _shutdown: true });
        });
    }

    /**
     * Sends an IPC message to the shard.
     *
     * @param message Message to send; when a response is expected it is tagged with a fresh `_id`
     * @param expectResponse When true, resolves with the shard's reply carrying the same `_id`,
     * or rejects with a `Message timeout` error after 10 seconds
     * @returns Resolves with the reply (if expected) or with nothing once the message was sent;
     * rejects if the shard is not ready, the send fails, or the reply times out
     */
    send (message: IPCMessage, expectResponse = false): Promise<IPCMessage | void>
    {
        const child = this.#process;
        if (!this.ready || !child)
            return Promise.reject(new Error(`[shard-${this.id}] Cannot send message to dead shard.`));

        return new Promise<IPCMessage | void>((resolve, reject) =>
        {
            // Drops the pending-response bookkeeping (timer + map entry) for this request
            let release: (() => void) | undefined;

            if (expectResponse)
            {
                message._id = Util.randomUUID();
                const id = message._id;
                const to = setTimeout(() =>
                {
                    this.#awaitingResponse.delete(id);
                    reject(new Error('Message timeout'));
                }, 10_000);
                release = () =>
                {
                    clearTimeout(to);
                    this.#awaitingResponse.delete(id);
                };
                this.#awaitingResponse.set(id, (args: IPCMessage) =>
                {
                    release?.();
                    resolve(args);
                });
            }

            child.send(message, err =>
            {
                if (err)
                {
                    release?.();
                    return reject(err);
                }

                if (!expectResponse)
                    resolve();
            });
        });
    }

    /**
     * Disables automatic respawning and returns a promise that resolves the next time the exit handler runs.
     */
    awaitShutdown ()
    {
        this.#respawn = false;
        return new Promise<void>((resolve) =>
        {
            this.#awaitingShutdown = resolve;
        });
    }

    /**
     * Routes a message received from the child: control messages (`_ready`, `_shutdown`, `_fatal`)
     * and replies to pending requests (`_id`) are handled here, everything else is re-emitted as `message`.
     */
    #handleMessage (message: IPCMessage)
    {
        if (message)
        {
            if (message._ready)
            {
                this.#ready = true;
                this.emit('ready');
                return;
            }
            else if (message._shutdown)
            {
                // The child acknowledged the shutdown; escalate to SIGKILL if that same child has not exited within KillTO
                const child = this.#process;
                const TO = setTimeout(() =>
                {
                    child?.kill('SIGKILL');
                }, KillTO);
                child?.once('exit', () =>
                {
                    clearTimeout(TO);
                });
                this.#ready = false;
                this.emit('shutdown');
                return;
            }
            else if (message._fatal)
            {
                // Detach from the process entirely and mark the shard as not restartable
                this.#process?.removeAllListeners();
                this.#ready = false;
                this.#fatal = true;
                this.#handleExit(null, null, false);
                this.emit('fatal', message);
                return;
            }
            else if (message._id)
            {
                const pending = this.#awaitingResponse.get(message._id);
                if (pending)
                    return pending(message);
            }
        }

        this.emit('message', message);
    }

    #handleDisconnect ()
    {
        this.emit('disconnect');
    }

    /**
     * Cleans up after the process is gone: emits `death`, resolves a pending `awaitShutdown`,
     * records crashes, resets state and respawns when allowed.
     *
     * @param code Exit code of the process, or null
     * @param _signal Signal that terminated the process, or null (unused)
     * @param respawn Whether to spawn a new process afterwards; defaults to the shard's respawn option
     */
    #handleExit (code: number | null, _signal: string | null, respawn = this.#respawn)
    {
        if (this.process)
            this.process.removeAllListeners();

        this.emit('death');
        if (this.#awaitingShutdown)
        {
            const done = this.#awaitingShutdown;
            this.#awaitingShutdown = null;
            done();
        }

        if (code)
        {
            // Record how long the process ran before crashing
            this.#crashes.push(Date.now() - this.spawnedAt);
            this.emit('warn', 'Shard exited with non-zero exit code: ' + code);
        }

        this.#ready = false;
        this.#process = null;

        const len = this.#crashes.length;
        if (len > 2)
        {
            const last3 = this.#crashes.slice(len - 3);
            const sum = last3.reduce((total, val) => total + val, 0);
            const avg = sum / 3;
            // If average run duration is below 60 mins send a notification about detected crash loop and stop the respawn
            if (avg < 60 * 60 * 1000)
            {
                this.emit('warn', `Crash loop detected, average run time for the last 3 spawns: ${avg}`);
                respawn = false;
            }
        }

        if (respawn)
            this.spawn().catch(err => this.emit('error', err));
    }

    #handleError (error: Error)
    {
        this.emit('error', error);
    }

}
|
|
|
|
export default Shard; |