mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 01:43:29 +01:00
Fix a few DB-related issues
This commit is contained in:
parent
231a943fad
commit
8845713acd
100
src/db.rs
100
src/db.rs
@ -1,7 +1,6 @@
|
||||
use anyhow::{Context, Result};
|
||||
use electrs_rocksdb as rocksdb;
|
||||
|
||||
use std::iter;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
@ -240,48 +239,13 @@ impl DBStore {
|
||||
self.iter_prefix_cf(self.txid_cf(), prefix)
|
||||
}
|
||||
|
||||
fn iter_cf<'a, const N: usize, F: FnMut(&[u8]) -> bool + 'a>(
|
||||
fn iter_cf<'a, const N: usize>(
|
||||
&'a self,
|
||||
cf: &rocksdb::ColumnFamily,
|
||||
readopts: rocksdb::ReadOptions,
|
||||
start: Option<&[u8]>,
|
||||
mut filter_fn: F,
|
||||
prefix: Option<HashPrefix>,
|
||||
) -> impl Iterator<Item = [u8; N]> + '_ {
|
||||
let mut raw_iter = self.db.raw_iterator_cf_opt(cf, readopts);
|
||||
let mut done = false;
|
||||
|
||||
if let Some(key) = start {
|
||||
raw_iter.seek(key);
|
||||
} else {
|
||||
raw_iter.seek_to_first();
|
||||
};
|
||||
|
||||
iter::from_fn(move || loop {
|
||||
// based on <DBIteratorWithThreadMode as Iterator>::next
|
||||
break if done {
|
||||
None
|
||||
} else if let Some((key, _)) = raw_iter.item() {
|
||||
let ret = if filter_fn(key) {
|
||||
Some(
|
||||
key.try_into()
|
||||
.with_context(|| format!("expected {N} bytes, got {}", key.len()))
|
||||
.expect("database key has wrong length"),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
raw_iter.next();
|
||||
if ret.is_some() {
|
||||
ret
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
done = true;
|
||||
raw_iter.status().expect("cf iterator failed");
|
||||
None
|
||||
};
|
||||
})
|
||||
DBIterator::new(self.db.raw_iterator_cf_opt(cf, readopts), prefix)
|
||||
}
|
||||
|
||||
fn iter_prefix_cf(
|
||||
@ -291,18 +255,13 @@ impl DBStore {
|
||||
) -> impl Iterator<Item = SerializedHashPrefixRow> + '_ {
|
||||
let mut opts = rocksdb::ReadOptions::default();
|
||||
opts.set_prefix_same_as_start(true); // requires .set_prefix_extractor() above.
|
||||
self.iter_cf(cf, opts, Some(&prefix), |_| true)
|
||||
self.iter_cf(cf, opts, Some(prefix))
|
||||
}
|
||||
|
||||
pub(crate) fn iter_headers(&self) -> impl Iterator<Item = SerializedHeaderRow> + '_ {
|
||||
let mut opts = rocksdb::ReadOptions::default();
|
||||
opts.fill_cache(false);
|
||||
self.iter_cf(
|
||||
self.headers_cf(),
|
||||
opts,
|
||||
None,
|
||||
|key| key != TIP_KEY, // headers' rows are longer than TIP_KEY
|
||||
)
|
||||
self.iter_cf(self.headers_cf(), opts, None)
|
||||
}
|
||||
|
||||
pub(crate) fn get_tip(&self) -> Option<Vec<u8>> {
|
||||
@ -406,6 +365,55 @@ impl DBStore {
|
||||
}
|
||||
}
|
||||
|
||||
struct DBIterator<'a, const N: usize> {
|
||||
raw: rocksdb::DBRawIterator<'a>,
|
||||
prefix: Option<HashPrefix>,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
impl<'a, const N: usize> DBIterator<'a, N> {
|
||||
fn new(mut raw: rocksdb::DBRawIterator<'a>, prefix: Option<HashPrefix>) -> Self {
|
||||
match prefix {
|
||||
Some(key) => raw.seek(key),
|
||||
None => raw.seek_to_first(),
|
||||
};
|
||||
Self {
|
||||
raw,
|
||||
prefix,
|
||||
done: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize> Iterator for DBIterator<'a, N> {
|
||||
type Item = [u8; N];
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
while !self.done {
|
||||
let key = match self.raw.key() {
|
||||
Some(key) => key,
|
||||
None => break, // end of scan
|
||||
};
|
||||
let prefix_match = match self.prefix {
|
||||
Some(key_prefix) => key.starts_with(&key_prefix),
|
||||
None => true,
|
||||
};
|
||||
if !prefix_match {
|
||||
break; // prefix mismatch
|
||||
}
|
||||
let result: Option<[u8; N]> = key.try_into().ok();
|
||||
self.raw.next();
|
||||
match result {
|
||||
Some(value) => return Some(value),
|
||||
None => continue, // skip keys with size != N
|
||||
}
|
||||
}
|
||||
self.raw.status().expect("DB scan failed");
|
||||
self.done = true;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DBStore {
|
||||
fn drop(&mut self) {
|
||||
info!("closing DB at {}", self.db.path().display());
|
||||
|
Loading…
Reference in New Issue
Block a user