1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Rewrite and simplify p2p message receiving thread

Add a few more utilization metrics.
This commit is contained in:
Roman Zeyde 2021-10-12 19:41:42 +03:00
parent cd0531b8b7
commit 05e0221b8e

View File

@ -1,9 +1,12 @@
use anyhow::{Context, Result};
use bitcoin::{
consensus::{encode, Decodable},
consensus::{
encode::{self, ReadExt, VarInt},
Decodable,
},
network::{
address, constants,
message::{self, NetworkMessage},
message::{self, CommandString, NetworkMessage},
message_blockdata::{GetHeadersMessage, Inventory},
message_network,
},
@ -12,14 +15,13 @@ use bitcoin::{
};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::io::{ErrorKind, Read, Write};
use std::iter::FromIterator;
use std::io::{self, ErrorKind, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::{
chain::{Chain, NewHeader},
metrics::{default_duration_buckets, Histogram, Metrics},
metrics::{default_duration_buckets, default_size_buckets, Histogram, Metrics},
};
enum Request {
@ -89,29 +91,30 @@ impl Connection {
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block),
{
let blockhashes = Vec::from_iter(blockhashes);
if blockhashes.is_empty() {
return Ok(());
}
self.blocks_duration.observe_duration("send", || {
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))
})?;
for hash in blockhashes {
let block = self.blocks_duration.observe_duration("recv", || {
self.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))
self.blocks_duration.observe_duration("total", || {
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
if blockhashes.is_empty() {
return Ok(());
}
self.blocks_duration.observe_duration("request", || {
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))
})?;
self.blocks_duration.observe_duration("process", || {
ensure!(block.block_hash() == hash, "got unexpected block");
func(hash, block);
Ok(())
})?;
}
Ok(())
for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
ensure!(block.block_hash() == hash, "got unexpected block");
Ok(block)
})?;
self.blocks_duration
.observe_duration("process", || func(hash, block));
}
Ok(())
})
}
/// Note: only a single receiver will get the notification (https://github.com/romanz/electrs/pull/526#issuecomment-934687415).
@ -126,32 +129,37 @@ impl Connection {
) -> Result<Self> {
let mut stream = TcpStream::connect(address)
.with_context(|| format!("{} p2p failed to connect: {:?}", network, address))?;
let mut reader = StreamReader::new(
stream.try_clone().context("stream failed to clone")?,
/*buffer_size*/ Some(1 << 20),
);
let (tx_send, tx_recv) = bounded::<NetworkMessage>(1);
let (rx_send, rx_recv) = bounded::<NetworkMessage>(1);
let send_duration = metrics.histogram_vec(
"p2p_send_duration",
"Time spent sending p2p messages",
"Time spent sending p2p messages (in seconds)",
"step",
default_duration_buckets(),
);
let recv_duration = metrics.histogram_vec(
"p2p_recv_duration",
"Time spent receiving p2p messages",
"Time spent receiving p2p messages (in seconds)",
"step",
default_duration_buckets(),
);
let recv_size = metrics.histogram_vec(
"p2p_recv_size",
"Size of p2p messages read (in bytes)",
"message",
default_size_buckets(),
);
let blocks_duration = metrics.histogram_vec(
"p2p_blocks_duration",
"Time spent getting blocks via p2p protocol",
"Time spent getting blocks via p2p protocol (in seconds)",
"step",
default_duration_buckets(),
);
let mut reader = stream.try_clone().context("stream failed to clone")?;
crate::thread::spawn("p2p_send", move || loop {
use std::net::Shutdown;
let msg = match send_duration.observe_duration("wait", || tx_recv.recv()) {
@ -179,21 +187,28 @@ impl Connection {
});
crate::thread::spawn("p2p_recv", move || loop {
use bitcoin::consensus::encode::Error;
let raw_msg = match recv_duration
.observe_duration("recv", || RawNetworkMessage::consensus_decode(&mut reader))
{
Ok(raw_msg) => {
assert_eq!(raw_msg.magic, network.magic());
recv_size.observe(raw_msg.cmd.as_ref(), raw_msg.raw.len());
raw_msg
}
Err(encode::Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("closing p2p_recv thread: connection closed");
return Ok(());
}
Err(e) => bail!("failed to recv a message from peer: {}", e),
};
let raw_msg: message::RawNetworkMessage =
match recv_duration.observe_duration("recv", || reader.read_next()) {
Ok(raw_msg) => raw_msg,
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("closing p2p_recv thread: connection closed");
return Ok(());
}
Err(e) => bail!("failed to recv a message from peer: {}", e),
};
let label = format!("parse_{}", raw_msg.cmd.as_ref());
let msg = recv_duration
.observe_duration(&label, || raw_msg.parse().expect("invalid message"));
recv_duration.observe_duration("wait", || {
trace!("recv: {:?}", raw_msg.payload);
rx_send.send(raw_msg.payload)
trace!("recv: {:?}", msg);
rx_send.send(msg)
})?;
});
@ -236,9 +251,9 @@ impl Connection {
NetworkMessage::Verack => {
init_send.send(())?; // peer acknowledged our version
}
NetworkMessage::Alert(_) | NetworkMessage::Addr(_) => {}
NetworkMessage::Block(block) => blocks_send.send(block)?,
NetworkMessage::Headers(headers) => headers_send.send(headers)?,
NetworkMessage::Alert(_) => (), // https://bitcoin.org/en/alert/2016-11-01-alert-retirement
msg => warn!("unexpected message: {:?}", msg),
}
}
@ -301,50 +316,52 @@ fn is_block_inv(inv: &Inventory) -> bool {
}
}
// Struct used to configure stream reader function
struct StreamReader<R: Read> {
/// Stream to read from
pub stream: R,
/// I/O buffer
data: Vec<u8>,
/// Buffer containing unparsed message part
unparsed: Vec<u8>,
struct RawNetworkMessage {
magic: u32,
cmd: CommandString,
raw: Vec<u8>,
}
impl<R: Read> StreamReader<R> {
/// Constructs new stream reader for a given input stream `stream` with
/// optional parameter `buffer_size` determining reading buffer size
pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
StreamReader {
stream,
data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
unparsed: vec![],
}
}
/// Reads stream and parses next message from its current input,
/// also taking into account previously unparsed partial message (if there was such).
pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
loop {
match encode::deserialize_partial::<D>(&self.unparsed) {
// In this case we just have an incomplete data, so we need to read more
Err(encode::Error::Io(ref err)) if err.kind() == ErrorKind::UnexpectedEof => {
let count = self.stream.read(&mut self.data)?;
if count > 0 {
self.unparsed.extend(self.data[0..count].iter());
} else {
return Err(encode::Error::Io(std::io::Error::from(
ErrorKind::UnexpectedEof,
)));
}
}
Err(err) => return Err(err),
// We have successfully read from the buffer
Ok((message, index)) => {
self.unparsed.drain(..index);
return Ok(message);
impl RawNetworkMessage {
fn parse(self) -> Result<NetworkMessage, encode::Error> {
let mut raw: &[u8] = &self.raw;
let payload = match self.cmd.as_ref() {
"version" => NetworkMessage::Version(Decodable::consensus_decode(&mut raw)?),
"verack" => NetworkMessage::Verack,
"inv" => NetworkMessage::Inv(Decodable::consensus_decode(&mut raw)?),
"notfound" => NetworkMessage::NotFound(Decodable::consensus_decode(&mut raw)?),
"block" => NetworkMessage::Block(Decodable::consensus_decode(&mut raw)?),
"headers" => {
let len = VarInt::consensus_decode(&mut raw)?.0;
let mut headers = Vec::with_capacity(len as usize);
for _ in 0..len {
headers.push(Block::consensus_decode(&mut raw)?.header);
}
NetworkMessage::Headers(headers)
}
}
"ping" => NetworkMessage::Ping(Decodable::consensus_decode(&mut raw)?),
"pong" => NetworkMessage::Pong(Decodable::consensus_decode(&mut raw)?),
"reject" => NetworkMessage::Reject(Decodable::consensus_decode(&mut raw)?),
"alert" => NetworkMessage::Alert(Decodable::consensus_decode(&mut raw)?),
_ => NetworkMessage::Unknown {
command: self.cmd,
payload: self.raw,
},
};
Ok(payload)
}
}
impl Decodable for RawNetworkMessage {
fn consensus_decode<D: io::Read>(mut d: D) -> Result<Self, encode::Error> {
let magic = Decodable::consensus_decode(&mut d)?;
let cmd = Decodable::consensus_decode(&mut d)?;
let len = u32::consensus_decode(&mut d)?;
let _checksum = <[u8; 4]>::consensus_decode(&mut d)?; // assume data is correct
let mut raw = vec![0u8; len as usize];
d.read_slice(&mut raw)?;
Ok(RawNetworkMessage { magic, cmd, raw })
}
}