1
0
Fork 0
mirror of https://github.com/romanz/electrs.git synced 2025-02-24 15:02:21 +01:00

Use p2p protocol to replace waitfornewblock hidden RPC

Fixes https://github.com/romanz/electrs/issues/522.
This commit is contained in:
Roman Zeyde 2021-10-01 20:07:36 +03:00
parent 1a7d624e6a
commit 1402d290dc
5 changed files with 220 additions and 170 deletions

View file

@ -4,6 +4,7 @@ use bitcoin::{
consensus::serialize, hashes::hex::ToHex, Amount, Block, BlockHash, Transaction, Txid,
};
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
use crossbeam_channel::Receiver;
use parking_lot::Mutex;
use serde_json::{json, Value};
@ -70,7 +71,7 @@ fn read_cookie(path: &Path) -> Result<(String, String)> {
Ok((parts[0].to_owned(), parts[1].to_owned()))
}
pub(crate) fn rpc_connect(config: &Config) -> Result<Client> {
fn rpc_connect(config: &Config) -> Result<Client> {
let rpc_url = format!("http://{}", config.daemon_rpc_addr);
let mut client = {
// Allow `wait_for_new_block` to take a bit longer before timing out.
@ -218,10 +219,14 @@ impl Daemon {
pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block) + Send,
F: FnMut(BlockHash, Block),
{
self.p2p.lock().for_blocks(blockhashes, func)
}
pub(crate) fn new_block_notification(&self) -> Receiver<()> {
self.p2p.lock().new_block_notification()
}
}
pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError;

View file

@ -4,6 +4,7 @@ use bitcoin::{
hashes::hex::{FromHex, ToHex},
BlockHash, Txid,
};
use crossbeam_channel::Receiver;
use rayon::prelude::*;
use serde_derive::Deserialize;
use serde_json::{self, json, Value};
@ -152,6 +153,10 @@ impl Rpc {
&self.signal
}
pub fn new_block_notification(&self) -> Receiver<()> {
self.daemon.new_block_notification()
}
pub fn sync(&mut self) -> Result<()> {
self.tracker.sync(&self.daemon, self.signal.exit_flag())
}

View file

@ -1,9 +1,3 @@
use std::io::Write;
use std::iter::FromIterator;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::chain::{Chain, NewHeader};
use anyhow::{Context, Result};
use bitcoin::{
consensus::encode,
@ -15,125 +9,56 @@ use bitcoin::{
stream_reader::StreamReader,
},
secp256k1::{self, rand::Rng},
Block, BlockHash, Network,
Block, BlockHash, BlockHeader, Network,
};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::io::Write;
use std::iter::FromIterator;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::chain::{Chain, NewHeader};
enum Request {
GetNewHeaders(GetHeadersMessage),
GetBlocks(Vec<Inventory>),
}
impl Request {
fn get_new_headers(chain: &Chain) -> Request {
Request::GetNewHeaders(GetHeadersMessage::new(
chain.locator(),
BlockHash::default(),
))
}
fn get_blocks(blockhashes: &[BlockHash]) -> Request {
Request::GetBlocks(
blockhashes
.iter()
.map(|blockhash| Inventory::WitnessBlock(*blockhash))
.collect(),
)
}
}
pub(crate) struct Connection {
stream: TcpStream,
reader: StreamReader<TcpStream>,
network: Network,
req_send: Sender<Request>,
blocks_recv: Receiver<Block>,
headers_recv: Receiver<Vec<BlockHeader>>,
new_block_recv: Receiver<()>,
}
impl Connection {
/// Connect to a Bitcoin node via p2p protocol.
/// See https://en.bitcoin.it/wiki/Protocol_documentation for details.
pub fn connect(network: Network, address: SocketAddr) -> Result<Self> {
let stream = TcpStream::connect(address)
.with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?;
let reader = StreamReader::new(
stream.try_clone().context("stream failed to clone")?,
/*buffer_size*/ Some(1 << 20),
);
let mut conn = Self {
stream,
reader,
network,
};
conn.send(build_version_message())?;
if let NetworkMessage::GetHeaders(_) = conn.recv().context("failed to get headers")? {
conn.send(NetworkMessage::Headers(vec![]))?;
}
Ok(conn)
}
fn send(&mut self, msg: NetworkMessage) -> Result<()> {
trace!("send: {:?}", msg);
let raw_msg = message::RawNetworkMessage {
magic: self.network.magic(),
payload: msg,
};
self.stream
.write_all(encode::serialize(&raw_msg).as_slice())
.context("p2p failed to send")
}
fn recv(&mut self) -> Result<NetworkMessage> {
loop {
let raw_msg: message::RawNetworkMessage =
self.reader.read_next().context("p2p failed to recv")?;
trace!("recv: {:?}", raw_msg.payload);
match raw_msg.payload {
NetworkMessage::Version(version) => {
debug!("peer version: {:?}", version);
self.send(NetworkMessage::Verack)?;
}
NetworkMessage::Ping(nonce) => {
self.send(NetworkMessage::Pong(nonce))?;
}
NetworkMessage::Verack
| NetworkMessage::Alert(_)
| NetworkMessage::Addr(_)
| NetworkMessage::Inv(_) => {}
payload => return Ok(payload),
};
}
}
/// Request and process the specified blocks (in the specified order).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block) + Send,
{
let blockhashes = Vec::from_iter(blockhashes);
if blockhashes.is_empty() {
return Ok(());
}
let inv = blockhashes
.iter()
.map(|h| Inventory::WitnessBlock(*h))
.collect();
debug!("loading {} blocks", blockhashes.len());
self.send(NetworkMessage::GetData(inv))?;
// receive, parse and process the blocks concurrently
rayon::scope(|s| {
let (tx, rx) = crossbeam_channel::bounded(10);
s.spawn(|_| {
// the loop will exit when the sender is dropped
for (hash, block) in rx {
func(hash, block);
}
});
for hash in blockhashes {
match self
.recv()
.with_context(|| format!("failed to get block {}", hash))?
{
NetworkMessage::Block(block) => {
ensure!(block.block_hash() == hash, "got unexpected block");
tx.send((hash, block))
.context("disconnected from block processor")?;
}
msg => bail!("unexpected {:?}", msg),
};
}
Ok(())
})
}
/// Get new block headers (supporting reorgs).
/// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders
pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewHeader>> {
let msg = GetHeadersMessage::new(chain.locator(), BlockHash::default());
self.send(NetworkMessage::GetHeaders(msg))?;
let headers = match self.recv().context("failed to get new headers")? {
NetworkMessage::Headers(headers) => headers,
msg => bail!("unexpected {:?}", msg),
};
self.req_send.send(Request::get_new_headers(chain))?;
let headers = self
.headers_recv
.recv()
.context("failed to get new headers")?;
debug!("got {} new headers", headers.len());
let prev_blockhash = match headers.first().map(|h| h.prev_blockhash) {
@ -150,6 +75,160 @@ impl Connection {
.map(NewHeader::from)
.collect())
}
/// Request and process the specified blocks (in the specified order).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block),
{
let blockhashes = Vec::from_iter(blockhashes);
if blockhashes.is_empty() {
return Ok(());
}
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))?;
for hash in blockhashes {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
ensure!(block.block_hash() == hash, "got unexpected block");
func(hash, block);
}
Ok(())
}
pub(crate) fn new_block_notification(&self) -> Receiver<()> {
self.new_block_recv.clone()
}
pub(crate) fn connect(network: Network, address: SocketAddr) -> Result<Self> {
let mut stream = TcpStream::connect(address)
.with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?;
let mut reader = StreamReader::new(
stream.try_clone().context("stream failed to clone")?,
/*buffer_size*/ Some(1 << 20),
);
let (tx_send, tx_recv) = bounded::<NetworkMessage>(1);
let (rx_send, rx_recv) = bounded::<NetworkMessage>(1);
crate::thread::spawn("p2p_send", move || loop {
use std::net::Shutdown;
let msg = match tx_recv.recv() {
Ok(msg) => msg,
Err(_) => {
// p2p_loop is closed, so tx_send is disconnected
debug!("closing p2p_send thread: no more messages to send");
// close the stream reader (p2p_recv thread may block on it)
if let Err(e) = stream.shutdown(Shutdown::Read) {
warn!("failed to shutdown p2p connection: {}", e)
}
return Ok(());
}
};
trace!("send: {:?}", msg);
let raw_msg = message::RawNetworkMessage {
magic: network.magic(),
payload: msg,
};
stream
.write_all(encode::serialize(&raw_msg).as_slice())
.context("p2p failed to send")?;
});
crate::thread::spawn("p2p_recv", move || loop {
use bitcoin::consensus::encode::Error;
use std::io::ErrorKind;
let raw_msg: message::RawNetworkMessage = match reader.read_next() {
Ok(raw_msg) => raw_msg,
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("closing p2p_recv thread: connection closed");
return Ok(());
}
Err(e) => bail!("failed to recv a message from peer: {}", e),
};
trace!("recv: {:?}", raw_msg.payload);
rx_send.send(raw_msg.payload)?;
});
let (req_send, req_recv) = bounded::<Request>(1);
let (blocks_send, blocks_recv) = bounded::<Block>(10);
let (headers_send, headers_recv) = bounded::<Vec<BlockHeader>>(1);
let (new_block_send, new_block_recv) = bounded::<()>(0);
let (init_send, init_recv) = bounded::<()>(0);
tx_send.send(build_version_message())?;
crate::thread::spawn("p2p_loop", move || loop {
select! {
recv(rx_recv) -> result => {
let msg = match result {
Ok(msg) => msg,
Err(_) => { // p2p_recv is closed, so rx_send is disconnected
debug!("closing p2p_loop thread: peer has disconnected");
return Ok(());
}
};
match msg {
NetworkMessage::GetHeaders(_) => {
tx_send.send(NetworkMessage::Headers(vec![]))?;
}
NetworkMessage::Version(version) => {
debug!("peer version: {:?}", version);
tx_send.send(NetworkMessage::Verack)?;
}
NetworkMessage::Inv(inventory) => {
debug!("peer inventory: {:?}", inventory);
if inventory.iter().any(is_block_inv) {
let _ = new_block_send.try_send(()); // best-effort notification
}
},
NetworkMessage::Ping(nonce) => {
tx_send.send(NetworkMessage::Pong(nonce))?; // connection keep-alive
}
NetworkMessage::Verack => {
init_send.send(())?; // peer acknowledged our version
}
NetworkMessage::Alert(_) | NetworkMessage::Addr(_) => {}
NetworkMessage::Block(block) => blocks_send.send(block)?,
NetworkMessage::Headers(headers) => headers_send.send(headers)?,
msg => warn!("unexpected message: {:?}", msg),
}
}
recv(req_recv) -> result => {
let req = match result {
Ok(req) => req,
Err(_) => { // self is dropped, so req_send is disconnected
debug!("closing p2p_loop thread: no more requests to handle");
return Ok(());
}
};
let msg = match req {
Request::GetNewHeaders(msg) => NetworkMessage::GetHeaders(msg),
Request::GetBlocks(inv) => NetworkMessage::GetData(inv),
};
tx_send.send(msg)?;
}
}
});
init_recv.recv()?; // wait until `verack` is received
Ok(Connection {
req_send,
blocks_recv,
headers_recv,
new_block_recv,
})
}
}
fn build_version_message() -> NetworkMessage {
@ -173,3 +252,11 @@ fn build_version_message() -> NetworkMessage {
relay: false,
})
}
fn is_block_inv(inv: &Inventory) -> bool {
if let Inventory::Block(_) = inv {
true
} else {
false
}
}

