Add more debug statements to broker

Upgrade packages
This commit is contained in:
Erik 2024-10-09 00:03:42 +03:00
parent d781458cee
commit 0dac59b426
13 changed files with 1247 additions and 1149 deletions

1
.eslintignore Normal file
View File

@ -0,0 +1 @@
tests

View File

@ -12,11 +12,19 @@
"plugins": [ "@typescript-eslint" ], "plugins": [ "@typescript-eslint" ],
"parserOptions": { "parserOptions": {
"ecmaVersion": 2022, "ecmaVersion": 2022,
"sourceType": "module" "sourceType": "module",
"project": "./tsconfig.json"
}, },
"rules": { "rules": {
"@typescript-eslint/no-unused-vars": "off", "@typescript-eslint/no-unused-vars": "off",
"@typescript-eslint/no-non-null-assertion":"off", "@typescript-eslint/no-non-null-assertion": "off",
"@typescript-eslint/no-misused-promises": ["error", {
"checksVoidReturn": {
"arguments": false
}
}],
"@typescript-eslint/no-shadow": "error",
"@typescript-eslint/no-use-before-define": "error",
"accessor-pairs": "warn", "accessor-pairs": "warn",
"array-callback-return": "warn", "array-callback-return": "warn",
"array-bracket-newline": [ "array-bracket-newline": [
@ -74,7 +82,7 @@
"warn", "warn",
"never" "never"
], ],
"eqeqeq": "warn", "eqeqeq": "error",
"func-call-spacing": "warn", "func-call-spacing": "warn",
"func-name-matching": "warn", "func-name-matching": "warn",
"func-names": "warn", "func-names": "warn",
@ -87,18 +95,38 @@
"id-blacklist": "warn", "id-blacklist": "warn",
"id-match": "warn", "id-match": "warn",
"implicit-arrow-linebreak": "warn", "implicit-arrow-linebreak": "warn",
"indent": "warn", "indent": [
"init-declarations": "warn",
"quotes": ["error" , "single"],
"jsx-quotes": [
"warn", "warn",
4,
{
"SwitchCase": 1,
"VariableDeclarator": "first",
"FunctionDeclaration": {
"parameters": "first"
},
"CallExpression": {
"arguments": "first"
},
"ArrayExpression": "first",
"ObjectExpression": "first",
"ImportDeclaration": "first"
}
],
"init-declarations": "warn",
"quotes": [
"error",
"single"
],
"jsx-quotes": [
"error",
"prefer-single" "prefer-single"
], ],
"key-spacing": [ "key-spacing": [
"warn", "warn",
{ {
"beforeColon": false, "beforeColon": false,
"afterColon": true "afterColon": true,
"align": "value"
} }
], ],
"keyword-spacing": [ "keyword-spacing": [
@ -128,7 +156,7 @@
], ],
"max-lines-per-function": [ "max-lines-per-function": [
"warn", "warn",
100 140
], ],
"max-depth": [ "max-depth": [
"warn", "warn",
@ -137,16 +165,16 @@
"new-parens": "warn", "new-parens": "warn",
"no-alert": "warn", "no-alert": "warn",
"no-array-constructor": "warn", "no-array-constructor": "warn",
// "no-bitwise": "warn",
"no-buffer-constructor": "warn", "no-buffer-constructor": "warn",
"no-caller": "warn", "no-caller": "warn",
"no-console": "warn", "no-console": "warn",
"no-constant-binary-expression": "error",
"no-div-regex": "warn", "no-div-regex": "warn",
"no-dupe-else-if": "warn", "no-dupe-else-if": "warn",
"no-duplicate-imports": "warn", "no-duplicate-imports": "warn",
"no-else-return": "warn", "no-else-return": "warn",
"no-empty-function": "warn", "no-empty-function": "warn",
"no-eq-null": "warn", "no-eq-null": "off",
"no-eval": "warn", "no-eval": "warn",
"no-extend-native": "warn", "no-extend-native": "warn",
"no-extra-bind": "warn", "no-extra-bind": "warn",
@ -164,7 +192,15 @@
"no-loop-func": "warn", "no-loop-func": "warn",
"no-mixed-requires": "warn", "no-mixed-requires": "warn",
"no-multi-assign": "warn", "no-multi-assign": "warn",
"no-multi-spaces": "warn", "no-multi-spaces": [
"warn",
{
"ignoreEOLComments": true,
"exceptions": {
"ImportDeclaration": true
}
}
],
"no-multi-str": "warn", "no-multi-str": "warn",
"no-multiple-empty-lines": "warn", "no-multiple-empty-lines": "warn",
"no-native-reassign": "warn", "no-native-reassign": "warn",
@ -185,23 +221,25 @@
"no-restricted-modules": "warn", "no-restricted-modules": "warn",
"no-restricted-properties": "warn", "no-restricted-properties": "warn",
"no-restricted-syntax": "warn", "no-restricted-syntax": "warn",
"no-return-assign": "warn", "no-return-assign": [
"warn",
"except-parens"
],
"no-return-await": "warn", "no-return-await": "warn",
"no-script-url": "warn", "no-script-url": "warn",
"no-self-compare": "warn", "no-self-compare": "warn",
"no-sequences": "warn", "no-sequences": "warn",
"no-setter-return": "warn", "no-setter-return": "warn",
"no-spaced-func": "warn", "no-spaced-func": "warn",
"@typescript-eslint/no-shadow": "error", "no-tabs": "error",
"no-tabs": "warn",
"no-template-curly-in-string": "error", "no-template-curly-in-string": "error",
"no-throw-literal": "warn", "no-throw-literal": "warn",
"no-trailing-spaces": "warn",
"no-undef-init": "error", "no-undef-init": "error",
"no-undefined": "error", "no-undefined": "error",
"no-unmodified-loop-condition": "warn", "no-unmodified-loop-condition": "warn",
"no-unneeded-ternary": "error", "no-unneeded-ternary": "error",
"no-unused-expressions": "warn", "no-unused-expressions": "warn",
"@typescript-eslint/no-use-before-define": "error",
"no-useless-call": "warn", "no-useless-call": "warn",
"no-useless-computed-key": "warn", "no-useless-computed-key": "warn",
"no-useless-concat": "warn", "no-useless-concat": "warn",
@ -209,7 +247,6 @@
"no-useless-rename": "warn", "no-useless-rename": "warn",
"no-useless-return": "warn", "no-useless-return": "warn",
"no-var": "warn", "no-var": "warn",
// "no-void": "warn",
"no-whitespace-before-property": "error", "no-whitespace-before-property": "error",
"nonblock-statement-body-position": [ "nonblock-statement-body-position": [
"warn", "warn",
@ -293,6 +330,16 @@
"yoda": [ "yoda": [
"warn", "warn",
"never" "never"
],
"no-warning-comments": [
1,
{
"terms": [
"todo",
"fixme"
],
"location": "anywhere"
}
] ]
} }
} }

File diff suppressed because one or more lines are too long

925
.yarn/releases/yarn-4.5.0.cjs vendored Executable file

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,5 @@
nodeLinker: node-modules nodeLinker: node-modules
yarnPath: .yarn/releases/yarn-4.1.1.cjs npmRegistryServer: "https://registry.corgi.wtf"
npmRegistryServer: "https://registry.corgi.wtf" yarnPath: .yarn/releases/yarn-4.5.0.cjs

View File

@ -54,5 +54,5 @@
"optional": true "optional": true
} }
}, },
"packageManager": "yarn@4.1.1" "packageManager": "yarn@4.5.0"
} }

