Limit blocked PeerManager::process_events waiters to two

Only one instance of PeerManager::process_events can run at a time,
and each run always finishes all available work before returning.
Thus, having several threads blocked on the process_events lock
doesn't accomplish anything but blocking more threads.

Here we limit the number of concurrent calls to process_events to two:
one processing events and one blocked at the top, which will process
all available events after the first completes.
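
The change boils down to Mutex::try_lock plus an atomic "someone is already
waiting" flag. Below is a minimal standalone sketch of that pattern, assuming
std's Mutex and AtomicBool and a hypothetical EventProcessor type (not the
actual PeerManager); it illustrates the idea rather than the commit's code.

use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical standalone type that only models the "one runner plus at most
// one waiter" behavior this commit introduces.
struct EventProcessor {
	event_processing_lock: Mutex<()>,
	blocked_event_processors: AtomicBool,
}

impl EventProcessor {
	fn process_events(&self, do_work: impl FnOnce()) {
		// Try to become the running processor without blocking.
		let mut _single_processor_lock = self.event_processing_lock.try_lock();
		if _single_processor_lock.is_err() {
			// Another call is already running. Try to become the single
			// designated waiter; if someone is already waiting, return, as
			// that waiter will handle any events we would have handled.
			if self.blocked_event_processors
				.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
				.is_err()
			{
				return;
			}
			// We are the lone waiter: block until the current run finishes,
			// clear the flag, then fall through and process leftover events.
			_single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
			self.blocked_event_processors.store(false, Ordering::Release);
		}
		// Holding the lock via either path, do all available work.
		do_work();
	}
}

Because the compare_exchange is atomic, at most one thread ever blocks on the
lock; any further callers return immediately, knowing the waiter will pick up
their events once the current run releases the lock.
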
Matt Corallo 2021-10-06 04:45:07 +00:00
parent 4f50a94a3f
commit 97711aef96

@@ -34,6 +34,7 @@ use prelude::*;
use io;
use alloc::collections::LinkedList;
use sync::{Arc, Mutex, MutexGuard, RwLock};
use core::sync::atomic::{AtomicBool, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
use core::convert::Infallible;
@@ -437,6 +438,11 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
/// `process_events`.
event_processing_lock: Mutex<()>,
/// Because event processing is global and always does all available work before returning,
/// there is no reason for us to have many event processors waiting on the lock at once.
/// Instead, we limit the total blocked event processors to at most one by setting this flag
/// when an event processing call is waiting.
blocked_event_processors: AtomicBool,
our_node_secret: SecretKey,
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
@@ -569,6 +575,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
}),
node_id_to_descriptor: Mutex::new(HashMap::new()),
event_processing_lock: Mutex::new(()),
blocked_event_processors: AtomicBool::new(false),
our_node_secret,
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
@@ -1369,11 +1376,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
/// or one of the other clients provided in our language bindings.
///
/// Note that if there are any other calls to this function waiting on lock(s), this may return
/// without doing any work. All available events that need handling will be handled before the
/// other calls return.
///
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
pub fn process_events(&self) {
let _single_processor_lock = self.event_processing_lock.lock().unwrap();
let mut _single_processor_lock = self.event_processing_lock.try_lock();
if _single_processor_lock.is_err() {
// While we could wake the older sleeper here with a condition variable and make waiting
// times more even, that would be a lot of overengineering for a simple "reduce total
// waiter count" goal.
match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
Err(val) => {
debug_assert!(val, "compare_exchange failed spuriously?");
return;
},
Ok(val) => {
debug_assert!(!val, "compare_exchange succeeded spuriously?");
// We're the only waiter; the running process_events may have emptied the pending events
// "long" ago while new events have since arrived, so wait until it's done and process any
// leftover events before returning.
_single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
self.blocked_event_processors.store(false, Ordering::Release);
}
}
}
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();