From 8fdc44aa89c2af8ebc8fc4b0d4a3cdaa1beb9180 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 3 Jul 2023 22:01:54 -0400 Subject: [PATCH] replace audit_pool hashmap with a vec --- backend/rust-gbt/index.d.ts | 4 +- backend/rust-gbt/src/gbt.rs | 84 ++++++++++++++------------ backend/rust-gbt/src/lib.rs | 10 +-- backend/src/__tests__/gbt/gbt-tests.ts | 10 +-- backend/src/api/mempool-blocks.ts | 8 ++- 5 files changed, 65 insertions(+), 51 deletions(-) diff --git a/backend/rust-gbt/index.d.ts b/backend/rust-gbt/index.d.ts index 68b56a8e1..33ae32bdf 100644 --- a/backend/rust-gbt/index.d.ts +++ b/backend/rust-gbt/index.d.ts @@ -19,13 +19,13 @@ export class GbtGenerator { * * Rejects if the thread panics or if the Mutex is poisoned. */ - make(mempool: Array): Promise + make(mempool: Array, maxUid: number): Promise /** * # Errors * * Rejects if the thread panics or if the Mutex is poisoned. */ - update(newTxs: Array, removeTxs: Array): Promise + update(newTxs: Array, removeTxs: Array, maxUid: number): Promise } /** * The result from calling the gbt function. diff --git a/backend/rust-gbt/src/gbt.rs b/backend/rust-gbt/src/gbt.rs index 9354d706b..876bd12ef 100644 --- a/backend/rust-gbt/src/gbt.rs +++ b/backend/rust-gbt/src/gbt.rs @@ -1,14 +1,14 @@ use priority_queue::PriorityQueue; use std::{ cmp::Ordering, - collections::{HashMap, HashSet}, + collections::{HashSet}, }; use tracing::{info, trace}; use crate::{ audit_transaction::{partial_cmp_uid_score, AuditTransaction}, u32_hasher_types::{ - u32hashmap_with_capacity, u32hashset_new, u32priority_queue_with_capacity, U32HasherState, + u32hashset_new, u32priority_queue_with_capacity, U32HasherState, }, GbtResult, ThreadTransactionsMap, }; @@ -19,7 +19,7 @@ const BLOCK_RESERVED_WEIGHT: u32 = 4_000; const BLOCK_RESERVED_SIGOPS: u32 = 400; const MAX_BLOCKS: usize = 8; -type AuditPool = HashMap; +type AuditPool = Vec>; type ModifiedQueue = PriorityQueue; #[derive(Debug)] @@ -58,9 +58,10 @@ impl Ord for TxPriority { // TODO: Make gbt smaller to fix these lints. #[allow(clippy::too_many_lines)] #[allow(clippy::cognitive_complexity)] -pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { +pub fn gbt(mempool: &mut ThreadTransactionsMap, max_uid: usize) -> GbtResult { let mempool_len = mempool.len(); - let mut audit_pool: AuditPool = u32hashmap_with_capacity(mempool_len); + let mut audit_pool: AuditPool = Vec::with_capacity(max_uid + 1); + audit_pool.resize(max_uid + 1, None); let mut mempool_stack: Vec = Vec::with_capacity(mempool_len); let mut clusters: Vec> = Vec::new(); let mut block_weights: Vec = Vec::new(); @@ -69,7 +70,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { for (uid, tx) in &mut *mempool { let audit_tx = AuditTransaction::from_thread_transaction(tx); // Safety: audit_pool and mempool_stack must always contain the same transactions - audit_pool.insert(audit_tx.uid, audit_tx); + audit_pool[*uid as usize] = Some(audit_tx); mempool_stack.push(*uid); } @@ -84,7 +85,8 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { .into_iter() .map(|txid| { let atx = audit_pool - .get(&txid) + .get(txid as usize) + .and_then(Option::as_ref) .expect("All txids are from audit_pool"); (txid, atx.order(), atx.score()) }) @@ -154,7 +156,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { let mut cluster: Vec = Vec::new(); let is_cluster: bool = !next_tx.ancestors.is_empty(); for ancestor_id in &next_tx.ancestors { - if let Some(ancestor) = audit_pool.get(ancestor_id) { + if let Some(Some(ancestor)) = audit_pool.get(*ancestor_id as usize) { package.push((*ancestor_id, ancestor.order(), ancestor.ancestors.len())); } } @@ -176,7 +178,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { for (txid, _, _) in &package { cluster.push(*txid); - if let Some(tx) = audit_pool.get_mut(txid) { + if let Some(Some(tx)) = audit_pool.get_mut(*txid as usize) { tx.used = true; tx.set_dirty_if_different(cluster_rate); transactions.push(tx.uid); @@ -211,7 +213,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { // 'overflow' packages didn't fit in this block, but are valid candidates for the next overflow.reverse(); for overflowed in &overflow { - if let Some(overflowed_tx) = audit_pool.get(overflowed) { + if let Some(Some(overflowed_tx)) = audit_pool.get(*overflowed as usize) { if overflowed_tx.modified { modified.push( *overflowed, @@ -237,12 +239,12 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult { info!("make a list of dirty transactions and their new rates"); let mut rates: Vec> = Vec::new(); - for (txid, tx) in audit_pool { - trace!("txid: {}, is_dirty: {}", txid, tx.dirty); - if tx.dirty { - rates.push(vec![f64::from(txid), tx.effective_fee_per_vsize]); - if let Some(mempool_tx) = mempool.get_mut(&txid) { - mempool_tx.effective_fee_per_vsize = tx.effective_fee_per_vsize; + for (uid, thread_tx) in mempool { + if let Some(Some(audit_tx)) = audit_pool.get(*uid as usize) { + trace!("txid: {}, is_dirty: {}", uid, audit_tx.dirty); + if audit_tx.dirty { + rates.push(vec![f64::from(*uid), audit_tx.effective_fee_per_vsize]); + thread_tx.effective_fee_per_vsize = audit_tx.effective_fee_per_vsize; } } } @@ -263,33 +265,39 @@ fn next_valid_from_stack<'a>( mempool_stack: &mut Vec, audit_pool: &'a AuditPool, ) -> Option<&'a AuditTransaction> { - let mut next_txid = mempool_stack.last()?; - let mut tx: &AuditTransaction = audit_pool.get(next_txid)?; - while tx.used || tx.modified { - mempool_stack.pop(); - next_txid = mempool_stack.last()?; - tx = audit_pool.get(next_txid)?; + while let Some(next_txid) = mempool_stack.last() { + match audit_pool.get(*next_txid as usize) { + Some(Some(tx)) if !tx.used && !tx.modified => { + return Some(tx); + } + _ => { + mempool_stack.pop(); + } + } } - Some(tx) + None } fn next_valid_from_queue<'a>( queue: &mut ModifiedQueue, audit_pool: &'a AuditPool, ) -> Option<&'a AuditTransaction> { - let mut next_txid = queue.peek()?.0; - let mut tx: &AuditTransaction = audit_pool.get(next_txid)?; - while tx.used { - queue.pop(); - next_txid = queue.peek()?.0; - tx = audit_pool.get(next_txid)?; + while let Some((next_txid, _)) = queue.peek() { + match audit_pool.get(*next_txid as usize) { + Some(Some(tx)) if !tx.used => { + return Some(tx); + } + _ => { + queue.pop(); + } + } } - Some(tx) + None } fn set_relatives(txid: u32, audit_pool: &mut AuditPool) { let mut parents: HashSet = u32hashset_new(); - if let Some(tx) = audit_pool.get(&txid) { + if let Some(Some(tx)) = audit_pool.get(txid as usize) { if tx.relatives_set_flag { return; } @@ -304,7 +312,7 @@ fn set_relatives(txid: u32, audit_pool: &mut AuditPool) { for parent_id in &parents { set_relatives(*parent_id, audit_pool); - if let Some(parent) = audit_pool.get_mut(parent_id) { + if let Some(Some(parent)) = audit_pool.get_mut(*parent_id as usize) { // Safety: ancestors must always contain only txes in audit_pool ancestors.insert(*parent_id); parent.children.insert(txid); @@ -320,16 +328,16 @@ fn set_relatives(txid: u32, audit_pool: &mut AuditPool) { let mut total_sigops: u32 = 0; for ancestor_id in &ancestors { - let ancestor = audit_pool - .get(ancestor_id) - .expect("audit_pool contains all ancestors"); + let Some(ancestor) = audit_pool + .get(*ancestor_id as usize) + .expect("audit_pool contains all ancestors") else { todo!() }; total_fee += ancestor.fee; total_sigop_adjusted_weight += ancestor.sigop_adjusted_weight; total_sigop_adjusted_vsize += ancestor.sigop_adjusted_vsize; total_sigops += ancestor.sigops; } - if let Some(tx) = audit_pool.get_mut(&txid) { + if let Some(Some(tx)) = audit_pool.get_mut(txid as usize) { tx.set_ancestors( ancestors, total_fee, @@ -353,7 +361,7 @@ fn update_descendants( let root_sigop_adjusted_weight: u32; let root_sigop_adjusted_vsize: u32; let root_sigops: u32; - if let Some(root_tx) = audit_pool.get(&root_txid) { + if let Some(Some(root_tx)) = audit_pool.get(root_txid as usize) { for descendant_id in &root_tx.children { if !visited.contains(descendant_id) { descendant_stack.push(*descendant_id); @@ -368,7 +376,7 @@ fn update_descendants( return; } while let Some(next_txid) = descendant_stack.pop() { - if let Some(descendant) = audit_pool.get_mut(&next_txid) { + if let Some(Some(descendant)) = audit_pool.get_mut(next_txid as usize) { // remove root tx as ancestor let old_score = descendant.remove_root( root_txid, diff --git a/backend/rust-gbt/src/lib.rs b/backend/rust-gbt/src/lib.rs index 08f7cb599..21b333441 100644 --- a/backend/rust-gbt/src/lib.rs +++ b/backend/rust-gbt/src/lib.rs @@ -74,9 +74,9 @@ impl GbtGenerator { /// /// Rejects if the thread panics or if the Mutex is poisoned. #[napi] - pub async fn make(&self, mempool: Vec) -> Result { + pub async fn make(&self, mempool: Vec, max_uid: u32) -> Result { trace!("make: Current State {:#?}", self.thread_transactions); - run_task(Arc::clone(&self.thread_transactions), move |map| { + run_task(Arc::clone(&self.thread_transactions), max_uid as usize, move |map| { for tx in mempool { map.insert(tx.uid, tx); } @@ -92,9 +92,10 @@ impl GbtGenerator { &self, new_txs: Vec, remove_txs: Vec, + max_uid: u32, ) -> Result { trace!("update: Current State {:#?}", self.thread_transactions); - run_task(Arc::clone(&self.thread_transactions), move |map| { + run_task(Arc::clone(&self.thread_transactions), max_uid as usize, move |map| { for tx in new_txs { map.insert(tx.uid, tx); } @@ -132,6 +133,7 @@ pub struct GbtResult { /// to the `HashMap` as the only argument. (A move closure is recommended to meet the bounds) async fn run_task( thread_transactions: Arc>, + max_uid: usize, callback: F, ) -> Result where @@ -149,7 +151,7 @@ where callback(&mut map); info!("Starting gbt algorithm for {} elements...", map.len()); - let result = gbt::gbt(&mut map); + let result = gbt::gbt(&mut map, max_uid); info!("Finished gbt algorithm for {} elements...", map.len()); debug!( diff --git a/backend/src/__tests__/gbt/gbt-tests.ts b/backend/src/__tests__/gbt/gbt-tests.ts index 8927b4096..8af76a47c 100644 --- a/backend/src/__tests__/gbt/gbt-tests.ts +++ b/backend/src/__tests__/gbt/gbt-tests.ts @@ -14,8 +14,8 @@ const vectorBuffer: Buffer = fs.readFileSync(path.join(__dirname, './', './test- describe('Rust GBT', () => { test('should produce the same template as getBlockTemplate from Bitcoin Core', async () => { const rustGbt = new GbtGenerator(); - const mempool = mempoolFromArrayBuffer(vectorBuffer.buffer); - const result = await rustGbt.make(mempool); + const { mempool, maxUid } = mempoolFromArrayBuffer(vectorBuffer.buffer); + const result = await rustGbt.make(mempool, maxUid); const blocks: [string, number][][] = result.blocks.map(block => { return block.map(uid => [vectorUidMap.get(uid) || 'missing', uid]); @@ -27,13 +27,15 @@ describe('Rust GBT', () => { }); }); -function mempoolFromArrayBuffer(buf: ArrayBuffer): ThreadTransaction[] { +function mempoolFromArrayBuffer(buf: ArrayBuffer): { mempool: ThreadTransaction[], maxUid: number } { + let maxUid = 0; const view = new DataView(buf); const count = view.getUint32(0, false); const txs: ThreadTransaction[] = []; let offset = 4; for (let i = 0; i < count; i++) { const uid = view.getUint32(offset, false); + maxUid = Math.max(maxUid, uid); const tx: ThreadTransaction = { uid, order: txidToOrdering(vectorUidMap.get(uid) as string), @@ -52,7 +54,7 @@ function mempoolFromArrayBuffer(buf: ArrayBuffer): ThreadTransaction[] { } txs.push(tx); } - return txs; + return { mempool: txs, maxUid }; } function txidToOrdering(txid: string): number { diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index a012ba4c2..d5538854a 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -350,7 +350,7 @@ class MempoolBlocks { const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator(); try { const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids( - await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[]), + await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[], this.nextUid), ); if (saveResults) { this.rustInitialized = true; @@ -372,8 +372,9 @@ class MempoolBlocks { } public async $rustUpdateBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number, added: MempoolTransactionExtended[], removed: MempoolTransactionExtended[]): Promise { - // sanity check to avoid approaching uint32 uid overflow - if (this.nextUid + added.length > MAX_UINT32) { + // GBT optimization requires that uids never get too sparse + // as a sanity check, we should also explicitly prevent uint32 uid overflow + if (this.nextUid + added.length >= Math.min(Math.max(262144, 2 * mempoolSize), MAX_UINT32)) { this.resetRustGbt(); } if (!this.rustInitialized) { @@ -399,6 +400,7 @@ class MempoolBlocks { await this.rustGbtGenerator.update( added as RustThreadTransaction[], removedUids, + this.nextUid, ), ); const resultMempoolSize = blocks.reduce((total, block) => total + block.length, 0);