Create and populate nodes_socket table

This commit is contained in:
nymkappa 2022-08-13 10:24:11 +02:00
parent f55cbe11af
commit 3e9543f0b6
No known key found for this signature in database
GPG Key ID: E155910B16E8BD04
7 changed files with 107 additions and 6 deletions

View File

@ -1,5 +1,7 @@
import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces'; import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces';
import config from '../config'; import config from '../config';
import { NodeSocket } from '../repositories/NodesSocketsRepository';
import { isIP } from 'net';
export class Common { export class Common {
static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ?
'144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49'
@ -221,4 +223,35 @@ export class Common {
const d = new Date((date || 0) * 1000); const d = new Date((date || 0) * 1000);
return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0];
} }
static formatSocket(publicKey: string, socket: {network: string, addr: string}): NodeSocket {
let network: string | null = null;
if (config.LIGHTNING.BACKEND === 'cln') {
network = socket.network;
} else if (config.LIGHTNING.BACKEND === 'lnd') {
if (socket.addr.indexOf('onion') !== -1) {
if (socket.addr.split('.')[0].length >= 56) {
network = 'torv3';
} else {
network = 'torv2';
}
} else if (socket.addr.indexOf('i2p') !== -1) {
network = 'i2p';
} else {
const ipv = isIP(socket.addr.split(':')[0]);
if (ipv === 4) {
network = 'ipv4';
} else if (ipv === 6) {
network = 'ipv6';
}
}
}
return {
publicKey: publicKey,
network: network,
addr: socket.addr,
};
}
} }

View File

