Compare commits

..

8 Commits

Author SHA1 Message Date
b68484d4c5
v1.4.3 2023-07-03 16:33:52 +03:00
24b2bcd3d9
update eslint rules 2023-07-03 16:33:39 +03:00
40b24ee95e
v1.4.2 2023-07-03 16:28:42 +03:00
4ef0a2cbd8
v1.4.1 2023-07-03 16:27:39 +03:00
b95ec26917
v1.4.0 2023-07-03 16:11:56 +03:00
1374b92f8e
v1.3.22 2023-06-18 16:31:30 +03:00
191e9e77b2
fixes 2023-06-18 16:30:58 +03:00
1fcbf949cc
v1.3.21 2023-05-18 16:00:18 +03:00
10 changed files with 404 additions and 166 deletions

View File

@ -15,17 +15,38 @@
"sourceType": "module" "sourceType": "module"
}, },
"rules": { "rules": {
"@typescript-eslint/no-unused-vars": "off",
"@typescript-eslint/no-non-null-assertion":"off",
"accessor-pairs": "warn", "accessor-pairs": "warn",
"array-callback-return": "warn", "array-callback-return": "warn",
"array-bracket-newline": [ "warn", "consistent" ], "array-bracket-newline": [
"array-bracket-spacing": [ "warn", "always", { "objectsInArrays": false, "arraysInArrays": false }], "warn",
"consistent"
],
"array-bracket-spacing": [
"warn",
"always",
{
"objectsInArrays": false,
"arraysInArrays": false
}
],
"arrow-spacing": "warn", "arrow-spacing": "warn",
"block-scoped-var": "warn", "block-scoped-var": "warn",
"block-spacing": [ "warn", "always" ], "block-spacing": [
"brace-style": [ "warn", "1tbs" ], "warn",
"always"
],
"brace-style": [
"warn",
"allman"
],
"callback-return": "warn", "callback-return": "warn",
"camelcase": "warn", "camelcase": "warn",
"comma-dangle": [ "warn", "only-multiline" ], "comma-dangle": [
"warn",
"only-multiline"
],
"comma-spacing": [ "comma-spacing": [
"warn", "warn",
{ {
@ -68,9 +89,25 @@
"implicit-arrow-linebreak": "warn", "implicit-arrow-linebreak": "warn",
"indent": "warn", "indent": "warn",
"init-declarations": "warn", "init-declarations": "warn",
"jsx-quotes": [ "warn", "prefer-single" ], "quotes": ["error" , "single"],
"key-spacing": [ "warn", { "beforeColon": false, "afterColon": true }], "jsx-quotes": [
"keyword-spacing": [ "warn", { "after": true, "before": true }], "warn",
"prefer-single"
],
"key-spacing": [
"warn",
{
"beforeColon": false,
"afterColon": true
}
],
"keyword-spacing": [
"warn",
{
"after": true,
"before": true
}
],
"linebreak-style": [ "linebreak-style": [
"error", "error",
"unix" "unix"
@ -79,10 +116,28 @@
"lines-around-directive": "warn", "lines-around-directive": "warn",
"max-classes-per-file": "warn", "max-classes-per-file": "warn",
"max-nested-callbacks": "warn", "max-nested-callbacks": "warn",
"max-len": [
"warn",
{
"code": 140,
"ignoreComments": true,
"ignoreStrings": true,
"ignoreTemplateLiterals": true,
"ignoreRegExpLiterals": true
}
],
"max-lines-per-function": [
"warn",
100
],
"max-depth": [
"warn",
3
],
"new-parens": "warn", "new-parens": "warn",
"no-alert": "warn", "no-alert": "warn",
"no-array-constructor": "warn", "no-array-constructor": "warn",
"no-bitwise": "warn", // "no-bitwise": "warn",
"no-buffer-constructor": "warn", "no-buffer-constructor": "warn",
"no-caller": "warn", "no-caller": "warn",
"no-console": "warn", "no-console": "warn",
@ -96,7 +151,6 @@
"no-extend-native": "warn", "no-extend-native": "warn",
"no-extra-bind": "warn", "no-extra-bind": "warn",
"no-extra-label": "warn", "no-extra-label": "warn",
"no-extra-parens": "warn",
"no-floating-decimal": "warn", "no-floating-decimal": "warn",
"no-implicit-coercion": "warn", "no-implicit-coercion": "warn",
"no-implicit-globals": "warn", "no-implicit-globals": "warn",
@ -147,7 +201,7 @@
"no-unmodified-loop-condition": "warn", "no-unmodified-loop-condition": "warn",
"no-unneeded-ternary": "error", "no-unneeded-ternary": "error",
"no-unused-expressions": "warn", "no-unused-expressions": "warn",
"no-use-before-define": "error", "@typescript-eslint/no-use-before-define": "error",
"no-useless-call": "warn", "no-useless-call": "warn",
"no-useless-computed-key": "warn", "no-useless-computed-key": "warn",
"no-useless-concat": "warn", "no-useless-concat": "warn",
@ -155,20 +209,39 @@
"no-useless-rename": "warn", "no-useless-rename": "warn",
"no-useless-return": "warn", "no-useless-return": "warn",
"no-var": "warn", "no-var": "warn",
"no-void": "warn", // "no-void": "warn",
"no-whitespace-before-property": "error", "no-whitespace-before-property": "error",
"nonblock-statement-body-position": [ "warn", "below" ], "nonblock-statement-body-position": [
"warn",
"below"
],
"object-curly-spacing": [ "object-curly-spacing": [
"warn", "warn",
"always" "always"
], ],
"object-property-newline": [ "warn", { "allowAllPropertiesOnSameLine": true }], "object-property-newline": [
"warn",
{
"allowAllPropertiesOnSameLine": true
}
],
"object-shorthand": "warn", "object-shorthand": "warn",
"one-var-declaration-per-line": "warn", "one-var-declaration-per-line": "warn",
"operator-assignment": "warn", "operator-assignment": "warn",
"operator-linebreak": [ "warn", "before" ], "operator-linebreak": [
"warn",
"before"
],
"padding-line-between-statements": "warn", "padding-line-between-statements": "warn",
"padded-blocks": [ "warn", { "switches": "never" }, { "allowSingleLineBlocks": true }], "padded-blocks": [
"warn",
{
"switches": "never"
},
{
"allowSingleLineBlocks": true
}
],
"prefer-arrow-callback": "warn", "prefer-arrow-callback": "warn",
"prefer-const": "warn", "prefer-const": "warn",
"prefer-destructuring": "warn", "prefer-destructuring": "warn",
@ -189,12 +262,18 @@
"last" "last"
], ],
"space-before-blocks": "warn", "space-before-blocks": "warn",
"space-before-function-paren": [ "error", "always" ], "space-before-function-paren": [
"error",
"always"
],
"space-in-parens": [ "space-in-parens": [
"warn", "warn",
"never" "never"
], ],
"spaced-comment": [ "warn", "always" ], "spaced-comment": [
"warn",
"always"
],
"strict": "warn", "strict": "warn",
"switch-colon-spacing": "warn", "switch-colon-spacing": "warn",
"symbol-description": "warn", "symbol-description": "warn",

View File

@ -1,4 +1,4 @@
export { MessageBroker, BrokerOptions } from "./src/MessageBroker.js"; export { MessageBroker, BrokerOptions } from './src/MessageBroker.js';
export { MariaDB, MariaOptions } from './src/MariaDB.js'; export { MariaDB, MariaOptions } from './src/MariaDB.js';
export { MongoDB, MongoOptions } from './src/MongoDB.js'; export { MongoDB, MongoOptions } from './src/MongoDB.js';
export { ObjectId, Document, DeleteResult, Collection } from 'mongodb'; export { ObjectId, Document, DeleteResult, Collection } from 'mongodb';

View File

@ -1,6 +1,6 @@
{ {
"name": "@navy.gif/wrappers", "name": "@navy.gif/wrappers",
"version": "1.3.20", "version": "1.4.3",
"description": "Various wrapper classes I use in my projects", "description": "Various wrapper classes I use in my projects",
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git", "repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
"author": "Navy.gif", "author": "Navy.gif",

View File

@ -1,17 +1,18 @@
import { inspect } from 'node:util'; import { inspect } from 'node:util';
import { ILogger, IServer } from './interfaces/index.js'; import { ILogger, IServer } from './interfaces/index.js';
import mysql, { Pool, PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql'; import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql';
import { LoggerClientOptions } from './interfaces/Logger.js'; import { LoggerClientOptions } from './interfaces/Logger.js';
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK' ]; const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ];
type Credentials = { type Credentials = {
user: string, user: string,
password: string, password: string,
host: string, host: string,
port: number, port: number,
database: string database: string,
node: string
} }
export type MariaOptions = { export type MariaOptions = {
@ -31,19 +32,28 @@ type MariaError = {
sqlMessage:string sqlMessage:string
} & Error } & Error
class MariaDB { type QueryOptions = {
node?: string,
timeout?: number
}
class MariaDB
{
#_activeQueries: number; #_activeQueries: number;
#afterLastQuery: (() => void) | null; #afterLastQuery: (() => void) | null;
#logger: ILogger; #logger: ILogger;
#_ready: boolean; #_ready: boolean;
#load: boolean;
#config: MariaOptions; #config: MariaOptions;
#credentials: Credentials[]; #credentials: Credentials[];
#cluster: boolean; #cluster: boolean;
#pool: PoolCluster | Pool | null; #pool: PoolCluster | null;
#nodes: string[];
constructor (server: IServer, options: MariaOptions) { constructor (server: IServer, options: MariaOptions)
{
if (!server) if (!server)
throw new Error('Missing reference to server!'); throw new Error('Missing reference to server!');
@ -51,11 +61,18 @@ class MariaDB {
throw new Error('No config options provided!'); throw new Error('No config options provided!');
this.#config = options; this.#config = options;
this.#load = options.load ?? true;
const { host, user, port, password, database } = options.credentials; const { host, user, port, password, database } = options.credentials;
const hosts = host.split(','); const hosts = host.split(',');
this.#credentials = []; this.#credentials = [];
for (const remote of hosts) { this.#nodes = [];
this.#credentials.push({ host: remote, user, port, password, database }); for (const remote of hosts)
{
const [ node ] = remote.split('.');
if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
throw new Error('Invalid host config, expecting qualified domain names');
this.#credentials.push({ host: remote, user, port, password, database, node });
this.#nodes.push(node);
} }
this.#pool = null; this.#pool = null;
@ -69,42 +86,48 @@ class MariaDB {
} }
get ready () { get ready ()
{
return this.#_ready; return this.#_ready;
} }
get activeQueries () { get activeQueries ()
{
return this.#_activeQueries; return this.#_activeQueries;
} }
async init () { async init ()
{
if (!this.#config.load) if (!this.#load)
return this.#logger.info('Not loading MariaDB'); return this.#logger.info('Not loading MariaDB');
this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`); this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`);
if (this.#cluster) { this.#pool = mysql.createPoolCluster(this.#config.cluster);
this.#pool = mysql.createPoolCluster(this.#config.cluster); for (const creds of this.#credentials)
for (const creds of this.#credentials) {
this.#pool.add({ ...this.#config.client, ...creds }); this.#pool.add(creds.node, { ...this.#config.client, ...creds });
} else { this.#logger.info(`Added node ${creds.node} to pool cluster`);
this.#pool = mysql.createPool({ ...this.#config.client, ...this.#credentials[0] });
} }
this.#pool.on('connection', (connection) => { this.#pool.on('connection', (connection) =>
{
this.#logger.debug(`New connection: ${connection?.threadId || null}`); this.#logger.debug(`New connection: ${connection?.threadId || null}`);
}); });
this.#pool.on('acquire', (connection) => { this.#pool.on('acquire', (connection) =>
{
this.#_activeQueries++; this.#_activeQueries++;
this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`); this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`);
}); });
this.#pool.on('enqueue', () => { this.#pool.on('enqueue', () =>
this.#logger.debug(`Query enqueued for connection`); {
this.#logger.debug('Query enqueued for connection');
}); });
this.#pool.on('release', (connection) => { this.#pool.on('release', (connection) =>
{
this.#_activeQueries--; this.#_activeQueries--;
if (!this.ready && !this.#_activeQueries && this.#afterLastQuery) if (!this.ready && !this.#_activeQueries && this.#afterLastQuery)
@ -113,11 +136,20 @@ class MariaDB {
this.#logger.debug(`Connection released: ${connection?.threadId || null}`); this.#logger.debug(`Connection released: ${connection?.threadId || null}`);
}); });
this.#logger.info(`Testing MariaDB connection`); this.#pool.on('remove', (nodeId) =>
await new Promise<void>((resolve, reject) => { {
this.#logger.status(`Node ${nodeId} was removed from pool`);
const index = this.#nodes.findIndex(n => n === nodeId);
this.#nodes.splice(index, 1);
});
this.#logger.info('Testing MariaDB connection');
await new Promise<void>((resolve, reject) =>
{
if (!this.#pool) if (!this.#pool)
return reject(new Error('Missing connection pool')); return reject(new Error('Missing connection pool'));
this.#pool.getConnection((err, conn) => { this.#pool.getConnection((err, conn) =>
{
if (err) if (err)
return reject(err); return reject(err);
conn.release(); conn.release();
@ -125,27 +157,31 @@ class MariaDB {
}); });
}); });
this.#logger.status(`Database connected`); this.#logger.status('Database connected');
this.#_ready = true; this.#_ready = true;
return this; return this;
} }
async close () { async close ()
this.#logger.status(`Shutting down database connections`); {
this.#logger.status('Shutting down database connections');
if (!this.ready) if (!this.ready)
return Promise.resolve(); return Promise.resolve();
this.#_ready = false; this.#_ready = false;
if (this.#_activeQueries) { if (this.#_activeQueries)
{
this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`); this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`);
await this.finishQueries(); await this.finishQueries();
this.#logger.info(`Queries finished, shutting down`); this.#logger.info('Queries finished, shutting down');
} }
return new Promise<void>(resolve => { return new Promise<void>(resolve =>
{
if (!this.#pool) if (!this.#pool)
return resolve(); return resolve();
this.#pool.end(() => { this.#pool.end(() =>
{
this.#pool?.removeAllListeners(); this.#pool?.removeAllListeners();
resolve(); resolve();
}); });
@ -153,21 +189,34 @@ class MariaDB {
}); });
} }
finishQueries () { finishQueries ()
return new Promise<void>(resolve => { {
return new Promise<void>(resolve =>
{
this.#afterLastQuery = resolve; this.#afterLastQuery = resolve;
}); });
} }
getConnection (): Promise<PoolConnection> { getConnection (node?: string): Promise<PoolConnection>
return new Promise((resolve, reject) => { {
return new Promise((resolve, reject) =>
{
if (!this.#pool) if (!this.#pool)
return reject(new Error('Pool closed')); return reject(new Error('Pool closed'));
this.#pool.getConnection((err, conn) => { // Get node by name
const pool = this.#pool;
if (node && !this.#nodes.includes(node))
{
this.#logger.warn(`Node ${node} is not available in pool, falling back to arbitrary node`);
node = '*';
}
return pool.of(node ?? '*').getConnection((err, conn) =>
{
if (err) if (err)
return reject(err); return reject(err);
resolve(conn); return resolve(conn);
}); });
}); });
} }
@ -176,11 +225,16 @@ class MariaDB {
* @throws {MariaError} * @throws {MariaError}
* @private * @private
* */ * */
async #_query<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number, attempts = 0): Promise<T[] | FieldInfo[]> { async #_query<T> (query: string, values: (string | number | string[] | number[])[], { timeout, node }: QueryOptions = {}, attempts = 0):
const connection = await this.getConnection(); Promise<T[] | FieldInfo[]>
try { {
const result = await new Promise<T[] | FieldInfo[] | undefined>((resolve, reject) => { const connection = await this.getConnection(node);
const q = connection.query({ timeout, sql: query }, [ values ], (err, results, fields) => { try
{
const result = await new Promise<T[] | FieldInfo[] | undefined>((resolve, reject) =>
{
const q = connection.query({ timeout, sql: query }, values, (err, results, fields) =>
{
if (err) if (err)
reject(err); reject(err);
else if (results) else if (results)
@ -192,16 +246,19 @@ class MariaDB {
this.#logger.debug(`Constructed query: ${q.sql}`); this.#logger.debug(`Constructed query: ${q.sql}`);
}); });
return Promise.resolve(result || []); return Promise.resolve(result || []);
} catch (err) { }
catch (err)
{
const error = err as MariaError; const error = err as MariaError;
// Retry safe errors // (Galera) Instance not ready for query // Retry safe errors // (Galera) Instance not ready for query
if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) // if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) //
return this.#_query(query, values, timeout, ++attempts); return this.#_query(query, values, { timeout, node }, ++attempts);
return Promise.reject(error); return Promise.reject(error);
} }
} }
async query<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number): Promise<T[] | FieldInfo[]> { async query<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions): Promise<T[] | FieldInfo[]>
{
if (!this.ready) if (!this.ready)
return Promise.reject(new Error('MariaDB not ready')); return Promise.reject(new Error('MariaDB not ready'));
@ -211,12 +268,13 @@ class MariaDB {
batch = values.some(val => val instanceof Array); batch = values.some(val => val instanceof Array);
this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`); this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
return this.#_query<T>(query, values, timeout); return this.#_query<T>(query, values, opts);
} }
q<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number) { q<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions)
return this.query<T>(query, values, timeout); {
return this.query<T>(query, values, opts);
} }
} }

View File

@ -1,4 +1,4 @@
import { ILogger, IServer } from "./interfaces/index.js"; import { ILogger, IServer } from './interfaces/index.js';
import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'; import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib'; import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib';
@ -56,7 +56,8 @@ type Consumer<T = any> = (content: T, msg: ConsumeMessage) => Promise<void>
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void> type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void>
class MessageBroker { class MessageBroker
{
// Broker definitions // Broker definitions
#load: boolean; #load: boolean;
@ -80,9 +81,10 @@ class MessageBroker {
#_qQueue: InternalQueueMsg[]; #_qQueue: InternalQueueMsg[];
#_qTO: NodeJS.Timeout | null; #_qTO: NodeJS.Timeout | null;
constructor (server: IServer, options: BrokerOptions) { constructor (server: IServer, options: BrokerOptions)
{
this.#load = options.load || false; this.#load = options.load ?? true;
this.#hosts = options.host.split(','); this.#hosts = options.host.split(',');
this.#username = options.user; this.#username = options.user;
this.#password = options.pass; this.#password = options.pass;
@ -114,7 +116,8 @@ class MessageBroker {
} }
async init () { async init ()
{
if (!this.#load) if (!this.#load)
return this.#logger.info('Not loading RabbitMQ'); return this.#logger.info('Not loading RabbitMQ');
@ -124,23 +127,29 @@ class MessageBroker {
const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`); const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`);
this.#connection = await amqp.connect(connectionStrings); this.#connection = await amqp.connect(connectionStrings);
this.#connection.on('disconnect', async ({ err }) => { this.#connection.on('disconnect', async ({ err }) =>
{
this.#logger.status(`Disconnected: ${err.message}`); this.#logger.status(`Disconnected: ${err.message}`);
await this.#channel?.close(); await this.#channel?.close();
this.#channel = null; this.#channel = null;
}); });
this.#connection.on('blocked', ({ reason }) => { this.#connection.on('blocked', ({ reason }) =>
{
this.#logger.status(`Blocked: ${reason}`); this.#logger.status(`Blocked: ${reason}`);
}); });
this.#connection.on('connectFailed', ({ err }) => { this.#connection.on('connectFailed', ({ err }) =>
{
this.#logger.error(`Message broker failed to connect: ${err.stack || err.message}`); this.#logger.error(`Message broker failed to connect: ${err.stack || err.message}`);
}); });
this.#connection.on('connect', async ({ url }) => { this.#connection.on('connect', async ({ url }) =>
{
this.#logger.status(`Message broker connected to ${url}`); this.#logger.status(`Message broker connected to ${url}`);
}); });
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) =>
this.#connection?.once('connect', () => { {
this.#connection?.once('connect', () =>
{
this.#connection?.removeListener('connectFailed', reject); this.#connection?.removeListener('connectFailed', reject);
resolve(); resolve();
}); });
@ -152,12 +161,15 @@ class MessageBroker {
} }
async close () { async close ()
if (this.#channel) { {
if (this.#channel)
{
await this.#channel.close(); await this.#channel.close();
this.#channel.removeAllListeners(); this.#channel.removeAllListeners();
} }
if (this.#connection) { if (this.#connection)
{
await this.#connection.close(); await this.#connection.close();
// No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter // No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -166,11 +178,13 @@ class MessageBroker {
} }
} }
async createChannel () { async createChannel ()
{
const exchanges = Object.entries(this.#exchanges); const exchanges = Object.entries(this.#exchanges);
const queues = Object.entries(this.#queues); const queues = Object.entries(this.#queues);
if (this.#channel) { if (this.#channel)
{
this.#logger.debug('Closing old channel'); this.#logger.debug('Closing old channel');
await this.#channel.close().catch(err => this.#logger.error(err.stack)); await this.#channel.close().catch(err => this.#logger.error(err.stack));
} }
@ -179,11 +193,14 @@ class MessageBroker {
throw new Error(); throw new Error();
this.#logger.debug('Creating channel'); this.#logger.debug('Creating channel');
this.#channel = this.#connection.createChannel({ this.#channel = this.#connection.createChannel({
setup: async (channel: Channel | ConfirmChannel) => { setup: async (channel: Channel | ConfirmChannel) =>
for (const [ name, props ] of exchanges) { {
for (const [ name, props ] of exchanges)
{
await channel.assertExchange(name, props.type ?? 'fanout', props); await channel.assertExchange(name, props.type ?? 'fanout', props);
} }
for (const [ name, props ] of queues) { for (const [ name, props ] of queues)
{
await channel.assertQueue(name, props); await channel.assertQueue(name, props);
} }
} }
@ -193,10 +210,12 @@ class MessageBroker {
this.#channel.on('connect', () => this.#logger.status('Channel connected')); this.#channel.on('connect', () => this.#logger.status('Channel connected'));
this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`)); this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`));
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) =>
{
if (!this.#channel) if (!this.#channel)
return reject(new Error('Missing channel?')); return reject(new Error('Missing channel?'));
this.#channel.once('connect', () => { this.#channel.once('connect', () =>
{
this.#channel?.removeListener('error', reject); this.#channel?.removeListener('error', reject);
resolve(); resolve();
}); });
@ -210,7 +229,8 @@ class MessageBroker {
} }
// Consume queue // Consume queue
async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume) { async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
{
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
@ -222,10 +242,12 @@ class MessageBroker {
this.#consumers.set(queue, list); this.#consumers.set(queue, list);
} }
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void> { private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void>
{
if (!this.#channel) if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist')); return Promise.reject(new Error('Channel doesn\'t exist'));
await this.#channel.consume(queue, async (msg: ConsumeMessage) => { await this.#channel.consume(queue, async (msg: ConsumeMessage) =>
{
if (msg.content) if (msg.content)
await consumer(JSON.parse(msg.content.toString()), msg); await consumer(JSON.parse(msg.content.toString()), msg);
this.#channel?.ack(msg); this.#channel?.ack(msg);
@ -233,7 +255,8 @@ class MessageBroker {
} }
// Subscribe to exchange, ensures messages aren't lost by binding it to a queue // Subscribe to exchange, ensures messages aren't lost by binding it to a queue
async subscribe<T> (name: string, listener: Subscriber<T>) { async subscribe<T> (name: string, listener: Subscriber<T | Buffer>)
{
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
@ -247,7 +270,8 @@ class MessageBroker {
} }
private async _subscribe<T> (name: string, listener: Subscriber<T>): Promise<void> { private async _subscribe<T> (name: string, listener: Subscriber<T | Buffer>): Promise<void>
{
if (!this.#channel) if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist')); return Promise.reject(new Error('Channel doesn\'t exist'));
@ -255,15 +279,19 @@ class MessageBroker {
const queue = await this.#channel.assertQueue('', { exclusive: true }); const queue = await this.#channel.assertQueue('', { exclusive: true });
await this.#channel.bindQueue(queue.queue, name, ''); await this.#channel.bindQueue(queue.queue, name, '');
await this.#channel.consume(queue.queue, async (msg) => { await this.#channel.consume(queue.queue, async (msg) =>
if (msg.content) {
if (msg.content && msg.content.toString().startsWith('{'))
await listener(JSON.parse(msg.content.toString()), msg); await listener(JSON.parse(msg.content.toString()), msg);
else
await listener(msg.content, msg);
this.#channel?.ack(msg); this.#channel?.ack(msg);
}); });
} }
// Add item to queue // Add item to queue
async enqueue (queue: string, content: object, headers?: object): Promise<void | number> { async enqueue (queue: string, content: object, headers?: object): Promise<void | number>
{
const properties = { const properties = {
persistent: true, persistent: true,
contentType: 'application/json', contentType: 'application/json',
@ -274,9 +302,12 @@ class MessageBroker {
if (!this.#channel) if (!this.#channel)
return this.#_qQueue.push({ queue, content, properties }); return this.#_qQueue.push({ queue, content, properties });
try { try
{
await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties); await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties);
} catch (_) { }
catch (_)
{
this.#_qQueue.push({ queue, content, properties }); this.#_qQueue.push({ queue, content, properties });
if (!this.#_qTO) if (!this.#_qTO)
this.#_qTO = setTimeout(this.#_processQueues.bind(this), 5000); this.#_qTO = setTimeout(this.#_processQueues.bind(this), 5000);
@ -284,7 +315,8 @@ class MessageBroker {
} }
// Publish to exchange // Publish to exchange
async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number> { async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number>
{
const properties = { const properties = {
contentType: 'application/json', contentType: 'application/json',
headers headers
@ -294,9 +326,12 @@ class MessageBroker {
if (!this.#channel) if (!this.#channel)
return this.#_pQueue.push({ exchange, content, routingKey, properties }); return this.#_pQueue.push({ exchange, content, routingKey, properties });
try { try
{
await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties); await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties);
} catch (err) { }
catch (err)
{
const error = err as Error; const error = err as Error;
if (!error.message.includes('nack')) if (!error.message.includes('nack'))
this.#logger.error(`Error while publishing to ${exchange}:\n${error.stack}`); this.#logger.error(`Error while publishing to ${exchange}:\n${error.stack}`);
@ -306,62 +341,74 @@ class MessageBroker {
} }
} }
assertExchange (exchange: string, props?: ExchangeDef) { assertExchange (exchange: string, props?: ExchangeDef)
{
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props); return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props);
} }
assertQueue (queue: string, opts?: QueueDef) { assertQueue (queue: string, opts?: QueueDef)
{
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
return this.#channel.assertQueue(queue, opts); return this.#channel.assertQueue(queue, opts);
} }
// Processes messages queued up while the broker was unreachable // Processes messages queued up while the broker was unreachable
async #_processQueues () { async #_processQueues ()
{
if (!this.#channel) if (!this.#channel)
throw new Error('Channel doesn\'t exist'); throw new Error('Channel doesn\'t exist');
this.#logger.status('Processing queues of unsent messages'); this.#logger.status('Processing queues of unsent messages');
const pQ = [ ...this.#_pQueue ]; const pQ = [ ...this.#_pQueue ];
this.#_pQueue = []; this.#_pQueue = [];
for (const msg of pQ) { for (const msg of pQ)
{
const { exchange, content, routingKey, properties } = msg; const { exchange, content, routingKey, properties } = msg;
const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties).catch(() => null); const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties)
.catch(() => null);
if (!result) if (!result)
this.#_pQueue.push(msg); this.#_pQueue.push(msg);
} }
const qQ = [ ...this.#_qQueue ]; const qQ = [ ...this.#_qQueue ];
for (const msg of qQ) { for (const msg of qQ)
{
const { queue, content, properties } = msg; const { queue, content, properties } = msg;
const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null); const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
if (!result) if (!result)
this.#_qQueue.push(msg); this.#_qQueue.push(msg);
} }
this.#logger.status('Done processing'); this.#logger.status('Done processing');
if (this.#_qTO) { if (this.#_qTO)
{
clearTimeout(this.#_qTO); clearTimeout(this.#_qTO);
this.#_qTO = null; this.#_qTO = null;
} }
} }
async #_restoreListeners () { async #_restoreListeners ()
{
this.#logger.status(`Restoring ${this.#consumers.size} consumers`); this.#logger.status(`Restoring ${this.#consumers.size} consumers`);
for (const [ name, list ] of this.#consumers) { for (const [ name, list ] of this.#consumers)
{
this.#logger.debug(`Processing consumer ${name}: ${list.length}`); this.#logger.debug(`Processing consumer ${name}: ${list.length}`);
for (const { consumer, options } of list) { for (const { consumer, options } of list)
{
await this._consume(name, consumer, options); await this._consume(name, consumer, options);
} }
} }
this.#logger.status(`Restoring ${this.#subscribers.size} subscribers`); this.#logger.status(`Restoring ${this.#subscribers.size} subscribers`);
for (const [ name, list ] of this.#subscribers) { for (const [ name, list ] of this.#subscribers)
{
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`); this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
for (const subscriber of list) { for (const subscriber of list)
{
await this._subscribe(name, subscriber); await this._subscribe(name, subscriber);
} }
} }
this.#logger.status(`Done restoring`); this.#logger.status('Done restoring');
} }
} }

View File

@ -1,6 +1,6 @@
import { inspect } from "node:util"; import { inspect } from 'node:util';
import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from "mongodb"; import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from 'mongodb';
import { IServer, ILogger, LoggerClientOptions } from "./interfaces/index.js"; import { IServer, ILogger, LoggerClientOptions } from './interfaces/index.js';
type Credentials = { type Credentials = {
URI?: string, URI?: string,
@ -29,7 +29,8 @@ export type MongoOptions = {
* *
* @class MongoDB * @class MongoDB
*/ */
class MongoDB { class MongoDB
{
#_database: string; #_database: string;
#config: MongoOptions; #config: MongoOptions;
@ -39,7 +40,8 @@ class MongoDB {
#db: Db | null; #db: Db | null;
#_client: MongoClient; #_client: MongoClient;
constructor (server: IServer, config: MongoOptions) { constructor (server: IServer, config: MongoOptions)
{
if (!server) if (!server)
throw new Error('Missing reference to server!'); throw new Error('Missing reference to server!');
@ -48,7 +50,7 @@ class MongoDB {
const { user, password, host, port, database, URI, authDb } = config.credentials; const { user, password, host, port, database, URI, authDb } = config.credentials;
if ((!host?.length || !port || !database?.length) && !URI) if ((!host?.length || !port || !database?.length) && !URI)
throw new Error(`Must provide host, port, and database OR URI parameters!`); throw new Error('Must provide host, port, and database OR URI parameters!');
this.#config = config; this.#config = config;
this.#db = null; // DB connection this.#db = null; // DB connection
@ -56,16 +58,22 @@ class MongoDB {
this.#logger = server.createLogger(this, config.loggerOptions); this.#logger = server.createLogger(this, config.loggerOptions);
if (URI) { if (URI)
{
this.#URI = URI; this.#URI = URI;
} else { }
else
{
let AUTH_DB = authDb; let AUTH_DB = authDb;
const auth = user ? `${user}:${password}@` : ''; const auth = user ? `${user}:${password}@` : '';
if (!AUTH_DB && auth) { if (!AUTH_DB && auth)
this.#logger.warn(`No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source`); {
this.#logger.warn('No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source');
AUTH_DB = authDb; AUTH_DB = authDb;
} else if (!auth) { }
this.#logger.warn(`No auth provided, proceeding without`); else if (!auth)
{
this.#logger.warn('No auth provided, proceeding without');
} }
this.#URI = `mongodb://${auth}${host}:${port}/${AUTH_DB || ''}?readPreference=secondaryPreferred`; this.#URI = `mongodb://${auth}${host}:${port}/${AUTH_DB || ''}?readPreference=secondaryPreferred`;
@ -75,17 +83,19 @@ class MongoDB {
// TODO figure out reconnecting to DB when connection fails // TODO figure out reconnecting to DB when connection fails
this.#_client.on('error', (error) => this.#logger.error(`MongoDB error:\n${error.stack}`)) this.#_client.on('error', (error) => this.#logger.error(`MongoDB error:\n${error.stack}`))
.on('timeout', () => this.#logger.warn(`MongoDB timed out`)) .on('timeout', () => this.#logger.warn('MongoDB timed out'))
.on('close', () => this.#logger.info(`MongoDB client disconnected`)) .on('close', () => this.#logger.info('MongoDB client disconnected'))
.on('open', () => this.#logger.info(`MongoDB client connected`)); .on('open', () => this.#logger.info('MongoDB client connected'));
} }
get database () { get database ()
{
return this.#_database; return this.#_database;
} }
get client () { get client ()
{
return this.#_client; return this.#_client;
} }
@ -94,7 +104,8 @@ class MongoDB {
* *
* @memberof MongoDB * @memberof MongoDB
*/ */
async init () { async init ()
{
if (!this.#config.load) if (!this.#config.load)
return this.#logger.info('Not loading MongoDB'); return this.#logger.info('Not loading MongoDB');
@ -105,7 +116,7 @@ class MongoDB {
this.#logger.status(`Initializing database connection to ${this.#_client.options.hosts}`); this.#logger.status(`Initializing database connection to ${this.#_client.options.hosts}`);
await this.#_client.connect(); await this.#_client.connect();
this.#logger.debug(`Connected, selecting DB`); this.#logger.debug('Connected, selecting DB');
this.#db = await this.#_client.db(this.#_database); this.#db = await this.#_client.db(this.#_database);
this.#logger.status('MongoDB ready'); this.#logger.status('MongoDB ready');
@ -114,7 +125,8 @@ class MongoDB {
} }
async close () { async close ()
{
if (!this.#db) if (!this.#db)
return; return;
@ -124,7 +136,8 @@ class MongoDB {
this.#logger.status('Database closed'); this.#logger.status('Database closed');
} }
get mongoClient () { get mongoClient ()
{
return this.#_client; return this.#_client;
} }
@ -136,10 +149,11 @@ class MongoDB {
* @returns {Array} An array containing the corresponding objects for the query * @returns {Array} An array containing the corresponding objects for the query
* @memberof Database * @memberof Database
*/ */
async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]> { async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]>
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
@ -162,10 +176,11 @@ class MongoDB {
* @returns {Object} An object containing the queried data * @returns {Object} An object containing the queried data
* @memberof Database * @memberof Database
*/ */
async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null> { async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null>
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
@ -187,14 +202,15 @@ class MongoDB {
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false) { async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (!filter) if (!filter)
throw new Error(`Cannot run update many without a filter, if you mean to update every single document, pass an empty object`); throw new Error('Cannot run update many without a filter, if you mean to update every single document, pass an empty object');
if (typeof filter._id === 'string') if (typeof filter._id === 'string')
filter._id = new ObjectId(filter._id); filter._id = new ObjectId(filter._id);
@ -213,10 +229,11 @@ class MongoDB {
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false) { async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (typeof filter._id === 'string') if (typeof filter._id === 'string')
@ -237,10 +254,11 @@ class MongoDB {
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
* @memberof Database * @memberof Database
*/ */
async insertOne (db: string, data: Document) { async insertOne (db: string, data: Document)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (typeof data._id === 'string') if (typeof data._id === 'string')
@ -252,10 +270,11 @@ class MongoDB {
} }
async deleteOne (db: string, filter: Document) { async deleteOne (db: string, filter: Document)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (typeof filter._id === 'string') if (typeof filter._id === 'string')
@ -277,10 +296,11 @@ class MongoDB {
* @returns * @returns
* @memberof Database * @memberof Database
*/ */
async push (db: string, filter: Document, data: object, upsert = false) { async push (db: string, filter: Document, data: object, upsert = false)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (typeof filter._id === 'string') if (typeof filter._id === 'string')
@ -301,10 +321,11 @@ class MongoDB {
* @returns {object} * @returns {object}
* @memberof Database * @memberof Database
*/ */
random<T extends Document> (db: string, filter: Document = {}, amount = 1) { random<T extends Document> (db: string, filter: Document = {}, amount = 1)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
throw new TypeError('Expecting collection name for the first argument'); throw new TypeError('Expecting collection name for the first argument');
if (typeof filter._id === 'string') if (typeof filter._id === 'string')
@ -320,28 +341,32 @@ class MongoDB {
} }
stats (options = {}) { stats (options = {})
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
const result = this.#db.stats(options); const result = this.#db.stats(options);
return result; return result;
} }
collection<T extends Document> (coll: string) { collection<T extends Document> (coll: string)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
return this.#db.collection<T>(coll); return this.#db.collection<T>(coll);
} }
count (coll: string, query: Document) { count (coll: string, query: Document)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
return this.#db.collection(coll).countDocuments(query); return this.#db.collection(coll).countDocuments(query);
} }
async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions) { async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions)
{
if (!this.#db) if (!this.#db)
throw new Error(`MongoDB not connected`); throw new Error('MongoDB not connected');
if (!(indices instanceof Array)) if (!(indices instanceof Array))
indices = [ indices ]; indices = [ indices ];
await this.#db.collection(collection).createIndex(indices, options); await this.#db.collection(collection).createIndex(indices, options);

View File

@ -1,5 +1,5 @@
import { ILogger, LoggerClientOptions } from "./Logger.js"; import { ILogger, LoggerClientOptions } from "./Logger.js";
export interface IServer { export interface IServer {
createLogger(obj: object, options?: LoggerClientOptions): ILogger createLogger(obj: object, options?: Partial<LoggerClientOptions>): ILogger
} }

27
tests/testBroker.js Normal file
View File

@ -0,0 +1,27 @@
import { MessageBroker } from '../build/esm/index.js';
const broker = new MessageBroker({
createLogger: () =>
{
return {
debug: console.log,
info: console.log,
status: console.log,
warn: console.log,
error: console.error
};
}
}, {
load: true,
host: 'rabbitmq-01.stylis.local',
user: 'stylis',
pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj',
vhost: 'development',
port: 5672
});
await broker.init();
broker.subscribe('chatlogs', (message) =>
{
console.log(message);
});

View File

@ -4,7 +4,8 @@ import { MariaDB } from '../build/esm/index.js';
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' })); const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
const maria = new MariaDB({ const maria = new MariaDB({
createLogger: () => { createLogger: () =>
{
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,

View File

@ -1,7 +1,8 @@
import { MongoDB } from '../build/esm/index.js'; import { MongoDB } from '../build/esm/index.js';
const mongo = new MongoDB({ const mongo = new MongoDB({
createLogger: () => { createLogger: () =>
{
return { return {
debug: console.log, debug: console.log,
info: console.log, info: console.log,