mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Add new code

Roman Zeyde 2020-10-24 20:00:04 +03:00
parent bef20f38b6
commit 82c50feea2
19 changed files with 4895 additions and 0 deletions

1688
Cargo.lock generated Normal file

File diff suppressed because it is too large

7
Cargo.toml Normal file

@@ -0,0 +1,7 @@
[workspace]
members = [
"electrs_index",
"electrs_query",
"electrs_rpc",
]

21
electrs_index/Cargo.toml Normal file

@@ -0,0 +1,21 @@
[package]
name = "electrs_index"
version = "0.1.0"
authors = ["Roman Zeyde <me@romanzey.de>"]
edition = "2018"
[dependencies]
anyhow = "1.0"
bitcoin = { version = "0.25", features = ["use-serde"] }
bitcoincore-rpc = "0.12"
chrono = "0.4"
clap = "2.33"
env_logger = "0.7"
hyper = "0.10"
log = "0.4"
prometheus = { version = "0.10", features = ["process"] }
rocksdb = "^0.15.0"
rust-crypto = "0.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"


@@ -0,0 +1,97 @@
use bitcoin::Network;
use clap::{App, Arg};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug)]
pub struct Config {
pub electrum_rpc_addr: SocketAddr,
pub daemon_rpc_addr: SocketAddr,
pub monitoring_addr: SocketAddr,
pub db_path: PathBuf,
pub daemon_dir: PathBuf,
pub wait_duration: Duration,
}
impl Config {
pub fn from_args() -> Self {
let matches = App::new("Electrum Server in Rust")
.arg(
Arg::with_name("network")
.long("network")
.help("mainnet/testnet/regtest")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("db-dir")
.long("db-dir")
.help("RocksDB directory")
.default_value("./db")
.takes_value(true),
)
.arg(
Arg::with_name("daemon-dir")
.long("daemon-dir")
.help("bitcoind directory")
.takes_value(true),
)
.get_matches();
let network_str = matches.value_of("network").unwrap();
let network = match network_str {
"mainnet" => Network::Bitcoin,
"testnet" => Network::Testnet,
"regtest" => Network::Regtest,
_ => panic!("unknown network"),
};
let electrum_port = match network {
Network::Bitcoin => 50001,
Network::Testnet => 60001,
Network::Regtest => 60401,
};
let electrum_rpc_addr: SocketAddr = ([127, 0, 0, 1], electrum_port).into();
let daemon_port = match network {
Network::Bitcoin => 8332,
Network::Testnet => 18332,
Network::Regtest => 18443,
};
let daemon_rpc_addr: SocketAddr = ([127, 0, 0, 1], daemon_port).into();
let monitoring_port = match network {
Network::Bitcoin => 4224,
Network::Testnet => 14224,
Network::Regtest => 24224,
};
let monitoring_addr: SocketAddr = ([127, 0, 0, 1], monitoring_port).into();
let daemon_dir: PathBuf = matches.value_of("daemon-dir").unwrap().into();
let daemon_dir = match network {
Network::Bitcoin => daemon_dir,
Network::Testnet => daemon_dir.join("testnet3"),
Network::Regtest => daemon_dir.join("regtest"),
};
let mut db_path: PathBuf = matches.value_of("db-dir").unwrap().into();
db_path.push(network_str);
env_logger::Builder::from_default_env()
.default_format()
.format_timestamp_millis()
.init();
Self {
electrum_rpc_addr,
daemon_rpc_addr,
monitoring_addr,
db_path,
daemon_dir,
wait_duration: Duration::from_secs(600),
}
}
}
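
For illustration only (not part of this diff), a minimal sketch of consuming Config::from_args(), assuming regtest and example paths; the values follow directly from the match arms above:

// Hypothetical caller of Config::from_args().
fn example() {
    let config = electrs_index::Config::from_args();
    // For `--network regtest --db-dir ./db --daemon-dir /home/user/.bitcoin`:
    //   electrum_rpc_addr = 127.0.0.1:60401
    //   daemon_rpc_addr   = 127.0.0.1:18443
    //   monitoring_addr   = 127.0.0.1:24224
    //   db_path           = ./db/regtest
    //   daemon_dir        = /home/user/.bitcoin/regtest
    //   wait_duration     = 600s
    println!("{:?}", config);
}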

126
electrs_index/src/daemon.rs Normal file

@@ -0,0 +1,126 @@
use anyhow::{Context, Result};
use bitcoin::{hash_types::BlockHash, Amount, Transaction, Txid};
use bitcoincore_rpc::{Auth, Client, RpcApi};
use serde_json::json;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::Duration;
#[derive(Debug, Deserialize, Serialize)]
pub struct BlockLocation {
pub file: usize,
pub data: usize,
pub undo: Option<usize>, // genesis block has no undo
pub prev: bitcoin::BlockHash,
}
#[derive(Debug, Deserialize, Serialize)]
struct GetBlockLocationsResult(Vec<BlockLocation>);
pub struct Daemon {
client: Client,
addr: SocketAddr,
daemon_dir: PathBuf,
blocks_dir: PathBuf,
}
impl Daemon {
pub fn new(addr: SocketAddr, daemon_dir: &Path) -> Result<Self> {
let cookie_path = daemon_dir.join(".cookie");
if !cookie_path.exists() {
bail!("missing cookie file: {:?}", cookie_path);
}
let auth = Auth::CookieFile(cookie_path);
let client = Client::new(format!("http://{}", addr), auth)
.with_context(|| format!("failed to connect {}", addr))?;
let blocks_dir = daemon_dir.join("blocks");
if !blocks_dir.exists() {
bail!("missing blocks directory: {:?}", blocks_dir);
}
let blockchain_info = client
.get_blockchain_info()
.context("get_network_info failed")?;
debug!("{:?}", blockchain_info);
if blockchain_info.pruned {
bail!("pruned node is not supported (use '-prune=0' bitcoind flag)")
}
let network_info = client
.get_network_info()
.context("get_network_info failed")?;
debug!("{:?}", network_info);
if network_info.version < 20_00_00 {
bail!(
"{} is not supported - please use bitcoind 0.20+",
network_info.subversion,
)
}
Ok(Daemon {
client,
addr,
daemon_dir: daemon_dir.to_owned(),
blocks_dir,
})
}
pub fn reconnect(&self) -> Result<Self> {
Self::new(self.addr, &self.daemon_dir)
}
pub fn client(&self) -> &Client {
&self.client
}
pub fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
self.client
.send_raw_transaction(tx)
.context("broadcast failed")
}
pub fn get_best_block_hash(&self) -> Result<BlockHash> {
self.client
.get_best_block_hash()
.context("get_best_block_hash failed")
}
pub fn wait_for_new_block(&self, timeout: Duration) -> Result<BlockHash> {
Ok(self
.client
.wait_for_new_block(timeout.as_millis() as u64)?
.hash)
}
pub fn get_block_locations(
&self,
hash: &BlockHash,
nblocks: usize,
) -> Result<Vec<BlockLocation>> {
self.client
.call("getblocklocations", &[json!(hash), nblocks.into()])
.context("get_block_locations failed")
}
pub fn get_relay_fee(&self) -> Result<Amount> {
Ok(self
.client
.get_network_info()
.context("get_network_info failed")?
.relay_fee)
}
pub fn estimate_fee(&self, nblocks: u16) -> Result<Option<Amount>> {
Ok(self
.client
.estimate_smart_fee(nblocks, None)
.context("estimate_fee failed")?
.fee_rate)
}
pub fn blk_file_path(&self, i: usize) -> PathBuf {
self.blocks_dir.join(format!("blk{:05}.dat", i))
}
pub fn undo_file_path(&self, i: usize) -> PathBuf {
self.blocks_dir.join(format!("rev{:05}.dat", i))
}
}
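
For illustration only (not part of this diff), a sketch of using the Daemon wrapper above, assuming a non-pruned bitcoind 0.20+ on regtest with cookie authentication at an example path:

use std::path::Path;

fn example() -> anyhow::Result<()> {
    let daemon = electrs_index::Daemon::new(
        "127.0.0.1:18443".parse()?,               // daemon_rpc_addr
        Path::new("/home/user/.bitcoin/regtest"), // must contain .cookie and blocks/
    )?;
    let tip = daemon.get_best_block_hash()?;
    println!("tip: {}", tip);
    println!("first block file: {:?}", daemon.blk_file_path(0)); // .../blocks/blk00000.dat
    Ok(())
}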

202
electrs_index/src/db.rs Normal file

@@ -0,0 +1,202 @@
use anyhow::{Context, Result};
use std::path::{Path, PathBuf};
pub(crate) type Row = Box<[u8]>;
pub(crate) struct WriteBatch {
pub(crate) header_rows: Vec<Row>,
pub(crate) index_rows: Vec<Row>,
}
#[derive(Debug)]
struct Options {
path: PathBuf,
bulk_import: bool,
}
pub struct DBStore {
db: rocksdb::DB,
opts: Options,
}
const CONFIG_CF: &str = "config";
const HEADERS_CF: &str = "headers";
const INDEX_CF: &str = "index";
const CONFIG_KEY: &str = "C";
#[derive(Debug, Deserialize, Serialize)]
struct Config {
compacted: bool,
format: u64,
}
const CURRENT_FORMAT: u64 = 1;
impl DBStore {
fn open_opts(opts: Options) -> Result<Self> {
debug!("opening DB with {:?}", opts);
let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.create_missing_column_families(true);
db_opts.set_max_background_jobs(4);
let mut cf_opts = rocksdb::Options::default();
cf_opts.set_keep_log_file_num(10);
cf_opts.set_max_open_files(16);
cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
cf_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
cf_opts.set_target_file_size_base(256 << 20);
cf_opts.set_write_buffer_size(256 << 20);
cf_opts.set_disable_auto_compactions(true); // for initial bulk load
cf_opts.set_advise_random_on_open(false); // bulk load uses sequential I/O
cf_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(8));
cf_opts.set_compaction_readahead_size(1 << 20);
let cf_descriptors = vec![
rocksdb::ColumnFamilyDescriptor::new(CONFIG_CF, cf_opts.clone()),
rocksdb::ColumnFamilyDescriptor::new(HEADERS_CF, cf_opts.clone()),
rocksdb::ColumnFamilyDescriptor::new(INDEX_CF, cf_opts),
];
let db = rocksdb::DB::open_cf_descriptors(&db_opts, &opts.path, cf_descriptors)
.context("failed to open DB")?;
let mut store = DBStore { db, opts };
let config = store.get_config();
debug!("DB {:?}", config);
if config.format < CURRENT_FORMAT {
bail!(
"unsupported storage format {}, re-index required",
config.format
);
}
if config.compacted {
store.opts.bulk_import = false;
}
store.set_config(config);
Ok(store)
}
fn config_cf(&self) -> &rocksdb::ColumnFamily {
self.db.cf_handle(CONFIG_CF).expect("missing CONFIG_CF")
}
fn index_cf(&self) -> &rocksdb::ColumnFamily {
self.db.cf_handle(INDEX_CF).expect("missing INDEX_CF")
}
fn headers_cf(&self) -> &rocksdb::ColumnFamily {
self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF")
}
/// Opens a new RocksDB at the specified location.
pub fn open(path: &Path) -> Result<Self> {
DBStore::open_opts(Options {
path: path.to_path_buf(),
bulk_import: true,
})
}
pub(crate) fn iter_index<'a>(&'a self, prefix: &'a [u8]) -> ScanIterator<'a> {
let mode = rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward);
let iter = self.db.iterator_cf(self.index_cf(), mode);
ScanIterator {
prefix,
iter,
done: false,
}
}
pub(crate) fn read_headers(&self) -> Vec<Row> {
let mut opts = rocksdb::ReadOptions::default();
opts.fill_cache(false);
self.db
.iterator_cf_opt(self.headers_cf(), opts, rocksdb::IteratorMode::Start)
.map(|(key, _)| key)
.collect()
}
pub(crate) fn write(&self, batch: WriteBatch) {
let mut db_batch = rocksdb::WriteBatch::default();
for key in batch.index_rows {
db_batch.put_cf(self.index_cf(), key, b"");
}
for key in batch.header_rows {
db_batch.put_cf(self.headers_cf(), key, b"");
}
let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(!self.opts.bulk_import);
opts.disable_wal(self.opts.bulk_import);
self.db.write_opt(db_batch, &opts).unwrap();
}
pub(crate) fn start_compactions(&mut self) {
let mut config = self.get_config();
if !config.compacted {
info!("starting full compaction");
let cf = self.index_cf();
self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>); // would take a while
info!("finished full compaction");
config.compacted = true;
self.set_config(config);
}
if self.opts.bulk_import {
self.opts.bulk_import = false;
self.db
.set_options(&[("disable_auto_compactions", "false")])
.unwrap();
info!("auto-compactions enabled");
}
}
fn set_config(&self, config: Config) {
let mut opts = rocksdb::WriteOptions::default();
opts.set_sync(true);
opts.disable_wal(false);
let value = serde_json::to_vec(&config).expect("failed to serialize config");
self.db
.put_cf_opt(self.config_cf(), CONFIG_KEY, value, &opts)
.expect("DB::put failed");
}
fn get_config(&self) -> Config {
self.db
.get_cf(self.config_cf(), CONFIG_KEY)
.expect("DB::get failed")
.map(|value| serde_json::from_slice(&value).expect("failed to deserialize Config"))
.unwrap_or_else(|| Config {
compacted: false,
format: CURRENT_FORMAT,
})
}
}
pub(crate) struct ScanIterator<'a> {
prefix: &'a [u8],
iter: rocksdb::DBIterator<'a>,
done: bool,
}
impl<'a> Iterator for ScanIterator<'a> {
type Item = Row;
fn next(&mut self) -> Option<Row> {
if self.done {
return None;
}
let (key, _) = self.iter.next()?;
if !key.starts_with(&self.prefix) {
self.done = true;
return None;
}
Some(key)
}
}
impl Drop for DBStore {
fn drop(&mut self) {
info!("closing DB at {:?}", self.opts.path);
}
}
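
For illustration only (not part of this diff): every row above is stored as a RocksDB key with an empty value, and lookups scan the index column family by key prefix. A crate-internal sketch (write() and iter_index() are pub(crate)):

fn demo(store: &DBStore) {
    let row: Row = vec![0xAB; 14].into_boxed_slice(); // an index row is just a key
    store.write(WriteBatch {
        header_rows: vec![],
        index_rows: vec![row],
    });
    // Scan all index rows starting with a given 8-byte prefix.
    for key in store.iter_index(&[0xAB; 8]) {
        println!("{:?}", key);
    }
}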

543
electrs_index/src/index.rs Normal file

@@ -0,0 +1,543 @@
use anyhow::{Context, Result};
use bitcoin::{
consensus::{serialize, Decodable},
Block, BlockHash, BlockHeader, VarInt,
};
use std::{
collections::BTreeMap,
io::{Cursor, Read, Seek, SeekFrom},
path::{Path, PathBuf},
sync::{
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex, RwLock, RwLockReadGuard,
},
time::Instant,
};
use crate::{
daemon::{BlockLocation, Daemon},
db,
map::BlockMap,
metrics::{Histogram, Metrics},
types::{BlockRow, BlockUndo, Confirmed, FilePos, Reader, ScriptHash, ScriptHashRow},
};
pub struct Index {
store: RwLock<db::DBStore>,
map: Arc<RwLock<BlockMap>>,
stats: Stats,
}
#[derive(Clone)]
struct Stats {
update_duration: Histogram,
update_size: Histogram,
lookup_duration: Histogram,
}
struct ReadRequest {
file_id: usize,
locations: Vec<BlockLocation>,
blk_file: PathBuf,
undo_file: PathBuf,
}
struct IndexRequest {
file_id: usize,
locations: Vec<BlockLocation>,
blk_file: Cursor<Vec<u8>>,
undo_file: Cursor<Vec<u8>>,
}
impl db::WriteBatch {
fn new() -> Self {
Self {
header_rows: vec![],
index_rows: vec![],
}
}
fn index(&mut self, loc: BlockLocation, block: Block, undo: BlockUndo) {
let (index_rows, header_row) = index_single_block(loc, block, undo);
self.index_rows
.extend(index_rows.iter().map(ScriptHashRow::to_db_row));
self.header_rows.push(header_row.to_db_row());
}
}
pub struct LookupResult {
pub readers: Vec<ConfirmedReader>,
pub tip: BlockHash,
}
impl Index {
pub fn new(store: db::DBStore, metrics: &Metrics) -> Result<Self> {
let update_duration = metrics.histogram_vec(
"index_update_duration",
"Index duration (in seconds)",
&["step"],
);
let update_size =
metrics.histogram_vec("index_update_size", "Index size (in bytes)", &["step"]);
let lookup_duration = metrics.histogram_vec(
"index_lookup_duration",
"Lookup duration (in seconds)",
&["step"],
);
let start = Instant::now();
let rows = store.read_headers();
let size: u64 = rows.iter().map(|row| row.len() as u64).sum();
let block_rows = rows
.into_iter()
.map(|row| BlockRow::from_db_row(&row))
.collect::<Result<Vec<BlockRow>>>()
.context("failed to read block rows")?;
debug!(
"read {} blocks ({:.3} MB) from DB at {:.3} ms",
block_rows.len(),
size as f64 / 1e6,
start.elapsed().as_millis()
);
let map = BlockMap::new(block_rows);
Ok(Index {
store: RwLock::new(store),
map: Arc::new(RwLock::new(map)),
stats: Stats {
update_duration,
update_size,
lookup_duration,
},
})
}
pub fn map(&self) -> RwLockReadGuard<BlockMap> {
self.map.read().unwrap()
}
pub fn block_reader(&self, blockhash: &BlockHash, daemon: &Daemon) -> Result<BlockReader> {
Ok(match self.map().get_block_pos(blockhash) {
None => bail!("unknown block {}", blockhash),
Some(pos) => BlockReader {
reader: pos.reader(daemon),
lookup_duration: self.stats.lookup_duration.clone(),
},
})
}
pub fn lookup(&self, script_hash: &ScriptHash, daemon: &Daemon) -> Result<LookupResult> {
let scan_prefix = ScriptHashRow::scan_prefix(script_hash);
let rows: Vec<ScriptHashRow> =
self.stats
.lookup_duration
.observe_duration("lookup_scan_rows", || {
self.store
.read()
.unwrap()
.iter_index(&scan_prefix)
.map(|row| ScriptHashRow::from_db_row(&row))
.collect::<Result<Vec<_>>>()
.context("failed to parse index rows")
})?;
debug!("{} has {} index rows", script_hash, rows.len());
let map = self.map(); // lock block map for concurrent updates
let readers = rows
.into_iter()
.filter_map(|row| match map.find_block(row.position()) {
Some((hash, height)) => {
let header = map.get_by_hash(hash).expect("missing block header");
Some(ConfirmedReader {
reader: row.position().reader(daemon),
lookup_duration: self.stats.lookup_duration.clone(),
header: *header,
height,
})
}
None => {
warn!("{:?} not confirmed", row);
None
}
})
.collect();
let tip = map.chain().last().cloned().unwrap_or_default();
Ok(LookupResult { readers, tip })
}
fn start_reader(
&self,
requests: Vec<ReadRequest>,
reader_tx: SyncSender<IndexRequest>,
) -> Result<std::thread::JoinHandle<Result<()>>> {
let update_duration = self.stats.update_duration.clone();
let update_size = self.stats.update_size.clone();
let reader_thread = std::thread::spawn(move || -> Result<()> {
for r in requests {
let blk_file =
update_duration.observe_duration("load_block", || read_file(&r.blk_file))?;
update_size.observe_size("load_block", blk_file.len());
let undo_file =
update_duration.observe_duration("load_undo", || read_file(&r.undo_file))?;
update_size.observe_size("load_undo", undo_file.len());
let file_id = r.file_id;
let index_request = IndexRequest {
file_id,
locations: r.locations,
blk_file: Cursor::new(blk_file),
undo_file: Cursor::new(undo_file),
};
reader_tx
.send(index_request)
.with_context(|| format!("reader send failed: file_id={}", file_id))?
}
Ok(())
});
Ok(reader_thread)
}
fn start_indexer(
&self,
i: usize,
reader_rx: Arc<Mutex<Receiver<IndexRequest>>>,
indexer_tx: SyncSender<db::WriteBatch>,
) -> std::thread::JoinHandle<Result<()>> {
let update_duration = self.stats.update_duration.clone();
std::thread::spawn(move || {
loop {
let IndexRequest {
file_id,
locations,
mut blk_file,
mut undo_file,
} = {
// make sure receiver is unlocked after recv() is over
match reader_rx.lock().unwrap().recv() {
Ok(msg) => msg,
Err(_) => break, // channel has disconnected
}
};
let parsed: Vec<(BlockLocation, Block, BlockUndo)> = update_duration
.observe_duration("parse", || {
locations
.into_iter()
.map(|loc| parse_block_and_undo(loc, &mut blk_file, &mut undo_file))
.collect()
});
let blocks_count = parsed.len();
let mut result = db::WriteBatch::new();
update_duration.observe_duration("index", || {
parsed.into_iter().for_each(|(loc, block, undo)| {
result.index(loc, block, undo);
});
});
debug!(
"indexer #{}: indexed {} blocks from file #{} into {} rows",
i,
blocks_count,
file_id,
result.index_rows.len()
);
update_duration.observe_duration("sort", || {
result.index_rows.sort_unstable();
result.header_rows.sort_unstable();
});
indexer_tx
.send(result)
.with_context(|| format!("indexer send failed: file_id={}", file_id))?
}
Ok(())
})
}
fn report_stats(&self, batch: &db::WriteBatch) {
self.stats
.update_size
.observe_size("write_index_rows", db_rows_size(&batch.index_rows));
self.stats
.update_size
.observe_size("write_header_rows", db_rows_size(&batch.header_rows));
debug!(
"writing index {} rows from {} blocks",
batch.index_rows.len(),
batch.header_rows.len(),
);
}
pub fn update(&self, daemon: &Daemon) -> Result<BlockHash> {
let start = Instant::now();
let tip = daemon.get_best_block_hash()?;
let locations = {
let map = self.map();
if map.chain().last() == Some(&tip) {
debug!("skip update, same tip: {}", tip);
return Ok(tip);
}
load_locations(daemon, tip, &map)?
};
let read_requests = group_locations_by_file(locations)
.into_iter()
.map(|(file_id, locations)| ReadRequest {
file_id,
locations,
blk_file: daemon.blk_file_path(file_id),
undo_file: daemon.undo_file_path(file_id),
})
.collect::<Vec<_>>();
let new_blocks = read_requests
.iter()
.map(|r| r.locations.len())
.sum::<usize>();
info!(
"reading {} new blocks from {} blk*.dat files",
new_blocks,
read_requests.len()
);
let (reader_tx, reader_rx) = sync_channel(0);
let (indexer_tx, indexer_rx) = sync_channel(0);
let reader_thread = self.start_reader(read_requests, reader_tx)?;
let reader_rx = Arc::new(Mutex::new(reader_rx));
let indexers: Vec<std::thread::JoinHandle<_>> = (0..4)
.map(|i| self.start_indexer(i, Arc::clone(&reader_rx), indexer_tx.clone()))
.collect();
drop(indexer_tx); // no need for the original sender
let mut block_rows = vec![];
let mut index_rows_count = 0usize;
for batch in indexer_rx.into_iter() {
let batch_block_rows = batch
.header_rows
.iter()
.map(|row| BlockRow::from_db_row(row).expect("bad BlockRow"));
block_rows.extend(batch_block_rows);
index_rows_count += batch.index_rows.len();
self.report_stats(&batch);
self.stats
.update_duration
.observe_duration("write", || self.store.read().unwrap().write(batch));
}
indexers
.into_iter()
.map(|indexer| indexer.join().expect("indexer thread panicked"))
.collect::<Result<_>>()
.context("indexer thread failed")?;
reader_thread
.join()
.expect("reader thread panicked")
.context("reader thread failed")?;
info!(
"indexed {} new blocks, {} DB rows, took {:.3}s",
block_rows.len(),
index_rows_count,
start.elapsed().as_millis() as f64 / 1e3
);
assert_eq!(new_blocks, block_rows.len());
// allow only one thread to apply full compaction
self.store.write().unwrap().start_compactions();
self.map.write().unwrap().update_chain(block_rows, tip);
Ok(tip)
}
}
fn db_rows_size(rows: &[db::Row]) -> usize {
rows.iter().map(|key| key.len()).sum()
}
fn index_single_block(
loc: BlockLocation,
block: Block,
undo: BlockUndo,
) -> (Vec<ScriptHashRow>, BlockRow) {
assert!(undo.txdata.len() + 1 == block.txdata.len());
let mut script_hash_rows = vec![];
let file_id = loc.file as u16;
let txcount = VarInt(block.txdata.len() as u64);
let mut next_tx_offset: usize =
loc.data + serialize(&block.header).len() + serialize(&txcount).len();
let mut undo_iter = undo.txdata.into_iter();
for tx in &block.txdata {
let tx_offset = next_tx_offset as u32;
next_tx_offset += tx.get_size();
let create_index_row = |script| {
let pos = FilePos {
file_id,
offset: tx_offset,
};
ScriptHashRow::new(ScriptHash::new(script), pos)
};
tx.output
.iter()
.map(|txo| &txo.script_pubkey)
.for_each(|script| script_hash_rows.push(create_index_row(script)));
if tx.is_coin_base() {
continue; // coinbase doesn't have an undo
}
let txundo = undo_iter.next().expect("no txundo");
assert_eq!(tx.input.len(), txundo.scripts.len());
txundo
.scripts
.iter()
.for_each(|script| script_hash_rows.push(create_index_row(script)));
}
assert!(undo_iter.next().is_none());
let block_pos = FilePos {
file_id,
offset: loc.data as u32,
};
let block_size = (next_tx_offset - loc.data) as u32;
(
script_hash_rows,
BlockRow::new(block.header, block_pos, block_size),
)
}
fn load_locations(
daemon: &Daemon,
mut blockhash: BlockHash,
existing: &BlockMap,
) -> Result<Vec<BlockLocation>> {
let start = Instant::now();
let null = BlockHash::default();
let mut result = vec![];
let mut loc_chunk = Vec::<BlockLocation>::new();
let mut loc_iter = loc_chunk.into_iter();
let mut total_blocks = 0usize;
let mut chunk_size = 10;
// scan until genesis
while blockhash != null {
if existing.in_valid_chain(&blockhash) {
break; // no need to continue validation
}
total_blocks += 1;
if let Some(header) = existing.get_by_hash(&blockhash) {
// this block was indexed - make sure it points back correctly (advancing loc_iter)
if let Some(loc) = loc_iter.next() {
assert_eq!(loc.prev, header.prev_blockhash);
}
blockhash = header.prev_blockhash;
continue;
}
// get next block location
let location = match loc_iter.next() {
// get a new chunk from daemon if needed
Some(loc) => loc,
None => {
loc_chunk = daemon.get_block_locations(&blockhash, chunk_size)?;
chunk_size = std::cmp::min(chunk_size * 10, 100_000); // takes <1s
trace!("got {} block locations", loc_chunk.len());
loc_iter = loc_chunk.into_iter();
loc_iter.next().unwrap()
}
};
blockhash = location.prev;
result.push(location);
}
debug!(
"loaded {} block locations ({} new) at {} ms",
total_blocks,
result.len(),
start.elapsed().as_millis(),
);
Ok(result)
}
fn group_locations_by_file(locations: Vec<BlockLocation>) -> BTreeMap<usize, Vec<BlockLocation>> {
let mut locations_by_file = BTreeMap::new();
for loc in locations.into_iter() {
locations_by_file
.entry(loc.file)
.or_insert_with(Vec::new)
.push(loc);
}
for locations in &mut locations_by_file.values_mut() {
locations.sort_by_key(|loc| loc.data);
}
locations_by_file
}
fn parse_from<T: Decodable, F: Read + Seek>(src: &mut F, offset: usize) -> Result<T> {
src.seek(SeekFrom::Start(offset as u64))?;
Ok(Decodable::consensus_decode(src).context("parsing failed")?)
}
fn parse_block_and_undo<F: Read + Seek>(
loc: BlockLocation,
blk_file: &mut F,
undo_file: &mut F,
) -> (BlockLocation, Block, BlockUndo) {
blk_file
.seek(SeekFrom::Start(loc.data as u64))
.expect("failed to seek");
let block: Block =
parse_from(blk_file, loc.data).unwrap_or_else(|_| panic!("bad Block at {:?}", loc));
let undo: BlockUndo = loc
.undo
.map(|offset| {
parse_from(undo_file, offset).unwrap_or_else(|_| panic!("bad BlockUndo at {:?}", loc))
})
.unwrap_or_else(|| {
// genesis block has no undo
assert_eq!(block.header.prev_blockhash, BlockHash::default());
BlockUndo::default() // create an empty undo
});
(loc, block, undo)
}
fn read_file(path: &Path) -> Result<Vec<u8>> {
Ok(std::fs::read(&path).with_context(|| format!("failed to read {:?}", path))?)
}
pub struct ConfirmedReader {
reader: Reader,
lookup_duration: Histogram,
header: BlockHeader,
height: usize,
}
impl ConfirmedReader {
pub fn read(self) -> Result<Confirmed> {
let tx = self
.lookup_duration
.observe_duration("lookup_read_tx", || {
self.reader
.read()
.context("failed to read confirmed transaction")
})?;
Ok(Confirmed::new(tx, self.header, self.height, &self.reader))
}
}
pub struct BlockReader {
reader: Reader,
lookup_duration: Histogram,
}
impl BlockReader {
pub fn read(self) -> Result<Block> {
self.lookup_duration
.observe_duration("lookup_read_block", || {
self.reader.read().context("failed to read block")
})
}
}
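
For illustration only (not part of this diff), the read path implemented above: a script hash becomes an 8-byte prefix scan over the index, each matching row yields a file position, and each position is resolved to a confirmed transaction by re-reading it from bitcoind's blk*.dat files. A sketch:

fn lookup_example(
    index: &electrs_index::Index,
    daemon: &electrs_index::Daemon,
    script: &bitcoin::Script,
) -> anyhow::Result<()> {
    let result = index.lookup(&electrs_index::ScriptHash::new(script), daemon)?;
    println!("chain tip: {}", result.tip);
    for reader in result.readers {
        let confirmed = reader.read()?; // seeks into a blk*.dat file and decodes the tx
        println!("{} confirmed at height {}", confirmed.txid, confirmed.height);
    }
    Ok(())
}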

25
electrs_index/src/lib.rs Normal file

@@ -0,0 +1,25 @@
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
mod config;
mod daemon;
mod db;
mod index;
mod map;
mod metrics;
mod types;
pub use {
config::Config,
daemon::Daemon,
db::DBStore,
index::Index,
metrics::{Gauge, GaugeVec, Histogram, Metrics},
types::{Confirmed, ScriptHash},
};

176
electrs_index/src/map.rs Normal file

@@ -0,0 +1,176 @@
use bitcoin::{BlockHash, BlockHeader};
use chrono::{offset::TimeZone, Utc};
use std::{
collections::{BTreeMap, HashMap},
ops::Bound::{Included, Unbounded},
time::Instant,
};
use crate::types::{BlockRow, FilePos};
#[derive(Default)]
struct Chain {
by_height: Vec<BlockHash>,
by_hash: HashMap<BlockHash, usize>,
}
impl Chain {
fn build(tip: BlockHash, by_hash: &HashMap<BlockHash, HeaderPos>) -> Self {
// verify full chain till genesis
let mut by_height = vec![];
let mut blockhash = tip;
while blockhash != BlockHash::default() {
by_height.push(blockhash);
blockhash = match by_hash.get(&blockhash) {
Some(value) => value.header.prev_blockhash,
None => panic!("missing block header: {}", blockhash),
};
}
by_height.reverse();
let by_hash = by_height
.iter()
.enumerate()
.map(|(index, blockhash)| (*blockhash, index))
.collect();
Self { by_height, by_hash }
}
fn len(&self) -> usize {
self.by_height.len()
}
fn tip(&self) -> Option<&BlockHash> {
self.by_height.last()
}
}
struct HeaderPos {
header: BlockHeader,
pos: FilePos,
}
struct HashLimit {
hash: BlockHash,
limit: FilePos,
}
#[derive(Default)]
pub struct BlockMap {
by_hash: HashMap<BlockHash, HeaderPos>,
by_pos: BTreeMap<FilePos, HashLimit>, // start -> (limit, hash)
chain: Chain,
}
impl BlockMap {
pub(crate) fn new(block_rows: Vec<BlockRow>) -> Self {
let mut map = Self::default();
map.update_blocks(block_rows);
map
}
pub fn chain(&self) -> &[BlockHash] {
&self.chain.by_height
}
/// May return stale blocks
pub fn get_by_hash(&self, hash: &BlockHash) -> Option<&BlockHeader> {
self.by_hash.get(hash).map(|value| &value.header)
}
pub(crate) fn get_block_pos(&self, hash: &BlockHash) -> Option<&FilePos> {
self.by_hash.get(hash).map(|value| &value.pos)
}
pub(crate) fn in_valid_chain(&self, hash: &BlockHash) -> bool {
self.chain.by_hash.contains_key(hash)
}
fn update_blocks(&mut self, block_rows: Vec<BlockRow>) {
let start = Instant::now();
let total_blocks = block_rows.len();
let mut new_blocks = 0usize;
for row in block_rows {
let hash = row.header.block_hash();
self.by_hash.entry(hash).or_insert_with(|| {
new_blocks += 1;
HeaderPos {
header: row.header,
pos: row.pos,
}
});
let offset = FilePos {
file_id: row.pos.file_id,
offset: row.pos.offset,
};
let limit = FilePos {
file_id: row.pos.file_id,
offset: row.pos.offset + row.size,
};
assert!(self
.by_pos
.insert(offset, HashLimit { limit, hash })
.is_none());
}
debug!(
"added {}/{} headers at {} ms",
new_blocks,
total_blocks,
start.elapsed().as_millis()
);
}
pub(crate) fn find_block(&self, tx_pos: &FilePos) -> Option<(&BlockHash, usize)> {
// look up the block that ends after this position
let (start, HashLimit { limit, hash }) =
match self.by_pos.range((Unbounded, Included(tx_pos))).next_back() {
Some(item) => item,
None => panic!("block not found: {:?}", tx_pos),
};
// make sure the position is in the block
assert!(tx_pos < limit);
assert!(tx_pos >= start);
// make sure it's part of an active chain
self.chain.by_hash.get(hash).map(|height| (hash, *height))
}
pub(crate) fn update_chain(&mut self, block_rows: Vec<BlockRow>, tip: BlockHash) {
self.update_blocks(block_rows);
assert_eq!(self.by_hash.len(), self.by_pos.len());
// make sure there is no overlap between blocks
let mut last_limit = FilePos {
file_id: 0,
offset: 0,
};
for (start, HashLimit { limit, hash }) in &self.by_pos {
assert!(
start < limit,
"invalid block {}: bad start={:?} limit={:?}",
hash,
start,
limit
);
assert!(
last_limit < *start,
"invalid block {}: overlap found, start={:?} last_limit={:?}",
hash,
start,
last_limit
);
last_limit = *limit;
}
let chain = Chain::build(tip, &self.by_hash);
assert_eq!(chain.tip(), Some(&tip));
let tip_time = match self.get_by_hash(&tip) {
Some(header) => Utc.timestamp(header.time.into(), 0).to_rfc3339(),
None => panic!("missing tip: {}", tip),
};
info!(
"verified {} blocks, tip={} @ {}",
chain.len(),
tip,
tip_time
);
self.chain = chain;
}
}
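
For illustration only (not part of this diff), how the by_pos interval map above is queried (offsets invented):

// Suppose blk00000.dat holds two indexed blocks, A spanning offsets [8, 1000)
// and B spanning [1008, 2000). For a transaction at FilePos { file_id: 0, offset: 1200 },
// find_block() range-scans backwards from 1200, lands on B's entry, checks
// 1008 <= 1200 < 2000, and returns B's hash and height if B is on the active chain.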


@@ -0,0 +1,105 @@
use anyhow::{Context, Result};
use hyper::server::{Handler, Listening, Request, Response, Server};
use prometheus::{
self, process_collector::ProcessCollector, Encoder, HistogramOpts, HistogramVec, Opts, Registry,
};
pub use prometheus::{Gauge, GaugeVec};
use std::net::SocketAddr;
pub struct Metrics {
reg: Registry,
listen: Listening,
}
impl Drop for Metrics {
fn drop(&mut self) {
debug!("closing Prometheus server");
if let Err(e) = self.listen.close() {
warn!("failed to stop Prometheus server: {}", e);
}
}
}
#[derive(Clone)]
pub struct Histogram {
hist: HistogramVec,
}
impl Histogram {
pub fn observe_size(&self, label: &str, value: usize) {
self.hist.with_label_values(&[label]).observe(value as f64);
}
pub fn observe_duration<F, T>(&self, label: &str, func: F) -> T
where
F: FnOnce() -> T,
{
self.hist
.with_label_values(&[label])
.observe_closure_duration(func)
}
}
struct RegistryHandler {
reg: Registry,
}
impl RegistryHandler {
fn gather(&self) -> Result<Vec<u8>> {
let mut buffer = vec![];
prometheus::TextEncoder::new()
.encode(&self.reg.gather(), &mut buffer)
.context("failed to encode metrics")?;
Ok(buffer)
}
}
impl Handler for RegistryHandler {
fn handle(&self, req: Request, res: Response) {
trace!("{} {}", req.method, req.uri);
let buffer = self.gather().expect("failed to gather metrics");
res.send(&buffer).expect("send failed");
}
}
impl Metrics {
pub fn new(addr: SocketAddr) -> Result<Self> {
let reg = Registry::new();
reg.register(Box::new(ProcessCollector::for_self()))
.expect("failed to register ProcessCollector");
let listen = Server::http(addr)?
.handle(RegistryHandler { reg: reg.clone() })
.with_context(|| format!("failed to serve on {}", addr))?;
info!("serving Prometheus metrics on {}", addr);
Ok(Self { reg, listen })
}
pub fn histogram_vec(&self, name: &str, desc: &str, labels: &[&str]) -> Histogram {
let opts = HistogramOpts::new(name, desc);
let hist = HistogramVec::new(opts, labels).unwrap();
self.reg
.register(Box::new(hist.clone()))
.expect("failed to register Histogram");
Histogram { hist }
}
pub fn gauge(&self, name: &str, desc: &str) -> Gauge {
let gauge = Gauge::new(name, desc).unwrap();
self.reg
.register(Box::new(gauge.clone()))
.expect("failed to register Gauge");
gauge
}
pub fn gauge_vec(&self, name: &str, desc: &str, label: &str) -> GaugeVec {
let opts = Opts::new(name, desc);
let gauge_vec = GaugeVec::new(opts, &[label]).unwrap();
self.reg
.register(Box::new(gauge_vec.clone()))
.expect("failed to register GaugeVec");
gauge_vec
}
    }
}
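
For illustration only (not part of this diff), a sketch of using the Histogram wrapper above, which labels a single Prometheus HistogramVec by "step" and times closures:

fn timing_example(metrics: &electrs_index::Metrics) {
    let hist = metrics.histogram_vec("example_duration", "Example duration (in seconds)", &["step"]);
    let result = hist.observe_duration("expensive_step", || {
        // ... the timed work goes here ...
        42_usize
    });
    hist.observe_size("result_size", result);
}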

371
electrs_index/src/types.rs Normal file

@@ -0,0 +1,371 @@
use anyhow::{Context, Result};
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::path::PathBuf;
use bitcoin::{
blockdata::opcodes::all::*,
consensus::encode::{deserialize, serialize, Decodable, Encodable, ReadExt, VarInt},
hashes::{borrow_slice_impl, hash_newtype, hex_fmt_impl, index_impl, serde_impl, sha256, Hash},
util::key::PublicKey,
BlockHeader, Script, Transaction, Txid,
};
use crate::{daemon::Daemon, db};
hash_newtype!(
ScriptHash,
sha256::Hash,
32,
doc = "SHA256(scriptPubkey)",
true
);
macro_rules! impl_consensus_encoding {
($thing:ident, $($field:ident),+) => (
impl Encodable for $thing {
#[inline]
fn consensus_encode<S: ::std::io::Write>(
&self,
mut s: S,
) -> Result<usize, bitcoin::consensus::encode::Error> {
let mut len = 0;
$(len += self.$field.consensus_encode(&mut s)?;)+
Ok(len)
}
}
impl Decodable for $thing {
#[inline]
fn consensus_decode<D: ::std::io::Read>(
mut d: D,
) -> Result<$thing, bitcoin::consensus::encode::Error> {
Ok($thing {
$($field: Decodable::consensus_decode(&mut d)?),+
})
}
}
);
}
pub(crate) struct TxUndo {
pub scripts: Vec<Script>,
}
fn varint_decode<D: std::io::Read>(
mut d: D,
) -> std::result::Result<usize, bitcoin::consensus::encode::Error> {
let mut n = 0usize;
// TODO: add checks
loop {
let b = u8::consensus_decode(&mut d)?;
n = (n << 7) | (b & 0x7F) as usize;
if b & 0x80 != 0 {
n += 1;
} else {
return Ok(n);
}
}
}
fn decode_bytes<D: std::io::Read>(
mut d: D,
len: usize,
) -> std::result::Result<Vec<u8>, bitcoin::consensus::encode::Error> {
let mut ret = vec![0; len];
d.read_slice(&mut ret)?;
Ok(ret)
}
const SPECIAL_SCRIPTS: usize = 6;
fn decompress_script(
script_type: u8,
mut bytes: Vec<u8>,
) -> std::result::Result<Script, bitcoin::consensus::encode::Error> {
let builder = bitcoin::blockdata::script::Builder::new();
let script = match script_type {
0 => builder
.push_opcode(OP_DUP)
.push_opcode(OP_HASH160)
.push_slice(&bytes[..])
.push_opcode(OP_EQUALVERIFY)
.push_opcode(OP_CHECKSIG),
1 => builder
.push_opcode(OP_HASH160)
.push_slice(&bytes[..])
.push_opcode(OP_EQUAL),
2 | 3 => {
bytes.insert(0, script_type);
builder.push_slice(&bytes).push_opcode(OP_CHECKSIG)
}
4 | 5 => {
bytes.insert(0, script_type - 2);
let mut pubkey = PublicKey::from_slice(&bytes).expect("bad PublicKey");
pubkey.compressed = false;
builder
.push_slice(&pubkey.to_bytes())
.push_opcode(OP_CHECKSIG)
}
_ => unreachable!(),
}
.into_script();
assert!(script.is_p2pk() || script.is_p2pkh() || script.is_p2sh());
Ok(script)
}
fn script_decode<D: std::io::Read>(
mut d: D,
) -> std::result::Result<Script, bitcoin::consensus::encode::Error> {
let len = varint_decode(&mut d)?;
// info!("script len={}", len);
Ok(if len < SPECIAL_SCRIPTS {
let script_type = len as u8;
let size = match script_type {
0 | 1 => 20,
2 | 3 | 4 | 5 => 32,
_ => unreachable!(),
};
let compressed = decode_bytes(d, size)?;
decompress_script(script_type, compressed)?
} else {
let len = len - SPECIAL_SCRIPTS;
Script::from(decode_bytes(d, len)?)
})
}
impl Decodable for TxUndo {
fn consensus_decode<D: std::io::Read>(
mut d: D,
) -> std::result::Result<Self, bitcoin::consensus::encode::Error> {
let len = VarInt::consensus_decode(&mut d)?.0;
let mut scripts = vec![];
for _ in 0..len {
let _height_coinbase = varint_decode(&mut d)?;
assert_eq!(varint_decode(&mut d)?, 0); // unused today
let _amount = varint_decode(&mut d)?;
scripts.push(script_decode(&mut d)?)
}
Ok(TxUndo { scripts })
}
}
#[derive(Default)]
pub(crate) struct BlockUndo {
pub txdata: Vec<TxUndo>,
}
impl Decodable for BlockUndo {
fn consensus_decode<D: std::io::Read>(
mut d: D,
) -> std::result::Result<Self, bitcoin::consensus::encode::Error> {
let len = VarInt::consensus_decode(&mut d)?.0;
let mut txdata = vec![];
for _ in 0..len {
txdata.push(TxUndo::consensus_decode(&mut d)?)
}
Ok(BlockUndo { txdata })
}
}
pub struct Confirmed {
pub tx: Transaction,
pub txid: Txid,
pub header: BlockHeader,
pub height: usize,
pub file_offset: u32, // for correct ordering (https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status)
}
impl Confirmed {
pub(crate) fn new(
tx: Transaction,
header: BlockHeader,
height: usize,
reader: &Reader,
) -> Self {
let txid = tx.txid();
Self {
tx,
txid,
header,
height,
file_offset: reader.offset,
}
}
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)]
pub(crate) struct FilePos {
pub file_id: u16, // currently there are <3k blk files
pub offset: u32, // currently blk files are ~128MB (2**27)
}
impl FilePos {
pub fn reader(&self, daemon: &Daemon) -> Reader {
let file = daemon.blk_file_path(self.file_id as usize);
Reader {
file,
offset: self.offset,
}
}
}
impl_consensus_encoding!(FilePos, file_id, offset);
#[derive(Debug)]
pub(crate) struct Reader {
file: PathBuf,
offset: u32,
}
impl Reader {
pub fn read<T>(&self) -> Result<T>
where
T: Decodable,
{
let mut file =
File::open(&self.file).with_context(|| format!("failed to open {:?}", self))?;
file.seek(SeekFrom::Start(self.offset.into()))
.with_context(|| format!("failed to seek {:?}", self))?;
T::consensus_decode(&mut file).with_context(|| format!("failed to decode {:?}", self))
}
}
const SCRIPT_HASH_PREFIX_LEN: usize = 8;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct ScriptHashPrefix {
prefix: [u8; SCRIPT_HASH_PREFIX_LEN],
}
impl_consensus_encoding!(ScriptHashPrefix, prefix);
impl ScriptHash {
pub fn new(script: &Script) -> Self {
ScriptHash::hash(&script[..])
}
fn prefix(&self) -> ScriptHashPrefix {
let mut prefix = [0u8; SCRIPT_HASH_PREFIX_LEN];
prefix.copy_from_slice(&self.0[..SCRIPT_HASH_PREFIX_LEN]);
ScriptHashPrefix { prefix }
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub(crate) struct ScriptHashRow {
prefix: ScriptHashPrefix,
pos: FilePos, // transaction position on disk
}
impl_consensus_encoding!(ScriptHashRow, prefix, pos);
impl ScriptHashRow {
pub fn scan_prefix(script_hash: &ScriptHash) -> Box<[u8]> {
script_hash.0[..SCRIPT_HASH_PREFIX_LEN]
.to_vec()
.into_boxed_slice()
}
pub fn new(script_hash: ScriptHash, pos: FilePos) -> Self {
Self {
prefix: script_hash.prefix(),
pos,
}
}
pub fn to_db_row(&self) -> db::Row {
serialize(self).into_boxed_slice()
}
pub fn from_db_row(row: &[u8]) -> Result<Self> {
deserialize(&row).context("bad ScriptHashRow")
}
pub fn position(&self) -> &FilePos {
&self.pos
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BlockRow {
pub header: BlockHeader,
pub pos: FilePos, // block position on disk
pub size: u32, // block size on disk
}
impl_consensus_encoding!(BlockRow, header, pos, size);
impl BlockRow {
pub fn new(header: BlockHeader, pos: FilePos, size: u32) -> Self {
Self { header, pos, size }
}
pub fn to_db_row(&self) -> db::Row {
serialize(self).into_boxed_slice()
}
pub fn from_db_row(row: &[u8]) -> Result<Self> {
deserialize(&row).context("bad BlockRow")
}
}
#[cfg(test)]
mod tests {
use crate::types::{varint_decode, FilePos, ScriptHash, ScriptHashRow};
use bitcoin::{hashes::hex::ToHex, Address};
use serde_json::{from_str, json};
use std::io::Cursor;
use std::str::FromStr;
#[test]
fn test_script_hash_serde() {
let hex = "\"4b3d912c1523ece4615e91bf0d27381ca72169dbf6b1c2ffcc9f92381d4984a3\"";
let script_hash: ScriptHash = from_str(&hex).unwrap();
assert_eq!(format!("\"{}\"", script_hash), hex);
assert_eq!(json!(script_hash).to_string(), hex);
}
#[test]
fn test_script_hash_row() {
let hex = "\"4b3d912c1523ece4615e91bf0d27381ca72169dbf6b1c2ffcc9f92381d4984a3\"";
let script_hash: ScriptHash = from_str(&hex).unwrap();
let row1 = ScriptHashRow::new(
script_hash,
FilePos {
file_id: 0x1234,
offset: 0x12345678,
},
);
let db_row = row1.to_db_row();
assert_eq!(db_row[..].to_hex(), "a384491d38929fcc341278563412");
let row2 = ScriptHashRow::from_db_row(&db_row).unwrap();
assert_eq!(row1, row2);
}
#[test]
fn test_varint() {
assert_eq!(varint_decode(Cursor::new(b"\x00")).unwrap(), 0);
assert_eq!(varint_decode(Cursor::new(b"\x01")).unwrap(), 1);
assert_eq!(varint_decode(Cursor::new(b"\x10")).unwrap(), 0x10);
assert_eq!(varint_decode(Cursor::new(b"\x7f")).unwrap(), 0x7f);
assert_eq!(varint_decode(Cursor::new(b"\x80\x00")).unwrap(), 0x80);
assert_eq!(varint_decode(Cursor::new(b"\x80\x01")).unwrap(), 0x81);
assert_eq!(varint_decode(Cursor::new(b"\x80\x7f")).unwrap(), 0xff);
assert_eq!(varint_decode(Cursor::new(b"\x81\x00")).unwrap(), 0x100);
assert_eq!(varint_decode(Cursor::new(b"\x81\x23")).unwrap(), 0x123);
assert_eq!(varint_decode(Cursor::new(b"\x80\x80\x00")).unwrap(), 0x4080);
assert_eq!(varint_decode(Cursor::new(b"\x81\x86\x07")).unwrap(), 0x8387);
}
#[test]
fn test_script_hash() {
let addr = Address::from_str("1KVNjD3AAnQ3gTMqoTKcWFeqSFujq9gTBT").unwrap();
let script_hash = ScriptHash::new(&addr.script_pubkey());
assert_eq!(
script_hash.to_hex(),
"00dfb264221d07712a144bda338e89237d1abd2db4086057573895ea2659766a"
);
}
}
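
For reference (not part of this diff): an index row serialized above is 14 bytes, an 8-byte script-hash prefix followed by the consensus-encoded FilePos (little-endian u16 file_id, then little-endian u32 offset); varint_decode implements the variable-length integer format Bitcoin Core uses in undo files (MSB-first base-128 with an implicit +1 carried on each continuation byte), as the test_varint vectors show. The value checked in test_script_hash_row decomposes as:

// "a384491d38929fcc" + "3412"            + "78563412"
//  8-byte hash prefix   file_id = 0x1234    offset = 0x12345678  (both little-endian)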


@@ -0,0 +1,53 @@
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError};
use signal_hook::iterator::Signals;
use std::thread;
use std::time::Duration;
use anyhow::Result;
#[derive(Clone)] // so multiple threads could wait on signals
pub struct Waiter {
rx: Receiver<i32>,
}
fn notify(signals: &[i32]) -> Receiver<i32> {
let (tx, rx) = bounded(1);
let signals = Signals::new(signals).expect("failed to register signal hook");
thread::spawn(move || {
for signal in signals.forever() {
info!("notified via SIG{}", signal);
tx.send(signal)
.unwrap_or_else(|_| panic!("failed to send signal {}", signal));
}
});
rx
}
impl Waiter {
pub fn start() -> Waiter {
Waiter {
rx: notify(&[
signal_hook::SIGINT,
signal_hook::SIGTERM,
signal_hook::SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`)
]),
}
}
pub fn wait(&self, duration: Duration) -> Result<()> {
match self.rx.recv_timeout(duration) {
Ok(sig) => {
if sig != signal_hook::SIGUSR1 {
bail!("Interrupted with SIG{}", sig);
};
Ok(())
}
Err(RecvTimeoutError::Timeout) => Ok(()),
Err(RecvTimeoutError::Disconnected) => panic!("signal hook channel disconnected"),
}
}
pub fn poll(&self) -> Result<()> {
self.wait(Duration::from_secs(0))
}
}
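
For illustration only (not part of this diff), a sketch of how this Waiter is meant to drive a sync loop: wait() returns Ok(()) on timeout or SIGUSR1 and bails on SIGINT/SIGTERM, so the caller can exit cleanly:

fn sync_loop(waiter: &Waiter) -> anyhow::Result<()> {
    loop {
        // ... update the index / mempool here ...
        waiter.wait(std::time::Duration::from_secs(600))?; // Err on SIGINT/SIGTERM
    }
}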

14
electrs_query/Cargo.toml Normal file

@@ -0,0 +1,14 @@
[package]
name = "electrs_query"
version = "0.1.0"
authors = ["Roman Zeyde <me@romanzey.de>"]
edition = "2018"
[dependencies]
anyhow = "1.0"
bitcoin = { version = "0.25", features = ["use-serde"] }
log = "0.4"
rayon = "1.3"
libc = "0.2"
electrs_index = { path = "../electrs_index" }

172
electrs_query/src/main.rs Normal file

@@ -0,0 +1,172 @@
#[macro_use]
extern crate log;
use anyhow::{Context, Result};
use bitcoin::{Address, OutPoint, Script, Txid};
use rayon::prelude::*;
use std::collections::HashMap;
use std::io::Read;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use electrs_index::*;
fn get_received(txs: &HashMap<Txid, Confirmed>, script: &Script) -> HashMap<OutPoint, u64> {
txs.iter()
.flat_map(|(txid, confirmed)| {
confirmed
.tx
.output
.iter()
.enumerate()
.filter_map(move |(i, txo)| {
if txo.script_pubkey == *script {
let outpoint = OutPoint {
txid: *txid,
vout: i as u32,
};
Some((outpoint, txo.value))
} else {
None
}
})
})
.collect()
}
fn get_spent<F>(txs: &HashMap<Txid, Confirmed>, is_received: F) -> HashMap<OutPoint, Txid>
where
F: Fn(&OutPoint) -> bool,
{
txs.iter()
.flat_map(|(txid, confirmed)| {
confirmed
.tx
.input
.iter()
.filter(|txi| is_received(&txi.previous_output))
.map(move |txi| (txi.previous_output, *txid))
})
.collect()
}
fn address_balance<'a>(
index: &Index,
daemon: &Daemon,
address: &'a Address,
) -> Result<Vec<(usize, OutPoint, u64, &'a Address)>> {
let script = &address.payload.script_pubkey();
let confirmed: Vec<Confirmed> = index
.lookup(&ScriptHash::new(script), daemon)?
.readers
.into_par_iter()
.map(|r| r.read())
.collect::<Result<_>>()?;
let txs: HashMap<Txid, Confirmed> = confirmed
.into_iter()
.map(|confirmed| (confirmed.txid, confirmed))
.collect();
let received_map = get_received(&txs, script);
let received_value = received_map.values().sum::<u64>();
debug!(
"received: {:.8} @ {} outputs (scanned {} txs, {} outputs)",
received_value as f64 / 1e8,
received_map.len(),
txs.len(),
txs.values()
.map(|confirmed| confirmed.tx.output.len())
.sum::<usize>(),
);
let spent_map = get_spent(&txs, |outpoint| received_map.contains_key(outpoint));
let spent_value = spent_map
.keys()
.map(|outpoint| received_map.get(outpoint).expect("missing TXO"))
.sum::<u64>();
debug!(
"spent: {:.8} @ {} inputs (scanned {} txs, {} inputs)",
spent_value as f64 / 1e8,
spent_map.len(),
txs.len(),
txs.values()
.map(|confirmed| confirmed.tx.input.len())
.sum::<usize>()
);
let unspent_map: HashMap<OutPoint, u64> = received_map
.into_iter()
.filter(|(outpoint, _)| !spent_map.contains_key(outpoint))
.collect();
Ok(unspent_map
.into_iter()
.map(|(outpoint, value)| {
let confirmed = txs.get(&outpoint.txid).unwrap();
(confirmed.height, outpoint, value, address)
})
.collect())
}
fn read_addresses_from_stdin() -> Vec<Address> {
let mut buffer = String::new();
std::io::stdin()
.read_to_string(&mut buffer)
.expect("failed to read stdin");
buffer
.split_whitespace()
.map(|a| Address::from_str(a).expect("invalid address"))
.collect()
}
fn query_index(index: &Index, daemon: &Daemon, addresses: &[Address]) -> Result<()> {
if addresses.is_empty() {
return Ok(());
}
let mut unspent = addresses
.par_iter()
.map(|address| address_balance(index, daemon, address))
.collect::<Result<Vec<Vec<_>>>>()
.context("failed to query address")?
.into_iter()
.flatten()
.collect::<Vec<_>>();
unspent.sort();
let total: u64 = unspent
.iter()
.map(|(height, outpoint, value, address)| {
info!(
"{}:{:<5} {:20.8} @ {} {}",
outpoint.txid,
outpoint.vout,
*value as f64 / 1e8,
height,
address,
);
value
})
.sum();
info!("total: {:.8} BTC", total as f64 / 1e8);
Ok(())
}
fn main() -> Result<()> {
let addresses = read_addresses_from_stdin();
let config = Config::from_args();
let daemon = Daemon::new(config.daemon_rpc_addr, &config.daemon_dir)
.context("failed to connect to daemon")?;
let metrics = Metrics::new(config.monitoring_addr)?;
let store = DBStore::open(Path::new(&config.db_path))?;
let index = Index::new(store, &metrics).context("failed to open index")?;
loop {
let tip = index.update(&daemon).context("failed to update index")?;
query_index(&index, &daemon, &addresses)?;
while daemon.wait_for_new_block(Duration::from_secs(60))? == tip {}
}
}
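
For context (not part of this diff): electrs_query reads whitespace-separated addresses from stdin and, after each index update, logs every unspent output and the total balance. With a single funded address the output is shaped roughly like (numbers invented):

// <txid>:<vout>           0.00150000 @ <height> <address>
// total: 0.00150000 BTC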

24
electrs_rpc/Cargo.toml Normal file

@@ -0,0 +1,24 @@
[package]
name = "electrs_rpc"
version = "0.1.0"
authors = ["Roman Zeyde <me@romanzey.de>"]
edition = "2018"
[dependencies]
anyhow = "1.0"
async-signals = "0.3"
async-std = { version = "1.6" }
bitcoin = { version = "0.25", features = ["use-serde"] }
bitcoincore-rpc = "0.12"
clap = "2.33"
futures = "0.3"
libc = "0.2"
log = "0.4"
rayon = "1.3"
rust-crypto = "0.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
hyper = "0.10"
electrs_index = { path = "../electrs_index" }

253
electrs_rpc/src/main.rs Normal file

@@ -0,0 +1,253 @@
#![recursion_limit = "256"]
mod mempool;
mod rpc;
mod util;
#[macro_use]
extern crate log;
use anyhow::{Context, Result};
use async_signals::Signals;
use async_std::{
future,
io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
};
use bitcoin::BlockHash;
use futures::{
sink::SinkExt,
stream::StreamExt,
{select, FutureExt},
};
use serde_json::{de::from_str, Value};
use std::{
collections::hash_map::{Entry, HashMap},
path::Path,
sync::Arc,
time::Duration,
};
use electrs_index::*;
use rpc::{Rpc, Subscription};
use util::{spawn, unbounded, Receiver, Sender};
fn main() -> Result<()> {
let config = Config::from_args();
info!("{:?}", config);
let metrics = Metrics::new(config.monitoring_addr)?;
let daemon = Daemon::new(config.daemon_rpc_addr, &config.daemon_dir)
.context("failed to connect to daemon")?;
let store = DBStore::open(Path::new(&config.db_path))?;
let index = Index::new(store, &metrics).context("failed to open index")?;
let rpc = Rpc::new(index, daemon, &metrics)?;
let handle = task::spawn(accept_loop(config.electrum_rpc_addr, rpc));
task::block_on(handle)
}
#[derive(Debug)]
enum Void {}
#[derive(Debug)]
enum Event {
NewPeer {
id: usize,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>,
},
Message {
id: usize,
req: Value,
},
NewBlock {
tip: BlockHash,
},
}
async fn accept_loop(addr: impl ToSocketAddrs, rpc: Rpc) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener.incoming();
let mut count = 0usize;
info!("serving Electrum RPC on {}", listener.local_addr()?);
let (broker_tx, broker_rx) = unbounded();
let mut broker_shutdown = task::spawn(server_loop(broker_rx, rpc)).into_stream();
loop {
select! {
result = incoming.next().fuse() => {
let stream = result.expect("missing stream")?;
spawn("recv_loop", recv_loop(count, broker_tx.clone(), stream));
count += 1;
},
result = broker_shutdown.next().fuse() => {
debug!("accept_loop is done");
return result.expect("missing result"); // TODO: takes ~0.5s?
},
};
}
}
struct Peer {
subscription: Subscription,
sender: Sender<Value>,
}
impl Peer {
fn new(sender: Sender<Value>) -> Self {
Self {
subscription: Subscription::new(),
sender,
}
}
}
async fn server_loop(events: Receiver<Event>, mut rpc: Rpc) -> Result<()> {
let mut peers: HashMap<usize, Peer> = HashMap::new();
let (disconnect_tx, disconnect_rx) = unbounded::<(usize, Receiver<Value>)>();
let mut disconnect_rx = disconnect_rx.fuse();
let mut events = events.fuse();
let mut signals = Signals::new(vec![libc::SIGINT, libc::SIGUSR1])
.context("failed to register signal handler")?
.fuse();
let mut new_block_rx = rpc.start_waiter()?;
loop {
for peer in peers.values_mut() {
for notification in rpc.notify(&mut peer.subscription)? {
peer.sender.send(notification).await.unwrap();
}
}
let event = select! {
sig = future::timeout(Duration::from_secs(5), signals.next()).fuse() => {
match sig {
Ok(Some(libc::SIGUSR1)) | Err(_) => {
rpc.sync_mempool();
continue;
},
Ok(Some(_)) => break, // unsupported signal
Ok(None) => panic!("missing signal"),
};
}
msg = new_block_rx.next() => {
match msg {
Some(tip) => Event::NewBlock { tip },
None => break,
}
},
disconnect = disconnect_rx.next() => {
let (id, _pending_messages) = disconnect.expect("missing disconnected ID");
info!("{}: disconnected", id);
assert!(peers.remove(&id).is_some());
continue;
},
event = events.next() => match event {
Some(event) => event,
None => break,
},
};
match event {
Event::NewBlock { tip } => {
debug!("new block: {}", tip);
rpc.sync_index().context("failed to sync with bitcoind")?;
}
Event::Message { id, req } => match peers.get_mut(&id) {
Some(peer) => {
let response = rpc.handle_request(&mut peer.subscription, req)?;
peer.sender.send(response).await.unwrap();
}
None => warn!("unknown client {}", id),
},
Event::NewPeer {
id,
stream,
shutdown,
} => match peers.entry(id) {
Entry::Occupied(..) => panic!("duplicate connection ID: {}", id),
Entry::Vacant(entry) => {
let (sender_tx, mut sender_rx) = unbounded();
entry.insert(Peer::new(sender_tx));
let mut disconnect_tx = disconnect_tx.clone();
spawn("send_loop", async move {
let res = send_loop(id, &mut sender_rx, stream, shutdown).await;
disconnect_tx
.send((id, sender_rx))
.await
.with_context(|| format!("failed to disconnect {}", id))?;
res
});
}
},
}
}
debug!("disconnecting {} clients: {:?}", peers.len(), peers.keys());
drop(peers); // drop all senders that write responses
drop(disconnect_tx);
while let Some((id, _sender_rx)) = disconnect_rx.next().await {
debug!("{}: gone", id)
}
debug!("server_loop is done");
Ok(())
}
async fn recv_loop(id: usize, mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
info!("{}: accepted {}", id, stream.peer_addr()?);
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();
let (shutdown_tx, shutdown_rx) = unbounded::<Void>();
broker
.send(Event::NewPeer {
id,
stream: Arc::clone(&stream),
shutdown: shutdown_rx,
})
.await
.unwrap();
while let Some(line) = lines.next().await {
let line = line?;
debug!("{}: recv {}", id, line);
let req: Value = from_str(&line).with_context(|| format!("invalid JSON: {:?}", line))?;
broker
.send(Event::Message { id, req })
.await
.with_context(|| format!("failed to send {:?}", id))?;
}
drop(shutdown_tx);
Ok(())
}
async fn send_loop(
id: usize,
messages: &mut Receiver<Value>,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>,
) -> Result<()> {
let mut stream = &*stream;
let mut messages = messages.fuse();
let mut shutdown = shutdown.fuse();
loop {
select! {
msg = messages.next().fuse() => match msg {
Some(msg) => {
let line = msg.to_string();
debug!("{}: send {}", id, line);
stream.write_all((line + "\n").as_bytes()).await?;
},
None => break,
},
void = shutdown.next().fuse() => match void {
Some(void) => match void {},
None => break,
}
}
}
Ok(())
}
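
For context (not part of this diff), the flow wired up above: accept_loop spawns one recv_loop per TCP client, every parsed request is forwarded to server_loop as an Event::Message, and server_loop owns the Rpc state: it answers requests, pushes notifications through per-peer channels drained by send_loop, reacts to SIGUSR1 and a 5-second tick by re-syncing the mempool, and reacts to Event::NewBlock by re-syncing the index.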

485
electrs_rpc/src/mempool.rs Normal file

@@ -0,0 +1,485 @@
use anyhow::{Context, Result};
use bitcoin::{
consensus::deserialize, hashes::hex::FromHex, Amount, OutPoint, Script, Transaction, Txid,
};
use bitcoincore_rpc::{
json::{GetMempoolEntryResult, GetTxOutResult},
RpcApi,
};
use serde::ser::{Serialize, SerializeSeq, Serializer};
use serde_json::json;
use std::{
collections::{BTreeSet, HashMap, HashSet},
ops::Bound,
time::Instant,
};
use electrs_index::{
Daemon, ScriptHash, {Gauge, GaugeVec, Histogram, Metrics},
};
pub struct MempoolEntry {
pub tx: Transaction,
pub txid: Txid,
pub fee: Amount,
pub vsize: u64,
scripthashes: HashSet<ScriptHash>, // inputs & outputs
}
#[derive(Debug)]
pub struct HistogramEntry {
fee_rate: u64,
vsize: u64,
}
impl HistogramEntry {
fn new(fee_rate: Amount, vsize: u64) -> Self {
Self {
fee_rate: fee_rate.as_sat(),
vsize,
}
}
}
impl Serialize for HistogramEntry {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(2))?;
seq.serialize_element(&self.fee_rate)?;
seq.serialize_element(&self.vsize)?;
seq.end()
}
}
struct Stats {
update_duration: Histogram,
lookup_duration: Histogram,
tx_rates: FeeRatesGauge,
tx_count: Gauge,
}
impl Stats {
fn new(metrics: &Metrics) -> Self {
Self {
update_duration: metrics.histogram_vec(
"mempool_update_duration",
"Update duration (in seconds)",
&["step"],
),
lookup_duration: metrics.histogram_vec(
"mempool_lookup_duration",
"Lookup duration (in seconds)",
&["step"],
),
tx_rates: FeeRatesGauge::new(metrics.gauge_vec(
"mempool_tx_rates",
"Mempool transactions' vsize",
"fee_rate",
)),
tx_count: metrics.gauge("mempool_tx_count", "Mempool transactions' count"),
}
}
}
pub struct Mempool {
entries: HashMap<Txid, MempoolEntry>,
by_scripthash: BTreeSet<(ScriptHash, Txid)>,
// sorted by descending fee rate.
// vsize of transactions paying >= fee_rate
histogram: Vec<HistogramEntry>,
stats: Stats,
txid_min: Txid,
txid_max: Txid,
}
impl Mempool {
pub fn empty(metrics: &Metrics) -> Self {
Self {
entries: HashMap::new(),
by_scripthash: BTreeSet::new(),
histogram: vec![],
stats: Stats::new(metrics),
txid_min: Txid::from_hex(
"0000000000000000000000000000000000000000000000000000000000000000",
)
.unwrap(),
txid_max: Txid::from_hex(
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
.unwrap(),
}
}
pub fn get(&self, txid: &Txid) -> Option<&MempoolEntry> {
self.stats
.lookup_duration
.observe_duration("get", || self.entries.get(txid))
}
pub fn lookup(&self, scripthash: ScriptHash) -> Vec<&MempoolEntry> {
self.stats.lookup_duration.observe_duration("lookup", || {
let range = (
Bound::Included((scripthash, self.txid_min)),
Bound::Included((scripthash, self.txid_max)),
);
self.by_scripthash
.range(range)
.map(|(_, txid)| self.entries.get(txid).expect("missing entry"))
.collect()
})
}
pub fn update(&mut self, daemon: &Daemon) -> Result<()> {
let start = Instant::now();
let update_duration = self.stats.update_duration.clone();
let (old, new) = update_duration.observe_duration("poll", || -> Result<_> {
let old: HashSet<Txid> = self.entries.keys().copied().collect();
let new: HashSet<Txid> = daemon
.client()
.get_raw_mempool()
.context("get_raw_mempool failed")?
.into_iter()
.collect();
Ok((old, new))
})?;
let removed = update_duration.observe_duration("remove", || {
// removed entries
let removed = &old - &new;
let removed_len = removed.len();
for txid in removed {
let entry = self.entries.remove(&txid).expect("missing entry");
for scripthash in &entry.scripthashes {
assert!(
self.by_scripthash.remove(&(*scripthash, txid)),
"missing entry"
);
}
}
removed_len
});
let entries = update_duration.observe_duration("fetch", || -> Result<_> {
// added entries
get_mempool_entries(daemon, &new - &old, &update_duration)
})?;
let added = entries.len();
update_duration.observe_duration("add", || {
for entry in entries {
for scripthash in &entry.scripthashes {
self.by_scripthash.insert((*scripthash, entry.txid));
}
assert!(self.entries.insert(entry.txid, entry).is_none());
}
});
update_duration.observe_duration("histogram", || {
self.histogram = build_histogram(self.entries.values(), &mut self.stats.tx_rates);
});
self.stats.tx_count.set(self.entries.len() as f64);
debug!(
"mempool has {} txs: added {}, removed {} (took {} ms)",
self.entries.len(),
added,
removed,
start.elapsed().as_millis()
);
Ok(())
}
pub fn histogram(&self) -> &[HistogramEntry] {
&self.histogram
}
pub fn count(&self) -> usize {
self.entries.len()
}
}
struct FeeRatesIter {
values: Vec<u64>,
factor: u64,
index: usize,
}
impl FeeRatesIter {
fn new() -> Self {
Self {
values: vec![1, 2, 5],
factor: 10,
index: 0,
}
}
}
impl Iterator for FeeRatesIter {
type Item = Amount;
fn next(&mut self) -> Option<Amount> {
if self.index == self.values.len() {
self.index = 0;
for v in &mut self.values {
*v *= self.factor
}
}
let result = self.values[self.index];
self.index += 1;
Some(Amount::from_sat(result))
}
}
struct FeeRatesGauge {
gauge: GaugeVec,
max_limit: Amount,
}
impl FeeRatesGauge {
fn new(gauge: GaugeVec) -> Self {
Self {
gauge,
max_limit: Amount::ZERO,
}
}
fn observe_histogram(&mut self, fee_rates: &[(Amount, u64)]) {
let mut limit_iter = FeeRatesIter::new().peekable();
let mut total_vsize = 0.0;
let mut prev_limit = Amount::ZERO;
for (rate, vsize) in fee_rates {
loop {
let limit = limit_iter.peek().unwrap();
if rate <= limit {
break;
}
self.gauge
.with_label_values(&[&format!(
"{:10} - {:10}",
prev_limit.as_sat(),
limit.as_sat()
)])
.set(total_vsize);
total_vsize = 0.0;
prev_limit = *limit;
limit_iter.next();
}
total_vsize += *vsize as f64;
}
loop {
let limit = limit_iter.peek().unwrap();
self.gauge
.with_label_values(&[&format!(
"{:10} - {:10}",
prev_limit.as_sat(),
limit.as_sat()
)])
.set(total_vsize);
total_vsize = 0.0;
prev_limit = *limit;
if *limit >= self.max_limit {
break;
}
limit_iter.next();
}
self.max_limit = *limit_iter.peek().unwrap();
}
}
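// Build the Electrum-style fee histogram from all mempool entries: the
// (fee_rate, vsize) pairs are traversed in descending fee-rate order and
// grouped into bins of just over 100,000 vbytes (plus a final partial bin).
// The fee-rate gauge is updated from the same sorted data.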
fn build_histogram<'a>(
entries: impl Iterator<Item = &'a MempoolEntry>,
gauge: &mut FeeRatesGauge,
) -> Vec<HistogramEntry> {
let mut fee_rates: Vec<(Amount, u64)> = entries.map(|e| (e.fee / e.vsize, e.vsize)).collect();
fee_rates.sort_by_key(|item| item.0);
gauge.observe_histogram(&fee_rates);
let mut histogram = vec![];
let mut bin_vsize = 0;
let mut bin_fee_rate = Amount::ZERO;
for (fee_rate, vsize) in fee_rates.into_iter().rev() {
bin_fee_rate = fee_rate;
bin_vsize += vsize;
if bin_vsize > 100_000 {
histogram.push(HistogramEntry::new(bin_fee_rate, bin_vsize));
bin_vsize = 0;
}
}
if bin_vsize > 0 {
histogram.push(HistogramEntry::new(bin_fee_rate, bin_vsize));
}
histogram
}
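// Fetch `getmempoolentry` results for the given txids using a single batched
// JSON-RPC request, returning them keyed by txid.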
fn get_entries_map(
daemon: &Daemon,
txids: &HashSet<Txid>,
) -> Result<HashMap<Txid, GetMempoolEntryResult>> {
let client = daemon.client().get_jsonrpc_client();
let txids: Vec<Txid> = txids.iter().copied().collect();
let args_vec: Vec<Vec<_>> = txids.iter().map(|txid| vec![json!(txid)]).collect();
let responses: Vec<_> = {
let requests: Vec<_> = args_vec
.iter()
.map(|args| client.build_request("getmempoolentry", args))
.collect();
client
.send_batch(&requests)
.with_context(|| format!("failed to fetch {} mempool entries", requests.len()))
}?;
let entries: Vec<_> = responses
.into_iter()
.map(|r| {
r.expect("no response")
.into_result::<GetMempoolEntryResult>()
.context("no mempool entry")
})
.collect::<Result<_>>()?;
debug!("got {} mempool entries", entries.len());
assert_eq!(entries.len(), args_vec.len());
Ok(txids.into_iter().zip(entries.into_iter()).collect())
}
fn get_transactions_map(
daemon: &Daemon,
txids: &HashSet<Txid>,
) -> Result<HashMap<Txid, Transaction>> {
let client = daemon.client().get_jsonrpc_client();
let txids: Vec<Txid> = txids.iter().copied().collect();
let args_vec: Vec<Vec<_>> = txids.iter().map(|txid| vec![json!(txid)]).collect();
let responses: Vec<_> = {
let requests: Vec<_> = args_vec
.iter()
.map(|args| client.build_request("getrawtransaction", args))
.collect();
client
.send_batch(&requests)
.with_context(|| format!("failed to fetch {} mempool transactions", requests.len()))
}?;
let txs: Vec<Transaction> = responses
.into_iter()
.map(|r| {
let hex = r
.expect("no response")
.into_result::<String>()
.context("no transaction")?;
let bytes: Vec<u8> = FromHex::from_hex(&hex).expect("invalid hex");
Ok(deserialize(&bytes).expect("invalid transaction"))
})
.collect::<Result<_>>()?;
debug!("got {} mempool transactions", txs.len());
assert_eq!(txs.len(), txids.len());
Ok(txids.into_iter().zip(txs.into_iter()).collect())
}
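// Resolve the scripthash of each confirmed previous output via batched
// `gettxout` calls (the outpoints are expected to be unspent funding outputs
// of mempool transactions).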
fn get_confirmed_outpoints(
daemon: &Daemon,
outpoints: Vec<OutPoint>,
) -> Result<HashMap<OutPoint, ScriptHash>> {
if outpoints.is_empty() {
return Ok(HashMap::new());
}
let client = daemon.client().get_jsonrpc_client();
let args_vec: Vec<Vec<_>> = outpoints
.iter()
.map(|outpoint| vec![json!(outpoint.txid), json!(outpoint.vout), json!(false)])
.collect();
let responses: Vec<_> = {
let requests: Vec<_> = args_vec
.iter()
.map(|args| client.build_request("gettxout", args))
.collect();
client
.send_batch(&requests)
.with_context(|| format!("failed to fetch {} UTXOs", requests.len()))
}?;
let scripthashes: Vec<ScriptHash> = responses
.into_iter()
.map(|r| {
let utxo = r
.expect("no response")
.into_result::<GetTxOutResult>()
.context("missing UTXO")?;
Ok(ScriptHash::new(&Script::from(utxo.script_pub_key.hex)))
})
.collect::<Result<_>>()?;
debug!("got {} confirmed scripthashes", scripthashes.len());
assert_eq!(scripthashes.len(), outpoints.len());
Ok(outpoints
.into_iter()
.zip(scripthashes.into_iter())
.collect())
}
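// Assemble MempoolEntry values for newly added txids:
// 1. fetch their `getmempoolentry` results,
// 2. fetch the raw transactions of those txids plus any in-mempool parents
//    listed under `depends` (needed to resolve unconfirmed inputs),
// 3. resolve each input's scripthash, from `gettxout` for confirmed prevouts
//    or from the fetched parent transactions for unconfirmed ones,
// 4. collect the scripthashes of every input and output per transaction.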
fn get_mempool_entries(
daemon: &Daemon,
mut txids: HashSet<Txid>,
update_duration: &Histogram,
) -> Result<Vec<MempoolEntry>> {
if txids.is_empty() {
return Ok(vec![]);
}
let entry_map =
update_duration.observe_duration("fetch_entries", || get_entries_map(daemon, &txids))?;
for e in entry_map.values() {
for txid in &e.depends {
txids.insert(*txid);
}
}
let mut unconfirmed_map: HashMap<Txid, Transaction> = update_duration
.observe_duration("fetch_transactions", || {
get_transactions_map(daemon, &txids)
})?;
let mut confirmed_inputs = vec![];
let mut unconfirmed_inputs = vec![];
for (txid, entry) in &entry_map {
let tx = unconfirmed_map.get(txid).expect("missing mempool tx");
for txi in &tx.input {
if entry.depends.contains(&txi.previous_output.txid) {
unconfirmed_inputs.push(txi.previous_output)
} else {
confirmed_inputs.push(txi.previous_output)
}
}
}
let mut scripthashes_map = update_duration.observe_duration("fetch_confirmed", || {
get_confirmed_outpoints(daemon, confirmed_inputs)
})?;
for prev in unconfirmed_inputs {
let prev_tx = unconfirmed_map.get(&prev.txid).expect("missing mempool tx");
let prev_out = &prev_tx.output[prev.vout as usize];
scripthashes_map.insert(prev, ScriptHash::new(&prev_out.script_pubkey));
}
Ok(entry_map
.into_iter()
.map(|(txid, entry)| {
let tx = unconfirmed_map.remove(&txid).expect("missing mempool tx");
let scripthashes = tx
.output
.iter()
.map(|txo| ScriptHash::new(&txo.script_pubkey))
.chain(tx.input.iter().map(|txi| {
scripthashes_map
.remove(&txi.previous_output)
.expect("missing input")
}))
.collect();
MempoolEntry {
tx,
txid,
fee: entry.fees.base,
vsize: entry.vsize,
scripthashes,
}
})
.collect())
}

514
electrs_rpc/src/rpc.rs Normal file
View File

@ -0,0 +1,514 @@
use anyhow::{bail, Context, Result};
use bitcoin::{
consensus::{deserialize, serialize},
hashes::{
borrow_slice_impl, hash_newtype,
hex::{FromHex, ToHex},
hex_fmt_impl, index_impl, serde_impl, sha256, Hash,
},
BlockHash, Transaction, TxMerkleNode, Txid,
};
use futures::sink::SinkExt;
use rayon::prelude::*;
use serde_derive::{Deserialize, Serialize};
use serde_json::{from_value, json, Value};
use std::{
cmp::min,
collections::HashMap,
sync::RwLock,
time::{Duration, Instant},
};
use crate::mempool::{Mempool, MempoolEntry};
use crate::util::{spawn, unbounded, Receiver, Sender};
use electrs_index::{Confirmed, Daemon, Histogram, Index, Metrics, ScriptHash};
#[derive(Debug, Deserialize, Serialize, Clone)]
struct TxEntry {
#[serde(rename = "tx_hash")]
txid: Txid,
height: isize,
fee: Option<u64>, // in satoshis
}
impl TxEntry {
fn unconfirmed(entry: &MempoolEntry, mempool: &Mempool) -> Self {
let inputs = &entry.tx.input;
let has_unconfirmed_inputs = inputs
.iter()
.any(|txi| mempool.get(&txi.previous_output.txid).is_some());
Self {
txid: entry.txid,
height: if has_unconfirmed_inputs { -1 } else { 0 },
fee: Some(entry.fee.as_sat()),
}
}
fn confirmed(c: &Confirmed) -> Self {
Self {
txid: c.txid,
height: c.height as isize,
fee: None,
}
}
fn is_confirmed(&self) -> bool {
self.height > 0
}
}
hash_newtype!(StatusHash, sha256::Hash, 32, doc = "SHA256(status)", false);
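// Electrum status hash: SHA256 over the concatenation of "<txid>:<height>:"
// for every history entry, or None (null) when the history is empty.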
impl StatusHash {
fn new(entries: &[TxEntry]) -> Option<Self> {
if entries.is_empty() {
None
} else {
let status = entries
.iter()
.map(|entry| format!("{}:{}:", entry.txid, entry.height))
.collect::<Vec<String>>()
.join("");
let hash = StatusHash::hash(&status.as_bytes());
trace!("{} => {}", status, hash);
Some(hash)
}
}
}
struct Status {
entries: Vec<TxEntry>,
hash: Option<StatusHash>,
tip: BlockHash,
}
impl Status {
fn new(entries: Vec<TxEntry>, tip: BlockHash) -> Self {
let hash = StatusHash::new(&entries);
Self { entries, hash, tip }
}
fn confirmed(&self) -> Vec<TxEntry> {
self.entries
.iter()
.filter(|e| e.is_confirmed())
.cloned()
.collect()
}
}
pub(crate) struct Subscription {
tip: Option<BlockHash>,
status: HashMap<ScriptHash, Status>,
}
impl Subscription {
pub(crate) fn new() -> Self {
Self {
tip: None,
status: HashMap::new(),
}
}
}
fn notification(method: &str, params: &[Value]) -> Value {
json!({"jsonrpc": "2.0", "method": method, "params": params})
}
#[derive(Debug, Deserialize, Serialize)]
struct Request {
id: u64,
jsonrpc: String,
method: String,
#[serde(default)]
params: Value,
}
async fn wait_for_new_blocks(
mut tip: BlockHash,
daemon: Daemon,
mut new_block_tx: Sender<BlockHash>,
) -> Result<()> {
loop {
let new_tip = daemon
.wait_for_new_block(Duration::from_secs(60))
.context("failed to wait for new block")?;
if tip != new_tip {
tip = new_tip;
new_block_tx.send(tip).await?;
}
}
}
struct Stats {
rpc_duration: Histogram,
sync_duration: Histogram,
}
impl Stats {
fn new(metrics: &Metrics) -> Self {
Self {
rpc_duration: metrics.histogram_vec(
"rpc_duration",
"RPC handling duration (in seconds)",
&["name"],
),
sync_duration: metrics.histogram_vec(
"sync_duration",
"RPC sync duration (in seconds)",
&["name"],
),
}
}
}
pub(crate) struct Rpc {
index: Index,
daemon: Daemon,
mempool: Mempool,
tx_cache: RwLock<HashMap<Txid, Transaction>>,
stats: Stats,
// mempool polling
next_poll: Instant,
poll_period: Duration,
}
impl Rpc {
pub(crate) fn new(index: Index, daemon: Daemon, metrics: &Metrics) -> Result<Self> {
let mut rpc = Self {
index,
daemon,
mempool: Mempool::empty(metrics),
tx_cache: RwLock::new(HashMap::new()),
stats: Stats::new(metrics),
next_poll: Instant::now(),
poll_period: Duration::from_secs(1),
};
rpc.sync_index().context("failed to sync with bitcoind")?;
rpc.sync_mempool();
info!("loaded {} mempool txs", rpc.mempool.count());
Ok(rpc)
}
pub(crate) fn sync_index(&mut self) -> Result<BlockHash> {
self.stats
.sync_duration
.observe_duration("index", || self.index.update(&self.daemon))
}
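// Poll bitcoind for mempool changes, throttled to at most once per poll_period.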
pub(crate) fn sync_mempool(&mut self) {
let sync_duration = self.stats.sync_duration.clone();
sync_duration.observe_duration("mempool", || {
let now = Instant::now();
if now <= self.next_poll {
return;
}
self.next_poll = now + self.poll_period;
if let Err(e) = self.mempool.update(&self.daemon) {
warn!("failed to sync mempool: {:?}", e);
}
})
}
pub(crate) fn start_waiter(&self) -> Result<Receiver<BlockHash>> {
let (new_block_tx, new_block_rx) = unbounded::<BlockHash>();
let current_tip = match self.index.map().chain().last() {
None => bail!("empty chain"),
Some(tip) => *tip,
};
spawn(
"waiter",
wait_for_new_blocks(current_tip, self.daemon.reconnect()?, new_block_tx),
);
Ok(new_block_rx)
}
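// Collect Electrum notifications for a subscription: a headers notification
// when the chain tip moved, and a scripthash notification for every
// subscribed script whose status hash changed. The subscription's cached tip
// and statuses are updated along the way.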
pub(crate) fn notify(&self, subscription: &mut Subscription) -> Result<Vec<Value>> {
self.stats.sync_duration.observe_duration("notify", || {
let mut result = vec![];
let map = self.index.map();
let chain = map.chain();
let current_tip = match chain.last() {
None => bail!("empty chain"),
Some(tip) => *tip,
};
if let Some(last_tip) = subscription.tip {
if current_tip != last_tip {
let header = serialize(map.get_by_hash(&current_tip).unwrap());
result.push(notification(
"blockchain.headers.subscribe",
&[json!({"hex": header.to_hex(), "height": chain.len() - 1})],
));
subscription.tip = Some(current_tip);
}
};
drop(map);
for (scripthash, status) in subscription.status.iter_mut() {
let current_hash = status.hash;
let (mut entries, tip) = if status.tip == current_tip {
(status.confirmed(), status.tip)
} else {
self.get_confirmed(&scripthash)?
};
entries.extend(self.get_unconfirmed(&scripthash));
*status = Status::new(entries, tip);
if current_hash != status.hash {
result.push(notification(
"blockchain.scripthash.subscribe",
&[json!(scripthash), json!(status.hash)],
));
}
}
Ok(result)
})
}
fn headers_subscribe(&self, subscription: &mut Subscription) -> Result<Value> {
let map = self.index.map();
let chain = map.chain();
Ok(match chain.last() {
None => bail!("empty chain"),
Some(tip) => {
subscription.tip = Some(*tip);
let header = serialize(map.get_by_hash(tip).unwrap());
let height = chain.len() - 1;
json!({"hex": header.to_hex(), "height": height})
}
})
}
fn block_header(&self, (height,): (usize,)) -> Result<Value> {
let map = self.index.map();
let chain = map.chain();
match chain.get(height) {
None => bail!("no header at {}", height),
Some(hash) => Ok(json!(serialize(map.get_by_hash(hash).unwrap()).to_hex())),
}
}
fn block_headers(&self, (start_height, count): (usize, usize)) -> Result<Value> {
let map = self.index.map();
let chain = map.chain();
let max: usize = 2016;
let count = min(min(count, max), chain.len() - start_height);
let hex_headers: Vec<String> = chain
.get(start_height..start_height + count)
.unwrap()
.iter()
.map(|hash| serialize(map.get_by_hash(hash).unwrap()).to_hex())
.collect();
Ok(json!({"count": count, "hex": hex_headers.join(""), "max": max}))
}
fn estimate_fee(&self, (nblocks,): (u16,)) -> Result<Value> {
Ok(self
.daemon
.estimate_fee(nblocks)?
.map(|a| json!(a.as_btc()))
.unwrap_or_else(|| json!(-1)))
}
fn scripthash_get_history(
&self,
subscription: &Subscription,
(scripthash,): (ScriptHash,),
) -> Result<Value> {
match subscription.status.get(&scripthash) {
Some(status) => Ok(json!(status.entries)),
None => bail!("no subscription for scripthash"),
}
}
fn scripthash_subscribe(
&self,
subscription: &mut Subscription,
(scripthash,): (ScriptHash,),
) -> Result<Value> {
let (mut entries, tip) = self.get_confirmed(&scripthash)?;
entries.extend(self.get_unconfirmed(&scripthash));
let status = Status::new(entries, tip);
let hash = status.hash;
subscription.status.insert(scripthash, status);
Ok(json!(hash))
}
fn get_confirmed(&self, scripthash: &ScriptHash) -> Result<(Vec<TxEntry>, BlockHash)> {
let result = self
.index
.lookup(&scripthash, &self.daemon)
.context("index lookup failed")?;
let mut confirmed: Vec<Confirmed> = result
.readers
.into_par_iter()
.map(|r| r.read())
.collect::<Result<_>>()
.context("transaction reading failed")?;
confirmed.sort_by_key(|c| (c.height, c.file_offset));
let entries: Vec<TxEntry> = confirmed.iter().map(TxEntry::confirmed).collect();
let mut tx_cache = self.tx_cache.write().unwrap();
for c in confirmed {
tx_cache.entry(c.txid).or_insert(c.tx);
}
Ok((entries, result.tip))
}
fn get_unconfirmed(&self, scripthash: &ScriptHash) -> Vec<TxEntry> {
let entries: Vec<&MempoolEntry> = self.mempool.lookup(*scripthash);
let mut unconfirmed: Vec<TxEntry> = entries
.iter()
.map(|e| TxEntry::unconfirmed(e, &self.mempool))
.collect();
unconfirmed.sort_by_key(|u| u.txid);
let getter = |txid| match self.mempool.get(txid) {
Some(e) => e.tx.clone(),
None => panic!("missing mempool entry {}", txid),
};
let mut tx_cache = self.tx_cache.write().unwrap();
for u in &unconfirmed {
tx_cache.entry(u.txid).or_insert_with(|| getter(&u.txid));
}
unconfirmed
}
fn transaction_get(&self, (txid,): (Txid,)) -> Result<Value> {
match self.tx_cache.read().unwrap().get(&txid) {
Some(tx) => Ok(json!(serialize(tx).to_hex())),
None => panic!("tx {} is not cached", txid), // TODO: do we need txindex?
}
}
fn transaction_get_merkle(&self, (txid, height): (Txid, usize)) -> Result<Value> {
let blockhash = {
let map = self.index.map();
let chain = map.chain();
match chain.get(height) {
None => bail!("missing block at {}", height),
Some(blockhash) => *blockhash,
}
};
let block = self.index.block_reader(&blockhash, &self.daemon)?.read()?;
let txids: Vec<Txid> = block.txdata.into_iter().map(|tx| tx.txid()).collect();
let pos = match txids.iter().position(|current_txid| *current_txid == txid) {
None => bail!("missing tx {} at block {}", txid, blockhash),
Some(pos) => pos,
};
let nodes: Vec<TxMerkleNode> = txids
.iter()
.map(|txid| TxMerkleNode::from_hash(txid.as_hash()))
.collect();
let merkle: Vec<String> = create_merkle_branch(nodes, pos)
.into_iter()
.map(|node| node.to_hex())
.collect();
Ok(json!({"block_height": height, "pos": pos, "merkle": merkle}))
}
fn transaction_broadcast(&mut self, (tx_hex,): (String,)) -> Result<Value> {
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
let tx: Transaction = deserialize(&tx_bytes).context("invalid transaction")?;
let txid = self
.daemon
.broadcast(&tx)
.with_context(|| format!("failed to broadcast transaction: {}", tx.txid()))?;
self.next_poll = Instant::now();
self.sync_mempool();
Ok(json!(txid))
}
fn relayfee(&self) -> Result<Value> {
Ok(json!(self.daemon.get_relay_fee()?.as_btc())) // [BTC/kB]
}
fn get_fee_histogram(&self) -> Result<Value> {
Ok(json!(self.mempool.histogram()))
}
pub(crate) fn handle_request(&mut self, sub: &mut Subscription, value: Value) -> Result<Value> {
let rpc_duration = self.stats.rpc_duration.clone();
let req_str = value.to_string();
let mut req: Request = from_value(value).context("invalid request")?;
let params = req.params.take();
rpc_duration.observe_duration(req.method.as_str(), || {
let result = match req.method.as_str() {
"blockchain.scripthash.get_history" => {
self.scripthash_get_history(sub, from_value(params)?)
}
"blockchain.scripthash.subscribe" => {
self.scripthash_subscribe(sub, from_value(params)?)
}
"blockchain.transaction.broadcast" => {
self.transaction_broadcast(from_value(params)?)
}
"blockchain.transaction.get" => self.transaction_get(from_value(params)?),
"blockchain.transaction.get_merkle" => {
self.transaction_get_merkle(from_value(params)?)
}
"server.banner" => Ok(json!("Welcome to Electrum Rust Server V2 (WIP)!")),
"server.donation_address" => Ok(Value::Null),
"server.peers.subscribe" => Ok(json!([])),
"blockchain.block.header" => self.block_header(from_value(params)?),
"blockchain.block.headers" => self.block_headers(from_value(params)?),
"blockchain.estimatefee" => self.estimate_fee(from_value(params)?),
"blockchain.headers.subscribe" => self.headers_subscribe(sub),
"blockchain.relayfee" => self.relayfee(),
"mempool.get_fee_histogram" => self.get_fee_histogram(),
"server.ping" => Ok(Value::Null),
"server.version" => Ok(json!(["electrs v2 - WIP", "1.4"])),
&_ => bail!("unknown method '{}' with {}", req.method, params,),
};
Ok(match result {
Ok(value) => json!({"jsonrpc": req.jsonrpc, "id": req.id, "result": value}),
Err(err) => {
let msg = format!("RPC {} failed: {:#}", req_str, err);
warn!("{}", msg);
let error = json!({"code": -32603, "message": msg});
json!({"jsonrpc": req.jsonrpc, "id": req.id, "error": error})
}
})
})
}
}
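// Compute the merkle proof (branch) for the leaf at `index`: at each level,
// duplicate the last hash if the level has an odd number of nodes, record the
// sibling of the current node, then move up by hashing adjacent pairs.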
fn create_merkle_branch<T: Hash>(mut hashes: Vec<T>, mut index: usize) -> Vec<T> {
let mut result = vec![];
while hashes.len() > 1 {
if hashes.len() % 2 != 0 {
let last = *hashes.last().unwrap();
hashes.push(last);
}
index = if index % 2 == 0 { index + 1 } else { index - 1 };
result.push(hashes[index]);
index /= 2;
hashes = hashes
.chunks(2)
.map(|pair| {
let left = pair[0];
let right = pair[1];
let input = [&left[..], &right[..]].concat();
<T as Hash>::hash(&input)
})
.collect()
}
result
}
#[cfg(test)]
mod tests {
use crate::rpc::StatusHash;
use bitcoin::hashes::Hash;
use serde_json::json;
#[test]
fn test_status_hash() {
let status = "fb94cc7696fd077921a5918ad3ba973178845d800f564ac718b40f28b54e6091:650954:a7a3c9b47bf8f18136eca88386eed331971e6b2c0c41a5ea29cea88f7511564e:650958:6cc9451739098b70afc5d0e2cc3965136e87cb16dfd514f20d8252b3e6cbe565:650958:".as_bytes();
let hash = StatusHash::hash(status);
let hex = "8187fc643ddca88968ee123770078e3304cf1dcd889edf6b4b1026980774f8d9";
assert_eq!(format!("{}", hash), hex);
assert_eq!(json!(hash).to_string(), format!("\"{}\"", hex));
}
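// Illustrative sketch (assumes bitcoin::TxMerkleNode as the hash type, not
// part of the listing above): a single-transaction block yields an empty
// merkle branch.
#[test]
fn test_merkle_branch_single() {
use crate::rpc::create_merkle_branch;
use bitcoin::TxMerkleNode;
let leaf = TxMerkleNode::hash(b"single leaf");
assert!(create_merkle_branch(vec![leaf], 0).is_empty());
}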
}

19
electrs_rpc/src/util.rs Normal file
View File

@ -0,0 +1,19 @@
use anyhow::Result;
use async_std::task;
use futures::{channel::mpsc, future::Future};
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;
pub use mpsc::unbounded;
pub fn spawn<F>(name: &'static str, fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
if let Err(e) = fut.await {
error!("{} failed: {:?}", name, e)
}
})
}