Mirror of https://github.com/romanz/electrs.git — synced 2024-11-19 01:43:29 +01:00

Save on allocations by using fixed size types for database rows (#1043)

This commit is contained in:
Antoni Spaanderman 2024-07-13 08:40:43 +02:00 committed by GitHub
parent 474c016715
commit b101e301dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 93 deletions

View File

@ -57,11 +57,11 @@ impl Chain {
}
/// Load the chain from a collection of headers, up to the given tip
pub(crate) fn load(&mut self, headers: Vec<BlockHeader>, tip: BlockHash) {
pub(crate) fn load(&mut self, headers: impl Iterator<Item = BlockHeader>, tip: BlockHash) {
let genesis_hash = self.headers[0].0;
let header_map: HashMap<BlockHash, BlockHeader> =
headers.into_iter().map(|h| (h.block_hash(), h)).collect();
headers.map(|h| (h.block_hash(), h)).collect();
let mut blockhash = tip;
let mut new_headers: Vec<&BlockHeader> = Vec::with_capacity(header_map.len());
while blockhash != genesis_hash {
@ -202,7 +202,10 @@ hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a03
// test loading from a list of headers and tip
let mut regtest = Chain::new(Regtest);
regtest.load(headers.clone(), headers.last().unwrap().block_hash());
regtest.load(
headers.iter().copied(),
headers.last().unwrap().block_hash(),
);
assert_eq!(regtest.height(), headers.len());
// test getters
@ -239,7 +242,10 @@ hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a03
// test reorg
let mut regtest = Chain::new(Regtest);
regtest.load(headers.clone(), headers.last().unwrap().block_hash());
regtest.load(
headers.iter().copied(),
headers.last().unwrap().block_hash(),
);
let height = regtest.height();
let new_header: BlockHeader = deserialize(&hex!("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a0304d2e55fe0b6415949cff9bca0f88c0717884a5e5797509f89f856af93624a7a6bcc60ffff7f2000000000")).unwrap();

137
src/db.rs
View File

@ -4,15 +4,15 @@ use electrs_rocksdb as rocksdb;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
pub(crate) type Row = Box<[u8]>;
use crate::types::{HashPrefix, SerializedHashPrefixRow, SerializedHeaderRow};
#[derive(Default)]
pub(crate) struct WriteBatch {
pub(crate) tip_row: Row,
pub(crate) header_rows: Vec<Row>,
pub(crate) funding_rows: Vec<Row>,
pub(crate) spending_rows: Vec<Row>,
pub(crate) txid_rows: Vec<Row>,
pub(crate) tip_row: [u8; 32],
pub(crate) header_rows: Vec<SerializedHeaderRow>,
pub(crate) funding_rows: Vec<SerializedHashPrefixRow>,
pub(crate) spending_rows: Vec<SerializedHashPrefixRow>,
pub(crate) txid_rows: Vec<SerializedHashPrefixRow>,
}
impl WriteBatch {
@ -218,39 +218,50 @@ impl DBStore {
self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF")
}
pub(crate) fn iter_funding(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
pub(crate) fn iter_funding(
&self,
prefix: HashPrefix,
) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
self.iter_prefix_cf(self.funding_cf(), prefix)
}
pub(crate) fn iter_spending(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
pub(crate) fn iter_spending(
&self,
prefix: HashPrefix,
) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
self.iter_prefix_cf(self.spending_cf(), prefix)
}
pub(crate) fn iter_txid(&self, prefix: Row) -> impl Iterator<Item = Row> + '_ {
pub(crate) fn iter_txid(
&self,
prefix: HashPrefix,
) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
self.iter_prefix_cf(self.txid_cf(), prefix)
}
fn iter_cf<'a, const N: usize>(
&'a self,
cf: &rocksdb::ColumnFamily,
readopts: rocksdb::ReadOptions,
prefix: Option<HashPrefix>,
) -> impl Iterator<Item = [u8; N]> + '_ {
DBIterator::new(self.db.raw_iterator_cf_opt(cf, readopts), prefix)
}
fn iter_prefix_cf(
&self,
cf: &rocksdb::ColumnFamily,
prefix: Row,
) -> impl Iterator<Item = Row> + '_ {
let mode = rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward);
prefix: HashPrefix,
) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
let mut opts = rocksdb::ReadOptions::default();
opts.set_prefix_same_as_start(true); // requires .set_prefix_extractor() above.
self.db
.iterator_cf_opt(cf, opts, mode)
.map(|row| row.expect("prefix iterator failed").0) // values are empty in prefix-scanned CFs
self.iter_cf(cf, opts, Some(prefix))
}
pub(crate) fn read_headers(&self) -> Vec<Row> {
pub(crate) fn iter_headers(&self) -> impl Iterator<Item = SerializedHeaderRow> + '_ {
let mut opts = rocksdb::ReadOptions::default();
opts.fill_cache(false);
self.db
.iterator_cf_opt(self.headers_cf(), opts, rocksdb::IteratorMode::Start)
.map(|row| row.expect("header iterator failed").0) // extract key from row
.filter(|key| &key[..] != TIP_KEY) // headers' rows are longer than TIP_KEY
.collect()
self.iter_cf(self.headers_cf(), opts, None)
}
pub(crate) fn get_tip(&self) -> Option<Vec<u8>> {
@ -273,7 +284,7 @@ impl DBStore {
for key in &batch.header_rows {
db_batch.put_cf(self.headers_cf(), key, b"");
}
db_batch.put_cf(self.headers_cf(), TIP_KEY, &batch.tip_row);
db_batch.put_cf(self.headers_cf(), TIP_KEY, batch.tip_row);
let mut opts = rocksdb::WriteOptions::new();
let bulk_import = self.bulk_import.load(Ordering::Relaxed);
@ -354,6 +365,57 @@ impl DBStore {
}
}
/// Raw RocksDB key iterator yielding fixed-size keys of length `N`,
/// optionally restricted to keys sharing an 8-byte hash prefix.
struct DBIterator<'a, const N: usize> {
    raw: rocksdb::DBRawIterator<'a>, // underlying cursor over one column family
    prefix: Option<HashPrefix>,      // when set, stop at the first key not starting with it
    done: bool,                      // set once the scan ends, so `next()` stays `None` (fused)
}
impl<'a, const N: usize> DBIterator<'a, N> {
fn new(mut raw: rocksdb::DBRawIterator<'a>, prefix: Option<HashPrefix>) -> Self {
match prefix {
Some(key) => raw.seek(key),
None => raw.seek_to_first(),
};
Self {
raw,
prefix,
done: false,
}
}
}
impl<'a, const N: usize> Iterator for DBIterator<'a, N> {
    type Item = [u8; N];

