clean up after closed connections
This commit is contained in:
parent
b00c712748
commit
a52eb1c9e8
@ -122,8 +122,12 @@ class MessageBroker {
|
|||||||
this.#logger.status(`Message broker connected to ${url}`);
|
this.#logger.status(`Message broker connected to ${url}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
await new Promise((resolve) => {
|
await new Promise<void>((resolve, reject) => {
|
||||||
this.#connection?.once('connect', resolve);
|
this.#connection?.once('connect', () => {
|
||||||
|
this.#connection?.removeListener('connectFailed', reject);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
this.#connection?.once('connectFailed', reject);
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.createChannel();
|
await this.createChannel();
|
||||||
@ -131,6 +135,20 @@ class MessageBroker {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async close () {
|
||||||
|
if (this.#channel) {
|
||||||
|
await this.#channel.close();
|
||||||
|
this.#channel.removeAllListeners();
|
||||||
|
}
|
||||||
|
if (this.#connection) {
|
||||||
|
await this.#connection.close();
|
||||||
|
// No clue why the removeAllListeners function isn't exposed from the connection manager, but it exists since it's an EventEmitter
|
||||||
|
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||||
|
// @ts-ignore
|
||||||
|
this.#connection.removeAllListeners();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async createChannel () {
|
async createChannel () {
|
||||||
const exchanges = Object.entries(this.#exchanges);
|
const exchanges = Object.entries(this.#exchanges);
|
||||||
const queues = Object.entries(this.#queues);
|
const queues = Object.entries(this.#queues);
|
||||||
@ -158,11 +176,14 @@ class MessageBroker {
|
|||||||
this.#channel.on('connect', () => this.#logger.status('Channel connected'));
|
this.#channel.on('connect', () => this.#logger.status('Channel connected'));
|
||||||
this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`));
|
this.#channel.on('error', (err, info) => this.#logger.error(`${info.name}: ${err.stack}`));
|
||||||
|
|
||||||
await new Promise((resolve, reject) => {
|
await new Promise<void>((resolve, reject) => {
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return reject(new Error('Missing channel?'));
|
return reject(new Error('Missing channel?'));
|
||||||
|
this.#channel.once('connect', () => {
|
||||||
|
this.#channel?.removeListener('error', reject);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
this.#channel.once('error', reject);
|
this.#channel.once('error', reject);
|
||||||
this.#channel.once('connect', resolve);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// If the create channel function was called as a result of a failover we'll have to resubscribe
|
// If the create channel function was called as a result of a failover we'll have to resubscribe
|
||||||
|
Loading…
Reference in New Issue
Block a user