View File

@ -6,7 +6,7 @@ import { ILogger, IServer } from './interfaces/index.js';
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ]; const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ];
type Credentials = { interface Credentials {
user: string, user: string,
password: string, password: string,
host: string, host: string,
@ -18,7 +18,7 @@ type ExtendedCredentials = {
node: string node: string
} & Credentials } & Credentials
export type MariaOptions = { export interface MariaOptions {
load?: boolean, load?: boolean,
cluster?: PoolClusterConfig, cluster?: PoolClusterConfig,
client?: PoolConfig, client?: PoolConfig,
@ -40,7 +40,7 @@ type MariaError = {
type Value = (string | number | null | boolean | object) type Value = (string | number | null | boolean | object)
type Values = (Value | Value[] | Value[][])[]; type Values = (Value | Value[] | Value[][])[];
type QueryOptions = { interface QueryOptions {
node?: string, node?: string,
timeout?: number, timeout?: number,
errorIfNodeUnavailable?: boolean errorIfNodeUnavailable?: boolean
@ -54,14 +54,14 @@ type StatusString =
| 'donor' | 'donor'
| 'joiner' | 'joiner'
| 'joined' | 'joined'
type Node = { interface Node {
name: string, name: string,
host: string, host: string,
uuid?: string, uuid?: string,
status?: StatusString status?: StatusString
} }
type StatusUpdate = { interface StatusUpdate {
status: StatusString, status: StatusString,
uuid: string, uuid: string,
primary: string, primary: string,
@ -69,7 +69,7 @@ type StatusUpdate = {
members: string members: string
} }
const isOkPacket = (obj: object): obj is OkPacket => const isOkPacket = (obj: object): obj is OkPacket =>
{ {
if ('fieldCount' in obj if ('fieldCount' in obj
&& 'affectedRows' in obj && 'affectedRows' in obj
@ -82,7 +82,7 @@ const isOkPacket = (obj: object): obj is OkPacket =>
// Designed to work in a single instance or cluster (galera) configuration // Designed to work in a single instance or cluster (galera) configuration
// Will need some work for primary/replica configuration (e.g. not treating all nodes as read-write nodes) // Will need some work for primary/replica configuration (e.g. not treating all nodes as read-write nodes)
class MariaDB class MariaDB
{ {
#_activeQueries: number; #_activeQueries: number;
@ -113,7 +113,7 @@ class MariaDB
const hosts = host.split(','); const hosts = host.split(',');
this.#credentials = []; this.#credentials = [];
this.#nodes = []; this.#nodes = [];
for (const remote of hosts) for (const remote of hosts)
{ {
const [ node ] = remote.split('.'); const [ node ] = remote.split('.');
if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node)) if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
@ -135,48 +135,48 @@ class MariaDB
this.#server = server; this.#server = server;
} }
get ready () get ready ()
{ {
return this.#_ready; return this.#_ready;
} }
get activeQueries () get activeQueries ()
{ {
return this.#_activeQueries; return this.#_activeQueries;
} }
// eslint-disable-next-line max-lines-per-function
async init () async init ()
{ {
if (!this.#load) if (!this.#load)
return this.#logger?.info('Not loading MariaDB'); return this.#logger?.info('Not loading MariaDB');
this.#logger?.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`); this.#logger?.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`);
this.#pool = mysql.createPoolCluster(this.#config.cluster); this.#pool = mysql.createPoolCluster(this.#config.cluster);
for (const creds of this.#credentials) for (const creds of this.#credentials)
{ {
const name = creds.node; const name = creds.node;
this.#pool.add(name, { ...this.#config.client, ...creds }); this.#pool.add(name, { ...this.#config.client, ...creds });
this.#logger?.info(`Added node ${name} to pool cluster`); this.#logger?.info(`Added node ${name} to pool cluster`);
} }
this.#pool.on('connection', (connection) => this.#pool.on('connection', (connection) =>
{ {
this.#logger?.debug(`New connection: ${connection?.threadId || null}`); this.#logger?.debug(`New connection: ${connection?.threadId || null}`);
}); });
this.#pool.on('acquire', (connection) => this.#pool.on('acquire', (connection) =>
{ {
this.#_activeQueries++; this.#_activeQueries++;
this.#logger?.debug(`Connection acquired: ${connection?.threadId || null}`); this.#logger?.debug(`Connection acquired: ${connection?.threadId || null}`);
}); });
this.#pool.on('enqueue', () => this.#pool.on('enqueue', () =>
{ {
this.#logger?.debug('Query enqueued for connection'); this.#logger?.debug('Query enqueued for connection');
}); });
this.#pool.on('release', (connection) => this.#pool.on('release', (connection) =>
{ {
this.#_activeQueries--; this.#_activeQueries--;
@ -186,7 +186,7 @@ class MariaDB
this.#logger?.debug(`Connection released: ${connection?.threadId || null}`); this.#logger?.debug(`Connection released: ${connection?.threadId || null}`);
}); });
this.#pool.on('remove', (nodeId) => this.#pool.on('remove', (nodeId) =>
{ {
this.#logger?.status(`Node ${nodeId} was removed from pool`); this.#logger?.status(`Node ${nodeId} was removed from pool`);
const index = this.#nodes.findIndex(n => n === nodeId); const index = this.#nodes.findIndex(n => n === nodeId);
@ -194,45 +194,49 @@ class MariaDB
}); });
this.#logger?.info('Testing MariaDB connection'); this.#logger?.info('Testing MariaDB connection');
await new Promise<void>((resolve, reject) => await new Promise<void>((resolve, reject) =>
{ {
if (!this.#pool) if (!this.#pool)
return reject(new Error('Missing connection pool')); {
this.#pool.getConnection((err, conn) => reject(new Error('Missing connection pool')); return;
}
this.#pool.getConnection((err, conn) =>
{ {
if (err) if (err)
return reject(err); {
reject(err); return;
}
conn.release(); conn.release();
return resolve(); resolve();
}); });
}); });
this.#logger?.status('Database connected'); this.#logger?.status('Database connected');
if (this.#cluster) if (this.#cluster)
{ // Resolve the UUID for each node to enable status updates { // Resolve the UUID for each node to enable status updates
this.#logger?.status('Resolving cluster node UUIDs for status updates'); this.#logger?.status('Resolving cluster node UUIDs for status updates');
for (const node of this.#nodes) for (const node of this.#nodes)
{ {
// UUIDs are not always enough, we also need the node's IP for cases where the UUID changes due to restarts // UUIDs are not always enough, we also need the node's IP for cases where the UUID changes due to restarts
const dnsResult = await dns.lookup(node.host, 4); const dnsResult = await dns.lookup(node.host, 4);
node.host = dnsResult.address; node.host = dnsResult.address;
const response = await this.#_query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', [], { node: node.name }).catch(() => null) as { Variable_name: string, Value: string }[]; const response = await this.#_query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', [], { node: node.name }).catch(() => null) as { Variable_name: string, Value: string }[];
// Error gives us null // Error gives us null
if (!response) if (!response)
{ {
this.#logger?.info(`Could not resolve UUID for ${node.name}, presumably offline, setting status to disconnected`); this.#logger?.info(`Could not resolve UUID for ${node.name}, presumably offline, setting status to disconnected`);
node.status = 'disconnected'; node.status = 'disconnected';
continue; continue;
} }
// if we for some reason get a response that is empty, we got a problem // if we for some reason get a response that is empty, we got a problem
if (!response.length) if (!response.length)
throw new Error(`Failed to acquire UUID for node ${node.name}`); throw new Error(`Failed to acquire UUID for node ${node.name}`);
const uuid = response.find(entry => entry.Variable_name === 'wsrep_gcomm_uuid')?.Value; const uuid = response.find(entry => entry.Variable_name === 'wsrep_gcomm_uuid')?.Value;
const status = response.find(entry => entry.Variable_name === 'wsrep_local_state_comment')?.Value.toLowerCase(); const status = response.find(entry => entry.Variable_name === 'wsrep_local_state_comment')?.Value.toLowerCase();
node.uuid = uuid; node.uuid = uuid;
node.status = status as StatusString; node.status = status as StatusString;
} }
} }
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -246,9 +250,9 @@ class MariaDB
{ {
this.#logger?.info('Setting up metric recording'); this.#logger?.info('Setting up metric recording');
MariaDB.#queryHistogram = new Prometheus.Histogram({ MariaDB.#queryHistogram = new Prometheus.Histogram({
name: 'sql_queries', name: 'sql_queries',
help: 'Tracks query duration in seconds and frequency', help: 'Tracks query duration in seconds and frequency',
buckets: this.#config.metricsBuckets ?? Prometheus?.exponentialBuckets(0.005, 2, 10), buckets: this.#config.metricsBuckets ?? Prometheus?.exponentialBuckets(0.005, 2, 10),
labelNames: [ 'type' ] as const labelNames: [ 'type' ] as const
}); });
if (this.#server.registerMetric) if (this.#server.registerMetric)
@ -259,23 +263,25 @@ class MariaDB
return this; return this;
} }
async close () async close ()
{ {
this.#logger?.status('Shutting down database connections'); this.#logger?.status('Shutting down database connections');
if (!this.ready) if (!this.ready)
return Promise.resolve(); return Promise.resolve();
this.#_ready = false; this.#_ready = false;
if (this.#_activeQueries) if (this.#_activeQueries)
{ {
this.#logger?.info(`${this.#_activeQueries} active queries still running, letting them finish`); this.#logger?.info(`${this.#_activeQueries} active queries still running, letting them finish`);
await this.finishQueries(); await this.finishQueries();
this.#logger?.info('Queries finished, shutting down'); this.#logger?.info('Queries finished, shutting down');
} }
return new Promise<void>(resolve => return new Promise<void>(resolve =>
{ {
if (!this.#pool) if (!this.#pool)
return resolve(); {
this.#pool.end(() => resolve(); return;
}
this.#pool.end(() =>
{ {
this.#pool?.removeAllListeners(); this.#pool?.removeAllListeners();
resolve(); resolve();
@ -284,7 +290,7 @@ class MariaDB
}); });
} }
get statusUpdateListener () get statusUpdateListener ()
{ {
return this.#statusUpdate.bind(this); return this.#statusUpdate.bind(this);
} }
@ -297,29 +303,29 @@ class MariaDB
if (!update.members.length) if (!update.members.length)
{ {
if (update.status === 'disconnected') if (update.status === 'disconnected')
this.#nodes.forEach(node => this.#nodes.forEach(node =>
{ {
if (node.status === 'disconnecting') if (node.status === 'disconnecting')
{ {
const oldStatus = node.status; const oldStatus = node.status;
node.status = 'disconnected'; node.status = 'disconnected';
this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true }); this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true });
} }
}); });
else if (update.status === 'synced') else if (update.status === 'synced')
{ // Sometimes we'll get a status update without the member info, probably safe to assume if we get a synced status the recently joined members will be synced { // Sometimes we'll get a status update without the member info, probably safe to assume if we get a synced status the recently joined members will be synced
this.#nodes.forEach(node => this.#nodes.forEach(node =>
{ {
if (node.status === 'joined') if (node.status === 'joined')
{ {
const oldStatus = node.status; const oldStatus = node.status;
node.status = 'synced'; node.status = 'synced';
this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true }); this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true });
} }
}); });
} }
return; return;
} }
const members = update.members.split(','); const members = update.members.split(',');
const index = parseInt(update.index); const index = parseInt(update.index);
@ -337,29 +343,31 @@ class MariaDB
return this.#logger?.warn('Received status update for node that is not present in config', { broadcast: true }); return this.#logger?.warn('Received status update for node that is not present in config', { broadcast: true });
node.uuid = id; node.uuid = id;
} }
if (node.status !== update.status) if (node.status !== update.status)
{ {
const oldStatus = node.status; const oldStatus = node.status;
node.status = update.status; node.status = update.status;
this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true }); this.#logger?.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`, { broadcast: true });
} }
} }
finishQueries () finishQueries ()
{ {
return new Promise<void>(resolve => return new Promise<void>(resolve =>
{ {
this.#afterLastQuery = resolve; this.#afterLastQuery = resolve;
}); });
} }
getConnection (nodeName: string | null, throwError: boolean): Promise<PoolConnection> getConnection (nodeName: string | null, throwError: boolean): Promise<PoolConnection>
{ {
return new Promise((resolve, reject) => return new Promise((resolve, reject) =>
{ {
if (!this.#pool) if (!this.#pool)
return reject(new Error('Pool closed')); {
reject(new Error('Pool closed')); return;
}
// Get node by name // Get node by name
const pool = this.#pool; const pool = this.#pool;
if (nodeName) if (nodeName)
@ -371,54 +379,56 @@ class MariaDB
if (!node) if (!node)
{ {
const str = `Node ${nodeName} is not available in pool, falling back to arbitrary node`; const str = `Node ${nodeName} is not available in pool, falling back to arbitrary node`;
if (throwError) if (throwError)
throw new Error(str); throw new Error(str);
this.#logger?.warn(str); this.#logger?.warn(str);
nodeName = '*'; nodeName = '*';
} }
else if (node.status && node.status !== 'synced' && !(this.#canQueryDonor && node.status === 'donor')) else if (node.status && node.status !== 'synced' && !(this.#canQueryDonor && node.status === 'donor'))
{ {
const str = `Node ${nodeName} is currently not synced with the pool and thus unqueryable`; const str = `Node ${nodeName} is currently not synced with the pool and thus unqueryable`;
if (throwError) if (throwError)
throw new Error(str); throw new Error(str);
this.#logger?.warn(str); this.#logger?.warn(str);
nodeName = '*'; nodeName = '*';
} }
} }
this.#logger?.debug(`Selected node ${nodeName} for query`); this.#logger?.debug(`Selected node ${nodeName} for query`);
return pool.of(nodeName ?? '*').getConnection((err, conn) => pool.of(nodeName ?? '*').getConnection((err, conn) =>
{ {
if (err) if (err)
return reject(err); {
return resolve(conn); reject(err); return;
}
resolve(conn);
}); });
}); });
} }
/** /**
* Retry certain queries that are safe to retry, e.g. deadlock * Retry certain queries that are safe to retry, e.g. deadlock
* @throws {MariaError} * @throws {MariaError}
* @private * @private
* */ * */
async #_query<T> (query: string, values: Values, { timeout, node, errorIfNodeUnavailable }: QueryOptions = {}, attempts = 0): async #_query<T> (query: string, values: Values, { timeout, node, errorIfNodeUnavailable }: QueryOptions = {}, attempts = 0):
Promise<T[] | FieldInfo[] | [OkPacket]> Promise<T[] | FieldInfo[] | [OkPacket]>
{ {
const connection = await this.getConnection(node ?? null, errorIfNodeUnavailable ?? false); const connection = await this.getConnection(node ?? null, errorIfNodeUnavailable ?? false);
try try
{ {
const result = await new Promise<T[] | FieldInfo[] | [OkPacket]| undefined>((resolve, reject) => const result = await new Promise<T[] | FieldInfo[] | [OkPacket]| undefined>((resolve, reject) =>
{ {
const endTimer = MariaDB.#queryHistogram?.startTimer({ type: [ 'SELECT', 'UPDATE', 'INSERT', 'DELETE' ].find(entry => query.toUpperCase().includes(entry)) ?? 'OTHER' }); const endTimer = MariaDB.#queryHistogram?.startTimer({ type: [ 'SELECT', 'UPDATE', 'INSERT', 'DELETE' ].find(entry => query.toUpperCase().includes(entry)) ?? 'OTHER' });
const q = connection.query({ timeout, sql: query }, values, (err, results, fields) => const q = connection.query({ timeout, sql: query }, values, (err, results, fields) =>
{ {
if (err) if (err)
reject(err); reject(err);
else if (isOkPacket(results)) else if (isOkPacket(results))
resolve([ results ]); resolve([ results ]);
else if (results) else if (results)
resolve(results); resolve(results);
else else
resolve(fields); resolve(fields);
connection.release(); connection.release();
if (endTimer) if (endTimer)
@ -428,24 +438,24 @@ class MariaDB
}); });
return Promise.resolve(result || []); return Promise.resolve(result || []);
} }
catch (err) catch (err)
{ {
const error = err as MariaError; const error = err as MariaError;
// Retry safe errors // (Galera) Instance not ready for query // Retry safe errors // (Galera) Instance not ready for query
if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) // if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) //
return this.#_query(query, values, { timeout, node }, ++attempts); return this.#_query(query, values, { timeout, node }, ++attempts);
return Promise.reject(error); return Promise.reject(error);
} }
} }
async query<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions): Promise<T[] | FieldInfo[] | [OkPacket]> async query<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions): Promise<T[] | FieldInfo[] | [OkPacket]>
{ {
if (values && !(values instanceof Array)) if (values && !(values instanceof Array))
{ {
opts = values; opts = values;
values = []; values = [];
} }
if (!this.ready) if (!this.ready)
return Promise.reject(new Error('MariaDB not ready')); return Promise.reject(new Error('MariaDB not ready'));
@ -460,7 +470,7 @@ class MariaDB
} }
q<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions) q<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions)
{ {
return this.query<T>(query, values, opts); return this.query<T>(query, values, opts);
} }

