mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 01:43:29 +01:00
execute mempool scanning in separate thread
The mempool scanning procedure can now be launched in a separate thread, while mempool updates are applied synchronously in the main thread.
This commit is contained in:
parent
060c548b06
commit
c2c7141ef3
@ -9,15 +9,17 @@ use rayon::prelude::*;
|
||||
use serde_derive::Deserialize;
|
||||
use serde_json::{self, json, Value};
|
||||
|
||||
use std::collections::{hash_map::Entry, HashMap};
|
||||
use std::collections::{hash_map::Entry, HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::iter::FromIterator;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
cache::Cache,
|
||||
config::{Config, ELECTRS_VERSION},
|
||||
daemon::{self, extract_bitcoind_error, Daemon},
|
||||
mempool::MempoolSyncUpdate,
|
||||
merkle::Proof,
|
||||
metrics::{self, Histogram, Metrics},
|
||||
signals::Signal,
|
||||
@ -123,7 +125,7 @@ pub struct Rpc {
|
||||
tracker: Tracker,
|
||||
cache: Cache,
|
||||
rpc_duration: Histogram,
|
||||
daemon: Daemon,
|
||||
daemon: Arc<Daemon>,
|
||||
signal: Signal,
|
||||
banner: String,
|
||||
port: u16,
|
||||
@ -147,7 +149,7 @@ impl Rpc {
|
||||
tracker,
|
||||
cache,
|
||||
rpc_duration,
|
||||
daemon,
|
||||
daemon: Arc::new(daemon),
|
||||
signal,
|
||||
banner: config.server_banner.clone(),
|
||||
port: config.electrum_rpc_addr.port(),
|
||||
@ -158,12 +160,25 @@ impl Rpc {
|
||||
&self.signal
|
||||
}
|
||||
|
||||
pub(crate) fn daemon(&self) -> &Arc<Daemon> {
|
||||
&self.daemon
|
||||
}
|
||||
|
||||
pub fn new_block_notification(&self) -> Receiver<()> {
|
||||
self.daemon.new_block_notification()
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> Result<bool> {
|
||||
self.tracker.sync(&self.daemon, self.signal.exit_flag())
|
||||
pub fn sync_chain(&mut self) -> Result<bool> {
|
||||
self.tracker
|
||||
.sync_chain(&self.daemon, self.signal.exit_flag())
|
||||
}
|
||||
|
||||
pub(crate) fn mempool_txids(&self) -> HashSet<Txid> {
|
||||
self.tracker.mempool.all_txids()
|
||||
}
|
||||
|
||||
pub(crate) fn mempool_apply(&mut self, sync_update: MempoolSyncUpdate) {
|
||||
self.tracker.mempool.apply_sync_update(sync_update)
|
||||
}
|
||||
|
||||
pub fn update_client(&self, client: &mut Client) -> Result<Vec<String>> {
|
||||
@ -432,7 +447,7 @@ impl Rpc {
|
||||
}
|
||||
|
||||
fn get_fee_histogram(&self) -> Result<Value> {
|
||||
Ok(json!(self.tracker.fees_histogram()))
|
||||
Ok(json!(self.tracker.mempool.fees_histogram()))
|
||||
}
|
||||
|
||||
fn server_id(&self) -> String {
|
||||
|
@ -127,6 +127,10 @@ impl Mempool {
|
||||
&self.fees
|
||||
}
|
||||
|
||||
pub(crate) fn all_txids(&self) -> HashSet<Txid> {
|
||||
HashSet::<Txid>::from_iter(self.entries.keys().copied())
|
||||
}
|
||||
|
||||
pub(crate) fn get(&self, txid: &Txid) -> Option<&Entry> {
|
||||
self.entries.get(txid)
|
||||
}
|
||||
@ -186,22 +190,6 @@ impl Mempool {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) {
|
||||
let old_txids = HashSet::<Txid>::from_iter(self.entries.keys().copied());
|
||||
|
||||
let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag);
|
||||
|
||||
let sync_update = match poll_result {
|
||||
Ok(sync_update) => sync_update,
|
||||
Err(e) => {
|
||||
warn!("mempool sync failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.apply_sync_update(sync_update);
|
||||
}
|
||||
|
||||
/// Add a transaction entry to the mempool and update the fee histogram.
|
||||
fn add_entry(&mut self, entry: Entry) {
|
||||
for txi in &entry.tx.input {
|
||||
|
@ -1,19 +1,23 @@
|
||||
use anyhow::{Context, Result};
|
||||
use crossbeam_channel::{select, unbounded, Sender};
|
||||
use bitcoin::Txid;
|
||||
use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
|
||||
use rayon::prelude::*;
|
||||
|
||||
use std::{
|
||||
collections::hash_map::HashMap,
|
||||
collections::{HashMap, HashSet},
|
||||
io::{BufRead, BufReader, Write},
|
||||
iter::once,
|
||||
net::{Shutdown, TcpListener, TcpStream},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
daemon::Daemon,
|
||||
electrum::{Client, Rpc},
|
||||
mempool::MempoolSyncUpdate,
|
||||
metrics::{self, Metrics},
|
||||
signals::ExitError,
|
||||
signals::{ExitError, ExitFlag},
|
||||
thread::spawn,
|
||||
};
|
||||
|
||||
@ -87,19 +91,58 @@ fn serve() -> Result<()> {
|
||||
|
||||
let new_block_rx = rpc.new_block_notification();
|
||||
let mut peers = HashMap::<usize, Peer>::new();
|
||||
|
||||
let (mempool_update_tx, mempool_update_rx) = bounded(0);
|
||||
let (mempool_run_tx, mempool_run_rx) = bounded(0);
|
||||
|
||||
let daemon = Arc::clone(rpc.daemon());
|
||||
let exit_flag = rpc.signal().exit_flag().clone();
|
||||
if !config.ignore_mempool {
|
||||
spawn("mempool_sync", move || {
|
||||
mempool_sync_loop(daemon, mempool_update_tx, mempool_run_rx, exit_flag)
|
||||
});
|
||||
}
|
||||
|
||||
loop {
|
||||
// initial sync and compaction may take a few hours
|
||||
while server_rx.is_empty() {
|
||||
let done = duration.observe_duration("sync", || rpc.sync().context("sync failed"))?; // sync a batch of blocks
|
||||
let done =
|
||||
duration.observe_duration("sync", || rpc.sync_chain().context("sync failed"))?; // sync a batch of blocks
|
||||
peers = duration.observe_duration("notify", || notify_peers(&rpc, peers)); // peers are disconnected on error
|
||||
if !done {
|
||||
continue; // more blocks to sync
|
||||
}
|
||||
|
||||
if config.sync_once {
|
||||
return Ok(()); // exit after initial sync is done
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
select! {
|
||||
// Start an asynchronous scan of the mempool if we aren't already doing so.
|
||||
// You'd expect this might cause unneeded allocations copying mempool TXIDs, but
|
||||
// `rpc.mempool_txids()` is only evaluated if the channel send is doesn't block.
|
||||
send(mempool_run_tx, rpc.mempool_txids()) -> res => {
|
||||
match res {
|
||||
Ok(_) => (),
|
||||
Err(_) => warn!("disconnected from mempool scan thread"),
|
||||
};
|
||||
}
|
||||
|
||||
// Received an update to the mempool state. Apply it.
|
||||
recv(mempool_update_rx) -> res => {
|
||||
match res {
|
||||
Ok(sync_update) => rpc.mempool_apply(sync_update),
|
||||
Err(_) => warn!("disconnected from mempool scan thread"),
|
||||
};
|
||||
}
|
||||
|
||||
// Mempool scanning in progress, or `ignore_mempool` is true.
|
||||
default => {}
|
||||
};
|
||||
|
||||
duration.observe_duration("select", || -> Result<()> {
|
||||
select! {
|
||||
// Handle signals for graceful shutdown
|
||||
@ -247,3 +290,24 @@ fn recv_loop(peer_id: usize, stream: &TcpStream, server_tx: Sender<Event>) -> Re
|
||||
server_tx.send(Event { peer_id, msg })?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mempool_sync_loop(
|
||||
daemon: Arc<Daemon>,
|
||||
mempool_update_tx: Sender<MempoolSyncUpdate>,
|
||||
mempool_run_rx: Receiver<HashSet<Txid>>,
|
||||
exit_flag: ExitFlag,
|
||||
) -> Result<()> {
|
||||
while let Ok(old_txids) = mempool_run_rx.recv() {
|
||||
match MempoolSyncUpdate::poll(&daemon, old_txids, &exit_flag) {
|
||||
Ok(sync_update) => {
|
||||
let _ = mempool_update_tx.send(sync_update);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("mempool sync failed: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
debug!("mempool sync thread exiting");
|
||||
Ok(())
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ use crate::{
|
||||
daemon::Daemon,
|
||||
db::DBStore,
|
||||
index::Index,
|
||||
mempool::{FeeHistogram, Mempool},
|
||||
mempool::Mempool,
|
||||
metrics::Metrics,
|
||||
signals::ExitFlag,
|
||||
status::{Balance, ScriptHashStatus, UnspentEntry},
|
||||
@ -21,10 +21,9 @@ use crate::{
|
||||
|
||||
/// Electrum protocol subscriptions' tracker
|
||||
pub struct Tracker {
|
||||
pub(crate) mempool: Mempool,
|
||||
index: Index,
|
||||
mempool: Mempool,
|
||||
metrics: Metrics,
|
||||
ignore_mempool: bool,
|
||||
}
|
||||
|
||||
pub(crate) enum Error {
|
||||
@ -51,7 +50,6 @@ impl Tracker {
|
||||
.context("failed to open index")?,
|
||||
mempool: Mempool::new(&metrics),
|
||||
metrics,
|
||||
ignore_mempool: config.ignore_mempool,
|
||||
})
|
||||
}
|
||||
|
||||
@ -59,10 +57,6 @@ impl Tracker {
|
||||
self.index.chain()
|
||||
}
|
||||
|
||||
pub(crate) fn fees_histogram(&self) -> &FeeHistogram {
|
||||
self.mempool.fees_histogram()
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&self) -> &Metrics {
|
||||
&self.metrics
|
||||
}
|
||||
@ -71,12 +65,8 @@ impl Tracker {
|
||||
status.get_unspent(self.index.chain())
|
||||
}
|
||||
|
||||
pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
|
||||
pub(crate) fn sync_chain(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
|
||||
let done = self.index.sync(daemon, exit_flag)?;
|
||||
if done && !self.ignore_mempool {
|
||||
self.mempool.sync(daemon, exit_flag);
|
||||
// TODO: double check tip - and retry on diff
|
||||
}
|
||||
Ok(done)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user