Run daily stats at midnight and backfill first launch

This commit is contained in:
softsimon 2022-07-10 20:01:15 +02:00
parent 665d85204b
commit 1c86273059
No known key found for this signature in database
GPG Key ID: 488D7DCFB5A430D7
2 changed files with 207 additions and 222 deletions

View File

@ -4,7 +4,7 @@ import logger from '../logger';
import { Common } from './common';
class DatabaseMigration {
private static currentVersion = 27;
private static currentVersion = 28;
private queryTimeout = 120000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@ -274,6 +274,12 @@ class DatabaseMigration {
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD med_base_fee_mtokens bigint(20) unsigned NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 28 && isBitcoin === true) {
await this.$executeQuery(`TRUNCATE lightning_stats`);
await this.$executeQuery(`TRUNCATE node_stats`);
await this.$executeQuery(`ALTER TABLE lightning_stats MODIFY added DATE`);
}
} catch (e) {
throw e;
}

View File

@ -28,17 +28,26 @@ class LightningStatsUpdater {
return;
}
const now = new Date();
const nextHourInterval = new Date(now.getFullYear(), now.getMonth(), now.getDate(), Math.floor(now.getHours() / 1) + 1, 0, 0, 0);
const difference = nextHourInterval.getTime() - now.getTime();
await this.$populateHistoricalStatistics();
await this.$populateHistoricalNodeStatistics();
setTimeout(() => {
setInterval(async () => {
await this.$runTasks();
}, 1000 * 60 * 60);
}, difference);
this.$runTasks();
}, this.timeUntilMidnight());
}
await this.$runTasks();
private timeUntilMidnight(): number {
const date = new Date();
this.setDateMidnight(date);
date.setUTCHours(24);
return date.getTime() - new Date().getTime();
}
private setDateMidnight(date: Date): void {
date.setUTCHours(0);
date.setUTCMinutes(0);
date.setUTCSeconds(0);
date.setUTCMilliseconds(0);
}
private async $lightningIsSynced(): Promise<boolean> {
@ -46,225 +55,17 @@ class LightningStatsUpdater {
return nodeInfo.is_synced_to_chain && nodeInfo.is_synced_to_graph;
}
private async $runTasks() {
await this.$populateHistoricalStatistics();
await this.$populateHistoricalNodeStatistics();
private async $runTasks(): Promise<void> {
await this.$logLightningStatsDaily();
await this.$logNodeStatsDaily();
}
private async $logNodeStatsDaily() {
const currentDate = new Date().toISOString().split('T')[0];
try {
const [state]: any = await DB.query(`SELECT string FROM state WHERE name = 'last_node_stats'`);
// Only store once per day
if (state[0].string === currentDate) {
return;
}
logger.info(`Running daily node stats update...`);
const query = `SELECT nodes.public_key, c1.channels_count_left, c2.channels_count_right, c1.channels_capacity_left, c2.channels_capacity_right FROM nodes LEFT JOIN (SELECT node1_public_key, COUNT(id) AS channels_count_left, SUM(capacity) AS channels_capacity_left FROM channels WHERE channels.status < 2 GROUP BY node1_public_key) c1 ON c1.node1_public_key = nodes.public_key LEFT JOIN (SELECT node2_public_key, COUNT(id) AS channels_count_right, SUM(capacity) AS channels_capacity_right FROM channels WHERE channels.status < 2 GROUP BY node2_public_key) c2 ON c2.node2_public_key = nodes.public_key`;
const [nodes]: any = await DB.query(query);
// First run we won't have any nodes yet
if (nodes.length < 10) {
return;
}
for (const node of nodes) {
await DB.query(
`INSERT INTO node_stats(public_key, added, capacity, channels) VALUES (?, NOW(), ?, ?)`,
[node.public_key, (parseInt(node.channels_capacity_left || 0, 10)) + (parseInt(node.channels_capacity_right || 0, 10)),
node.channels_count_left + node.channels_count_right]);
}
await DB.query(`UPDATE state SET string = ? WHERE name = 'last_node_stats'`, [currentDate]);
logger.info('Daily node stats has updated.');
} catch (e) {
logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? e.message : e));
}
}
// We only run this on first launch
private async $populateHistoricalStatistics() {
try {
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM lightning_stats`);
// Only run if table is empty
if (rows[0]['COUNT(*)'] > 0) {
return;
}
logger.info(`Running historical stats population...`);
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels ORDER BY created ASC`);
let date: Date = new Date(this.hardCodedStartTime);
const currentDate = new Date();
while (date < currentDate) {
date.setUTCDate(date.getUTCDate() + 1);
let totalCapacity = 0;
let channelsCount = 0;
for (const channel of channels) {
if (new Date(channel.created) > date) {
break;
}
if (channel.closing_date !== null && new Date(channel.closing_date) < date) {
continue;
}
totalCapacity += channel.capacity;
channelsCount++;
}
const query = `INSERT INTO lightning_stats(
added,
channel_count,
node_count,
total_capacity,
tor_nodes,
clearnet_nodes,
unannounced_nodes
)
VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?)`;
await DB.query(query, [
date.getTime() / 1000,
channelsCount,
0,
totalCapacity,
0,
0,
0
]);
// Add one day and continue
date.setDate(date.getDate() + 1);
}
const [nodes]: any = await DB.query(`SELECT first_seen, sockets FROM nodes ORDER BY first_seen ASC`);
date = new Date(this.hardCodedStartTime);
while (date < currentDate) {
date.setUTCDate(date.getUTCDate() + 1);
let nodeCount = 0;
let clearnetNodes = 0;
let torNodes = 0;
let unannouncedNodes = 0;
for (const node of nodes) {
if (new Date(node.first_seen) > date) {
break;
}
nodeCount++;
const sockets = node.sockets.split(',');
let isUnnanounced = true;
for (const socket of sockets) {
const hasOnion = socket.indexOf('.onion') !== -1;
if (hasOnion) {
torNodes++;
isUnnanounced = false;
}
const hasClearnet = [4, 6].includes(net.isIP(socket.split(':')[0]));
if (hasClearnet) {
clearnetNodes++;
isUnnanounced = false;
}
}
if (isUnnanounced) {
unannouncedNodes++;
}
}
const query = `UPDATE lightning_stats SET node_count = ?, tor_nodes = ?, clearnet_nodes = ?, unannounced_nodes = ? WHERE added = FROM_UNIXTIME(?)`;
await DB.query(query, [
nodeCount,
torNodes,
clearnetNodes,
unannouncedNodes,
date.getTime() / 1000,
]);
}
logger.info('Historical stats populated.');
} catch (e) {
logger.err('$populateHistoricalData() error: ' + (e instanceof Error ? e.message : e));
}
}
private async $populateHistoricalNodeStatistics() {
try {
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM node_stats`);
// Only run if table is empty
if (rows[0]['COUNT(*)'] > 0) {
return;
}
logger.info(`Running historical node stats population...`);
const [nodes]: any = await DB.query(`SELECT public_key, first_seen, alias FROM nodes ORDER BY first_seen ASC`);
for (const node of nodes) {
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels WHERE node1_public_key = ? OR node2_public_key = ? ORDER BY created ASC`, [node.public_key, node.public_key]);
let date: Date = new Date(this.hardCodedStartTime);
const currentDate = new Date();
let lastTotalCapacity = 0;
let lastChannelsCount = 0;
while (date < currentDate) {
date.setUTCDate(date.getUTCDate() + 1);
let totalCapacity = 0;
let channelsCount = 0;
for (const channel of channels) {
if (new Date(channel.created) > date) {
break;
}
if (channel.closing_date !== null && new Date(channel.closing_date) < date) {
continue;
}
totalCapacity += channel.capacity;
channelsCount++;
}
if (lastTotalCapacity === totalCapacity && lastChannelsCount === channelsCount) {
continue;
}
lastTotalCapacity = totalCapacity;
lastChannelsCount = channelsCount;
const query = `INSERT INTO node_stats(
public_key,
added,
capacity,
channels
)
VALUES (?, FROM_UNIXTIME(?), ?, ?)`;
await DB.query(query, [
node.public_key,
date.getTime() / 1000,
totalCapacity,
channelsCount,
]);
}
logger.debug('Updated node_stats for: ' + node.alias);
}
logger.info('Historical stats populated.');
} catch (e) {
logger.err('$populateHistoricalNodeData() error: ' + (e instanceof Error ? e.message : e));
}
setTimeout(() => {
this.$runTasks();
}, this.timeUntilMidnight());
}
private async $logLightningStatsDaily() {
const currentDate = new Date().toISOString().split('T')[0];
try {
const [state]: any = await DB.query(`SELECT string FROM state WHERE name = 'last_node_stats'`);
// Only store once per day
if (state[0].string === currentDate) {
return;
}
logger.info(`Running lightning daily stats log...`);
const networkGraph = await lightningApi.$getNetworkGraph();
@ -314,7 +115,7 @@ class LightningStatsUpdater {
med_fee_rate,
med_base_fee_mtokens
)
VALUES (NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
VALUES (NOW() - INTERVAL 1 DAY, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
await DB.query(query, [
networkGraph.channels.length,
@ -335,6 +136,184 @@ class LightningStatsUpdater {
logger.err('$logLightningStatsDaily() error: ' + (e instanceof Error ? e.message : e));
}
}
private async $logNodeStatsDaily() {
try {
logger.info(`Running daily node stats update...`);
const query = `SELECT nodes.public_key, c1.channels_count_left, c2.channels_count_right, c1.channels_capacity_left, c2.channels_capacity_right FROM nodes LEFT JOIN (SELECT node1_public_key, COUNT(id) AS channels_count_left, SUM(capacity) AS channels_capacity_left FROM channels WHERE channels.status < 2 GROUP BY node1_public_key) c1 ON c1.node1_public_key = nodes.public_key LEFT JOIN (SELECT node2_public_key, COUNT(id) AS channels_count_right, SUM(capacity) AS channels_capacity_right FROM channels WHERE channels.status < 2 GROUP BY node2_public_key) c2 ON c2.node2_public_key = nodes.public_key`;
const [nodes]: any = await DB.query(query);
for (const node of nodes) {
await DB.query(
`INSERT INTO node_stats(public_key, added, capacity, channels) VALUES (?, NOW() - INTERVAL 1 DAY, ?, ?)`,
[node.public_key, (parseInt(node.channels_capacity_left || 0, 10)) + (parseInt(node.channels_capacity_right || 0, 10)),
node.channels_count_left + node.channels_count_right]);
}
logger.info('Daily node stats has updated.');
} catch (e) {
logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? e.message : e));
}
}
// We only run this on first launch
private async $populateHistoricalStatistics() {
try {
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM lightning_stats`);
// Only run if table is empty
if (rows[0]['COUNT(*)'] > 0) {
return;
}
logger.info(`Running historical stats population...`);
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels ORDER BY created ASC`);
const [nodes]: any = await DB.query(`SELECT first_seen, sockets FROM nodes ORDER BY first_seen ASC`);
const date: Date = new Date(this.hardCodedStartTime);
const currentDate = new Date();
this.setDateMidnight(currentDate);
while (date < currentDate) {
let totalCapacity = 0;
let channelsCount = 0;
for (const channel of channels) {
if (new Date(channel.created) > date) {
break;
}
if (channel.closing_date === null || new Date(channel.closing_date) > date) {
totalCapacity += channel.capacity;
channelsCount++;
}
}
let nodeCount = 0;
let clearnetNodes = 0;
let torNodes = 0;
let unannouncedNodes = 0;
for (const node of nodes) {
if (new Date(node.first_seen) > date) {
break;
}
nodeCount++;
const sockets = node.sockets.split(',');
let isUnnanounced = true;
for (const socket of sockets) {
const hasOnion = socket.indexOf('.onion') !== -1;
if (hasOnion) {
torNodes++;
isUnnanounced = false;
}
const hasClearnet = [4, 6].includes(net.isIP(socket.split(':')[0]));
if (hasClearnet) {
clearnetNodes++;
isUnnanounced = false;
}
}
if (isUnnanounced) {
unannouncedNodes++;
}
}
const query = `INSERT INTO lightning_stats(
added,
channel_count,
node_count,
total_capacity,
tor_nodes,
clearnet_nodes,
unannounced_nodes
)
VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?)`;
await DB.query(query, [
date.getTime() / 1000,
channelsCount,
nodeCount,
totalCapacity,
torNodes,
clearnetNodes,
unannouncedNodes,
]);
date.setUTCDate(date.getUTCDate() + 1);
}
logger.info('Historical stats populated.');
} catch (e) {
logger.err('$populateHistoricalData() error: ' + (e instanceof Error ? e.message : e));
}
}
private async $populateHistoricalNodeStatistics() {
try {
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM node_stats`);
// Only run if table is empty
if (rows[0]['COUNT(*)'] > 0) {
return;
}
logger.info(`Running historical node stats population...`);
const [nodes]: any = await DB.query(`SELECT public_key, first_seen, alias FROM nodes ORDER BY first_seen ASC`);
for (const node of nodes) {
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels WHERE node1_public_key = ? OR node2_public_key = ? ORDER BY created ASC`, [node.public_key, node.public_key]);
const date: Date = new Date(this.hardCodedStartTime);
const currentDate = new Date();
this.setDateMidnight(currentDate);
let lastTotalCapacity = 0;
let lastChannelsCount = 0;
while (date < currentDate) {
let totalCapacity = 0;
let channelsCount = 0;
for (const channel of channels) {
if (new Date(channel.created) > date) {
break;
}
if (channel.closing_date !== null && new Date(channel.closing_date) < date) {
date.setUTCDate(date.getUTCDate() + 1);
continue;
}
totalCapacity += channel.capacity;
channelsCount++;
}
if (lastTotalCapacity === totalCapacity && lastChannelsCount === channelsCount) {
date.setUTCDate(date.getUTCDate() + 1);
continue;
}
lastTotalCapacity = totalCapacity;
lastChannelsCount = channelsCount;
const query = `INSERT INTO node_stats(
public_key,
added,
capacity,
channels
)
VALUES (?, FROM_UNIXTIME(?), ?, ?)`;
await DB.query(query, [
node.public_key,
date.getTime() / 1000,
totalCapacity,
channelsCount,
]);
date.setUTCDate(date.getUTCDate() + 1);
}
logger.debug('Updated node_stats for: ' + node.alias);
}
logger.info('Historical stats populated.');
} catch (e) {
logger.err('$populateHistoricalNodeData() error: ' + (e instanceof Error ? e.message : e));
}
}
}
export default new LightningStatsUpdater();