Merge pull request #1121 from nymkappa/feature/database-migration-update

Wrap migration with transactions
commit a9c1dc3726 by wiz, 2022-01-12 11:28:14 +00:00, committed via GitHub
2 changed files with 299 additions and 141 deletions
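
The heart of this change is the rewritten $migrateTableSchemaFromVersion in the diff below: all pending migration queries, plus the final UPDATE that records the new schema version in the `state` table, now run inside a single transaction, so a failed upgrade rolls back cleanly instead of leaving the schema half-migrated. The sketch below isolates that pattern, assuming only a mysql2 PoolConnection; the helper name runMigrationTransaction and the bare queries array are placeholders, and the real code additionally routes every statement through the class's $executeQuery logging wrapper.

import { PoolConnection } from 'mysql2/promise';

// Minimal sketch of the transaction wrapper this commit introduces (names are
// illustrative, not the repo's). `queries` stands in for the generated
// migration statements plus the schema_version bump.
async function runMigrationTransaction(connection: PoolConnection, queries: string[]): Promise<void> {
  try {
    await connection.query('START TRANSACTION;');
    await connection.query('SET autocommit = 0;');
    for (const query of queries) {
      await connection.query(query);
    }
    await connection.query('COMMIT;');
  } catch (e) {
    // Any failure aborts the whole batch, so state.schema_version never points
    // at a partially applied migration
    await connection.query('ROLLBACK;');
    throw e;
  } finally {
    connection.release();
  }
}

Only the data-shuffling migration queries go through this path; the CREATE TABLE / CREATE INDEX statements are issued separately beforehand, since DDL triggers an implicit commit in MySQL/MariaDB and cannot be rolled back, which is why the code below notes that those queries "cannot be wrapped into a transaction".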


@@ -1,62 +1,144 @@
import { PoolConnection } from 'mysql2/promise';
import config from '../config';
import { DB } from '../database';
import logger from '../logger';

const sleep = (ms: number) => new Promise(res => setTimeout(res, ms));

class DatabaseMigration {
  private static currentVersion = 2;
  private queryTimeout = 120000;
  private statisticsAddedIndexed = false;

  constructor() { }

  /**
   * Entry point
   */
  public async $initializeOrMigrateDatabase(): Promise<void> {
    logger.info('MIGRATIONS: Running migrations');

    await this.$printDatabaseVersion();

    // First of all, if the `state` table does not exist, create it so we can track migration version
    if (!await this.$checkIfTableExists('state')) {
      logger.info('MIGRATIONS: `state` table does not exist. Creating it.');
      try {
        await this.$createMigrationStateTable();
      } catch (e) {
        logger.err('MIGRATIONS: Unable to create `state` table, aborting in 10 seconds. ' + e);
        await sleep(10000);
        process.exit(-1);
      }
      logger.info('MIGRATIONS: `state` table initialized.');
    }

    let databaseSchemaVersion = 0;
    try {
      databaseSchemaVersion = await this.$getSchemaVersionFromDatabase();
    } catch (e) {
      logger.err('MIGRATIONS: Unable to get current database migration version, aborting in 10 seconds. ' + e);
      await sleep(10000);
      process.exit(-1);
    }

    logger.info('MIGRATIONS: Current state.schema_version ' + databaseSchemaVersion);
    logger.info('MIGRATIONS: Latest DatabaseMigration.version is ' + DatabaseMigration.currentVersion);
    if (databaseSchemaVersion >= DatabaseMigration.currentVersion) {
      logger.info('MIGRATIONS: Nothing to do.');
      return;
    }

    // Now, create missing tables. Those queries cannot be wrapped into a transaction unfortunately
    try {
      await this.$createMissingTablesAndIndexes(databaseSchemaVersion);
    } catch (e) {
      logger.err('MIGRATIONS: Unable to create required tables, aborting in 10 seconds. ' + e);
      await sleep(10000);
      process.exit(-1);
    }

    if (DatabaseMigration.currentVersion > databaseSchemaVersion) {
      logger.info('MIGRATIONS: Upgrading database schema');
      try {
        await this.$migrateTableSchemaFromVersion(databaseSchemaVersion);
        logger.info(`MIGRATIONS: OK. Database schema has been migrated from version ${databaseSchemaVersion} to ${DatabaseMigration.currentVersion} (latest version)`);
      } catch (e) {
        logger.err('MIGRATIONS: Unable to migrate database, aborting. ' + e);
      }
    }

    return;
  }

  /**
   * Create all missing tables
   */
  private async $createMissingTablesAndIndexes(databaseSchemaVersion: number) {
    await this.$setStatisticsAddedIndexedFlag(databaseSchemaVersion);

    const connection = await DB.pool.getConnection();
    try {
      await this.$executeQuery(connection, this.getCreateElementsTableQuery(), await this.$checkIfTableExists('elements_pegs'));
      await this.$executeQuery(connection, this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics'));
      if (databaseSchemaVersion < 2 && this.statisticsAddedIndexed === false) {
        await this.$executeQuery(connection, `CREATE INDEX added ON statistics (added);`);
      }
      connection.release();
    } catch (e) {
      connection.release();
      throw e;
    }
  }

  /**
   * Special case here for the `statistics` table - It appeared that somehow some dbs already had the `added` field indexed
   * while it does not appear in previous schemas. The mariadb command "CREATE INDEX IF NOT EXISTS" is not supported on
   * older mariadb versions. Therefore we set a flag here in order to know if the index needs to be created or not before
   * running the migration process
   */
  private async $setStatisticsAddedIndexedFlag(databaseSchemaVersion: number) {
    if (databaseSchemaVersion >= 2) {
      this.statisticsAddedIndexed = true;
      return;
    }

    const connection = await DB.pool.getConnection();

    try {
      // We don't use "CREATE INDEX IF NOT EXISTS" because it is not supported on old mariadb version 5.X
      const query = `SELECT COUNT(1) hasIndex FROM INFORMATION_SCHEMA.STATISTICS
        WHERE table_schema=DATABASE() AND table_name='statistics' AND index_name='added';`;
      const [rows] = await this.$executeQuery(connection, query, true);
      if (rows[0].hasIndex === 0) {
        logger.info('MIGRATIONS: `statistics.added` is not indexed');
        this.statisticsAddedIndexed = false;
      } else if (rows[0].hasIndex === 1) {
        logger.info('MIGRATIONS: `statistics.added` is already indexed');
        this.statisticsAddedIndexed = true;
      }
    } catch (e) {
      // Should really never happen but just in case it fails, we just don't execute
      // any query related to this indexing so it won't fail if the index actually already exists
      logger.err('MIGRATIONS: Unable to check if `statistics.added` INDEX exists or not.');
      this.statisticsAddedIndexed = true;
    }

    connection.release();
  }

  /**
   * Small query execution wrapper to log all executed queries
   */
  private async $executeQuery(connection: PoolConnection, query: string, silent: boolean = false): Promise<any> {
    if (!silent) {
      logger.info('MIGRATIONS: Execute query:\n' + query);
    }
    return connection.query<any>({ sql: query, timeout: this.queryTimeout });
  }

  /**
   * Check if 'table' exists in the database
   */
  private async $checkIfTableExists(table: string): Promise<boolean> {
    const connection = await DB.pool.getConnection();
    const query = `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '${config.DATABASE.DATABASE}' AND TABLE_NAME = '${table}'`;
@@ -65,11 +147,132 @@ class DatabaseMigration {
    return rows[0]['COUNT(*)'] === 1;
  }

  /**
   * Get current database version
   */
  private async $getSchemaVersionFromDatabase(): Promise<number> {
    const connection = await DB.pool.getConnection();
    const query = `SELECT number FROM state WHERE name = 'schema_version';`;
    const [rows] = await this.$executeQuery(connection, query, true);
    connection.release();
    return rows[0]['number'];
  }

  /**
   * Create the `state` table
   */
  private async $createMigrationStateTable(): Promise<void> {
    const connection = await DB.pool.getConnection();

    try {
      const query = `CREATE TABLE IF NOT EXISTS state (
        name varchar(25) NOT NULL,
        number int(11) NULL,
        string varchar(100) NULL,
        CONSTRAINT name_unique UNIQUE (name)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
      await this.$executeQuery(connection, query);

      // Set initial values
      await this.$executeQuery(connection, `INSERT INTO state VALUES('schema_version', 0, NULL);`);
      await this.$executeQuery(connection, `INSERT INTO state VALUES('last_elements_block', 0, NULL);`);

      connection.release();
    } catch (e) {
      connection.release();
      throw e;
    }
  }

  /**
   * We actually execute the migrations queries here
   */
  private async $migrateTableSchemaFromVersion(version: number): Promise<void> {
    const transactionQueries: string[] = [];
    for (const query of this.getMigrationQueriesFromVersion(version)) {
      transactionQueries.push(query);
    }
    transactionQueries.push(this.getUpdateToLatestSchemaVersionQuery());

    const connection = await DB.pool.getConnection();
    try {
      await this.$executeQuery(connection, 'START TRANSACTION;');
      await this.$executeQuery(connection, 'SET autocommit = 0;');
      for (const query of transactionQueries) {
        await this.$executeQuery(connection, query);
      }
      await this.$executeQuery(connection, 'COMMIT;');

      connection.release();
    } catch (e) {
      await this.$executeQuery(connection, 'ROLLBACK;');
      connection.release();
      throw e;
    }
  }

  /**
   * Generate migration queries based on schema version
   */
  private getMigrationQueriesFromVersion(version: number): string[] {
    const queries: string[] = [];

    if (version < 1) {
      if (config.MEMPOOL.NETWORK !== 'liquid' && config.MEMPOOL.NETWORK !== 'liquidtestnet') {
        queries.push(this.getShiftStatisticsQuery());
      }
    }

    return queries;
  }

  /**
   * Save the schema version in the database
   */
  private getUpdateToLatestSchemaVersionQuery(): string {
    return `UPDATE state SET number = ${DatabaseMigration.currentVersion} WHERE name = 'schema_version';`;
  }

  /**
   * Print current database version
   */
  private async $printDatabaseVersion() {
    const connection = await DB.pool.getConnection();
    try {
      const [rows] = await this.$executeQuery(connection, 'SELECT VERSION() as version;', true);
      logger.info(`MIGRATIONS: Database engine version '${rows[0].version}'`);
    } catch (e) {
      logger.info(`MIGRATIONS: Could not fetch database engine version. ` + e);
    }
    connection.release();
  }

  // Couple of wrappers to clean the main logic
  private getShiftStatisticsQuery(): string {
    return `UPDATE statistics SET
      vsize_1 = vsize_1 + vsize_2, vsize_2 = vsize_3,
      vsize_3 = vsize_4, vsize_4 = vsize_5,
      vsize_5 = vsize_6, vsize_6 = vsize_8,
      vsize_8 = vsize_10, vsize_10 = vsize_12,
      vsize_12 = vsize_15, vsize_15 = vsize_20,
      vsize_20 = vsize_30, vsize_30 = vsize_40,
      vsize_40 = vsize_50, vsize_50 = vsize_60,
      vsize_60 = vsize_70, vsize_70 = vsize_80,
      vsize_80 = vsize_90, vsize_90 = vsize_100,
      vsize_100 = vsize_125, vsize_125 = vsize_150,
      vsize_150 = vsize_175, vsize_175 = vsize_200,
      vsize_200 = vsize_250, vsize_250 = vsize_300,
      vsize_300 = vsize_350, vsize_350 = vsize_400,
      vsize_400 = vsize_500, vsize_500 = vsize_600,
      vsize_600 = vsize_700, vsize_700 = vsize_800,
      vsize_800 = vsize_900, vsize_900 = vsize_1000,
      vsize_1000 = vsize_1200, vsize_1200 = vsize_1400,
      vsize_1400 = vsize_1800, vsize_1800 = vsize_2000, vsize_2000 = 0;`;
  }

  private getCreateStatisticsQuery(): string {
    return `CREATE TABLE IF NOT EXISTS statistics (
      id int(11) NOT NULL AUTO_INCREMENT,
      added datetime NOT NULL,
      unconfirmed_transactions int(11) UNSIGNED NOT NULL,
      tx_per_second float UNSIGNED NOT NULL,
@@ -114,68 +317,23 @@ class DatabaseMigration {
      vsize_1400 int(11) NOT NULL,
      vsize_1600 int(11) NOT NULL,
      vsize_1800 int(11) NOT NULL,
      vsize_2000 int(11) NOT NULL,
      CONSTRAINT PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
  }

  private getCreateElementsTableQuery(): string {
    return `CREATE TABLE IF NOT EXISTS elements_pegs (
      block int(11) NOT NULL,
      datetime int(11) NOT NULL,
      amount bigint(20) NOT NULL,
      txid varchar(65) NOT NULL,
      txindex int(11) NOT NULL,
      bitcoinaddress varchar(100) NOT NULL,
      bitcointxid varchar(65) NOT NULL,
      bitcoinindex int(11) NOT NULL,
      final_tx int(11) NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
  }
}

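The second file in the commit touches the Statistics class's aggregation queries: every CAST(avg(...) as FLOAT) becomes CAST(avg(...) as DOUBLE). The diff does not state the motivation, but presumably it is the same backwards-compatibility concern as above: CAST(... AS FLOAT) is only accepted by MariaDB from 10.4.5 (and MySQL from 8.0.17), whereas AS DOUBLE is supported by the older MariaDB releases the migration code caters to. A hypothetical helper, not taken from the repo, illustrates the resulting column pattern:

// Illustrative only: builds the averaged-column list in the portable form the
// commit switches to (DOUBLE instead of FLOAT casts).
function avgColumns(columns: string[]): string {
  return columns
    .map((col) => `CAST(avg(${col}) as DOUBLE) as ${col}`)
    .join(',\n      ');
}

// avgColumns(['unconfirmed_transactions', 'tx_per_second', 'vsize_1'])
//   => "CAST(avg(unconfirmed_transactions) as DOUBLE) as unconfirmed_transactions, ..."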

@@ -270,47 +270,47 @@ class Statistics {
  private getQueryForDaysAvg(div: number, interval: string) {
    return `SELECT id, UNIX_TIMESTAMP(added) as added,
      CAST(avg(unconfirmed_transactions) as DOUBLE) as unconfirmed_transactions,
      CAST(avg(tx_per_second) as DOUBLE) as tx_per_second,
      CAST(avg(vbytes_per_second) as DOUBLE) as vbytes_per_second,
      CAST(avg(vsize_1) as DOUBLE) as vsize_1,
      CAST(avg(vsize_2) as DOUBLE) as vsize_2,
      CAST(avg(vsize_3) as DOUBLE) as vsize_3,
      CAST(avg(vsize_4) as DOUBLE) as vsize_4,
      CAST(avg(vsize_5) as DOUBLE) as vsize_5,
      CAST(avg(vsize_6) as DOUBLE) as vsize_6,
      CAST(avg(vsize_8) as DOUBLE) as vsize_8,
      CAST(avg(vsize_10) as DOUBLE) as vsize_10,
      CAST(avg(vsize_12) as DOUBLE) as vsize_12,
      CAST(avg(vsize_15) as DOUBLE) as vsize_15,
      CAST(avg(vsize_20) as DOUBLE) as vsize_20,
      CAST(avg(vsize_30) as DOUBLE) as vsize_30,
      CAST(avg(vsize_40) as DOUBLE) as vsize_40,
      CAST(avg(vsize_50) as DOUBLE) as vsize_50,
      CAST(avg(vsize_60) as DOUBLE) as vsize_60,
      CAST(avg(vsize_70) as DOUBLE) as vsize_70,
      CAST(avg(vsize_80) as DOUBLE) as vsize_80,
      CAST(avg(vsize_90) as DOUBLE) as vsize_90,
      CAST(avg(vsize_100) as DOUBLE) as vsize_100,
      CAST(avg(vsize_125) as DOUBLE) as vsize_125,
      CAST(avg(vsize_150) as DOUBLE) as vsize_150,
      CAST(avg(vsize_175) as DOUBLE) as vsize_175,
      CAST(avg(vsize_200) as DOUBLE) as vsize_200,
      CAST(avg(vsize_250) as DOUBLE) as vsize_250,
      CAST(avg(vsize_300) as DOUBLE) as vsize_300,
      CAST(avg(vsize_350) as DOUBLE) as vsize_350,
      CAST(avg(vsize_400) as DOUBLE) as vsize_400,
      CAST(avg(vsize_500) as DOUBLE) as vsize_500,
      CAST(avg(vsize_600) as DOUBLE) as vsize_600,
      CAST(avg(vsize_700) as DOUBLE) as vsize_700,
      CAST(avg(vsize_800) as DOUBLE) as vsize_800,
      CAST(avg(vsize_900) as DOUBLE) as vsize_900,
      CAST(avg(vsize_1000) as DOUBLE) as vsize_1000,
      CAST(avg(vsize_1200) as DOUBLE) as vsize_1200,
      CAST(avg(vsize_1400) as DOUBLE) as vsize_1400,
      CAST(avg(vsize_1600) as DOUBLE) as vsize_1600,
      CAST(avg(vsize_1800) as DOUBLE) as vsize_1800,
      CAST(avg(vsize_2000) as DOUBLE) as vsize_2000 \
      FROM statistics \
      WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW() \
      GROUP BY UNIX_TIMESTAMP(added) DIV ${div} \
@@ -320,7 +320,7 @@ class Statistics {
  private getQueryForDays(div: number, interval: string) {
    return `SELECT id, UNIX_TIMESTAMP(added) as added, unconfirmed_transactions,
      tx_per_second,
      CAST(avg(vbytes_per_second) as DOUBLE) as vbytes_per_second,
      vsize_1,
      vsize_2,
      vsize_3,