wrappers/src/MariaDB.ts
2023-08-15 22:31:41 +03:00

437 lines
14 KiB
TypeScript

import { inspect } from 'node:util';
import dns from 'node:dns/promises';
import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo, OkPacket } from 'mysql';
import { ILogger, IServer } from './interfaces/index.js';
import { LoggerClientOptions } from './interfaces/Logger.js';
// Driver error codes after which the query wrapper re-issues the same query instead of failing.
const SAFE_TO_RETRY = [
    'ER_LOCK_DEADLOCK',
    'PROTOCOL_CONNECTION_LOST',
];
/** Connection credentials as supplied in the wrapper configuration. */
type Credentials = {
    /** One host, or several hosts separated by commas (the constructor splits on `,`). */
    host: string;
    port: number;
    user: string;
    password: string;
    database: string;
};
/** Credentials for one specific host, tagged with the short node name derived from that host. */
type ExtendedCredentials = Credentials & {
    /** First label of the host name; used as the node id inside the pool cluster. */
    node: string;
};
/** Configuration accepted by the `MariaDB` wrapper. */
export type MariaOptions = {
    /** Connection details; `host` may list several comma-separated hosts. */
    credentials: Credentials;
    /** When `false`, `init()` logs a notice and skips connecting. Treated as `true` when omitted. */
    load?: boolean;
    /** Handed to `mysql.createPoolCluster`. */
    cluster?: PoolClusterConfig;
    /** Merged into the pool config of every node, with the node's credentials applied on top. */
    client?: PoolConfig;
    /** Forwarded to `server.createLogger`. */
    loggerOptions?: LoggerClientOptions;
};
/**
 * Shape that caught query errors are cast to.
 * The retry logic only reads `code` and `errno`; the remaining fields are presumably
 * populated by the `mysql` driver - confirm against its error documentation.
 */
type MariaError = Error & {
    code: string;
    errno: number;
    fatal: boolean;
    sql: string;
    sqlState: string;
    sqlMessage: string;
};
/** A single bindable value. */
type SqlScalar = string | number | null | boolean;
/** Placeholder values for a query: each entry is a scalar, a list of scalars, or a list of scalar rows. */
type Values = (SqlScalar | SqlScalar[] | SqlScalar[][])[];
/** Per-query options. */
type QueryOptions = {
    /** Name of the node the query should run on; any node is used when omitted. */
    node?: string;
    /** Passed through as the `timeout` option of the driver's `query` call. */
    timeout?: number;
    /** When `true`, fail instead of falling back to an arbitrary node if `node` is unavailable or not synced. */
    errorIfNodeUnavailable?: boolean;
};
/**
 * Node states tracked by the wrapper. Only a node whose status is `synced`
 * (or not yet known) is considered queryable.
 */
type StatusString =
    | 'disconnecting'
    | 'disconnected'
    | 'connecting'
    | 'connected'
    | 'synced'
    | 'donor'
    | 'joiner'
    | 'joined';
/** Bookkeeping entry for one database node. */
type Node = {
    /** Short node name (first label of the configured host). */
    name: string;
    /** Configured host; replaced by the resolved IPv4 address during cluster initialisation. */
    host: string;
    /** Node UUID, once it has been resolved or learned from a status update. */
    uuid?: string;
    /** Last known state; unset until first resolved. */
    status?: StatusString;
};
/** Payload of a cluster status notification, as consumed by the status update listener. */
type StatusUpdate = {
    status: StatusString;
    uuid: string;
    primary: string;
    /** Position of the reporting node within `members`, as a decimal string (`-1` when unknown). */
    index: string;
    /** Comma-separated member list; each entry is split on `/` into id, (unused), address. May be empty. */
    members: string;
};
/**
 * Type guard telling an `OkPacket` (result of a statement that returns no rows)
 * apart from a row array.
 *
 * Accepts any value: `null`, `undefined` and primitives yield `false` instead of
 * making the `in` operator throw.
 *
 * @param obj Value to inspect.
 * @returns `true` when `obj` is an object carrying `fieldCount`, `affectedRows` and `message`.
 */
const isOkPacket = (obj: unknown): obj is OkPacket =>
    typeof obj === 'object'
    && obj !== null
    && 'fieldCount' in obj
    && 'affectedRows' in obj
    && 'message' in obj;
/**
 * Wrapper around a `mysql` pool cluster that keeps track of node state and in-flight queries.
 *
 * Designed to work in a single instance or cluster (galera) configuration.
 * Will need some work for primary/replica configuration (e.g. not treating all nodes as read-write nodes).
 */