@ -4,7 +4,7 @@ import logger from '../logger';
import { Common } from './common'; import { Common } from './common';
class DatabaseMigration { class DatabaseMigration {
private static currentVersion = 36; private static currentVersion = 37;
private queryTimeout = 120000; private queryTimeout = 120000;
private statisticsAddedIndexed = false; private statisticsAddedIndexed = false;
private uniqueLogs: string[] = []; private uniqueLogs: string[] = [];
@ -324,6 +324,10 @@ class DatabaseMigration {
if (databaseSchemaVersion < 36 && isBitcoin == true) { if (databaseSchemaVersion < 36 && isBitcoin == true) {
await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"'); await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"');
} }
if (databaseSchemaVersion < 37 && isBitcoin == true) {
await this.$executeQuery(this.getCreateLNNodesSocketsTableQuery(), await this.$checkIfTableExists('nodes_sockets'));
}
} }
/** /**
@ -737,7 +741,7 @@ class DatabaseMigration {
names text DEFAULT NULL, names text DEFAULT NULL,
UNIQUE KEY id (id,type), UNIQUE KEY id (id,type),
KEY id_2 (id) KEY id_2 (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;` ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
} }
private getCreateBlocksPricesTableQuery(): string { private getCreateBlocksPricesTableQuery(): string {
@ -749,6 +753,16 @@ class DatabaseMigration {
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
} }
private getCreateLNNodesSocketsTableQuery(): string {
return `CREATE TABLE IF NOT EXISTS nodes_sockets (
public_key varchar(66) NOT NULL,
socket varchar(100) NOT NULL,
type enum('ipv4', 'ipv6', 'torv2', 'torv3', 'i2p', 'dns') NULL,
UNIQUE KEY public_key_socket (public_key, socket),
INDEX (public_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
}
public async $truncateIndexedData(tables: string[]) { public async $truncateIndexedData(tables: string[]) {
const allowedTables = ['blocks', 'hashrates', 'prices']; const allowedTables = ['blocks', 'hashrates', 'prices'];

View File

@ -17,7 +17,7 @@ export function convertNode(clNode: any): ILightningApi.Node {
network: addr.type, network: addr.type,
addr: `${addr.address}:${addr.port}` addr: `${addr.address}:${addr.port}`
}; };
}), }) ?? [],
last_update: clNode?.last_timestamp ?? 0, last_update: clNode?.last_timestamp ?? 0,
}; };
} }

View File

@ -82,4 +82,4 @@ export namespace ILightningApi {
is_required: boolean; is_required: boolean;
is_known: boolean; is_known: boolean;
} }
} }

View File

@ -1,4 +1,3 @@
import transactionUtils from '../api/transaction-utils';
import DB from '../database'; import DB from '../database';
import logger from '../logger'; import logger from '../logger';
import { BlockAudit } from '../mempool.interfaces'; import { BlockAudit } from '../mempool.interfaces';

View File

@ -0,0 +1,45 @@
import { ResultSetHeader } from 'mysql2';
import DB from '../database';
import logger from '../logger';
export interface NodeSocket {
publicKey: string;
network: string | null;
addr: string;
}
class NodesSocketsRepository {
public async $saveSocket(socket: NodeSocket): Promise<void> {
try {
await DB.query(`
INSERT INTO nodes_sockets(public_key, socket, type)
VALUE (?, ?, ?)
`, [socket.publicKey, socket.addr, socket.network]);
} catch (e: any) {
if (e.errno !== 1062) { // ER_DUP_ENTRY - Not an issue, just ignore this
logger.err(`Cannot save node socket (${[socket.publicKey, socket.addr, socket.network]}) into db. Reason: ` + (e instanceof Error ? e.message : e));
// We don't throw, not a critical issue if we miss some nodes sockets
}
}
}
public async $deleteUnusedSockets(publicKey: string, addresses: string[]): Promise<number> {
if (addresses.length === 0) {
return 0;
}
try {
const query = `
DELETE FROM nodes_sockets
WHERE public_key = ?
AND socket NOT IN (${addresses.map(id => `"${id}"`).join(',')})
`;
const [result] = await DB.query<ResultSetHeader>(query, [publicKey]);
return result.affectedRows;
} catch (e) {
logger.err(`Cannot delete unused sockets for ${publicKey} from db. Reason: ` + (e instanceof Error ? e.message : e));
return 0;
}
}
}
export default new NodesSocketsRepository();

View File

@ -10,6 +10,8 @@ import lightningApi from '../../api/lightning/lightning-api-factory';
import nodesApi from '../../api/explorer/nodes.api'; import nodesApi from '../../api/explorer/nodes.api';
import { ResultSetHeader } from 'mysql2'; import { ResultSetHeader } from 'mysql2';
import fundingTxFetcher from './sync-tasks/funding-tx-fetcher'; import fundingTxFetcher from './sync-tasks/funding-tx-fetcher';
import NodesSocketsRepository from '../../repositories/NodesSocketsRepository';
import { Common } from '../../api/common';
class NetworkSyncService { class NetworkSyncService {
loggerTimer = 0; loggerTimer = 0;
@ -58,6 +60,7 @@ class NetworkSyncService {
private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> { private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> {
let progress = 0; let progress = 0;
let deletedSockets = 0;
const graphNodesPubkeys: string[] = []; const graphNodesPubkeys: string[] = [];
for (const node of nodes) { for (const node of nodes) {
await nodesApi.$saveNode(node); await nodesApi.$saveNode(node);
@ -69,8 +72,15 @@ class NetworkSyncService {
logger.info(`Updating node ${progress}/${nodes.length}`); logger.info(`Updating node ${progress}/${nodes.length}`);
this.loggerTimer = new Date().getTime() / 1000; this.loggerTimer = new Date().getTime() / 1000;
} }
const addresses: string[] = [];
for (const socket of node.addresses) {
await NodesSocketsRepository.$saveSocket(Common.formatSocket(node.pub_key, socket));
addresses.push(socket.addr);
}
deletedSockets += await NodesSocketsRepository.$deleteUnusedSockets(node.pub_key, addresses);
} }
logger.info(`${progress} nodes updated`); logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted`);
// If a channel if not present in the graph, mark it as inactive // If a channel if not present in the graph, mark it as inactive
nodesApi.$setNodesInactive(graphNodesPubkeys); nodesApi.$setNodesInactive(graphNodesPubkeys);