replace audit_pool hashmap with a vec

Mononaut authored 2023-07-03 22:01:54 -04:00
parent cfa2363743, commit 8fdc44aa89
GPG key ID: A3F058E41374C04E (no known key found for this signature in database)
5 changed files with 65 additions and 51 deletions
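
In short: the Rust GBT audit pool changes from a uid-keyed `HashMap<u32, AuditTransaction, U32HasherState>` to a `Vec<Option<AuditTransaction>>` indexed directly by uid, which is why every entry point now takes a `maxUid` / `max_uid` argument to size the vector up front. The sketch below illustrates the trade-off in isolation; it uses a stripped-down, hypothetical `AuditTransaction` with made-up fields, not the repo's real struct.

```rust
// Minimal sketch of the data-structure swap (not the repo's actual code).
// Before: uid-keyed HashMap; after: a Vec indexed directly by uid, so every
// uid in [0, max_uid] gets a slot, occupied or not.
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct AuditTransaction {
    uid: u32,
    fee: u64,
}

fn main() {
    let txs = vec![
        AuditTransaction { uid: 0, fee: 100 },
        AuditTransaction { uid: 3, fee: 250 },
    ];
    let max_uid: usize = 3; // the caller must know the highest uid ever issued

    // Old shape: HashMap<u32, AuditTransaction> (hashing on every lookup).
    let mut pool_map: HashMap<u32, AuditTransaction> = HashMap::new();
    for tx in &txs {
        pool_map.insert(tx.uid, tx.clone());
    }

    // New shape: Vec<Option<AuditTransaction>> (direct O(1) indexing, no hashing),
    // at the cost of allocating max_uid + 1 slots even when uids are sparse.
    let mut pool_vec: Vec<Option<AuditTransaction>> = Vec::with_capacity(max_uid + 1);
    pool_vec.resize(max_uid + 1, None);
    for tx in &txs {
        pool_vec[tx.uid as usize] = Some(tx.clone());
    }

    // Lookups: the Vec yields a nested Option (slot exists, but may be empty).
    assert!(pool_map.get(&3).is_some());
    assert!(matches!(pool_vec.get(3), Some(Some(_))));
    assert!(matches!(pool_vec.get(1), Some(None))); // in range, but no tx with uid 1

    println!(
        "{} txs indexed; uid 3 pays {} sats",
        pool_vec.iter().flatten().count(),
        pool_map[&3].fee
    );
}
```

Direct indexing removes hashing from the hottest lookups, at the cost of one slot per possible uid, which is also why the TypeScript caller later resets the generator once uids get too sparse (last file below).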

@@ -19,13 +19,13 @@ export class GbtGenerator {
    *
    * Rejects if the thread panics or if the Mutex is poisoned.
    */
-  make(mempool: Array<ThreadTransaction>): Promise<GbtResult>
+  make(mempool: Array<ThreadTransaction>, maxUid: number): Promise<GbtResult>
   /**
    * # Errors
    *
    * Rejects if the thread panics or if the Mutex is poisoned.
    */
-  update(newTxs: Array<ThreadTransaction>, removeTxs: Array<number>): Promise<GbtResult>
+  update(newTxs: Array<ThreadTransaction>, removeTxs: Array<number>, maxUid: number): Promise<GbtResult>
 }
 /**
  * The result from calling the gbt function.

@@ -1,14 +1,14 @@
 use priority_queue::PriorityQueue;
 use std::{
     cmp::Ordering,
-    collections::{HashMap, HashSet},
+    collections::{HashSet},
 };
 use tracing::{info, trace};
 
 use crate::{
     audit_transaction::{partial_cmp_uid_score, AuditTransaction},
     u32_hasher_types::{
-        u32hashmap_with_capacity, u32hashset_new, u32priority_queue_with_capacity, U32HasherState,
+        u32hashset_new, u32priority_queue_with_capacity, U32HasherState,
     },
     GbtResult, ThreadTransactionsMap,
 };
@@ -19,7 +19,7 @@ const BLOCK_RESERVED_WEIGHT: u32 = 4_000;
 const BLOCK_RESERVED_SIGOPS: u32 = 400;
 const MAX_BLOCKS: usize = 8;
 
-type AuditPool = HashMap<u32, AuditTransaction, U32HasherState>;
+type AuditPool = Vec<Option<AuditTransaction>>;
 type ModifiedQueue = PriorityQueue<u32, TxPriority, U32HasherState>;
 
 #[derive(Debug)]
@@ -58,9 +58,10 @@ impl Ord for TxPriority {
 // TODO: Make gbt smaller to fix these lints.
 #[allow(clippy::too_many_lines)]
 #[allow(clippy::cognitive_complexity)]
-pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
+pub fn gbt(mempool: &mut ThreadTransactionsMap, max_uid: usize) -> GbtResult {
     let mempool_len = mempool.len();
-    let mut audit_pool: AuditPool = u32hashmap_with_capacity(mempool_len);
+    let mut audit_pool: AuditPool = Vec::with_capacity(max_uid + 1);
+    audit_pool.resize(max_uid + 1, None);
     let mut mempool_stack: Vec<u32> = Vec::with_capacity(mempool_len);
     let mut clusters: Vec<Vec<u32>> = Vec::new();
     let mut block_weights: Vec<u32> = Vec::new();
@@ -69,7 +70,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
     for (uid, tx) in &mut *mempool {
         let audit_tx = AuditTransaction::from_thread_transaction(tx);
         // Safety: audit_pool and mempool_stack must always contain the same transactions
-        audit_pool.insert(audit_tx.uid, audit_tx);
+        audit_pool[*uid as usize] = Some(audit_tx);
         mempool_stack.push(*uid);
     }
 
@@ -84,7 +85,8 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
         .into_iter()
         .map(|txid| {
             let atx = audit_pool
-                .get(&txid)
+                .get(txid as usize)
+                .and_then(Option::as_ref)
                 .expect("All txids are from audit_pool");
             (txid, atx.order(), atx.score())
         })
@@ -154,7 +156,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
         let mut cluster: Vec<u32> = Vec::new();
         let is_cluster: bool = !next_tx.ancestors.is_empty();
         for ancestor_id in &next_tx.ancestors {
-            if let Some(ancestor) = audit_pool.get(ancestor_id) {
+            if let Some(Some(ancestor)) = audit_pool.get(*ancestor_id as usize) {
                 package.push((*ancestor_id, ancestor.order(), ancestor.ancestors.len()));
             }
         }
@@ -176,7 +178,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
         for (txid, _, _) in &package {
             cluster.push(*txid);
-            if let Some(tx) = audit_pool.get_mut(txid) {
+            if let Some(Some(tx)) = audit_pool.get_mut(*txid as usize) {
                 tx.used = true;
                 tx.set_dirty_if_different(cluster_rate);
                 transactions.push(tx.uid);
@@ -211,7 +213,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
         // 'overflow' packages didn't fit in this block, but are valid candidates for the next
         overflow.reverse();
         for overflowed in &overflow {
-            if let Some(overflowed_tx) = audit_pool.get(overflowed) {
+            if let Some(Some(overflowed_tx)) = audit_pool.get(*overflowed as usize) {
                 if overflowed_tx.modified {
                     modified.push(
                         *overflowed,
@@ -237,12 +239,12 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
     info!("make a list of dirty transactions and their new rates");
     let mut rates: Vec<Vec<f64>> = Vec::new();
-    for (txid, tx) in audit_pool {
-        trace!("txid: {}, is_dirty: {}", txid, tx.dirty);
-        if tx.dirty {
-            rates.push(vec![f64::from(txid), tx.effective_fee_per_vsize]);
-            if let Some(mempool_tx) = mempool.get_mut(&txid) {
-                mempool_tx.effective_fee_per_vsize = tx.effective_fee_per_vsize;
+    for (uid, thread_tx) in mempool {
+        if let Some(Some(audit_tx)) = audit_pool.get(*uid as usize) {
+            trace!("txid: {}, is_dirty: {}", uid, audit_tx.dirty);
+            if audit_tx.dirty {
+                rates.push(vec![f64::from(*uid), audit_tx.effective_fee_per_vsize]);
+                thread_tx.effective_fee_per_vsize = audit_tx.effective_fee_per_vsize;
             }
         }
     }
@@ -263,33 +265,39 @@ fn next_valid_from_stack<'a>(
     mempool_stack: &mut Vec<u32>,
     audit_pool: &'a AuditPool,
 ) -> Option<&'a AuditTransaction> {
-    let mut next_txid = mempool_stack.last()?;
-    let mut tx: &AuditTransaction = audit_pool.get(next_txid)?;
-    while tx.used || tx.modified {
-        mempool_stack.pop();
-        next_txid = mempool_stack.last()?;
-        tx = audit_pool.get(next_txid)?;
+    while let Some(next_txid) = mempool_stack.last() {
+        match audit_pool.get(*next_txid as usize) {
+            Some(Some(tx)) if !tx.used && !tx.modified => {
+                return Some(tx);
+            }
+            _ => {
+                mempool_stack.pop();
+            }
+        }
     }
-    Some(tx)
+    None
 }
 
 fn next_valid_from_queue<'a>(
     queue: &mut ModifiedQueue,
     audit_pool: &'a AuditPool,
 ) -> Option<&'a AuditTransaction> {
-    let mut next_txid = queue.peek()?.0;
-    let mut tx: &AuditTransaction = audit_pool.get(next_txid)?;
-    while tx.used {
-        queue.pop();
-        next_txid = queue.peek()?.0;
-        tx = audit_pool.get(next_txid)?;
+    while let Some((next_txid, _)) = queue.peek() {
+        match audit_pool.get(*next_txid as usize) {
+            Some(Some(tx)) if !tx.used => {
+                return Some(tx);
+            }
+            _ => {
+                queue.pop();
+            }
+        }
     }
-    Some(tx)
+    None
 }
 
 fn set_relatives(txid: u32, audit_pool: &mut AuditPool) {
     let mut parents: HashSet<u32, U32HasherState> = u32hashset_new();
-    if let Some(tx) = audit_pool.get(&txid) {
+    if let Some(Some(tx)) = audit_pool.get(txid as usize) {
         if tx.relatives_set_flag {
             return;
         }
@@ -304,7 +312,7 @@ fn set_relatives(txid: u32, audit_pool: &mut AuditPool) {
     for parent_id in &parents {
         set_relatives(*parent_id, audit_pool);
-        if let Some(parent) = audit_pool.get_mut(parent_id) {
+        if let Some(Some(parent)) = audit_pool.get_mut(*parent_id as usize) {
             // Safety: ancestors must always contain only txes in audit_pool
             ancestors.insert(*parent_id);
             parent.children.insert(txid);
@@ -320,16 +328,16 @@ fn set_relatives(txid: u32, audit_pool: &mut AuditPool) {
     let mut total_sigops: u32 = 0;
     for ancestor_id in &ancestors {
-        let ancestor = audit_pool
-            .get(ancestor_id)
-            .expect("audit_pool contains all ancestors");
+        let Some(ancestor) = audit_pool
+            .get(*ancestor_id as usize)
+            .expect("audit_pool contains all ancestors") else { todo!() };
         total_fee += ancestor.fee;
         total_sigop_adjusted_weight += ancestor.sigop_adjusted_weight;
         total_sigop_adjusted_vsize += ancestor.sigop_adjusted_vsize;
         total_sigops += ancestor.sigops;
     }
-    if let Some(tx) = audit_pool.get_mut(&txid) {
+    if let Some(Some(tx)) = audit_pool.get_mut(txid as usize) {
         tx.set_ancestors(
             ancestors,
             total_fee,
@@ -353,7 +361,7 @@ fn update_descendants(
     let root_sigop_adjusted_weight: u32;
     let root_sigop_adjusted_vsize: u32;
     let root_sigops: u32;
-    if let Some(root_tx) = audit_pool.get(&root_txid) {
+    if let Some(Some(root_tx)) = audit_pool.get(root_txid as usize) {
         for descendant_id in &root_tx.children {
             if !visited.contains(descendant_id) {
                 descendant_stack.push(*descendant_id);
@@ -368,7 +376,7 @@ fn update_descendants(
         return;
     }
     while let Some(next_txid) = descendant_stack.pop() {
-        if let Some(descendant) = audit_pool.get_mut(&next_txid) {
+        if let Some(Some(descendant)) = audit_pool.get_mut(next_txid as usize) {
            // remove root tx as ancestor
            let old_score = descendant.remove_root(
                root_txid,
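
Because `Vec::get` on a `Vec<Option<T>>` yields an `Option<&Option<T>>`, every lookup above becomes a nested `Some(Some(tx))` pattern, and the two `next_valid_from_*` helpers turn into `while let` loops that pop entries until they hit a present, still-usable transaction. A self-contained sketch of the same idiom, using a simplified stand-in type rather than the real `AuditTransaction`:

```rust
// Sketch of the nested-Option lookup pattern used throughout the new gbt.rs
// (simplified types; the real code also tracks a `modified` flag and scores).
#[derive(Debug)]
struct Tx {
    used: bool,
}

type Pool = Vec<Option<Tx>>;

/// Pop entries off the stack until one points at a present, unused transaction.
fn next_valid_from_stack<'a>(stack: &mut Vec<u32>, pool: &'a Pool) -> Option<&'a Tx> {
    while let Some(next_uid) = stack.last() {
        match pool.get(*next_uid as usize) {
            // Outer Some: index in bounds; inner Some: the slot actually holds a tx.
            Some(Some(tx)) if !tx.used => return Some(tx),
            // Out of bounds, empty slot, or already used: discard and keep looking.
            _ => {
                stack.pop();
            }
        }
    }
    None
}

fn main() {
    let pool: Pool = vec![
        Some(Tx { used: true }),  // uid 0: already used
        None,                     // uid 1: no transaction in this slot
        Some(Tx { used: false }), // uid 2: valid candidate
    ];
    let mut stack: Vec<u32> = vec![2, 1, 0]; // top of stack is the last element (uid 0)

    let tx = next_valid_from_stack(&mut stack, &pool);
    assert!(tx.is_some());
    assert_eq!(stack.last(), Some(&2)); // uids 0 and 1 were popped along the way
}
```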

@@ -74,9 +74,9 @@ impl GbtGenerator {
     ///
     /// Rejects if the thread panics or if the Mutex is poisoned.
     #[napi]
-    pub async fn make(&self, mempool: Vec<ThreadTransaction>) -> Result<GbtResult> {
+    pub async fn make(&self, mempool: Vec<ThreadTransaction>, max_uid: u32) -> Result<GbtResult> {
         trace!("make: Current State {:#?}", self.thread_transactions);
-        run_task(Arc::clone(&self.thread_transactions), move |map| {
+        run_task(Arc::clone(&self.thread_transactions), max_uid as usize, move |map| {
             for tx in mempool {
                 map.insert(tx.uid, tx);
             }
@@ -92,9 +92,10 @@ impl GbtGenerator {
         &self,
         new_txs: Vec<ThreadTransaction>,
         remove_txs: Vec<u32>,
+        max_uid: u32,
     ) -> Result<GbtResult> {
         trace!("update: Current State {:#?}", self.thread_transactions);
-        run_task(Arc::clone(&self.thread_transactions), move |map| {
+        run_task(Arc::clone(&self.thread_transactions), max_uid as usize, move |map| {
             for tx in new_txs {
                 map.insert(tx.uid, tx);
             }
@@ -132,6 +133,7 @@ pub struct GbtResult {
 /// to the `HashMap` as the only argument. (A move closure is recommended to meet the bounds)
 async fn run_task<F>(
     thread_transactions: Arc<Mutex<ThreadTransactionsMap>>,
+    max_uid: usize,
     callback: F,
 ) -> Result<GbtResult>
 where
@@ -149,7 +151,7 @@
         callback(&mut map);
 
         info!("Starting gbt algorithm for {} elements...", map.len());
-        let result = gbt::gbt(&mut map);
+        let result = gbt::gbt(&mut map, max_uid);
         info!("Finished gbt algorithm for {} elements...", map.len());
 
         debug!(

@@ -14,8 +14,8 @@ const vectorBuffer: Buffer = fs.readFileSync(path.join(__dirname, './', './test-
 describe('Rust GBT', () => {
   test('should produce the same template as getBlockTemplate from Bitcoin Core', async () => {
     const rustGbt = new GbtGenerator();
-    const mempool = mempoolFromArrayBuffer(vectorBuffer.buffer);
-    const result = await rustGbt.make(mempool);
+    const { mempool, maxUid } = mempoolFromArrayBuffer(vectorBuffer.buffer);
+    const result = await rustGbt.make(mempool, maxUid);
 
     const blocks: [string, number][][] = result.blocks.map(block => {
       return block.map(uid => [vectorUidMap.get(uid) || 'missing', uid]);
@@ -27,13 +27,15 @@
   });
 });
 
-function mempoolFromArrayBuffer(buf: ArrayBuffer): ThreadTransaction[] {
+function mempoolFromArrayBuffer(buf: ArrayBuffer): { mempool: ThreadTransaction[], maxUid: number } {
+  let maxUid = 0;
   const view = new DataView(buf);
   const count = view.getUint32(0, false);
   const txs: ThreadTransaction[] = [];
   let offset = 4;
   for (let i = 0; i < count; i++) {
     const uid = view.getUint32(offset, false);
+    maxUid = Math.max(maxUid, uid);
     const tx: ThreadTransaction = {
       uid,
       order: txidToOrdering(vectorUidMap.get(uid) as string),
@@ -52,7 +54,7 @@ function mempoolFromArrayBuffer(buf: ArrayBuffer): ThreadTransaction[] {
     }
     txs.push(tx);
   }
-  return txs;
+  return { mempool: txs, maxUid };
 }
 
 function txidToOrdering(txid: string): number {
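
The test helper now tracks the largest uid while parsing the vector file and hands it to `make`. The value only needs to be an upper bound on every uid in the mempool: if it were understated, `audit_pool[*uid as usize] = Some(audit_tx)` on the Rust side would index past the end of the vector and panic. A small sketch of that invariant (hypothetical helper, not code from this repo):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for ThreadTransaction: only the uid matters here.
struct ThreadTx {
    uid: u32,
}

/// Largest uid present in the mempool map, or 0 for an empty map.
/// Passing anything smaller than this to gbt would make the Vec indexing panic.
fn max_uid(mempool: &HashMap<u32, ThreadTx>) -> usize {
    mempool.keys().copied().max().unwrap_or(0) as usize
}

fn main() {
    let mut mempool: HashMap<u32, ThreadTx> = HashMap::new();
    for uid in [0u32, 7, 3] {
        mempool.insert(uid, ThreadTx { uid });
    }

    let bound = max_uid(&mempool);
    let mut audit_pool: Vec<Option<u32>> = vec![None; bound + 1];

    // Every uid indexes a valid slot because `bound` covers them all.
    for (uid, tx) in &mempool {
        audit_pool[*uid as usize] = Some(tx.uid);
    }
    assert_eq!(audit_pool.len(), 8); // highest uid is 7, so 8 slots
}
```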

@@ -350,7 +350,7 @@ class MempoolBlocks {
     const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator();
     try {
       const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
-        await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[]),
+        await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[], this.nextUid),
       );
       if (saveResults) {
         this.rustInitialized = true;
@@ -372,8 +372,9 @@ class MempoolBlocks {
   }
 
   public async $rustUpdateBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number, added: MempoolTransactionExtended[], removed: MempoolTransactionExtended[]): Promise<void> {
-    // sanity check to avoid approaching uint32 uid overflow
-    if (this.nextUid + added.length > MAX_UINT32) {
+    // GBT optimization requires that uids never get too sparse
+    // as a sanity check, we should also explicitly prevent uint32 uid overflow
+    if (this.nextUid + added.length >= Math.min(Math.max(262144, 2 * mempoolSize), MAX_UINT32)) {
       this.resetRustGbt();
     }
     if (!this.rustInitialized) {
@@ -399,6 +400,7 @@ class MempoolBlocks {
         await this.rustGbtGenerator.update(
           added as RustThreadTransaction[],
           removedUids,
+          this.nextUid,
         ),
       );
       const resultMempoolSize = blocks.reduce((total, block) => total + block.length, 0);
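
These last hunks are the Node-side counterpart: `MempoolBlocks` passes `this.nextUid` into `make`/`update`, and since the audit pool is now sized by that value, uids that drift far beyond the live mempool size would just allocate empty `None` slots. The reset condition therefore changes from a plain uint32-overflow guard to `nextUid + added.length >= min(max(262144, 2 * mempoolSize), MAX_UINT32)`. Below is a hedged Rust re-expression of that arithmetic (the real check is the TypeScript shown above); the constants are taken directly from the diff.

```rust
/// Re-expression of the TypeScript reset check:
/// reset once next_uid + added >= min(max(262144, 2 * mempool_size), u32::MAX).
fn should_reset_gbt(next_uid: u64, added: u64, mempool_size: u64) -> bool {
    let sparse_limit = (2 * mempool_size).max(262_144);
    let limit = sparse_limit.min(u64::from(u32::MAX));
    next_uid + added >= limit
}

fn main() {
    // Small mempool: the 262144 floor applies, so uids may drift well past mempool_size.
    assert!(!should_reset_gbt(100_000, 500, 50_000));
    assert!(should_reset_gbt(262_000, 500, 50_000));

    // Large mempool: the 2x rule kicks in long before the uint32 ceiling does.
    assert!(should_reset_gbt(600_000, 1_000, 300_000));
    assert!(!should_reset_gbt(400_000, 1_000, 300_000));
}
```

Presumably the 262144 floor keeps small mempools from resetting constantly, while the 2× factor bounds how sparse the uid space can get relative to the live mempool.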