Mirror of https://github.com/mempool/mempool.git (synced 2025-03-13 11:36:07 +01:00)

Store block first seen in db

parent 6884830da6
commit dd0542bbe1

8 changed files with 101 additions and 57 deletions
--- a/backend/src/api/audit.ts
+++ b/backend/src/api/audit.ts
@@ -1,4 +1,3 @@
-import * as fs from 'fs';
 import config from '../config';
 import logger from '../logger';
 import { MempoolTransactionExtended, MempoolBlockWithTransactions } from '../mempool.interfaces';
@@ -8,10 +7,10 @@ import transactionUtils from './transaction-utils';
 const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners
 
 class Audit {
-  auditBlock(height: number, transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended }, hash: string)
-  : { unseen: string[], censored: string[], added: string[], prioritized: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number, firstSeen: string | undefined } {
+  auditBlock(height: number, transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended })
+  : { unseen: string[], censored: string[], added: string[], prioritized: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number } {
     if (!projectedBlocks?.[0]?.transactionIds || !mempool) {
-      return { unseen: [], censored: [], added: [], prioritized: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 1, similarity: 1, firstSeen: undefined };
+      return { unseen: [], censored: [], added: [], prioritized: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 1, similarity: 1 };
     }
 
     const matches: string[] = []; // present in both mined block and template
@@ -177,8 +176,6 @@ class Audit {
     }
     const similarity = projectedWeight ? matchedWeight / projectedWeight : 1;
 
-    const firstSeen = this.getFirstSeenFromLogs(hash);
-
     return {
       unseen,
       censored: Object.keys(isCensored),
@@ -190,39 +187,8 @@ class Audit {
       accelerated,
       score,
       similarity,
-      firstSeen
     };
   }
-
-  getFirstSeenFromLogs(hash: string): string | undefined {
-    const debugLogPath = config.CORE_RPC.DEBUG_LOG_PATH;
-    if (debugLogPath) {
-      try {
-        const fileDescriptor = fs.openSync(debugLogPath, 'r');
-        const bufferSize = 2048; // Read the last few lines of the file
-        const buffer = Buffer.alloc(bufferSize);
-        const fileSize = fs.statSync(debugLogPath).size;
-        const chunkSize = Math.min(bufferSize, fileSize);
-        fs.readSync(fileDescriptor, buffer, 0, chunkSize, fileSize - chunkSize);
-        const lines = buffer.toString('utf8', 0, chunkSize).split('\n');
-        fs.closeSync(fileDescriptor);
-
-        for (let i = lines.length - 1; i >= 0; i--) {
-          const line = lines[i];
-          if (line && line.includes(`Saw new header hash=${hash}`)) {
-            // Extract time from log: "2021-08-31T12:34:56Z" or "2021-08-31T12:34:56.123456Z" if logtimemicros=1
-            const dateMatch = line.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.?\d{6})?Z/);
-            if (dateMatch) {
-              return dateMatch[0].replace("T", " ").replace("Z", "");
-            }
-          }
-        }
-      } catch (e) {
-        logger.err(`Cannot parse block first seen time from Core logs. Reason: ` + (e instanceof Error ? e.message : e));
-      }
-    }
-    return undefined;
-  }
 }
 
 export default new Audit();
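Note that the tail-read logic removed above is not gone: it moves into the new backend/src/utils/file-read.ts at the bottom of this diff, and now returns a numeric unix timestamp instead of a "YYYY-MM-DD HH:MM:SS" string. For illustration, this is what the removed regex extracted from a Core debug.log line (a standalone TypeScript sketch; the sample line and hash are invented):

// Hypothetical Bitcoin Core log line, per the comment in the removed code
const line = '2021-08-31T12:34:56.123456Z Saw new header hash=000000000000000000025c43b5e86a00d6745c9c3bc17b25758b91b6f4d228ad';
// Matches "2021-08-31T12:34:56Z", or with microseconds when Core runs with logtimemicros=1
const dateMatch = line.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.?\d{6})?Z/);
console.log(dateMatch?.[0]); // "2021-08-31T12:34:56.123456Z"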
--- a/backend/src/api/database-migration.ts
+++ b/backend/src/api/database-migration.ts
@@ -707,7 +707,7 @@ class DatabaseMigration {
     }
 
     if (databaseSchemaVersion < 83 && isBitcoin === true) {
-      await this.$executeQuery('ALTER TABLE `blocks_audits` ADD first_seen timestamp(6) DEFAULT NULL');
+      await this.$executeQuery('ALTER TABLE `blocks` ADD first_seen datetime(6) DEFAULT NULL');
       await this.updateToSchemaVersion(83);
     }
   }
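The first_seen column now lives on blocks (datetime(6)) rather than blocks_audits (timestamp(6)); both types keep microsecond precision, which matters when Core logs with logtimemicros=1. A quick way to verify the migrated column (a sketch, assuming the same DB.query helper the repositories below use):

// Read back the most recent stored first-seen time as a fractional unix timestamp
const [rows]: any = await DB.query(
  `SELECT hash, UNIX_TIMESTAMP(first_seen) AS firstSeen
   FROM blocks WHERE first_seen IS NOT NULL
   ORDER BY height DESC LIMIT 1`
);
console.log(rows[0]?.hash, rows[0]?.firstSeen); // e.g. 1630413296.123456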
--- a/backend/src/api/websocket-handler.ts
+++ b/backend/src/api/websocket-handler.ts
@@ -16,6 +16,7 @@ import transactionUtils from './transaction-utils';
 import rbfCache, { ReplacementInfo } from './rbf-cache';
 import difficultyAdjustment from './difficulty-adjustment';
 import feeApi from './fee-api';
+import BlocksRepository from '../repositories/BlocksRepository';
 import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
 import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
 import Audit from './audit';
@@ -34,6 +35,7 @@ interface AddressTransactions {
 }
 import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
 import { calculateMempoolTxCpfp } from './cpfp';
+import { getRecentFirstSeen } from '../utils/file-read';
 
 // valid 'want' subscriptions
 const wantable = [
@@ -975,7 +977,7 @@ class WebsocketHandler {
       }
 
       if (Common.indexingEnabled()) {
-        const { unseen, censored, added, prioritized, fresh, sigop, fullrbf, accelerated, score, similarity, firstSeen } = Audit.auditBlock(block.height, blockTransactions, projectedBlocks, auditMempool, block.id);
+        const { unseen, censored, added, prioritized, fresh, sigop, fullrbf, accelerated, score, similarity } = Audit.auditBlock(block.height, blockTransactions, projectedBlocks, auditMempool);
         const matchRate = Math.round(score * 100 * 100) / 100;
 
         const stripped = projectedBlocks[0]?.transactions ? projectedBlocks[0].transactions : [];
@@ -1012,7 +1014,6 @@ class WebsocketHandler {
           matchRate: matchRate,
           expectedFees: totalFees,
           expectedWeight: totalWeight,
-          firstSeen: firstSeen,
         });
 
         if (block.extras) {
@@ -1029,6 +1030,14 @@ class WebsocketHandler {
         }
       }
 
+      if (config.CORE_RPC.DEBUG_LOG_PATH && block.extras) {
+        const firstSeen = getRecentFirstSeen(block.id);
+        if (firstSeen) {
+          BlocksRepository.$saveFirstSeenTime(block.id, firstSeen);
+          block.extras.firstSeen = firstSeen;
+        }
+      }
+
       const confirmedTxids: { [txid: string]: boolean } = {};
 
       // Update mempool to remove transactions included in the new block
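One detail worth flagging: $saveFirstSeenTime is called without await, and the repository method rethrows on failure, so a DB error here surfaces as an unhandled promise rejection. A more defensive call site could look like this (a sketch of an alternative, not what the commit ships):

if (config.CORE_RPC.DEBUG_LOG_PATH && block.extras) {
  const firstSeen = getRecentFirstSeen(block.id);
  if (firstSeen) {
    block.extras.firstSeen = firstSeen;
    // Persist in the background, but never let a DB error kill block handling
    BlocksRepository.$saveFirstSeenTime(block.id, firstSeen)
      .catch((e) => logger.err(`Failed to save first seen time: ${e instanceof Error ? e.message : e}`));
  }
}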
--- a/backend/src/mempool.interfaces.ts
+++ b/backend/src/mempool.interfaces.ts
@@ -45,7 +45,6 @@ export interface BlockAudit {
   expectedFees?: number,
   expectedWeight?: number,
   template?: any[];
-  firstSeen?: string;
 }
 
 export interface TransactionAudit {
@@ -58,7 +57,6 @@ export interface TransactionAudit {
   conflict?: boolean;
   coinbase?: boolean;
   firstSeen?: number;
-  blockFirstSeen?: string;
 }
 
 export interface AuditScore {
@@ -322,6 +320,7 @@ export interface BlockExtension {
   segwitTotalSize: number;
   segwitTotalWeight: number;
   header: string;
+  firstSeen: number | null;
   utxoSetChange: number;
   // Requires coinstatsindex, will be set to NULL otherwise
   utxoSetSize: number | null;
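With firstSeen now on BlockExtension as a unix timestamp (null when no matching log line was found), consumers can compare when the local node first saw a block against the miner-set timestamp in the block header, for example (a hypothetical helper, written against structural types so it stands alone):

// Seconds between the header's miner-set timestamp and our node first seeing the block
function headerDelaySeconds(block: { timestamp: number; extras: { firstSeen: number | null } }): number | undefined {
  if (block.extras.firstSeen === null) {
    return undefined;
  }
  return block.extras.firstSeen - block.timestamp; // negative if the miner's clock ran ahead
}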
--- a/backend/src/replication/AuditReplication.ts
+++ b/backend/src/replication/AuditReplication.ts
@@ -124,8 +124,7 @@ class AuditReplication {
         matchRate: auditSummary.matchRate,
         expectedFees: auditSummary.expectedFees,
         expectedWeight: auditSummary.expectedWeight,
-        firstSeen: auditSummary.firstSeen,
-      }, true);
+      });
     // add missing data to cached blocks
     const cachedBlock = blocks.getBlocks().find(block => block.id === blockHash);
     if (cachedBlock) {
--- a/backend/src/repositories/BlocksAuditsRepository.ts
+++ b/backend/src/repositories/BlocksAuditsRepository.ts
@@ -15,11 +15,11 @@ interface MigrationAudit {
 }
 
 class BlocksAuditRepositories {
-  public async $saveAudit(audit: BlockAudit, replication = false): Promise<void> {
+  public async $saveAudit(audit: BlockAudit): Promise<void> {
     try {
-      await DB.query(`INSERT INTO blocks_audits(version, time, height, hash, unseen_txs, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight, first_seen)
-        VALUE (?, FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ${replication ? 'FROM_UNIXTIME(?)' : '?'})`, [audit.version, audit.time, audit.height, audit.hash, JSON.stringify(audit.unseenTxs), JSON.stringify(audit.missingTxs),
-        JSON.stringify(audit.addedTxs), JSON.stringify(audit.prioritizedTxs), JSON.stringify(audit.freshTxs), JSON.stringify(audit.sigopTxs), JSON.stringify(audit.fullrbfTxs), JSON.stringify(audit.acceleratedTxs), audit.matchRate, audit.expectedFees, audit.expectedWeight, audit.firstSeen]);
+      await DB.query(`INSERT INTO blocks_audits(version, time, height, hash, unseen_txs, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight)
+        VALUE (?, FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [audit.version, audit.time, audit.height, audit.hash, JSON.stringify(audit.unseenTxs), JSON.stringify(audit.missingTxs),
+        JSON.stringify(audit.addedTxs), JSON.stringify(audit.prioritizedTxs), JSON.stringify(audit.freshTxs), JSON.stringify(audit.sigopTxs), JSON.stringify(audit.fullrbfTxs), JSON.stringify(audit.acceleratedTxs), audit.matchRate, audit.expectedFees, audit.expectedWeight]);
     } catch (e: any) {
       if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
         logger.debug(`Cannot save block audit for block ${audit.hash} because it has already been indexed, ignoring`);
@@ -78,7 +78,6 @@ class BlocksAuditRepositories {
         blocks_audits.height,
         blocks_audits.hash as id,
         UNIX_TIMESTAMP(blocks_audits.time) as timestamp,
-        UNIX_TIMESTAMP(blocks_audits.first_seen) as firstSeen,
         template,
         unseen_txs as unseenTxs,
         missing_txs as missingTxs,
@@ -97,7 +96,6 @@ class BlocksAuditRepositories {
       `, [hash]);
 
       if (rows.length) {
-        console.log(rows[0].firstSeen);
         rows[0].unseenTxs = JSON.parse(rows[0].unseenTxs);
         rows[0].missingTxs = JSON.parse(rows[0].missingTxs);
         rows[0].addedTxs = JSON.parse(rows[0].addedTxs);
@@ -108,10 +106,6 @@ class BlocksAuditRepositories {
         rows[0].acceleratedTxs = JSON.parse(rows[0].acceleratedTxs);
         rows[0].template = JSON.parse(rows[0].template);
 
-        if (!rows[0].firstSeen) {
-          delete rows[0].firstSeen;
-        }
-
         return rows[0];
       }
       return null;
@@ -130,7 +124,6 @@ class BlocksAuditRepositories {
       const isPrioritized = blockAudit.prioritizedTxs.includes(txid);
       const isAccelerated = blockAudit.acceleratedTxs.includes(txid);
       const isConflict = blockAudit.fullrbfTxs.includes(txid);
-      const blockFirstSeen = blockAudit.firstSeen;
       let isExpected = false;
       let firstSeen = undefined;
       blockAudit.template?.forEach(tx => {
@@ -149,7 +142,6 @@ class BlocksAuditRepositories {
         conflict: isConflict,
         accelerated: isAccelerated,
         firstSeen,
-        ...(blockFirstSeen) && { blockFirstSeen },
       };
     }
     return null;
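The errno 1062 (ER_DUP_ENTRY) catch turns a re-inserted audit into a no-op after a backend restart. The same effect could be pushed into SQL with ON DUPLICATE KEY UPDATE (a sketch with an abbreviated column list; the commit keeps the explicit catch, which also preserves the debug log message):

await DB.query(
  `INSERT INTO blocks_audits(version, time, height, hash, match_rate)
   VALUE (?, FROM_UNIXTIME(?), ?, ?, ?)
   ON DUPLICATE KEY UPDATE hash = hash`, // no-op when the audit row already exists
  [audit.version, audit.time, audit.height, audit.hash, audit.matchRate]
);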
--- a/backend/src/repositories/BlocksRepository.ts
+++ b/backend/src/repositories/BlocksRepository.ts
@@ -57,6 +57,7 @@ interface DatabaseBlock {
   utxoSetChange: number;
   utxoSetSize: number;
   totalInputAmt: number;
+  firstSeen: number;
 }
 
 const BLOCK_DB_FIELDS = `
@@ -99,7 +100,8 @@ const BLOCK_DB_FIELDS = `
   blocks.header,
   blocks.utxoset_change AS utxoSetChange,
   blocks.utxoset_size AS utxoSetSize,
-  blocks.total_input_amt AS totalInputAmt
+  blocks.total_input_amt AS totalInputAmt,
+  UNIX_TIMESTAMP(blocks.first_seen) AS firstSeen
 `;
 
 class BlocksRepository {
@@ -1021,6 +1023,24 @@ class BlocksRepository {
     }
   }
 
+  /**
+   * Save block first seen time
+   *
+   * @param id
+   */
+  public async $saveFirstSeenTime(id: string, firstSeen: number): Promise<void> {
+    try {
+      await DB.query(`
+        UPDATE blocks SET first_seen = FROM_UNIXTIME(?)
+        WHERE hash = ?`,
+        [firstSeen, id]
+      );
+    } catch (e) {
+      logger.err(`Cannot update block first seen time. Reason: ` + (e instanceof Error ? e.message : e));
+      throw e;
+    }
+  }
+
   /**
    * Convert a mysql row block into a BlockExtended. Note that you
    * must provide the correct field into dbBlk object param
@@ -1078,6 +1098,7 @@ class BlocksRepository {
     extras.utxoSetSize = dbBlk.utxoSetSize;
     extras.totalInputAmt = dbBlk.totalInputAmt;
     extras.virtualSize = dbBlk.weight / 4.0;
+    extras.firstSeen = dbBlk.firstSeen;
 
     // Re-org can happen after indexing so we need to always get the
     // latest state from core
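Because FROM_UNIXTIME accepts a fractional argument, the microsecond part returned by getRecentFirstSeen survives into the datetime(6) column. Usage then mirrors the websocket handler (a sketch; the hash is hypothetical):

// Persist a first-seen time with sub-second precision
await BlocksRepository.$saveFirstSeenTime(
  '000000000000000000025c43b5e86a00d6745c9c3bc17b25758b91b6f4d228ad', // hypothetical block hash
  1630413296.123456
);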
--- /dev/null
+++ b/backend/src/utils/file-read.ts
@@ -0,0 +1,58 @@
+import * as fs from 'fs';
+import logger from '../logger';
+import config from '../config';
+
+function readFile(filePath: string, bufferSize?: number): string[] {
+  const fileSize = fs.statSync(filePath).size;
+  const chunkSize = bufferSize || fileSize;
+  const fileDescriptor = fs.openSync(filePath, 'r');
+  const buffer = Buffer.alloc(chunkSize);
+
+  fs.readSync(fileDescriptor, buffer, 0, chunkSize, fileSize - chunkSize);
+  fs.closeSync(fileDescriptor);
+
+  const lines = buffer.toString('utf8', 0, chunkSize).split('\n');
+  return lines;
+}
+
+function extractDateFromLogLine(line: string): number | undefined {
+  // Extract time from log: "2021-08-31T12:34:56Z" or "2021-08-31T12:34:56.123456Z"
+  const dateMatch = line.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{6})?Z/);
+  if (!dateMatch) {
+    return undefined;
+  }
+
+  const dateStr = dateMatch[0];
+  const date = new Date(dateStr);
+  let timestamp = Math.floor(date.getTime() / 1000); // Remove decimal (microseconds are added later)
+
+  const timePart = dateStr.split('T')[1];
+  const microseconds = timePart.split('.')[1] || '';
+
+  if (!microseconds) {
+    return timestamp;
+  }
+
+  return parseFloat(timestamp + '.' + microseconds);
+}
+
+export function getRecentFirstSeen(hash: string): number | undefined {
+  const debugLogPath = config.CORE_RPC.DEBUG_LOG_PATH;
+  if (debugLogPath) {
+    try {
+      // Read the last few lines of debug.log
+      const lines = readFile(debugLogPath, 2048);
+
+      for (let i = lines.length - 1; i >= 0; i--) {
+        const line = lines[i];
+        if (line && line.includes(`Saw new header hash=${hash}`)) {
+          return extractDateFromLogLine(line);
+        }
+      }
+    } catch (e) {
+      logger.err(`Cannot parse block first seen time from Core logs. Reason: ` + (e instanceof Error ? e.message : e));
+    }
+  }
+
+  return undefined;
+}
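One behavioural difference from the audit.ts code this replaces: the old version clamped the read with Math.min(bufferSize, fileSize), while readFile here uses bufferSize as-is. If debug.log is shorter than the 2048-byte window, fileSize - chunkSize goes negative and fs.readSync throws; getRecentFirstSeen's catch then logs an error instead of finding the line. A clamped variant would restore the old behaviour (a sketch of the fix, not what the commit ships):

function readFileTail(filePath: string, bufferSize?: number): string[] {
  const fileSize = fs.statSync(filePath).size;
  // Never request more bytes than the file actually holds
  const chunkSize = Math.min(bufferSize || fileSize, fileSize);
  const fileDescriptor = fs.openSync(filePath, 'r');
  const buffer = Buffer.alloc(chunkSize);
  fs.readSync(fileDescriptor, buffer, 0, chunkSize, fileSize - chunkSize);
  fs.closeSync(fileDescriptor);
  return buffer.toString('utf8', 0, chunkSize).split('\n');
}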