1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Merge pull request #546 from romanz/p2p-metrics

Add p2p protocol monitoring
This commit is contained in:
Roman Zeyde 2021-10-12 17:46:22 +03:00 committed by GitHub
commit b94c98fc31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 38 deletions

View File

@ -15,6 +15,7 @@ use std::path::Path;
use crate::{
chain::{Chain, NewHeader},
config::Config,
metrics::Metrics,
p2p::Connection,
signals::ExitFlag,
};
@ -112,7 +113,11 @@ struct BlockchainInfo {
}
impl Daemon {
pub(crate) fn connect(config: &Config, exit_flag: &ExitFlag) -> Result<Self> {
pub(crate) fn connect(
config: &Config,
exit_flag: &ExitFlag,
metrics: &Metrics,
) -> Result<Self> {
let mut rpc = rpc_connect(config)?;
loop {
@ -142,7 +147,11 @@ impl Daemon {
bail!("electrs requires non-pruned bitcoind node");
}
let p2p = Mutex::new(Connection::connect(config.network, config.daemon_p2p_addr)?);
let p2p = Mutex::new(Connection::connect(
config.network,
config.daemon_p2p_addr,
metrics,
)?);
Ok(Self { p2p, rpc })
}

View File

@ -133,11 +133,9 @@ impl Rpc {
);
let signal = Signal::new();
let daemon = Daemon::connect(config, signal.exit_flag(), tracker.metrics())?;
tracker
.sync(
&Daemon::connect(config, signal.exit_flag())?,
signal.exit_flag(),
)
.sync(&daemon, signal.exit_flag())
.context("initial sync failed")?;
let cache = Cache::new(tracker.metrics());
@ -145,7 +143,7 @@ impl Rpc {
tracker,
cache,
rpc_duration,
daemon: Daemon::connect(config, signal.exit_flag())?,
daemon,
signal,
banner: config.server_banner.clone(),
port: config.electrum_rpc_addr.port(),

View File

@ -18,7 +18,10 @@ use std::iter::FromIterator;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::chain::{Chain, NewHeader};
use crate::{
chain::{Chain, NewHeader},
metrics::{default_duration_buckets, Histogram, Metrics},
};
enum Request {
GetNewHeaders(GetHeadersMessage),
@ -48,6 +51,8 @@ pub(crate) struct Connection {
blocks_recv: Receiver<Block>,
headers_recv: Receiver<Vec<BlockHeader>>,
new_block_recv: Receiver<()>,
blocks_duration: Histogram,
}
impl Connection {
@ -89,16 +94,23 @@ impl Connection {
if blockhashes.is_empty() {
return Ok(());
}
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))?;
self.blocks_duration.observe_duration("send", || {
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))
})?;
for hash in blockhashes {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
ensure!(block.block_hash() == hash, "got unexpected block");
func(hash, block);
let block = self.blocks_duration.observe_duration("recv", || {
self.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))
})?;
self.blocks_duration.observe_duration("process", || {
ensure!(block.block_hash() == hash, "got unexpected block");
func(hash, block);
Ok(())
})?;
}
Ok(())
}
@ -108,7 +120,11 @@ impl Connection {
self.new_block_recv.clone()
}
pub(crate) fn connect(network: Network, address: SocketAddr) -> Result<Self> {
pub(crate) fn connect(
network: Network,
address: SocketAddr,
metrics: &Metrics,
) -> Result<Self> {
let mut stream = TcpStream::connect(address)
.with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?;
let mut reader = StreamReader::new(
@ -118,10 +134,28 @@ impl Connection {
let (tx_send, tx_recv) = bounded::<NetworkMessage>(1);
let (rx_send, rx_recv) = bounded::<NetworkMessage>(1);
let send_duration = metrics.histogram_vec(
"p2p_send_duration",
"Time spent sending p2p messages",
"step",
default_duration_buckets(),
);
let recv_duration = metrics.histogram_vec(
"p2p_recv_duration",
"Time spent receiving p2p messages",
"step",
default_duration_buckets(),
);
let blocks_duration = metrics.histogram_vec(
"p2p_blocks_duration",
"Time spent getting blocks via p2p protocol",
"step",
default_duration_buckets(),
);
crate::thread::spawn("p2p_send", move || loop {
use std::net::Shutdown;
let msg = match tx_recv.recv() {
let msg = match send_duration.observe_duration("wait", || tx_recv.recv()) {
Ok(msg) => msg,
Err(_) => {
// p2p_loop is closed, so tx_send is disconnected
@ -133,32 +167,36 @@ impl Connection {
return Ok(());
}
};
trace!("send: {:?}", msg);
let raw_msg = message::RawNetworkMessage {
magic: network.magic(),
payload: msg,
};
stream
.write_all(encode::serialize(&raw_msg).as_slice())
.context("p2p failed to send")?;
send_duration.observe_duration("send", || {
trace!("send: {:?}", msg);
let raw_msg = message::RawNetworkMessage {
magic: network.magic(),
payload: msg,
};
stream
.write_all(encode::serialize(&raw_msg).as_slice())
.context("p2p failed to send")
})?;
});
crate::thread::spawn("p2p_recv", move || loop {
use bitcoin::consensus::encode::Error;
use std::io::ErrorKind;
let raw_msg: message::RawNetworkMessage = match reader.read_next() {
Ok(raw_msg) => raw_msg,
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("closing p2p_recv thread: connection closed");
return Ok(());
}
Err(e) => bail!("failed to recv a message from peer: {}", e),
};
let raw_msg: message::RawNetworkMessage =
match recv_duration.observe_duration("recv", || reader.read_next()) {
Ok(raw_msg) => raw_msg,
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("closing p2p_recv thread: connection closed");
return Ok(());
}
Err(e) => bail!("failed to recv a message from peer: {}", e),
};
trace!("recv: {:?}", raw_msg.payload);
rx_send.send(raw_msg.payload)?;
recv_duration.observe_duration("wait", || {
trace!("recv: {:?}", raw_msg.payload);
rx_send.send(raw_msg.payload)
})?;
});
let (req_send, req_recv) = bounded::<Request>(1);
@ -230,6 +268,7 @@ impl Connection {
blocks_recv,
headers_recv,
new_block_recv,
blocks_duration,
})
}
}