hopefully fix missing channel errors

This commit is contained in:
Erik 2024-10-09 12:50:10 +03:00
parent b267921d48
commit c1df312b2c

View File

@ -30,7 +30,8 @@ export interface BrokerOptions {
pass: string,
vhost: string,
exchanges?: Record<string, ExchangeDef>,
queues?: Record<string, QueueDef>
queues?: Record<string, QueueDef>,
heartbeat?: number
}
interface InternalMessage {
@ -79,6 +80,8 @@ class MessageBroker
#_pQueue: InternalPublishMsg[];
#_qQueue: InternalQueueMsg[];
#_qTO: NodeJS.Timeout | null;
#heartbeat: number | undefined;
#creatingChannel?: Promise<ChannelWrapper> | null;
constructor (server: IServer, options: BrokerOptions)
{
@ -88,6 +91,7 @@ class MessageBroker
this.#password = options.pass;
this.#vhost = options.vhost || '';
this.#port = options.port;
this.#heartbeat = options.heartbeat;
if (!this.#hosts.length)
throw new Error('Missing hosts configuration');
@ -122,7 +126,7 @@ class MessageBroker
this.#logger?.info('Initialising message broker');
const credentials = this.#username ? `${this.#username}:${this.#password}@` : '';
const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`);
this.#connection = amqp.connect(connectionStrings);
this.#connection = amqp.connect(connectionStrings, { heartbeatIntervalInSeconds: this.#heartbeat });
this.#connection.on('disconnect', async ({ err }) =>
{
@ -153,8 +157,7 @@ class MessageBroker
this.#connection?.once('connectFailed', reject);
});
await this.createChannel();
this.#connection.on('connect', this.createChannel.bind(this));
await this.awaitChannel();
}
async close ()
@ -232,14 +235,31 @@ class MessageBroker
await this.#_processQueues();
}
awaitChannel ()
{
if (this.#channel)
return Promise.resolve(this.#channel);
if (this.#creatingChannel)
return this.#creatingChannel;
this.#logger?.debug('Creating channel create promise');
const promise = new Promise<ChannelWrapper>((resolve, reject) =>
{
this.createChannel().then(() =>
{
this.#creatingChannel = null;
resolve(this.#channel!);
}).catch(reject);
});
this.#creatingChannel = promise;
return promise;
}
/**
* Consume queue
*/
async consume<T = unknown> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
{
if (!this.#channel)
throw new Error('Channel does not exist');
this.#logger?.debug(`Adding queue consumer for ${queue}`);
await this._consume<T>(queue, consumer, options);
@ -250,9 +270,8 @@ class MessageBroker
private async _consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume): Promise<void>
{
if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist'));
await this.#channel.consume(queue, async (msg: ConsumeMessage) =>
await this.awaitChannel();
await this.#channel!.consume(queue, async (msg: ConsumeMessage) =>
{
this.#logger?.debug(`Consumer for ${queue} (${consumer.name}) fired`);
if (msg.content)
@ -266,9 +285,6 @@ class MessageBroker
*/
async subscribe<T> (name: string, subscriber: Subscriber<T>, options: SubscriptionOptions = {})
{
if (!this.#channel)
throw new Error('Channel does not exist');
this.#logger?.debug(`Subscribing to ${name}`);
// if (!this.#subscribers.has(name))
await this._subscribe<T>(name, subscriber, options);
@ -280,15 +296,14 @@ class MessageBroker
private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void>
{
if (!this.#channel)
return Promise.reject(new Error('Channel doesn\'t exist'));
await this.awaitChannel();
const type = options.exchangeType ?? 'fanout';
await this.#channel.assertExchange(name, type, { durable: true });
const queue = await this.#channel.assertQueue('', { exclusive: true });
await this.#channel!.assertExchange(name, type, { durable: true });
const queue = await this.#channel!.assertQueue('', { exclusive: true });
await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? '');
await this.#channel.consume(queue.queue, async (msg) =>
await this.#channel!.bindQueue(queue.queue, name, options.routingKey ?? '');
await this.#channel!.consume(queue.queue, async (msg) =>
{
this.#logger?.debug(`Subscriber for ${name} (${listener.name}) fired`);
if (msg.content && msg.content.toString().startsWith('{'))
@ -361,32 +376,30 @@ class MessageBroker
}
}
assertExchange (exchange: string, props?: ExchangeDef)
async assertExchange (exchange: string, props?: ExchangeDef)
{
if (!this.#channel)
throw new Error('Channel doesn\'t exist');
return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props);
await this.awaitChannel();
return this.#channel!.assertExchange(exchange, props?.type ?? 'fanout', props);
}
assertQueue (queue: string, opts?: QueueDef)
async assertQueue (queue: string, opts?: QueueDef)
{
if (!this.#channel)
throw new Error('Channel doesn\'t exist');
return this.#channel.assertQueue(queue, opts);
await this.awaitChannel();
return this.#channel!.assertQueue(queue, opts);
}
// Processes messages queued up while the broker was unreachable
async #_processQueues ()
{
if (!this.#channel)
throw new Error('Channel doesn\'t exist');
await this.awaitChannel();
this.#logger?.status('Processing queues of unsent messages');
const pQ = [ ...this.#_pQueue ];
this.#_pQueue = [];
for (const msg of pQ)
{
const { exchange, content, routingKey, properties } = msg;
const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties)
const result = await this.#channel!.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties)
.catch(() => null);
if (!result)
this.#_pQueue.push(msg);
@ -395,7 +408,7 @@ class MessageBroker
for (const msg of qQ)
{
const { queue, content, properties } = msg;
const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
const result = await this.#channel!.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null);
if (!result)
this.#_qQueue.push(msg);
}