diff --git a/src/bin/main.rs b/src/bin/main.rs index bb62c0c..29a5824 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -38,7 +38,7 @@ fn run_server(config: &Config) -> Result<()> { let app = App::new(store, index, daemon); let query = Query::new(app.clone(), &metrics); - let rpc = RPC::start(config.rpc_addr, query.clone()); + let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics); while let None = signal.wait(Duration::from_secs(5)) { query.update_mempool()?; if tip != app.daemon().getbestblockhash()? { diff --git a/src/rpc.rs b/src/rpc.rs index 967a3f9..365285c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -11,8 +11,9 @@ use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySe use std::sync::{Arc, Mutex}; use std::thread; +use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use query::Query; -use util::{HeaderEntry, Timer}; +use util::HeaderEntry; use errors::*; @@ -50,17 +51,24 @@ struct Connection { stream: TcpStream, addr: SocketAddr, chan: Channel, + stats: Arc, } impl Connection { - pub fn new(query: Arc, stream: TcpStream, addr: SocketAddr) -> Connection { + pub fn new( + query: Arc, + stream: TcpStream, + addr: SocketAddr, + stats: Arc, + ) -> Connection { Connection { - query: query, + query, last_header_entry: None, // disable header subscription for now status_hashes: HashMap::new(), stream, addr, chan: Channel::new(), + stats, } } @@ -189,6 +197,10 @@ impl Connection { } fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result { + let timer = self.stats + .latency + .with_label_values(&[method]) + .start_timer(); let result = match method { "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(), "server.version" => self.server_version(), @@ -208,6 +220,7 @@ impl Connection { "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(¶ms), &_ => bail!("unknown method {} {:?}", method, params), }; + timer.observe_duration(); // TODO: return application errors should be sent to the client Ok(match result { Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}), @@ -225,8 +238,11 @@ impl Connection { } fn update_subscriptions(&mut self) -> Result> { + let timer = self.stats + .latency + .with_label_values(&["periodic_update"]) + .start_timer(); let mut result = vec![]; - let mut timer = Timer::new(); if let Some(ref mut last_entry) = self.last_header_entry { let entry = self.query.get_best_header()?; if *last_entry != entry { @@ -251,18 +267,15 @@ impl Connection { "params": [script_hash.be_hex_string(), new_status_hash]})); *status_hash = new_status_hash; } - timer.tick("total"); - debug!( - "updating {} subscriptions {:?}", - self.status_hashes.len(), - timer - ); + timer.observe_duration(); + self.stats + .subscriptions + .set(self.status_hashes.len() as i64); Ok(result) } fn send_values(&mut self, values: &[Value]) -> Result<()> { for value in values { - debug!("[{}] <- {}", self.addr, value); let line = value.to_string() + "\n"; self.stream .write_all(line.as_bytes()) @@ -277,7 +290,6 @@ impl Connection { match msg { Message::Request(line) => { let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?; - debug!("[{}] -> {}", self.addr, cmd); let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) { ( Some(&Value::String(ref method)), @@ -366,6 +378,11 @@ pub struct RPC { notification: Sender<()>, } +struct Stats { + latency: HistogramVec, + subscriptions: Gauge, +} + impl RPC { fn start_notification_worker( receiver: Receiver<()>, @@ -386,7 +403,17 @@ impl RPC { }); } - pub fn start(addr: SocketAddr, query: Arc) -> RPC { + pub fn start(addr: SocketAddr, query: Arc, metrics: &Metrics) -> RPC { + let stats = Arc::new(Stats { + latency: metrics.histogram( + HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"), + &["method"], + ), + subscriptions: metrics.gauge(MetricOpts::new( + "electrum_subscriptions", + "# of Electrum subscriptions", + )), + }); let (tx, rx) = channel(); let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr)); info!("RPC server running on {}", addr); @@ -397,9 +424,10 @@ impl RPC { let (stream, addr) = listener.accept().expect("accept failed"); let query = query.clone(); let senders = senders.clone(); + let stats = stats.clone(); thread::spawn(move || { info!("[{}] connected peer", addr); - let conn = Connection::new(query, stream, addr); + let conn = Connection::new(query, stream, addr, stats); senders.lock().unwrap().push(conn.chan.sender()); conn.run(); info!("[{}] disconnected peer", addr);