View File

@ -2,15 +2,15 @@ import { ILogger, IServer } from './interfaces/index.js';
import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'; import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib'; import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib';
type ExchangeDef = { interface ExchangeDef {
durable?: boolean, durable?: boolean,
internal?: boolean, internal?: boolean,
autoDelete?: boolean, autoDelete?: boolean,
type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string, type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match',
arguments?: object arguments?: object
} }
type QueueDef = { interface QueueDef {
durable?: boolean, durable?: boolean,
messageTtl?: number, messageTtl?: number,
exclusive?: boolean, exclusive?: boolean,
@ -22,22 +22,18 @@ type QueueDef = {
maxPriority?: number maxPriority?: number
} }
export type BrokerOptions = { export interface BrokerOptions {
load?: boolean, load?: boolean,
host: string, host: string,
port: number port: number
user: string, user: string,
pass: string, pass: string,
vhost: string, vhost: string,
exchanges?: { exchanges?: Record<string, ExchangeDef>,
[key: string]: ExchangeDef queues?: Record<string, QueueDef>
},
queues?: {
[key: string]: QueueDef
}
} }
type InternalMessage = { interface InternalMessage {
properties: object, properties: object,
content: object content: object
} }
@ -54,23 +50,21 @@ type InternalQueueMsg = {
type Consumer<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void type Consumer<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void
type Subscriber<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void type Subscriber<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void
export type SubscriptionOptions = { export interface SubscriptionOptions {
exchangeType?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match', exchangeType?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match',
routingKey?: string routingKey?: string
} }
class MessageBroker class MessageBroker
{ {
// Broker definitions
#load: boolean; #load: boolean;
#hosts: string[]; #hosts: string[];
#port: number; #port: number;
#username: string; #username: string;
#password: string; #password: string;
#vhost: string; #vhost: string;
#exchanges: { [key: string]: ExchangeDef }; #exchanges: Record<string, ExchangeDef>;
#queues: {[key: string]: QueueDef}; #queues: Record<string, QueueDef>;
// Wrapper related // Wrapper related
#connection: AmqpConnectionManager | null; #connection: AmqpConnectionManager | null;
@ -86,9 +80,8 @@ class MessageBroker
#_qQueue: InternalQueueMsg[]; #_qQueue: InternalQueueMsg[];
#_qTO: NodeJS.Timeout | null; #_qTO: NodeJS.Timeout | null;
constructor (server: IServer, options: BrokerOptions) constructor (server: IServer, options: BrokerOptions)
{ {
this.#load = options.load ?? true; this.#load = options.load ?? true;
this.#hosts = options.host.split(','); this.#hosts = options.host.split(',');
this.#username = options.user; this.#username = options.user;
@ -96,10 +89,10 @@ class MessageBroker
this.#vhost = options.vhost || ''; this.#vhost = options.vhost || '';
this.#port = options.port; this.#port = options.port;
if (!this.#hosts.length) if (!this.#hosts.length)
throw new Error('Missing hosts configuration'); throw new Error('Missing hosts configuration');
if (!options.port) if (!options.port)
throw new Error('Missing port option'); throw new Error('Missing port option');
this.#exchanges = options.exchanges || {}; this.#exchanges = options.exchanges || {};
@ -119,10 +112,9 @@ class MessageBroker
this.#_pQueue = []; this.#_pQueue = [];
this.#_qQueue = []; this.#_qQueue = [];
this.#_qTO = null; this.#_qTO = null;
} }
async init () async init ()
{ {
if (!this.#load) if (!this.#load)
return this.#logger?.info('Not loading RabbitMQ'); return this.#logger?.info('Not loading RabbitMQ');
@ -130,30 +122,30 @@ class MessageBroker
this.#logger?.info('Initialising message broker'); this.#logger?.info('Initialising message broker');
const credentials = this.#username ? `${this.#username}:${this.#password}@` : ''; const credentials = this.#username ? `${this.#username}:${this.#password}@` : '';
const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`); const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`);
this.#connection = await amqp.connect(connectionStrings); this.#connection = amqp.connect(connectionStrings);
this.#connection.on('disconnect', async ({ err }) => this.#connection.on('disconnect', async ({ err }) =>
{ {
this.#logger?.status(`Disconnected: ${err.message}`); this.#logger?.status(`Disconnected: ${err.message}`);
await this.#channel?.close(); await this.#channel?.close();
this.#channel = null; this.#channel = null;
}); });
this.#connection.on('blocked', ({ reason }) => this.#connection.on('blocked', ({ reason }) =>
{ {
this.#logger?.status(`Blocked: ${reason}`); this.#logger?.status(`Blocked: ${reason}`);
}); });
this.#connection.on('connectFailed', ({ err }) => this.#connection.on('connectFailed', ({ err }) =>
{ {
this.#logger?.error(`Message broker failed to connect: ${err.stack || err.message}`); this.#logger?.error(`Message broker failed to connect: ${err.stack || err.message}`);
}); });
this.#connection.on('connect', async ({ url }) => this.#connection.on('connect', async ({ url }) =>
{ {
this.#logger?.status(`Message broker connected to ${url}`); this.#logger?.status(`Message broker connected to ${url}`);
}); });
await new Promise<void>((resolve, reject) => await new Promise<void>((resolve, reject) =>
{ {
this.#connection?.once('connect', () => this.#connection?.once('connect', () =>
{ {
this.#connection?.removeListener('connectFailed', reject); this.#connection?.removeListener('connectFailed', reject);
resolve(); resolve();
@ -163,48 +155,53 @@ class MessageBroker
await this.createChannel(); await this.createChannel();
this.#connection.on('connect', this.createChannel.bind(this)); this.#connection.on('connect', this.createChannel.bind(this));
} }
async close () async close ()
{ {
if (this.#channel) this.#logger?.status('Closing broker');
if (this.#channel)
{ {
await this.#channel.close(); this.#logger?.info('Closing channel');
this.#channel.removeAllListeners(); this.#channel.removeAllListeners();
await this.#channel.close();
} }
if (this.#connection) if (this.#connection)
{ {
this.#logger?.info('Closing connection');
await this.#connection.close(); await this.#connection.close();
// No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter // No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore // @ts-ignore
this.#connection.removeAllListeners(); this.#connection.removeAllListeners();
} }
this.#logger?.status('Broker closed');
} }
async createChannel () async createChannel ()
{ {
const exchanges = Object.entries(this.#exchanges); const exchanges = Object.entries(this.#exchanges);
const queues = Object.entries(this.#queues); const queues = Object.entries(this.#queues);
if (this.#channel) if (!this.#connection)
throw new Error('Broker not connected');
if (this.#channel)
{ {
this.#logger?.debug('Closing old channel'); this.#logger?.debug('Closing old channel');
this.#channel.removeAllListeners();
await this.#channel.close().catch(err => this.#logger?.error(err.stack)); await this.#channel.close().catch(err => this.#logger?.error(err.stack));
} }
if (!this.#connection)
throw new Error();
this.#logger?.debug('Creating channel'); this.#logger?.debug('Creating channel');
this.#channel = this.#connection.createChannel({ this.#channel = this.#connection.createChannel({
setup: async (channel: Channel | ConfirmChannel) => setup: async (channel: Channel | ConfirmChannel) =>
{ {
for (const [ name, props ] of exchanges) for (const [ name, props ] of exchanges)
{ {
await channel.assertExchange(name, props.type ?? 'fanout', props); await channel.assertExchange(name, props.type ?? 'fanout', props);
} }
for (const [ name, props ] of queues) for (const [ name, props ] of queues)
{ {
await channel.assertQueue(name, props); await channel.assertQueue(name, props);
} }
@ -215,11 +212,13 @@ class MessageBroker
this.#channel.on('connect', () => this.#logger?.status('Channel connected')); this.#channel.on('connect', () => this.#logger?.status('Channel connected'));
this.#channel.on('error', (err, info) => this.#logger?.error(`${info.name}: ${err.stack}`)); this.#channel.on('error', (err, info) => this.#logger?.error(`${info.name}: ${err.stack}`));
await new Promise<void>((resolve, reject) => await new Promise<void>((resolve, reject) =>
{ {
if (!this.#channel) if (!this.#channel)
return reject(new Error('Missing channel?')); {
this.#channel.once('connect', () => reject(new Error('Missing channel?')); return;
}
this.#channel.once('connect', () =>
{ {
this.#channel?.removeListener('error', reject); this.#channel?.removeListener('error', reject);
resolve(); resolve();
@ -236,7 +235,7 @@ class MessageBroker
/** /**
* Consume queue * Consume queue
*/ */
async consume<T = unknown> (queue: string, consumer: Consumer<T>, options?: Options.Consume) async consume<T = unknown> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
@ -249,12 +248,13 @@ class MessageBroker
this.#consumers.set(queue, list); this.#consumers.set(queue, list);
} }
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void> private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void>
{ {
if (!this.#channel) if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist')); return Promise.reject(new Error('Channel doesn\'t exist'));
await this.#channel.consume(queue, async (msg: ConsumeMessage) => await this.#channel.consume(queue, async (msg: ConsumeMessage) =>
{ {
this.#logger?.debug(`Consumer for ${queue} (${consumer.name}) fired`);
if (msg.content) if (msg.content)
await consumer(JSON.parse(msg.content.toString()), msg); await consumer(JSON.parse(msg.content.toString()), msg);
this.#channel?.ack(msg); this.#channel?.ack(msg);
@ -264,7 +264,7 @@ class MessageBroker
/** /**
* Subscribe to exchange, ensures messages aren't lost by binding it to a queue * Subscribe to exchange, ensures messages aren't lost by binding it to a queue
*/ */
async subscribe<T> (name: string, subscriber: Subscriber<T>, options: SubscriptionOptions = {}) async subscribe<T> (name: string, subscriber: Subscriber<T>, options: SubscriptionOptions = {})
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
@ -276,21 +276,21 @@ class MessageBroker
const list = this.#subscribers.get(name) ?? []; const list = this.#subscribers.get(name) ?? [];
list.push({ subscriber, options }); list.push({ subscriber, options });
this.#subscribers.set(name, list); this.#subscribers.set(name, list);
} }
private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void> private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void>
{ {
if (!this.#channel) if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist')); return Promise.reject(new Error('Channel doesn\'t exist'));
const type = options.exchangeType ?? 'fanout'; const type = options.exchangeType ?? 'fanout';
await this.#channel.assertExchange(name, type, { durable: true }); await this.#channel.assertExchange(name, type, { durable: true });
const queue = await this.#channel.assertQueue('', { exclusive: true }); const queue = await this.#channel.assertQueue('', { exclusive: true });
await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? ''); await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? '');
await this.#channel.consume(queue.queue, async (msg) => await this.#channel.consume(queue.queue, async (msg) =>
{ {
this.#logger?.debug(`Subscriber for ${name} (${listener.name}) fired`);
if (msg.content && msg.content.toString().startsWith('{')) if (msg.content && msg.content.toString().startsWith('{'))
await listener(JSON.parse(msg.content.toString()), msg); await listener(JSON.parse(msg.content.toString()), msg);
else else
@ -302,23 +302,27 @@ class MessageBroker
} }
// Add item to queue // Add item to queue
async enqueue (queue: string, content: object, headers?: object): Promise<void | number> async enqueue (queue: string, content: object, headers?: object): Promise<void | number>
{ {
this.#logger?.debug(`Adding to queue ${queue}`);
const properties = { const properties = {
persistent: true, persistent: true,
contentType: 'application/json', contentType: 'application/json',
headers headers
}; };
// The channel is null while the failover is occurring, so enqueue messages for publishing once the connection is restored // The channel is null while the failover is occurring, so enqueue messages for publishing once the connection is restored
if (!this.#channel) if (!this.#channel)
{
this.#logger?.debug('Channel is missing, adding to internal queue');
return this.#_qQueue.push({ queue, content, properties }); return this.#_qQueue.push({ queue, content, properties });
}
try try
{ {
await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties); await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties);
} }
catch (_) catch (_)
{ {
this.#_qQueue.push({ queue, content, properties }); this.#_qQueue.push({ queue, content, properties });
if (!this.#_qTO) if (!this.#_qTO)
@ -327,8 +331,9 @@ class MessageBroker
} }
// Publish to exchange // Publish to exchange
async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number> async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number>
{ {
this.#logger?.debug(`Publishing to ${exchange}`);
const properties = { const properties = {
contentType: 'application/json', contentType: 'application/json',
headers headers
@ -336,13 +341,16 @@ class MessageBroker
// The channel is null while the failover is occurring, so enqueue messages for publishing once the connection is restored // The channel is null while the failover is occurring, so enqueue messages for publishing once the connection is restored
if (!this.#channel) if (!this.#channel)
{
this.#logger?.debug('Channel is missing, adding to internal queue');
return this.#_pQueue.push({ exchange, content, routingKey, properties }); return this.#_pQueue.push({ exchange, content, routingKey, properties });
}
try try
{ {
await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties); await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties);
} }
catch (err) catch (err)
{ {
const error = err as Error; const error = err as Error;
if (!error.message.includes('nack')) if (!error.message.includes('nack'))
@ -353,14 +361,14 @@ class MessageBroker
} }
} }
assertExchange (exchange: string, props?: ExchangeDef) assertExchange (exchange: string, props?: ExchangeDef)
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props); return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props);
} }
assertQueue (queue: string, opts?: QueueDef) assertQueue (queue: string, opts?: QueueDef)
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
@ -368,14 +376,14 @@ class MessageBroker
} }
// Processes messages queued up while the broker was unreachable // Processes messages queued up while the broker was unreachable
async #_processQueues () async #_processQueues ()
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
this.#logger?.status('Processing queues of unsent messages'); this.#logger?.status('Processing queues of unsent messages');
const pQ = [ ...this.#_pQueue ]; const pQ = [ ...this.#_pQueue ];
this.#_pQueue = []; this.#_pQueue = [];
for (const msg of pQ) for (const msg of pQ)
{ {
const { exchange, content, routingKey, properties } = msg; const { exchange, content, routingKey, properties } = msg;
const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties) const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties)
@ -384,7 +392,7 @@ class MessageBroker
this.#_pQueue.push(msg); this.#_pQueue.push(msg);
} }
const qQ = [ ...this.#_qQueue ]; const qQ = [ ...this.#_qQueue ];
for (const msg of qQ) for (const msg of qQ)
{ {
const { queue, content, properties } = msg; const { queue, content, properties } = msg;
const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null); const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
@ -392,30 +400,30 @@ class MessageBroker
this.#_qQueue.push(msg); this.#_qQueue.push(msg);
} }
this.#logger?.status('Done processing'); this.#logger?.status('Done processing');
if (this.#_qTO) if (this.#_qTO)
{ {
clearTimeout(this.#_qTO); clearTimeout(this.#_qTO);
this.#_qTO = null; this.#_qTO = null;
} }
} }
async #_restoreListeners () async #_restoreListeners ()
{ {
this.#logger?.status(`Restoring ${this.#consumers.size} consumers`); this.#logger?.status(`Restoring ${this.#consumers.size} consumers`);
for (const [ name, list ] of this.#consumers) for (const [ name, list ] of this.#consumers)
{ {
this.#logger?.debug(`Processing consumer ${name}: ${list.length}`); this.#logger?.debug(`Processing consumer ${name}: ${list.length}`);
for (const { consumer, options } of list) for (const { consumer, options } of list)
{ {
await this._consume(name, consumer, options); await this._consume(name, consumer, options);
} }
} }
this.#logger?.status(`Restoring ${this.#subscribers.size} subscribers`); this.#logger?.status(`Restoring ${this.#subscribers.size} subscribers`);
for (const [ name, list ] of this.#subscribers) for (const [ name, list ] of this.#subscribers)
{ {
this.#logger?.debug(`Processing subscriber ${name}: ${list.length}`); this.#logger?.debug(`Processing subscriber ${name}: ${list.length}`);
for (const { subscriber, options } of list) for (const { subscriber, options } of list)
{ {
await this._subscribe(name, subscriber, options); await this._subscribe(name, subscriber, options);
} }

View File

@ -2,7 +2,7 @@ import { inspect } from 'node:util';
import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions, ModifyResult, DistinctOptions } from 'mongodb'; import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions, ModifyResult, DistinctOptions } from 'mongodb';
import { IServer, ILogger } from './interfaces/index.js'; import { IServer, ILogger } from './interfaces/index.js';
type Credentials = { interface Credentials {
URI?: string, URI?: string,
user?: string, user?: string,
password?: string, password?: string,
@ -12,12 +12,12 @@ type Credentials = {
authDb?: string authDb?: string
} }
type MongoQuery = { interface MongoQuery {
_id?: unknown, _id?: unknown,
[key: string]: unknown [key: string]: unknown
} }
export type MongoOptions = { export interface MongoOptions {
credentials: Credentials, credentials: Credentials,
loggerOptions?: object, loggerOptions?: object,
client?: MongoClientOptions, client?: MongoClientOptions,
@ -26,13 +26,13 @@ export type MongoOptions = {
metricsBuckets?: number[], metricsBuckets?: number[],
} }
type StringIndexable = {[key: string]: boolean | string | number | Document | object} type StringIndexable = Record<string, boolean | string | number | Document | object>;
const objIsSubset = (superObj: StringIndexable, subObj: StringIndexable): boolean => const objIsSubset = (superObj: StringIndexable, subObj: StringIndexable): boolean =>
{ {
return Object.keys(subObj).every(ele => return Object.keys(subObj).every(ele =>
{ {
if (typeof subObj[ele] === 'object' && typeof superObj[ele] === 'object') if (typeof subObj[ele] === 'object' && typeof superObj[ele] === 'object')
{ {
return objIsSubset(superObj[ele] as StringIndexable, subObj[ele] as StringIndexable); return objIsSubset(superObj[ele] as StringIndexable, subObj[ele] as StringIndexable);
} }
@ -45,7 +45,7 @@ const objIsSubset = (superObj: StringIndexable, subObj: StringIndexable): boolea
* *
* @class MongoDB * @class MongoDB
*/ */
class MongoDB class MongoDB
{ {
#_database: string; #_database: string;
@ -59,7 +59,7 @@ class MongoDB
static #queryHistogram?: { startTimer: (labels: object) => () => void }; static #queryHistogram?: { startTimer: (labels: object) => () => void };
#server: IServer; #server: IServer;
constructor (server: IServer, config: MongoOptions) constructor (server: IServer, config: MongoOptions)
{ {
if (!server) if (!server)
throw new Error('Missing reference to server!'); throw new Error('Missing reference to server!');
@ -67,7 +67,7 @@ class MongoDB
throw new Error('No config options provided!'); throw new Error('No config options provided!');
const { user, password, host, port, database, URI, authDb } = config.credentials; const { user, password, host, port, database, URI, authDb } = config.credentials;
if ((!host?.length || !port || !database?.length) && !URI) if ((!host?.length || !port || !database.length) && !URI)
throw new Error('Must provide host, port, and database OR URI parameters!'); throw new Error('Must provide host, port, and database OR URI parameters!');
this.#config = config; this.#config = config;
@ -79,20 +79,20 @@ class MongoDB
this.#logger = server.createLogger(this, config.loggerOptions); this.#logger = server.createLogger(this, config.loggerOptions);
this.#server = server; this.#server = server;
if (URI) if (URI)
{ {
this.#URI = URI; this.#URI = URI;
} }
else else
{ {
let AUTH_DB = authDb; let AUTH_DB = authDb;
const auth = user ? `${user}:${password}@` : ''; const auth = user ? `${user}:${password}@` : '';
if (!AUTH_DB && auth) if (!AUTH_DB && auth)
{ {
this.#logger?.warn('No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source'); this.#logger?.warn('No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source');
AUTH_DB = authDb; AUTH_DB = authDb;
} }
else if (!auth) else if (!auth)
{ {
this.#logger?.warn('No auth provided, proceeding without'); this.#logger?.warn('No auth provided, proceeding without');
} }
@ -109,12 +109,12 @@ class MongoDB
.on('open', () => this.#logger?.info('MongoDB client connected')); .on('open', () => this.#logger?.info('MongoDB client connected'));
} }
get database () get database ()
{ {
return this.#_database; return this.#_database;
} }
get client () get client ()
{ {
return this.#_client; return this.#_client;
} }
@ -124,7 +124,7 @@ class MongoDB
* *
* @memberof MongoDB * @memberof MongoDB
*/ */
async init () async init ()
{ {
if (!this.#load) if (!this.#load)
return this.#logger?.info('Not loading MongoDB'); return this.#logger?.info('Not loading MongoDB');
@ -149,9 +149,9 @@ class MongoDB
{ {
this.#logger?.info('Setting up metric recording'); this.#logger?.info('Setting up metric recording');
MongoDB.#queryHistogram = new Prometheus.Histogram({ MongoDB.#queryHistogram = new Prometheus.Histogram({
name: 'mongo_queries', name: 'mongo_queries',
help: 'Tracks query duration in seconds and frequency', help: 'Tracks query duration in seconds and frequency',
buckets: this.#config.metricsBuckets ?? Prometheus?.exponentialBuckets(0.005, 2, 10), buckets: this.#config.metricsBuckets ?? Prometheus?.exponentialBuckets(0.005, 2, 10),
labelNames: [ 'type' ] as const labelNames: [ 'type' ] as const
}); });
if (this.#server.registerMetric) if (this.#server.registerMetric)
@ -162,7 +162,7 @@ class MongoDB
return this; return this;
} }
async close () async close ()
{ {
if (!this.#db) if (!this.#db)
return; return;
@ -173,7 +173,7 @@ class MongoDB
this.#logger?.status('Database closed'); this.#logger?.status('Database closed');
} }
get mongoClient () get mongoClient ()
{ {
return this.#_client; return this.#_client;
} }
@ -186,7 +186,7 @@ class MongoDB
* @returns {Array} An array containing the corresponding objects for the query * @returns {Array} An array containing the corresponding objects for the query
* @memberof Database * @memberof Database
*/ */
async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]> async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]>
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -215,7 +215,7 @@ class MongoDB
* @returns {Object} An object containing the queried data * @returns {Object} An object containing the queried data
* @memberof Database * @memberof Database
*/ */
async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null> async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null>
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -242,7 +242,7 @@ class MongoDB
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false) async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -270,7 +270,7 @@ class MongoDB
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false) async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -296,7 +296,7 @@ class MongoDB
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async insertOne (db: string, data: Document) async insertOne (db: string, data: Document)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -313,7 +313,7 @@ class MongoDB
return result; return result;
} }
async deleteOne (db: string, filter: Document) async deleteOne (db: string, filter: Document)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -358,7 +358,7 @@ class MongoDB
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore // @ts-ignore
filter._id = new ObjectId(filter._id); filter._id = new ObjectId(filter._id);
const endTimer = MongoDB.#queryHistogram?.startTimer({ type: 'distinct' }); const endTimer = MongoDB.#queryHistogram?.startTimer({ type: 'distinct' });
const result = await this.#db.collection<T>(db).distinct(key, filter, options); const result = await this.#db.collection<T>(db).distinct(key, filter, options);
if (endTimer) if (endTimer)
@ -376,7 +376,7 @@ class MongoDB
* @returns * @returns
* @memberof Database * @memberof Database
*/ */
async push (db: string, filter: Document, data: object, upsert = false) async push (db: string, filter: Document, data: object, upsert = false)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -402,7 +402,7 @@ class MongoDB
* @returns {object} * @returns {object}
* @memberof Database * @memberof Database
*/ */
random<T extends Document> (db: string, filter: Document = {}, amount = 1) random<T extends Document> (db: string, filter: Document = {}, amount = 1)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -420,7 +420,7 @@ class MongoDB
return cursor.toArray(); return cursor.toArray();
} }
stats (options = {}) stats (options = {})
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -428,14 +428,14 @@ class MongoDB
return result; return result;
} }
collection<T extends Document> (coll: string) collection<T extends Document> (coll: string)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
return this.#db.collection<T>(coll); return this.#db.collection<T>(coll);
} }
count (coll: string, query: Document) count (coll: string, query: Document)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
@ -448,7 +448,7 @@ class MongoDB
return Promise.reject(new Error('MongoDB not connected')); return Promise.reject(new Error('MongoDB not connected'));
if (!(index instanceof Array)) if (!(index instanceof Array))
index = [ index ]; index = [ index ];
const collections = await this.#db.collections(); const collections = await this.#db.collections();
if (!collections.some((coll) => coll.namespace.split('.')[1] === collection)) if (!collections.some((coll) => coll.namespace.split('.')[1] === collection))
await this.#db.createCollection(collection); await this.#db.createCollection(collection);
@ -461,7 +461,7 @@ class MongoDB
if (existing) if (existing)
await this.#db.collection(collection).dropIndex(existing.name); await this.#db.collection(collection).dropIndex(existing.name);
try try
{ {
await this.#db.collection(collection).createIndex(index, options); await this.#db.collection(collection).createIndex(index, options);
} }
@ -475,11 +475,11 @@ class MongoDB
{ {
if (!this.#db) if (!this.#db)
return Promise.reject(new Error('MongoDB not connected')); return Promise.reject(new Error('MongoDB not connected'));
for (const index of indices) for (const index of indices)
await this.ensureIndex(collection, index, options); await this.ensureIndex(collection, index, options);
} }
#indexesEqual (existing: Document, options?: CreateIndexesOptions & StringIndexable) #indexesEqual (existing: Document, options?: CreateIndexesOptions & StringIndexable)
{ {
// 3 keys on the existing means that only the name was given // 3 keys on the existing means that only the name was given
if (!options && Object.keys(existing).length === 3) if (!options && Object.keys(existing).length === 3)
@ -488,11 +488,11 @@ class MongoDB
return false; return false;
const keys = Object.keys(options); const keys = Object.keys(options);
for (const key of keys) for (const key of keys)
{ {
if (typeof options[key] !== typeof existing[key]) if (typeof options[key] !== typeof existing[key])
return false; return false;
if (typeof options[key] === 'object' && !objIsSubset(existing, options)) if (typeof options[key] === 'object' && !objIsSubset(existing, options))
return false; return false;
else if (options[key] !== existing[key]) else if (options[key] !== existing[key])
@ -500,7 +500,7 @@ class MongoDB
} }
return true; return true;
} }
} }

View File

@ -3,20 +3,20 @@ import { MessageBroker } from '../build/esm/index.js';
const credentials = JSON.parse(readFileSync('../brokerCreds.json', { encoding: 'utf-8' })); const credentials = JSON.parse(readFileSync('../brokerCreds.json', { encoding: 'utf-8' }));
const broker = new MessageBroker({ const broker = new MessageBroker({
createLogger: () => createLogger: () =>
{ {
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,
status: console.log, status: console.log,
warn: console.log, warn: console.log,
error: console.error error: console.error
}; };
} }
}, credentials); }, credentials);
await broker.init(); await broker.init();
broker.subscribe('db_cluster_status', (message) => broker.subscribe('db_cluster_status', (message) =>
{ {
if (message instanceof Buffer) if (message instanceof Buffer)
console.log(message.toString()); console.log(message.toString());

View File

@ -5,57 +5,57 @@ import { readFileSync } from 'fs';
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' })); const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
const maria = new MariaDB({ const maria = new MariaDB({
createLogger: () => createLogger: () =>
{ {
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,
status: console.log, status: console.log,
warn: console.log, warn: console.log,
error: console.error error: console.error
}; };
} }
}, { }, {
load: true, load: true,
credentials, credentials,
cluster: { cluster: {
canRetry: true, canRetry: true,
removeNodeErrorCount: 5, removeNodeErrorCount: 5,
restoreNodeTimeout: 60000, restoreNodeTimeout: 60000,
defaultSelector: 'RR' defaultSelector: 'RR'
} }
}); });
const broker = new MessageBroker({ const broker = new MessageBroker({
createLogger: () => createLogger: () =>
{ {
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,
status: console.log, status: console.log,
warn: console.log, warn: console.log,
error: console.error error: console.error
}; };
} }
}, { }, {
load: true, load: true,
host: 'rabbitmq-01.stylis.local', host: 'rabbitmq-01.stylis.local',
user: 'stylis', user: 'stylis',
pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj', pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj',
vhost: 'development', vhost: 'development',
port: 5672 port: 5672
}); });
await broker.init(); await broker.init();
await maria.init(); await maria.init();
broker.subscribe('db_cluster_status', maria.statusUpdateListener); broker.subscribe('db_cluster_status', maria.statusUpdateListener);
setInterval(async () => setInterval(async () =>
{ {
console.log('Result: ', await maria.query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', { node: 'maria-t03', errorIfNodeUnavailable: true }).catch(() => null)); console.log('Result: ', await maria.query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', { node: 'maria-t03', errorIfNodeUnavailable: true }).catch(() => null));
}, 15_000).unref(); }, 15_000).unref();
process.on('SIGINT', async () => process.on('SIGINT', async () =>
{ {
await broker.close(); await broker.close();
await maria.close(); await maria.close();

View File

@ -5,24 +5,24 @@ import { MariaDB } from '../build/esm/index.js';
const credentials = JSON.parse(readFileSync('../credentials.json', { encoding: 'utf-8' })); const credentials = JSON.parse(readFileSync('../credentials.json', { encoding: 'utf-8' }));
const maria = new MariaDB({ const maria = new MariaDB({
createLogger: () => createLogger: () =>
{ {
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,
status: console.log, status: console.log,
warn: console.log, warn: console.log,
error: console.error error: console.error
}; };
} }
}, { }, {
load: true, load: true,
credentials, credentials,
cluster: { cluster: {
canRetry: true, canRetry: true,
removeNodeErrorCount: 5, removeNodeErrorCount: 5,
restoreNodeTimeout: 60000, restoreNodeTimeout: 60000,
defaultSelector: 'RR' defaultSelector: 'RR'
} }
}); });

View File

@ -1,19 +1,19 @@
import { MongoDB } from '../build/esm/index.js'; import { MongoDB } from '../build/esm/index.js';
const mongo = new MongoDB({ const mongo = new MongoDB({
createLogger: () => createLogger: () =>
{ {
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,
status: console.log, status: console.log,
warn: console.log, warn: console.log,
error: console.error error: console.error
}; };
} }
}, { }, {
credentials: { credentials: {
URI: 'mongodb://127.0.0.1:27017', URI: 'mongodb://127.0.0.1:27017',
database: 'framework-proto' database: 'framework-proto'
}, },
load: true load: true