retry certain queries (deadlocks)
This commit is contained in:
parent
0a2d0818e7
commit
ed2bbaa6b1
@ -3,13 +3,15 @@ const mysql = require('mysql');
|
||||
const { inspect } = require('node:util');
|
||||
const { Util } = require('../../util');
|
||||
|
||||
const SAFE_TO_RETRY = [ 'ER_LOCK_DEADLOCK' ];
|
||||
|
||||
class MariaDB {
|
||||
|
||||
constructor (server, config) {
|
||||
|
||||
if (!server)
|
||||
if (!server)
|
||||
Util.fatal(new Error('Missing reference to server!'));
|
||||
if (!config)
|
||||
if (!config)
|
||||
Util.fatal(new Error('No config options provided!'));
|
||||
|
||||
this.config = config;
|
||||
@ -19,7 +21,7 @@ class MariaDB {
|
||||
for (const host of hosts) {
|
||||
this._credentials.push({ host, user, port, password, database });
|
||||
}
|
||||
|
||||
|
||||
this.pool = null;
|
||||
this.ready = false;
|
||||
this.debug = true;
|
||||
@ -37,11 +39,10 @@ class MariaDB {
|
||||
this.logger.status(`Creating${this._cluster ? ' cluster' : ''} connection pool`);
|
||||
if (this._cluster) {
|
||||
this.pool = mysql.createPoolCluster(this.config.options.cluster);
|
||||
for (const creds of this._credentials)
|
||||
for (const creds of this._credentials)
|
||||
this.pool.add({ ...this.config.options.client, ...creds });
|
||||
|
||||
} else {
|
||||
this.pool = mysql.createPool({ ...this.config.options.client, ...this._credentials[0] });
|
||||
this.pool = mysql.createPool({ ...this.config.options.client, ...this._credentials[0] });
|
||||
}
|
||||
|
||||
this.pool.on('connection', (connection) => {
|
||||
@ -59,17 +60,17 @@ class MariaDB {
|
||||
|
||||
this.pool.on('release', (connection) => {
|
||||
this.activeQueries--;
|
||||
|
||||
if (!this.ready && !this.activeQueries && this.afterLastQuery)
|
||||
|
||||
if (!this.ready && !this.activeQueries && this.afterLastQuery)
|
||||
this.afterLastQuery();
|
||||
|
||||
|
||||
this.logger.debug(`Connection released: ${connection?.threadId || null}`);
|
||||
});
|
||||
|
||||
this.logger.info(`Testing MariaDB connection`);
|
||||
await new Promise((resolve, reject) => {
|
||||
this.pool.getConnection((err, conn) => {
|
||||
if (err)
|
||||
if (err)
|
||||
return reject(err);
|
||||
conn.release();
|
||||
return resolve();
|
||||
@ -106,30 +107,56 @@ class MariaDB {
|
||||
|
||||
}
|
||||
|
||||
query (query, values) {
|
||||
|
||||
getConnection () {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
if (!this.ready)
|
||||
return reject(new Error('MariaDB not ready'));
|
||||
|
||||
let batch = false;
|
||||
if (values && typeof values.some === 'function')
|
||||
batch = values.some(val => val instanceof Array);
|
||||
this.logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
|
||||
|
||||
// If connected to a cluster, pick one of the node connection pools
|
||||
// By default the config is set to use round robin for picking a pool when connected to a cluster
|
||||
const pool = this._cluster ? this.pool.of('*') : this.pool;
|
||||
|
||||
return pool.query(query, values, (err, results, fields) => {
|
||||
this.pool.getConnection((err, conn) => {
|
||||
if (err)
|
||||
return reject(err);
|
||||
if (results)
|
||||
return resolve(results);
|
||||
resolve(fields);
|
||||
resolve(conn);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry certain queries that are safe to retry, e.g. deadlock
|
||||
*
|
||||
* @private
|
||||
* */
|
||||
async #_query (query, values, attempts = 0) {
|
||||
const connection = await this.getConnection();
|
||||
try {
|
||||
const result = await new Promise((resolve, reject) => {
|
||||
const q = connection.query(query, values, (err, results, fields) => {
|
||||
if (err)
|
||||
reject(err);
|
||||
else if (results)
|
||||
resolve(results);
|
||||
else
|
||||
resolve(fields);
|
||||
connection.release();
|
||||
});
|
||||
this.logger.debug(`Constructed query: ${q.sql}`);
|
||||
});
|
||||
return Promise.resolve(result);
|
||||
} catch (err) {
|
||||
// Retry safe errors
|
||||
if (SAFE_TO_RETRY.includes(err.code) && attempts < 5) //
|
||||
return this.#_query(query, values, ++attempts);
|
||||
return Promise.reject(err);
|
||||
}
|
||||
}
|
||||
|
||||
async query (query, values) {
|
||||
|
||||
if (!this.ready)
|
||||
return Promise.reject(new Error('MariaDB not ready'));
|
||||
|
||||
let batch = false;
|
||||
if (values && typeof values.some === 'function')
|
||||
batch = values.some(val => val instanceof Array);
|
||||
this.logger.debug(`Incoming query (batch: ${batch})\n${query}\n${inspect(values)}`);
|
||||
|
||||
return this.#_query(query, values);
|
||||
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user