Catch RBF replacements across mempool update boundaries

This commit is contained in:
Mononaut 2024-08-26 21:51:49 +00:00
parent 185eae00e9
commit e362003746
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
3 changed files with 36 additions and 30 deletions

View File

@ -80,8 +80,8 @@ export class Common {
return arr;
}
static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: MempoolTransactionExtended[] } {
const matches: { [txid: string]: MempoolTransactionExtended[] } = {};
static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} {
const matches: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = {};
// For small N, a naive nested loop is extremely fast, but it doesn't scale
if (added.length < 1000 && deleted.length < 50 && !forceScalable) {
@ -96,7 +96,7 @@ export class Common {
addedTx.vin.some((vin) => vin.txid === deletedVin.txid && vin.vout === deletedVin.vout));
});
if (foundMatches?.length) {
matches[addedTx.txid] = [...new Set(foundMatches)];
matches[addedTx.txid] = { replaced: [...new Set(foundMatches)], replacedBy: addedTx };
}
});
} else {
@ -124,7 +124,7 @@ export class Common {
foundMatches.add(deletedTx);
}
if (foundMatches.size) {
matches[addedTx.txid] = [...foundMatches];
matches[addedTx.txid] = { replaced: [...foundMatches], replacedBy: addedTx };
}
}
}
@ -139,17 +139,17 @@ export class Common {
const replaced: Set<MempoolTransactionExtended> = new Set();
for (let i = 0; i < tx.vin.length; i++) {
const vin = tx.vin[i];
const match = spendMap.get(`${vin.txid}:${vin.vout}`);
const key = `${vin.txid}:${vin.vout}`;
const match = spendMap.get(key);
if (match && match.txid !== tx.txid) {
replaced.add(match);
// remove this tx from the spendMap
// prevents the same tx being replaced more than once
for (const replacedVin of match.vin) {
const key = `${replacedVin.txid}:${replacedVin.vout}`;
spendMap.delete(key);
const replacedKey = `${replacedVin.txid}:${replacedVin.vout}`;
spendMap.delete(replacedKey);
}
}
const key = `${vin.txid}:${vin.vout}`;
spendMap.delete(key);
}
if (replaced.size) {

View File

@ -19,12 +19,13 @@ class Mempool {
private mempoolCache: { [txId: string]: MempoolTransactionExtended } = {};
private mempoolCandidates: { [txid: string ]: boolean } = {};
private spendMap = new Map<string, MempoolTransactionExtended>();
private recentlyDeleted: MempoolTransactionExtended[][] = []; // buffer of transactions deleted in recent mempool updates
private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0,
maxmempool: 300000000, mempoolminfee: Common.isLiquid() ? 0.00000100 : 0.00001000, minrelaytxfee: Common.isLiquid() ? 0.00000100 : 0.00001000 };
private mempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, newTransactions: MempoolTransactionExtended[],
deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void) | undefined;
deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void) | undefined;
private $asyncMempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, mempoolSize: number, newTransactions: MempoolTransactionExtended[],
deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates) => Promise<void>) | undefined;
deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise<void>) | undefined;
private accelerations: { [txId: string]: Acceleration } = {};
private accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {};
@ -74,12 +75,12 @@ class Mempool {
}
public setMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; },
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void): void {
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void): void {
this.mempoolChangedCallback = fn;
}
public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[],
candidates?: GbtCandidates) => Promise<void>): void {
this.$asyncMempoolChangedCallback = fn;
}
@ -362,12 +363,15 @@ class Mempool {
const candidatesChanged = candidates?.added?.length || candidates?.removed?.length;
if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, accelerationDelta);
this.recentlyDeleted.unshift(deletedTransactions);
this.recentlyDeleted.length = Math.min(this.recentlyDeleted.length, 10); // truncate to the last 10 mempool updates
if (this.mempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, this.recentlyDeleted, accelerationDelta);
}
if (this.$asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidatesChanged)) {
if (this.$asyncMempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length || candidatesChanged)) {
this.updateTimerProgress(timer, 'running async mempool callback');
await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, deletedTransactions, accelerationDelta, candidates);
await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, this.recentlyDeleted, accelerationDelta, candidates);
this.updateTimerProgress(timer, 'completed async mempool callback');
}
@ -541,16 +545,7 @@ class Mempool {
}
}
public handleRbfTransactions(rbfTransactions: { [txid: string]: MempoolTransactionExtended[]; }): void {
for (const rbfTransaction in rbfTransactions) {
if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
// Store replaced transactions
rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
}
}
}
public handleMinedRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void {
public handleRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void {
for (const rbfTransaction in rbfTransactions) {
if (rbfTransactions[rbfTransaction].replacedBy && rbfTransactions[rbfTransaction]?.replaced?.length) {
// Store replaced transactions

View File

@ -520,8 +520,17 @@ class WebsocketHandler {
}
}
/**
*
* @param newMempool
* @param mempoolSize
* @param newTransactions array of transactions added this mempool update.
* @param recentlyDeletedTransactions array of arrays of transactions removed in the last N mempool updates, most recent first.
* @param accelerationDelta
* @param candidates
*/
async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
newTransactions: MempoolTransactionExtended[], recentlyDeletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[],
candidates?: GbtCandidates): Promise<void> {
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
@ -529,6 +538,8 @@ class WebsocketHandler {
this.printLogs();
const deletedTransactions = recentlyDeletedTransactions.length ? recentlyDeletedTransactions[0] : [];
const transactionIds = (memPool.limitGBT && candidates) ? Object.keys(candidates?.txs || {}) : Object.keys(newMempool);
let added = newTransactions;
let removed = deletedTransactions;
@ -547,7 +558,7 @@ class WebsocketHandler {
const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
const mempoolInfo = memPool.getMempoolInfo();
const vBytesPerSecond = memPool.getVBytesPerSecond();
const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions);
const rbfTransactions = Common.findRbfTransactions(newTransactions, recentlyDeletedTransactions.flat());
const da = difficultyAdjustment.getDifficultyAdjustment();
const accelerations = memPool.getAccelerations();
memPool.handleRbfTransactions(rbfTransactions);
@ -578,7 +589,7 @@ class WebsocketHandler {
const replacedTransactions: { replaced: string, by: TransactionExtended }[] = [];
for (const tx of newTransactions) {
if (rbfTransactions[tx.txid]) {
for (const replaced of rbfTransactions[tx.txid]) {
for (const replaced of rbfTransactions[tx.txid].replaced) {
replacedTransactions.push({ replaced: replaced.txid, by: tx });
}
}
@ -947,7 +958,7 @@ class WebsocketHandler {
await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, structuredClone(transactions));
const rbfTransactions = Common.findMinedRbfTransactions(transactions, memPool.getSpendMap());
memPool.handleMinedRbfTransactions(rbfTransactions);
memPool.handleRbfTransactions(rbfTransactions);
memPool.removeFromSpendMap(transactions);
if (config.MEMPOOL.AUDIT && memPool.isInSync()) {