diff --git a/src/MessageBroker.ts b/src/MessageBroker.ts index d63bcea..99af8f5 100644 --- a/src/MessageBroker.ts +++ b/src/MessageBroker.ts @@ -122,8 +122,12 @@ class MessageBroker { this.#logger.status(`Message broker connected to ${url}`); }); - await new Promise((resolve) => { - this.#connection?.once('connect', resolve); + await new Promise((resolve, reject) => { + this.#connection?.once('connect', () => { + this.#connection?.removeListener('connectFailed', reject); + resolve(); + }); + this.#connection?.once('connectFailed', reject); }); await this.createChannel(); @@ -131,6 +135,20 @@ class MessageBroker { } + async close () { + if (this.#channel) { + await this.#channel.close(); + this.#channel.removeAllListeners(); + } + if (this.#connection) { + await this.#connection.close(); + // No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + this.#connection.removeAllListeners(); + } + } + async createChannel () { const exchanges = Object.entries(this.#exchanges); const queues = Object.entries(this.#queues); @@ -158,11 +176,14 @@ class MessageBroker { this.#channel.on('connect', () => this.#logger.status('Channel connected')); this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`)); - await new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { if (!this.#channel) return reject(new Error('Missing channel?')); + this.#channel.once('connect', () => { + this.#channel?.removeListener('error', reject); + resolve(); + }); this.#channel.once('error', reject); - this.#channel.once('connect', resolve); }); // If the create channel function was called as a result of a failover we'll have to resubscribe