mirror of https://github.com/romanz/electrs.git

WIP: use a separate RocksDB for status caching

Roman Zeyde 2021-10-09 12:41:59 +03:00
parent 04089ee651
commit c158f83ce2
7 changed files with 128 additions and 18 deletions

Cargo.lock (generated)

@@ -65,6 +65,15 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.55.1"
@@ -334,6 +343,7 @@ name = "electrs"
version = "0.9.4"
dependencies = [
"anyhow",
"bincode",
"bitcoin",
"bitcoincore-rpc",
"configure_me",

Cargo.toml

@@ -23,6 +23,7 @@ spec = "internal/config_specification.toml"
[dependencies]
anyhow = "1.0"
bincode = "=1.3.3"
bitcoin = { version = "0.27.1", features = ["use-serde", "rand"] }
bitcoincore-rpc = "0.14.0"
configure_me = "0.4"
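Side note: the bincode version is pinned exactly ("=1.3.3"), presumably because the cache keys introduced below are raw bincode bytes, so the on-disk encoding must stay byte-for-byte stable across builds. A minimal standalone round trip showing bincode 1.x's fixed-width little-endian encoding (illustrative, not part of the commit):

fn main() {
    let entries: Vec<u32> = vec![7, 8];
    let bytes = bincode::serialize(&entries).unwrap();
    // bincode 1.x default: u64 little-endian length prefix, then fixed-width elements
    assert_eq!(bytes.len(), 8 + 2 * 4);
    let back: Vec<u32> = bincode::deserialize(&bytes).unwrap();
    assert_eq!(back, entries);
}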

internal/config_specification.toml

@@ -130,3 +130,8 @@ default = "concat!(\"Welcome to electrs \", env!(\"CARGO_PKG_VERSION\"), \" (Ele
name = "log_filters"
type = "String"
doc = "Logging filters, overriding `RUST_LOG` environment variable (see https://docs.rs/env_logger/ for details)"
[[param]]
name = "cache_db_dir"
type = "std::path::PathBuf"
doc = "Directory to store server-side cache database (default: disabled for privacy considerations)"

src/cache.rs

@@ -1,22 +1,32 @@
-use bitcoin::{Transaction, Txid};
+use anyhow::Result;
+use bitcoin::{BlockHash, Transaction, Txid};
+use electrs_rocksdb as rocksdb;
use parking_lot::RwLock;
use std::collections::HashMap;
+use std::io::{Cursor, Write};
+use std::path::{Path, PathBuf};
use std::sync::Arc;
-use crate::metrics::{self, Histogram, Metrics};
+use crate::{
+metrics::{self, Histogram, Metrics},
+status::TxEntry,
+types::ScriptHash,
+};
pub(crate) struct Cache {
txs: Arc<RwLock<HashMap<Txid, Transaction>>>,
+db: Option<CacheDB>,
// stats
txs_size: Histogram,
}
impl Cache {
-pub fn new(metrics: &Metrics) -> Self {
+pub fn new(metrics: &Metrics, cache_db_path: Option<&PathBuf>) -> Self {
Cache {
txs: Default::default(),
+db: cache_db_path.map(|path| CacheDB::open(path).unwrap()),
txs_size: metrics.histogram_vec(
"cache_txs_size",
"Cached transactions' size (in bytes)",
@@ -40,4 +50,66 @@ impl Cache {
{
self.txs.read().get(txid).map(f)
}
pub fn add_status_entry(
&self,
scripthash: ScriptHash,
blockhash: BlockHash,
entries: &[TxEntry],
) {
if let Some(db) = &self.db {
db.add(scripthash, blockhash, entries);
}
}
pub fn get_status_entries(&self, scripthash: ScriptHash) -> HashMap<BlockHash, Vec<TxEntry>> {
self.db
.as_ref()
.map(|db| db.scan(scripthash))
.unwrap_or_default()
}
}
struct CacheDB {
db: rocksdb::DB,
}
impl CacheDB {
fn open(path: &Path) -> Result<Self> {
let db = rocksdb::DB::open_default(path)?;
let live_files = db.live_files()?;
info!(
"{:?}: {} SST files, {} GB, {} Grows",
path,
live_files.len(),
live_files.iter().map(|f| f.size).sum::<usize>() as f64 / 1e9,
live_files.iter().map(|f| f.num_entries).sum::<u64>() as f64 / 1e9
);
Ok(CacheDB { db })
}
fn add(&self, scripthash: ScriptHash, blockhash: BlockHash, entries: &[TxEntry]) {
let mut cursor = Cursor::new(Vec::with_capacity(1024));
cursor.write_all(&scripthash).unwrap();
bincode::serialize_into(&mut cursor, &blockhash).unwrap();
bincode::serialize_into(&mut cursor, entries).unwrap();
let mut batch = rocksdb::WriteBatch::default();
batch.put(cursor.into_inner(), b"");
self.db.write_without_wal(batch).unwrap(); // best-effort write
}
fn scan(&self, scripthash: ScriptHash) -> HashMap<BlockHash, Vec<TxEntry>> {
let mode = rocksdb::IteratorMode::From(&scripthash, rocksdb::Direction::Forward);
self.db
.iterator(mode)
.map(|(key, _)| key)
.take_while(|key| key.starts_with(&scripthash))
.map(|key| {
let mut cursor = &key[scripthash.len()..];
let blockhash: BlockHash = bincode::deserialize_from(&mut cursor).unwrap();
let entries: Vec<TxEntry> = bincode::deserialize_from(&mut cursor).unwrap();
(blockhash, entries)
})
.collect()
}
}
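To make the key layout concrete: each RocksDB key is scripthash || bincode(blockhash) || bincode(entries), with an empty value, so scan() can walk every key sharing the 32-byte scripthash prefix. A self-contained round-trip sketch with stand-in types (the real code uses types::ScriptHash, bitcoin::BlockHash and status::TxEntry; the simplified aliases here are illustrative only):

use std::io::{Cursor, Write};

type ScriptHash = [u8; 32]; // stand-in for types::ScriptHash
type BlockHash = [u8; 32]; // stand-in for bitcoin::BlockHash; Vec<u32> stands in for Vec<TxEntry>

fn encode_key(scripthash: &ScriptHash, blockhash: &BlockHash, entries: &[u32]) -> Vec<u8> {
    let mut cursor = Cursor::new(Vec::with_capacity(1024));
    cursor.write_all(scripthash).unwrap(); // fixed-width prefix enables the prefix scan
    bincode::serialize_into(&mut cursor, blockhash).unwrap();
    bincode::serialize_into(&mut cursor, entries).unwrap();
    cursor.into_inner()
}

fn decode_key(scripthash: &ScriptHash, key: &[u8]) -> (BlockHash, Vec<u32>) {
    let mut cursor = &key[scripthash.len()..]; // skip the prefix, read back in write order
    let blockhash: BlockHash = bincode::deserialize_from(&mut cursor).unwrap();
    let entries: Vec<u32> = bincode::deserialize_from(&mut cursor).unwrap();
    (blockhash, entries)
}

fn main() {
    let (scripthash, blockhash) = ([0x11; 32], [0x22; 32]);
    let key = encode_key(&scripthash, &blockhash, &[1, 2, 3]);
    assert!(key.starts_with(&scripthash)); // the take_while condition used by scan()
    assert_eq!(decode_key(&scripthash, &key), (blockhash, vec![1, 2, 3]));
}

Note that add() goes through write_without_wal(), so entries written shortly before a crash may be lost; for a best-effort cache that is re-populated on the next status sync, that trade-off buys cheaper writes.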

src/config.rs

@@ -144,6 +144,7 @@ pub struct Config {
pub sync_once: bool,
pub disable_electrum_rpc: bool,
pub server_banner: String,
pub cache_db_path: Option<PathBuf>,
pub args: Vec<String>,
}
@@ -201,7 +202,7 @@
pub fn from_args() -> Config {
use internal::ResultExt;
-let (mut config, args) =
+let (config, args) =
internal::Config::including_optional_config_files(default_config_files())
.unwrap_or_exit();
@@ -212,7 +213,10 @@
Network::Signet => "signet",
};
-config.db_dir.push(db_subdir);
+let db_path = config.db_dir.join(db_subdir);
+let cache_db_path = config
+.cache_db_dir
+.map(|d| d.join(format!("{}-cache", db_subdir)));
let default_daemon_rpc_port = match config.network {
Network::Bitcoin => 8332,
@@ -263,14 +267,13 @@
ResolvAddr::resolve_or_exit,
);
-match config.network {
-Network::Bitcoin => (),
-Network::Testnet => config.daemon_dir.push("testnet3"),
-Network::Regtest => config.daemon_dir.push("regtest"),
-Network::Signet => config.daemon_dir.push("signet"),
-}
+let daemon_dir = match config.network {
+Network::Bitcoin => config.daemon_dir.clone(),
+Network::Testnet => config.daemon_dir.join("testnet3"),
+Network::Regtest => config.daemon_dir.join("regtest"),
+Network::Signet => config.daemon_dir.join("signet"),
+};
-let daemon_dir = &config.daemon_dir;
let daemon_auth = SensitiveAuth(match (config.auth, config.cookie_file) {
(None, None) => Auth::CookieFile(daemon_dir.join(".cookie")),
(None, Some(cookie_file)) => Auth::CookieFile(cookie_file),
@@ -314,8 +317,8 @@
let config = Config {
network: config.network,
-db_path: config.db_dir,
-daemon_dir: config.daemon_dir,
+db_path,
+daemon_dir,
daemon_auth,
daemon_rpc_addr,
daemon_p2p_addr,
@@ -331,6 +334,7 @@
sync_once: config.sync_once,
disable_electrum_rpc: config.disable_electrum_rpc,
server_banner: config.server_banner,
cache_db_path,
args: args.map(|a| a.into_string().unwrap()).collect(),
};
eprintln!(
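A worked example of the new path derivation (directory values are hypothetical; only the join/format logic is taken from the diff):

use std::path::PathBuf;

fn main() {
    let db_subdir = "bitcoin"; // mainnet
    let db_dir = PathBuf::from("/var/lib/electrs"); // hypothetical db_dir
    let cache_db_dir = Some(PathBuf::from("/var/cache/electrs")); // hypothetical cache_db_dir

    let db_path = db_dir.join(db_subdir);
    let cache_db_path = cache_db_dir.map(|d| d.join(format!("{}-cache", db_subdir)));

    assert_eq!(db_path, PathBuf::from("/var/lib/electrs/bitcoin"));
    assert_eq!(cache_db_path, Some(PathBuf::from("/var/cache/electrs/bitcoin-cache")));
}

Using join() instead of push() also explains the dropped mut on config above: the directories are no longer mutated in place.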

src/electrum.rs

@@ -140,7 +140,7 @@
let tracker = Tracker::new(config, metrics)?;
let signal = Signal::new();
let daemon = Daemon::connect(config, signal.exit_flag(), tracker.metrics())?;
-let cache = Cache::new(tracker.metrics());
+let cache = Cache::new(tracker.metrics(), config.cache_db_path.as_ref());
Ok(Self {
tracker,
cache,
@@ -340,7 +340,7 @@
}
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
-let mut status = ScriptHashStatus::new(scripthash);
+let mut status = ScriptHashStatus::load(scripthash, &self.cache);
self.tracker
.update_scripthash_status(&mut status, &self.daemon, &self.cache)?;
Ok(status)
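Taken together with the cache.rs changes above, a fresh subscription now seeds its ScriptHashStatus from the on-disk cache via ScriptHashStatus::load() before the tracker syncs it against the current chain, so blocks whose entries are already cached can presumably be skipped instead of being recomputed from the index.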

src/status.rs

@@ -19,14 +19,17 @@ use crate::{
};
/// Given a scripthash, store relevant inputs and outputs of a specific transaction
-struct TxEntry {
+#[derive(Clone, Serialize, Deserialize)]
+pub(crate) struct TxEntry {
txid: Txid,
outputs: Vec<TxOutput>, // relevant funded outputs and their amounts
spent: Vec<OutPoint>, // relevant spent outpoints
}
+#[derive(Clone, Serialize, Deserialize)]
struct TxOutput {
index: u32,
+#[serde(with = "bitcoin::util::amount::serde::as_sat")]
value: Amount,
}
@@ -230,6 +233,20 @@ impl ScriptHashStatus {
}
}
/// Return non-synced status for a given script hash, seeded from the cache (if available).
pub(crate) fn load(scripthash: ScriptHash, cache: &Cache) -> Self {
let mut result = Self::new(scripthash);
result.confirmed = cache.get_status_entries(scripthash);
if !result.confirmed.is_empty() {
debug!(
"{} status transaction entries loaded from {} blocks",
result.confirmed.values().map(Vec::len).sum::<usize>(),
result.confirmed.len()
);
}
result
}
/// Iterate through confirmed TxEntries with their corresponding block heights.
/// Skip entries from stale blocks.
fn confirmed_height_entries<'a>(
@@ -370,7 +387,7 @@
})?;
Ok(result
-.into_iter()
+.into_par_iter()
.map(|(blockhash, entries_map)| {
// sort transactions by their position in a block
let sorted_entries = entries_map
@@ -379,6 +396,7 @@
.into_iter()
.map(|(_pos, entry)| entry)
.collect::<Vec<TxEntry>>();
cache.add_status_entry(self.scripthash, blockhash, &sorted_entries);
(blockhash, sorted_entries)
})
.collect())
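The #[serde(with = "bitcoin::util::amount::serde::as_sat")] attribute is what makes TxOutput serializable at all: Amount itself provides no serde impls, so the helper stores it as a bare u64 of satoshis. A minimal sketch, assuming bitcoin 0.27 with the use-serde feature (as pinned in Cargo.toml above):

use bitcoin::Amount;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Output {
    #[serde(with = "bitcoin::util::amount::serde::as_sat")]
    value: Amount, // (de)serialized as a plain u64 of satoshis
}

fn main() {
    let out = Output { value: Amount::from_sat(50_000) };
    let bytes = bincode::serialize(&out).unwrap();
    assert_eq!(bytes, 50_000u64.to_le_bytes()); // single fixed-width field, no framing
    let back: Output = bincode::deserialize(&bytes).unwrap();
    assert_eq!(back, out);
}

Separately, the into_iter() → into_par_iter() switch in the same file parallelizes the per-block sorting of entries across rayon's thread pool, independent of the caching change.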