mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 09:54:09 +01:00
Parse p2p messages in a separate thread
Also, use a separate metrics for parsing duration monitoring.
This commit is contained in:
parent
67d8472e19
commit
6a3165af43
27
src/p2p.rs
27
src/p2p.rs
@ -135,7 +135,7 @@ impl Connection {
|
||||
);
|
||||
|
||||
let (tx_send, tx_recv) = bounded::<NetworkMessage>(1);
|
||||
let (rx_send, rx_recv) = bounded::<NetworkMessage>(1);
|
||||
let (rx_send, rx_recv) = bounded::<RawNetworkMessage>(1);
|
||||
|
||||
let send_duration = metrics.histogram_vec(
|
||||
"p2p_send_duration",
|
||||
@ -149,6 +149,12 @@ impl Connection {
|
||||
"step",
|
||||
default_duration_buckets(),
|
||||
);
|
||||
let parse_duration = metrics.histogram_vec(
|
||||
"p2p_parse_duration",
|
||||
"Time spent parsing p2p messages (in seconds)",
|
||||
"step",
|
||||
default_duration_buckets(),
|
||||
);
|
||||
let recv_size = metrics.histogram_vec(
|
||||
"p2p_recv_size",
|
||||
"Size of p2p messages read (in bytes)",
|
||||
@ -217,14 +223,7 @@ impl Connection {
|
||||
Err(e) => bail!("failed to recv a message from peer: {}", e),
|
||||
};
|
||||
|
||||
let label = format!("parse_{}", raw_msg.cmd.as_ref());
|
||||
let msg = recv_duration
|
||||
.observe_duration(&label, || raw_msg.parse().expect("invalid message"));
|
||||
|
||||
recv_duration.observe_duration("wait", || {
|
||||
trace!("recv: {:?}", msg);
|
||||
rx_send.send(msg)
|
||||
})?;
|
||||
recv_duration.observe_duration("wait", || rx_send.send(raw_msg))?;
|
||||
});
|
||||
|
||||
let (req_send, req_recv) = bounded::<Request>(1);
|
||||
@ -238,13 +237,19 @@ impl Connection {
|
||||
crate::thread::spawn("p2p_loop", move || loop {
|
||||
select! {
|
||||
recv(rx_recv) -> result => {
|
||||
let msg = match result {
|
||||
Ok(msg) => msg,
|
||||
let raw_msg = match result {
|
||||
Ok(raw_msg) => raw_msg,
|
||||
Err(_) => { // p2p_recv is closed, so rx_send is disconnected
|
||||
debug!("closing p2p_loop thread: peer has disconnected");
|
||||
return Ok(()); // new_block_send is dropped, causing the server to exit
|
||||
}
|
||||
};
|
||||
|
||||
let label = format!("parse_{}", raw_msg.cmd.as_ref());
|
||||
let msg = parse_duration
|
||||
.observe_duration(&label, || raw_msg.parse().expect("invalid message"));
|
||||
trace!("recv: {:?}", msg);
|
||||
|
||||
match msg {
|
||||
NetworkMessage::GetHeaders(_) => {
|
||||
tx_send.send(NetworkMessage::Headers(vec![]))?;
|
||||
|
Loading…
Reference in New Issue
Block a user