From 1402d290dc05d30a68a9a5cfd8cc3147d8bc1c81 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 1 Oct 2021 20:07:36 +0300 Subject: [PATCH] Use p2p protocol to replace `waitfornewblock` hidden RPC Fixes https://github.com/romanz/electrs/issues/522. --- src/daemon.rs | 9 +- src/electrum.rs | 5 + src/p2p.rs | 319 ++++++++++++++++++++++++++++++------------------ src/server.rs | 55 +-------- src/status.rs | 2 +- 5 files changed, 220 insertions(+), 170 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 090a9ec..e9bf423 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -4,6 +4,7 @@ use bitcoin::{ consensus::serialize, hashes::hex::ToHex, Amount, Block, BlockHash, Transaction, Txid, }; use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi}; +use crossbeam_channel::Receiver; use parking_lot::Mutex; use serde_json::{json, Value}; @@ -70,7 +71,7 @@ fn read_cookie(path: &Path) -> Result<(String, String)> { Ok((parts[0].to_owned(), parts[1].to_owned())) } -pub(crate) fn rpc_connect(config: &Config) -> Result { +fn rpc_connect(config: &Config) -> Result { let rpc_url = format!("http://{}", config.daemon_rpc_addr); let mut client = { // Allow `wait_for_new_block` to take a bit longer before timing out. @@ -218,10 +219,14 @@ impl Daemon { pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result<()> where B: IntoIterator, - F: FnMut(BlockHash, Block) + Send, + F: FnMut(BlockHash, Block), { self.p2p.lock().for_blocks(blockhashes, func) } + + pub(crate) fn new_block_notification(&self) -> Receiver<()> { + self.p2p.lock().new_block_notification() + } } pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError; diff --git a/src/electrum.rs b/src/electrum.rs index deb5e54..4ca149b 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -4,6 +4,7 @@ use bitcoin::{ hashes::hex::{FromHex, ToHex}, BlockHash, Txid, }; +use crossbeam_channel::Receiver; use rayon::prelude::*; use serde_derive::Deserialize; use serde_json::{self, json, Value}; @@ -152,6 +153,10 @@ impl Rpc { &self.signal } + pub fn new_block_notification(&self) -> Receiver<()> { + self.daemon.new_block_notification() + } + pub fn sync(&mut self) -> Result<()> { self.tracker.sync(&self.daemon, self.signal.exit_flag()) } diff --git a/src/p2p.rs b/src/p2p.rs index 396a09a..ace4234 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,9 +1,3 @@ -use std::io::Write; -use std::iter::FromIterator; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; -use std::time::{SystemTime, UNIX_EPOCH}; - -use crate::chain::{Chain, NewHeader}; use anyhow::{Context, Result}; use bitcoin::{ consensus::encode, @@ -15,125 +9,56 @@ use bitcoin::{ stream_reader::StreamReader, }, secp256k1::{self, rand::Rng}, - Block, BlockHash, Network, + Block, BlockHash, BlockHeader, Network, }; +use crossbeam_channel::{bounded, select, Receiver, Sender}; + +use std::io::Write; +use std::iter::FromIterator; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::chain::{Chain, NewHeader}; + +enum Request { + GetNewHeaders(GetHeadersMessage), + GetBlocks(Vec), +} + +impl Request { + fn get_new_headers(chain: &Chain) -> Request { + Request::GetNewHeaders(GetHeadersMessage::new( + chain.locator(), + BlockHash::default(), + )) + } + + fn get_blocks(blockhashes: &[BlockHash]) -> Request { + Request::GetBlocks( + blockhashes + .iter() + .map(|blockhash| Inventory::WitnessBlock(*blockhash)) + .collect(), + ) + } +} pub(crate) struct Connection { - stream: TcpStream, - reader: StreamReader, - network: Network, + req_send: Sender, + blocks_recv: Receiver, + headers_recv: Receiver>, + new_block_recv: Receiver<()>, } impl Connection { - /// Connect to a Bitcoin node via p2p protocol. - /// See https://en.bitcoin.it/wiki/Protocol_documentation for details. - pub fn connect(network: Network, address: SocketAddr) -> Result { - let stream = TcpStream::connect(address) - .with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?; - let reader = StreamReader::new( - stream.try_clone().context("stream failed to clone")?, - /*buffer_size*/ Some(1 << 20), - ); - let mut conn = Self { - stream, - reader, - network, - }; - conn.send(build_version_message())?; - if let NetworkMessage::GetHeaders(_) = conn.recv().context("failed to get headers")? { - conn.send(NetworkMessage::Headers(vec![]))?; - } - Ok(conn) - } - - fn send(&mut self, msg: NetworkMessage) -> Result<()> { - trace!("send: {:?}", msg); - let raw_msg = message::RawNetworkMessage { - magic: self.network.magic(), - payload: msg, - }; - self.stream - .write_all(encode::serialize(&raw_msg).as_slice()) - .context("p2p failed to send") - } - - fn recv(&mut self) -> Result { - loop { - let raw_msg: message::RawNetworkMessage = - self.reader.read_next().context("p2p failed to recv")?; - - trace!("recv: {:?}", raw_msg.payload); - match raw_msg.payload { - NetworkMessage::Version(version) => { - debug!("peer version: {:?}", version); - self.send(NetworkMessage::Verack)?; - } - NetworkMessage::Ping(nonce) => { - self.send(NetworkMessage::Pong(nonce))?; - } - NetworkMessage::Verack - | NetworkMessage::Alert(_) - | NetworkMessage::Addr(_) - | NetworkMessage::Inv(_) => {} - payload => return Ok(payload), - }; - } - } - - /// Request and process the specified blocks (in the specified order). - /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. - pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> - where - B: IntoIterator, - F: FnMut(BlockHash, Block) + Send, - { - let blockhashes = Vec::from_iter(blockhashes); - if blockhashes.is_empty() { - return Ok(()); - } - let inv = blockhashes - .iter() - .map(|h| Inventory::WitnessBlock(*h)) - .collect(); - debug!("loading {} blocks", blockhashes.len()); - self.send(NetworkMessage::GetData(inv))?; - - // receive, parse and process the blocks concurrently - rayon::scope(|s| { - let (tx, rx) = crossbeam_channel::bounded(10); - s.spawn(|_| { - // the loop will exit when the sender is dropped - for (hash, block) in rx { - func(hash, block); - } - }); - - for hash in blockhashes { - match self - .recv() - .with_context(|| format!("failed to get block {}", hash))? - { - NetworkMessage::Block(block) => { - ensure!(block.block_hash() == hash, "got unexpected block"); - tx.send((hash, block)) - .context("disconnected from block processor")?; - } - msg => bail!("unexpected {:?}", msg), - }; - } - Ok(()) - }) - } - /// Get new block headers (supporting reorgs). /// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result> { - let msg = GetHeadersMessage::new(chain.locator(), BlockHash::default()); - self.send(NetworkMessage::GetHeaders(msg))?; - let headers = match self.recv().context("failed to get new headers")? { - NetworkMessage::Headers(headers) => headers, - msg => bail!("unexpected {:?}", msg), - }; + self.req_send.send(Request::get_new_headers(chain))?; + let headers = self + .headers_recv + .recv() + .context("failed to get new headers")?; debug!("got {} new headers", headers.len()); let prev_blockhash = match headers.first().map(|h| h.prev_blockhash) { @@ -150,6 +75,160 @@ impl Connection { .map(NewHeader::from) .collect()) } + + /// Request and process the specified blocks (in the specified order). + /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. + pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> + where + B: IntoIterator, + F: FnMut(BlockHash, Block), + { + let blockhashes = Vec::from_iter(blockhashes); + if blockhashes.is_empty() { + return Ok(()); + } + debug!("loading {} blocks", blockhashes.len()); + self.req_send.send(Request::get_blocks(&blockhashes))?; + + for hash in blockhashes { + let block = self + .blocks_recv + .recv() + .with_context(|| format!("failed to get block {}", hash))?; + ensure!(block.block_hash() == hash, "got unexpected block"); + func(hash, block); + } + Ok(()) + } + + pub(crate) fn new_block_notification(&self) -> Receiver<()> { + self.new_block_recv.clone() + } + + pub(crate) fn connect(network: Network, address: SocketAddr) -> Result { + let mut stream = TcpStream::connect(address) + .with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?; + let mut reader = StreamReader::new( + stream.try_clone().context("stream failed to clone")?, + /*buffer_size*/ Some(1 << 20), + ); + let (tx_send, tx_recv) = bounded::(1); + let (rx_send, rx_recv) = bounded::(1); + + crate::thread::spawn("p2p_send", move || loop { + use std::net::Shutdown; + + let msg = match tx_recv.recv() { + Ok(msg) => msg, + Err(_) => { + // p2p_loop is closed, so tx_send is disconnected + debug!("closing p2p_send thread: no more messages to send"); + // close the stream reader (p2p_recv thread may block on it) + if let Err(e) = stream.shutdown(Shutdown::Read) { + warn!("failed to shutdown p2p connection: {}", e) + } + return Ok(()); + } + }; + + trace!("send: {:?}", msg); + let raw_msg = message::RawNetworkMessage { + magic: network.magic(), + payload: msg, + }; + stream + .write_all(encode::serialize(&raw_msg).as_slice()) + .context("p2p failed to send")?; + }); + + crate::thread::spawn("p2p_recv", move || loop { + use bitcoin::consensus::encode::Error; + use std::io::ErrorKind; + + let raw_msg: message::RawNetworkMessage = match reader.read_next() { + Ok(raw_msg) => raw_msg, + Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => { + debug!("closing p2p_recv thread: connection closed"); + return Ok(()); + } + Err(e) => bail!("failed to recv a message from peer: {}", e), + }; + + trace!("recv: {:?}", raw_msg.payload); + rx_send.send(raw_msg.payload)?; + }); + + let (req_send, req_recv) = bounded::(1); + let (blocks_send, blocks_recv) = bounded::(10); + let (headers_send, headers_recv) = bounded::>(1); + let (new_block_send, new_block_recv) = bounded::<()>(0); + let (init_send, init_recv) = bounded::<()>(0); + + tx_send.send(build_version_message())?; + + crate::thread::spawn("p2p_loop", move || loop { + select! { + recv(rx_recv) -> result => { + let msg = match result { + Ok(msg) => msg, + Err(_) => { // p2p_recv is closed, so rx_send is disconnected + debug!("closing p2p_loop thread: peer has disconnected"); + return Ok(()); + } + }; + match msg { + NetworkMessage::GetHeaders(_) => { + tx_send.send(NetworkMessage::Headers(vec![]))?; + } + NetworkMessage::Version(version) => { + debug!("peer version: {:?}", version); + tx_send.send(NetworkMessage::Verack)?; + } + NetworkMessage::Inv(inventory) => { + debug!("peer inventory: {:?}", inventory); + if inventory.iter().any(is_block_inv) { + let _ = new_block_send.try_send(()); // best-effort notification + } + + }, + NetworkMessage::Ping(nonce) => { + tx_send.send(NetworkMessage::Pong(nonce))?; // connection keep-alive + } + NetworkMessage::Verack => { + init_send.send(())?; // peer acknowledged our version + } + NetworkMessage::Alert(_) | NetworkMessage::Addr(_) => {} + NetworkMessage::Block(block) => blocks_send.send(block)?, + NetworkMessage::Headers(headers) => headers_send.send(headers)?, + msg => warn!("unexpected message: {:?}", msg), + } + } + recv(req_recv) -> result => { + let req = match result { + Ok(req) => req, + Err(_) => { // self is dropped, so req_send is disconnected + debug!("closing p2p_loop thread: no more requests to handle"); + return Ok(()); + } + }; + let msg = match req { + Request::GetNewHeaders(msg) => NetworkMessage::GetHeaders(msg), + Request::GetBlocks(inv) => NetworkMessage::GetData(inv), + }; + tx_send.send(msg)?; + } + } + }); + + init_recv.recv()?; // wait until `verack` is received + + Ok(Connection { + req_send, + blocks_recv, + headers_recv, + new_block_recv, + }) + } } fn build_version_message() -> NetworkMessage { @@ -173,3 +252,11 @@ fn build_version_message() -> NetworkMessage { relay: false, }) } + +fn is_block_inv(inv: &Inventory) -> bool { + if let Inventory::Block(_) = inv { + true + } else { + false + } +} diff --git a/src/server.rs b/src/server.rs index c328389..d73604d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,19 +1,15 @@ use anyhow::{Context, Result}; -use bitcoin::BlockHash; -use bitcoincore_rpc::RpcApi; -use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender}; +use crossbeam_channel::{select, unbounded, Sender}; use rayon::prelude::*; use std::{ collections::hash_map::HashMap, - convert::TryFrom, io::{BufRead, BufReader, Write}, net::{Shutdown, TcpListener, TcpStream}, }; use crate::{ config::Config, - daemon::rpc_connect, electrum::{Client, Rpc}, thread::spawn, }; @@ -48,54 +44,10 @@ impl Peer { } } -fn handle_rpc_error(name: &str, err: bitcoincore_rpc::Error) -> Result<()> { - use bitcoincore_rpc::{ - jsonrpc::{error::Error::Transport as TransportError, simple_http::Error as HttpError}, - Error::JsonRpc as JsonRpcError, - }; - if let JsonRpcError(TransportError(ref e)) = err { - if let Some(HttpError::Timeout) = e.downcast_ref::() { - // Following https://github.com/romanz/electrs/issues/495 - warn!("ignoring HTTP timeout from RPC '{}'", name); - return Ok(()); - } - } - bail!("RPC '{}' failed: {}", name, err); // fail on all other RPC errors -} - -fn tip_receiver(config: &Config) -> Result> { - let duration = u64::try_from(config.wait_duration.as_millis()).unwrap(); - let (tip_tx, tip_rx) = bounded(0); - let rpc = rpc_connect(config)?; - - use crossbeam_channel::TrySendError; - spawn("tip_loop", move || loop { - let tip = match rpc.get_best_block_hash() { - Ok(tip) => tip, - Err(err) => { - handle_rpc_error("getbestblockhash", err)?; - continue; - } - }; - match tip_tx.try_send(tip) { - Ok(_) | Err(TrySendError::Full(_)) => (), - Err(TrySendError::Disconnected(_)) => bail!("tip receiver disconnected"), - } - if let Err(err) = rpc.wait_for_new_block(duration) { - warn!( - "waiting {:.1}s for new block failed: {}", - duration as f64 / 1e3, - err - ); - } - }); - Ok(tip_rx) -} - pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> { let listener = TcpListener::bind(config.electrum_rpc_addr)?; - let tip_rx = tip_receiver(config)?; info!("serving Electrum RPC on {}", listener.local_addr()?); + let new_block_rx = rpc.new_block_notification(); let (server_tx, server_rx) = unbounded(); spawn("accept_loop", || accept_loop(listener, server_tx)); // detach accepting thread @@ -109,7 +61,7 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> { break; } }, - recv(tip_rx) -> tip => match tip { + recv(new_block_rx) -> result => match result { Ok(_) => (), // sync and update Err(_) => break, // daemon is shutting down }, @@ -117,6 +69,7 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> { let event = event.context("server disconnected")?; handle_event(&rpc, &mut peers, event); }, + default(config.wait_duration) => (), // sync and update }; if !server_rx.is_empty() { continue; // continue RPC processing (if not done) diff --git a/src/status.rs b/src/status.rs index dc684d8..e1e30a5 100644 --- a/src/status.rs +++ b/src/status.rs @@ -307,7 +307,7 @@ impl ScriptHashStatus { fn for_new_blocks(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()> where B: IntoIterator, - F: FnMut(BlockHash, Block) + Send, + F: FnMut(BlockHash, Block), { daemon.for_blocks( blockhashes