mirror of
https://github.com/mempool/mempool.git
synced 2025-02-24 06:47:52 +01:00
Fix new block mempool deletion race condition
This commit is contained in:
parent
a7dff0effe
commit
d322c6b5b5
6 changed files with 22 additions and 29 deletions
|
@ -529,13 +529,14 @@ class Blocks {
|
||||||
return await BlocksRepository.$validateChain();
|
return await BlocksRepository.$validateChain();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $updateBlocks() {
|
public async $updateBlocks(): Promise<number> {
|
||||||
// warn if this run stalls the main loop for more than 2 minutes
|
// warn if this run stalls the main loop for more than 2 minutes
|
||||||
const timer = this.startTimer();
|
const timer = this.startTimer();
|
||||||
|
|
||||||
diskCache.lock();
|
diskCache.lock();
|
||||||
|
|
||||||
let fastForwarded = false;
|
let fastForwarded = false;
|
||||||
|
let handledBlocks = 0;
|
||||||
const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
|
const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
|
||||||
this.updateTimerProgress(timer, 'got block height tip');
|
this.updateTimerProgress(timer, 'got block height tip');
|
||||||
|
|
||||||
|
@ -697,11 +698,15 @@ class Blocks {
|
||||||
this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
|
this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
|
||||||
await Promise.all(callbackPromises);
|
await Promise.all(callbackPromises);
|
||||||
this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
|
this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
|
||||||
|
|
||||||
|
handledBlocks++;
|
||||||
}
|
}
|
||||||
|
|
||||||
diskCache.unlock();
|
diskCache.unlock();
|
||||||
|
|
||||||
this.clearTimer(timer);
|
this.clearTimer(timer);
|
||||||
|
|
||||||
|
return handledBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
private startTimer() {
|
private startTimer() {
|
||||||
|
|
|
@ -52,7 +52,7 @@ class DiskCache {
|
||||||
const mempool = memPool.getMempool();
|
const mempool = memPool.getMempool();
|
||||||
const mempoolArray: TransactionExtended[] = [];
|
const mempoolArray: TransactionExtended[] = [];
|
||||||
for (const tx in mempool) {
|
for (const tx in mempool) {
|
||||||
if (mempool[tx] && !mempool[tx].deleteAfter) {
|
if (mempool[tx]) {
|
||||||
mempoolArray.push(mempool[tx]);
|
mempoolArray.push(mempool[tx]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,7 +178,7 @@ class MempoolBlocks {
|
||||||
// prepare a stripped down version of the mempool with only the minimum necessary data
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
||||||
// to reduce the overhead of passing this data to the worker thread
|
// to reduce the overhead of passing this data to the worker thread
|
||||||
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
|
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
|
||||||
Object.values(newMempool).filter(tx => !tx.deleteAfter).forEach(entry => {
|
Object.values(newMempool).forEach(entry => {
|
||||||
strippedMempool[entry.txid] = {
|
strippedMempool[entry.txid] = {
|
||||||
txid: entry.txid,
|
txid: entry.txid,
|
||||||
fee: entry.fee,
|
fee: entry.fee,
|
||||||
|
|
|
@ -12,7 +12,6 @@ import rbfCache from './rbf-cache';
|
||||||
|
|
||||||
class Mempool {
|
class Mempool {
|
||||||
private static WEBSOCKET_REFRESH_RATE_MS = 10000;
|
private static WEBSOCKET_REFRESH_RATE_MS = 10000;
|
||||||
private static LAZY_DELETE_AFTER_SECONDS = 30;
|
|
||||||
private inSync: boolean = false;
|
private inSync: boolean = false;
|
||||||
private mempoolCacheDelta: number = -1;
|
private mempoolCacheDelta: number = -1;
|
||||||
private mempoolCache: { [txId: string]: TransactionExtended } = {};
|
private mempoolCache: { [txId: string]: TransactionExtended } = {};
|
||||||
|
@ -119,7 +118,7 @@ class Mempool {
|
||||||
return txTimes;
|
return txTimes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $updateMempool(): Promise<void> {
|
public async $updateMempool(transactions: string[]): Promise<void> {
|
||||||
logger.debug(`Updating mempool...`);
|
logger.debug(`Updating mempool...`);
|
||||||
|
|
||||||
// warn if this run stalls the main loop for more than 2 minutes
|
// warn if this run stalls the main loop for more than 2 minutes
|
||||||
|
@ -128,7 +127,6 @@ class Mempool {
|
||||||
const start = new Date().getTime();
|
const start = new Date().getTime();
|
||||||
let hasChange: boolean = false;
|
let hasChange: boolean = false;
|
||||||
const currentMempoolSize = Object.keys(this.mempoolCache).length;
|
const currentMempoolSize = Object.keys(this.mempoolCache).length;
|
||||||
const transactions = await bitcoinApi.$getRawMempool();
|
|
||||||
this.updateTimerProgress(timer, 'got raw mempool');
|
this.updateTimerProgress(timer, 'got raw mempool');
|
||||||
const diff = transactions.length - currentMempoolSize;
|
const diff = transactions.length - currentMempoolSize;
|
||||||
const newTransactions: TransactionExtended[] = [];
|
const newTransactions: TransactionExtended[] = [];
|
||||||
|
@ -207,13 +205,15 @@ class Mempool {
|
||||||
const transactionsObject = {};
|
const transactionsObject = {};
|
||||||
transactions.forEach((txId) => transactionsObject[txId] = true);
|
transactions.forEach((txId) => transactionsObject[txId] = true);
|
||||||
|
|
||||||
// Flag transactions for lazy deletion
|
// Delete evicted transactions from mempool
|
||||||
for (const tx in this.mempoolCache) {
|
for (const tx in this.mempoolCache) {
|
||||||
if (!transactionsObject[tx] && !this.mempoolCache[tx].deleteAfter) {
|
if (!transactionsObject[tx]) {
|
||||||
deletedTransactions.push(this.mempoolCache[tx]);
|
deletedTransactions.push(this.mempoolCache[tx]);
|
||||||
this.mempoolCache[tx].deleteAfter = new Date().getTime() + Mempool.LAZY_DELETE_AFTER_SECONDS * 1000;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (const tx of deletedTransactions) {
|
||||||
|
delete this.mempoolCache[tx.txid];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
|
const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
|
||||||
|
@ -270,10 +270,6 @@ class Mempool {
|
||||||
if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
|
if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
|
||||||
// Store replaced transactions
|
// Store replaced transactions
|
||||||
rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
|
rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
|
||||||
// Erase the replaced transactions from the local mempool
|
|
||||||
for (const replaced of rbfTransactions[rbfTransaction]) {
|
|
||||||
delete this.mempoolCache[replaced.txid];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -291,17 +287,6 @@ class Mempool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public deleteExpiredTransactions() {
|
|
||||||
const now = new Date().getTime();
|
|
||||||
for (const tx in this.mempoolCache) {
|
|
||||||
const lazyDeleteAt = this.mempoolCache[tx].deleteAfter;
|
|
||||||
if (lazyDeleteAt && lazyDeleteAt < now) {
|
|
||||||
delete this.mempoolCache[tx];
|
|
||||||
rbfCache.evict(tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private $getMempoolInfo() {
|
private $getMempoolInfo() {
|
||||||
if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
|
if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
|
||||||
return Promise.all([
|
return Promise.all([
|
||||||
|
|
|
@ -2,6 +2,7 @@ import express from 'express';
|
||||||
import { Application, Request, Response, NextFunction } from 'express';
|
import { Application, Request, Response, NextFunction } from 'express';
|
||||||
import * as http from 'http';
|
import * as http from 'http';
|
||||||
import * as WebSocket from 'ws';
|
import * as WebSocket from 'ws';
|
||||||
|
import bitcoinApi from './api/bitcoin/bitcoin-api-factory';
|
||||||
import cluster from 'cluster';
|
import cluster from 'cluster';
|
||||||
import DB from './database';
|
import DB from './database';
|
||||||
import config from './config';
|
import config from './config';
|
||||||
|
@ -179,12 +180,15 @@ class Server {
|
||||||
logger.debug(msg);
|
logger.debug(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await blocks.$updateBlocks();
|
const newMempool = await bitcoinApi.$getRawMempool();
|
||||||
memPool.deleteExpiredTransactions();
|
const numHandledBlocks = await blocks.$updateBlocks();
|
||||||
await memPool.$updateMempool();
|
if (numHandledBlocks === 0) {
|
||||||
|
await memPool.$updateMempool(newMempool);
|
||||||
|
}
|
||||||
indexer.$run();
|
indexer.$run();
|
||||||
|
|
||||||
setTimeout(this.runMainUpdateLoop.bind(this), config.MEMPOOL.POLL_RATE_MS);
|
// rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS
|
||||||
|
setTimeout(this.runMainUpdateLoop.bind(this), numHandledBlocks > 0 ? 1 : config.MEMPOOL.POLL_RATE_MS);
|
||||||
this.backendRetryCount = 0;
|
this.backendRetryCount = 0;
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
this.backendRetryCount++;
|
this.backendRetryCount++;
|
||||||
|
|
|
@ -80,7 +80,6 @@ export interface TransactionExtended extends IEsploraApi.Transaction {
|
||||||
descendants?: Ancestor[];
|
descendants?: Ancestor[];
|
||||||
bestDescendant?: BestDescendant | null;
|
bestDescendant?: BestDescendant | null;
|
||||||
cpfpChecked?: boolean;
|
cpfpChecked?: boolean;
|
||||||
deleteAfter?: number;
|
|
||||||
position?: {
|
position?: {
|
||||||
block: number,
|
block: number,
|
||||||
vsize: number,
|
vsize: number,
|
||||||
|
|
Loading…
Add table
Reference in a new issue