diff --git a/src/MessageBroker.ts b/src/MessageBroker.ts index 145a0a3..dfc9b72 100644 --- a/src/MessageBroker.ts +++ b/src/MessageBroker.ts @@ -30,7 +30,8 @@ export interface BrokerOptions { pass: string, vhost: string, exchanges?: Record, - queues?: Record + queues?: Record, + heartbeat?: number } interface InternalMessage { @@ -79,6 +80,8 @@ class MessageBroker #_pQueue: InternalPublishMsg[]; #_qQueue: InternalQueueMsg[]; #_qTO: NodeJS.Timeout | null; + #heartbeat: number | undefined; + #creatingChannel?: Promise | null; constructor (server: IServer, options: BrokerOptions) { @@ -88,6 +91,7 @@ class MessageBroker this.#password = options.pass; this.#vhost = options.vhost || ''; this.#port = options.port; + this.#heartbeat = options.heartbeat; if (!this.#hosts.length) throw new Error('Missing hosts configuration'); @@ -122,7 +126,7 @@ class MessageBroker this.#logger?.info('Initialising message broker'); const credentials = this.#username ? `${this.#username}:${this.#password}@` : ''; const connectionStrings = this.#hosts.map(host => `amqp://${credentials}${host}:${this.#port}/${this.#vhost}`); - this.#connection = amqp.connect(connectionStrings); + this.#connection = amqp.connect(connectionStrings, { heartbeatIntervalInSeconds: this.#heartbeat }); this.#connection.on('disconnect', async ({ err }) => { @@ -153,8 +157,7 @@ class MessageBroker this.#connection?.once('connectFailed', reject); }); - await this.createChannel(); - this.#connection.on('connect', this.createChannel.bind(this)); + await this.awaitChannel(); } async close () @@ -232,14 +235,31 @@ class MessageBroker await this.#_processQueues(); } + awaitChannel () + { + if (this.#channel) + return Promise.resolve(this.#channel); + + if (this.#creatingChannel) + return this.#creatingChannel; + this.#logger?.debug('Creating channel create promise'); + const promise = new Promise((resolve, reject) => + { + this.createChannel().then(() => + { + this.#creatingChannel = null; + resolve(this.#channel!); + }).catch(reject); + }); + this.#creatingChannel = promise; + return promise; + } + /** * Consume queue */ async consume (queue: string, consumer: Consumer, options?: Options.Consume) { - if (!this.#channel) - throw new Error('Channel does not exist'); - this.#logger?.debug(`Adding queue consumer for ${queue}`); await this._consume(queue, consumer, options); @@ -250,9 +270,8 @@ class MessageBroker private async _consume (queue: string, consumer: Consumer, options?: Options.Consume): Promise { - if (!this.#channel) - return Promise.reject(new Error('Channel doesn\'t exist')); - await this.#channel.consume(queue, async (msg: ConsumeMessage) => + await this.awaitChannel(); + await this.#channel!.consume(queue, async (msg: ConsumeMessage) => { this.#logger?.debug(`Consumer for ${queue} (${consumer.name}) fired`); if (msg.content) @@ -266,9 +285,6 @@ class MessageBroker */ async subscribe (name: string, subscriber: Subscriber, options: SubscriptionOptions = {}) { - if (!this.#channel) - throw new Error('Channel does not exist'); - this.#logger?.debug(`Subscribing to ${name}`); // if (!this.#subscribers.has(name)) await this._subscribe(name, subscriber, options); @@ -280,15 +296,14 @@ class MessageBroker private async _subscribe (name: string, listener: Subscriber, options: SubscriptionOptions): Promise { - if (!this.#channel) - return Promise.reject(new Error('Channel doesn\'t exist')); + await this.awaitChannel(); const type = options.exchangeType ?? 'fanout'; - await this.#channel.assertExchange(name, type, { durable: true }); - const queue = await this.#channel.assertQueue('', { exclusive: true }); + await this.#channel!.assertExchange(name, type, { durable: true }); + const queue = await this.#channel!.assertQueue('', { exclusive: true }); - await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? ''); - await this.#channel.consume(queue.queue, async (msg) => + await this.#channel!.bindQueue(queue.queue, name, options.routingKey ?? ''); + await this.#channel!.consume(queue.queue, async (msg) => { this.#logger?.debug(`Subscriber for ${name} (${listener.name}) fired`); if (msg.content && msg.content.toString().startsWith('{')) @@ -361,32 +376,30 @@ class MessageBroker } } - assertExchange (exchange: string, props?: ExchangeDef) + async assertExchange (exchange: string, props?: ExchangeDef) { - if (!this.#channel) - throw new Error('Channel doesn\'t exist'); - return this.#channel.assertExchange(exchange, props?.type ?? 'fanout', props); + await this.awaitChannel(); + return this.#channel!.assertExchange(exchange, props?.type ?? 'fanout', props); } - assertQueue (queue: string, opts?: QueueDef) + async assertQueue (queue: string, opts?: QueueDef) { - if (!this.#channel) - throw new Error('Channel doesn\'t exist'); - return this.#channel.assertQueue(queue, opts); + await this.awaitChannel(); + return this.#channel!.assertQueue(queue, opts); } // Processes messages queued up while the broker was unreachable async #_processQueues () { - if (!this.#channel) - throw new Error('Channel doesn\'t exist'); + await this.awaitChannel(); + this.#logger?.status('Processing queues of unsent messages'); const pQ = [ ...this.#_pQueue ]; this.#_pQueue = []; for (const msg of pQ) { const { exchange, content, routingKey, properties } = msg; - const result = await this.#channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties) + const result = await this.#channel!.publish(exchange, routingKey, Buffer.from(JSON.stringify(content)), properties) .catch(() => null); if (!result) this.#_pQueue.push(msg); @@ -395,7 +408,7 @@ class MessageBroker for (const msg of qQ) { const { queue, content, properties } = msg; - const result = await this.#channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null); + const result = await this.#channel!.sendToQueue(queue, Buffer.from(JSON.stringify(content)), properties).catch(() => null); if (!result) this.#_qQueue.push(msg); }