From 88dc5dfb4d839cfa21cc254ee8bf430065526f13 Mon Sep 17 00:00:00 2001 From: Navy Date: Sun, 13 Oct 2024 00:56:27 +0300 Subject: [PATCH] Fix potential bug --- src/MessageBroker.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/MessageBroker.ts b/src/MessageBroker.ts index dfc9b72..a4ce2e1 100644 --- a/src/MessageBroker.ts +++ b/src/MessageBroker.ts @@ -145,6 +145,7 @@ class MessageBroker this.#connection.on('connect', async ({ url }) => { this.#logger?.status(`Message broker connected to ${url}`); + this.ensureChannel(); }); await new Promise((resolve, reject) => @@ -157,7 +158,7 @@ class MessageBroker this.#connection?.once('connectFailed', reject); }); - await this.awaitChannel(); + await this.ensureChannel(); } async close () @@ -235,7 +236,7 @@ class MessageBroker await this.#_processQueues(); } - awaitChannel () + ensureChannel () { if (this.#channel) return Promise.resolve(this.#channel); @@ -270,7 +271,7 @@ class MessageBroker private async _consume (queue: string, consumer: Consumer, options?: Options.Consume): Promise { - await this.awaitChannel(); + await this.ensureChannel(); await this.#channel!.consume(queue, async (msg: ConsumeMessage) => { this.#logger?.debug(`Consumer for ${queue} (${consumer.name}) fired`); @@ -296,7 +297,7 @@ class MessageBroker private async _subscribe (name: string, listener: Subscriber, options: SubscriptionOptions): Promise { - await this.awaitChannel(); + await this.ensureChannel(); const type = options.exchangeType ?? 'fanout'; await this.#channel!.assertExchange(name, type, { durable: true }); @@ -378,20 +379,20 @@ class MessageBroker async assertExchange (exchange: string, props?: ExchangeDef) { - await this.awaitChannel(); + await this.ensureChannel(); return this.#channel!.assertExchange(exchange, props?.type ?? 'fanout', props); } async assertQueue (queue: string, opts?: QueueDef) { - await this.awaitChannel(); + await this.ensureChannel(); return this.#channel!.assertQueue(queue, opts); } // Processes messages queued up while the broker was unreachable async #_processQueues () { - await this.awaitChannel(); + await this.ensureChannel(); this.#logger?.status('Processing queues of unsent messages'); const pQ = [ ...this.#_pQueue ];