MariaDB wrapper can listen to status updates from cluster

Erik 2023-07-13 19:06:07 +03:00
parent b68484d4c5
commit d1ed5afe09
Signed by: Navy.gif
GPG Key ID: 2532FBBB61C65A68
9 changed files with 469 additions and 189 deletions

.gitignore

@ -1,134 +1,135 @@
# ---> Node
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
build
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
.cache
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*
credentials.json
brokerCreds.json

LICENSE

@ -1,9 +1,9 @@
MIT License
Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
MIT License
Copyright (c) 2023 Navy.gif

View File

@ -1,10 +1,15 @@
# wrappers
Various wrapper classes I use in my projects.
These are specifically written for my use cases, though feel free to use.
MessageBroker: Wraps `amqp-connection-manager` and `amqplib` by extension. Ensures smooth failover when connected to a cluster.
MariaDB: Wraps `mysql`. Takes care of connection pooling whether connecting to a cluster or single instance.
MongoDB: Wraps `mongodb`. Primarily just adds helper functions.
The MariaDB wrapper takes care of automatic cluster status updates when supplied with an event source; it exposes a listener through the `statusUpdateListener` property. This ensures that the wrapper does not query a node that is not currently synced with the cluster, somewhat similar to how the MessageBroker handles automatic failover.
The MongoDB native driver seems to take care of this by itself, so that wrapper remains fairly trivial, with just helper functions.
A notification script that forwards cluster updates is located in `/scripts/wsrep_notify.sh`. It takes the arguments provided by Galera and posts them to a given URL; the resulting JSON payload is then fed into the MariaDB wrapper's status update listener (see the sketch below).
The wrappers are expected to be used together with a parent class that has a `createLogger` method, as defined in `/src/interfaces/Server.ts` and `/src/interfaces/Logger.ts`, utilising the logger from https://git.corgi.wtf/Navy.gif/logger.
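A minimal wiring sketch, mirroring the test scripts added in this commit (treat the import path, the credential files, the queue name `db_cluster_status` and the cluster options as placeholders for your own setup):

import { readFileSync } from 'node:fs';
import { MariaDB, MessageBroker } from '../build/esm/index.js'; // import path as used by the test scripts

// Anything with a createLogger method satisfies the server interface the wrappers expect
const logger = { debug: console.log, info: console.log, status: console.log, warn: console.log, error: console.error };
const server = { createLogger: () => logger };

// credentials.json / brokerCreds.json hold the database and broker connection details (both are gitignored)
const dbCredentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
const brokerCredentials = JSON.parse(readFileSync('./brokerCreds.json', { encoding: 'utf-8' }));

const maria = new MariaDB(server, {
    load: true,
    credentials: dbCredentials,
    cluster: { canRetry: true, removeNodeErrorCount: 5, restoreNodeTimeout: 60_000, defaultSelector: 'RR' }
});
const broker = new MessageBroker(server, brokerCredentials);

await broker.init();
await maria.init();

// wsrep_notify.sh publishes Galera status changes to this queue; feed them straight to the wrapper
broker.subscribe('db_cluster_status', maria.statusUpdateListener);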

scripts/wsrep_notify.sh (new file)

@ -0,0 +1,56 @@
#!/bin/bash -eu
# Add the wsrep_notify_cmd option to the server configuration:
# wsrep_notify_cmd = /usr/bin/wsrep_notify.sh
# Ensure mysql owns the script:
# sudo chown mysql:mysql /usr/bin/wsrep_notify.sh
# Also ensure it can be run:
# sudo chmod +x /usr/bin/wsrep_notify.sh
STATUS=""
CLUSTER_UUID=""
PRIMARY=""
INDEX=""
MEMBERS=""
while [ $# -gt 0 ]
do
    case $1 in
        --status)
            STATUS=$2
            shift
            ;;
        --uuid)
            CLUSTER_UUID=$2
            shift
            ;;
        --primary)
            [ "$2" = "yes" ] && PRIMARY="1" || PRIMARY="0"
            COM=configuration_change
            shift
            ;;
        --index)
            INDEX=$2
            shift
            ;;
        --members)
            MEMBERS=$2
            shift
            ;;
    esac
    shift
done
# I hate bash, I suck at it and I don't want to be good
PAYLOAD="{\\\"status\\\": \\\"${STATUS}\\\", \\\"uuid\\\": \\\"$CLUSTER_UUID\\\", \\\"primary\\\": \\\"$PRIMARY\\\", \\\"index\\\": \\\"$INDEX\\\", \\\"members\\\": \\\"$MEMBERS\\\"}"
DATA='{
"routing_key": "",
"payload_encoding": "string",
"properties": {},'
DATA+="\"payload\": \"$PAYLOAD\" }"
curl --location '[INSERT LOCATION HERE]' \
--header 'Content-Type: application/json' \
--header 'Authorization: [INSERT AUTH TOKEN]' \
--data "$DATA" \
-s
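For illustration only, with fabricated values, the `payload` string the script posts decodes to exactly the object shape the wrapper's status update listener expects; `members` is a comma-separated list of `uuid/name/address` entries as reported by Galera:

// Hypothetical decoded payload, i.e. what ends up being passed to maria.statusUpdateListener
const exampleUpdate = {
    status: 'synced',                              // disconnecting | disconnected | connecting | connected | synced | donor | joiner | joined
    uuid: 'c0f2f5e1-0000-0000-0000-000000000000',  // cluster UUID (fabricated here)
    primary: '1',                                  // '1' when the component is primary, '0' otherwise
    index: '0',                                    // this node's position in the members list; '-1' if it does not recognise itself
    members: 'aaaa-1111/maria-t01/10.0.0.11:3306,bbbb-2222/maria-t02/10.0.0.12:3306'
};
console.log(JSON.stringify(exampleUpdate));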

View File

@ -1,7 +1,8 @@
import { inspect } from 'node:util';
import { ILogger, IServer } from './interfaces/index.js';
import dns from 'node:dns/promises';
import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo, OkPacket } from 'mysql';
import mysql, { PoolCluster, PoolClusterConfig, PoolConfig, PoolConnection, FieldInfo } from 'mysql';
import { ILogger, IServer } from './interfaces/index.js';
import { LoggerClientOptions } from './interfaces/Logger.js';
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK', 'PROTOCOL_CONNECTION_LOST' ];
@ -32,10 +33,48 @@ type MariaError = {
sqlMessage:string
} & Error
type Values = (string | number | (string | number)[] | (string | number)[][])[]
type QueryOptions = {
node?: string,
timeout?: number
timeout?: number,
errorIfNodeUnavailable?: boolean
}
type StatusString =
| 'disconnecting'
| 'disconnected'
| 'connecting'
| 'connected'
| 'synced'
| 'donor'
| 'joiner'
| 'joined'
type Node = {
name: string,
host: string,
uuid?: string,
status?: StatusString
}
type StatusUpdate = {
status: StatusString,
uuid: string,
primary: string,
index: string,
members: string
}
const isOkPacket = (obj: object): obj is OkPacket =>
{
if ('fieldCount' in obj
&& 'affectedRows' in obj
&& 'message' in obj
)
return true;
return false;
};
// Designed to work in a single instance or cluster (galera) configuration
// Will need some work for primary/replica configuration (e.g. not treating all nodes as read-write nodes)
class MariaDB
{
@ -50,7 +89,7 @@ class MariaDB
#credentials: Credentials[];
#cluster: boolean;
#pool: PoolCluster | null;
#nodes: string[];
#nodes: Node[];
constructor (server: IServer, options: MariaOptions)
{
@ -69,10 +108,10 @@ class MariaDB
for (const remote of hosts)
{
const [ node ] = remote.split('.');
if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
if (typeof node === 'undefined' || !(/(([a-zA-Z]\w*[0-9]?)|([0-9]?\w*[a-zA-Z]))/u).test(node))
throw new Error('Invalid host config, expecting qualified domain names');
this.#credentials.push({ host: remote, user, port, password, database, node });
this.#nodes.push(node);
this.#nodes.push({ name: node, host: remote });
}
this.#pool = null;
@ -98,7 +137,6 @@ class MariaDB
async init ()
{
if (!this.#load)
return this.#logger.info('Not loading MariaDB');
@ -106,8 +144,9 @@ class MariaDB
this.#pool = mysql.createPoolCluster(this.#config.cluster);
for (const creds of this.#credentials)
{
this.#pool.add(creds.node, { ...this.#config.client, ...creds });
this.#logger.info(`Added node ${creds.node} to pool cluster`);
const name = creds.node;
this.#pool.add(name, { ...this.#config.client, ...creds });
this.#logger.info(`Added node ${name} to pool cluster`);
}
this.#pool.on('connection', (connection) =>
@ -156,8 +195,35 @@ class MariaDB
return resolve();
});
});
this.#logger.status('Database connected');
if (this.#cluster)
{ // Resolve the UUID for each node to enable status updates
this.#logger.status('Resolving cluster node UUIDs for status updates');
for (const node of this.#nodes)
{
// UUIDs are not always enough; we also need the node's IP for cases where the UUID changes due to restarts
const dnsResult = await dns.lookup(node.host, 4);
node.host = dnsResult.address;
const response = await this.#_query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', [], { node: node.name }).catch(() => null) as { Variable_name: string, Value: string }[];
// Error gives us null
if (!response)
{
this.#logger.info(`Could not resolve UUID for ${node.name}, presumably offline, setting status to disconnected`);
node.status = 'disconnected';
continue;
}
// If we somehow get an empty response, something is wrong
if (!response.length)
throw new Error(`Failed to acquire UUID for node ${node.name}`);
const uuid = response.find(entry => entry.Variable_name === 'wsrep_gcomm_uuid')?.Value;
const status = response.find(entry => entry.Variable_name === 'wsrep_local_state_comment')?.Value.toLowerCase();
node.uuid = uuid;
node.status = status as StatusString;
}
}
this.#_ready = true;
return this;
@ -189,6 +255,65 @@ class MariaDB
});
}
get statusUpdateListener ()
{
return this.#statusUpdate.bind(this);
}
// This is a bit of a clusterfuck but it seems to work
#statusUpdate (update: StatusUpdate)
{
if (!update.members.length)
{
if (update.status === 'disconnected')
this.#nodes.forEach(node =>
{
if (node.status === 'disconnecting')
{
const oldStatus = node.status;
node.status = 'disconnected';
this.#logger.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`);
}
});
else if (update.status === 'synced')
{ // Sometimes we'll get a status update without the member info; it's probably safe to assume that on a synced status the recently joined members are synced
this.#nodes.forEach(node =>
{
if (node.status === 'joined')
{
const oldStatus = node.status;
node.status = 'synced';
this.#logger.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`);
}
});
}
return;
}
const members = update.members.split(',');
const index = parseInt(update.index);
if (index === -1)
return this.#logger.warn('Received -1 index, member does not recognise itself');
const member = members[index];
const [ id,, host ] = member.split('/');
let node = this.#nodes.find(n => n.uuid === id);
if (!node) // Node ID has changed due to restart, use IP to figure out which one
{
const [ ip ] = host.split(':');
node = this.#nodes.find(n => n.host === ip);
if (!node)
return this.#logger.warn('Received status update for node that is not present in config');
node.uuid = id;
}
if (node.status !== update.status)
{
const oldStatus = node.status;
node.status = update.status;
this.#logger.status(`Cluster node ${node.name} changed status from ${oldStatus} to ${node.status}`);
}
}
finishQueries ()
{
return new Promise<void>(resolve =>
@ -197,7 +322,7 @@ class MariaDB
});
}
getConnection (node?: string): Promise<PoolConnection>
getConnection (nodeName: string | null, throwError: boolean): Promise<PoolConnection>
{
return new Promise((resolve, reject) =>
{
@ -205,12 +330,28 @@ class MariaDB
return reject(new Error('Pool closed'));
// Get node by name
const pool = this.#pool;
if (node && !this.#nodes.includes(node))
if (nodeName)
{
this.#logger.warn(`Node ${node} is not available in pool, falling back to arbitrary node`);
node = '*';
const node = this.#nodes.find(n => n.name === nodeName);
if (!node)
{
const str = `Node ${nodeName} is not available in pool, falling back to arbitrary node`;
if (throwError)
throw new Error(str);
this.#logger.warn(str);
nodeName = '*';
}
else if (node.status && node.status !== 'synced')
{
const str = `Node ${nodeName} is currently not synced with the pool and thus unqueryable`;
if (throwError)
throw new Error(str);
this.#logger.warn(str);
nodeName = '*';
}
}
return pool.of(node ?? '*').getConnection((err, conn) =>
this.#logger.debug(`Selected node ${nodeName} for query`);
return pool.of(nodeName ?? '*').getConnection((err, conn) =>
{
if (err)
return reject(err);
@ -225,19 +366,21 @@ class MariaDB
* @throws {MariaError}
* @private
* */
async #_query<T> (query: string, values: (string | number | string[] | number[])[], { timeout, node }: QueryOptions = {}, attempts = 0):
Promise<T[] | FieldInfo[]>
async #_query<T> (query: string, values: Values, { timeout, node, errorIfNodeUnavailable }: QueryOptions = {}, attempts = 0):
Promise<T[] | FieldInfo[] | [OkPacket]>
{
const connection = await this.getConnection(node);
const connection = await this.getConnection(node ?? null, errorIfNodeUnavailable ?? false);
try
{
const result = await new Promise<T[] | FieldInfo[] | undefined>((resolve, reject) =>
const result = await new Promise<T[] | FieldInfo[] | [OkPacket]| undefined>((resolve, reject) =>
{
const q = connection.query({ timeout, sql: query }, values, (err, results, fields) =>
{
if (err)
reject(err);
else if (results)
else if (isOkPacket(results))
resolve([ results ]);
else if (results)
resolve(results);
else
resolve(fields);
@ -257,9 +400,14 @@ class MariaDB
}
}
async query<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions): Promise<T[] | FieldInfo[]>
async query<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions): Promise<T[] | FieldInfo[] | [OkPacket]>
{
if (values && !(values instanceof Array))
{
opts = values;
values = [];
}
if (!this.ready)
return Promise.reject(new Error('MariaDB not ready'));
@ -268,11 +416,11 @@ class MariaDB
batch = values.some(val => val instanceof Array);
this.#logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
return this.#_query<T>(query, values, opts);
return this.#_query<T>(query, values ?? [], opts);
}
q<T> (query: string, values: (string | number | string[] | number[])[], opts?: QueryOptions)
q<T> (query: string, values?: Values | QueryOptions, opts?: QueryOptions)
{
return this.query<T>(query, values, opts);
}
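A rough sketch of the new `QueryOptions` in use (the node name `maria-t03`, the query and the credentials path are illustrative; construction mirrors the test scripts):

import { readFileSync } from 'node:fs';
import { MariaDB } from '../build/esm/index.js';

const maria = new MariaDB(
    { createLogger: () => ({ debug: console.log, info: console.log, status: console.log, warn: console.log, error: console.error }) },
    { load: true, credentials: JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' })) }
);
await maria.init();

// Pin the query to a named node; errorIfNodeUnavailable makes the call reject
// instead of quietly falling back to an arbitrary node when that node is not synced
const rows = await maria
    .query('SHOW STATUS WHERE `Variable_name` = "wsrep_local_state_comment"', { node: 'maria-t03', errorIfNodeUnavailable: true })
    .catch(() => null);
console.log(rows);
await maria.close();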

View File

@ -1,4 +1,6 @@
import { readFileSync } from 'fs';
import { MessageBroker } from '../build/esm/index.js';
const credentials = JSON.parse(readFileSync('../brokerCreds.json', { encoding: 'utf-8' }));
const broker = new MessageBroker({
createLogger: () =>
@ -11,17 +13,13 @@ const broker = new MessageBroker({
error: console.error
};
}
}, {
load: true,
host: 'rabbitmq-01.stylis.local',
user: 'stylis',
pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj',
vhost: 'development',
port: 5672
});
}, credentials);
await broker.init();
broker.subscribe('chatlogs', (message) =>
broker.subscribe('db_cluster_status', (message) =>
{
console.log(message);
if (message instanceof Buffer)
console.log(message.toString());
else
console.log(message);
});

View File

@ -0,0 +1,63 @@
process.env.NODE_ENV = 'development';
import { MariaDB, MessageBroker } from '../build/esm/index.js';
import { readFileSync } from 'fs';
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
const maria = new MariaDB({
createLogger: () =>
{
return {
debug: console.log,
info: console.log,
status: console.log,
warn: console.log,
error: console.error
};
}
}, {
load: true,
credentials,
cluster: {
canRetry: true,
removeNodeErrorCount: 5,
restoreNodeTimeout: 60000,
defaultSelector: 'RR'
}
});
const broker = new MessageBroker({
createLogger: () =>
{
return {
debug: console.log,
info: console.log,
status: console.log,
warn: console.log,
error: console.error
};
}
}, {
load: true,
host: 'rabbitmq-01.stylis.local',
user: 'stylis',
pass: 'RrwJyrfeXFMimDH3hjZ5xSreMAmXtQJj',
vhost: 'development',
port: 5672
});
await broker.init();
await maria.init();
broker.subscribe('db_cluster_status', maria.statusUpdateListener);
setInterval(async () =>
{
console.log('Result: ', await maria.query('SHOW STATUS WHERE `Variable_name` = "wsrep_gcomm_uuid" OR `Variable_name` = "wsrep_local_state_comment"', { node: 'maria-t03', errorIfNodeUnavailable: true }).catch(() => null));
}, 15_000).unref();
process.on('SIGINT', async () =>
{
await broker.close();
await maria.close();
process.exit();
});

View File

@ -1,7 +1,8 @@
process.env.NODE_ENV = 'development';
import { readFileSync } from 'fs';
import { MariaDB } from '../build/esm/index.js';
const credentials = JSON.parse(readFileSync('./credentials.json', { encoding: 'utf-8' }));
const credentials = JSON.parse(readFileSync('../credentials.json', { encoding: 'utf-8' }));
const maria = new MariaDB({
createLogger: () =>
@ -16,11 +17,17 @@ const maria = new MariaDB({
}
}, {
load: true,
credentials
credentials,
cluster: {
canRetry: true,
removeNodeErrorCount: 5,
restoreNodeTimeout: 60000,
defaultSelector: 'RR'
}
});
await maria.init();
await maria.query('INSERT INTO `test` (`dingle`, `bingle`) VALUES ?', [[ 1, 2 ], [ 3, 4 ], [ 5, 6 ]]);
// await maria.query('INSERT INTO `test` (`dingle`, `bingle`) VALUES ?', [[ 1, 2 ], [ 3, 4 ], [ 5, 6 ]]);
await maria.close();

View File

@ -39,4 +39,6 @@ const ips = result.map(loc => loc.ip);
const ranges = new Set(ips.map(ip => ip.split('.').slice(0, 3).join('.')));
console.log(ranges);
console.log(await mongo.collection('locations').findOneAndUpdate({ name: 'dingus' }, { $set: { name: 'dingle' } }, { returnDocument: 'before' }));
await mongo.close();