mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 01:43:29 +01:00
Improve p2p receiving metrics
This commit is contained in:
parent
81aa7ad24a
commit
fe30cd605d
@ -29,7 +29,7 @@ impl Cache {
|
||||
pub fn add_tx(&self, txid: Txid, f: impl FnOnce() -> Transaction) {
|
||||
self.txs.write().entry(txid).or_insert_with(|| {
|
||||
let tx = f();
|
||||
self.txs_size.observe("serialized", tx.get_size());
|
||||
self.txs_size.observe("serialized", tx.get_size() as f64);
|
||||
tx
|
||||
});
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ impl Stats {
|
||||
}
|
||||
|
||||
fn observe_size(&self, label: &str, rows: &[Row]) {
|
||||
self.update_size.observe(label, db_rows_size(rows));
|
||||
self.update_size.observe(label, db_rows_size(rows) as f64);
|
||||
}
|
||||
|
||||
fn observe_batch(&self, batch: &WriteBatch) {
|
||||
|
@ -86,8 +86,8 @@ mod metrics_impl {
|
||||
}
|
||||
|
||||
impl Histogram {
|
||||
pub fn observe(&self, label: &str, value: usize) {
|
||||
self.hist.with_label_values(&[label]).observe(value as f64);
|
||||
pub fn observe(&self, label: &str, value: f64) {
|
||||
self.hist.with_label_values(&[label]).observe(value);
|
||||
}
|
||||
|
||||
pub fn observe_duration<F, T>(&self, label: &str, func: F) -> T
|
||||
@ -144,7 +144,7 @@ mod metrics_fake {
|
||||
pub struct Histogram {}
|
||||
|
||||
impl Histogram {
|
||||
pub fn observe(&self, _label: &str, _value: usize) {}
|
||||
pub fn observe(&self, _label: &str, _value: f64) {}
|
||||
|
||||
pub fn observe_duration<F, T>(&self, _label: &str, func: F) -> T
|
||||
where
|
||||
|
26
src/p2p.rs
26
src/p2p.rs
@ -18,7 +18,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender};
|
||||
use std::io::{self, ErrorKind, Write};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::{
|
||||
chain::{Chain, NewHeader},
|
||||
@ -191,12 +191,23 @@ impl Connection {
|
||||
|
||||
let stream = Arc::clone(&conn);
|
||||
crate::thread::spawn("p2p_recv", move || loop {
|
||||
let raw_msg = match recv_duration
|
||||
.observe_duration("recv", || RawNetworkMessage::consensus_decode(&*stream))
|
||||
let start = Instant::now();
|
||||
let raw_msg = RawNetworkMessage::consensus_decode(&*stream);
|
||||
{
|
||||
let duration = duration_to_seconds(start.elapsed());
|
||||
let label = format!(
|
||||
"recv_{}",
|
||||
raw_msg
|
||||
.as_ref()
|
||||
.map(|msg| msg.cmd.as_ref())
|
||||
.unwrap_or("err")
|
||||
);
|
||||
recv_duration.observe(&label, duration);
|
||||
}
|
||||
let raw_msg = match raw_msg {
|
||||
Ok(raw_msg) => {
|
||||
assert_eq!(raw_msg.magic, network.magic());
|
||||
recv_size.observe(raw_msg.cmd.as_ref(), raw_msg.raw.len());
|
||||
recv_size.observe(raw_msg.cmd.as_ref(), raw_msg.raw.len() as f64);
|
||||
raw_msg
|
||||
}
|
||||
Err(encode::Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
|
||||
@ -363,3 +374,10 @@ impl Decodable for RawNetworkMessage {
|
||||
Ok(RawNetworkMessage { magic, cmd, raw })
|
||||
}
|
||||
}
|
||||
|
||||
/// `duration_to_seconds` converts Duration to seconds.
|
||||
#[inline]
|
||||
pub fn duration_to_seconds(d: Duration) -> f64 {
|
||||
let nanos = f64::from(d.subsec_nanos()) / 1e9;
|
||||
d.as_secs() as f64 + nanos
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ fn serve() -> Result<()> {
|
||||
let first = once(event.context("server disconnected")?);
|
||||
let rest = server_rx.iter().take(server_rx.len());
|
||||
let events: Vec<Event> = first.chain(rest).collect();
|
||||
server_batch_size.observe("recv", events.len());
|
||||
server_batch_size.observe("recv", events.len() as f64);
|
||||
handle_events(&rpc, &mut peers, events);
|
||||
},
|
||||
default(config.wait_duration) => (), // sync and update
|
||||
|
Loading…
Reference in New Issue
Block a user