[net-tokio] Call PeerManager::process_events without blocking reads

Unlike very ancient versions of lightning-net-tokio, this does not
rely on a single global process_events future, but instead has one
per connection. This could still cause significant contention, so
we'll ensure only two process_events calls can exist at once in
the next few commits.
This commit is contained in:
Matt Corallo 2021-10-06 04:29:19 +00:00
parent a731efcb68
commit b1550524cf

View file

@ -121,11 +121,28 @@ struct Connection {
id: u64,
}
impl Connection {
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
loop {
if event_receiver.recv().await.is_none() {
return;
}
peer_manager.process_events();
}
}
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static,
RMH: RoutingMessageHandler + 'static,
L: Logger + 'static + ?Sized,
UMH: CustomMessageHandler + 'static {
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];
@ -176,7 +193,7 @@ impl Connection {
Err(_) => break Disconnect::PeerDisconnected,
},
}
peer_manager.process_events();
let _ = event_waker.try_send(());
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {