Compare commits
No commits in common. "17dfb9356bef28bb492e7b18a98605acd4a0dc79" and "55c91289872859196e3dcef62c1a532326db4b46" have entirely different histories.
17dfb9356b
...
55c9128987
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@navy.gif/wrappers",
|
"name": "@navy.gif/wrappers",
|
||||||
"version": "1.6.2",
|
"version": "1.6.1",
|
||||||
"description": "Various wrapper classes I use in my projects",
|
"description": "Various wrapper classes I use in my projects",
|
||||||
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
|
"repository": "https://git.corgi.wtf/Navy.gif/wrappers.git",
|
||||||
"author": "Navy.gif",
|
"author": "Navy.gif",
|
||||||
|
@ -6,7 +6,7 @@ type ExchangeDef = {
|
|||||||
durable?: boolean,
|
durable?: boolean,
|
||||||
internal?: boolean,
|
internal?: boolean,
|
||||||
autoDelete?: boolean,
|
autoDelete?: boolean,
|
||||||
type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string,
|
type?: string,
|
||||||
arguments?: object
|
arguments?: object
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,13 +51,10 @@ type InternalQueueMsg = {
|
|||||||
queue: string
|
queue: string
|
||||||
} & InternalMessage
|
} & InternalMessage
|
||||||
|
|
||||||
type Consumer<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
type Subscriber<T = unknown> = (content: T, msg: ConsumeMessage) => Promise<void> | void
|
type Consumer<T = any> = (content: T, msg: ConsumeMessage) => Promise<void> | void
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
export type SubscriptionOptions = {
|
type Subscriber<T = any> = (content: T, msg: ConsumeMessage) => Promise<void> | void
|
||||||
exchangeType?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match',
|
|
||||||
routingKey?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageBroker
|
class MessageBroker
|
||||||
{
|
{
|
||||||
@ -77,10 +74,8 @@ class MessageBroker
|
|||||||
#channel: ChannelWrapper | null;
|
#channel: ChannelWrapper | null;
|
||||||
|
|
||||||
#logger: ILogger;
|
#logger: ILogger;
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
#subscribers: Map<string, Subscriber[]>;
|
||||||
#subscribers: Map<string, {options: SubscriptionOptions, subscriber: Subscriber<any>}[]>;
|
#consumers: Map<string, {consumer: Consumer, options?: Options.Consume}[]>;
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
#consumers: Map<string, {consumer: Consumer<any>, options?: Options.Consume}[]>;
|
|
||||||
|
|
||||||
#_pQueue: InternalPublishMsg[];
|
#_pQueue: InternalPublishMsg[];
|
||||||
#_qQueue: InternalQueueMsg[];
|
#_qQueue: InternalQueueMsg[];
|
||||||
@ -233,10 +228,8 @@ class MessageBroker
|
|||||||
await this.#_processQueues();
|
await this.#_processQueues();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// Consume queue
|
||||||
* Consume queue
|
async consume<T> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
|
||||||
*/
|
|
||||||
async consume<T = unknown> (queue: string, consumer: Consumer<T>, options?: Options.Consume)
|
|
||||||
{
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel does not exist');
|
throw new Error('Channel does not exist');
|
||||||
@ -261,34 +254,31 @@ class MessageBroker
|
|||||||
}, options);
|
}, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// Subscribe to exchange, ensures messages aren't lost by binding it to a queue
|
||||||
* Subscribe to exchange, ensures messages aren't lost by binding it to a queue
|
async subscribe<T> (name: string, listener: Subscriber<T>)
|
||||||
*/
|
|
||||||
async subscribe<T> (name: string, subscriber: Subscriber<T>, options: SubscriptionOptions = {})
|
|
||||||
{
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
throw new Error('Channel does not exist');
|
throw new Error('Channel does not exist');
|
||||||
|
|
||||||
this.#logger.debug(`Subscribing to ${name}`);
|
this.#logger.debug(`Subscribing to ${name}`);
|
||||||
// if (!this.#subscribers.has(name))
|
// if (!this.#subscribers.has(name))
|
||||||
await this._subscribe<T>(name, subscriber, options);
|
await this._subscribe<T>(name, listener);
|
||||||
|
|
||||||
const list = this.#subscribers.get(name) ?? [];
|
const list = this.#subscribers.get(name) ?? [];
|
||||||
list.push({ subscriber, options });
|
list.push(listener);
|
||||||
this.#subscribers.set(name, list);
|
this.#subscribers.set(name, list);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _subscribe<T> (name: string, listener: Subscriber<T>, options: SubscriptionOptions): Promise<void>
|
private async _subscribe<T> (name: string, listener: Subscriber<T>): Promise<void>
|
||||||
{
|
{
|
||||||
if (!this.#channel)
|
if (!this.#channel)
|
||||||
return Promise.reject(new Error('Channel doesn\'t exist'));
|
return Promise.reject(new Error('Channel doesn\'t exist'));
|
||||||
|
|
||||||
const type = options.exchangeType ?? 'fanout';
|
await this.#channel.assertExchange(name, 'fanout', { durable: true });
|
||||||
await this.#channel.assertExchange(name, type, { durable: true });
|
|
||||||
const queue = await this.#channel.assertQueue('', { exclusive: true });
|
const queue = await this.#channel.assertQueue('', { exclusive: true });
|
||||||
|
|
||||||
await this.#channel.bindQueue(queue.queue, name, options.routingKey ?? '');
|
await this.#channel.bindQueue(queue.queue, name, '');
|
||||||
await this.#channel.consume(queue.queue, async (msg) =>
|
await this.#channel.consume(queue.queue, async (msg) =>
|
||||||
{
|
{
|
||||||
if (msg.content && msg.content.toString().startsWith('{'))
|
if (msg.content && msg.content.toString().startsWith('{'))
|
||||||
@ -415,9 +405,9 @@ class MessageBroker
|
|||||||
for (const [ name, list ] of this.#subscribers)
|
for (const [ name, list ] of this.#subscribers)
|
||||||
{
|
{
|
||||||
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
|
this.#logger.debug(`Processing subscriber ${name}: ${list.length}`);
|
||||||
for (const { subscriber, options } of list)
|
for (const subscriber of list)
|
||||||
{
|
{
|
||||||
await this._subscribe(name, subscriber, options);
|
await this._subscribe(name, subscriber);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.#logger.status('Done restoring');
|
this.#logger.status('Done restoring');
|
||||||
|
@ -327,6 +327,7 @@ class MongoDB
|
|||||||
*/
|
*/
|
||||||
async push (db: string, filter: Document, data: object, upsert = false)
|
async push (db: string, filter: Document, data: object, upsert = false)
|
||||||
{
|
{
|
||||||
|
|
||||||
if (!this.#db)
|
if (!this.#db)
|
||||||
throw new Error('MongoDB not connected');
|
throw new Error('MongoDB not connected');
|
||||||
if (typeof db !== 'string')
|
if (typeof db !== 'string')
|
||||||
@ -337,6 +338,7 @@ class MongoDB
|
|||||||
this.#logger.debug(`Incoming push query for ${db}, with upsert ${upsert} and with parameters ${inspect(filter)} and data ${inspect(data)}`);
|
this.#logger.debug(`Incoming push query for ${db}, with upsert ${upsert} and with parameters ${inspect(filter)} and data ${inspect(data)}`);
|
||||||
const result = await this.#db.collection(db).updateOne(filter, { $push: data }, { upsert });
|
const result = await this.#db.collection(db).updateOne(filter, { $push: data }, { upsert });
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user