From 033e78c0a7d44098e19f764db512ad65ec6c82b9 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 8 May 2023 19:03:39 -0600 Subject: [PATCH] Optimize main thread processing of GBT updates --- backend/src/api/mempool-blocks.ts | 219 ++++++++++++++----------- backend/src/api/tx-selection-worker.ts | 39 +++-- backend/src/mempool.interfaces.ts | 1 + 3 files changed, 144 insertions(+), 115 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index af23a6376..62717ed7e 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,5 +1,5 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; import { Worker } from 'worker_threads'; @@ -104,8 +104,12 @@ class MempoolBlocks { private calculateMempoolBlocks(transactionsSorted: TransactionExtended[]): MempoolBlockWithTransactions[] { const mempoolBlocks: MempoolBlockWithTransactions[] = []; + let blockSize = 0; let blockWeight = 0; let blockVsize = 0; + let blockFees = 0; + const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; + let transactionIds: string[] = []; let transactions: TransactionExtended[] = []; transactionsSorted.forEach((tx) => { if (blockWeight + tx.weight <= config.MEMPOOL.BLOCK_WEIGHT_UNITS @@ -116,9 +120,14 @@ class MempoolBlocks { }; blockWeight += tx.weight; blockVsize += tx.vsize; - transactions.push(tx); + blockSize += tx.size; + blockFees += tx.fee; + if (blockVsize <= sizeLimit) { + transactions.push(tx); + } + transactionIds.push(tx.txid); } else { - mempoolBlocks.push(this.dataToMempoolBlocks(transactions)); + mempoolBlocks.push(this.dataToMempoolBlocks(transactionIds, transactions, blockSize, blockWeight, blockFees)); blockVsize = 0; tx.position = { block: mempoolBlocks.length, @@ -126,11 +135,14 @@ class MempoolBlocks { }; blockVsize += tx.vsize; blockWeight = tx.weight; + blockSize = tx.size; + blockFees = tx.fee; + transactionIds = [tx.txid]; transactions = [tx]; } }); if (transactions.length) { - mempoolBlocks.push(this.dataToMempoolBlocks(transactions)); + mempoolBlocks.push(this.dataToMempoolBlocks(transactionIds, transactions, blockSize, blockWeight, blockFees)); } return mempoolBlocks; @@ -178,6 +190,8 @@ class MempoolBlocks { } public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise { + const start = Date.now(); + // reset mempool short ids this.resetUids(); for (const tx of Object.values(newMempool)) { @@ -194,7 +208,7 @@ class MempoolBlocks { fee: entry.fee, weight: entry.weight, feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.effectiveFeePerVsize || (entry.fee / (entry.weight / 4)), inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], }); } @@ -216,7 +230,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: number[][], rates: Map, clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); @@ -224,19 +238,14 @@ class MempoolBlocks { this.txSelectionWorker?.once('error', reject); }); this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); - let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); - // filter out stale transactions - const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); - const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - if (filteredCount < unfilteredCount) { - logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from makeBlockTemplates`); - } + const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - return this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + const processed = this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); + logger.debug(`makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`); + return processed; } catch (e) { logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } @@ -250,6 +259,8 @@ class MempoolBlocks { return; } + const start = Date.now(); + for (const tx of Object.values(added)) { this.setUid(tx); } @@ -262,7 +273,7 @@ class MempoolBlocks { fee: entry.fee, weight: entry.weight, feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.effectiveFeePerVsize || (entry.fee / (entry.weight / 4)), inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], }; }); @@ -270,7 +281,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: number[][], rates: Map, clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); @@ -278,84 +289,100 @@ class MempoolBlocks { this.txSelectionWorker?.once('error', reject); }); this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids }); - let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); - // filter out stale transactions - const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); - const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - if (filteredCount < unfilteredCount) { - logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from updateBlockTemplates`); - } + const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); this.removeUids(removedUids); // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); + logger.debug(`updateBlockTemplates completed in ${(Date.now() - start) / 1000} seconds`); } catch (e) { logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } } - private processBlockTemplates(mempool, blocks: ThreadTransaction[][], clusters, saveResults): MempoolBlockWithTransactions[] { + private processBlockTemplates(mempool, blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }, saveResults): MempoolBlockWithTransactions[] { + for (const txid of Object.keys(rates)) { + if (txid in mempool) { + mempool[txid].effectiveFeePerVsize = rates[txid]; + } + } + + const readyBlocks: { transactionIds, transactions, totalSize, totalWeight, totalFees }[] = []; + const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; // update this thread's mempool with the results - blocks.forEach((block, blockIndex) => { - let runningVsize = 0; - block.forEach(tx => { - if (tx.txid && tx.txid in mempool) { + for (let blockIndex = 0; blockIndex < blocks.length; blockIndex++) { + const block: string[] = blocks[blockIndex]; + let txid: string; + let mempoolTx: TransactionExtended; + let totalSize = 0; + let totalVsize = 0; + let totalWeight = 0; + let totalFees = 0; + const transactions: TransactionExtended[] = []; + for (let txIndex = 0; txIndex < block.length; txIndex++) { + txid = block[txIndex]; + if (txid) { + mempoolTx = mempool[txid]; // save position in projected blocks - mempool[tx.txid].position = { + mempoolTx.position = { block: blockIndex, - vsize: runningVsize + (mempool[tx.txid].vsize / 2), + vsize: totalVsize + (mempoolTx.vsize / 2), }; - runningVsize += mempool[tx.txid].vsize; + mempoolTx.cpfpChecked = true; - if (tx.effectiveFeePerVsize != null) { - mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize; + totalSize += mempoolTx.size; + totalVsize += mempoolTx.vsize; + totalWeight += mempoolTx.weight; + totalFees += mempoolTx.fee; + + if (totalVsize <= sizeLimit) { + transactions.push(mempoolTx); } - if (tx.cpfpRoot && tx.cpfpRoot in clusters) { - const ancestors: Ancestor[] = []; - const descendants: Ancestor[] = []; - const cluster = clusters[tx.cpfpRoot]; - let matched = false; - cluster.forEach(txid => { - if (!txid || !mempool[txid]) { - logger.warn('projected transaction ancestor missing from mempool cache'); - return; - } - if (txid === tx.txid) { - matched = true; - } else { - const relative = { - txid: txid, - fee: mempool[txid].fee, - weight: mempool[txid].weight, - }; - if (matched) { - descendants.push(relative); - } else { - ancestors.push(relative); - } - } - }); - mempool[tx.txid].ancestors = ancestors; - mempool[tx.txid].descendants = descendants; - mempool[tx.txid].bestDescendant = null; - } - mempool[tx.txid].cpfpChecked = tx.cpfpChecked; - } else { - logger.warn('projected transaction missing from mempool cache'); } + } + readyBlocks.push({ + transactionIds: block, + transactions, + totalSize, + totalWeight, + totalFees }); - }); + } - // unpack the condensed blocks into proper mempool blocks - const mempoolBlocks = blocks.map((transactions) => { - return this.dataToMempoolBlocks(transactions.map(tx => { - return mempool[tx.txid] || null; - }).filter(tx => !!tx)); - }); + for (const cluster of Object.values(clusters)) { + for (const memberTxid of cluster) { + if (memberTxid in mempool) { + const mempoolTx = mempool[memberTxid]; + const ancestors: Ancestor[] = []; + const descendants: Ancestor[] = []; + let matched = false; + cluster.forEach(txid => { + if (txid === memberTxid) { + matched = true; + } else { + const relative = { + txid: txid, + fee: mempool[txid].fee, + weight: mempool[txid].weight, + }; + if (matched) { + descendants.push(relative); + } else { + ancestors.push(relative); + } + } + }); + mempoolTx.ancestors = ancestors; + mempoolTx.descendants = descendants; + mempoolTx.bestDescendant = null; + } + } + } + + const mempoolBlocks = readyBlocks.map(b => this.dataToMempoolBlocks(b.transactionIds, b.transactions, b.totalSize, b.totalWeight, b.totalFees)); if (saveResults) { const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks); @@ -366,27 +393,17 @@ class MempoolBlocks { return mempoolBlocks; } - private dataToMempoolBlocks(transactions: TransactionExtended[]): MempoolBlockWithTransactions { - let totalSize = 0; - let totalWeight = 0; - const fitTransactions: TransactionExtended[] = []; - transactions.forEach(tx => { - totalSize += tx.size; - totalWeight += tx.weight; - if ((totalWeight + tx.weight) <= config.MEMPOOL.BLOCK_WEIGHT_UNITS * 1.2) { - fitTransactions.push(tx); - } - }); + private dataToMempoolBlocks(transactionIds: string[], transactions: TransactionExtended[], totalSize: number, totalWeight: number, totalFees: number): MempoolBlockWithTransactions { const feeStats = Common.calcEffectiveFeeStatistics(transactions); return { blockSize: totalSize, - blockVSize: totalWeight / 4, - nTx: transactions.length, - totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0), + blockVSize: (totalWeight / 4), // fractional vsize to avoid rounding errors + nTx: transactionIds.length, + totalFees: totalFees, medianFee: feeStats.medianFee, // Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE), feeRange: feeStats.feeRange, //Common.getFeesInRange(transactions, rangeLength), - transactionIds: transactions.map((tx) => tx.txid), - transactions: fitTransactions.map((tx) => Common.stripTransaction(tx)), + transactionIds: transactionIds, + transactions: transactions.map((tx) => Common.stripTransaction(tx)), }; } @@ -415,14 +432,16 @@ class MempoolBlocks { } } - private convertResultTxids({ blocks, clusters }: { blocks: any[][], clusters: Map}) - : { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }} { - for (const block of blocks) { - for (const tx of block) { - tx.txid = this.uidMap.get(tx.uid); - if (tx.cpfpRoot) { - tx.cpfpRoot = this.uidMap.get(tx.cpfpRoot); - } + private convertResultTxids({ blocks, rates, clusters }: { blocks: number[][], rates: Map, clusters: Map}) + : { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }} { + const convertedBlocks: string[][] = blocks.map(block => block.map(uid => { + return this.uidMap.get(uid) || ''; + })); + const convertedRates = {}; + for (const rateUid of rates.keys()) { + const rateTxid = this.uidMap.get(rateUid); + if (rateTxid) { + convertedRates[rateTxid] = rates.get(rateUid); } } const convertedClusters = {}; @@ -435,7 +454,7 @@ class MempoolBlocks { convertedClusters[rootTxid] = members; } } - return { blocks, clusters: convertedClusters } as { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }}; + return { blocks: convertedBlocks, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }}; } } diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index 1635acac4..b22f42823 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -1,6 +1,6 @@ import config from '../config'; import logger from '../logger'; -import { CompactThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; +import { CompactThreadTransaction, AuditTransaction } from '../mempool.interfaces'; import { PairingHeap } from '../utils/pairing-heap'; import { parentPort } from 'worker_threads'; @@ -19,11 +19,11 @@ if (parentPort) { }); } - const { blocks, clusters } = makeBlockTemplates(mempool); + const { blocks, rates, clusters } = makeBlockTemplates(mempool); // return the result to main thread. if (parentPort) { - parentPort.postMessage({ blocks, clusters }); + parentPort.postMessage({ blocks, rates, clusters }); } }); } @@ -33,14 +33,14 @@ if (parentPort) { * (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp) */ function makeBlockTemplates(mempool: Map) - : { blocks: CompactThreadTransaction[][], clusters: Map } { + : { blocks: number[][], rates: Map, clusters: Map } { const start = Date.now(); const auditPool: Map = new Map(); const mempoolArray: AuditTransaction[] = []; - const restOfArray: CompactThreadTransaction[] = []; const cpfpClusters: Map = new Map(); mempool.forEach(tx => { + tx.dirty = false; // initializing everything up front helps V8 optimize property access later auditPool.set(tx.uid, { uid: tx.uid, @@ -81,9 +81,8 @@ function makeBlockTemplates(mempool: Map) // Build blocks by greedily choosing the highest feerate package // (i.e. the package rooted in the transaction with the best ancestor score) - const blocks: CompactThreadTransaction[][] = []; + const blocks: number[][] = []; let blockWeight = 4000; - let blockSize = 0; let transactions: AuditTransaction[] = []; const modified: PairingHeap = new PairingHeap((a, b): boolean => { if (a.score === b.score) { @@ -139,13 +138,16 @@ function makeBlockTemplates(mempool: Map) ancestor.used = true; ancestor.usedBy = nextTx.uid; // update original copy of this tx with effective fee rate & relatives data - mempoolTx.effectiveFeePerVsize = effectiveFeeRate; - if (isCluster) { - mempoolTx.cpfpRoot = nextTx.uid; + if (mempoolTx.effectiveFeePerVsize !== effectiveFeeRate) { + mempoolTx.effectiveFeePerVsize = effectiveFeeRate; + mempoolTx.dirty = true; + } + if (mempoolTx.cpfpRoot !== nextTx.uid) { + mempoolTx.cpfpRoot = isCluster ? nextTx.uid : null; + mempoolTx.dirty; } mempoolTx.cpfpChecked = true; transactions.push(ancestor); - blockSize += ancestor.size; blockWeight += ancestor.weight; used.push(ancestor); } @@ -171,11 +173,10 @@ function makeBlockTemplates(mempool: Map) if ((exceededPackageTries || queueEmpty) && blocks.length < 7) { // construct this block if (transactions.length) { - blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); + blocks.push(transactions.map(t => t.uid)); } // reset for the next block transactions = []; - blockSize = 0; blockWeight = 4000; // 'overflow' packages didn't fit in this block, but are valid candidates for the next @@ -196,14 +197,22 @@ function makeBlockTemplates(mempool: Map) } // add the final unbounded block if it contains any transactions if (transactions.length > 0) { - blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); + blocks.push(transactions.map(t => t.uid)); + } + + // get map of dirty transactions + const rates = new Map(); + for (const tx of mempool.values()) { + if (tx?.dirty) { + rates.set(tx.uid, tx.effectiveFeePerVsize || tx.feePerVsize); + } } const end = Date.now(); const time = end - start; logger.debug('Mempool templates calculated in ' + time / 1000 + ' seconds'); - return { blocks, clusters: cpfpClusters }; + return { blocks, rates, clusters: cpfpClusters }; } // traverse in-mempool ancestors diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 53bd3ff33..ab4c4cd25 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -114,6 +114,7 @@ export interface CompactThreadTransaction { inputs: number[]; cpfpRoot?: string; cpfpChecked?: boolean; + dirty?: boolean; } export interface ThreadTransaction {