1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Allow optional metrics collection

Can be disable by `--no-default-features`.

Also, enable RocksDB ZSTD compression (removing the feature).
This commit is contained in:
Roman Zeyde 2021-05-27 20:44:51 +03:00
parent d4ef9a3860
commit b0deaecdc4
6 changed files with 144 additions and 90 deletions

7
Cargo.lock generated
View File

@ -720,9 +720,16 @@ dependencies = [
"memchr", "memchr",
"parking_lot", "parking_lot",
"procfs", "procfs",
"protobuf",
"thiserror", "thiserror",
] ]
[[package]]
name = "protobuf"
version = "2.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45604fc7a88158e7d514d8e22e14ac746081e7a70d7690074dd0029ee37458d6"
[[package]] [[package]]
name = "quick-error" name = "quick-error"
version = "1.2.3" version = "1.2.3"

View File

@ -13,7 +13,8 @@ edition = "2018"
build = "build.rs" build = "build.rs"
[features] [features]
default = ["rocksdb/zstd", "prometheus/process"] default = ["metrics"]
metrics = ["prometheus"]
[package.metadata.configure_me] [package.metadata.configure_me]
spec = "internal/config_specification.toml" spec = "internal/config_specification.toml"
@ -28,9 +29,9 @@ dirs-next = "2.0"
env_logger = "0.7" env_logger = "0.7"
hyper = "0.10" hyper = "0.10"
log = "0.4" log = "0.4"
prometheus = { version = "0.12", default-features = false } prometheus = { version = "0.12", features = ["process"], optional = true }
rayon = "1.5" rayon = "1.5"
rocksdb = { git = "https://github.com/romanz/rust-rocksdb", rev = "4554d19b2ff2e34493564b4d868454097c74b693", default-features = false } # to support building with Rust 1.41.1 and https://github.com/romanz/electrs/issues/403 rocksdb = { git = "https://github.com/romanz/rust-rocksdb", rev = "4554d19b2ff2e34493564b4d868454097c74b693", features = ["zstd"] } # to support building with Rust 1.41.1 and https://github.com/romanz/electrs/issues/403
rust-crypto = "0.2" rust-crypto = "0.2"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View File

