diff --git a/.eslintrc.json b/.eslintrc.json index 52f270c..1d590cb 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -15,17 +15,38 @@ "sourceType": "module" }, "rules": { + "@typescript-eslint/no-unused-vars": "off", + "@typescript-eslint/no-non-null-assertion":"off", "accessor-pairs": "warn", "array-callback-return": "warn", - "array-bracket-newline": [ "warn", "consistent" ], - "array-bracket-spacing": [ "warn", "always", { "objectsInArrays": false, "arraysInArrays": false }], + "array-bracket-newline": [ + "warn", + "consistent" + ], + "array-bracket-spacing": [ + "warn", + "always", + { + "objectsInArrays": false, + "arraysInArrays": false + } + ], "arrow-spacing": "warn", "block-scoped-var": "warn", - "block-spacing": [ "warn", "always" ], - "brace-style": [ "warn", "1tbs" ], + "block-spacing": [ + "warn", + "always" + ], + "brace-style": [ + "warn", + "allman" + ], "callback-return": "warn", "camelcase": "warn", - "comma-dangle": [ "warn", "only-multiline" ], + "comma-dangle": [ + "warn", + "only-multiline" + ], "comma-spacing": [ "warn", { @@ -68,9 +89,25 @@ "implicit-arrow-linebreak": "warn", "indent": "warn", "init-declarations": "warn", - "jsx-quotes": [ "warn", "prefer-single" ], - "key-spacing": [ "warn", { "beforeColon": false, "afterColon": true }], - "keyword-spacing": [ "warn", { "after": true, "before": true }], + "quotes": ["error" , "single"], + "jsx-quotes": [ + "warn", + "prefer-single" + ], + "key-spacing": [ + "warn", + { + "beforeColon": false, + "afterColon": true + } + ], + "keyword-spacing": [ + "warn", + { + "after": true, + "before": true + } + ], "linebreak-style": [ "error", "unix" @@ -79,10 +116,28 @@ "lines-around-directive": "warn", "max-classes-per-file": "warn", "max-nested-callbacks": "warn", + "max-len": [ + "warn", + { + "code": 140, + "ignoreComments": true, + "ignoreStrings": true, + "ignoreTemplateLiterals": true, + "ignoreRegExpLiterals": true + } + ], + "max-lines-per-function": [ + "warn", + 100 + ], + "max-depth": [ + "warn", + 3 + ], "new-parens": "warn", "no-alert": "warn", "no-array-constructor": "warn", - "no-bitwise": "warn", + // "no-bitwise": "warn", "no-buffer-constructor": "warn", "no-caller": "warn", "no-console": "warn", @@ -96,7 +151,6 @@ "no-extend-native": "warn", "no-extra-bind": "warn", "no-extra-label": "warn", - "no-extra-parens": "warn", "no-floating-decimal": "warn", "no-implicit-coercion": "warn", "no-implicit-globals": "warn", @@ -147,7 +201,7 @@ "no-unmodified-loop-condition": "warn", "no-unneeded-ternary": "error", "no-unused-expressions": "warn", - "no-use-before-define": "error", + "@typescript-eslint/no-use-before-define": "error", "no-useless-call": "warn", "no-useless-computed-key": "warn", "no-useless-concat": "warn", @@ -155,20 +209,39 @@ "no-useless-rename": "warn", "no-useless-return": "warn", "no-var": "warn", - "no-void": "warn", + // "no-void": "warn", "no-whitespace-before-property": "error", - "nonblock-statement-body-position": [ "warn", "below" ], + "nonblock-statement-body-position": [ + "warn", + "below" + ], "object-curly-spacing": [ "warn", "always" ], - "object-property-newline": [ "warn", { "allowAllPropertiesOnSameLine": true }], + "object-property-newline": [ + "warn", + { + "allowAllPropertiesOnSameLine": true + } + ], "object-shorthand": "warn", "one-var-declaration-per-line": "warn", "operator-assignment": "warn", - "operator-linebreak": [ "warn", "before" ], + "operator-linebreak": [ + "warn", + "before" + ], "padding-line-between-statements": "warn", - "padded-blocks": [ "warn", { "switches": "never" }, { "allowSingleLineBlocks": true }], + "padded-blocks": [ + "warn", + { + "switches": "never" + }, + { + "allowSingleLineBlocks": true + } + ], "prefer-arrow-callback": "warn", "prefer-const": "warn", "prefer-destructuring": "warn", @@ -189,12 +262,18 @@ "last" ], "space-before-blocks": "warn", - "space-before-function-paren": [ "error", "always" ], + "space-before-function-paren": [ + "error", + "always" + ], "space-in-parens": [ "warn", "never" ], - "spaced-comment": [ "warn", "always" ], + "spaced-comment": [ + "warn", + "always" + ], "strict": "warn", "switch-colon-spacing": "warn", "symbol-description": "warn", diff --git a/index.ts b/index.ts index b2fee76..5d79ca6 100644 --- a/index.ts +++ b/index.ts @@ -1,4 +1,4 @@ -export { MessageBroker, BrokerOptions } from "./src/MessageBroker.js"; +export { MessageBroker, BrokerOptions } from './src/MessageBroker.js'; export { MariaDB, MariaOptions } from './src/MariaDB.js'; export { MongoDB, MongoOptions } from './src/MongoDB.js'; export { ObjectId, Document, DeleteResult, Collection } from 'mongodb'; \ No newline at end of file diff --git a/src/MariaDB.ts b/src/MariaDB.ts index 55869f8..369fb63 100644 --- a/src/MariaDB.ts +++ b/src/MariaDB.ts @@ -1,17 +1,18 @@ import { inspect } from 'node:util'; import { ILogger, IServer } from './interfaces/index.js'; -import mysql, { Pool, PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql'; +import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql'; import { LoggerClientOptions } from './interfaces/Logger.js'; -const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK' ]; +const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ]; type Credentials = { user: string, password: string, host: string, port: number, - database: string + database: string, + node: string } export type MariaOptions = { @@ -31,19 +32,28 @@ type MariaError = { sqlMessage:string } & Error -class MariaDB { +type QueryOptions = { + node?: string, + timeout?: number +} + +class MariaDB +{ #_activeQueries: number; #afterLastQuery: (() => void) | null; #logger: ILogger; #_ready: boolean; + #load: boolean; #config: MariaOptions; #credentials: Credentials[]; #cluster: boolean; - #pool: PoolCluster | Pool | null; + #pool: PoolCluster | null; + #nodes: string[]; - constructor (server: IServer, options: MariaOptions) { + constructor (server: IServer, options: MariaOptions) + { if (!server) throw new Error('Missing reference to server!'); @@ -51,11 +61,18 @@ class MariaDB { throw new Error('No config options provided!'); this.#config = options; + this.#load = options.load ?? true; const { host, user, port, password, database } = options.credentials; const hosts = host.split(','); this.#credentials = []; - for (const remote of hosts) { - this.#credentials.push({ host: remote, user, port, password, database }); + this.#nodes = []; + for (const remote of hosts) + { + const [ node ] = remote.split('.'); + if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node)) + throw new Error('Invalid host config, expecting qualified domain names'); + this.#credentials.push({ host: remote, user, port, password, database, node }); + this.#nodes.push(node); } this.#pool = null; @@ -69,42 +86,48 @@ class MariaDB { } - get ready () { + get ready () + { return this.#_ready; } - get activeQueries () { + get activeQueries () + { return this.#_activeQueries; } - async init () { + async init () + { - if (!this.#config.load) + if (!this.#load) return this.#logger.info('Not loading MariaDB'); this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`); - if (this.#cluster) { - this.#pool = mysql.createPoolCluster(this.#config.cluster); - for (const creds of this.#credentials) - this.#pool.add({ ...this.#config.client, ...creds }); - } else { - this.#pool = mysql.createPool({ ...this.#config.client, ...this.#credentials[0] }); + this.#pool = mysql.createPoolCluster(this.#config.cluster); + for (const creds of this.#credentials) + { + this.#pool.add(creds.node, { ...this.#config.client, ...creds }); + this.#logger.info(`Added node ${creds.node} to pool cluster`); } - this.#pool.on('connection', (connection) => { + this.#pool.on('connection', (connection) => + { this.#logger.debug(`New connection: ${connection?.threadId || null}`); }); - this.#pool.on('acquire', (connection) => { + this.#pool.on('acquire', (connection) => + { this.#_activeQueries++; this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`); }); - this.#pool.on('enqueue', () => { - this.#logger.debug(`Query enqueued for connection`); + this.#pool.on('enqueue', () => + { + this.#logger.debug('Query enqueued for connection'); }); - this.#pool.on('release', (connection) => { + this.#pool.on('release', (connection) => + { this.#_activeQueries--; if (!this.ready && !this.#_activeQueries && this.#afterLastQuery) @@ -113,11 +136,20 @@ class MariaDB { this.#logger.debug(`Connection released: ${connection?.threadId || null}`); }); - this.#logger.info(`Testing MariaDB connection`); - await new Promise((resolve, reject) => { + this.#pool.on('remove', (nodeId) => + { + this.#logger.status(`Node ${nodeId} was removed from pool`); + const index = this.#nodes.findIndex(n => n === nodeId); + this.#nodes.splice(index, 1); + }); + + this.#logger.info('Testing MariaDB connection'); + await new Promise((resolve, reject) => + { if (!this.#pool) return reject(new Error('Missing connection pool')); - this.#pool.getConnection((err, conn) => { + this.#pool.getConnection((err, conn) => + { if (err) return reject(err); conn.release(); @@ -125,27 +157,31 @@ class MariaDB { }); }); - this.#logger.status(`Database connected`); + this.#logger.status('Database connected'); this.#_ready = true; return this; } - async close () { - this.#logger.status(`Shutting down database connections`); + async close () + { + this.#logger.status('Shutting down database connections'); if (!this.ready) return Promise.resolve(); this.#_ready = false; - if (this.#_activeQueries) { + if (this.#_activeQueries) + { this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`); await this.finishQueries(); - this.#logger.info(`Queries finished, shutting down`); + this.#logger.info('Queries finished, shutting down'); } - return new Promise(resolve => { + return new Promise(resolve => + { if (!this.#pool) return resolve(); - this.#pool.end(() => { + this.#pool.end(() => + { this.#pool?.removeAllListeners(); resolve(); }); @@ -153,21 +189,34 @@ class MariaDB { }); } - finishQueries () { - return new Promise(resolve => { + finishQueries () + { + return new Promise(resolve => + { this.#afterLastQuery = resolve; }); } - getConnection (): Promise { - return new Promise((resolve, reject) => { + getConnection (node?: string): Promise + { + return new Promise((resolve, reject) => + { if (!this.#pool) return reject(new Error('Pool closed')); - this.#pool.getConnection((err, conn) => { + // Get node by name + const pool = this.#pool; + if (node && !this.#nodes.includes(node)) + { + this.#logger.warn(`Node ${node} is not available in pool, falling back to arbitrary node`); + node = '*'; + } + return pool.of(node ?? '*').getConnection((err, conn) => + { if (err) return reject(err); - resolve(conn); + return resolve(conn); }); + }); } @@ -176,11 +225,16 @@ class MariaDB { * @throws {MariaError} * @private * */ - async #_query (query: string, values: (string | number | string[] | number[])[], timeout?: number, attempts = 0): Promise { - const connection = await this.getConnection(); - try { - const result = await new Promise((resolve, reject) => { - const q = connection.query({ timeout, sql: query }, values, (err, results, fields) => { + async #_query (query: string, values: (string | number | string[] | number[])[], { timeout, node }: QueryOptions = {}, attempts = 0): + Promise + { + const connection = await this.getConnection(node); + try + { + const result = await new Promise((resolve, reject) => + { + const q = connection.query({ timeout, sql: query }, values, (err, results, fields) => + { if (err) reject(err); else if (results) @@ -192,16 +246,19 @@ class MariaDB { this.#logger.debug(`Constructed query: ${q.sql}`); }); return Promise.resolve(result || []); - } catch (err) { + } + catch (err) + { const error = err as MariaError; // Retry safe errors // (Galera) Instance not ready for query if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5) // - return this.#_query(query, values, timeout, ++attempts); + return this.#_query(query, values, { timeout, node }, ++attempts); return Promise.reject(error); } } - async query (query: string, values: (string | number | string[] | number[])[], timeout?: number): Promise { + async query (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions): Promise + { if (!this.ready) return Promise.reject(new Error('MariaDB not ready')); @@ -211,12 +268,13 @@ class MariaDB { batch = values.some(val => val instanceof Array); this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`); - return this.#_query(query, values, timeout); + return this.#_query(query, values, opts); } - q (query: string, values: (string | number | string[] | number[])[], timeout?: number) { - return this.query(query, values, timeout); + q (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions) + { + return this.query(query, values, opts); } } diff --git a/src/MessageBroker.ts b/src/MessageBroker.ts index 3d6d5ae..518e605 100644 --- a/src/MessageBroker.ts +++ b/src/MessageBroker.ts @@ -1,4 +1,4 @@ -import { ILogger, IServer } from "./interfaces/index.js"; +import { ILogger, IServer } from './interfaces/index.js'; import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'; import { Channel, ConfirmChannel, ConsumeMessage, Options } from 'amqplib'; @@ -56,7 +56,8 @@ type Consumer = (content: T, msg: ConsumeMessage) => Promise // eslint-disable-next-line @typescript-eslint/no-explicit-any type Subscriber = (content: T, msg: ConsumeMessage) => Promise -class MessageBroker { +class MessageBroker +{ // Broker definitions #load: boolean; @@ -80,9 +81,10 @@ class MessageBroker { #_qQueue: InternalQueueMsg[]; #_qTO: NodeJS.Timeout | null; - constructor (server: IServer, options: BrokerOptions) { + constructor (server: IServer, options: BrokerOptions) + { - this.#load = options.load || false; + this.#load = options.load ?? true; this.#hosts = options.host.split(','); this.#username = options.user; this.#password = options.pass; @@ -114,7 +116,8 @@ class MessageBroker { } - async init () { + async init () + { if (!this.#load) return this.#logger.info('Not loading RabbitMQ'); @@ -124,23 +127,29 @@ class MessageBroker { const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`); this.#connection = await amqp.connect(connectionStrings); - this.#connection.on('disconnect', async ({ err }) => { + this.#connection.on('disconnect', async ({ err }) => + { this.#logger.status(`Disconnected: ${err.message}`); await this.#channel?.close(); this.#channel = null; }); - this.#connection.on('blocked', ({ reason }) => { + this.#connection.on('blocked', ({ reason }) => + { this.#logger.status(`Blocked: ${reason}`); }); - this.#connection.on('connectFailed', ({ err }) => { + this.#connection.on('connectFailed', ({ err }) => + { this.#logger.error(`Message broker failed to connect: ${err.stack || err.message}`); }); - this.#connection.on('connect', async ({ url }) => { + this.#connection.on('connect', async ({ url }) => + { this.#logger.status(`Message broker connected to ${url}`); }); - await new Promise((resolve, reject) => { - this.#connection?.once('connect', () => { + await new Promise((resolve, reject) => + { + this.#connection?.once('connect', () => + { this.#connection?.removeListener('connectFailed', reject); resolve(); }); @@ -152,12 +161,15 @@ class MessageBroker { } - async close () { - if (this.#channel) { + async close () + { + if (this.#channel) + { await this.#channel.close(); this.#channel.removeAllListeners(); } - if (this.#connection) { + if (this.#connection) + { await this.#connection.close(); // No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter // eslint-disable-next-line @typescript-eslint/ban-ts-comment @@ -166,11 +178,13 @@ class MessageBroker { } } - async createChannel () { + async createChannel () + { const exchanges = Object.entries(this.#exchanges); const queues = Object.entries(this.#queues); - if (this.#channel) { + if (this.#channel) + { this.#logger.debug('Closing old channel'); await this.#channel.close().catch(err => this.#logger.error(err.stack)); } @@ -179,11 +193,14 @@ class MessageBroker { throw new Error(); this.#logger.debug('Creating channel'); this.#channel = this.#connection.createChannel({ - setup: async (channel: Channel | ConfirmChannel) => { - for (const [ name, props ] of exchanges) { + setup: async (channel: Channel | ConfirmChannel) => + { + for (const [ name, props ] of exchanges) + { await channel.assertExchange(name, props.type ?? 'fanout', props); } - for (const [ name, props ] of queues) { + for (const [ name, props ] of queues) + { await channel.assertQueue(name, props); } } @@ -193,10 +210,12 @@ class MessageBroker { this.#channel.on('connect', () => this.#logger.status('Channel connected')); this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`)); - await new Promise((resolve, reject) => { + await new Promise((resolve, reject) => + { if (!this.#channel) return reject(new Error('Missing channel?')); - this.#channel.once('connect', () => { + this.#channel.once('connect', () => + { this.#channel?.removeListener('error', reject); resolve(); }); @@ -210,7 +229,8 @@ class MessageBroker { } // Consume queue - async consume (queue: string, consumer: Consumer, options?: Options.Consume) { + async consume (queue: string, consumer: Consumer, options?: Options.Consume) + { if (!this.#channel) throw new Error('Channel does not exist'); @@ -222,10 +242,12 @@ class MessageBroker { this.#consumers.set(queue, list); } - private async _consume (queue: string, consumer: Consumer, options?: Options.Consume): Promise { + private async _consume (queue: string, consumer: Consumer, options?: Options.Consume): Promise + { if (!this.#channel) return Promise.reject(new Error('Channel doesn\'t exist')); - await this.#channel.consume(queue, async (msg: ConsumeMessage) => { + await this.#channel.consume(queue, async (msg: ConsumeMessage) => + { if (msg.content) await consumer(JSON.parse(msg.content.toString()), msg); this.#channel?.ack(msg); @@ -233,7 +255,8 @@ class MessageBroker { } // Subscribe to exchange, ensures messages aren't lost by binding it to a queue - async subscribe (name: string, listener: Subscriber) { + async subscribe (name: string, listener: Subscriber) + { if (!this.#channel) throw new Error('Channel does not exist'); @@ -247,7 +270,8 @@ class MessageBroker { } - private async _subscribe (name: string, listener: Subscriber): Promise { + private async _subscribe (name: string, listener: Subscriber): Promise + { if (!this.#channel) return Promise.reject(new Error('Channel doesn\'t exist')); @@ -255,15 +279,19 @@ class MessageBroker { const queue = await this.#channel.assertQueue('', { exclusive: true }); await this.#channel.bindQueue(queue.queue, name, ''); - await this.#channel.consume(queue.queue, async (msg) => { - if (msg.content) + await this.#channel.consume(queue.queue, async (msg) => + { + if (msg.content && msg.content.toString().startsWith('{')) await listener(JSON.parse(msg.content.toString()), msg); + else + await listener(msg.content, msg); this.#channel?.ack(msg); }); } // Add item to queue - async enqueue (queue: string, content: object, headers?: object): Promise { + async enqueue (queue: string, content: object, headers?: object): Promise + { const properties = { persistent: true, contentType: 'application/json', @@ -274,9 +302,12 @@ class MessageBroker { if (!this.#channel) return this.#_qQueue.push({ queue, content, properties }); - try { + try + { await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties); - } catch (_) { + } + catch (_) + { this.#_qQueue.push({ queue, content, properties }); if (!this.#_qTO) this.#_qTO = setTimeout(this.#_processQueues.bind(this), 5000); @@ -284,7 +315,8 @@ class MessageBroker { } // Publish to exchange - async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise { + async publish (exchange: string, content: object, { headers, routingKey = '' }: {headers?: string, routingKey?: string} = {}): Promise + { const properties = { contentType: 'application/json', headers @@ -294,9 +326,12 @@ class MessageBroker { if (!this.#channel) return this.#_pQueue.push({ exchange, content, routingKey, properties }); - try { + try + { await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties); - } catch (err) { + } + catch (err) + { const error = err as Error; if (!error.message.includes('nack')) this.#logger.error(`Error while publishing to ${exchange}:\n${error.stack}`); @@ -306,62 +341,74 @@ class MessageBroker { } } - assertExchange (exchange: string, props?: ExchangeDef) { + assertExchange (exchange: string, props?: ExchangeDef) + { if (!this.#channel) throw new Error('Channel doesn\'t exist'); return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props); } - assertQueue (queue: string, opts?: QueueDef) { + assertQueue (queue: string, opts?: QueueDef) + { if (!this.#channel) throw new Error('Channel doesn\'t exist'); return this.#channel.assertQueue(queue, opts); } // Processes messages queued up while the broker was unreachable - async #_processQueues () { + async #_processQueues () + { if (!this.#channel) throw new Error('Channel doesn\'t exist'); this.#logger.status('Processing queues of unsent messages'); const pQ = [ ...this.#_pQueue ]; this.#_pQueue = []; - for (const msg of pQ) { + for (const msg of pQ) + { const { exchange, content, routingKey, properties } = msg; - const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties).catch(() => null); + const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties) + .catch(() => null); if (!result) this.#_pQueue.push(msg); } const qQ = [ ...this.#_qQueue ]; - for (const msg of qQ) { + for (const msg of qQ) + { const { queue, content, properties } = msg; const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null); if (!result) this.#_qQueue.push(msg); } this.#logger.status('Done processing'); - if (this.#_qTO) { + if (this.#_qTO) + { clearTimeout(this.#_qTO); this.#_qTO = null; } } - async #_restoreListeners () { + async #_restoreListeners () + { this.#logger.status(`Restoring ${this.#consumers.size} consumers`); - for (const [ name, list ] of this.#consumers) { + for (const [ name, list ] of this.#consumers) + { this.#logger.debug(`Processing consumer ${name}: ${list.length}`); - for (const { consumer, options } of list) { + for (const { consumer, options } of list) + { await this._consume(name, consumer, options); } } this.#logger.status(`Restoring ${this.#subscribers.size} subscribers`); - for (const [ name, list ] of this.#subscribers) { + for (const [ name, list ] of this.#subscribers) + { this.#logger.debug(`Processing subscriber ${name}: ${list.length}`); - for (const subscriber of list) { + for (const subscriber of list) + { await this._subscribe(name, subscriber); } } - this.#logger.status(`Done restoring`); + this.#logger.status('Done restoring'); } } diff --git a/src/MongoDB.ts b/src/MongoDB.ts index d7c1d19..63a6acc 100644 --- a/src/MongoDB.ts +++ b/src/MongoDB.ts @@ -1,6 +1,6 @@ -import { inspect } from "node:util"; -import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from "mongodb"; -import { IServer, ILogger, LoggerClientOptions } from "./interfaces/index.js"; +import { inspect } from 'node:util'; +import { MongoClient, MongoClientOptions, Db, Document, WithId, ObjectId, Filter, IndexSpecification, CreateIndexesOptions, FindOptions } from 'mongodb'; +import { IServer, ILogger, LoggerClientOptions } from './interfaces/index.js'; type Credentials = { URI?: string, @@ -29,7 +29,8 @@ export type MongoOptions = { * * @class MongoDB */ -class MongoDB { +class MongoDB +{ #_database: string; #config: MongoOptions; @@ -39,7 +40,8 @@ class MongoDB { #db: Db | null; #_client: MongoClient; - constructor (server: IServer, config: MongoOptions) { + constructor (server: IServer, config: MongoOptions) + { if (!server) throw new Error('Missing reference to server!'); @@ -48,7 +50,7 @@ class MongoDB { const { user, password, host, port, database, URI, authDb } = config.credentials; if ((!host?.length || !port || !database?.length) && !URI) - throw new Error(`Must provide host, port, and database OR URI parameters!`); + throw new Error('Must provide host, port, and database OR URI parameters!'); this.#config = config; this.#db = null; // DB connection @@ -56,16 +58,22 @@ class MongoDB { this.#logger = server.createLogger(this, config.loggerOptions); - if (URI) { + if (URI) + { this.#URI = URI; - } else { + } + else + { let AUTH_DB = authDb; const auth = user ? `${user}:${password}@` : ''; - if (!AUTH_DB && auth) { - this.#logger.warn(`No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source`); + if (!AUTH_DB && auth) + { + this.#logger.warn('No explicit auth db provided with MONGO_AUTH_DB, will attempt to use MONGO_DB for auth source'); AUTH_DB = authDb; - } else if (!auth) { - this.#logger.warn(`No auth provided, proceeding without`); + } + else if (!auth) + { + this.#logger.warn('No auth provided, proceeding without'); } this.#URI = `mongodb://${auth}${host}:${port}/${AUTH_DB || ''}?readPreference=secondaryPreferred`; @@ -75,17 +83,19 @@ class MongoDB { // TODO figure out reconnecting to DB when connection fails this.#_client.on('error', (error) => this.#logger.error(`MongoDB error:\n${error.stack}`)) - .on('timeout', () => this.#logger.warn(`MongoDB timed out`)) - .on('close', () => this.#logger.info(`MongoDB client disconnected`)) - .on('open', () => this.#logger.info(`MongoDB client connected`)); + .on('timeout', () => this.#logger.warn('MongoDB timed out')) + .on('close', () => this.#logger.info('MongoDB client disconnected')) + .on('open', () => this.#logger.info('MongoDB client connected')); } - get database () { + get database () + { return this.#_database; } - get client () { + get client () + { return this.#_client; } @@ -94,7 +104,8 @@ class MongoDB { * * @memberof MongoDB */ - async init () { + async init () + { if (!this.#config.load) return this.#logger.info('Not loading MongoDB'); @@ -105,7 +116,7 @@ class MongoDB { this.#logger.status(`Initializing database connection to ${this.#_client.options.hosts}`); await this.#_client.connect(); - this.#logger.debug(`Connected, selecting DB`); + this.#logger.debug('Connected, selecting DB'); this.#db = await this.#_client.db(this.#_database); this.#logger.status('MongoDB ready'); @@ -114,7 +125,8 @@ class MongoDB { } - async close () { + async close () + { if (!this.#db) return; @@ -124,7 +136,8 @@ class MongoDB { this.#logger.status('Database closed'); } - get mongoClient () { + get mongoClient () + { return this.#_client; } @@ -136,10 +149,11 @@ class MongoDB { * @returns {Array} An array containing the corresponding objects for the query * @memberof Database */ - async find (db: string, query: MongoQuery, options?: FindOptions): Promise[]> { + async find (db: string, query: MongoQuery, options?: FindOptions): Promise[]> + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); @@ -162,10 +176,11 @@ class MongoDB { * @returns {Object} An object containing the queried data * @memberof Database */ - async findOne (db: string, query: MongoQuery, options: FindOptions = {}): Promise | null> { + async findOne (db: string, query: MongoQuery, options: FindOptions = {}): Promise | null> + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); @@ -187,14 +202,15 @@ class MongoDB { * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @memberof Database */ - async updateMany (db: string, filter: MongoQuery, data: T, upsert = false) { + async updateMany (db: string, filter: MongoQuery, data: T, upsert = false) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (!filter) - throw new Error(`Cannot run update many without a filter, if you mean to update every single document, pass an empty object`); + throw new Error('Cannot run update many without a filter, if you mean to update every single document, pass an empty object'); if (typeof filter._id === 'string') filter._id = new ObjectId(filter._id); @@ -213,10 +229,11 @@ class MongoDB { * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @memberof Database */ - async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false) { + async updateOne (db: string, filter: MongoQuery, data: Document, upsert = false) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (typeof filter._id === 'string') @@ -237,10 +254,11 @@ class MongoDB { * @returns {WriteResult} Object containing the followint counts: Matched, Upserted, Modified * @memberof Database */ - async insertOne (db: string, data: Document) { + async insertOne (db: string, data: Document) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (typeof data._id === 'string') @@ -252,10 +270,11 @@ class MongoDB { } - async deleteOne (db: string, filter: Document) { + async deleteOne (db: string, filter: Document) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (typeof filter._id === 'string') @@ -277,10 +296,11 @@ class MongoDB { * @returns * @memberof Database */ - async push (db: string, filter: Document, data: object, upsert = false) { + async push (db: string, filter: Document, data: object, upsert = false) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (typeof filter._id === 'string') @@ -301,10 +321,11 @@ class MongoDB { * @returns {object} * @memberof Database */ - random (db: string, filter: Document = {}, amount = 1) { + random (db: string, filter: Document = {}, amount = 1) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (typeof db !== 'string') throw new TypeError('Expecting collection name for the first argument'); if (typeof filter._id === 'string') @@ -320,28 +341,32 @@ class MongoDB { } - stats (options = {}) { + stats (options = {}) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); const result = this.#db.stats(options); return result; } - collection (coll: string) { + collection (coll: string) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); return this.#db.collection(coll); } - count (coll: string, query: Document) { + count (coll: string, query: Document) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); return this.#db.collection(coll).countDocuments(query); } - async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions) { + async ensureIndex (collection: string, indices: IndexSpecification = [], options?: CreateIndexesOptions) + { if (!this.#db) - throw new Error(`MongoDB not connected`); + throw new Error('MongoDB not connected'); if (!(indices instanceof Array)) indices = [ indices ]; await this.#db.collection(collection).createIndex(indices, options); diff --git a/tests/testBroker.js b/tests/testBroker.js new file mode 100644 index 0000000..2a36e14 --- /dev/null +++ b/tests/testBroker.js @@ -0,0 +1,27 @@ +import { MessageBroker } from '../build/esm/index.js'; + +const broker = new MessageBroker({ + createLogger: () => + { + return { + debug: console.log, + info: console.log, + status: console.log, + warn: console.log, + error: console.error + }; + } +}, { + load: true, + host: 'rabbitmq-01.stylis.local', + user: 'stylis', + pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj', + vhost: 'development', + port: 5672 +}); + +await broker.init(); +broker.subscribe('chatlogs', (message) => +{ + console.log(message); +}); \ No newline at end of file diff --git a/tests/testMaria.js b/tests/testMaria.js index 4082804..9346d11 100644 --- a/tests/testMaria.js +++ b/tests/testMaria.js @@ -4,7 +4,8 @@ import { MariaDB } from '../build/esm/index.js'; const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' })); const maria = new MariaDB({ - createLogger: () => { + createLogger: () => + { return { debug: console.log, info: console.log, diff --git a/tests/testMongo.js b/tests/testMongo.js index 61bd492..3208b81 100644 --- a/tests/testMongo.js +++ b/tests/testMongo.js @@ -1,7 +1,8 @@ import { MongoDB } from '../build/esm/index.js'; const mongo = new MongoDB({ - createLogger: () => { + createLogger: () => + { return { debug: console.log, info: console.log,