diff --git a/src/daemon.rs b/src/daemon.rs index c09e468..0903c25 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -15,6 +15,7 @@ use std::path::Path; use crate::{ chain::{Chain, NewHeader}, config::Config, + metrics::Metrics, p2p::Connection, signals::ExitFlag, }; @@ -112,7 +113,11 @@ struct BlockchainInfo { } impl Daemon { - pub(crate) fn connect(config: &Config, exit_flag: &ExitFlag) -> Result { + pub(crate) fn connect( + config: &Config, + exit_flag: &ExitFlag, + metrics: &Metrics, + ) -> Result { let mut rpc = rpc_connect(config)?; loop { @@ -142,7 +147,11 @@ impl Daemon { bail!("electrs requires non-pruned bitcoind node"); } - let p2p = Mutex::new(Connection::connect(config.network, config.daemon_p2p_addr)?); + let p2p = Mutex::new(Connection::connect( + config.network, + config.daemon_p2p_addr, + metrics, + )?); Ok(Self { p2p, rpc }) } diff --git a/src/electrum.rs b/src/electrum.rs index d2a6f85..7f82980 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -133,11 +133,9 @@ impl Rpc { ); let signal = Signal::new(); + let daemon = Daemon::connect(config, signal.exit_flag(), tracker.metrics())?; tracker - .sync( - &Daemon::connect(config, signal.exit_flag())?, - signal.exit_flag(), - ) + .sync(&daemon, signal.exit_flag()) .context("initial sync failed")?; let cache = Cache::new(tracker.metrics()); @@ -145,7 +143,7 @@ impl Rpc { tracker, cache, rpc_duration, - daemon: Daemon::connect(config, signal.exit_flag())?, + daemon, signal, banner: config.server_banner.clone(), port: config.electrum_rpc_addr.port(), diff --git a/src/p2p.rs b/src/p2p.rs index 58acc65..cd82910 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -18,7 +18,10 @@ use std::iter::FromIterator; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::chain::{Chain, NewHeader}; +use crate::{ + chain::{Chain, NewHeader}, + metrics::{default_duration_buckets, Histogram, Metrics}, +}; enum Request { GetNewHeaders(GetHeadersMessage), @@ -48,6 +51,8 @@ pub(crate) struct Connection { blocks_recv: Receiver, headers_recv: Receiver>, new_block_recv: Receiver<()>, + + blocks_duration: Histogram, } impl Connection { @@ -89,16 +94,23 @@ impl Connection { if blockhashes.is_empty() { return Ok(()); } - debug!("loading {} blocks", blockhashes.len()); - self.req_send.send(Request::get_blocks(&blockhashes))?; + self.blocks_duration.observe_duration("send", || { + debug!("loading {} blocks", blockhashes.len()); + self.req_send.send(Request::get_blocks(&blockhashes)) + })?; for hash in blockhashes { - let block = self - .blocks_recv - .recv() - .with_context(|| format!("failed to get block {}", hash))?; - ensure!(block.block_hash() == hash, "got unexpected block"); - func(hash, block); + let block = self.blocks_duration.observe_duration("recv", || { + self.blocks_recv + .recv() + .with_context(|| format!("failed to get block {}", hash)) + })?; + + self.blocks_duration.observe_duration("process", || { + ensure!(block.block_hash() == hash, "got unexpected block"); + func(hash, block); + Ok(()) + })?; } Ok(()) } @@ -108,7 +120,11 @@ impl Connection { self.new_block_recv.clone() } - pub(crate) fn connect(network: Network, address: SocketAddr) -> Result { + pub(crate) fn connect( + network: Network, + address: SocketAddr, + metrics: &Metrics, + ) -> Result { let mut stream = TcpStream::connect(address) .with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?; let mut reader = StreamReader::new( @@ -118,10 +134,28 @@ impl Connection { let (tx_send, tx_recv) = bounded::(1); let (rx_send, rx_recv) = bounded::(1); + let send_duration = metrics.histogram_vec( + "p2p_send_duration", + "Time spent sending p2p messages", + "step", + default_duration_buckets(), + ); + let recv_duration = metrics.histogram_vec( + "p2p_recv_duration", + "Time spent receiving p2p messages", + "step", + default_duration_buckets(), + ); + let blocks_duration = metrics.histogram_vec( + "p2p_blocks_duration", + "Time spent getting blocks via p2p protocol", + "step", + default_duration_buckets(), + ); + crate::thread::spawn("p2p_send", move || loop { use std::net::Shutdown; - - let msg = match tx_recv.recv() { + let msg = match send_duration.observe_duration("wait", || tx_recv.recv()) { Ok(msg) => msg, Err(_) => { // p2p_loop is closed, so tx_send is disconnected @@ -133,32 +167,36 @@ impl Connection { return Ok(()); } }; - - trace!("send: {:?}", msg); - let raw_msg = message::RawNetworkMessage { - magic: network.magic(), - payload: msg, - }; - stream - .write_all(encode::serialize(&raw_msg).as_slice()) - .context("p2p failed to send")?; + send_duration.observe_duration("send", || { + trace!("send: {:?}", msg); + let raw_msg = message::RawNetworkMessage { + magic: network.magic(), + payload: msg, + }; + stream + .write_all(encode::serialize(&raw_msg).as_slice()) + .context("p2p failed to send") + })?; }); crate::thread::spawn("p2p_recv", move || loop { use bitcoin::consensus::encode::Error; use std::io::ErrorKind; - let raw_msg: message::RawNetworkMessage = match reader.read_next() { - Ok(raw_msg) => raw_msg, - Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => { - debug!("closing p2p_recv thread: connection closed"); - return Ok(()); - } - Err(e) => bail!("failed to recv a message from peer: {}", e), - }; + let raw_msg: message::RawNetworkMessage = + match recv_duration.observe_duration("recv", || reader.read_next()) { + Ok(raw_msg) => raw_msg, + Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => { + debug!("closing p2p_recv thread: connection closed"); + return Ok(()); + } + Err(e) => bail!("failed to recv a message from peer: {}", e), + }; - trace!("recv: {:?}", raw_msg.payload); - rx_send.send(raw_msg.payload)?; + recv_duration.observe_duration("wait", || { + trace!("recv: {:?}", raw_msg.payload); + rx_send.send(raw_msg.payload) + })?; }); let (req_send, req_recv) = bounded::(1); @@ -230,6 +268,7 @@ impl Connection { blocks_recv, headers_recv, new_block_recv, + blocks_duration, }) } }