@ -208,6 +208,13 @@ impl Config {
(DEFAULT_SERVER_ADDRESS, default_electrum_port).into(), (DEFAULT_SERVER_ADDRESS, default_electrum_port).into(),
ResolvAddr::resolve_or_exit, ResolvAddr::resolve_or_exit,
); );
#[cfg(not(feature = "metrics"))]
{
assert!(
config.monitoring_addr.is_none(),
"Enable \"metrics\" feature to specify monitoring_addr"
);
}
let monitoring_addr: SocketAddr = config.monitoring_addr.map_or( let monitoring_addr: SocketAddr = config.monitoring_addr.map_or(
(DEFAULT_SERVER_ADDRESS, default_monitoring_port).into(), (DEFAULT_SERVER_ADDRESS, default_monitoring_port).into(),
ResolvAddr::resolve_or_exit, ResolvAddr::resolve_or_exit,

View File

@ -79,11 +79,10 @@ pub struct Rpc {
impl Rpc { impl Rpc {
pub fn new(config: &Config, tracker: Tracker) -> Result<Self> { pub fn new(config: &Config, tracker: Tracker) -> Result<Self> {
let rpc_duration = tracker.metrics().histogram_vec( let rpc_duration =
"rpc_duration", tracker
"RPC duration (in seconds)", .metrics()
&["method"], .histogram_vec("rpc_duration", "RPC duration (in seconds)", "method");
);
Ok(Self { Ok(Self {
tracker, tracker,
cache: Cache::default(), cache: Cache::default(),

View File

@ -25,30 +25,30 @@ impl Stats {
update_duration: metrics.histogram_vec( update_duration: metrics.histogram_vec(
"index_update_duration", "index_update_duration",
"Index update duration (in seconds)", "Index update duration (in seconds)",
&["step"], "step",
), ),
update_size: metrics.histogram_vec( update_size: metrics.histogram_vec(
"index_update_size", "index_update_size",
"Index update size (in bytes)", "Index update size (in bytes)",
&["step"], "step",
), ),
lookup_duration: metrics.histogram_vec( lookup_duration: metrics.histogram_vec(
"index_lookup_duration", "index_lookup_duration",
"Index lookup duration (in seconds)", "Index lookup duration (in seconds)",
&["step"], "step",
), ),
} }
} }
fn report_stats(&self, batch: &db::WriteBatch) { fn report_stats(&self, batch: &db::WriteBatch) {
self.update_size self.update_size
.observe_size("write_funding_rows", db_rows_size(&batch.funding_rows)); .observe("write_funding_rows", db_rows_size(&batch.funding_rows));
self.update_size self.update_size
.observe_size("write_spending_rows", db_rows_size(&batch.spending_rows)); .observe("write_spending_rows", db_rows_size(&batch.spending_rows));
self.update_size self.update_size
.observe_size("write_txid_rows", db_rows_size(&batch.txid_rows)); .observe("write_txid_rows", db_rows_size(&batch.txid_rows));
self.update_size self.update_size
.observe_size("write_header_rows", db_rows_size(&batch.header_rows)); .observe("write_header_rows", db_rows_size(&batch.header_rows));
debug!( debug!(
"writing {} funding and {} spending rows from {} transactions, {} blocks", "writing {} funding and {} spending rows from {} transactions, {} blocks",
batch.funding_rows.len(), batch.funding_rows.len(),

View File

@ -1,89 +1,129 @@
use anyhow::{Context, Result}; #[cfg(feature = "metrics")]
use hyper::server::{Handler, Listening, Request, Response, Server}; mod metrics_impl {
use prometheus::{self, Encoder, HistogramOpts, HistogramVec, Registry}; use anyhow::{Context, Result};
use hyper::server::{Handler, Listening, Request, Response, Server};
use prometheus::process_collector::ProcessCollector;
use prometheus::{self, Encoder, HistogramOpts, HistogramVec, Registry};
#[cfg(feature = "process_collector")] use std::net::SocketAddr;
use prometheus::process_collector::ProcessCollector;
use std::net::SocketAddr; pub struct Metrics {
reg: Registry,
listen: Listening,
}
pub struct Metrics { impl Metrics {
reg: Registry, pub fn new(addr: SocketAddr) -> Result<Self> {
listen: Listening, let reg = Registry::new();
}
impl Drop for Metrics { reg.register(Box::new(ProcessCollector::for_self()))
fn drop(&mut self) { .expect("failed to register ProcessCollector");
debug!("closing Prometheus server");
if let Err(e) = self.listen.close() { let listen = Server::http(addr)?
warn!("failed to stop Prometheus server: {}", e); .handle(RegistryHandler { reg: reg.clone() })
.with_context(|| format!("failed to serve on {}", addr))?;
info!("serving Prometheus metrics on {}", addr);
Ok(Self { reg, listen })
}
pub fn histogram_vec(&self, name: &str, desc: &str, label: &str) -> Histogram {
let opts = HistogramOpts::new(name, desc);
let hist = HistogramVec::new(opts, &[label]).unwrap();
self.reg
.register(Box::new(hist.clone()))
.expect("failed to register Histogram");
Histogram { hist }
}
}
impl Drop for Metrics {
fn drop(&mut self) {
debug!("closing Prometheus server");
if let Err(e) = self.listen.close() {
warn!("failed to stop Prometheus server: {}", e);
}
}
}
#[derive(Clone)]
pub struct Histogram {
hist: HistogramVec,
}
impl Histogram {
pub fn observe(&self, label: &str, value: usize) {
self.hist.with_label_values(&[label]).observe(value as f64);
}
pub fn observe_duration<F, T>(&self, label: &str, func: F) -> T
where
F: FnOnce() -> T,
{
self.hist
.with_label_values(&[label])
.observe_closure_duration(func)
}
}
struct RegistryHandler {
reg: Registry,
}
impl RegistryHandler {
fn gather(&self) -> Result<Vec<u8>> {
let mut buffer = vec![];
prometheus::TextEncoder::new()
.encode(&self.reg.gather(), &mut buffer)
.context("failed to encode metrics")?;
Ok(buffer)
}
}
impl Handler for RegistryHandler {
fn handle(&self, req: Request, res: Response) {
trace!("{} {}", req.method, req.uri);
let buffer = self.gather().expect("failed to gather metrics");
res.send(&buffer).expect("failed to send metrics");
} }
} }
} }
#[derive(Clone)] #[cfg(feature = "metrics")]
pub struct Histogram { pub use metrics_impl::{Histogram, Metrics};
hist: HistogramVec,
}
impl Histogram { #[cfg(not(feature = "metrics"))]
pub fn observe_size(&self, label: &str, value: usize) { mod metrics_fake {
self.hist.with_label_values(&[label]).observe(value as f64); use anyhow::Result;
use std::net::SocketAddr;
pub struct Metrics {}
impl Metrics {
pub fn new(_addr: SocketAddr) -> Result<Self> {
debug!("metrics collection is disabled");
Ok(Self {})
}
pub fn histogram_vec(&self, _name: &str, _desc: &str, _label: &str) -> Histogram {
Histogram {}
}
} }
pub fn observe_duration<F, T>(&self, label: &str, func: F) -> T #[derive(Clone)]
where pub struct Histogram {}
F: FnOnce() -> T,
{ impl Histogram {
self.hist pub fn observe(&self, _label: &str, _value: usize) {}
.with_label_values(&[label])
.observe_closure_duration(func) pub fn observe_duration<F, T>(&self, _label: &str, func: F) -> T
where
F: FnOnce() -> T,
{
func()
}
} }
} }
struct RegistryHandler { #[cfg(not(feature = "metrics"))]
reg: Registry, pub use metrics_fake::{Histogram, Metrics};
}
impl RegistryHandler {
fn gather(&self) -> Result<Vec<u8>> {
let mut buffer = vec![];
prometheus::TextEncoder::new()
.encode(&self.reg.gather(), &mut buffer)
.context("failed to encode metrics")?;
Ok(buffer)
}
}
impl Handler for RegistryHandler {
fn handle(&self, req: Request, res: Response) {
trace!("{} {}", req.method, req.uri);
let buffer = self.gather().expect("failed to gather metrics");
res.send(&buffer).expect("send failed");
}
}
impl Metrics {
pub fn new(addr: SocketAddr) -> Result<Self> {
let reg = Registry::new();
#[cfg(feature = "prometheus/process")]
reg.register(Box::new(ProcessCollector::for_self()))
.expect("failed to register ProcessCollector");
let listen = Server::http(addr)?
.handle(RegistryHandler { reg: reg.clone() })
.with_context(|| format!("failed to serve on {}", addr))?;
info!("serving Prometheus metrics on {}", addr);
Ok(Self { reg, listen })
}
pub fn histogram_vec(&self, name: &str, desc: &str, labels: &[&str]) -> Histogram {
let opts = HistogramOpts::new(name, desc);
let hist = HistogramVec::new(opts, labels).unwrap();
self.reg
.register(Box::new(hist.clone()))
.expect("failed to register Histogram");
Histogram { hist }
}
}