Compare commits

..

No commits in common. "17dfb9356bef28bb492e7b18a98605acd4a0dc79" and "55c91289872859196e3dcef62c1a532326db4b46" have entirely different histories.

3 changed files with 21 additions and 29 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@navy.gif/wrappers", "name": "@navy.gif/wrappers",
"version": "1.6.2", "version": "1.6.1",
"description": "Various wrapper classes I use in my projects", "description": "Various wrapper classes I use in my projects",
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git", "repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
"author": "Navy.gif", "author": "Navy.gif",

View File

@ -6,7 +6,7 @@ type ExchangeDef = {
durable?: boolean, durable?: boolean,
internal?: boolean, internal?: boolean,
autoDelete?: boolean, autoDelete?: boolean,
type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string, type?: string,
arguments?: object arguments?: object
} }
@ -51,13 +51,10 @@ type InternalQueueMsg = {
queue: string queue: string
} & InternalMessage } & InternalMessage
type Consumer<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void // eslint-disable-next-line @typescript-eslint/no-explicit-any
type Subscriber<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void type Consumer<T = any> = (content: T, msg: ConsumeMessage) => Promise<void> | void
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type SubscriptionOptions = { type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void> | void
exchangeType?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match',
routingKey?: string
}
class MessageBroker class MessageBroker
{ {
@ -77,10 +74,8 @@ class MessageBroker
#channel: ChannelWrapper | null; #channel: ChannelWrapper | null;
#logger: ILogger; #logger: ILogger;
// eslint-disable-next-line @typescript-eslint/no-explicit-any #subscribers: Map<string, Subscriber[]>;
#subscribers: Map<string, {options: SubscriptionOptions, subscriber: Subscriber<any>}[]>; #consumers: Map<string, {consumer: Consumer, options?: Options.Consume}[]>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
#consumers: Map<string, {consumer: Consumer<any>, options?: Options.Consume}[]>;
#_pQueue: InternalPublishMsg[]; #_pQueue: InternalPublishMsg[];
#_qQueue: InternalQueueMsg[]; #_qQueue: InternalQueueMsg[];
@ -233,10 +228,8 @@ class MessageBroker
await this.#_processQueues(); await this.#_processQueues();
} }
/** // Consume queue
* Consume queue async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
*/
async consume<T = unknown> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
@ -261,34 +254,31 @@ class MessageBroker
}, options); }, options);
} }
/** // Subscribe to exchange, ensures messages aren't lost by binding it to a queue
* Subscribe to exchange, ensures messages aren't lost by binding it to a queue async subscribe<T> (name: string, listener: Subscriber<T>)
*/
async subscribe<T> (name: string, subscriber: Subscriber<T>, options: SubscriptionOptions = {})
{ {
if (!this.#channel) if (!this.#channel)
throw new Error('Channel does not exist'); throw new Error('Channel does not exist');
this.#logger.debug(`Subscribing to ${name}`); this.#logger.debug(`Subscribing to ${name}`);
// if (!this.#subscribers.has(name)) // if (!this.#subscribers.has(name))
await this._subscribe<T>(name, subscriber, options); await this._subscribe<T>(name, listener);
const list = this.#subscribers.get(name) ?? []; const list = this.#subscribers.get(name) ?? [];
list.push({ subscriber, options }); list.push(listener);
this.#subscribers.set(name, list); this.#subscribers.set(name, list);
} }
private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void> private async _subscribe<T> (name: string, listener: Subscriber<T>): Promise<void>
{ {
if (!this.#channel) if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist')); return Promise.reject(new Error('Channel doesn\'t exist'));
const type = options.exchangeType ?? 'fanout'; await this.#channel.assertExchange(name, 'fanout', { durable: true });
await this.#channel.assertExchange(name, type, { durable: true });
const queue = await this.#channel.assertQueue('', { exclusive: true }); const queue = await this.#channel.assertQueue('', { exclusive: true });
await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? ''); await this.#channel.bindQueue(queue.queue, name, '');
await this.#channel.consume(queue.queue, async (msg) => await this.#channel.consume(queue.queue, async (msg) =>
{ {
if (msg.content && msg.content.toString().startsWith('{')) if (msg.content && msg.content.toString().startsWith('{'))
@ -415,9 +405,9 @@ class MessageBroker
for (const [ name, list ] of this.#subscribers) for (const [ name, list ] of this.#subscribers)
{ {
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`); this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
for (const { subscriber, options } of list) for (const subscriber of list)
{ {
await this._subscribe(name, subscriber, options); await this._subscribe(name, subscriber);
} }
} }
this.#logger.status('Done restoring'); this.#logger.status('Done restoring');

View File

@ -327,6 +327,7 @@ class MongoDB
*/ */
async push (db: string, filter: Document, data: object, upsert = false) async push (db: string, filter: Document, data: object, upsert = false)
{ {
if (!this.#db) if (!this.#db)
throw new Error('MongoDB not connected'); throw new Error('MongoDB not connected');
if (typeof db !== 'string') if (typeof db !== 'string')
@ -337,6 +338,7 @@ class MongoDB
this.#logger.debug(`Incoming push query for ${db}, with upsert ${upsert} and with parameters ${inspect(filter)} and data ${inspect(data)}`); this.#logger.debug(`Incoming push query for ${db}, with upsert ${upsert} and with parameters ${inspect(filter)} and data ${inspect(data)}`);
const result = await this.#db.collection(db).updateOne(filter, { $push: data }, { upsert }); const result = await this.#db.collection(db).updateOne(filter, { $push: data }, { upsert });
return result; return result;
} }
/** /**