From fe30cd605db3996fd78d3ce13cb424e737c8c47a Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 29 Nov 2021 23:40:55 +0200 Subject: [PATCH] Improve p2p receiving metrics --- src/cache.rs | 2 +- src/index.rs | 2 +- src/metrics.rs | 6 +++--- src/p2p.rs | 26 ++++++++++++++++++++++---- src/server.rs | 2 +- 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index c11116f..08c0a19 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -29,7 +29,7 @@ impl Cache { pub fn add_tx(&self, txid: Txid, f: impl FnOnce() -> Transaction) { self.txs.write().entry(txid).or_insert_with(|| { let tx = f(); - self.txs_size.observe("serialized", tx.get_size()); + self.txs_size.observe("serialized", tx.get_size() as f64); tx }); } diff --git a/src/index.rs b/src/index.rs index 143ce89..323cb31 100644 --- a/src/index.rs +++ b/src/index.rs @@ -44,7 +44,7 @@ impl Stats { } fn observe_size(&self, label: &str, rows: &[Row]) { - self.update_size.observe(label, db_rows_size(rows)); + self.update_size.observe(label, db_rows_size(rows) as f64); } fn observe_batch(&self, batch: &WriteBatch) { diff --git a/src/metrics.rs b/src/metrics.rs index ba1ff2a..ddf0a02 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -86,8 +86,8 @@ mod metrics_impl { } impl Histogram { - pub fn observe(&self, label: &str, value: usize) { - self.hist.with_label_values(&[label]).observe(value as f64); + pub fn observe(&self, label: &str, value: f64) { + self.hist.with_label_values(&[label]).observe(value); } pub fn observe_duration(&self, label: &str, func: F) -> T @@ -144,7 +144,7 @@ mod metrics_fake { pub struct Histogram {} impl Histogram { - pub fn observe(&self, _label: &str, _value: usize) {} + pub fn observe(&self, _label: &str, _value: f64) {} pub fn observe_duration(&self, _label: &str, func: F) -> T where diff --git a/src/p2p.rs b/src/p2p.rs index 50db28b..1754a43 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -18,7 +18,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender}; use std::io::{self, ErrorKind, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::{ chain::{Chain, NewHeader}, @@ -191,12 +191,23 @@ impl Connection { let stream = Arc::clone(&conn); crate::thread::spawn("p2p_recv", move || loop { - let raw_msg = match recv_duration - .observe_duration("recv", || RawNetworkMessage::consensus_decode(&*stream)) + let start = Instant::now(); + let raw_msg = RawNetworkMessage::consensus_decode(&*stream); { + let duration = duration_to_seconds(start.elapsed()); + let label = format!( + "recv_{}", + raw_msg + .as_ref() + .map(|msg| msg.cmd.as_ref()) + .unwrap_or("err") + ); + recv_duration.observe(&label, duration); + } + let raw_msg = match raw_msg { Ok(raw_msg) => { assert_eq!(raw_msg.magic, network.magic()); - recv_size.observe(raw_msg.cmd.as_ref(), raw_msg.raw.len()); + recv_size.observe(raw_msg.cmd.as_ref(), raw_msg.raw.len() as f64); raw_msg } Err(encode::Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => { @@ -363,3 +374,10 @@ impl Decodable for RawNetworkMessage { Ok(RawNetworkMessage { magic, cmd, raw }) } } + +/// `duration_to_seconds` converts Duration to seconds. +#[inline] +pub fn duration_to_seconds(d: Duration) -> f64 { + let nanos = f64::from(d.subsec_nanos()) / 1e9; + d.as_secs() as f64 + nanos +} diff --git a/src/server.rs b/src/server.rs index a13443c..3da8073 100644 --- a/src/server.rs +++ b/src/server.rs @@ -113,7 +113,7 @@ fn serve() -> Result<()> { let first = once(event.context("server disconnected")?); let rest = server_rx.iter().take(server_rx.len()); let events: Vec = first.chain(rest).collect(); - server_batch_size.observe("recv", events.len()); + server_batch_size.observe("recv", events.len() as f64); handle_events(&rpc, &mut peers, events); }, default(config.wait_duration) => (), // sync and update