mirror of
https://github.com/romanz/electrs.git
synced 2025-02-24 06:57:53 +01:00
Get hashtx notifications via 0MQ for mempool tracking
This commit is contained in:
parent
5ee8e7171a
commit
03dacc1b82
2 changed files with 39 additions and 13 deletions
|
@ -96,13 +96,19 @@ fn run_server(config: &Config) {
|
|||
let tx = chan.sender();
|
||||
scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan));
|
||||
loop {
|
||||
let blockhash = waiter.wait();
|
||||
if config.enable_indexing {
|
||||
index.update(&store, &daemon);
|
||||
}
|
||||
if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) {
|
||||
debug!("failed to send update for {}: {:?}", blockhash, e)
|
||||
}
|
||||
match waiter.wait() {
|
||||
waiter::Topic::HashBlock(blockhash) => {
|
||||
if config.enable_indexing {
|
||||
index.update(&store, &daemon);
|
||||
}
|
||||
if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) {
|
||||
debug!("failed to update RPC server {}: {:?}", blockhash, e)
|
||||
}
|
||||
}
|
||||
waiter::Topic::HashTx(txhash) => {
|
||||
debug!("got tx {}", txhash);
|
||||
}
|
||||
} // match
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,23 +1,43 @@
|
|||
use bitcoin::network::serialize::deserialize;
|
||||
use bitcoin::util::hash::Sha256dHash;
|
||||
use std::str;
|
||||
use zmq;
|
||||
|
||||
pub struct Waiter {
|
||||
sock: zmq::Socket,
|
||||
}
|
||||
|
||||
pub enum Topic {
|
||||
HashBlock(Sha256dHash),
|
||||
HashTx(Sha256dHash),
|
||||
}
|
||||
|
||||
impl Waiter {
|
||||
pub fn new(endpoint: &str) -> Waiter {
|
||||
let ctx = zmq::Context::new();
|
||||
let sock = ctx.socket(zmq::SocketType::SUB).unwrap();
|
||||
sock.set_subscribe(b"hashblock").unwrap();
|
||||
sock.connect(endpoint).unwrap();
|
||||
sock.set_subscribe(b"hashblock")
|
||||
.expect("failed to subscribe on blocks");
|
||||
sock.set_subscribe(b"hashtx")
|
||||
.expect("failed to subscribe on transactions");
|
||||
sock.connect(endpoint)
|
||||
.expect(&format!("failed to connect to {}", endpoint));
|
||||
Waiter { sock }
|
||||
}
|
||||
|
||||
pub fn wait(&self) -> Sha256dHash {
|
||||
let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1);
|
||||
blockhash.reverse(); // block hash needs to be LSB-first
|
||||
deserialize(&blockhash).unwrap()
|
||||
pub fn wait(&self) -> Topic {
|
||||
loop {
|
||||
let mut parts = self.sock.recv_multipart(0).unwrap().into_iter();
|
||||
let topic = parts.next().expect("missing topic");
|
||||
let mut blockhash = parts.next().expect("missing blockhash");
|
||||
blockhash.reverse(); // block hash needs to be LSB-first
|
||||
let hash: Sha256dHash = deserialize(&blockhash).unwrap();
|
||||
|
||||
match str::from_utf8(&topic).expect("non-string topic") {
|
||||
"hashblock" => return Topic::HashBlock(hash),
|
||||
"hashtx" => return Topic::HashTx(hash),
|
||||
_ => warn!("unknown topic {:?}", topic),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue