mirror of
https://github.com/mempool/mempool.git
synced 2024-11-19 09:52:14 +01:00
Merge pull request #5042 from mempool/natsoni/statistics-replication
Add statistics to replication service
This commit is contained in:
commit
286fc8e9ad
@ -139,6 +139,8 @@
|
||||
"ENABLED": false,
|
||||
"AUDIT": false,
|
||||
"AUDIT_START_HEIGHT": 774000,
|
||||
"STATISTICS": false,
|
||||
"STATISTICS_START_TIME": 1481932800,
|
||||
"SERVERS": [
|
||||
"list",
|
||||
"of",
|
||||
|
@ -131,6 +131,8 @@
|
||||
"ENABLED": false,
|
||||
"AUDIT": false,
|
||||
"AUDIT_START_HEIGHT": 774000,
|
||||
"STATISTICS": false,
|
||||
"STATISTICS_START_TIME": 1481932800,
|
||||
"SERVERS": []
|
||||
},
|
||||
"MEMPOOL_SERVICES": {
|
||||
|
@ -135,6 +135,8 @@ describe('Mempool Backend Config', () => {
|
||||
ENABLED: false,
|
||||
AUDIT: false,
|
||||
AUDIT_START_HEIGHT: 774000,
|
||||
STATISTICS: false,
|
||||
STATISTICS_START_TIME: 1481932800,
|
||||
SERVERS: []
|
||||
});
|
||||
|
||||
|
@ -64,7 +64,7 @@ class StatisticsApi {
|
||||
}
|
||||
}
|
||||
|
||||
public async $create(statistics: Statistic): Promise<number | undefined> {
|
||||
public async $create(statistics: Statistic, convertToDatetime = false): Promise<number | undefined> {
|
||||
try {
|
||||
const query = `INSERT INTO statistics(
|
||||
added,
|
||||
@ -114,7 +114,7 @@ class StatisticsApi {
|
||||
vsize_1800,
|
||||
vsize_2000
|
||||
)
|
||||
VALUES (${statistics.added}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
VALUES (${convertToDatetime ? `FROM_UNIXTIME(${statistics.added})` : statistics.added}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
|
||||
|
||||
const params: (string | number)[] = [
|
||||
@ -456,6 +456,59 @@ class StatisticsApi {
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
public mapOptimizedStatisticToStatistic(statistic: OptimizedStatistic[]): Statistic[] {
|
||||
return statistic.map((s) => {
|
||||
return {
|
||||
added: s.added,
|
||||
unconfirmed_transactions: s.count,
|
||||
tx_per_second: 0,
|
||||
vbytes_per_second: s.vbytes_per_second,
|
||||
mempool_byte_weight: s.mempool_byte_weight || 0,
|
||||
total_fee: s.total_fee || 0,
|
||||
min_fee: s.min_fee,
|
||||
fee_data: '',
|
||||
vsize_1: s.vsizes[0],
|
||||
vsize_2: s.vsizes[1],
|
||||
vsize_3: s.vsizes[2],
|
||||
vsize_4: s.vsizes[3],
|
||||
vsize_5: s.vsizes[4],
|
||||
vsize_6: s.vsizes[5],
|
||||
vsize_8: s.vsizes[6],
|
||||
vsize_10: s.vsizes[7],
|
||||
vsize_12: s.vsizes[8],
|
||||
vsize_15: s.vsizes[9],
|
||||
vsize_20: s.vsizes[10],
|
||||
vsize_30: s.vsizes[11],
|
||||
vsize_40: s.vsizes[12],
|
||||
vsize_50: s.vsizes[13],
|
||||
vsize_60: s.vsizes[14],
|
||||
vsize_70: s.vsizes[15],
|
||||
vsize_80: s.vsizes[16],
|
||||
vsize_90: s.vsizes[17],
|
||||
vsize_100: s.vsizes[18],
|
||||
vsize_125: s.vsizes[19],
|
||||
vsize_150: s.vsizes[20],
|
||||
vsize_175: s.vsizes[21],
|
||||
vsize_200: s.vsizes[22],
|
||||
vsize_250: s.vsizes[23],
|
||||
vsize_300: s.vsizes[24],
|
||||
vsize_350: s.vsizes[25],
|
||||
vsize_400: s.vsizes[26],
|
||||
vsize_500: s.vsizes[27],
|
||||
vsize_600: s.vsizes[28],
|
||||
vsize_700: s.vsizes[29],
|
||||
vsize_800: s.vsizes[30],
|
||||
vsize_900: s.vsizes[31],
|
||||
vsize_1000: s.vsizes[32],
|
||||
vsize_1200: s.vsizes[33],
|
||||
vsize_1400: s.vsizes[34],
|
||||
vsize_1600: s.vsizes[35],
|
||||
vsize_1800: s.vsizes[36],
|
||||
vsize_2000: s.vsizes[37],
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export default new StatisticsApi();
|
||||
|
@ -141,6 +141,8 @@ interface IConfig {
|
||||
ENABLED: boolean;
|
||||
AUDIT: boolean;
|
||||
AUDIT_START_HEIGHT: number;
|
||||
STATISTICS: boolean;
|
||||
STATISTICS_START_TIME: number | string;
|
||||
SERVERS: string[];
|
||||
},
|
||||
MEMPOOL_SERVICES: {
|
||||
@ -298,6 +300,8 @@ const defaults: IConfig = {
|
||||
'ENABLED': false,
|
||||
'AUDIT': false,
|
||||
'AUDIT_START_HEIGHT': 774000,
|
||||
'STATISTICS': false,
|
||||
'STATISTICS_START_TIME': 1481932800,
|
||||
'SERVERS': [],
|
||||
},
|
||||
'MEMPOOL_SERVICES': {
|
||||
|
@ -8,6 +8,7 @@ import priceUpdater from './tasks/price-updater';
|
||||
import PricesRepository from './repositories/PricesRepository';
|
||||
import config from './config';
|
||||
import auditReplicator from './replication/AuditReplication';
|
||||
import statisticsReplicator from './replication/StatisticsReplication';
|
||||
import AccelerationRepository from './repositories/AccelerationRepository';
|
||||
|
||||
export interface CoreIndex {
|
||||
@ -188,6 +189,7 @@ class Indexer {
|
||||
await blocks.$generateCPFPDatabase();
|
||||
await blocks.$generateAuditStats();
|
||||
await auditReplicator.$sync();
|
||||
await statisticsReplicator.$sync();
|
||||
await AccelerationRepository.$indexPastAccelerations();
|
||||
// do not wait for classify blocks to finish
|
||||
blocks.$classifyBlocks();
|
||||
|
@ -422,6 +422,7 @@ export interface Statistic {
|
||||
|
||||
export interface OptimizedStatistic {
|
||||
added: string;
|
||||
count: number;
|
||||
vbytes_per_second: number;
|
||||
total_fee: number;
|
||||
mempool_byte_weight: number;
|
||||
|
228
backend/src/replication/StatisticsReplication.ts
Normal file
228
backend/src/replication/StatisticsReplication.ts
Normal file
@ -0,0 +1,228 @@
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
import { $sync } from './replicator';
|
||||
import config from '../config';
|
||||
import { Common } from '../api/common';
|
||||
import statistics from '../api/statistics/statistics-api';
|
||||
|
||||
interface MissingStatistics {
|
||||
'24h': Set<number>;
|
||||
'1w': Set<number>;
|
||||
'1m': Set<number>;
|
||||
'3m': Set<number>;
|
||||
'6m': Set<number>;
|
||||
'2y': Set<number>;
|
||||
'all': Set<number>;
|
||||
}
|
||||
|
||||
const steps = {
|
||||
'24h': 60,
|
||||
'1w': 300,
|
||||
'1m': 1800,
|
||||
'3m': 7200,
|
||||
'6m': 10800,
|
||||
'2y': 28800,
|
||||
'all': 43200,
|
||||
};
|
||||
|
||||
/**
|
||||
* Syncs missing statistics data from trusted servers
|
||||
*/
|
||||
class StatisticsReplication {
|
||||
inProgress: boolean = false;
|
||||
|
||||
public async $sync(): Promise<void> {
|
||||
if (!config.REPLICATION.ENABLED || !config.REPLICATION.STATISTICS || !config.STATISTICS.ENABLED) {
|
||||
// replication not enabled, or statistics not enabled
|
||||
return;
|
||||
}
|
||||
if (this.inProgress) {
|
||||
logger.info(`StatisticsReplication sync already in progress`, 'Replication');
|
||||
return;
|
||||
}
|
||||
this.inProgress = true;
|
||||
|
||||
const missingStatistics = await this.$getMissingStatistics();
|
||||
const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].size > 0);
|
||||
const totalMissing = missingIntervals.reduce((total, key) => total + missingStatistics[key].size, 0);
|
||||
|
||||
if (totalMissing === 0) {
|
||||
this.inProgress = false;
|
||||
logger.info(`Statistics table is complete, no replication needed`, 'Replication');
|
||||
return;
|
||||
}
|
||||
|
||||
for (const interval of missingIntervals) {
|
||||
logger.debug(`Missing ${missingStatistics[interval].size} statistics rows in '${interval}' timespan`, 'Replication');
|
||||
}
|
||||
logger.debug(`Fetching ${missingIntervals.join(', ')} statistics endpoints from trusted servers to fill ${totalMissing} rows missing in statistics`, 'Replication');
|
||||
|
||||
let totalSynced = 0;
|
||||
let totalMissed = 0;
|
||||
|
||||
for (const interval of missingIntervals) {
|
||||
const results = await this.$syncStatistics(interval, missingStatistics[interval]);
|
||||
totalSynced += results.synced;
|
||||
totalMissed += results.missed;
|
||||
|
||||
logger.info(`Found ${totalSynced} / ${totalSynced + totalMissed} of ${totalMissing} missing statistics rows`, 'Replication');
|
||||
await Common.sleep$(3000);
|
||||
}
|
||||
|
||||
logger.debug(`Synced ${totalSynced} statistics rows, ${totalMissed} still missing`, 'Replication');
|
||||
|
||||
this.inProgress = false;
|
||||
}
|
||||
|
||||
private async $syncStatistics(interval: string, missingTimes: Set<number>): Promise<any> {
|
||||
|
||||
let success = false;
|
||||
let synced = 0;
|
||||
let missed = new Set(missingTimes);
|
||||
const syncResult = await $sync(`/api/v1/statistics/${interval}`);
|
||||
if (syncResult && syncResult.data?.length) {
|
||||
success = true;
|
||||
logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`);
|
||||
|
||||
for (const stat of syncResult.data) {
|
||||
const time = this.roundToNearestStep(stat.added, steps[interval]);
|
||||
if (missingTimes.has(time)) {
|
||||
try {
|
||||
await statistics.$create(statistics.mapOptimizedStatisticToStatistic([stat])[0], true);
|
||||
if (missed.delete(time)) {
|
||||
synced++;
|
||||
}
|
||||
} catch (e: any) {
|
||||
logger.err(`Failed to insert statistics row at ${stat.added} (${interval}) from ${syncResult.server}. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
logger.warn(`An error occured when trying to fetch /api/v1/statistics/${interval}`);
|
||||
}
|
||||
|
||||
return { success, synced, missed: missed.size };
|
||||
}
|
||||
|
||||
private async $getMissingStatistics(): Promise<MissingStatistics> {
|
||||
try {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const day = 60 * 60 * 24;
|
||||
|
||||
const startTime = this.getStartTimeFromConfig();
|
||||
|
||||
const missingStatistics: MissingStatistics = {
|
||||
'24h': new Set<number>(),
|
||||
'1w': new Set<number>(),
|
||||
'1m': new Set<number>(),
|
||||
'3m': new Set<number>(),
|
||||
'6m': new Set<number>(),
|
||||
'2y': new Set<number>(),
|
||||
'all': new Set<number>()
|
||||
};
|
||||
|
||||
const intervals = [ // [start, end, label ]
|
||||
[now - day, now - 60, '24h'] , // from 24 hours ago to now = 1 minute granularity
|
||||
startTime < now - day ? [now - day * 7, now - day, '1w' ] : null, // from 1 week ago to 24 hours ago = 5 minutes granularity
|
||||
startTime < now - day * 7 ? [now - day * 30, now - day * 7, '1m' ] : null, // from 1 month ago to 1 week ago = 30 minutes granularity
|
||||
startTime < now - day * 30 ? [now - day * 90, now - day * 30, '3m' ] : null, // from 3 months ago to 1 month ago = 2 hours granularity
|
||||
startTime < now - day * 90 ? [now - day * 180, now - day * 90, '6m' ] : null, // from 6 months ago to 3 months ago = 3 hours granularity
|
||||
startTime < now - day * 180 ? [now - day * 365 * 2, now - day * 180, '2y' ] : null, // from 2 years ago to 6 months ago = 8 hours granularity
|
||||
startTime < now - day * 365 * 2 ? [startTime, now - day * 365 * 2, 'all'] : null, // from start of statistics to 2 years ago = 12 hours granularity
|
||||
];
|
||||
|
||||
for (const interval of intervals) {
|
||||
if (!interval) {
|
||||
continue;
|
||||
}
|
||||
missingStatistics[interval[2] as string] = await this.$getMissingStatisticsInterval(interval, startTime);
|
||||
}
|
||||
|
||||
return missingStatistics;
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise<Set<number>> {
|
||||
try {
|
||||
const start = interval[0];
|
||||
const end = interval[1];
|
||||
const step = steps[interval[2]];
|
||||
|
||||
const [rows]: any[] = await DB.query(`
|
||||
SELECT UNIX_TIMESTAMP(added) as added
|
||||
FROM statistics
|
||||
WHERE added >= FROM_UNIXTIME(?) AND added <= FROM_UNIXTIME(?)
|
||||
GROUP BY UNIX_TIMESTAMP(added) DIV ${step} ORDER BY statistics.added DESC
|
||||
`, [start, end]);
|
||||
|
||||
const startingTime = Math.max(startTime, start) - Math.max(startTime, start) % step;
|
||||
|
||||
const timeSteps: number[] = [];
|
||||
for (let time = startingTime; time < end; time += step) {
|
||||
timeSteps.push(time);
|
||||
}
|
||||
|
||||
if (timeSteps.length === 0) {
|
||||
return new Set<number>();
|
||||
}
|
||||
|
||||
const roundedTimesAlreadyHere = new Set(rows.map(row => this.roundToNearestStep(row.added, step)));
|
||||
const missingTimes = new Set(timeSteps.filter(time => !roundedTimesAlreadyHere.has(time)));
|
||||
|
||||
// Don't bother fetching if very few rows are missing
|
||||
if (missingTimes.size < timeSteps.length * 0.005) {
|
||||
return new Set();
|
||||
}
|
||||
|
||||
return missingTimes;
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private roundToNearestStep(time: number, step: number): number {
|
||||
const remainder = time % step;
|
||||
if (remainder < step / 2) {
|
||||
return time - remainder;
|
||||
} else {
|
||||
return time + (step - remainder);
|
||||
}
|
||||
}
|
||||
|
||||
private getStartTimeFromConfig(): number {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const day = 60 * 60 * 24;
|
||||
|
||||
let startTime: number;
|
||||
if (typeof(config.REPLICATION.STATISTICS_START_TIME) === 'string' && ['24h', '1w', '1m', '3m', '6m', '2y', 'all'].includes(config.REPLICATION.STATISTICS_START_TIME)) {
|
||||
if (config.REPLICATION.STATISTICS_START_TIME === 'all') {
|
||||
startTime = 1481932800;
|
||||
} else if (config.REPLICATION.STATISTICS_START_TIME === '2y') {
|
||||
startTime = now - day * 365 * 2;
|
||||
} else if (config.REPLICATION.STATISTICS_START_TIME === '6m') {
|
||||
startTime = now - day * 180;
|
||||
} else if (config.REPLICATION.STATISTICS_START_TIME === '3m') {
|
||||
startTime = now - day * 90;
|
||||
} else if (config.REPLICATION.STATISTICS_START_TIME === '1m') {
|
||||
startTime = now - day * 30;
|
||||
} else if (config.REPLICATION.STATISTICS_START_TIME === '1w') {
|
||||
startTime = now - day * 7;
|
||||
} else {
|
||||
startTime = now - day;
|
||||
}
|
||||
} else {
|
||||
startTime = Math.max(config.REPLICATION.STATISTICS_START_TIME as number || 1481932800, 1481932800);
|
||||
}
|
||||
|
||||
return startTime;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
export default new StatisticsReplication();
|
||||
|
@ -137,6 +137,8 @@
|
||||
"ENABLED": __REPLICATION_ENABLED__,
|
||||
"AUDIT": __REPLICATION_AUDIT__,
|
||||
"AUDIT_START_HEIGHT": __REPLICATION_AUDIT_START_HEIGHT__,
|
||||
"STATISTICS": __REPLICATION_STATISTICS__,
|
||||
"STATISTICS_START_TIME": __REPLICATION_STATISTICS_START_TIME__,
|
||||
"SERVERS": __REPLICATION_SERVERS__
|
||||
},
|
||||
"MEMPOOL_SERVICES": {
|
||||
|
@ -138,6 +138,8 @@ __MAXMIND_GEOIP2_ISP__=${MAXMIND_GEOIP2_ISP:=""}
|
||||
__REPLICATION_ENABLED__=${REPLICATION_ENABLED:=false}
|
||||
__REPLICATION_AUDIT__=${REPLICATION_AUDIT:=false}
|
||||
__REPLICATION_AUDIT_START_HEIGHT__=${REPLICATION_AUDIT_START_HEIGHT:=774000}
|
||||
__REPLICATION_STATISTICS__=${REPLICATION_STATISTICS:=false}
|
||||
__REPLICATION_STATISTICS_START_TIME__=${REPLICATION_STATISTICS_START_TIME:=1481932800}
|
||||
__REPLICATION_SERVERS__=${REPLICATION_SERVERS:=[]}
|
||||
|
||||
# MEMPOOL_SERVICES
|
||||
@ -284,6 +286,8 @@ sed -i "s!__MAXMIND_GEOIP2_ISP__!${__MAXMIND_GEOIP2_ISP__}!g" mempool-config.jso
|
||||
sed -i "s!__REPLICATION_ENABLED__!${__REPLICATION_ENABLED__}!g" mempool-config.json
|
||||
sed -i "s!__REPLICATION_AUDIT__!${__REPLICATION_AUDIT__}!g" mempool-config.json
|
||||
sed -i "s!__REPLICATION_AUDIT_START_HEIGHT__!${__REPLICATION_AUDIT_START_HEIGHT__}!g" mempool-config.json
|
||||
sed -i "s!__REPLICATION_STATISTICS__!${__REPLICATION_STATISTICS__}!g" mempool-config.json
|
||||
sed -i "s!__REPLICATION_STATISTICS_START_TIME__!${__REPLICATION_STATISTICS_START_TIME__}!g" mempool-config.json
|
||||
sed -i "s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json
|
||||
|
||||
# MEMPOOL_SERVICES
|
||||
|
@ -97,6 +97,8 @@
|
||||
"ENABLED": true,
|
||||
"AUDIT": true,
|
||||
"AUDIT_START_HEIGHT": 774000,
|
||||
"STATISTICS": true,
|
||||
"STATISTICS_START_TIME": "24h",
|
||||
"SERVERS": [
|
||||
"node201.fmt.mempool.space",
|
||||
"node202.fmt.mempool.space",
|
||||
|
Loading…
Reference in New Issue
Block a user