View file

@ -1,19 +1,15 @@
use anyhow::{Context, Result};
use bitcoin::BlockHash;
use bitcoincore_rpc::RpcApi;
use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
use crossbeam_channel::{select, unbounded, Sender};
use rayon::prelude::*;
use std::{
collections::hash_map::HashMap,
convert::TryFrom,
io::{BufRead, BufReader, Write},
net::{Shutdown, TcpListener, TcpStream},
};
use crate::{
config::Config,
daemon::rpc_connect,
electrum::{Client, Rpc},
thread::spawn,
};
@ -48,54 +44,10 @@ impl Peer {
}
}
fn handle_rpc_error(name: &str, err: bitcoincore_rpc::Error) -> Result<()> {
use bitcoincore_rpc::{
jsonrpc::{error::Error::Transport as TransportError, simple_http::Error as HttpError},
Error::JsonRpc as JsonRpcError,
};
if let JsonRpcError(TransportError(ref e)) = err {
if let Some(HttpError::Timeout) = e.downcast_ref::<HttpError>() {
// Following https://github.com/romanz/electrs/issues/495
warn!("ignoring HTTP timeout from RPC '{}'", name);
return Ok(());
}
}
bail!("RPC '{}' failed: {}", name, err); // fail on all other RPC errors
}
fn tip_receiver(config: &Config) -> Result<Receiver<BlockHash>> {
let duration = u64::try_from(config.wait_duration.as_millis()).unwrap();
let (tip_tx, tip_rx) = bounded(0);
let rpc = rpc_connect(config)?;
use crossbeam_channel::TrySendError;
spawn("tip_loop", move || loop {
let tip = match rpc.get_best_block_hash() {
Ok(tip) => tip,
Err(err) => {
handle_rpc_error("getbestblockhash", err)?;
continue;
}
};
match tip_tx.try_send(tip) {
Ok(_) | Err(TrySendError::Full(_)) => (),
Err(TrySendError::Disconnected(_)) => bail!("tip receiver disconnected"),
}
if let Err(err) = rpc.wait_for_new_block(duration) {
warn!(
"waiting {:.1}s for new block failed: {}",
duration as f64 / 1e3,
err
);
}
});
Ok(tip_rx)
}
pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
let listener = TcpListener::bind(config.electrum_rpc_addr)?;
let tip_rx = tip_receiver(config)?;
info!("serving Electrum RPC on {}", listener.local_addr()?);
let new_block_rx = rpc.new_block_notification();
let (server_tx, server_rx) = unbounded();
spawn("accept_loop", || accept_loop(listener, server_tx)); // detach accepting thread
@ -109,7 +61,7 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
break;
}
},
recv(tip_rx) -> tip => match tip {
recv(new_block_rx) -> result => match result {
Ok(_) => (), // sync and update
Err(_) => break, // daemon is shutting down
},
@ -117,6 +69,7 @@ pub fn run(config: &Config, mut rpc: Rpc) -> Result<()> {
let event = event.context("server disconnected")?;
handle_event(&rpc, &mut peers, event);
},
default(config.wait_duration) => (), // sync and update
};
if !server_rx.is_empty() {
continue; // continue RPC processing (if not done)

View file

@ -307,7 +307,7 @@ impl ScriptHashStatus {
fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block) + Send,
F: FnMut(BlockHash, Block),
{
daemon.for_blocks(
blockhashes