mirror of
https://github.com/romanz/electrs.git
synced 2025-02-24 06:57:53 +01:00
Merge branch 'sized_cache'
This commit is contained in:
commit
56d69d1709
6 changed files with 208 additions and 61 deletions
|
@ -2,12 +2,13 @@
|
|||
|
||||
* Use `configure_me` instead of `clap` to support config files, environment variables and man pages (@Kixunil)
|
||||
* Don't accept `--cookie` via CLI arguments (@Kixunil)
|
||||
* Define cache size in MB instead of number of elements (@dagurval)
|
||||
* Support Rust >=1.34 (for Debian)
|
||||
|
||||
# 0.7.1 (27 July 2019)
|
||||
|
||||
* Allow stopping bulk indexing via SIGINT/SIGTERM
|
||||
* Cache list of transaction IDs for blocks
|
||||
* Cache list of transaction IDs for blocks (@dagurval)
|
||||
|
||||
# 0.7.0 (13 June 2019)
|
||||
|
||||
|
|
|
@ -76,16 +76,16 @@ doc = "Number of threads used for bulk indexing (default: use the # of CPUs)"
|
|||
default = "0"
|
||||
|
||||
[[param]]
|
||||
name = "tx_cache_size"
|
||||
type = "usize"
|
||||
doc = "Number of transactions to keep in for query LRU cache"
|
||||
default = "10000"
|
||||
name = "tx_cache_size_mb"
|
||||
type = "f32"
|
||||
doc = "Total size of transactions to cache (MB)"
|
||||
default = "10.0"
|
||||
|
||||
[[param]]
|
||||
name = "blocktxids_cache_size"
|
||||
type = "usize"
|
||||
doc = "Number of blocks to cache transactions IDs in LRU cache"
|
||||
default = "100"
|
||||
name = "blocktxids_cache_size_mb"
|
||||
type = "f32"
|
||||
doc = "Total size of block transactions IDs to cache (in MB)"
|
||||
default = "10.0"
|
||||
|
||||
[[param]]
|
||||
name = "txid_limit"
|
||||
|
|
|
@ -12,13 +12,13 @@ use std::time::Duration;
|
|||
use electrs::{
|
||||
app::App,
|
||||
bulk,
|
||||
cache::BlockTxIDsCache,
|
||||
cache::{BlockTxIDsCache, TransactionCache},
|
||||
config::Config,
|
||||
daemon::Daemon,
|
||||
errors::*,
|
||||
index::Index,
|
||||
metrics::Metrics,
|
||||
query::{Query, TransactionCache},
|
||||
query::Query,
|
||||
rpc::RPC,
|
||||
signal::Waiter,
|
||||
store::{full_compaction, is_fully_compacted, DBStore},
|
||||
|
@ -58,7 +58,7 @@ fn run_server(config: &Config) -> Result<()> {
|
|||
.enable_compaction(); // enable auto compactions before starting incremental index updates.
|
||||
|
||||
let app = App::new(store, index, daemon, &config)?;
|
||||
let tx_cache = TransactionCache::new(config.tx_cache_size);
|
||||
let tx_cache = TransactionCache::new(config.tx_cache_size, &metrics);
|
||||
let query = Query::new(app.clone(), &metrics, tx_cache, config.txid_limit);
|
||||
|
||||
let mut server = None; // Electrum RPC server
|
||||
|
|
204
src/cache.rs
204
src/cache.rs
|
@ -1,27 +1,76 @@
|
|||
use crate::errors::*;
|
||||
use crate::metrics::{Counter, MetricOpts, Metrics};
|
||||
use crate::metrics::{CounterVec, MetricOpts, Metrics};
|
||||
|
||||
use bitcoin::blockdata::transaction::Transaction;
|
||||
use bitcoin::consensus::encode::deserialize;
|
||||
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
|
||||
use lru::LruCache;
|
||||
use std::hash::Hash;
|
||||
use std::sync::Mutex;
|
||||
|
||||
struct SizedLruCache<K, V> {
|
||||
map: LruCache<K, (V, usize)>,
|
||||
bytes_usage: usize,
|
||||
bytes_capacity: usize,
|
||||
lookups: CounterVec,
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq, V> SizedLruCache<K, V> {
|
||||
fn new(bytes_capacity: usize, lookups: CounterVec) -> SizedLruCache<K, V> {
|
||||
SizedLruCache {
|
||||
map: LruCache::unbounded(),
|
||||
bytes_usage: 0,
|
||||
bytes_capacity,
|
||||
lookups,
|
||||
}
|
||||
}
|
||||
|
||||
fn get(&mut self, key: &K) -> Option<&V> {
|
||||
match self.map.get(key) {
|
||||
None => {
|
||||
self.lookups.with_label_values(&["miss"]).inc();
|
||||
None
|
||||
}
|
||||
Some((value, _)) => {
|
||||
self.lookups.with_label_values(&["hit"]).inc();
|
||||
Some(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn put(&mut self, key: K, value: V, byte_size: usize) {
|
||||
if byte_size > self.bytes_capacity {
|
||||
return;
|
||||
}
|
||||
if let Some((_, popped_size)) = self.map.put(key, (value, byte_size)) {
|
||||
self.bytes_usage -= popped_size
|
||||
}
|
||||
self.bytes_usage += byte_size;
|
||||
|
||||
while self.bytes_usage > self.bytes_capacity {
|
||||
match self.map.pop_lru() {
|
||||
Some((_, (_, popped_size))) => self.bytes_usage -= popped_size,
|
||||
None => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockTxIDsCache {
|
||||
map: Mutex<LruCache<Sha256dHash /* blockhash */, Vec<Sha256dHash /* txid */>>>,
|
||||
hits: Counter,
|
||||
misses: Counter,
|
||||
map: Mutex<SizedLruCache<Sha256dHash /* blockhash */, Vec<Sha256dHash /* txid */>>>,
|
||||
}
|
||||
|
||||
impl BlockTxIDsCache {
|
||||
pub fn new(capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
|
||||
pub fn new(bytes_capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
|
||||
let lookups = metrics.counter_vec(
|
||||
MetricOpts::new(
|
||||
"electrs_blocktxids_cache",
|
||||
"# of cache lookups for list of transactions in a block",
|
||||
),
|
||||
&["type"],
|
||||
);
|
||||
BlockTxIDsCache {
|
||||
map: Mutex::new(LruCache::new(capacity)),
|
||||
hits: metrics.counter(MetricOpts::new(
|
||||
"electrs_blocktxids_cache_hits",
|
||||
"# of cache hits for list of transactions in a block",
|
||||
)),
|
||||
misses: metrics.counter(MetricOpts::new(
|
||||
"electrs_blocktxids_cache_misses",
|
||||
"# of cache misses for list of transactions in a block",
|
||||
)),
|
||||
map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,29 +83,111 @@ impl BlockTxIDsCache {
|
|||
F: FnOnce() -> Result<Vec<Sha256dHash>>,
|
||||
{
|
||||
if let Some(txids) = self.map.lock().unwrap().get(blockhash) {
|
||||
self.hits.inc();
|
||||
return Ok(txids.clone());
|
||||
}
|
||||
|
||||
self.misses.inc();
|
||||
let txids = load_txids_func()?;
|
||||
self.map.lock().unwrap().put(*blockhash, txids.clone());
|
||||
let byte_size = 32 /* hash size */ * (1 /* key */ + txids.len() /* values */);
|
||||
self.map
|
||||
.lock()
|
||||
.unwrap()
|
||||
.put(*blockhash, txids.clone(), byte_size);
|
||||
Ok(txids)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TransactionCache {
|
||||
// Store serialized transaction (should use less RAM).
|
||||
map: Mutex<SizedLruCache<Sha256dHash, Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl TransactionCache {
|
||||
pub fn new(bytes_capacity: usize, metrics: &Metrics) -> TransactionCache {
|
||||
let lookups = metrics.counter_vec(
|
||||
MetricOpts::new(
|
||||
"electrs_transactions_cache",
|
||||
"# of cache lookups for transactions",
|
||||
),
|
||||
&["type"],
|
||||
);
|
||||
TransactionCache {
|
||||
map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
|
||||
where
|
||||
F: FnOnce() -> Result<Vec<u8>>,
|
||||
{
|
||||
match self.map.lock().unwrap().get(txid) {
|
||||
Some(serialized_txn) => {
|
||||
return Ok(deserialize(&serialized_txn).chain_err(|| "failed to parse cached tx")?);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
let serialized_txn = load_txn_func()?;
|
||||
let txn = deserialize(&serialized_txn).chain_err(|| "failed to parse serialized tx")?;
|
||||
let byte_size = 32 /* key (hash size) */ + serialized_txn.len();
|
||||
self.map
|
||||
.lock()
|
||||
.unwrap()
|
||||
.put(*txid, serialized_txn, byte_size);
|
||||
Ok(txn)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bitcoin_hashes::Hash;
|
||||
|
||||
#[test]
|
||||
fn test_sized_lru_cache_hit_and_miss() {
|
||||
let counter = CounterVec::new(prometheus::Opts::new("name", "help"), &["type"]).unwrap();
|
||||
let mut cache = SizedLruCache::<i8, i32>::new(100, counter.clone());
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 0);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
|
||||
|
||||
assert_eq!(cache.get(&1), None); // no such key
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
|
||||
|
||||
cache.put(1, 10, 50); // add new key-value
|
||||
assert_eq!(cache.get(&1), Some(&10));
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 1);
|
||||
|
||||
cache.put(3, 30, 50); // drop oldest key (1)
|
||||
cache.put(2, 20, 50);
|
||||
assert_eq!(cache.get(&1), None);
|
||||
assert_eq!(cache.get(&2), Some(&20));
|
||||
assert_eq!(cache.get(&3), Some(&30));
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 2);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 3);
|
||||
|
||||
cache.put(3, 33, 50); // replace existing value
|
||||
assert_eq!(cache.get(&1), None);
|
||||
assert_eq!(cache.get(&2), Some(&20));
|
||||
assert_eq!(cache.get(&3), Some(&33));
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 3);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 5);
|
||||
|
||||
cache.put(9, 90, 9999); // larger than cache capacity, don't drop the cache
|
||||
assert_eq!(cache.get(&1), None);
|
||||
assert_eq!(cache.get(&2), Some(&20));
|
||||
assert_eq!(cache.get(&3), Some(&33));
|
||||
assert_eq!(cache.get(&9), None);
|
||||
assert_eq!(counter.with_label_values(&["miss"]).get(), 5);
|
||||
assert_eq!(counter.with_label_values(&["hit"]).get(), 7);
|
||||
}
|
||||
|
||||
fn gen_hash(seed: u8) -> Sha256dHash {
|
||||
let bytes: Vec<u8> = (seed..seed + 32).collect();
|
||||
Sha256dHash::hash(&bytes[..])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cache_hit_and_miss() {
|
||||
fn test_blocktxids_cache_hit_and_miss() {
|
||||
let block1 = gen_hash(1);
|
||||
let block2 = gen_hash(2);
|
||||
let block3 = gen_hash(3);
|
||||
|
@ -69,7 +200,8 @@ mod tests {
|
|||
};
|
||||
|
||||
let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
|
||||
let cache = BlockTxIDsCache::new(2, &dummy_metrics);
|
||||
// 200 bytes ~ 32 (bytes/hash) * (1 key hash + 2 value hashes) * 2 txns
|
||||
let cache = BlockTxIDsCache::new(200, &dummy_metrics);
|
||||
|
||||
// cache miss
|
||||
let result = cache.get_or_else(&block1, &miss_func).unwrap();
|
||||
|
@ -81,7 +213,7 @@ mod tests {
|
|||
assert_eq!(1, *misses.lock().unwrap());
|
||||
assert_eq!(txids, result);
|
||||
|
||||
// cache size is 2, test that blockhash1 falls out of cache
|
||||
// cache size is 200, test that blockhash1 falls out of cache
|
||||
cache.get_or_else(&block2, &miss_func).unwrap();
|
||||
assert_eq!(2, *misses.lock().unwrap());
|
||||
cache.get_or_else(&block3, &miss_func).unwrap();
|
||||
|
@ -94,4 +226,36 @@ mod tests {
|
|||
cache.get_or_else(&block1, &miss_func).unwrap();
|
||||
assert_eq!(4, *misses.lock().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_txn_cache() {
|
||||
use bitcoin::util::hash::BitcoinHash;
|
||||
use hex;
|
||||
|
||||
let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
|
||||
let cache = TransactionCache::new(1024, &dummy_metrics);
|
||||
let tx_bytes = hex::decode("0100000001a15d57094aa7a21a28cb20b59aab8fc7d1149a3bdbcddba9c622e4f5f6a99ece010000006c493046022100f93bb0e7d8db7bd46e40132d1f8242026e045f03a0efe71bbb8e3f475e970d790221009337cd7f1f929f00cc6ff01f03729b069a7c21b59b1736ddfee5db5946c5da8c0121033b9b137ee87d5a812d6f506efdd37f0affa7ffc310711c06c7f3e097c9447c52ffffffff0100e1f505000000001976a9140389035a9225b3839e2bbf32d826a1e222031fd888ac00000000").unwrap();
|
||||
|
||||
let tx: Transaction = deserialize(&tx_bytes).unwrap();
|
||||
let txid = tx.bitcoin_hash();
|
||||
|
||||
let mut misses = 0;
|
||||
assert_eq!(
|
||||
cache
|
||||
.get_or_else(&txid, || {
|
||||
misses += 1;
|
||||
Ok(tx_bytes.clone())
|
||||
})
|
||||
.unwrap(),
|
||||
tx
|
||||
);
|
||||
assert_eq!(misses, 1);
|
||||
assert_eq!(
|
||||
cache
|
||||
.get_or_else(&txid, || panic!("should not be called"))
|
||||
.unwrap(),
|
||||
tx
|
||||
);
|
||||
assert_eq!(misses, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -234,6 +234,7 @@ impl Config {
|
|||
if config.bulk_index_threads == 0 {
|
||||
config.bulk_index_threads = num_cpus::get();
|
||||
}
|
||||
const MB: f32 = (1 << 20) as f32;
|
||||
let config = Config {
|
||||
log,
|
||||
network_type: config.network,
|
||||
|
@ -246,8 +247,8 @@ impl Config {
|
|||
jsonrpc_import: config.jsonrpc_import,
|
||||
index_batch_size: config.index_batch_size,
|
||||
bulk_index_threads: config.bulk_index_threads,
|
||||
tx_cache_size: config.tx_cache_size,
|
||||
blocktxids_cache_size: config.blocktxids_cache_size,
|
||||
tx_cache_size: (config.tx_cache_size_mb * MB) as usize,
|
||||
blocktxids_cache_size: (config.blocktxids_cache_size_mb * MB) as usize,
|
||||
txid_limit: config.txid_limit,
|
||||
server_banner: config.server_banner,
|
||||
};
|
||||
|
|
35
src/query.rs
35
src/query.rs
|
@ -5,12 +5,12 @@ use bitcoin_hashes::sha256d::Hash as Sha256dHash;
|
|||
use bitcoin_hashes::Hash;
|
||||
use crypto::digest::Digest;
|
||||
use crypto::sha2::Sha256;
|
||||
use lru::LruCache;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use crate::app::App;
|
||||
use crate::cache::TransactionCache;
|
||||
use crate::errors::*;
|
||||
use crate::index::{compute_script_hash, TxInRow, TxOutRow, TxRow};
|
||||
use crate::mempool::Tracker;
|
||||
|
@ -177,30 +177,6 @@ fn txids_by_funding_output(
|
|||
.collect()
|
||||
}
|
||||
|
||||
pub struct TransactionCache {
|
||||
map: Mutex<LruCache<Sha256dHash, Transaction>>,
|
||||
}
|
||||
|
||||
impl TransactionCache {
|
||||
pub fn new(capacity: usize) -> TransactionCache {
|
||||
TransactionCache {
|
||||
map: Mutex::new(LruCache::new(capacity)),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
|
||||
where
|
||||
F: FnOnce() -> Result<Transaction>,
|
||||
{
|
||||
if let Some(txn) = self.map.lock().unwrap().get(txid) {
|
||||
return Ok(txn.clone());
|
||||
}
|
||||
let txn = load_txn_func()?;
|
||||
self.map.lock().unwrap().put(*txid, txn.clone());
|
||||
Ok(txn)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Query {
|
||||
app: Arc<App>,
|
||||
tracker: RwLock<Tracker>,
|
||||
|
@ -378,7 +354,12 @@ impl Query {
|
|||
fn load_txn(&self, txid: &Sha256dHash, block_height: Option<u32>) -> Result<Transaction> {
|
||||
self.tx_cache.get_or_else(&txid, || {
|
||||
let blockhash = self.lookup_confirmed_blockhash(txid, block_height)?;
|
||||
self.app.daemon().gettransaction(txid, blockhash)
|
||||
let value: Value = self
|
||||
.app
|
||||
.daemon()
|
||||
.gettransaction_raw(txid, blockhash, /*verbose*/ false)?;
|
||||
let value_hex: &str = value.as_str().chain_err(|| "non-string tx")?;
|
||||
hex::decode(&value_hex).chain_err(|| "non-hex tx")
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue