From 03dacc1b82ba66e0f02fbab730496a25c6611943 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 10 May 2018 23:22:19 +0300 Subject: [PATCH] Get hashtx notifications via 0MQ for mempool tracking --- src/bin/index_server.rs | 20 +++++++++++++------- src/waiter.rs | 32 ++++++++++++++++++++++++++------ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/bin/index_server.rs b/src/bin/index_server.rs index 49aa71c..f3196a5 100644 --- a/src/bin/index_server.rs +++ b/src/bin/index_server.rs @@ -96,13 +96,19 @@ fn run_server(config: &Config) { let tx = chan.sender(); scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan)); loop { - let blockhash = waiter.wait(); - if config.enable_indexing { - index.update(&store, &daemon); - } - if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) { - debug!("failed to send update for {}: {:?}", blockhash, e) - } + match waiter.wait() { + waiter::Topic::HashBlock(blockhash) => { + if config.enable_indexing { + index.update(&store, &daemon); + } + if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) { + debug!("failed to update RPC server {}: {:?}", blockhash, e) + } + } + waiter::Topic::HashTx(txhash) => { + debug!("got tx {}", txhash); + } + } // match } }); } diff --git a/src/waiter.rs b/src/waiter.rs index 0074229..cd7d26f 100644 --- a/src/waiter.rs +++ b/src/waiter.rs @@ -1,23 +1,43 @@ use bitcoin::network::serialize::deserialize; use bitcoin::util::hash::Sha256dHash; +use std::str; use zmq; pub struct Waiter { sock: zmq::Socket, } +pub enum Topic { + HashBlock(Sha256dHash), + HashTx(Sha256dHash), +} + impl Waiter { pub fn new(endpoint: &str) -> Waiter { let ctx = zmq::Context::new(); let sock = ctx.socket(zmq::SocketType::SUB).unwrap(); - sock.set_subscribe(b"hashblock").unwrap(); - sock.connect(endpoint).unwrap(); + sock.set_subscribe(b"hashblock") + .expect("failed to subscribe on blocks"); + sock.set_subscribe(b"hashtx") + .expect("failed to subscribe on transactions"); + sock.connect(endpoint) + .expect(&format!("failed to connect to {}", endpoint)); Waiter { sock } } - pub fn wait(&self) -> Sha256dHash { - let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1); - blockhash.reverse(); // block hash needs to be LSB-first - deserialize(&blockhash).unwrap() + pub fn wait(&self) -> Topic { + loop { + let mut parts = self.sock.recv_multipart(0).unwrap().into_iter(); + let topic = parts.next().expect("missing topic"); + let mut blockhash = parts.next().expect("missing blockhash"); + blockhash.reverse(); // block hash needs to be LSB-first + let hash: Sha256dHash = deserialize(&blockhash).unwrap(); + + match str::from_utf8(&topic).expect("non-string topic") { + "hashblock" => return Topic::HashBlock(hash), + "hashtx" => return Topic::HashTx(hash), + _ => warn!("unknown topic {:?}", topic), + }; + } } }