class MariaDB
{
    /** Number of connections currently acquired (maintained by the pool event handlers set up in `init`). */
    #_activeQueries: number;
    /** Resolver of the pending `finishQueries()` promise, if any. */
    #afterLastQuery: (() => void) | null;
    #logger: ILogger;
    #_ready: boolean;
    #load: boolean;
    #config: MariaOptions;
    /** One credentials entry per configured host. */
    #credentials: ExtendedCredentials[];
    /** `true` when more than one host is configured. */
    #cluster: boolean;
    #pool: PoolCluster | null;
    #nodes: Node[];

    /**
     * @param server Owner of this wrapper; only used to create the logger.
     * @param options Wrapper configuration. `credentials.host` may hold several comma-separated hosts.
     * @throws {Error} When `server` or `options` is missing, or a host has no usable first label.
     */
    constructor (server: IServer, options: MariaOptions)
    {
        if (!server)
            throw new Error('Missing reference to server!');
        if (!options)
            throw new Error('No config options provided!');

        this.#config = options;
        this.#load = options.load ?? true;

        const { host, user, port, password, database } = options.credentials;
        const hosts = host.split(',');

        this.#credentials = [];
        this.#nodes = [];
        for (const remote of hosts)
        {
            // The first label of the host name doubles as the node name
            const [ node ] = remote.split('.');
            if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
                throw new Error('Invalid host config, expecting qualified domain names');
            this.#credentials.push({ host: remote, user, port, password, database, node });
            this.#nodes.push({ name: node, host: remote });
        }

        this.#pool = null;
        this.#_ready = false;
        this.#_activeQueries = 0;
        this.#afterLastQuery = null;
        this.#cluster = this.#credentials.length > 1;

        this.#logger = server.createLogger(this, options.loggerOptions);
    }

    /** Whether `init()` has completed and `close()` has not been called since. */
    get ready ()
    {
        return this.#_ready;
    }

    /** Number of connections currently acquired from the pool. */
    get activeQueries ()
    {
        return this.#_activeQueries;
    }

