From b101e301dd754295b61d80e5378ff4988067693a Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <49868160+antonilol@users.noreply.github.com> Date: Sat, 13 Jul 2024 08:40:43 +0200 Subject: [PATCH] Save on allocations by using fixed size types for database rows (#1043) --- src/chain.rs | 14 ++++-- src/db.rs | 137 ++++++++++++++++++++++++++++++++++++--------------- src/index.rs | 31 ++++++------ src/types.rs | 65 ++++++++++++------------ 4 files changed, 154 insertions(+), 93 deletions(-) diff --git a/src/chain.rs b/src/chain.rs index 26496a2..9f17276 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -57,11 +57,11 @@ impl Chain { } /// Load the chain from a collection of headers, up to the given tip - pub(crate) fn load(&mut self, headers: Vec, tip: BlockHash) { + pub(crate) fn load(&mut self, headers: impl Iterator, tip: BlockHash) { let genesis_hash = self.headers[0].0; let header_map: HashMap = - headers.into_iter().map(|h| (h.block_hash(), h)).collect(); + headers.map(|h| (h.block_hash(), h)).collect(); let mut blockhash = tip; let mut new_headers: Vec<&BlockHeader> = Vec::with_capacity(header_map.len()); while blockhash != genesis_hash { @@ -202,7 +202,10 @@ hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a03 // test loading from a list of headers and tip let mut regtest = Chain::new(Regtest); - regtest.load(headers.clone(), headers.last().unwrap().block_hash()); + regtest.load( + headers.iter().copied(), + headers.last().unwrap().block_hash(), + ); assert_eq!(regtest.height(), headers.len()); // test getters @@ -239,7 +242,10 @@ hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a03 // test reorg let mut regtest = Chain::new(Regtest); - regtest.load(headers.clone(), headers.last().unwrap().block_hash()); + regtest.load( + headers.iter().copied(), + headers.last().unwrap().block_hash(), + ); let height = regtest.height(); let new_header: BlockHeader = deserialize(&hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a0304d2e55fe0b6415949cff9bca0f88c0717884a5e5797509f89f856af93624a7a6bcc60ffff7f2000000000")).unwrap(); diff --git a/src/db.rs b/src/db.rs index f261f03..07e9f9d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,15 +4,15 @@ use electrs_rocksdb as rocksdb; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; -pub(crate) type Row = Box<[u8]>; +use crate::types::{HashPrefix, SerializedHashPrefixRow, SerializedHeaderRow}; #[derive(Default)] pub(crate) struct WriteBatch { - pub(crate) tip_row: Row, - pub(crate) header_rows: Vec, - pub(crate) funding_rows: Vec, - pub(crate) spending_rows: Vec, - pub(crate) txid_rows: Vec, + pub(crate) tip_row: [u8; 32], + pub(crate) header_rows: Vec, + pub(crate) funding_rows: Vec, + pub(crate) spending_rows: Vec, + pub(crate) txid_rows: Vec, } impl WriteBatch { @@ -218,39 +218,50 @@ impl DBStore { self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF") } - pub(crate) fn iter_funding(&self, prefix: Row) -> impl Iterator + '_ { + pub(crate) fn iter_funding( + &self, + prefix: HashPrefix, + ) -> impl Iterator + '_ { self.iter_prefix_cf(self.funding_cf(), prefix) } - pub(crate) fn iter_spending(&self, prefix: Row) -> impl Iterator + '_ { + pub(crate) fn iter_spending( + &self, + prefix: HashPrefix, + ) -> impl Iterator + '_ { self.iter_prefix_cf(self.spending_cf(), prefix) } - pub(crate) fn iter_txid(&self, prefix: Row) -> impl Iterator + '_ { + pub(crate) fn iter_txid( + &self, + prefix: HashPrefix, + ) -> impl Iterator + '_ { self.iter_prefix_cf(self.txid_cf(), prefix) } + fn iter_cf<'a, const N: usize>( + &'a self, + cf: &rocksdb::ColumnFamily, + readopts: rocksdb::ReadOptions, + prefix: Option, + ) -> impl Iterator + '_ { + DBIterator::new(self.db.raw_iterator_cf_opt(cf, readopts), prefix) + } + fn iter_prefix_cf( &self, cf: &rocksdb::ColumnFamily, - prefix: Row, - ) -> impl Iterator + '_ { - let mode = rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward); + prefix: HashPrefix, + ) -> impl Iterator + '_ { let mut opts = rocksdb::ReadOptions::default(); opts.set_prefix_same_as_start(true); // requires .set_prefix_extractor() above. - self.db - .iterator_cf_opt(cf, opts, mode) - .map(|row| row.expect("prefix iterator failed").0) // values are empty in prefix-scanned CFs + self.iter_cf(cf, opts, Some(prefix)) } - pub(crate) fn read_headers(&self) -> Vec { + pub(crate) fn iter_headers(&self) -> impl Iterator + '_ { let mut opts = rocksdb::ReadOptions::default(); opts.fill_cache(false); - self.db - .iterator_cf_opt(self.headers_cf(), opts, rocksdb::IteratorMode::Start) - .map(|row| row.expect("header iterator failed").0) // extract key from row - .filter(|key| &key[..] != TIP_KEY) // headers' rows are longer than TIP_KEY - .collect() + self.iter_cf(self.headers_cf(), opts, None) } pub(crate) fn get_tip(&self) -> Option> { @@ -273,7 +284,7 @@ impl DBStore { for key in &batch.header_rows { db_batch.put_cf(self.headers_cf(), key, b""); } - db_batch.put_cf(self.headers_cf(), TIP_KEY, &batch.tip_row); + db_batch.put_cf(self.headers_cf(), TIP_KEY, batch.tip_row); let mut opts = rocksdb::WriteOptions::new(); let bulk_import = self.bulk_import.load(Ordering::Relaxed); @@ -354,6 +365,57 @@ impl DBStore { } } +struct DBIterator<'a, const N: usize> { + raw: rocksdb::DBRawIterator<'a>, + prefix: Option, + done: bool, +} + +impl<'a, const N: usize> DBIterator<'a, N> { + fn new(mut raw: rocksdb::DBRawIterator<'a>, prefix: Option) -> Self { + match prefix { + Some(key) => raw.seek(key), + None => raw.seek_to_first(), + }; + Self { + raw, + prefix, + done: false, + } + } +} + +impl<'a, const N: usize> Iterator for DBIterator<'a, N> { + type Item = [u8; N]; + + fn next(&mut self) -> Option { + while !self.done { + let key = match self.raw.key() { + Some(key) => key, + None => { + self.raw.status().expect("DB scan failed"); + break; // end of scan + } + }; + let prefix_match = match self.prefix { + Some(key_prefix) => key.starts_with(&key_prefix), + None => true, + }; + if !prefix_match { + break; // prefix mismatch + } + let result: Option<[u8; N]> = key.try_into().ok(); + self.raw.next(); + match result { + Some(value) => return Some(value), + None => continue, // skip keys with size != N + } + } + self.done = true; + None + } +} + impl Drop for DBStore { fn drop(&mut self) { info!("closing DB at {}", self.db.path().display()); @@ -424,31 +486,24 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let store = DBStore::open(dir.path(), None, true).unwrap(); - let items: &[&[u8]] = &[ - b"ab", - b"abcdefgh", - b"abcdefghj", - b"abcdefghjk", - b"abcdefghxyz", - b"abcdefgi", - b"b", - b"c", + let items = [ + *b"ab ", + *b"abcdefgh ", + *b"abcdefghj ", + *b"abcdefghjk ", + *b"abcdefghxyz ", + *b"abcdefgi ", + *b"b ", + *b"c ", ]; store.write(&WriteBatch { - txid_rows: to_rows(items), + txid_rows: items.to_vec(), ..Default::default() }); - let rows = store.iter_txid(b"abcdefgh".to_vec().into_boxed_slice()); - assert_eq!(rows.collect::>(), to_rows(&items[1..5])); - } - - fn to_rows(values: &[&[u8]]) -> Vec> { - values - .iter() - .map(|v| v.to_vec().into_boxed_slice()) - .collect() + let rows = store.iter_txid(*b"abcdefgh"); + assert_eq!(rows.collect::>(), items[1..5]); } #[test] diff --git a/src/index.rs b/src/index.rs index e4d2a6a..5a22dc6 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; -use bitcoin::consensus::{deserialize, serialize, Decodable}; +use bitcoin::consensus::{deserialize, Decodable, Encodable}; +use bitcoin::hashes::Hash; use bitcoin::{BlockHash, OutPoint, Txid}; use bitcoin_slices::{bsl, Visit, Visitor}; use std::ops::ControlFlow; @@ -7,7 +8,7 @@ use std::ops::ControlFlow; use crate::{ chain::{Chain, NewHeader}, daemon::Daemon, - db::{DBStore, Row, WriteBatch}, + db::{DBStore, WriteBatch}, metrics::{self, Gauge, Histogram, Metrics}, signals::ExitFlag, types::{ @@ -48,8 +49,8 @@ impl Stats { self.update_duration.observe_duration(label, f) } - fn observe_size(&self, label: &str, rows: &[Row]) { - self.update_size.observe(label, db_rows_size(rows) as f64); + fn observe_size(&self, label: &str, rows: &[[u8; N]]) { + self.update_size.observe(label, (rows.len() * N) as f64); } fn observe_batch(&self, batch: &WriteBatch) { @@ -101,10 +102,8 @@ impl Index { if let Some(row) = store.get_tip() { let tip = deserialize(&row).expect("invalid tip"); let headers = store - .read_headers() - .into_iter() - .map(|row| HeaderRow::from_db_row(&row).header) - .collect(); + .iter_headers() + .map(|row| HeaderRow::from_db_row(row).header); chain.load(headers, tip); chain.drop_last_headers(reindex_last_blocks); }; @@ -141,7 +140,7 @@ impl Index { pub(crate) fn filter_by_txid(&self, txid: Txid) -> impl Iterator + '_ { self.store .iter_txid(TxidRow::scan_prefix(txid)) - .map(|row| HashPrefixRow::from_db_row(&row).height()) + .map(|row| HashPrefixRow::from_db_row(row).height()) .filter_map(move |height| self.chain.get_block_hash(height)) } @@ -151,7 +150,7 @@ impl Index { ) -> impl Iterator + '_ { self.store .iter_funding(ScriptHashRow::scan_prefix(scripthash)) - .map(|row| HashPrefixRow::from_db_row(&row).height()) + .map(|row| HashPrefixRow::from_db_row(row).height()) .filter_map(move |height| self.chain.get_block_hash(height)) } @@ -161,7 +160,7 @@ impl Index { ) -> impl Iterator + '_ { self.store .iter_spending(SpendingPrefixRow::scan_prefix(outpoint)) - .map(|row| HashPrefixRow::from_db_row(&row).height()) + .map(|row| HashPrefixRow::from_db_row(row).height()) .filter_map(move |height| self.chain.get_block_hash(height)) } @@ -236,10 +235,6 @@ impl Index { } } -fn db_rows_size(rows: &[Row]) -> usize { - rows.iter().map(|key| key.len()).sum() -} - fn index_single_block( block_hash: BlockHash, block: SerBlock, @@ -292,5 +287,9 @@ fn index_single_block( let mut index_block = IndexBlockVisitor { batch, height }; bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block"); - batch.tip_row = serialize(&block_hash).into_boxed_slice(); + + let len = block_hash + .consensus_encode(&mut (&mut batch.tip_row as &mut [u8])) + .expect("in-memory writers don't error"); + debug_assert_eq!(len, BlockHash::LEN); } diff --git a/src/types.rs b/src/types.rs index ca7f7b3..2d33949 100644 --- a/src/types.rs +++ b/src/types.rs @@ -10,8 +10,6 @@ use bitcoin::{ }; use bitcoin_slices::bsl; -use crate::db; - macro_rules! impl_consensus_encoding { ($thing:ident, $($field:ident),+) => ( impl Encodable for $thing { @@ -39,33 +37,34 @@ macro_rules! impl_consensus_encoding { ); } -const HASH_PREFIX_LEN: usize = 8; +pub const HASH_PREFIX_LEN: usize = 8; const HEIGHT_SIZE: usize = 4; -type HashPrefix = [u8; HASH_PREFIX_LEN]; +pub(crate) type HashPrefix = [u8; HASH_PREFIX_LEN]; +pub(crate) type SerializedHashPrefixRow = [u8; HASH_PREFIX_ROW_SIZE]; type Height = u32; pub(crate) type SerBlock = Vec; #[derive(Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct HashPrefixRow { - prefix: [u8; HASH_PREFIX_LEN], + prefix: HashPrefix, height: Height, // transaction confirmed height } -const HASH_PREFIX_ROW_SIZE: usize = HASH_PREFIX_LEN + HEIGHT_SIZE; +pub const HASH_PREFIX_ROW_SIZE: usize = HASH_PREFIX_LEN + HEIGHT_SIZE; impl HashPrefixRow { - pub(crate) fn to_db_row(&self) -> db::Row { - let mut vec = Vec::with_capacity(HASH_PREFIX_ROW_SIZE); + pub(crate) fn to_db_row(&self) -> SerializedHashPrefixRow { + let mut row = [0; HASH_PREFIX_ROW_SIZE]; let len = self - .consensus_encode(&mut vec) + .consensus_encode(&mut (&mut row as &mut [u8])) .expect("in-memory writers don't error"); debug_assert_eq!(len, HASH_PREFIX_ROW_SIZE); - vec.into_boxed_slice() + row } - pub(crate) fn from_db_row(row: &[u8]) -> Self { - deserialize(row).expect("bad HashPrefixRow") + pub(crate) fn from_db_row(row: SerializedHashPrefixRow) -> Self { + deserialize(&row).expect("bad HashPrefixRow") } pub fn height(&self) -> usize { @@ -96,8 +95,8 @@ impl ScriptHash { pub(crate) struct ScriptHashRow; impl ScriptHashRow { - pub(crate) fn scan_prefix(scripthash: ScriptHash) -> Box<[u8]> { - scripthash.0[..HASH_PREFIX_LEN].to_vec().into_boxed_slice() + pub(crate) fn scan_prefix(scripthash: ScriptHash) -> HashPrefix { + scripthash.0[..HASH_PREFIX_LEN].try_into().unwrap() } pub(crate) fn row(scripthash: ScriptHash, height: usize) -> HashPrefixRow { @@ -118,7 +117,7 @@ hash_newtype! { // *************************************************************************** fn spending_prefix(prev: OutPoint) -> HashPrefix { - let txid_prefix = <[u8; HASH_PREFIX_LEN]>::try_from(&prev.txid[..HASH_PREFIX_LEN]).unwrap(); + let txid_prefix = HashPrefix::try_from(&prev.txid[..HASH_PREFIX_LEN]).unwrap(); let value = u64::from_be_bytes(txid_prefix); let value = value.wrapping_add(prev.vout.into()); value.to_be_bytes() @@ -127,8 +126,8 @@ fn spending_prefix(prev: OutPoint) -> HashPrefix { pub(crate) struct SpendingPrefixRow; impl SpendingPrefixRow { - pub(crate) fn scan_prefix(outpoint: OutPoint) -> Box<[u8]> { - Box::new(spending_prefix(outpoint)) + pub(crate) fn scan_prefix(outpoint: OutPoint) -> HashPrefix { + spending_prefix(outpoint) } pub(crate) fn row(outpoint: OutPoint, height: usize) -> HashPrefixRow { @@ -150,8 +149,8 @@ fn txid_prefix(txid: &Txid) -> HashPrefix { pub(crate) struct TxidRow; impl TxidRow { - pub(crate) fn scan_prefix(txid: Txid) -> Box<[u8]> { - Box::new(txid_prefix(&txid)) + pub(crate) fn scan_prefix(txid: Txid) -> HashPrefix { + txid_prefix(&txid) } pub(crate) fn row(txid: Txid, height: usize) -> HashPrefixRow { @@ -164,12 +163,14 @@ impl TxidRow { // *************************************************************************** +pub(crate) type SerializedHeaderRow = [u8; HEADER_ROW_SIZE]; + #[derive(Debug, Serialize, Deserialize)] pub(crate) struct HeaderRow { pub(crate) header: BlockHeader, } -const HEADER_ROW_SIZE: usize = 80; +pub const HEADER_ROW_SIZE: usize = 80; impl_consensus_encoding!(HeaderRow, header); @@ -178,17 +179,17 @@ impl HeaderRow { Self { header } } - pub(crate) fn to_db_row(&self) -> db::Row { - let mut vec = Vec::with_capacity(HEADER_ROW_SIZE); + pub(crate) fn to_db_row(&self) -> SerializedHeaderRow { + let mut row = [0; HEADER_ROW_SIZE]; let len = self - .consensus_encode(&mut vec) + .consensus_encode(&mut (&mut row as &mut [u8])) .expect("in-memory writers don't error"); debug_assert_eq!(len, HEADER_ROW_SIZE); - vec.into_boxed_slice() + row } - pub(crate) fn from_db_row(row: &[u8]) -> Self { - deserialize(row).expect("bad HeaderRow") + pub(crate) fn from_db_row(row: SerializedHeaderRow) -> Self { + deserialize(&row).expect("bad HeaderRow") } } @@ -219,8 +220,8 @@ mod tests { let scripthash: ScriptHash = from_str(hex).unwrap(); let row1 = ScriptHashRow::row(scripthash, 123456); let db_row = row1.to_db_row(); - assert_eq!(&*db_row, &hex!("a384491d38929fcc40e20100")); - let row2 = HashPrefixRow::from_db_row(&db_row); + assert_eq!(db_row, hex!("a384491d38929fcc40e20100")); + let row2 = HashPrefixRow::from_db_row(db_row); assert_eq!(row1, row2); } @@ -247,8 +248,8 @@ mod tests { let row1 = TxidRow::row(txid, 91812); let row2 = TxidRow::row(txid, 91842); - assert_eq!(&*row1.to_db_row(), &hex!("9985d82954e10f22a4660100")); - assert_eq!(&*row2.to_db_row(), &hex!("9985d82954e10f22c2660100")); + assert_eq!(row1.to_db_row(), hex!("9985d82954e10f22a4660100")); + assert_eq!(row2.to_db_row(), hex!("9985d82954e10f22c2660100")); } #[test] @@ -261,8 +262,8 @@ mod tests { let row2 = TxidRow::row(txid, 91880); // low-endian encoding => rows should be sorted according to block height - assert_eq!(&*row1.to_db_row(), &hex!("68b45f58b674e94e4a660100")); - assert_eq!(&*row2.to_db_row(), &hex!("68b45f58b674e94ee8660100")); + assert_eq!(row1.to_db_row(), hex!("68b45f58b674e94e4a660100")); + assert_eq!(row2.to_db_row(), hex!("68b45f58b674e94ee8660100")); } #[test]