    /// Return the next key as a fixed-size `[u8; N]` array.
    ///
    /// Keys whose length differs from `N` are silently skipped (the
    /// headers CF also stores the tip row under `TIP_KEY`, whose key
    /// length differs from a serialized header row). Iteration stops at
    /// the first key that no longer starts with `prefix` (when one was
    /// given) or at the end of the scan; after that, `next()` always
    /// returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.done {
            // Peek at the current position; `None` means the cursor is
            // exhausted or an error occurred — `status()` distinguishes.
            let key = match self.raw.key() {
                Some(key) => key,
                None => {
                    self.raw.status().expect("DB scan failed");
                    break; // end of scan
                }
            };
            let prefix_match = match self.prefix {
                Some(key_prefix) => key.starts_with(&key_prefix),
                None => true,
            };
            if !prefix_match {
                break; // prefix mismatch
            }
            // Copy the key out before advancing: `try_into` fails (returns
            // `None`) only when the key length != N.
            let result: Option<[u8; N]> = key.try_into().ok();
            self.raw.next();
            match result {
                Some(value) => return Some(value),
                None => continue, // skip keys with size != N
            }
        }
        self.done = true;
        None
    }
}
impl Drop for DBStore {
fn drop(&mut self) {
info!("closing DB at {}", self.db.path().display());
@ -424,31 +486,24 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let store = DBStore::open(dir.path(), None, true).unwrap();
let items: &[&[u8]] = &[
b"ab",
b"abcdefgh",
b"abcdefghj",
b"abcdefghjk",
b"abcdefghxyz",
b"abcdefgi",
b"b",
b"c",
let items = [
*b"ab ",
*b"abcdefgh ",
*b"abcdefghj ",
*b"abcdefghjk ",
*b"abcdefghxyz ",
*b"abcdefgi ",
*b"b ",
*b"c ",
];
store.write(&WriteBatch {
txid_rows: to_rows(items),
txid_rows: items.to_vec(),
..Default::default()
});
let rows = store.iter_txid(b"abcdefgh".to_vec().into_boxed_slice());
assert_eq!(rows.collect::<Vec<_>>(), to_rows(&items[1..5]));
}
fn to_rows(values: &[&[u8]]) -> Vec<Box<[u8]>> {
values
.iter()
.map(|v| v.to_vec().into_boxed_slice())
.collect()
let rows = store.iter_txid(*b"abcdefgh");
assert_eq!(rows.collect::<Vec<_>>(), items[1..5]);
}
#[test]

View File

@ -1,5 +1,6 @@
use anyhow::{Context, Result};
use bitcoin::consensus::{deserialize, serialize, Decodable};
use bitcoin::consensus::{deserialize, Decodable, Encodable};
use bitcoin::hashes::Hash;
use bitcoin::{BlockHash, OutPoint, Txid};
use bitcoin_slices::{bsl, Visit, Visitor};
use std::ops::ControlFlow;
@ -7,7 +8,7 @@ use std::ops::ControlFlow;
use crate::{
chain::{Chain, NewHeader},
daemon::Daemon,
db::{DBStore, Row, WriteBatch},
db::{DBStore, WriteBatch},
metrics::{self, Gauge, Histogram, Metrics},
signals::ExitFlag,
types::{
@ -48,8 +49,8 @@ impl Stats {
self.update_duration.observe_duration(label, f)
}
fn observe_size(&self, label: &str, rows: &[Row]) {
self.update_size.observe(label, db_rows_size(rows) as f64);
fn observe_size<const N: usize>(&self, label: &str, rows: &[[u8; N]]) {
self.update_size.observe(label, (rows.len() * N) as f64);
}
fn observe_batch(&self, batch: &WriteBatch) {
@ -101,10 +102,8 @@ impl Index {
if let Some(row) = store.get_tip() {
let tip = deserialize(&row).expect("invalid tip");
let headers = store
.read_headers()
.into_iter()
.map(|row| HeaderRow::from_db_row(&row).header)
.collect();
.iter_headers()
.map(|row| HeaderRow::from_db_row(row).header);
chain.load(headers, tip);
chain.drop_last_headers(reindex_last_blocks);
};
@ -141,7 +140,7 @@ impl Index {
pub(crate) fn filter_by_txid(&self, txid: Txid) -> impl Iterator<Item = BlockHash> + '_ {
self.store
.iter_txid(TxidRow::scan_prefix(txid))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.map(|row| HashPrefixRow::from_db_row(row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
}
@ -151,7 +150,7 @@ impl Index {
) -> impl Iterator<Item = BlockHash> + '_ {
self.store
.iter_funding(ScriptHashRow::scan_prefix(scripthash))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.map(|row| HashPrefixRow::from_db_row(row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
}
@ -161,7 +160,7 @@ impl Index {
) -> impl Iterator<Item = BlockHash> + '_ {
self.store
.iter_spending(SpendingPrefixRow::scan_prefix(outpoint))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.map(|row| HashPrefixRow::from_db_row(row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
}
@ -236,10 +235,6 @@ impl Index {
}
}
fn db_rows_size(rows: &[Row]) -> usize {
rows.iter().map(|key| key.len()).sum()
}
fn index_single_block(
block_hash: BlockHash,
block: SerBlock,
@ -292,5 +287,9 @@ fn index_single_block(
let mut index_block = IndexBlockVisitor { batch, height };
bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block");
batch.tip_row = serialize(&block_hash).into_boxed_slice();
let len = block_hash
.consensus_encode(&mut (&mut batch.tip_row as &mut [u8]))
.expect("in-memory writers don't error");
debug_assert_eq!(len, BlockHash::LEN);
}

View File

@ -10,8 +10,6 @@ use bitcoin::{
};
use bitcoin_slices::bsl;
use crate::db;
macro_rules! impl_consensus_encoding {
($thing:ident, $($field:ident),+) => (
impl Encodable for $thing {
@ -39,33 +37,34 @@ macro_rules! impl_consensus_encoding {
);
}
const HASH_PREFIX_LEN: usize = 8;
pub const HASH_PREFIX_LEN: usize = 8;
const HEIGHT_SIZE: usize = 4;
type HashPrefix = [u8; HASH_PREFIX_LEN];
pub(crate) type HashPrefix = [u8; HASH_PREFIX_LEN];
pub(crate) type SerializedHashPrefixRow = [u8; HASH_PREFIX_ROW_SIZE];
type Height = u32;
pub(crate) type SerBlock = Vec<u8>;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub(crate) struct HashPrefixRow {
prefix: [u8; HASH_PREFIX_LEN],
prefix: HashPrefix,
height: Height, // transaction confirmed height
}
const HASH_PREFIX_ROW_SIZE: usize = HASH_PREFIX_LEN + HEIGHT_SIZE;
pub const HASH_PREFIX_ROW_SIZE: usize = HASH_PREFIX_LEN + HEIGHT_SIZE;
impl HashPrefixRow {
pub(crate) fn to_db_row(&self) -> db::Row {
let mut vec = Vec::with_capacity(HASH_PREFIX_ROW_SIZE);
pub(crate) fn to_db_row(&self) -> SerializedHashPrefixRow {
let mut row = [0; HASH_PREFIX_ROW_SIZE];
let len = self
.consensus_encode(&mut vec)
.consensus_encode(&mut (&mut row as &mut [u8]))
.expect("in-memory writers don't error");
debug_assert_eq!(len, HASH_PREFIX_ROW_SIZE);
vec.into_boxed_slice()
row
}
pub(crate) fn from_db_row(row: &[u8]) -> Self {
deserialize(row).expect("bad HashPrefixRow")
pub(crate) fn from_db_row(row: SerializedHashPrefixRow) -> Self {
deserialize(&row).expect("bad HashPrefixRow")
}
pub fn height(&self) -> usize {
@ -96,8 +95,8 @@ impl ScriptHash {
pub(crate) struct ScriptHashRow;
impl ScriptHashRow {
pub(crate) fn scan_prefix(scripthash: ScriptHash) -> Box<[u8]> {
scripthash.0[..HASH_PREFIX_LEN].to_vec().into_boxed_slice()
pub(crate) fn scan_prefix(scripthash: ScriptHash) -> HashPrefix {
scripthash.0[..HASH_PREFIX_LEN].try_into().unwrap()
}
pub(crate) fn row(scripthash: ScriptHash, height: usize) -> HashPrefixRow {
@ -118,7 +117,7 @@ hash_newtype! {
// ***************************************************************************
fn spending_prefix(prev: OutPoint) -> HashPrefix {
let txid_prefix = <[u8; HASH_PREFIX_LEN]>::try_from(&prev.txid[..HASH_PREFIX_LEN]).unwrap();
let txid_prefix = HashPrefix::try_from(&prev.txid[..HASH_PREFIX_LEN]).unwrap();
let value = u64::from_be_bytes(txid_prefix);
let value = value.wrapping_add(prev.vout.into());
value.to_be_bytes()
@ -127,8 +126,8 @@ fn spending_prefix(prev: OutPoint) -> HashPrefix {
pub(crate) struct SpendingPrefixRow;
impl SpendingPrefixRow {
pub(crate) fn scan_prefix(outpoint: OutPoint) -> Box<[u8]> {
Box::new(spending_prefix(outpoint))
pub(crate) fn scan_prefix(outpoint: OutPoint) -> HashPrefix {
spending_prefix(outpoint)
}
pub(crate) fn row(outpoint: OutPoint, height: usize) -> HashPrefixRow {
@ -150,8 +149,8 @@ fn txid_prefix(txid: &Txid) -> HashPrefix {
pub(crate) struct TxidRow;
impl TxidRow {
pub(crate) fn scan_prefix(txid: Txid) -> Box<[u8]> {
Box::new(txid_prefix(&txid))
pub(crate) fn scan_prefix(txid: Txid) -> HashPrefix {
txid_prefix(&txid)
}
pub(crate) fn row(txid: Txid, height: usize) -> HashPrefixRow {
@ -164,12 +163,14 @@ impl TxidRow {
// ***************************************************************************
pub(crate) type SerializedHeaderRow = [u8; HEADER_ROW_SIZE];
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct HeaderRow {
pub(crate) header: BlockHeader,
}
const HEADER_ROW_SIZE: usize = 80;
pub const HEADER_ROW_SIZE: usize = 80;
impl_consensus_encoding!(HeaderRow, header);
@ -178,17 +179,17 @@ impl HeaderRow {
Self { header }
}
pub(crate) fn to_db_row(&self) -> db::Row {
let mut vec = Vec::with_capacity(HEADER_ROW_SIZE);
pub(crate) fn to_db_row(&self) -> SerializedHeaderRow {
let mut row = [0; HEADER_ROW_SIZE];
let len = self
.consensus_encode(&mut vec)
.consensus_encode(&mut (&mut row as &mut [u8]))
.expect("in-memory writers don't error");
debug_assert_eq!(len, HEADER_ROW_SIZE);
vec.into_boxed_slice()
row
}
pub(crate) fn from_db_row(row: &[u8]) -> Self {
deserialize(row).expect("bad HeaderRow")
pub(crate) fn from_db_row(row: SerializedHeaderRow) -> Self {
deserialize(&row).expect("bad HeaderRow")
}
}
@ -219,8 +220,8 @@ mod tests {
let scripthash: ScriptHash = from_str(hex).unwrap();
let row1 = ScriptHashRow::row(scripthash, 123456);
let db_row = row1.to_db_row();
assert_eq!(&*db_row, &hex!("a384491d38929fcc40e20100"));
let row2 = HashPrefixRow::from_db_row(&db_row);
assert_eq!(db_row, hex!("a384491d38929fcc40e20100"));
let row2 = HashPrefixRow::from_db_row(db_row);
assert_eq!(row1, row2);
}
@ -247,8 +248,8 @@ mod tests {
let row1 = TxidRow::row(txid, 91812);
let row2 = TxidRow::row(txid, 91842);
assert_eq!(&*row1.to_db_row(), &hex!("9985d82954e10f22a4660100"));
assert_eq!(&*row2.to_db_row(), &hex!("9985d82954e10f22c2660100"));
assert_eq!(row1.to_db_row(), hex!("9985d82954e10f22a4660100"));
assert_eq!(row2.to_db_row(), hex!("9985d82954e10f22c2660100"));
}
#[test]
@ -261,8 +262,8 @@ mod tests {
let row2 = TxidRow::row(txid, 91880);
// low-endian encoding => rows should be sorted according to block height
assert_eq!(&*row1.to_db_row(), &hex!("68b45f58b674e94e4a660100"));
assert_eq!(&*row2.to_db_row(), &hex!("68b45f58b674e94ee8660100"));
assert_eq!(row1.to_db_row(), hex!("68b45f58b674e94e4a660100"));
assert_eq!(row2.to_db_row(), hex!("68b45f58b674e94ee8660100"));
}
#[test]