mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-01-19 05:43:55 +01:00
Merge pull request #1023 from TheBlueMatt/2021-07-par-gossip-processing
This commit is contained in:
commit
b5a63070f5
@ -120,11 +120,28 @@ struct Connection {
|
||||
id: u64,
|
||||
}
|
||||
impl Connection {
|
||||
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
loop {
|
||||
if event_receiver.recv().await.is_none() {
|
||||
return;
|
||||
}
|
||||
peer_manager.process_events();
|
||||
}
|
||||
}
|
||||
|
||||
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
|
||||
CMH: ChannelMessageHandler + 'static,
|
||||
RMH: RoutingMessageHandler + 'static,
|
||||
L: Logger + 'static + ?Sized,
|
||||
UMH: CustomMessageHandler + 'static {
|
||||
CMH: ChannelMessageHandler + 'static + Send + Sync,
|
||||
RMH: RoutingMessageHandler + 'static + Send + Sync,
|
||||
L: Logger + 'static + ?Sized + Send + Sync,
|
||||
UMH: CustomMessageHandler + 'static + Send + Sync {
|
||||
// Create a waker to wake up poll_event_process, above
|
||||
let (event_waker, event_receiver) = mpsc::channel(1);
|
||||
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
|
||||
|
||||
// 8KB is nice and big but also should never cause any issues with stack overflowing.
|
||||
let mut buf = [0; 8192];
|
||||
|
||||
@ -175,7 +192,14 @@ impl Connection {
|
||||
Err(_) => break Disconnect::PeerDisconnected,
|
||||
},
|
||||
}
|
||||
peer_manager.process_events();
|
||||
let _ = event_waker.try_send(());
|
||||
|
||||
// At this point we've processed a message or two, and reset the ping timer for this
|
||||
// peer, at least in the "are we still receiving messages" context, if we don't give up
|
||||
// our timeslice to another task we may just spin on this peer, starving other peers
|
||||
// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
|
||||
// here.
|
||||
tokio::task::yield_now().await;
|
||||
};
|
||||
let writer_option = us.lock().unwrap().writer.take();
|
||||
if let Some(mut writer) = writer_option {
|
||||
@ -443,6 +467,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
|
||||
// pause read given we're now waiting on the remote end to ACK (and in
|
||||
// accordance with the send_data() docs).
|
||||
us.read_paused = true;
|
||||
// Further, to avoid any current pending read causing a `read_event` call, wake
|
||||
// up the read_waker and restart its loop.
|
||||
let _ = us.read_waker.try_send(());
|
||||
return written_len;
|
||||
},
|
||||
}
|
||||
|
@ -362,3 +362,5 @@ fn read_write_lockorder_fail() {
|
||||
let _a = a.write().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub type FairRwLock<T> = RwLock<T>;
|
||||
|
@ -159,6 +159,8 @@ mod sync {
|
||||
pub use debug_sync::*;
|
||||
#[cfg(not(test))]
|
||||
pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard};
|
||||
#[cfg(not(test))]
|
||||
pub use crate::util::fairrwlock::FairRwLock;
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -28,11 +28,26 @@ pub trait CustomMessageReader {
|
||||
fn read<R: io::Read>(&self, message_type: u16, buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;
|
||||
}
|
||||
|
||||
// TestEq is a dummy trait which requires PartialEq when built in testing, and otherwise is
|
||||
// blanket-implemented for all types.
|
||||
|
||||
#[cfg(test)]
|
||||
pub trait TestEq : PartialEq {}
|
||||
#[cfg(test)]
|
||||
impl<T: PartialEq> TestEq for T {}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) trait TestEq {}
|
||||
#[cfg(not(test))]
|
||||
impl<T> TestEq for T {}
|
||||
|
||||
|
||||
/// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each
|
||||
/// variant contains a message from [`msgs`] or otherwise the message type if unknown.
|
||||
#[allow(missing_docs)]
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
|
||||
Init(msgs::Init),
|
||||
Error(msgs::ErrorMessage),
|
||||
Warning(msgs::WarningMessage),
|
||||
@ -69,7 +84,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
|
||||
Custom(T),
|
||||
}
|
||||
|
||||
impl<T> Message<T> where T: core::fmt::Debug + Type {
|
||||
impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
|
||||
/// Returns the type that was used to decode the message payload.
|
||||
pub fn type_id(&self) -> u16 {
|
||||
match self {
|
||||
@ -252,6 +267,7 @@ mod encode {
|
||||
|
||||
pub(crate) use self::encode::Encode;
|
||||
|
||||
#[cfg(not(test))]
|
||||
/// Defines a type identifier for sending messages over the wire.
|
||||
///
|
||||
/// Messages implementing this trait specify a type and must be [`Writeable`].
|
||||
@ -260,10 +276,24 @@ pub trait Type: core::fmt::Debug + Writeable {
|
||||
fn type_id(&self) -> u16;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub trait Type: core::fmt::Debug + Writeable + PartialEq {
|
||||
fn type_id(&self) -> u16;
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "_test_utils", fuzzing, test))]
|
||||
impl Type for () {
|
||||
fn type_id(&self) -> u16 { unreachable!(); }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<T: core::fmt::Debug + Writeable + PartialEq> Type for T where T: Encode {
|
||||
fn type_id(&self) -> u16 { T::TYPE }
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
impl<T: core::fmt::Debug + Writeable> Type for T where T: Encode {
|
||||
fn type_id(&self) -> u16 {
|
||||
T::TYPE
|
||||
}
|
||||
fn type_id(&self) -> u16 { T::TYPE }
|
||||
}
|
||||
|
||||
impl Encode for msgs::Init {
|
||||
@ -471,10 +501,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl Type for () {
|
||||
fn type_id(&self) -> u16 { unreachable!(); }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_even_message_type() {
|
||||
let message = Message::<()>::Unknown(42);
|
||||
|
@ -113,3 +113,5 @@ impl<T> RwLock<T> {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub type FairRwLock<T> = RwLock<T>;
|
||||
|
50
lightning/src/util/fairrwlock.rs
Normal file
50
lightning/src/util/fairrwlock.rs
Normal file
@ -0,0 +1,50 @@
|
||||
use std::sync::{TryLockResult, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
/// Rust libstd's RwLock does not provide any fairness guarantees (and, in fact, when used on
|
||||
/// Linux with pthreads under the hood, readers trivially and completely starve writers).
|
||||
/// Because we often hold read locks while doing message processing in multiple threads which
|
||||
/// can use significant CPU time, with write locks being time-sensitive but relatively small in
|
||||
/// CPU time, we can end up with starvation completely blocking incoming connections or pings,
|
||||
/// especially during initial graph sync.
|
||||
///
|
||||
/// Thus, we need to block readers when a writer is pending, which we do with a trivial RwLock
|
||||
/// wrapper here. Its not particularly optimized, but provides some reasonable fairness by
|
||||
/// blocking readers (by taking the write lock) if there are writers pending when we go to take
|
||||
/// a read lock.
|
||||
pub struct FairRwLock<T> {
|
||||
lock: RwLock<T>,
|
||||
waiting_writers: AtomicUsize,
|
||||
}
|
||||
|
||||
impl<T> FairRwLock<T> {
|
||||
pub fn new(t: T) -> Self {
|
||||
Self { lock: RwLock::new(t), waiting_writers: AtomicUsize::new(0) }
|
||||
}
|
||||
|
||||
// Note that all atomic accesses are relaxed, as we do not rely on the atomics here for any
|
||||
// ordering at all, instead relying on the underlying RwLock to provide ordering of unrelated
|
||||
// memory.
|
||||
pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> {
|
||||
self.waiting_writers.fetch_add(1, Ordering::Relaxed);
|
||||
let res = self.lock.write();
|
||||
self.waiting_writers.fetch_sub(1, Ordering::Relaxed);
|
||||
res
|
||||
}
|
||||
|
||||
pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
|
||||
self.lock.try_write()
|
||||
}
|
||||
|
||||
pub fn read(&self) -> LockResult<RwLockReadGuard<T>> {
|
||||
if self.waiting_writers.load(Ordering::Relaxed) != 0 {
|
||||
let _write_queue_lock = self.lock.write();
|
||||
}
|
||||
// Note that we don't consider ensuring that an underlying RwLock allowing writers to
|
||||
// starve readers doesn't exhibit the same behavior here. I'm not aware of any
|
||||
// libstd-backing RwLock which exhibits this behavior, and as documented in the
|
||||
// struct-level documentation, it shouldn't pose a significant issue for our current
|
||||
// codebase.
|
||||
self.lock.read()
|
||||
}
|
||||
}
|
@ -25,6 +25,8 @@ pub mod persist;
|
||||
pub(crate) mod atomic_counter;
|
||||
pub(crate) mod byte_utils;
|
||||
pub(crate) mod chacha20;
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) mod fairrwlock;
|
||||
#[cfg(fuzzing)]
|
||||
pub mod zbase32;
|
||||
#[cfg(not(fuzzing))]
|
||||
|
@ -18,7 +18,7 @@ use chain::channelmonitor::MonitorEvent;
|
||||
use chain::transaction::OutPoint;
|
||||
use chain::keysinterface;
|
||||
use ln::features::{ChannelFeatures, InitFeatures};
|
||||
use ln::msgs;
|
||||
use ln::{msgs, wire};
|
||||
use ln::msgs::OptionalField;
|
||||
use ln::script::ShutdownScript;
|
||||
use routing::scoring::FixedPenaltyScorer;
|
||||
@ -249,37 +249,106 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {
|
||||
|
||||
pub struct TestChannelMessageHandler {
|
||||
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
|
||||
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
|
||||
}
|
||||
|
||||
impl TestChannelMessageHandler {
|
||||
pub fn new() -> Self {
|
||||
TestChannelMessageHandler {
|
||||
pending_events: Mutex::new(Vec::new()),
|
||||
expected_recv_msgs: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn expect_receive_msg(&self, ev: wire::Message<()>) {
|
||||
let mut expected_msgs = self.expected_recv_msgs.lock().unwrap();
|
||||
if expected_msgs.is_none() { *expected_msgs = Some(Vec::new()); }
|
||||
expected_msgs.as_mut().unwrap().push(ev);
|
||||
}
|
||||
|
||||
fn received_msg(&self, ev: wire::Message<()>) {
|
||||
let mut msgs = self.expected_recv_msgs.lock().unwrap();
|
||||
if msgs.is_none() { return; }
|
||||
assert!(!msgs.as_ref().unwrap().is_empty(), "Received message when we weren't expecting one");
|
||||
#[cfg(test)]
|
||||
assert_eq!(msgs.as_ref().unwrap()[0], ev);
|
||||
msgs.as_mut().unwrap().remove(0);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestChannelMessageHandler {
|
||||
fn drop(&mut self) {
|
||||
let l = self.expected_recv_msgs.lock().unwrap();
|
||||
#[cfg(feature = "std")]
|
||||
{
|
||||
if !std::thread::panicking() {
|
||||
assert!(l.is_none() || l.as_ref().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
|
||||
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::OpenChannel) {}
|
||||
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::AcceptChannel) {}
|
||||
fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingCreated) {}
|
||||
fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingSigned) {}
|
||||
fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) {}
|
||||
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, _msg: &msgs::Shutdown) {}
|
||||
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) {}
|
||||
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) {}
|
||||
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFulfillHTLC) {}
|
||||
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailHTLC) {}
|
||||
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailMalformedHTLC) {}
|
||||
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::CommitmentSigned) {}
|
||||
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::RevokeAndACK) {}
|
||||
fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFee) {}
|
||||
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
|
||||
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::AnnouncementSignatures) {}
|
||||
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReestablish) {}
|
||||
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
|
||||
self.received_msg(wire::Message::OpenChannel(msg.clone()));
|
||||
}
|
||||
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
|
||||
self.received_msg(wire::Message::AcceptChannel(msg.clone()));
|
||||
}
|
||||
fn handle_funding_created(&self, _their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
|
||||
self.received_msg(wire::Message::FundingCreated(msg.clone()));
|
||||
}
|
||||
fn handle_funding_signed(&self, _their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
|
||||
self.received_msg(wire::Message::FundingSigned(msg.clone()));
|
||||
}
|
||||
fn handle_funding_locked(&self, _their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
|
||||
self.received_msg(wire::Message::FundingLocked(msg.clone()));
|
||||
}
|
||||
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
|
||||
self.received_msg(wire::Message::Shutdown(msg.clone()));
|
||||
}
|
||||
fn handle_closing_signed(&self, _their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
|
||||
self.received_msg(wire::Message::ClosingSigned(msg.clone()));
|
||||
}
|
||||
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
|
||||
self.received_msg(wire::Message::UpdateAddHTLC(msg.clone()));
|
||||
}
|
||||
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
|
||||
self.received_msg(wire::Message::UpdateFulfillHTLC(msg.clone()));
|
||||
}
|
||||
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
|
||||
self.received_msg(wire::Message::UpdateFailHTLC(msg.clone()));
|
||||
}
|
||||
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
|
||||
self.received_msg(wire::Message::UpdateFailMalformedHTLC(msg.clone()));
|
||||
}
|
||||
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
|
||||
self.received_msg(wire::Message::CommitmentSigned(msg.clone()));
|
||||
}
|
||||
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
|
||||
self.received_msg(wire::Message::RevokeAndACK(msg.clone()));
|
||||
}
|
||||
fn handle_update_fee(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
|
||||
self.received_msg(wire::Message::UpdateFee(msg.clone()));
|
||||
}
|
||||
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {
|
||||
// Don't call `received_msg` here as `TestRoutingMessageHandler` generates these sometimes
|
||||
}
|
||||
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
|
||||
self.received_msg(wire::Message::AnnouncementSignatures(msg.clone()));
|
||||
}
|
||||
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
|
||||
self.received_msg(wire::Message::ChannelReestablish(msg.clone()));
|
||||
}
|
||||
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
|
||||
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {}
|
||||
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
|
||||
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {
|
||||
// Don't bother with `received_msg` for Init as its auto-generated and we don't want to
|
||||
// bother re-generating the expected Init message in all tests.
|
||||
}
|
||||
fn handle_error(&self, _their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
|
||||
self.received_msg(wire::Message::Error(msg.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
impl events::MessageSendEventsProvider for TestChannelMessageHandler {
|
||||
|
Loading…
Reference in New Issue
Block a user