Separate ChannelManager needing persistence from having events

Currently, when a ChannelManager generates a notification for the
background processor, any pending events are handled and the
ChannelManager is always re-persisted.

Many channel related messages don't actually change the channel
state in a way that changes the persisted channel. For example,
an `update_add_htlc` or `update_fail_htlc` message simply adds the
change to a queue, changing the channel state when we receive a
`commitment_signed` message.

In these cases we shouldn't be re-persisting the ChannelManager as
it hasn't changed (persisted) state at all. In anticipation of
doing so in the next few commits, here we make the public API
handle the two concepts (somewhat) separately. The notification
still goes out via a single waker, however whether or not to
persist is now handled via a separate atomic bool.
This commit is contained in:
Matt Corallo 2023-09-08 20:26:29 +00:00
parent 63e6b80fb0
commit 7fa499c188
2 changed files with 25 additions and 9 deletions

View file

@ -315,7 +315,7 @@ macro_rules! define_run_body {
// see `await_start`'s use below. // see `await_start`'s use below.
let mut await_start = None; let mut await_start = None;
if $check_slow_await { await_start = Some($get_timer(1)); } if $check_slow_await { await_start = Some($get_timer(1)); }
let updates_available = $await; $await;
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
// Exit the loop if the background processor was requested to stop. // Exit the loop if the background processor was requested to stop.
@ -324,7 +324,7 @@ macro_rules! define_run_body {
break; break;
} }
if updates_available { if $channel_manager.get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager..."); log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?; $persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager."); log_trace!($logger, "Done persisting ChannelManager.");
@ -660,11 +660,9 @@ where
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
}; };
match fut.await { match fut.await {
SelectorOutput::A => true, SelectorOutput::A|SelectorOutput::B => {},
SelectorOutput::B => false,
SelectorOutput::C(exit) => { SelectorOutput::C(exit) => {
should_break = exit; should_break = exit;
false
} }
} }
}, |t| sleeper(Duration::from_secs(t)), }, |t| sleeper(Duration::from_secs(t)),
@ -787,10 +785,10 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
Sleeper::from_two_futures( { Sleeper::from_two_futures(
channel_manager.get_event_or_persistence_needed_future(), channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future() chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)), ).wait_timeout(Duration::from_millis(100)); },
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
}); });
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

View file

@ -1186,6 +1186,7 @@ where
background_events_processed_since_startup: AtomicBool, background_events_processed_since_startup: AtomicBool,
event_persist_notifier: Notifier, event_persist_notifier: Notifier,
needs_persist_flag: AtomicBool,
entropy_source: ES, entropy_source: ES,
node_signer: NS, node_signer: NS,
@ -1229,6 +1230,7 @@ enum NotifyOption {
/// `optionally_notify` which returns a `NotifyOption`. /// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
event_persist_notifier: &'a Notifier, event_persist_notifier: &'a Notifier,
needs_persist_flag: &'a AtomicBool,
should_persist: F, should_persist: F,
// We hold onto this result so the lock doesn't get released immediately. // We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>, _read_guard: RwLockReadGuard<'a, ()>,
@ -1246,6 +1248,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
PersistenceNotifierGuard { PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier, event_persist_notifier: &cm.get_cm().event_persist_notifier,
needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: move || { should_persist: move || {
// Pick the "most" action between `persist_check` and the background events // Pick the "most" action between `persist_check` and the background events
// processing and return that. // processing and return that.
@ -1266,6 +1269,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
PersistenceNotifierGuard { PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier, event_persist_notifier: &cm.get_cm().event_persist_notifier,
needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: persist_check, should_persist: persist_check,
_read_guard: read_guard, _read_guard: read_guard,
} }
@ -1275,6 +1279,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
fn drop(&mut self) { fn drop(&mut self) {
if (self.should_persist)() == NotifyOption::DoPersist { if (self.should_persist)() == NotifyOption::DoPersist {
self.needs_persist_flag.store(true, Ordering::Release);
self.event_persist_notifier.notify(); self.event_persist_notifier.notify();
} }
} }
@ -2137,6 +2142,7 @@ macro_rules! process_events_body {
} }
if result == NotifyOption::DoPersist { if result == NotifyOption::DoPersist {
$self.needs_persist_flag.store(true, Ordering::Release);
$self.event_persist_notifier.notify(); $self.event_persist_notifier.notify();
} }
} }
@ -2216,7 +2222,9 @@ where
pending_background_events: Mutex::new(Vec::new()), pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()), total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false), background_events_processed_since_startup: AtomicBool::new(false),
event_persist_notifier: Notifier::new(), event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
entropy_source, entropy_source,
node_signer, node_signer,
@ -7381,15 +7389,23 @@ where
} }
} }
/// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
/// may have events that need processing.
///
/// In order to check if this [`ChannelManager`] needs persisting, call
/// [`Self::get_and_clear_needs_persistence`].
/// ///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`ChannelManager`] and should instead register actions to be taken later. /// [`ChannelManager`] and should instead register actions to be taken later.
///
pub fn get_event_or_persistence_needed_future(&self) -> Future { pub fn get_event_or_persistence_needed_future(&self) -> Future {
self.event_persist_notifier.get_future() self.event_persist_notifier.get_future()
} }
/// Returns true if this [`ChannelManager`] needs to be persisted.
pub fn get_and_clear_needs_persistence(&self) -> bool {
self.needs_persist_flag.swap(false, Ordering::AcqRel)
}
#[cfg(any(test, feature = "_test_utils"))] #[cfg(any(test, feature = "_test_utils"))]
pub fn get_event_or_persist_condvar_value(&self) -> bool { pub fn get_event_or_persist_condvar_value(&self) -> bool {
self.event_persist_notifier.notify_pending() self.event_persist_notifier.notify_pending()
@ -9562,7 +9578,9 @@ where
pending_background_events: Mutex::new(pending_background_events), pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()), total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false), background_events_processed_since_startup: AtomicBool::new(false),
event_persist_notifier: Notifier::new(), event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
entropy_source: args.entropy_source, entropy_source: args.entropy_source,
node_signer: args.node_signer, node_signer: args.node_signer,