mirror of
https://github.com/mempool/mempool.git
synced 2025-02-23 14:40:38 +01:00
Store nodes and channels
This commit is contained in:
parent
948f905a66
commit
3e6af8e87b
8 changed files with 422 additions and 29 deletions
|
@ -20,8 +20,8 @@
|
|||
"HOST": "127.0.0.1",
|
||||
"PORT": 3306,
|
||||
"SOCKET": "/var/run/mysql/mysql.sock",
|
||||
"DATABASE": "mempool",
|
||||
"USERNAME": "mempool",
|
||||
"PASSWORD": "mempool"
|
||||
"DATABASE": "lightning",
|
||||
"USERNAME": "root",
|
||||
"PASSWORD": "root"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { ILightningApi } from './lightning-api.interface';
|
||||
|
||||
export interface AbstractLightningApi {
|
||||
getNetworkInfo(): Promise<ILightningApi.NetworkInfo>;
|
||||
getNetworkGraph(): Promise<ILightningApi.NetworkGraph>;
|
||||
$getNetworkInfo(): Promise<ILightningApi.NetworkInfo>;
|
||||
$getNetworkGraph(): Promise<ILightningApi.NetworkGraph>;
|
||||
$getChanInfo(id: string): Promise<ILightningApi.Channel>;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,13 @@ export namespace ILightningApi {
|
|||
|
||||
interface Policy {
|
||||
public_key: string;
|
||||
base_fee_mtokens?: number;
|
||||
cltv_delta?: number;
|
||||
fee_rate?: number;
|
||||
is_disabled?: boolean;
|
||||
max_htlc_mtokens?: number;
|
||||
min_htlc_mtokens?: number;
|
||||
updated_at?: string;
|
||||
}
|
||||
|
||||
export interface Node {
|
||||
|
|
|
@ -25,13 +25,17 @@ class LndApi implements AbstractLightningApi {
|
|||
}
|
||||
}
|
||||
|
||||
async getNetworkInfo(): Promise<ILightningApi.NetworkInfo> {
|
||||
async $getNetworkInfo(): Promise<ILightningApi.NetworkInfo> {
|
||||
return await lnService.getNetworkInfo({ lnd: this.lnd });
|
||||
}
|
||||
|
||||
async getNetworkGraph(): Promise<ILightningApi.NetworkGraph> {
|
||||
async $getNetworkGraph(): Promise<ILightningApi.NetworkGraph> {
|
||||
return await lnService.getNetworkGraph({ lnd: this.lnd });
|
||||
}
|
||||
|
||||
async $getChanInfo(id: string): Promise<ILightningApi.Channel> {
|
||||
return await lnService.getChannel({ lnd: this.lnd, id });
|
||||
}
|
||||
}
|
||||
|
||||
export default LndApi;
|
||||
|
|
232
lightning-backend/src/database-migration.ts
Normal file
232
lightning-backend/src/database-migration.ts
Normal file
|
@ -0,0 +1,232 @@
|
|||
import config from './config';
|
||||
import DB from './database';
|
||||
import logger from './logger';
|
||||
|
||||
const sleep = (ms: number) => new Promise(res => setTimeout(res, ms));
|
||||
|
||||
class DatabaseMigration {
|
||||
private static currentVersion = 1;
|
||||
private queryTimeout = 120000;
|
||||
|
||||
constructor() { }
|
||||
/**
|
||||
* Entry point
|
||||
*/
|
||||
public async $initializeOrMigrateDatabase(): Promise<void> {
|
||||
logger.debug('MIGRATIONS: Running migrations');
|
||||
|
||||
await this.$printDatabaseVersion();
|
||||
|
||||
// First of all, if the `state` database does not exist, create it so we can track migration version
|
||||
if (!await this.$checkIfTableExists('state')) {
|
||||
logger.debug('MIGRATIONS: `state` table does not exist. Creating it.');
|
||||
try {
|
||||
await this.$createMigrationStateTable();
|
||||
} catch (e) {
|
||||
logger.err('MIGRATIONS: Unable to create `state` table, aborting in 10 seconds. ' + e);
|
||||
await sleep(10000);
|
||||
process.exit(-1);
|
||||
}
|
||||
logger.debug('MIGRATIONS: `state` table initialized.');
|
||||
}
|
||||
|
||||
let databaseSchemaVersion = 0;
|
||||
try {
|
||||
databaseSchemaVersion = await this.$getSchemaVersionFromDatabase();
|
||||
} catch (e) {
|
||||
logger.err('MIGRATIONS: Unable to get current database migration version, aborting in 10 seconds. ' + e);
|
||||
await sleep(10000);
|
||||
process.exit(-1);
|
||||
}
|
||||
|
||||
logger.debug('MIGRATIONS: Current state.schema_version ' + databaseSchemaVersion);
|
||||
logger.debug('MIGRATIONS: Latest DatabaseMigration.version is ' + DatabaseMigration.currentVersion);
|
||||
if (databaseSchemaVersion >= DatabaseMigration.currentVersion) {
|
||||
logger.debug('MIGRATIONS: Nothing to do.');
|
||||
return;
|
||||
}
|
||||
|
||||
// Now, create missing tables. Those queries cannot be wrapped into a transaction unfortunately
|
||||
try {
|
||||
await this.$createMissingTablesAndIndexes(databaseSchemaVersion);
|
||||
} catch (e) {
|
||||
logger.err('MIGRATIONS: Unable to create required tables, aborting in 10 seconds. ' + e);
|
||||
await sleep(10000);
|
||||
process.exit(-1);
|
||||
}
|
||||
|
||||
if (DatabaseMigration.currentVersion > databaseSchemaVersion) {
|
||||
logger.notice('MIGRATIONS: Upgrading datababse schema');
|
||||
try {
|
||||
await this.$migrateTableSchemaFromVersion(databaseSchemaVersion);
|
||||
logger.notice(`MIGRATIONS: OK. Database schema have been migrated from version ${databaseSchemaVersion} to ${DatabaseMigration.currentVersion} (latest version)`);
|
||||
} catch (e) {
|
||||
logger.err('MIGRATIONS: Unable to migrate database, aborting. ' + e);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create all missing tables
|
||||
*/
|
||||
private async $createMissingTablesAndIndexes(databaseSchemaVersion: number) {
|
||||
try {
|
||||
await this.$executeQuery(this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics'));
|
||||
await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes'));
|
||||
await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels'));
|
||||
} catch (e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Small query execution wrapper to log all executed queries
|
||||
*/
|
||||
private async $executeQuery(query: string, silent: boolean = false): Promise<any> {
|
||||
if (!silent) {
|
||||
logger.debug('MIGRATIONS: Execute query:\n' + query);
|
||||
}
|
||||
return DB.query({ sql: query, timeout: this.queryTimeout });
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if 'table' exists in the database
|
||||
*/
|
||||
private async $checkIfTableExists(table: string): Promise<boolean> {
|
||||
const query = `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '${config.DATABASE.DATABASE}' AND TABLE_NAME = '${table}'`;
|
||||
const [rows] = await DB.query({ sql: query, timeout: this.queryTimeout });
|
||||
return rows[0]['COUNT(*)'] === 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current database version
|
||||
*/
|
||||
private async $getSchemaVersionFromDatabase(): Promise<number> {
|
||||
const query = `SELECT number FROM state WHERE name = 'schema_version';`;
|
||||
const [rows] = await this.$executeQuery(query, true);
|
||||
return rows[0]['number'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the `state` table
|
||||
*/
|
||||
private async $createMigrationStateTable(): Promise<void> {
|
||||
try {
|
||||
const query = `CREATE TABLE IF NOT EXISTS state (
|
||||
name varchar(25) NOT NULL,
|
||||
number int(11) NULL,
|
||||
string varchar(100) NULL,
|
||||
CONSTRAINT name_unique UNIQUE (name)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
await this.$executeQuery(query);
|
||||
|
||||
// Set initial values
|
||||
await this.$executeQuery(`INSERT INTO state VALUES('schema_version', 0, NULL);`);
|
||||
} catch (e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We actually execute the migrations queries here
|
||||
*/
|
||||
private async $migrateTableSchemaFromVersion(version: number): Promise<void> {
|
||||
const transactionQueries: string[] = [];
|
||||
for (const query of this.getMigrationQueriesFromVersion(version)) {
|
||||
transactionQueries.push(query);
|
||||
}
|
||||
transactionQueries.push(this.getUpdateToLatestSchemaVersionQuery());
|
||||
|
||||
try {
|
||||
await this.$executeQuery('START TRANSACTION;');
|
||||
for (const query of transactionQueries) {
|
||||
await this.$executeQuery(query);
|
||||
}
|
||||
await this.$executeQuery('COMMIT;');
|
||||
} catch (e) {
|
||||
await this.$executeQuery('ROLLBACK;');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate migration queries based on schema version
|
||||
*/
|
||||
private getMigrationQueriesFromVersion(version: number): string[] {
|
||||
const queries: string[] = [];
|
||||
return queries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Save the schema version in the database
|
||||
*/
|
||||
private getUpdateToLatestSchemaVersionQuery(): string {
|
||||
return `UPDATE state SET number = ${DatabaseMigration.currentVersion} WHERE name = 'schema_version';`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Print current database version
|
||||
*/
|
||||
private async $printDatabaseVersion() {
|
||||
try {
|
||||
const [rows] = await this.$executeQuery('SELECT VERSION() as version;', true);
|
||||
logger.debug(`MIGRATIONS: Database engine version '${rows[0].version}'`);
|
||||
} catch (e) {
|
||||
logger.debug(`MIGRATIONS: Could not fetch database engine version. ` + e);
|
||||
}
|
||||
}
|
||||
|
||||
private getCreateStatisticsQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS statistics (
|
||||
id int(11) NOT NULL AUTO_INCREMENT,
|
||||
added datetime NOT NULL,
|
||||
channel_count int(11) NOT NULL,
|
||||
node_count int(11) NOT NULL,
|
||||
total_capacity double unsigned NOT NULL,
|
||||
average_channel_size double unsigned NOT NULL,
|
||||
CONSTRAINT PRIMARY KEY (id)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
private getCreateNodesQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS nodes (
|
||||
public_key varchar(66) NOT NULL,
|
||||
first_seen datetime NOT NULL,
|
||||
updated_at datetime NOT NULL,
|
||||
alias varchar(200) COLLATE utf8mb4_general_ci NOT NULL,
|
||||
color varchar(200) NOT NULL,
|
||||
CONSTRAINT PRIMARY KEY (public_key)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
private getCreateChannelsQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS channels (
|
||||
id varchar(15) NOT NULL,
|
||||
capacity double unsigned NOT NULL,
|
||||
transaction_id varchar(64) NOT NULL,
|
||||
transaction_vout int(11) NOT NULL,
|
||||
updated_at datetime NOT NULL,
|
||||
node1_public_key varchar(66) NOT NULL,
|
||||
node1_base_fee_mtokens double unsigned NULL,
|
||||
node1_cltv_delta int(11) NULL,
|
||||
node1_fee_rate int(11) NULL,
|
||||
node1_is_disabled boolean NULL,
|
||||
node1_max_htlc_mtokens double unsigned NULL,
|
||||
node1_min_htlc_mtokens double unsigned NULL,
|
||||
node1_updated_at datetime NULL,
|
||||
node2_public_key varchar(66) NOT NULL,
|
||||
node2_base_fee_mtokens double unsigned NULL,
|
||||
node2_cltv_delta int(11) NULL,
|
||||
node2_fee_rate int(11) NULL,
|
||||
node2_is_disabled boolean NULL,
|
||||
node2_max_htlc_mtokens double unsigned NULL,
|
||||
node2_min_htlc_mtokens double unsigned NULL,
|
||||
node2_updated_at datetime NULL,
|
||||
CONSTRAINT PRIMARY KEY (id)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
}
|
||||
|
||||
export default new DatabaseMigration();
|
|
@ -1,8 +1,9 @@
|
|||
import config from './config';
|
||||
import logger from './logger';
|
||||
import DB from './database';
|
||||
import lightningApi from './api/lightning-api-factory';
|
||||
import statsUpdater from './tasks/stats-updater';
|
||||
import databaseMigration from './database-migration';
|
||||
import statsUpdater from './tasks/stats-updater.service';
|
||||
import nodeSyncService from './tasks/node-sync.service';
|
||||
|
||||
logger.notice(`Mempool Server is running on port ${config.MEMPOOL.HTTP_PORT}`);
|
||||
|
||||
|
@ -13,12 +14,10 @@ class LightningServer {
|
|||
|
||||
async init() {
|
||||
await DB.checkDbConnection();
|
||||
await databaseMigration.$initializeOrMigrateDatabase();
|
||||
|
||||
statsUpdater.startService();
|
||||
|
||||
const networkGraph = await lightningApi.getNetworkGraph();
|
||||
logger.info('Network graph channels: ' + networkGraph.channels.length);
|
||||
logger.info('Network graph nodes: ' + networkGraph.nodes.length);
|
||||
nodeSyncService.startService();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
163
lightning-backend/src/tasks/node-sync.service.ts
Normal file
163
lightning-backend/src/tasks/node-sync.service.ts
Normal file
|
@ -0,0 +1,163 @@
|
|||
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
import lightningApi from '../api/lightning-api-factory';
|
||||
import { ILightningApi } from '../api/lightning-api.interface';
|
||||
|
||||
class NodeSyncService {
|
||||
constructor() {}
|
||||
|
||||
public async startService() {
|
||||
logger.info('Starting node sync service');
|
||||
|
||||
this.$updateNodes();
|
||||
|
||||
setInterval(async () => {
|
||||
await this.$updateNodes();
|
||||
}, 1000 * 60 * 60);
|
||||
}
|
||||
|
||||
private async $updateNodes() {
|
||||
try {
|
||||
const networkGraph = await lightningApi.$getNetworkGraph();
|
||||
|
||||
for (const node of networkGraph.nodes) {
|
||||
await this.$saveNode(node);
|
||||
}
|
||||
|
||||
for (const channel of networkGraph.channels) {
|
||||
await this.$saveChannel(channel);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.err('$updateNodes() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $saveChannel(channel: ILightningApi.Channel) {
|
||||
try {
|
||||
const d = new Date(Date.parse(channel.updated_at));
|
||||
const query = `INSERT INTO channels
|
||||
(
|
||||
id,
|
||||
capacity,
|
||||
transaction_id,
|
||||
transaction_vout,
|
||||
updated_at,
|
||||
node1_public_key,
|
||||
node1_base_fee_mtokens,
|
||||
node1_cltv_delta,
|
||||
node1_fee_rate,
|
||||
node1_is_disabled,
|
||||
node1_max_htlc_mtokens,
|
||||
node1_min_htlc_mtokens,
|
||||
node1_updated_at,
|
||||
node2_public_key,
|
||||
node2_base_fee_mtokens,
|
||||
node2_cltv_delta,
|
||||
node2_fee_rate,
|
||||
node2_is_disabled,
|
||||
node2_max_htlc_mtokens,
|
||||
node2_min_htlc_mtokens,
|
||||
node2_updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
capacity = ?,
|
||||
updated_at = ?,
|
||||
node1_public_key = ?,
|
||||
node1_base_fee_mtokens = ?,
|
||||
node1_cltv_delta = ?,
|
||||
node1_fee_rate = ?,
|
||||
node1_is_disabled = ?,
|
||||
node1_max_htlc_mtokens = ?,
|
||||
node1_min_htlc_mtokens = ?,
|
||||
node1_updated_at = ?,
|
||||
node2_public_key = ?,
|
||||
node2_base_fee_mtokens = ?,
|
||||
node2_cltv_delta = ?,
|
||||
node2_fee_rate = ?,
|
||||
node2_is_disabled = ?,
|
||||
node2_max_htlc_mtokens = ?,
|
||||
node2_min_htlc_mtokens = ?,
|
||||
node2_updated_at = ?
|
||||
;`;
|
||||
|
||||
await DB.query(query, [
|
||||
channel.id,
|
||||
channel.capacity,
|
||||
channel.transaction_id,
|
||||
channel.transaction_vout,
|
||||
channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0,
|
||||
channel.policies[0].public_key,
|
||||
channel.policies[0].base_fee_mtokens,
|
||||
channel.policies[0].cltv_delta,
|
||||
channel.policies[0].fee_rate,
|
||||
channel.policies[0].is_disabled,
|
||||
channel.policies[0].max_htlc_mtokens,
|
||||
channel.policies[0].min_htlc_mtokens,
|
||||
channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0,
|
||||
channel.policies[1].public_key,
|
||||
channel.policies[1].base_fee_mtokens,
|
||||
channel.policies[1].cltv_delta,
|
||||
channel.policies[1].fee_rate,
|
||||
channel.policies[1].is_disabled,
|
||||
channel.policies[1].max_htlc_mtokens,
|
||||
channel.policies[1].min_htlc_mtokens,
|
||||
channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0,
|
||||
channel.capacity,
|
||||
channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0,
|
||||
channel.policies[0].public_key,
|
||||
channel.policies[0].base_fee_mtokens,
|
||||
channel.policies[0].cltv_delta,
|
||||
channel.policies[0].fee_rate,
|
||||
channel.policies[0].is_disabled,
|
||||
channel.policies[0].max_htlc_mtokens,
|
||||
channel.policies[0].min_htlc_mtokens,
|
||||
channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0,
|
||||
channel.policies[1].public_key,
|
||||
channel.policies[1].base_fee_mtokens,
|
||||
channel.policies[1].cltv_delta,
|
||||
channel.policies[1].fee_rate,
|
||||
channel.policies[1].is_disabled,
|
||||
channel.policies[1].max_htlc_mtokens,
|
||||
channel.policies[1].min_htlc_mtokens,
|
||||
channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0,
|
||||
]);
|
||||
} catch (e) {
|
||||
logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $saveNode(node: ILightningApi.Node) {
|
||||
try {
|
||||
const updatedAt = this.utcDateToMysql(node.updated_at);
|
||||
const query = `INSERT INTO nodes(
|
||||
public_key,
|
||||
first_seen,
|
||||
updated_at,
|
||||
alias,
|
||||
color
|
||||
)
|
||||
VALUES (?, NOW(), ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?;`;
|
||||
|
||||
await DB.query(query, [
|
||||
node.public_key,
|
||||
updatedAt,
|
||||
node.alias,
|
||||
node.color,
|
||||
updatedAt,
|
||||
node.alias,
|
||||
node.color,
|
||||
]);
|
||||
} catch (e) {
|
||||
logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private utcDateToMysql(dateString: string): string {
|
||||
const d = new Date(Date.parse(dateString));
|
||||
return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0];
|
||||
}
|
||||
}
|
||||
|
||||
export default new NodeSyncService();
|
|
@ -3,24 +3,11 @@ import DB from '../database';
|
|||
import logger from '../logger';
|
||||
import lightningApi from '../api/lightning-api-factory';
|
||||
|
||||
/*
|
||||
CREATE TABLE IF NOT EXISTS lightning_stats (
|
||||
id int(11) NOT NULL AUTO_INCREMENT,
|
||||
added datetime NOT NULL,
|
||||
channel_count int(11) NOT NULL,
|
||||
node_count int(11) NOT NULL,
|
||||
total_capacity double unsigned NOT NULL,
|
||||
average_channel_size double unsigned NOT NULL,
|
||||
CONSTRAINT PRIMARY KEY (id)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
||||
*/
|
||||
|
||||
|
||||
class LightningStatsUpdater {
|
||||
constructor() {}
|
||||
|
||||
public async startService() {
|
||||
logger.info('Starting Lightning Stats service');
|
||||
logger.info('Starting Stats service');
|
||||
|
||||
const now = new Date();
|
||||
const nextHourInterval = new Date(now.getFullYear(), now.getMonth(), now.getDate(), Math.floor(now.getHours() / 1) + 1, 0, 0, 0);
|
||||
|
@ -35,10 +22,10 @@ class LightningStatsUpdater {
|
|||
}
|
||||
|
||||
private async $logLightningStats() {
|
||||
const networkInfo = await lightningApi.getNetworkInfo();
|
||||
const networkInfo = await lightningApi.$getNetworkInfo();
|
||||
|
||||
try {
|
||||
const query = `INSERT INTO lightning_stats(
|
||||
const query = `INSERT INTO statistics(
|
||||
added,
|
||||
channel_count,
|
||||
node_count,
|
Loading…
Add table
Reference in a new issue