From 3753161edb8097abfb1e2145d9673b0729bed6f5 Mon Sep 17 00:00:00 2001 From: "Navy.gif" Date: Thu, 17 Aug 2023 21:14:12 +0300 Subject: [PATCH] properly handle routing keys --- src/MessageBroker.ts | 46 +++++++++++++++++++++++++++----------------- src/MongoDB.ts | 2 -- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/MessageBroker.ts b/src/MessageBroker.ts index b8cbb2a..a765202 100644 --- a/src/MessageBroker.ts +++ b/src/MessageBroker.ts @@ -6,7 +6,7 @@ type ExchangeDef = { durable?: boolean, internal?: boolean, autoDelete?: boolean, - type?: string, + type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string, arguments?: object } @@ -51,10 +51,13 @@ type InternalQueueMsg = { queue: string } & InternalMessage -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type Consumer = (content: T, msg: ConsumeMessage) => Promise | void -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type Subscriber = (content: T, msg: ConsumeMessage) => Promise | void +type Consumer = (content: T, msg: ConsumeMessage) => Promise | void +type Subscriber = (content: T, msg: ConsumeMessage) => Promise | void + +export type SubscriptionOptions = { + exchangeType?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match', + routingKey?: string +} class MessageBroker { @@ -74,8 +77,10 @@ class MessageBroker #channel: ChannelWrapper | null; #logger: ILogger; - #subscribers: Map; - #consumers: Map; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + #subscribers: Map}[]>; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + #consumers: Map, options?: Options.Consume}[]>; #_pQueue: InternalPublishMsg[]; #_qQueue: InternalQueueMsg[]; @@ -228,8 +233,10 @@ class MessageBroker await this.#_processQueues(); } - // Consume queue - async consume (queue: string, consumer: Consumer, options?: Options.Consume) + /** + * Consume queue + */ + async consume (queue: string, consumer: Consumer, options?: Options.Consume) { if (!this.#channel) throw new Error('Channel does not exist'); @@ -254,31 +261,34 @@ class MessageBroker }, options); } - // Subscribe to exchange, ensures messages aren't lost by binding it to a queue - async subscribe (name: string, listener: Subscriber) + /** + * Subscribe to exchange, ensures messages aren't lost by binding it to a queue + */ + async subscribe (name: string, subscriber: Subscriber, options: SubscriptionOptions = {}) { if (!this.#channel) throw new Error('Channel does not exist'); this.#logger.debug(`Subscribing to ${name}`); // if (!this.#subscribers.has(name)) - await this._subscribe(name, listener); + await this._subscribe(name, subscriber, options); const list = this.#subscribers.get(name) ?? []; - list.push(listener); + list.push({ subscriber, options }); this.#subscribers.set(name, list); } - private async _subscribe (name: string, listener: Subscriber): Promise + private async _subscribe (name: string, listener: Subscriber, options: SubscriptionOptions): Promise { if (!this.#channel) return Promise.reject(new Error('Channel doesn\'t exist')); - await this.#channel.assertExchange(name, 'fanout', { durable: true }); + const type = options.exchangeType ?? 'fanout'; + await this.#channel.assertExchange(name, type, { durable: true }); const queue = await this.#channel.assertQueue('', { exclusive: true }); - await this.#channel.bindQueue(queue.queue, name, ''); + await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? ''); await this.#channel.consume(queue.queue, async (msg) => { if (msg.content && msg.content.toString().startsWith('{')) @@ -405,9 +415,9 @@ class MessageBroker for (const [ name, list ] of this.#subscribers) { this.#logger.debug(`Processing subscriber ${name}: ${list.length}`); - for (const subscriber of list) + for (const { subscriber, options } of list) { - await this._subscribe(name, subscriber); + await this._subscribe(name, subscriber, options); } } this.#logger.status('Done restoring'); diff --git a/src/MongoDB.ts b/src/MongoDB.ts index 838e7ba..e2201ce 100644 --- a/src/MongoDB.ts +++ b/src/MongoDB.ts @@ -327,7 +327,6 @@ class MongoDB */ async push (db: string, filter: Document, data: object, upsert = false) { - if (!this.#db) throw new Error('MongoDB not connected'); if (typeof db !== 'string') @@ -338,7 +337,6 @@ class MongoDB this.#logger.debug(`Incoming push query for ${db}, with upsert ${upsert} and with parameters ${inspect(filter)} and data ${inspect(data)}`); const result = await this.#db.collection(db).updateOne(filter, { $push: data }, { upsert }); return result; - } /**