Compare commits
8 Commits
36d5339e15
...
b68484d4c5
Author | SHA1 | Date | |
---|---|---|---|
b68484d4c5 | |||
24b2bcd3d9 | |||
40b24ee95e | |||
4ef0a2cbd8 | |||
b95ec26917 | |||
1374b92f8e | |||
191e9e77b2 | |||
1fcbf949cc |
115
.eslintrc.json
115
.eslintrc.json
@ -15,17 +15,38 @@
|
|||||||
"sourceType": "module"
|
"sourceType": "module"
|
||||||
},
|
},
|
||||||
"rules": {
|
"rules": {
|
||||||
|
"@typescript-eslint/no-unused-vars": "off",
|
||||||
|
"@typescript-eslint/no-non-null-assertion":"off",
|
||||||
"accessor-pairs": "warn",
|
"accessor-pairs": "warn",
|
||||||
"array-callback-return": "warn",
|
"array-callback-return": "warn",
|
||||||
"array-bracket-newline": [ "warn", "consistent" ],
|
"array-bracket-newline": [
|
||||||
"array-bracket-spacing": [ "warn", "always", { "objectsInArrays": false, "arraysInArrays": false }],
|
"warn",
|
||||||
|
"consistent"
|
||||||
|
],
|
||||||
|
"array-bracket-spacing": [
|
||||||
|
"warn",
|
||||||
|
"always",
|
||||||
|
{
|
||||||
|
"objectsInArrays": false,
|
||||||
|
"arraysInArrays": false
|
||||||
|
}
|
||||||
|
],
|
||||||
"arrow-spacing": "warn",
|
"arrow-spacing": "warn",
|
||||||
"block-scoped-var": "warn",
|
"block-scoped-var": "warn",
|
||||||
"block-spacing": [ "warn", "always" ],
|
"block-spacing": [
|
||||||
"brace-style": [ "warn", "1tbs" ],
|
"warn",
|
||||||
|
"always"
|
||||||
|
],
|
||||||
|
"brace-style": [
|
||||||
|
"warn",
|
||||||
|
"allman"
|
||||||
|
],
|
||||||
"callback-return": "warn",
|
"callback-return": "warn",
|
||||||
"camelcase": "warn",
|
"camelcase": "warn",
|
||||||
"comma-dangle": [ "warn", "only-multiline" ],
|
"comma-dangle": [
|
||||||
|
"warn",
|
||||||
|
"only-multiline"
|
||||||
|
],
|
||||||
"comma-spacing": [
|
"comma-spacing": [
|
||||||
"warn",
|
"warn",
|
||||||
{
|
{
|
||||||
@ -68,9 +89,25 @@
|
|||||||
"implicit-arrow-linebreak": "warn",
|
"implicit-arrow-linebreak": "warn",
|
||||||
"indent": "warn",
|
"indent": "warn",
|
||||||
"init-declarations": "warn",
|
"init-declarations": "warn",
|
||||||
"jsx-quotes": [ "warn", "prefer-single" ],
|
"quotes": ["error" , "single"],
|
||||||
"key-spacing": [ "warn", { "beforeColon": false, "afterColon": true }],
|
"jsx-quotes": [
|
||||||
"keyword-spacing": [ "warn", { "after": true, "before": true }],
|
"warn",
|
||||||
|
"prefer-single"
|
||||||
|
],
|
||||||
|
"key-spacing": [
|
||||||
|
"warn",
|
||||||
|
{
|
||||||
|
"beforeColon": false,
|
||||||
|
"afterColon": true
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"keyword-spacing": [
|
||||||
|
"warn",
|
||||||
|
{
|
||||||
|
"after": true,
|
||||||
|
"before": true
|
||||||
|
}
|
||||||
|
],
|
||||||
"linebreak-style": [
|
"linebreak-style": [
|
||||||
"error",
|
"error",
|
||||||
"unix"
|
"unix"
|
||||||
@ -79,10 +116,28 @@
|
|||||||
"lines-around-directive": "warn",
|
"lines-around-directive": "warn",
|
||||||
"max-classes-per-file": "warn",
|
"max-classes-per-file": "warn",
|
||||||
"max-nested-callbacks": "warn",
|
"max-nested-callbacks": "warn",
|
||||||
|
"max-len": [
|
||||||
|
"warn",
|
||||||
|
{
|
||||||
|
"code": 140,
|
||||||
|
"ignoreComments": true,
|
||||||
|
"ignoreStrings": true,
|
||||||
|
"ignoreTemplateLiterals": true,
|
||||||
|
"ignoreRegExpLiterals": true
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"max-lines-per-function": [
|
||||||
|
"warn",
|
||||||
|
100
|
||||||
|
],
|
||||||
|
"max-depth": [
|
||||||
|
"warn",
|
||||||
|
3
|
||||||
|
],
|
||||||
"new-parens": "warn",
|
"new-parens": "warn",
|
||||||
"no-alert": "warn",
|
"no-alert": "warn",
|
||||||
"no-array-constructor": "warn",
|
"no-array-constructor": "warn",
|
||||||
"no-bitwise": "warn",
|
// "no-bitwise": "warn",
|
||||||
"no-buffer-constructor": "warn",
|
"no-buffer-constructor": "warn",
|
||||||
"no-caller": "warn",
|
"no-caller": "warn",
|
||||||
"no-console": "warn",
|
"no-console": "warn",
|
||||||
@ -96,7 +151,6 @@
|
|||||||
"no-extend-native": "warn",
|
"no-extend-native": "warn",
|
||||||
"no-extra-bind": "warn",
|
"no-extra-bind": "warn",
|
||||||
"no-extra-label": "warn",
|
"no-extra-label": "warn",
|
||||||
"no-extra-parens": "warn",
|
|
||||||
"no-floating-decimal": "warn",
|
"no-floating-decimal": "warn",
|
||||||
"no-implicit-coercion": "warn",
|
"no-implicit-coercion": "warn",
|
||||||
"no-implicit-globals": "warn",
|
"no-implicit-globals": "warn",
|
||||||
@ -147,7 +201,7 @@
|
|||||||
"no-unmodified-loop-condition": "warn",
|
"no-unmodified-loop-condition": "warn",
|
||||||
"no-unneeded-ternary": "error",
|
"no-unneeded-ternary": "error",
|
||||||
"no-unused-expressions": "warn",
|
"no-unused-expressions": "warn",
|
||||||
"no-use-before-define": "error",
|
"@typescript-eslint/no-use-before-define": "error",
|
||||||
"no-useless-call": "warn",
|
"no-useless-call": "warn",
|
||||||
"no-useless-computed-key": "warn",
|
"no-useless-computed-key": "warn",
|
||||||
"no-useless-concat": "warn",
|
"no-useless-concat": "warn",
|
||||||
@ -155,20 +209,39 @@
|
|||||||
"no-useless-rename": "warn",
|
"no-useless-rename": "warn",
|
||||||
"no-useless-return": "warn",
|
"no-useless-return": "warn",
|
||||||
"no-var": "warn",
|
"no-var": "warn",
|
||||||
"no-void": "warn",
|
// "no-void": "warn",
|
||||||
"no-whitespace-before-property": "error",
|
"no-whitespace-before-property": "error",
|
||||||
"nonblock-statement-body-position": [ "warn", "below" ],
|
"nonblock-statement-body-position": [
|
||||||
|
"warn",
|
||||||
|
"below"
|
||||||
|
],
|
||||||
"object-curly-spacing": [
|
"object-curly-spacing": [
|
||||||
"warn",
|
"warn",
|
||||||
"always"
|
"always"
|
||||||
],
|
],
|
||||||
"object-property-newline": [ "warn", { "allowAllPropertiesOnSameLine": true }],
|
"object-property-newline": [
|
||||||
|
"warn",
|
||||||
|
{
|
||||||
|
"allowAllPropertiesOnSameLine": true
|
||||||
|
}
|
||||||
|
],
|
||||||
"object-shorthand": "warn",
|
"object-shorthand": "warn",
|
||||||
"one-var-declaration-per-line": "warn",
|
"one-var-declaration-per-line": "warn",
|
||||||
"operator-assignment": "warn",
|
"operator-assignment": "warn",
|
||||||
"operator-linebreak": [ "warn", "before" ],
|
"operator-linebreak": [
|
||||||
|
"warn",
|
||||||
|
"before"
|
||||||
|
],
|
||||||
"padding-line-between-statements": "warn",
|
"padding-line-between-statements": "warn",
|
||||||
"padded-blocks": [ "warn", { "switches": "never" }, { "allowSingleLineBlocks": true }],
|
"padded-blocks": [
|
||||||
|
"warn",
|
||||||
|
{
|
||||||
|
"switches": "never"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"allowSingleLineBlocks": true
|
||||||
|
}
|
||||||
|
],
|
||||||
"prefer-arrow-callback": "warn",
|
"prefer-arrow-callback": "warn",
|
||||||
"prefer-const": "warn",
|
"prefer-const": "warn",
|
||||||
"prefer-destructuring": "warn",
|
"prefer-destructuring": "warn",
|
||||||
@ -189,12 +262,18 @@
|
|||||||
"last"
|
"last"
|
||||||
],
|
],
|
||||||
"space-before-blocks": "warn",
|
"space-before-blocks": "warn",
|
||||||
"space-before-function-paren": [ "error", "always" ],
|
"space-before-function-paren": [
|
||||||
|
"error",
|
||||||
|
"always"
|
||||||
|
],
|
||||||
"space-in-parens": [
|
"space-in-parens": [
|
||||||
"warn",
|
"warn",
|
||||||
"never"
|
"never"
|
||||||
],
|
],
|
||||||
"spaced-comment": [ "warn", "always" ],
|
"spaced-comment": [
|
||||||
|
"warn",
|
||||||
|
"always"
|
||||||
|
],
|
||||||
"strict": "warn",
|
"strict": "warn",
|
||||||
"switch-colon-spacing": "warn",
|
"switch-colon-spacing": "warn",
|
||||||
"symbol-description": "warn",
|
"symbol-description": "warn",
|
||||||
|
2
index.ts
2
index.ts
@ -1,4 +1,4 @@
|
|||||||
export { MessageBroker, BrokerOptions } from "./src/MessageBroker.js";
|
export { MessageBroker, BrokerOptions } from './src/MessageBroker.js';
|
||||||
export { MariaDB, MariaOptions } from './src/MariaDB.js';
|
export { MariaDB, MariaOptions } from './src/MariaDB.js';
|
||||||
export { MongoDB, MongoOptions } from './src/MongoDB.js';
|
export { MongoDB, MongoOptions } from './src/MongoDB.js';
|
||||||
export { ObjectId, Document, DeleteResult, Collection } from 'mongodb';
|
export { ObjectId, Document, DeleteResult, Collection } from 'mongodb';
|
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@navy.gif/wrappers",
|
"name": "@navy.gif/wrappers",
|
||||||
"version": "1.3.20",
|
"version": "1.4.3",
|
||||||
"description": "Various wrapper classes I use in my projects",
|
"description": "Various wrapper classes I use in my projects",
|
||||||
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
|
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
|
||||||
"author": "Navy.gif",
|
"author": "Navy.gif",
|
||||||
|
158
src/MariaDB.ts
158
src/MariaDB.ts
@ -1,17 +1,18 @@
|
|||||||
import { inspect } from 'node:util';
|
import { inspect } from 'node:util';
|
||||||
import { ILogger, IServer } from './interfaces/index.js';
|
import { ILogger, IServer } from './interfaces/index.js';
|
||||||
|
|
||||||
import mysql, { Pool, PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql';
|
import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql';
|
||||||
import { LoggerClientOptions } from './interfaces/Logger.js';
|
import { LoggerClientOptions } from './interfaces/Logger.js';
|
||||||
|
|
||||||
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK' ];
|
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ];
|
||||||
|
|
||||||
type Credentials = {
|
type Credentials = {
|
||||||
user: string,
|
user: string,
|
||||||
password: string,
|
password: string,
|
||||||
host: string,
|
host: string,
|
||||||
port: number,
|
port: number,
|
||||||
database: string
|
database: string,
|
||||||
|
node: string
|
||||||
}
|
}
|
||||||
|
|
||||||
export type MariaOptions = {
|
export type MariaOptions = {
|
||||||
@ -31,19 +32,28 @@ type MariaError = {
|
|||||||
sqlMessage:string
|
sqlMessage:string
|
||||||
} & Error
|
} & Error
|
||||||
|
|
||||||
class MariaDB {
|
type QueryOptions = {
|
||||||
|
node?: string,
|
||||||
|
timeout?: number
|
||||||
|
}
|
||||||
|
|
||||||
|
class MariaDB
|
||||||
|
{
|
||||||
|
|
||||||
#_activeQueries: number;
|
#_activeQueries: number;
|
||||||
#afterLastQuery: (() => void) | null;
|
#afterLastQuery: (() => void) | null;
|
||||||
#logger: ILogger;
|
#logger: ILogger;
|
||||||
#_ready: boolean;
|
#_ready: boolean;
|
||||||
|
#load: boolean;
|
||||||
|
|
||||||
#config: MariaOptions;
|
#config: MariaOptions;
|
||||||
#credentials: Credentials[];
|
#credentials: Credentials[];
|
||||||
#cluster: boolean;
|
#cluster: boolean;
|
||||||
#pool: PoolCluster | Pool | null;
|
#pool: PoolCluster | null;
|
||||||
|
#nodes: string[];
|
||||||
|
|
||||||
constructor (server: IServer, options: MariaOptions) {
|
constructor (server: IServer, options: MariaOptions)
|
||||||
|
{
|
||||||
|
|
||||||
if (!server)
|
if (!server)
|
||||||
throw new Error('Missing reference to server!');
|
throw new Error('Missing reference to server!');
|
||||||
@ -51,11 +61,18 @@ class MariaDB {
|
|||||||
throw new Error('No config options provided!');
|
throw new Error('No config options provided!');
|
||||||
|
|
||||||
this.#config = options;
|
this.#config = options;
|
||||||
|
this.#load = options.load ?? true;
|
||||||
const { host, user, port, password, database } = options.credentials;
|
const { host, user, port, password, database } = options.credentials;
|
||||||
const hosts = host.split(',');
|
const hosts = host.split(',');
|
||||||
this.#credentials = [];
|
this.#credentials = [];
|
||||||
for (const remote of hosts) {
|
this.#nodes = [];
|
||||||
this.#credentials.push({ host: remote, user, port, password, database });
|
for (const remote of hosts)
|
||||||
|
{
|
||||||
|
const [ node ] = remote.split('.');
|
||||||
|
if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
|
||||||
|
throw new Error('Invalid host config, expecting qualified domain names');
|
||||||
|
this.#credentials.push({ host: remote, user, port, password, database, node });
|
||||||
|
this.#nodes.push(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.#pool = null;
|
this.#pool = null;
|
||||||
@ -69,42 +86,48 @@ class MariaDB {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get ready () {
|
get ready ()
|
||||||
|
{
|
||||||
return this.#_ready;
|
return this.#_ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
get activeQueries () {
|
get activeQueries ()
|
||||||
|
{
|
||||||
return this.#_activeQueries;
|
return this.#_activeQueries;
|
||||||
}
|
}
|
||||||
|
|
||||||
async init () {
|
async init ()
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#config.load)
|
if (!this.#load)
|
||||||
return this.#logger.info('Not loading MariaDB');
|
return this.#logger.info('Not loading MariaDB');
|
||||||
|
|
||||||
this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`);
|
this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`);
|
||||||
if (this.#cluster) {
|
this.#pool = mysql.createPoolCluster(this.#config.cluster);
|
||||||
this.#pool = mysql.createPoolCluster(this.#config.cluster);
|
for (const creds of this.#credentials)
|
||||||
for (const creds of this.#credentials)
|
{
|
||||||
this.#pool.add({ ...this.#config.client, ...creds });
|
this.#pool.add(creds.node, { ...this.#config.client, ...creds });
|
||||||
} else {
|
this.#logger.info(`Added node ${creds.node} to pool cluster`);
|
||||||
this.#pool = mysql.createPool({ ...this.#config.client, ...this.#credentials[0] });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.#pool.on('connection', (connection) => {
|
this.#pool.on('connection', (connection) =>
|
||||||
|
{
|
||||||
this.#logger.debug(`New connection: ${connection?.threadId || null}`);
|
this.#logger.debug(`New connection: ${connection?.threadId || null}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#pool.on('acquire', (connection) => {
|
this.#pool.on('acquire', (connection) =>
|
||||||
|
{
|
||||||
this.#_activeQueries++;
|
this.#_activeQueries++;
|
||||||
this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`);
|
this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#pool.on('enqueue', () => {
|
this.#pool.on('enqueue', () =>
|
||||||
this.#logger.debug(`Query enqueued for connection`);
|
{
|
||||||
|
this.#logger.debug('Query enqueued for connection');
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#pool.on('release', (connection) => {
|
this.#pool.on('release', (connection) =>
|
||||||
|
{
|
||||||
this.#_activeQueries--;
|
this.#_activeQueries--;
|
||||||
|
|
||||||
if (!this.ready && !this.#_activeQueries && this.#afterLastQuery)
|
if (!this.ready && !this.#_activeQueries && this.#afterLastQuery)
|
||||||
@ -113,11 +136,20 @@ class MariaDB {
|
|||||||
this.#logger.debug(`Connection released: ${connection?.threadId || null}`);
|
this.#logger.debug(`Connection released: ${connection?.threadId || null}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#logger.info(`Testing MariaDB connection`);
|
this.#pool.on('remove', (nodeId) =>
|
||||||
await new Promise<void>((resolve, reject) => {
|
{
|
||||||
|
this.#logger.status(`Node ${nodeId} was removed from pool`);
|
||||||
|
const index = this.#nodes.findIndex(n => n === nodeId);
|
||||||
|
this.#nodes.splice(index, 1);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.#logger.info('Testing MariaDB connection');
|
||||||
|
await new Promise<void>((resolve, reject) =>
|
||||||
|
{
|
||||||
if (!this.#pool)
|
if (!this.#pool)
|
||||||
return reject(new Error('Missing connection pool'));
|
return reject(new Error('Missing connection pool'));
|
||||||
this.#pool.getConnection((err, conn) => {
|
this.#pool.getConnection((err, conn) =>
|
||||||
|
{
|
||||||
if (err)
|
if (err)
|
||||||
return reject(err);
|
return reject(err);
|
||||||
conn.release();
|
conn.release();
|
||||||
@ -125,27 +157,31 @@ class MariaDB {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
this.#logger.status(`Database connected`);
|
this.#logger.status('Database connected');
|
||||||
this.#_ready = true;
|
this.#_ready = true;
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async close () {
|
async close ()
|
||||||
this.#logger.status(`Shutting down database connections`);
|
{
|
||||||
|
this.#logger.status('Shutting down database connections');
|
||||||
if (!this.ready)
|
if (!this.ready)
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
this.#_ready = false;
|
this.#_ready = false;
|
||||||
if (this.#_activeQueries) {
|
if (this.#_activeQueries)
|
||||||
|
{
|
||||||
this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`);
|
this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`);
|
||||||
await this.finishQueries();
|
await this.finishQueries();
|
||||||
this.#logger.info(`Queries finished, shutting down`);
|
this.#logger.info('Queries finished, shutting down');
|
||||||
}
|
}
|
||||||
return new Promise<void>(resolve => {
|
return new Promise<void>(resolve =>
|
||||||
|
{
|
||||||
if (!this.#pool)
|
if (!this.#pool)
|
||||||
return resolve();
|
return resolve();
|
||||||
this.#pool.end(() => {
|
this.#pool.end(() =>
|
||||||
|
{
|
||||||
this.#pool?.removeAllListeners();
|
this.#pool?.removeAllListeners();
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
@ -153,21 +189,34 @@ class MariaDB {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
finishQueries () {
|
finishQueries ()
|
||||||
return new Promise<void>(resolve => {
|
{
|
||||||
|
return new Promise<void>(resolve =>
|
||||||
|
{
|
||||||
this.#afterLastQuery = resolve;
|
this.#afterLastQuery = resolve;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
getConnection (): Promise<PoolConnection> {
|
getConnection (node?: string): Promise<PoolConnection>
|
||||||
return new Promise((resolve, reject) => {
|
{
|
||||||
|
return new Promise((resolve, reject) =>
|
||||||
|
{
|
||||||
if (!this.#pool)
|
if (!this.#pool)
|
||||||
return reject(new Error('Pool closed'));
|
return reject(new Error('Pool closed'));
|
||||||
this.#pool.getConnection((err, conn) => {
|
// Get node by name
|
||||||
|
const pool = this.#pool;
|
||||||
|
if (node && !this.#nodes.includes(node))
|
||||||
|
{
|
||||||
|
this.#logger.warn(`Node ${node} is not available in pool, falling back to arbitrary node`);
|
||||||
|
node = '*';
|
||||||
|
}
|
||||||
|
return pool.of(node ?? '*').getConnection((err, conn) =>
|
||||||
|
{
|
||||||
if (err)
|
if (err)
|
||||||
return reject(err);
|
return reject(err);
|
||||||
resolve(conn);
|
return resolve(conn);
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,11 +225,16 @@ class MariaDB {
|
|||||||
* @throws {MariaError}
|
* @throws {MariaError}
|
||||||
* @private
|
* @private
|
||||||
* */
|
* */
|
||||||
async #_query<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number, attempts = 0): Promise<T[] | FieldInfo[]> {
|
async #_query<T> (query: string, values: (string | number | string[] | number[])[], { timeout, node }: QueryOptions = {}, attempts = 0):
|
||||||
const connection = await this.getConnection();
|
Promise<T[] | FieldInfo[]>
|
||||||
try {
|
{
|
||||||
const result = await new Promise<T[] | FieldInfo[] | undefined>((resolve, reject) => {
|
const connection = await this.getConnection(node);
|
||||||
const q = connection.query({ timeout, sql: query }, [ values ], (err, results, fields) => {
|
try
|
||||||
|
{
|
||||||
|
const result = await new Promise<T[] | FieldInfo[] | undefined>((resolve, reject) =>
|
||||||
|
{
|
||||||
|
const q = connection.query({ timeout, sql: query }, values, (err, results, fields) =>
|
||||||
|
{
|
||||||
if (err)
|
if (err)
|
||||||
reject(err);
|
reject(err);
|
||||||
else if (results)
|
else if (results)
|
||||||
@ -192,16 +246,19 @@ class MariaDB {
|
|||||||
this.#logger.debug(`Constructed query: ${q.sql}`);
|
this.#logger.debug(`Constructed query: ${q.sql}`);
|
||||||
});
|
});
|
||||||
return Promise.resolve(result || []);
|
return Promise.resolve(result || []);
|
||||||
} catch (err) {
|
}
|
||||||
|
catch (err)
|
||||||
|
{
|
||||||
const error = err as MariaError;
|
const error = err as MariaError;
|
||||||
// Retry safe errors // (Galera) Instance not ready for query
|
// Retry safe errors // (Galera) Instance not ready for query
|
||||||
if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) //
|
if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) //
|
||||||
return this.#_query(query, values, timeout, ++attempts);
|
return this.#_query(query, values, { timeout, node }, ++attempts);
|
||||||
return Promise.reject(error);
|
return Promise.reject(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async query<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number): Promise<T[] | FieldInfo[]> {
|
async query<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions): Promise<T[] | FieldInfo[]>
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.ready)
|
if (!this.ready)
|
||||||
return Promise.reject(new Error('MariaDB not ready'));
|
return Promise.reject(new Error('MariaDB not ready'));
|
||||||
@ -211,12 +268,13 @@ class MariaDB {
|
|||||||
batch = values.some(val => val instanceof Array);
|
batch = values.some(val => val instanceof Array);
|
||||||
this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
|
this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
|
||||||
|
|
||||||
return this.#_query<T>(query, values, timeout);
|
return this.#_query<T>(query, values, opts);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
q<T> (query: string, values: (string | number | string[] | number[])[], timeout?: number) {
|
q<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions)
|
||||||
return this.query<T>(query, values, timeout);
|
{
|
||||||
|
return this.query<T>(query, values, opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import { ILogger, IServer } from "./interfaces/index.js";
|
import { ILogger, IServer } from './interfaces/index.js';
|
||||||
import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
|
import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
|
||||||
import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib';
|
import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib';
|
||||||
|
|
||||||
@ -56,7 +56,8 @@ type Consumer<T = any> = (content: T, msg: ConsumeMessage) => Promise<void>
|
|||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void>
|
type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void>
|
||||||
|
|
||||||
class MessageBroker {
|
class MessageBroker
|
||||||
|
{
|
||||||
|
|
||||||
// Broker definitions
|
// Broker definitions
|
||||||
#load: boolean;
|
#load: boolean;
|
||||||
@ -80,9 +81,10 @@ class MessageBroker {
|
|||||||
#_qQueue: InternalQueueMsg[];
|
#_qQueue: InternalQueueMsg[];
|
||||||
#_qTO: NodeJS.Timeout | null;
|
#_qTO: NodeJS.Timeout | null;
|
||||||
|
|
||||||
constructor (server: IServer, options: BrokerOptions) {
|
constructor (server: IServer, options: BrokerOptions)
|
||||||
|
{
|
||||||
|
|
||||||
this.#load = options.load || false;
|
this.#load = options.load ?? true;
|
||||||
this.#hosts = options.host.split(',');
|
this.#hosts = options.host.split(',');
|
||||||
this.#username = options.user;
|
this.#username = options.user;
|
||||||
this.#password = options.pass;
|
this.#password = options.pass;
|
||||||
@ -114,7 +116,8 @@ class MessageBroker {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async init () {
|
async init ()
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#load)
|
if (!this.#load)
|
||||||
return this.#logger.info('Not loading RabbitMQ');
|
return this.#logger.info('Not loading RabbitMQ');
|
||||||
@ -124,23 +127,29 @@ class MessageBroker {
|
|||||||
const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`);
|
const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`);
|
||||||
this.#connection = await amqp.connect(connectionStrings);
|
this.#connection = await amqp.connect(connectionStrings);
|
||||||
|
|
||||||
this.#connection.on('disconnect', async ({ err }) => {
|
this.#connection.on('disconnect', async ({ err }) =>
|
||||||
|
{
|
||||||
this.#logger.status(`Disconnected: ${err.message}`);
|
this.#logger.status(`Disconnected: ${err.message}`);
|
||||||
await this.#channel?.close();
|
await this.#channel?.close();
|
||||||
this.#channel = null;
|
this.#channel = null;
|
||||||
});
|
});
|
||||||
this.#connection.on('blocked', ({ reason }) => {
|
this.#connection.on('blocked', ({ reason }) =>
|
||||||
|
{
|
||||||
this.#logger.status(`Blocked: ${reason}`);
|
this.#logger.status(`Blocked: ${reason}`);
|
||||||
});
|
});
|
||||||
this.#connection.on('connectFailed', ({ err }) => {
|
this.#connection.on('connectFailed', ({ err }) =>
|
||||||
|
{
|
||||||
this.#logger.error(`Message broker failed to connect: ${err.stack || err.message}`);
|
this.#logger.error(`Message broker failed to connect: ${err.stack || err.message}`);
|
||||||
});
|
});
|
||||||
this.#connection.on('connect', async ({ url }) => {
|
this.#connection.on('connect', async ({ url }) =>
|
||||||
|
{
|
||||||
this.#logger.status(`Message broker connected to ${url}`);
|
this.#logger.status(`Message broker connected to ${url}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
await new Promise<void>((resolve, reject) => {
|
await new Promise<void>((resolve, reject) =>
|
||||||
this.#connection?.once('connect', () => {
|
{
|
||||||
|
this.#connection?.once('connect', () =>
|
||||||
|
{
|
||||||
this.#connection?.removeListener('connectFailed', reject);
|
this.#connection?.removeListener('connectFailed', reject);
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
@ -152,12 +161,15 @@ class MessageBroker {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async close () {
|
async close ()
|
||||||
if (this.#channel) {
|
{
|
||||||
|
if (this.#channel)
|
||||||
|
{
|
||||||
await this.#channel.close();
|
await this.#channel.close();
|
||||||
this.#channel.removeAllListeners();
|
this.#channel.removeAllListeners();
|
||||||
}
|
}
|
||||||
if (this.#connection) {
|
if (this.#connection)
|
||||||
|
{
|
||||||
await this.#connection.close();
|
await this.#connection.close();
|
||||||
// No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter
|
// No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter
|
||||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||||
@ -166,11 +178,13 @@ class MessageBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async createChannel () {
|
async createChannel ()
|
||||||
|
{
|
||||||
const exchanges = Object.entries(this.#exchanges);
|
const exchanges = Object.entries(this.#exchanges);
|
||||||
const queues = Object.entries(this.#queues);
|
const queues = Object.entries(this.#queues);
|
||||||
|
|
||||||
if (this.#channel) {
|
if (this.#channel)
|
||||||
|
{
|
||||||
this.#logger.debug('Closing old channel');
|
this.#logger.debug('Closing old channel');
|
||||||
await this.#channel.close().catch(err => this.#logger.error(err.stack));
|
await this.#channel.close().catch(err => this.#logger.error(err.stack));
|
||||||
}
|
}
|
||||||
@ -179,11 +193,14 @@ class MessageBroker {
|
|||||||
throw new Error();
|
throw new Error();
|
||||||
this.#logger.debug('Creating channel');
|
this.#logger.debug('Creating channel');
|
||||||
this.#channel = this.#connection.createChannel({
|
this.#channel = this.#connection.createChannel({
|
||||||
setup: async (channel: Channel | ConfirmChannel) => {
|
setup: async (channel: Channel | ConfirmChannel) =>
|
||||||
for (const [ name, props ] of exchanges) {
|
{
|
||||||
|
for (const [ name, props ] of exchanges)
|
||||||
|
{
|
||||||
await channel.assertExchange(name, props.type ?? 'fanout', props);
|
await channel.assertExchange(name, props.type ?? 'fanout', props);
|
||||||
}
|
}
|
||||||
for (const [ name, props ] of queues) {
|
for (const [ name, props ] of queues)
|
||||||
|
{
|
||||||
await channel.assertQueue(name, props);
|
await channel.assertQueue(name, props);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -193,10 +210,12 @@ class MessageBroker {
|
|||||||
this.#channel.on('connect', () => this.#logger.status('Channel connected'));
|
this.#channel.on('connect', () => this.#logger.status('Channel connected'));
|
||||||
this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`));
|
this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`));
|
||||||
|
|
||||||
await new Promise<void>((resolve, reject) => {
|
await new Promise<void>((resolve, reject) =>
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return reject(new Error('Missing channel?'));
|
return reject(new Error('Missing channel?'));
|
||||||
this.#channel.once('connect', () => {
|
this.#channel.once('connect', () =>
|
||||||
|
{
|
||||||
this.#channel?.removeListener('error', reject);
|
this.#channel?.removeListener('error', reject);
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
@ -210,7 +229,8 @@ class MessageBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Consume queue
|
// Consume queue
|
||||||
async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume) {
|
async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel does not exist');
|
throw new Error('Channel does not exist');
|
||||||
|
|
||||||
@ -222,10 +242,12 @@ class MessageBroker {
|
|||||||
this.#consumers.set(queue, list);
|
this.#consumers.set(queue, list);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void> {
|
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void>
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return Promise.reject(new Error('Channel doesn\'t exist'));
|
return Promise.reject(new Error('Channel doesn\'t exist'));
|
||||||
await this.#channel.consume(queue, async (msg: ConsumeMessage) => {
|
await this.#channel.consume(queue, async (msg: ConsumeMessage) =>
|
||||||
|
{
|
||||||
if (msg.content)
|
if (msg.content)
|
||||||
await consumer(JSON.parse(msg.content.toString()), msg);
|
await consumer(JSON.parse(msg.content.toString()), msg);
|
||||||
this.#channel?.ack(msg);
|
this.#channel?.ack(msg);
|
||||||
@ -233,7 +255,8 @@ class MessageBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to exchange, ensures messages aren't lost by binding it to a queue
|
// Subscribe to exchange, ensures messages aren't lost by binding it to a queue
|
||||||
async subscribe<T> (name: string, listener: Subscriber<T>) {
|
async subscribe<T> (name: string, listener: Subscriber<T | Buffer>)
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel does not exist');
|
throw new Error('Channel does not exist');
|
||||||
|
|
||||||
@ -247,7 +270,8 @@ class MessageBroker {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _subscribe<T> (name: string, listener: Subscriber<T>): Promise<void> {
|
private async _subscribe<T> (name: string, listener: Subscriber<T | Buffer>): Promise<void>
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return Promise.reject(new Error('Channel doesn\'t exist'));
|
return Promise.reject(new Error('Channel doesn\'t exist'));
|
||||||
|
|
||||||
@ -255,15 +279,19 @@ class MessageBroker {
|
|||||||
const queue = await this.#channel.assertQueue('', { exclusive: true });
|
const queue = await this.#channel.assertQueue('', { exclusive: true });
|
||||||
|
|
||||||
await this.#channel.bindQueue(queue.queue, name, '');
|
await this.#channel.bindQueue(queue.queue, name, '');
|
||||||
await this.#channel.consume(queue.queue, async (msg) => {
|
await this.#channel.consume(queue.queue, async (msg) =>
|
||||||
if (msg.content)
|
{
|
||||||
|
if (msg.content && msg.content.toString().startsWith('{'))
|
||||||
await listener(JSON.parse(msg.content.toString()), msg);
|
await listener(JSON.parse(msg.content.toString()), msg);
|
||||||
|
else
|
||||||
|
await listener(msg.content, msg);
|
||||||
this.#channel?.ack(msg);
|
this.#channel?.ack(msg);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add item to queue
|
// Add item to queue
|
||||||
async enqueue (queue: string, content: object, headers?: object): Promise<void | number> {
|
async enqueue (queue: string, content: object, headers?: object): Promise<void | number>
|
||||||
|
{
|
||||||
const properties = {
|
const properties = {
|
||||||
persistent: true,
|
persistent: true,
|
||||||
contentType: 'application/json',
|
contentType: 'application/json',
|
||||||
@ -274,9 +302,12 @@ class MessageBroker {
|
|||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return this.#_qQueue.push({ queue, content, properties });
|
return this.#_qQueue.push({ queue, content, properties });
|
||||||
|
|
||||||
try {
|
try
|
||||||
|
{
|
||||||
await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties);
|
await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties);
|
||||||
} catch (_) {
|
}
|
||||||
|
catch (_)
|
||||||
|
{
|
||||||
this.#_qQueue.push({ queue, content, properties });
|
this.#_qQueue.push({ queue, content, properties });
|
||||||
if (!this.#_qTO)
|
if (!this.#_qTO)
|
||||||
this.#_qTO = setTimeout(this.#_processQueues.bind(this), 5000);
|
this.#_qTO = setTimeout(this.#_processQueues.bind(this), 5000);
|
||||||
@ -284,7 +315,8 @@ class MessageBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Publish to exchange
|
// Publish to exchange
|
||||||
async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number> {
|
async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise<void|number>
|
||||||
|
{
|
||||||
const properties = {
|
const properties = {
|
||||||
contentType: 'application/json',
|
contentType: 'application/json',
|
||||||
headers
|
headers
|
||||||
@ -294,9 +326,12 @@ class MessageBroker {
|
|||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return this.#_pQueue.push({ exchange, content, routingKey, properties });
|
return this.#_pQueue.push({ exchange, content, routingKey, properties });
|
||||||
|
|
||||||
try {
|
try
|
||||||
|
{
|
||||||
await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties);
|
await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties);
|
||||||
} catch (err) {
|
}
|
||||||
|
catch (err)
|
||||||
|
{
|
||||||
const error = err as Error;
|
const error = err as Error;
|
||||||
if (!error.message.includes('nack'))
|
if (!error.message.includes('nack'))
|
||||||
this.#logger.error(`Error while publishing to ${exchange}:\n${error.stack}`);
|
this.#logger.error(`Error while publishing to ${exchange}:\n${error.stack}`);
|
||||||
@ -306,62 +341,74 @@ class MessageBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertExchange (exchange: string, props?: ExchangeDef) {
|
assertExchange (exchange: string, props?: ExchangeDef)
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel doesn\'t exist');
|
throw new Error('Channel doesn\'t exist');
|
||||||
return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props);
|
return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertQueue (queue: string, opts?: QueueDef) {
|
assertQueue (queue: string, opts?: QueueDef)
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel doesn\'t exist');
|
throw new Error('Channel doesn\'t exist');
|
||||||
return this.#channel.assertQueue(queue, opts);
|
return this.#channel.assertQueue(queue, opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Processes messages queued up while the broker was unreachable
|
// Processes messages queued up while the broker was unreachable
|
||||||
async #_processQueues () {
|
async #_processQueues ()
|
||||||
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel doesn\'t exist');
|
throw new Error('Channel doesn\'t exist');
|
||||||
this.#logger.status('Processing queues of unsent messages');
|
this.#logger.status('Processing queues of unsent messages');
|
||||||
const pQ = [ ...this.#_pQueue ];
|
const pQ = [ ...this.#_pQueue ];
|
||||||
this.#_pQueue = [];
|
this.#_pQueue = [];
|
||||||
for (const msg of pQ) {
|
for (const msg of pQ)
|
||||||
|
{
|
||||||
const { exchange, content, routingKey, properties } = msg;
|
const { exchange, content, routingKey, properties } = msg;
|
||||||
const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
|
const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties)
|
||||||
|
.catch(() => null);
|
||||||
if (!result)
|
if (!result)
|
||||||
this.#_pQueue.push(msg);
|
this.#_pQueue.push(msg);
|
||||||
}
|
}
|
||||||
const qQ = [ ...this.#_qQueue ];
|
const qQ = [ ...this.#_qQueue ];
|
||||||
for (const msg of qQ) {
|
for (const msg of qQ)
|
||||||
|
{
|
||||||
const { queue, content, properties } = msg;
|
const { queue, content, properties } = msg;
|
||||||
const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
|
const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
|
||||||
if (!result)
|
if (!result)
|
||||||
this.#_qQueue.push(msg);
|
this.#_qQueue.push(msg);
|
||||||
}
|
}
|
||||||
this.#logger.status('Done processing');
|
this.#logger.status('Done processing');
|
||||||
if (this.#_qTO) {
|
if (this.#_qTO)
|
||||||
|
{
|
||||||
clearTimeout(this.#_qTO);
|
clearTimeout(this.#_qTO);
|
||||||
this.#_qTO = null;
|
this.#_qTO = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async #_restoreListeners () {
|
async #_restoreListeners ()
|
||||||
|
{
|
||||||
this.#logger.status(`Restoring ${this.#consumers.size} consumers`);
|
this.#logger.status(`Restoring ${this.#consumers.size} consumers`);
|
||||||
for (const [ name, list ] of this.#consumers) {
|
for (const [ name, list ] of this.#consumers)
|
||||||
|
{
|
||||||
this.#logger.debug(`Processing consumer ${name}: ${list.length}`);
|
this.#logger.debug(`Processing consumer ${name}: ${list.length}`);
|
||||||
for (const { consumer, options } of list) {
|
for (const { consumer, options } of list)
|
||||||
|
{
|
||||||
await this._consume(name, consumer, options);
|
await this._consume(name, consumer, options);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.#logger.status(`Restoring ${this.#subscribers.size} subscribers`);
|
this.#logger.status(`Restoring ${this.#subscribers.size} subscribers`);
|
||||||
for (const [ name, list ] of this.#subscribers) {
|
for (const [ name, list ] of this.#subscribers)
|
||||||
|
{
|
||||||
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
|
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
|
||||||
for (const subscriber of list) {
|
for (const subscriber of list)
|
||||||
|
{
|
||||||
await this._subscribe(name, subscriber);
|
await this._subscribe(name, subscriber);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.#logger.status(`Done restoring`);
|
this.#logger.status('Done restoring');
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
117
src/MongoDB.ts
117
src/MongoDB.ts
@ -1,6 +1,6 @@
|
|||||||
import { inspect } from "node:util";
|
import { inspect } from 'node:util';
|
||||||
import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from "mongodb";
|
import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from 'mongodb';
|
||||||
import { IServer, ILogger, LoggerClientOptions } from "./interfaces/index.js";
|
import { IServer, ILogger, LoggerClientOptions } from './interfaces/index.js';
|
||||||
|
|
||||||
type Credentials = {
|
type Credentials = {
|
||||||
URI?: string,
|
URI?: string,
|
||||||
@ -29,7 +29,8 @@ export type MongoOptions = {
|
|||||||
*
|
*
|
||||||
* @class MongoDB
|
* @class MongoDB
|
||||||
*/
|
*/
|
||||||
class MongoDB {
|
class MongoDB
|
||||||
|
{
|
||||||
|
|
||||||
#_database: string;
|
#_database: string;
|
||||||
#config: MongoOptions;
|
#config: MongoOptions;
|
||||||
@ -39,7 +40,8 @@ class MongoDB {
|
|||||||
#db: Db | null;
|
#db: Db | null;
|
||||||
#_client: MongoClient;
|
#_client: MongoClient;
|
||||||
|
|
||||||
constructor (server: IServer, config: MongoOptions) {
|
constructor (server: IServer, config: MongoOptions)
|
||||||
|
{
|
||||||
|
|
||||||
if (!server)
|
if (!server)
|
||||||
throw new Error('Missing reference to server!');
|
throw new Error('Missing reference to server!');
|
||||||
@ -48,7 +50,7 @@ class MongoDB {
|
|||||||
|
|
||||||
const { user, password, host, port, database, URI, authDb } = config.credentials;
|
const { user, password, host, port, database, URI, authDb } = config.credentials;
|
||||||
if ((!host?.length || !port || !database?.length) && !URI)
|
if ((!host?.length || !port || !database?.length) && !URI)
|
||||||
throw new Error(`Must provide host, port, and database OR URI parameters!`);
|
throw new Error('Must provide host, port, and database OR URI parameters!');
|
||||||
|
|
||||||
this.#config = config;
|
this.#config = config;
|
||||||
this.#db = null; // DB connection
|
this.#db = null; // DB connection
|
||||||
@ -56,16 +58,22 @@ class MongoDB {
|
|||||||
|
|
||||||
this.#logger = server.createLogger(this, config.loggerOptions);
|
this.#logger = server.createLogger(this, config.loggerOptions);
|
||||||
|
|
||||||
if (URI) {
|
if (URI)
|
||||||
|
{
|
||||||
this.#URI = URI;
|
this.#URI = URI;
|
||||||
} else {
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
let AUTH_DB = authDb;
|
let AUTH_DB = authDb;
|
||||||
const auth = user ? `${user}:${password}@` : '';
|
const auth = user ? `${user}:${password}@` : '';
|
||||||
if (!AUTH_DB && auth) {
|
if (!AUTH_DB && auth)
|
||||||
this.#logger.warn(`No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source`);
|
{
|
||||||
|
this.#logger.warn('No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source');
|
||||||
AUTH_DB = authDb;
|
AUTH_DB = authDb;
|
||||||
} else if (!auth) {
|
}
|
||||||
this.#logger.warn(`No auth provided, proceeding without`);
|
else if (!auth)
|
||||||
|
{
|
||||||
|
this.#logger.warn('No auth provided, proceeding without');
|
||||||
}
|
}
|
||||||
|
|
||||||
this.#URI = `mongodb://${auth}${host}:${port}/${AUTH_DB || ''}?readPreference=secondaryPreferred`;
|
this.#URI = `mongodb://${auth}${host}:${port}/${AUTH_DB || ''}?readPreference=secondaryPreferred`;
|
||||||
@ -75,17 +83,19 @@ class MongoDB {
|
|||||||
|
|
||||||
// TODO figure out reconnecting to DB when connection fails
|
// TODO figure out reconnecting to DB when connection fails
|
||||||
this.#_client.on('error', (error) => this.#logger.error(`MongoDB error:\n${error.stack}`))
|
this.#_client.on('error', (error) => this.#logger.error(`MongoDB error:\n${error.stack}`))
|
||||||
.on('timeout', () => this.#logger.warn(`MongoDB timed out`))
|
.on('timeout', () => this.#logger.warn('MongoDB timed out'))
|
||||||
.on('close', () => this.#logger.info(`MongoDB client disconnected`))
|
.on('close', () => this.#logger.info('MongoDB client disconnected'))
|
||||||
.on('open', () => this.#logger.info(`MongoDB client connected`));
|
.on('open', () => this.#logger.info('MongoDB client connected'));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get database () {
|
get database ()
|
||||||
|
{
|
||||||
return this.#_database;
|
return this.#_database;
|
||||||
}
|
}
|
||||||
|
|
||||||
get client () {
|
get client ()
|
||||||
|
{
|
||||||
return this.#_client;
|
return this.#_client;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,7 +104,8 @@ class MongoDB {
|
|||||||
*
|
*
|
||||||
* @memberof MongoDB
|
* @memberof MongoDB
|
||||||
*/
|
*/
|
||||||
async init () {
|
async init ()
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#config.load)
|
if (!this.#config.load)
|
||||||
return this.#logger.info('Not loading MongoDB');
|
return this.#logger.info('Not loading MongoDB');
|
||||||
@ -105,7 +116,7 @@ class MongoDB {
|
|||||||
this.#logger.status(`Initializing database connection to ${this.#_client.options.hosts}`);
|
this.#logger.status(`Initializing database connection to ${this.#_client.options.hosts}`);
|
||||||
|
|
||||||
await this.#_client.connect();
|
await this.#_client.connect();
|
||||||
this.#logger.debug(`Connected, selecting DB`);
|
this.#logger.debug('Connected, selecting DB');
|
||||||
this.#db = await this.#_client.db(this.#_database);
|
this.#db = await this.#_client.db(this.#_database);
|
||||||
|
|
||||||
this.#logger.status('MongoDB ready');
|
this.#logger.status('MongoDB ready');
|
||||||
@ -114,7 +125,8 @@ class MongoDB {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async close () {
|
async close ()
|
||||||
|
{
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -124,7 +136,8 @@ class MongoDB {
|
|||||||
this.#logger.status('Database closed');
|
this.#logger.status('Database closed');
|
||||||
}
|
}
|
||||||
|
|
||||||
get mongoClient () {
|
get mongoClient ()
|
||||||
|
{
|
||||||
return this.#_client;
|
return this.#_client;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,10 +149,11 @@ class MongoDB {
|
|||||||
* @returns {Array} An array containing the corresponding objects for the query
|
* @returns {Array} An array containing the corresponding objects for the query
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]> {
|
async find<T extends Document> (db: string, query: MongoQuery, options?: FindOptions<T>): Promise<WithId<T>[]>
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
|
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
@ -162,10 +176,11 @@ class MongoDB {
|
|||||||
* @returns {Object} An object containing the queried data
|
* @returns {Object} An object containing the queried data
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null> {
|
async findOne<T extends Document> (db: string, query: MongoQuery, options: FindOptions<T> = {}): Promise<WithId<T> | null>
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
|
|
||||||
@ -187,14 +202,15 @@ class MongoDB {
|
|||||||
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false) {
|
async updateMany<T extends Document> (db: string, filter: MongoQuery, data: T, upsert = false)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (!filter)
|
if (!filter)
|
||||||
throw new Error(`Cannot run update many without a filter, if you mean to update every single document, pass an empty object`);
|
throw new Error('Cannot run update many without a filter, if you mean to update every single document, pass an empty object');
|
||||||
if (typeof filter._id === 'string')
|
if (typeof filter._id === 'string')
|
||||||
filter._id = new ObjectId(filter._id);
|
filter._id = new ObjectId(filter._id);
|
||||||
|
|
||||||
@ -213,10 +229,11 @@ class MongoDB {
|
|||||||
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false) {
|
async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (typeof filter._id === 'string')
|
if (typeof filter._id === 'string')
|
||||||
@ -237,10 +254,11 @@ class MongoDB {
|
|||||||
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
* @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async insertOne (db: string, data: Document) {
|
async insertOne (db: string, data: Document)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (typeof data._id === 'string')
|
if (typeof data._id === 'string')
|
||||||
@ -252,10 +270,11 @@ class MongoDB {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async deleteOne (db: string, filter: Document) {
|
async deleteOne (db: string, filter: Document)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (typeof filter._id === 'string')
|
if (typeof filter._id === 'string')
|
||||||
@ -277,10 +296,11 @@ class MongoDB {
|
|||||||
* @returns
|
* @returns
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
async push (db: string, filter: Document, data: object, upsert = false) {
|
async push (db: string, filter: Document, data: object, upsert = false)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (typeof filter._id === 'string')
|
if (typeof filter._id === 'string')
|
||||||
@ -301,10 +321,11 @@ class MongoDB {
|
|||||||
* @returns {object}
|
* @returns {object}
|
||||||
* @memberof Database
|
* @memberof Database
|
||||||
*/
|
*/
|
||||||
random<T extends Document> (db: string, filter: Document = {}, amount = 1) {
|
random<T extends Document> (db: string, filter: Document = {}, amount = 1)
|
||||||
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
throw new TypeError('Expecting collection name for the first argument');
|
throw new TypeError('Expecting collection name for the first argument');
|
||||||
if (typeof filter._id === 'string')
|
if (typeof filter._id === 'string')
|
||||||
@ -320,28 +341,32 @@ class MongoDB {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stats (options = {}) {
|
stats (options = {})
|
||||||
|
{
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
const result = this.#db.stats(options);
|
const result = this.#db.stats(options);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
collection<T extends Document> (coll: string) {
|
collection<T extends Document> (coll: string)
|
||||||
|
{
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
return this.#db.collection<T>(coll);
|
return this.#db.collection<T>(coll);
|
||||||
}
|
}
|
||||||
|
|
||||||
count (coll: string, query: Document) {
|
count (coll: string, query: Document)
|
||||||
|
{
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
return this.#db.collection(coll).countDocuments(query);
|
return this.#db.collection(coll).countDocuments(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions) {
|
async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions)
|
||||||
|
{
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error(`MongoDB not connected`);
|
throw new Error('MongoDB not connected');
|
||||||
if (!(indices instanceof Array))
|
if (!(indices instanceof Array))
|
||||||
indices = [ indices ];
|
indices = [ indices ];
|
||||||
await this.#db.collection(collection).createIndex(indices, options);
|
await this.#db.collection(collection).createIndex(indices, options);
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import { ILogger, LoggerClientOptions } from "./Logger.js";
|
import { ILogger, LoggerClientOptions } from "./Logger.js";
|
||||||
|
|
||||||
export interface IServer {
|
export interface IServer {
|
||||||
createLogger(obj: object, options?: LoggerClientOptions): ILogger
|
createLogger(obj: object, options?: Partial<LoggerClientOptions>): ILogger
|
||||||
}
|
}
|
27
tests/testBroker.js
Normal file
27
tests/testBroker.js
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
import { MessageBroker } from '../build/esm/index.js';
|
||||||
|
|
||||||
|
const broker = new MessageBroker({
|
||||||
|
createLogger: () =>
|
||||||
|
{
|
||||||
|
return {
|
||||||
|
debug: console.log,
|
||||||
|
info: console.log,
|
||||||
|
status: console.log,
|
||||||
|
warn: console.log,
|
||||||
|
error: console.error
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
load: true,
|
||||||
|
host: 'rabbitmq-01.stylis.local',
|
||||||
|
user: 'stylis',
|
||||||
|
pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj',
|
||||||
|
vhost: 'development',
|
||||||
|
port: 5672
|
||||||
|
});
|
||||||
|
|
||||||
|
await broker.init();
|
||||||
|
broker.subscribe('chatlogs', (message) =>
|
||||||
|
{
|
||||||
|
console.log(message);
|
||||||
|
});
|
@ -4,7 +4,8 @@ import { MariaDB } from '../build/esm/index.js';
|
|||||||
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
|
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
|
||||||
|
|
||||||
const maria = new MariaDB({
|
const maria = new MariaDB({
|
||||||
createLogger: () => {
|
createLogger: () =>
|
||||||
|
{
|
||||||
return {
|
return {
|
||||||
debug: console.log,
|
debug: console.log,
|
||||||
info: console.log,
|
info: console.log,
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import { MongoDB } from '../build/esm/index.js';
|
import { MongoDB } from '../build/esm/index.js';
|
||||||
|
|
||||||
const mongo = new MongoDB({
|
const mongo = new MongoDB({
|
||||||
createLogger: () => {
|
createLogger: () =>
|
||||||
|
{
|
||||||
return {
|
return {
|
||||||
debug: console.log,
|
debug: console.log,
|
||||||
info: console.log,
|
info: console.log,
|
||||||
|
Loading…
Reference in New Issue
Block a user