From 9228f902e48eab79d65e55a087a8039e4cab1fe4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 24 Aug 2023 18:34:55 +0000 Subject: [PATCH 01/11] Rename `ChannelManager` update future methods for new API In the next commit, we separate `ChannelManager`'s concept of waking a listener to both be persisted and to allow the user to handle events. Here we rename the future-fetching method in anticipation of this split. --- lightning-background-processor/src/lib.rs | 8 +-- lightning/src/ln/channelmanager.rs | 70 +++++++++++------------ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 353ed6738..c17ab522e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -655,7 +655,7 @@ where channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { - a: channel_manager.get_persistable_update_future(), + a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; @@ -788,7 +788,7 @@ impl BackgroundProcessor { channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), Sleeper::from_two_futures( - channel_manager.get_persistable_update_future(), + channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)), |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) @@ -1326,7 +1326,7 @@ mod tests { check_persisted_data!(nodes[0].node, filepath.clone()); loop { - if !nodes[0].node.get_persistence_condvar_value() { break } + if !nodes[0].node.get_event_or_persist_condvar_value() { break } } // Force-close the channel. @@ -1335,7 +1335,7 @@ mod tests { // Check that the force-close updates are persisted. check_persisted_data!(nodes[0].node, filepath.clone()); loop { - if !nodes[0].node.get_persistence_condvar_value() { break } + if !nodes[0].node.get_event_or_persist_condvar_value() { break } } // Check network graph is persisted diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6ea516600..08b6f607e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1185,7 +1185,7 @@ where background_events_processed_since_startup: AtomicBool, - persistence_notifier: Notifier, + event_persist_notifier: Notifier, entropy_source: ES, node_signer: NS, @@ -1228,7 +1228,7 @@ enum NotifyOption { /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { - persistence_notifier: &'a Notifier, + event_persist_notifier: &'a Notifier, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, @@ -1240,7 +1240,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w let _ = cm.get_cm().process_background_events(); // We always persist PersistenceNotifierGuard { - persistence_notifier: &cm.get_cm().persistence_notifier, + event_persist_notifier: &cm.get_cm().event_persist_notifier, should_persist: || -> NotifyOption { NotifyOption::DoPersist }, _read_guard: read_guard, } @@ -1253,7 +1253,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w let read_guard = lock.read().unwrap(); PersistenceNotifierGuard { - persistence_notifier: notifier, + event_persist_notifier: notifier, should_persist: persist_check, _read_guard: read_guard, } @@ -1263,7 +1263,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { if (self.should_persist)() == NotifyOption::DoPersist { - self.persistence_notifier.notify(); + self.event_persist_notifier.notify(); } } } @@ -2125,7 +2125,7 @@ macro_rules! process_events_body { } if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + $self.event_persist_notifier.notify(); } } } @@ -2204,7 +2204,7 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + event_persist_notifier: Notifier::new(), entropy_source, node_signer, @@ -4422,7 +4422,7 @@ where /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { let mut should_persist = self.process_background_events(); let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -4467,7 +4467,7 @@ where /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { let mut should_persist = self.process_background_events(); let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -7000,7 +7000,7 @@ where /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { let mut result = self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query @@ -7083,7 +7083,7 @@ where fn block_disconnected(&self, header: &BlockHeader, height: u32) { let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -7118,7 +7118,7 @@ where log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -7138,7 +7138,7 @@ where log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7182,7 +7182,7 @@ where fn transaction_unconfirmed(&self, txid: &Txid) { let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7370,13 +7370,13 @@ where /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. /// - pub fn get_persistable_update_future(&self) -> Future { - self.persistence_notifier.get_future() + pub fn get_event_or_persistence_needed_future(&self) -> Future { + self.event_persist_notifier.get_future() } #[cfg(any(test, feature = "_test_utils"))] - pub fn get_persistence_condvar_value(&self) -> bool { - self.persistence_notifier.notify_pending() + pub fn get_event_or_persist_condvar_value(&self) -> bool { + self.event_persist_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -7520,7 +7520,7 @@ where } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { let force_persist = self.process_background_events(); if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } @@ -9547,7 +9547,7 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + event_persist_notifier: Notifier::new(), entropy_source: args.entropy_source, node_signer: args.node_signer, @@ -9609,9 +9609,9 @@ mod tests { // All nodes start with a persistable update pending as `create_network` connects each node // with all other nodes to make most tests simpler. - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -9625,19 +9625,19 @@ mod tests { &nodes[0].node.get_our_node_id()).pop().unwrap(); // The first two nodes (which opened a channel) should now require fresh persistence - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // ... but the last node should not. - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // After persisting the first two nodes they should no longer need fresh persistence. - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update // about the channel. nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0); nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1); - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // The nodes which are a party to the channel should also ignore messages from unrelated // parties. @@ -9645,8 +9645,8 @@ mod tests { nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // At this point the channel info given by peers should still be the same. assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); @@ -9663,8 +9663,8 @@ mod tests { // persisted and that its channel info remains the same. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info); @@ -9672,8 +9672,8 @@ mod tests { // the channel info has updated. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update); - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info); } From c2aee577701e85495516b45c4f6dfc2f8f4b0e24 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 11 Sep 2023 03:10:36 +0000 Subject: [PATCH 02/11] Make PersistenceNotifierGuard::optionally_notify take a ChanMan ref Long ago, for reasons lost to the ages, the `PersistenceNotifierGuard::optionally_notify` constructor didn't take a `ChannelManager` reference, but rather explicit references to the fields of it that it needs. This is cumbersome and useless, so we fix it here. --- lightning/src/ln/channelmanager.rs | 31 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 08b6f607e..3d96db4bb 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1249,11 +1249,12 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, /// [`ChannelManager::process_background_events`] MUST be called first. - fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { - let read_guard = lock.read().unwrap(); + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F) + -> PersistenceNotifierGuard<'a, F> { + let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); PersistenceNotifierGuard { - event_persist_notifier: notifier, + event_persist_notifier: &cm.get_cm().event_persist_notifier, should_persist: persist_check, _read_guard: read_guard, } @@ -4422,7 +4423,7 @@ where /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { + PersistenceNotifierGuard::optionally_notify(self, || { let mut should_persist = self.process_background_events(); let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -4467,7 +4468,7 @@ where /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { + PersistenceNotifierGuard::optionally_notify(self, || { let mut should_persist = self.process_background_events(); let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -7000,7 +7001,7 @@ where /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { + PersistenceNotifierGuard::optionally_notify(self, || { let mut result = self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query @@ -7082,8 +7083,8 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, + || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -7117,8 +7118,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, + || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -7137,8 +7138,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, + || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7181,8 +7182,8 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.event_persist_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, + || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7520,7 +7521,7 @@ where } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.event_persist_notifier, || { + PersistenceNotifierGuard::optionally_notify(self, || { let force_persist = self.process_background_events(); if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } From 63e6b80fb034b56d4fc0b7b1f3d3ce317ffecfe0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 11 Sep 2023 03:38:14 +0000 Subject: [PATCH 03/11] Make it harder to forget to call CM::process_background_events Prior to any actions which may generate a `ChannelMonitorUpdate`, and in general after startup, `ChannelManager::process_background_events` must be called. This is mostly accomplished by doing so on taking the `total_consistency_lock` via the `PersistenceNotifierGuard`. In order to skip this call in block connection logic, the `PersistenceNotifierGuard::optionally_notify` constructor did not call the `process_background_events` method. However, this is very easy to misuse - `optionally_notify` does not convey to the reader that they need to call `process_background_events` at all. Here we fix this by adding a separate `optionally_notify_skipping_background_events` method, making the requirements much clearer to callers. --- lightning/src/ln/channelmanager.rs | 52 +++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3d96db4bb..f3dbd9c25 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1236,21 +1236,32 @@ struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) + } + + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F) + -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); - let _ = cm.get_cm().process_background_events(); // We always persist + let force_notify = cm.get_cm().process_background_events(); PersistenceNotifierGuard { event_persist_notifier: &cm.get_cm().event_persist_notifier, - should_persist: || -> NotifyOption { NotifyOption::DoPersist }, + should_persist: move || { + // Pick the "most" action between `persist_check` and the background events + // processing and return that. + let notify = persist_check(); + if force_notify == NotifyOption::DoPersist { NotifyOption::DoPersist } + else { notify } + }, _read_guard: read_guard, } - } /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, - /// [`ChannelManager::process_background_events`] MUST be called first. - fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F) - -> PersistenceNotifierGuard<'a, F> { + /// [`ChannelManager::process_background_events`] MUST be called first (or + /// [`Self::optionally_notify`] used). + fn optionally_notify_skipping_background_events NotifyOption, C: AChannelManager> + (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); PersistenceNotifierGuard { @@ -4424,7 +4435,7 @@ where /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { PersistenceNotifierGuard::optionally_notify(self, || { - let mut should_persist = self.process_background_events(); + let mut should_persist = NotifyOption::SkipPersist; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4469,7 +4480,7 @@ where /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { PersistenceNotifierGuard::optionally_notify(self, || { - let mut should_persist = self.process_background_events(); + let mut should_persist = NotifyOption::SkipPersist; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -7002,7 +7013,7 @@ where fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(self, || { - let mut result = self.process_background_events(); + let mut result = NotifyOption::SkipPersist; // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -7083,8 +7094,9 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, - || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -7118,8 +7130,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, - || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -7138,8 +7151,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, - || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7182,8 +7196,9 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, - || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7522,9 +7537,8 @@ where fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { PersistenceNotifierGuard::optionally_notify(self, || { - let force_persist = self.process_background_events(); if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } + persist } else { NotifyOption::SkipPersist } From 7fa499c188294cf4179e57cc47050e9a261ed72b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 8 Sep 2023 20:26:29 +0000 Subject: [PATCH 04/11] Separate ChannelManager needing persistence from having events Currently, when a ChannelManager generates a notification for the background processor, any pending events are handled and the ChannelManager is always re-persisted. Many channel related messages don't actually change the channel state in a way that changes the persisted channel. For example, an `update_add_htlc` or `update_fail_htlc` message simply adds the change to a queue, changing the channel state when we receive a `commitment_signed` message. In these cases we shouldn't be re-persisting the ChannelManager as it hasn't changed (persisted) state at all. In anticipation of doing so in the next few commits, here we make the public API handle the two concepts (somewhat) separately. The notification still goes out via a single waker, however whether or not to persist is now handled via a separate atomic bool. --- lightning-background-processor/src/lib.rs | 12 +++++------- lightning/src/ln/channelmanager.rs | 22 ++++++++++++++++++++-- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c17ab522e..6a36874a3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -315,7 +315,7 @@ macro_rules! define_run_body { // see `await_start`'s use below. let mut await_start = None; if $check_slow_await { await_start = Some($get_timer(1)); } - let updates_available = $await; + $await; let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; // Exit the loop if the background processor was requested to stop. @@ -324,7 +324,7 @@ macro_rules! define_run_body { break; } - if updates_available { + if $channel_manager.get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); $persister.persist_manager(&*$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); @@ -660,11 +660,9 @@ where c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; match fut.await { - SelectorOutput::A => true, - SelectorOutput::B => false, + SelectorOutput::A|SelectorOutput::B => {}, SelectorOutput::C(exit) => { should_break = exit; - false } } }, |t| sleeper(Duration::from_secs(t)), @@ -787,10 +785,10 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - Sleeper::from_two_futures( + { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() - ).wait_timeout(Duration::from_millis(100)), + ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f3dbd9c25..6f3b2e457 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1186,6 +1186,7 @@ where background_events_processed_since_startup: AtomicBool, event_persist_notifier: Notifier, + needs_persist_flag: AtomicBool, entropy_source: ES, node_signer: NS, @@ -1229,6 +1230,7 @@ enum NotifyOption { /// `optionally_notify` which returns a `NotifyOption`. struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { event_persist_notifier: &'a Notifier, + needs_persist_flag: &'a AtomicBool, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, @@ -1246,6 +1248,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w PersistenceNotifierGuard { event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: move || { // Pick the "most" action between `persist_check` and the background events // processing and return that. @@ -1266,6 +1269,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w PersistenceNotifierGuard { event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: persist_check, _read_guard: read_guard, } @@ -1275,6 +1279,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { if (self.should_persist)() == NotifyOption::DoPersist { + self.needs_persist_flag.store(true, Ordering::Release); self.event_persist_notifier.notify(); } } @@ -2137,6 +2142,7 @@ macro_rules! process_events_body { } if result == NotifyOption::DoPersist { + $self.needs_persist_flag.store(true, Ordering::Release); $self.event_persist_notifier.notify(); } } @@ -2216,7 +2222,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source, node_signer, @@ -7381,15 +7389,23 @@ where } } - /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. + /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or + /// may have events that need processing. + /// + /// In order to check if this [`ChannelManager`] needs persisting, call + /// [`Self::get_and_clear_needs_persistence`]. /// /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. - /// pub fn get_event_or_persistence_needed_future(&self) -> Future { self.event_persist_notifier.get_future() } + /// Returns true if this [`ChannelManager`] needs to be persisted. + pub fn get_and_clear_needs_persistence(&self) -> bool { + self.needs_persist_flag.swap(false, Ordering::AcqRel) + } + #[cfg(any(test, feature = "_test_utils"))] pub fn get_event_or_persist_condvar_value(&self) -> bool { self.event_persist_notifier.notify_pending() @@ -9562,7 +9578,9 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source: args.entropy_source, node_signer: args.node_signer, From 305df1d7da1f23e60b7d06f2d8026e9dbcea35f4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 24 Aug 2023 18:37:18 +0000 Subject: [PATCH 05/11] Update `channelmanager::NotifyOption` to indicate persist or event As we now signal events-available from persistence-needed separately, the `NotifyOption` enum should include a separate variant for events-but-no-persistence, which we add here. --- lightning/src/ln/channelmanager.rs | 59 ++++++++++++++++++------------ 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6f3b2e457..e97d6a5ee 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1215,7 +1215,8 @@ pub struct ChainParameters { #[must_use] enum NotifyOption { DoPersist, - SkipPersist, + SkipPersistHandleEvents, + SkipPersistNoEvents, } /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is @@ -1253,8 +1254,13 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w // Pick the "most" action between `persist_check` and the background events // processing and return that. let notify = persist_check(); - if force_notify == NotifyOption::DoPersist { NotifyOption::DoPersist } - else { notify } + match (notify, force_notify) { + (NotifyOption::DoPersist, _) => NotifyOption::DoPersist, + (_, NotifyOption::DoPersist) => NotifyOption::DoPersist, + (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents, + (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents, + _ => NotifyOption::SkipPersistNoEvents, + } }, _read_guard: read_guard, } @@ -1278,9 +1284,14 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { - if (self.should_persist)() == NotifyOption::DoPersist { - self.needs_persist_flag.store(true, Ordering::Release); - self.event_persist_notifier.notify(); + match (self.should_persist)() { + NotifyOption::DoPersist => { + self.needs_persist_flag.store(true, Ordering::Release); + self.event_persist_notifier.notify() + }, + NotifyOption::SkipPersistHandleEvents => + self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -2092,7 +2103,7 @@ macro_rules! process_events_body { return; } - let mut result = NotifyOption::SkipPersist; + let mut result; { // We'll acquire our total consistency lock so that we can be sure no other @@ -2101,7 +2112,7 @@ macro_rules! process_events_body { // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must // ensure any startup-generated background events are handled first. - if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + result = $self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -4348,7 +4359,7 @@ where let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); if background_events.is_empty() { - return NotifyOption::SkipPersist; + return NotifyOption::SkipPersistNoEvents; } for event in background_events.drain(..) { @@ -4417,17 +4428,17 @@ where } fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel, new_feerate: u32) -> NotifyOption { - if !chan.context.is_outbound() { return NotifyOption::SkipPersist; } + if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; } // If the feerate has decreased by less than half, don't bother if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } if !chan.context.is_live() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); @@ -4443,7 +4454,7 @@ where /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { PersistenceNotifierGuard::optionally_notify(self, || { - let mut should_persist = NotifyOption::SkipPersist; + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4488,7 +4499,7 @@ where /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { PersistenceNotifierGuard::optionally_notify(self, || { - let mut should_persist = NotifyOption::SkipPersist; + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -6361,19 +6372,19 @@ where Ok(()) } - /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. + /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { // It's not a local channel - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); if peer_state_mutex_opt.is_none() { - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -6385,14 +6396,14 @@ where // If the announcement is about a channel of ours which is public, some // other peer may simply be forwarding all its gossip to us. Don't provide // a scary-looking error message and return Ok instead. - return Ok(NotifyOption::SkipPersist); + return Ok(NotifyOption::SkipPersistNoEvents); } return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); } let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..]; let msg_from_node_one = msg.contents.flags & 1 == 0; if were_node_one == msg_from_node_one { - return Ok(NotifyOption::SkipPersist); + return Ok(NotifyOption::SkipPersistNoEvents); } else { log_debug!(self.logger, "Received channel_update for channel {}.", chan_id); try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry); @@ -6402,7 +6413,7 @@ where "Got a channel_update for an unfunded channel!".into())), chan_phase_entry); } }, - hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist) + hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents) } Ok(NotifyOption::DoPersist) } @@ -7021,7 +7032,7 @@ where fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(self, || { - let mut result = NotifyOption::SkipPersist; + let mut result = NotifyOption::SkipPersistNoEvents; // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -7556,7 +7567,7 @@ where if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { persist } else { - NotifyOption::SkipPersist + NotifyOption::SkipPersistNoEvents } }); } From 71bafecafccbf35298f41149cd0153b72b8e786e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 24 Aug 2023 19:36:58 +0000 Subject: [PATCH 06/11] Move a handful of channel messages to notify-without-persist Many channel related messages don't actually change the channel state in a way that changes the persisted channel. For example, an `update_add_htlc` or `update_fail_htlc` message simply adds the change to a queue, changing the channel state when we receive a `commitment_signed` message. In these cases there's really no reason to wake the background processor at all - there's no response message and there's no state update. However, note that if we close the channel we should persist the `ChannelManager`. If we send an error message without closing the channel, we should wake the background processor without persisting. Here we move to the appropriate `NotifyOption` on some of the simpler channel message handlers. --- lightning/src/ln/channelmanager.rs | 135 +++++++++++++++++++++++++---- 1 file changed, 118 insertions(+), 17 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index e97d6a5ee..eeeab90a8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -494,6 +494,10 @@ impl MsgHandleErrInternal { channel_capacity: None, } } + + fn closes_channel(&self) -> bool { + self.chan_id.is_some() + } } /// We hold back HTLCs we intend to relay for a random interval greater than this (see @@ -1238,6 +1242,12 @@ struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused + /// Notifies any waiters and indicates that we need to persist, in addition to possibly having + /// events to handle. + /// + /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in + /// other cases where losing the changes on restart may result in a force-close or otherwise + /// isn't ideal. fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) } @@ -2152,9 +2162,14 @@ macro_rules! process_events_body { processed_all_events = false; } - if result == NotifyOption::DoPersist { - $self.needs_persist_flag.store(true, Ordering::Release); - $self.event_persist_notifier.notify(); + match result { + NotifyOption::DoPersist => { + $self.needs_persist_flag.store(true, Ordering::Release); + $self.event_persist_notifier.notify(); + }, + NotifyOption::SkipPersistHandleEvents => + $self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -5560,6 +5575,8 @@ where } fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } @@ -5659,6 +5676,8 @@ where } fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5819,6 +5838,8 @@ where } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5997,6 +6018,9 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! + let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6078,6 +6102,8 @@ where } fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6101,6 +6127,8 @@ where } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -7476,8 +7504,21 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // open_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_open_channel(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => { + debug_assert!(false, "We shouldn't close a new channel"); + NotifyOption::DoPersist + }, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { @@ -7487,8 +7528,13 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // accept_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + NotifyOption::SkipPersistHandleEvents + }); } fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -7508,8 +7554,19 @@ where } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // channel_ready message - while the channel's state will change, any channel_ready message + // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we + // will not force-close the channel on startup. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_ready(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { @@ -7523,8 +7580,19 @@ where } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_add_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_add_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { @@ -7533,13 +7601,35 @@ where } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_malformed_htlc message - the message itself doesn't change our channel state + // only the `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { @@ -7553,8 +7643,19 @@ where } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fee message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fee(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { From e37b35040813a7b4dfc9123ba82ce29c32156367 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 10 Sep 2023 23:10:03 +0000 Subject: [PATCH 07/11] Avoid persisting `ChannelManager` in response to peer connection When a peer connects and we send some `channel_reestablish` messages or create a `per_peer_state` entry there's really no reason to need to persist the `ChannelManager`. None of the possible actions we take immediately result in a change to the persisted contents of a `ChannelManager`, only the peer's later `channel_reestablish` message does. --- lightning/src/ln/channelmanager.rs | 144 +++++++++++++++-------------- 1 file changed, 75 insertions(+), 69 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index eeeab90a8..1df6db7db 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1233,7 +1233,7 @@ enum NotifyOption { /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> { event_persist_notifier: &'a Notifier, needs_persist_flag: &'a AtomicBool, should_persist: F, @@ -1248,12 +1248,12 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in /// other cases where losing the changes on restart may result in a force-close or otherwise /// isn't ideal. - fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) } - fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F) - -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F) + -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); let force_notify = cm.get_cm().process_background_events(); @@ -1292,7 +1292,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w } } -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { match (self.should_persist)() { NotifyOption::DoPersist => { @@ -7778,76 +7778,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. + let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, - } - } - - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; - - peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| - if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { - // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so won't be recovered after a crash), they shouldn't exist here and we would never need to - // worry about closing and removing them. - debug_assert!(false); - None + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, } - ).for_each(|chan| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + } + + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { From 9078c0dc5c30f7fcd1503cf30237cd3fe87154c0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 24 Aug 2023 19:57:45 +0000 Subject: [PATCH 08/11] Always persist the `ChannelManager` on a failed ChannelUpdate If we receive a `ChannelUpdate` message which was invalid, it can cause us to force-close the channel, which should result in a `ChannelManager` persistence, though its not critical to do so. --- lightning/src/ln/channelmanager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 1df6db7db..5e9b53606 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7668,7 +7668,7 @@ where if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { persist } else { - NotifyOption::SkipPersistNoEvents + NotifyOption::DoPersist } }); } From ce94a5ec221d63f47d65f674b46422b1612147bf Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 24 Aug 2023 20:02:08 +0000 Subject: [PATCH 09/11] Skip persistence in the usual case handling channel_reestablish When we handle an inbound `channel_reestablish` from our peers it generally doesn't change any state and thus doesn't need a `ChannelManager` persistence. Here we avoid said persistence where possible. --- lightning/src/ln/channelmanager.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5e9b53606..07e4b2a0e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6446,7 +6446,7 @@ where Ok(NotifyOption::DoPersist) } - fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { + fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result { let htlc_forwards; let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6502,14 +6502,16 @@ where } }; + let mut persist = NotifyOption::SkipPersistHandleEvents; if let Some(forwards) = htlc_forwards { self.forward_htlcs(&mut [forwards][..]); + persist = NotifyOption::DoPersist; } if let Some(channel_ready_msg) = need_lnd_workaround { self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?; } - Ok(()) + Ok(persist) } /// Process pending events from the [`chain::Watch`], returning whether any events were processed. @@ -7674,12 +7676,22 @@ where } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_reestablish(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(persist) => *persist, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify( + self, || NotifyOption::SkipPersistHandleEvents); + let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { From 5c3fa553a10be74a123b9530da4aec2342164d97 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 28 Aug 2023 01:35:16 +0000 Subject: [PATCH 10/11] Remove largely useless checks in chanmon_consistency fuzzer When reloading nodes A or C, the chanmon_consistency fuzzer currently calls `get_and_clear_pending_msg_events` on the node, potentially causing additional `ChannelMonitor` or `ChannelManager` updates, just to check that no unexpected messages are generated. There's not much reason to do so, the fuzzer could always swap for a different command to call the same method, and the additional checking requires some weird monitor persistence introspection. Here we simplify the fuzzer by simply removing this logic. --- fuzz/src/chanmon_consistency.rs | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 4c79f0bee..05df09110 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -125,7 +125,6 @@ struct TestChainMonitor { // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest // fully-serialized monitor state here, as well as the corresponding update_id. pub latest_monitors: Mutex)>>, - pub should_update_manager: atomic::AtomicBool, } impl TestChainMonitor { pub fn new(broadcaster: Arc, logger: Arc, feeest: Arc, persister: Arc, keys: Arc) -> Self { @@ -135,7 +134,6 @@ impl TestChainMonitor { keys, persister, latest_monitors: Mutex::new(HashMap::new()), - should_update_manager: atomic::AtomicBool::new(false), } } } @@ -146,7 +144,6 @@ impl chain::Watch for TestChainMonitor { if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) { panic!("Already had monitor pre-watch_channel"); } - self.should_update_manager.store(true, atomic::Ordering::Relaxed); self.chain_monitor.watch_channel(funding_txo, monitor) } @@ -162,7 +159,6 @@ impl chain::Watch for TestChainMonitor { let mut ser = VecWriter(Vec::new()); deserialized_monitor.write(&mut ser).unwrap(); map_entry.insert((update.update_id, ser.0)); - self.should_update_manager.store(true, atomic::Ordering::Relaxed); self.chain_monitor.update_channel(funding_txo, update) } @@ -1101,11 +1097,9 @@ pub fn do_test(data: &[u8], underlying_out: Out) { if !chan_a_disconnected { nodes[1].peer_disconnected(&nodes[0].get_our_node_id()); chan_a_disconnected = true; - drain_msg_events_on_disconnect!(0); - } - if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) { - node_a_ser.0.clear(); - nodes[0].write(&mut node_a_ser).unwrap(); + push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(0)); + ab_events.clear(); + ba_events.clear(); } let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a); nodes[0] = new_node_a; @@ -1134,11 +1128,9 @@ pub fn do_test(data: &[u8], underlying_out: Out) { if !chan_b_disconnected { nodes[1].peer_disconnected(&nodes[2].get_our_node_id()); chan_b_disconnected = true; - drain_msg_events_on_disconnect!(2); - } - if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) { - node_c_ser.0.clear(); - nodes[2].write(&mut node_c_ser).unwrap(); + push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(2)); + bc_events.clear(); + cb_events.clear(); } let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c); nodes[2] = new_node_c; @@ -1306,13 +1298,10 @@ pub fn do_test(data: &[u8], underlying_out: Out) { node_a_ser.0.clear(); nodes[0].write(&mut node_a_ser).unwrap(); - monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed); node_b_ser.0.clear(); nodes[1].write(&mut node_b_ser).unwrap(); - monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed); node_c_ser.0.clear(); nodes[2].write(&mut node_c_ser).unwrap(); - monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed); } } From 32e5903ef23c6861b88dc4c078ba100785c2e2cb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 28 Aug 2023 01:25:36 +0000 Subject: [PATCH 11/11] Restrict `ChannelManager` persist in fuzzing to when we're told to In the `chanmon_consistency` fuzz, we currently "persist" the `ChannelManager` on each loop iteration. With the new logic in the past few commits to reduce the frequency of `ChannelManager` persistences, this behavior now leaves a gap in our test coverage - missing persistence notifications. In order to cath (common-case) persistence misses, we update the `chanmon_consistency` fuzzer to no longer persist the `ChannelManager` unless the waker was woken and signaled to persist, possibly reloading with a previous `ChannelManager` if we were not signaled. --- fuzz/src/chanmon_consistency.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 05df09110..8afc2e151 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -1296,12 +1296,18 @@ pub fn do_test(data: &[u8], underlying_out: Out) { _ => test_return!(), } - node_a_ser.0.clear(); - nodes[0].write(&mut node_a_ser).unwrap(); - node_b_ser.0.clear(); - nodes[1].write(&mut node_b_ser).unwrap(); - node_c_ser.0.clear(); - nodes[2].write(&mut node_c_ser).unwrap(); + if nodes[0].get_and_clear_needs_persistence() == true { + node_a_ser.0.clear(); + nodes[0].write(&mut node_a_ser).unwrap(); + } + if nodes[1].get_and_clear_needs_persistence() == true { + node_b_ser.0.clear(); + nodes[1].write(&mut node_b_ser).unwrap(); + } + if nodes[2].get_and_clear_needs_persistence() == true { + node_c_ser.0.clear(); + nodes[2].write(&mut node_c_ser).unwrap(); + } } }