    /**
     * Creates the pool cluster, registers every configured node, verifies that a
     * connection can be obtained and, in cluster mode, resolves each node's IP,
     * UUID and state so that status updates can be matched to nodes.
     *
     * @returns This instance, or the result of the logger call when loading is disabled.
     * @throws When the test connection or a DNS lookup fails, or a node answers the
     * status query with an empty result.
     */
    async init ()
    {
        if (!this.#load)
            return this.#logger.info('Not loading MariaDB');

        this.#logger.status(`Creating${this.#cluster ? ' cluster' : ''} connection pool`);
        const pool = mysql.createPoolCluster(this.#config.cluster);
        this.#pool = pool;

        for (const creds of this.#credentials)
        {
            const name = creds.node;
            // Node credentials take precedence over the shared client options
            pool.add(name, { ...this.#config.client, ...creds });
            this.#logger.info(`Added node ${name} to pool cluster`);
        }

        // NOTE(review): 'connection', 'acquire', 'enqueue' and 'release' are documented as Pool
        // events; confirm the pool cluster actually emits them, otherwise activeQueries stays at 0.
        pool.on('connection', (connection) =>
        {
            this.#logger.debug(`New connection: ${connection?.threadId || null}`);
        });

        pool.on('acquire', (connection) =>
        {
            this.#_activeQueries++;
            this.#logger.debug(`Connection acquired: ${connection?.threadId || null}`);
        });

        pool.on('enqueue', () =>
        {
            this.#logger.debug('Query enqueued for connection');
        });

        pool.on('release', (connection) =>
        {
            this.#_activeQueries--;
            // While shutting down, wake up whoever is waiting in finishQueries()
            if (!this.ready && !this.#_activeQueries && this.#afterLastQuery)
                this.#afterLastQuery();
            this.#logger.debug(`Connection released: ${connection?.threadId || null}`);
        });

        pool.on('remove', (nodeId) =>
        {
            this.#logger.status(`Node ${nodeId} was removed from pool`);
            // Nodes are registered in the pool under their name, so match on that.
            // A missing entry must not reach splice(): splice(-1, 1) would drop the last node.
            const index = this.#nodes.findIndex(n => n.name === nodeId);
            if (index !== -1)
                this.#nodes.splice(index, 1);
        });

        this.#logger.info('Testing MariaDB connection');
        await new Promise<void>((resolve, reject) =>
        {
            pool.getConnection((err, conn) =>
            {
                if (err)
                    return reject(err);
                conn.release();
                return resolve();
            });
        });
        this.#logger.status('Database connected');

        if (this.#cluster)
        { // Resolve the UUID for each node to enable status updates
            this.#logger.status('Resolving cluster node UUIDs for status updates');
            for (const node of this.#nodes)
            {
                // UUIDs are not always enough, we also need the node's IP for cases where the UUID changes due to restarts
                const dnsResult = await dns.lookup(node.host, 4);
                node.host = dnsResult.address;

                // A failed query yields null
                const response = await this.#_query(
                    'SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"',
                    [],
                    { node: node.name }
                ).catch(() => null) as { Variable_name: string, Value: string }[];

                if (!response)
                {
                    this.#logger.info(`Could not resolve UUID for ${node.name}, presumably offline, setting status to disconnected`);
                    node.status = 'disconnected';
                    continue;
                }

                // An empty (but successful) response means something is wrong with the node
                if (!response.length)
                    throw new Error(`Failed to acquire UUID for node ${node.name}`);

                const uuid = response.find(entry => entry.Variable_name === 'wsrep_gcomm_uuid')?.Value;
                const status = response.find(entry => entry.Variable_name === 'wsrep_local_state_comment')?.Value.toLowerCase();
                node.uuid = uuid;
                node.status = status as StatusString;
            }
        }

        this.#_ready = true;
        return this;
    }

    /**
     * Stops accepting queries, waits for connections that are still in use and
     * then ends the pool. Does nothing beyond logging when the wrapper is not ready.
     */
    async close ()
    {
        this.#logger.status('Shutting down database connections');
        if (!this.ready)
            return;

        this.#_ready = false;
        if (this.#_activeQueries)
        {
            this.#logger.info(`${this.#_activeQueries} active queries still running, letting them finish`);
            await this.finishQueries();
            this.#logger.info('Queries finished, shutting down');
        }

        return new Promise<void>(resolve =>
        {
            // Keep a local reference: the field is cleared before the end callback fires,
            // so the callback could not reach the pool through `this.#pool` any more.
            const pool = this.#pool;
            if (!pool)
                return resolve();
            pool.end(() =>
            {
                pool.removeAllListeners();
                resolve();
            });
            this.#pool = null;
        });
    }

    /** Callback to hand to whatever delivers cluster status notifications; bound to this instance. */
    get statusUpdateListener ()
    {
        return this.#statusUpdate.bind(this);
    }

    /** Sets a node's status and logs the transition. */
    #setNodeStatus (node: Node, status: StatusString)
    {
        const oldStatus = node.status;
        node.status = status;
        this.#logger.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`);
    }

    /**
     * Applies a cluster status notification to the tracked nodes.
     *
     * Without member info only two transitions are inferred: `disconnecting` nodes become
     * `disconnected` on a `disconnected` update, and `joined` nodes become `synced` on a
     * `synced` update. With member info, the entry at `index` identifies the reporting node,
     * first by UUID and then by IP (the UUID changes across restarts).
     *
     * Malformed updates (index out of range, entry without address) are logged and ignored.
     */
    #statusUpdate (update: StatusUpdate)
    {
        // this.#logger.debug('Status update');
        // this.#logger.debug(update);
        if (!update.members.length)
        {
            if (update.status === 'disconnected')
            {
                for (const node of this.#nodes)
                {
                    if (node.status === 'disconnecting')
                        this.#setNodeStatus(node, 'disconnected');
                }
            }
            else if (update.status === 'synced')
            { // Sometimes we'll get a status update without the member info, probably safe to assume if we get a synced status the recently joined members will be synced
                for (const node of this.#nodes)
                {
                    if (node.status === 'joined')
                        this.#setNodeStatus(node, 'synced');
                }
            }
            return;
        }

        const members = update.members.split(',');
        const index = parseInt(update.index, 10);
        if (index === -1)
            return this.#logger.warn('Received -1 index, member does not recognise itself');

        // Covers NaN and out-of-range indices as well as empty entries
        const member = members[index];
        if (!member)
            return this.#logger.warn(`Received status update with invalid member index ${update.index}`);

        const [ id,, host ] = member.split('/');
        let node = this.#nodes.find(n => n.uuid === id);
        if (!node) // Node ID has changed due to restart, use IP to figure out which one
        {
            const [ ip ] = (host ?? '').split(':');
            node = this.#nodes.find(n => n.host === ip);
            if (!node)
                return this.#logger.warn('Received status update for node that is not present in config');
            node.uuid = id;
        }

        if (node.status !== update.status)
            this.#setNodeStatus(node, update.status);
    }

    /**
     * Resolves once no connection is in use any more.
     *
     * Resolves immediately when nothing is active. Otherwise the promise is settled by the
     * pool's release handler, which only does so while the wrapper is not ready (i.e. during shutdown).
     */
    finishQueries ()
    {
        return new Promise<void>(resolve =>
        {
            if (!this.#_activeQueries)
                return resolve();
            this.#afterLastQuery = resolve;
        });
    }

    /**
     * Acquires a connection from the pool.
     *
     * @param nodeName Node to take the connection from, or `null` for any node.
     * @param throwError When `true`, reject if the requested node is unknown or not synced;
     * when `false`, log a warning and fall back to any node.
     * @returns A pool connection; the caller is responsible for releasing it.
     */
    getConnection (nodeName: string | null, throwError: boolean): Promise<PoolConnection>
    {
        return new Promise((resolve, reject) =>
        {
            const pool = this.#pool;
            if (!pool)
                return reject(new Error('Pool closed'));

            if (nodeName)
            {
                const node = this.#nodes.find(n => n.name === nodeName);
                let problem: string | null = null;
                if (!node)
                    problem = `Node ${nodeName} is not available in pool, falling back to arbitrary node`;
                else if (node.status && node.status !== 'synced')
                    problem = `Node ${nodeName} is currently not synced with the pool and thus unqueryable`;

                if (problem)
                {
                    if (throwError)
                        return reject(new Error(problem));
                    this.#logger.warn(problem);
                    nodeName = '*';
                }
            }

            this.#logger.debug(`Selected node ${nodeName} for query`);
            return pool.of(nodeName ?? '*').getConnection((err, conn) =>
            {
                if (err)
                    return reject(err);
                return resolve(conn);
            });
        });
    }

    /**
     * Runs a query on a pooled connection.
     * Retry certain queries that are safe to retry, e.g. deadlock
     *
     * @param query SQL text, may contain placeholders.
     * @param values Placeholder values.
     * @param attempts Number of retries already performed; at most 5 retries are made.
     * @returns Row array, `[OkPacket]` for statements without rows, or the field info / an empty
     * array when the driver reports no results.
     * @throws {MariaError}
     * @private
     * */
    async #_query<T> (query: string, values: Values, { timeout, node, errorIfNodeUnavailable }: QueryOptions = {}, attempts = 0):
        Promise<T[] | FieldInfo[] | [OkPacket]>
    {
        const connection = await this.getConnection(node ?? null, errorIfNodeUnavailable ?? false);
        try
        {
            const result = await new Promise<T[] | FieldInfo[] | [OkPacket] | undefined>((resolve, reject) =>
            {
                const q = connection.query({ timeout, sql: query }, values, (err, results, fields) =>
                {
                    if (err)
                        reject(err);
                    else if (results && isOkPacket(results)) // guard first: an empty result must not throw before release()
                        resolve([ results ]);
                    else if (results)
                        resolve(results);
                    else
                        resolve(fields);
                    connection.release();
                });
                this.#logger.debug(`Constructed query: ${q.sql}`);
            });
            return result ?? [];
        }
        catch (err)
        {
            const error = err as MariaError;
            // Retry safe errors // (Galera) Instance not ready for query
            if ((SAFE_TO_RETRY.includes(error.code) || error.errno === 1047) && attempts < 5)
                return this.#_query<T>(query, values, { timeout, node, errorIfNodeUnavailable }, attempts + 1);
            throw error;
        }
    }

    /**
     * Public query entry point.
     *
     * @param query SQL text; surrounding whitespace is trimmed.
     * @param values Placeholder values, or the query options when there are no values.
     * @param opts Query options (ignored when options were passed as `values`).
     * @throws {Error} `MariaDB not ready` when called before `init()` completed or after `close()`.
     */
    async query<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions): Promise<T[] | FieldInfo[] | [OkPacket]>
    {
        // Allow query(sql, opts)
        if (values && !Array.isArray(values))
        {
            opts = values;
            values = [];
        }

        if (!this.ready)
            throw new Error('MariaDB not ready');

        const sql = query.trim();
        const batch = values?.some(val => Array.isArray(val)) ?? false;
        this.#logger.debug(`Incoming query (batch: ${batch})\n${sql}\n${inspect(values)}`);
        return this.#_query<T>(sql, values ?? [], opts);
    }

    /** Shorthand for `query`. */
    q<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions)
    {
        return this.query<T>(query, values, opts);
    }
}
export { MariaDB };