Fix potential bug

This commit is contained in:
Erik 2024-10-13 00:56:27 +03:00
parent 4e9c63856c
commit 88dc5dfb4d

View File

@ -145,6 +145,7 @@ class MessageBroker
this.#connection.on('connect', async ({ url }) =>
{
this.#logger?.status(`Message broker connected to ${url}`);
this.ensureChannel();
});
await new Promise<void>((resolve, reject) =>
@ -157,7 +158,7 @@ class MessageBroker
this.#connection?.once('connectFailed', reject);
});
await this.awaitChannel();
await this.ensureChannel();
}
async close ()
@ -235,7 +236,7 @@ class MessageBroker
await this.#_processQueues();
}
awaitChannel ()
ensureChannel ()
{
if (this.#channel)
return Promise.resolve(this.#channel);
@ -270,7 +271,7 @@ class MessageBroker
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void>
{
await this.awaitChannel();
await this.ensureChannel();
await this.#channel!.consume(queue, async (msg: ConsumeMessage) =>
{
this.#logger?.debug(`Consumer for ${queue} (${consumer.name}) fired`);
@ -296,7 +297,7 @@ class MessageBroker
private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void>
{
await this.awaitChannel();
await this.ensureChannel();
const type = options.exchangeType ?? 'fanout';
await this.#channel!.assertExchange(name, type, { durable: true });
@ -378,20 +379,20 @@ class MessageBroker
async assertExchange (exchange: string, props?: ExchangeDef)
{
await this.awaitChannel();
await this.ensureChannel();
return this.#channel!.assertExchange(exchange, props?.type ?? 'fanout', props);
}
async assertQueue (queue: string, opts?: QueueDef)
{
await this.awaitChannel();
await this.ensureChannel();
return this.#channel!.assertQueue(queue, opts);
}
// Processes messages queued up while the broker was unreachable
async #_processQueues ()
{
await this.awaitChannel();
await this.ensureChannel();
this.#logger?.status('Processing queues of unsent messages');
const pQ = [ ...this.#_pQueue ];