From c7a4949a25aed0e10bf0d65e9a5918eea5e22615 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Fri, 25 Aug 2023 12:31:33 -0700
Subject: [PATCH 1/2] Release write lock between monitor update iterations.

Previously, updating block data on a chain monitor would acquire a write
lock on all of its associated channel monitors and hold it until the
loop completed. Now, we instead acquire and release it on each
iteration, fixing #2470.
---
 lightning/src/chain/chainmonitor.rs | 108 ++++++++++++++++------------
 1 file changed, 62 insertions(+), 46 deletions(-)

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 7d698a220..6051f00b9 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
 use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use core::iter::FromIterator;
 use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use bitcoin::secp256k1::PublicKey;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
+		let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
+		for funding_outpoint in funding_outpoints.iter() {
+			let monitor_lock = self.monitors.read().unwrap();
+			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
+				self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+			}
+		}
+
+		// do some followup cleanup if any funding outpoints were added in between iterations
 		let monitor_states = self.monitors.write().unwrap();
+		for (funding_outpoint, monitor_state) in monitor_states.iter() {
+			if !funding_outpoints.contains(funding_outpoint) {
+				self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+			}
+		}
+
 		if let Some(height) = best_height {
 			// If the best block height is being updated, update highest_chain_height under the
 			// monitors write lock.
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
 				self.highest_chain_height.store(new_height, Ordering::Release);
 			}
 		}
+	}
 
-		for (funding_outpoint, monitor_state) in monitor_states.iter() {
-			let monitor = &monitor_state.monitor;
-			let mut txn_outputs;
-			{
-				txn_outputs = process(monitor, txdata);
-				let update_id = MonitorUpdateId {
-					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-				};
-				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-				if let Some(height) = best_height {
-					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-						// If there are not ChainSync persists awaiting completion, go ahead and
-						// set last_chain_persist_height here - we wouldn't want the first
-						// InProgress to always immediately be considered "overly delayed".
-						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-					}
-				}
-
-				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-				match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
-					ChannelMonitorUpdateStatus::Completed =>
-						log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-					ChannelMonitorUpdateStatus::PermanentFailure => {
-						monitor_state.channel_perm_failed.store(true, Ordering::Release);
-						self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
-						self.event_notifier.notify();
-					},
-					ChannelMonitorUpdateStatus::InProgress => {
-						log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-						pending_monitor_updates.push(update_id);
-					},
+	fn update_monitor_with_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
+		let monitor = &monitor_state.monitor;
+		let mut txn_outputs;
+		{
+			txn_outputs = process(monitor, txdata);
+			let update_id = MonitorUpdateId {
+				contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+			};
+			let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+			if let Some(height) = best_height {
+				if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+					// If there are not ChainSync persists awaiting completion, go ahead and
+					// set last_chain_persist_height here - we wouldn't want the first
+					// InProgress to always immediately be considered "overly delayed".
+					monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+				}
 			}
 
-			// Register any new outputs with the chain source for filtering, storing any dependent
-			// transactions from within the block that previously had not been included in txdata.
-			if let Some(ref chain_source) = self.chain_source {
-				let block_hash = header.block_hash();
-				for (txid, mut outputs) in txn_outputs.drain(..) {
-					for (idx, output) in outputs.drain(..) {
-						// Register any new outputs with the chain source for filtering
-						let output = WatchedOutput {
-							block_hash: Some(block_hash),
-							outpoint: OutPoint { txid, index: idx as u16 },
-							script_pubkey: output.script_pubkey,
-						};
-						chain_source.register_output(output)
-					}
+			log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
+				ChannelMonitorUpdateStatus::Completed =>
+					log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+				ChannelMonitorUpdateStatus::PermanentFailure => {
+					monitor_state.channel_perm_failed.store(true, Ordering::Release);
+					self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+					self.event_notifier.notify();
+				}
+				ChannelMonitorUpdateStatus::InProgress => {
+					log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+					pending_monitor_updates.push(update_id);
+				}
+			}
+		}
+
+		// Register any new outputs with the chain source for filtering, storing any dependent
+		// transactions from within the block that previously had not been included in txdata.
+		if let Some(ref chain_source) = self.chain_source {
+			let block_hash = header.block_hash();
+			for (txid, mut outputs) in txn_outputs.drain(..) {
+				for (idx, output) in outputs.drain(..) {
+					// Register any new outputs with the chain source for filtering
+					let output = WatchedOutput {
+						block_hash: Some(block_hash),
+						outpoint: OutPoint { txid, index: idx as u16 },
+						script_pubkey: output.script_pubkey,
+					};
+					chain_source.register_output(output)
 				}
 			}
 		}
@@ -976,7 +992,7 @@ mod tests {
 			assert!(err.contains("ChannelMonitor storage failure")));
 		check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
 		check_closed_broadcast!(nodes[0], true);
-		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, 
+		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
 			[nodes[1].node.get_our_node_id()], 100000);
 
 		// However, as the ChainMonitor is still waiting for the original persistence to complete,

From f80284cc88ccbac8168316eba887e51142623cc1 Mon Sep 17 00:00:00 2001
From: Arik Sosman
Date: Fri, 25 Aug 2023 17:34:10 -0700
Subject: [PATCH 2/2] Fix flaky aggregated HTLC revocation test.

Releasing write locks in between monitor updates requires storing a set
of cloned keys to iterate over. For efficiency, that set of keys is an
actual set, as opposed to an array, which means that the iteration
order may not be consistent. The test was relying on an event array
index to access the revocation transaction. We change that to accessing
a hash map keyed by the txid, fixing the test.
---
 lightning/src/ln/monitor_tests.rs | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs
index 728752f97..58878a999 100644
--- a/lightning/src/ln/monitor_tests.rs
+++ b/lightning/src/ln/monitor_tests.rs
@@ -2191,7 +2191,7 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
 
 	// Alice should see that Bob is trying to claim to HTLCs, so she should now try to claim them at
 	// the second level instead.
-	let revoked_claims = {
+	let revoked_claim_transactions = {
 		let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
 		assert_eq!(txn.len(), 2);
 
@@ -2205,10 +2205,14 @@
 			check_spends!(revoked_htlc_claim, htlc_tx);
 		}
 
-		txn
+		let mut revoked_claim_transaction_map = HashMap::new();
+		for current_tx in txn.into_iter() {
+			revoked_claim_transaction_map.insert(current_tx.txid(), current_tx);
+		}
+		revoked_claim_transaction_map
 	};
 	for node in &nodes {
-		mine_transactions(node, &revoked_claims.iter().collect::<Vec<_>>());
+		mine_transactions(node, &revoked_claim_transactions.values().collect::<Vec<_>>());
 	}
 
@@ -2234,7 +2238,8 @@
 			let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(
 				&[&outputs[0]], Vec::new(), Script::new_op_return(&[]), 253, None, &Secp256k1::new(),
 			).unwrap();
-			check_spends!(spend_tx, revoked_claims[idx]);
+
+			check_spends!(spend_tx, revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid).unwrap());
 		} else {
 			panic!("unexpected event");
 		}
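
The first patch's commit message describes the new locking discipline in prose: snapshot the keys up front, re-acquire the monitors lock on every iteration, and then do a follow-up pass for anything that was added while the lock was released. The sketch below illustrates that pattern with plain std types standing in for ChainMonitor and its monitor map; process_all and the u64 keys are illustrative assumptions, not rust-lightning API.

use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

// Per-iteration locking, as described in the first commit message: the lock
// guarding the monitor map is taken and dropped inside the loop body rather
// than being held across the whole loop.
fn process_all<F: FnMut(&u64, &str)>(monitors: &RwLock<HashMap<u64, String>>, mut process: F) {
    // Snapshot of the keys that existed when processing started.
    let snapshot: HashSet<u64> = monitors.read().unwrap().keys().cloned().collect();

    for key in snapshot.iter() {
        // Acquire the read lock for this iteration only; it is released at the
        // end of the block, letting writers make progress between iterations.
        let guard = monitors.read().unwrap();
        if let Some(value) = guard.get(key) {
            process(key, value.as_str());
        }
    }

    // Follow-up cleanup pass for entries inserted while the lock was released,
    // mirroring the second loop added to process_chain_data.
    let guard = monitors.read().unwrap();
    for (key, value) in guard.iter() {
        if !snapshot.contains(key) {
            process(key, value.as_str());
        }
    }
}

fn main() {
    let monitors = RwLock::new(HashMap::from([
        (1u64, "monitor A".to_string()),
        (2u64, "monitor B".to_string()),
    ]));
    process_all(&monitors, |key, value| println!("processed {key}: {value}"));
}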
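
The second patch's commit message points at a more general testing pitfall: once the claims are produced by iterating a HashSet, their broadcast order is not stable, so a test cannot index into the broadcast list by position and has to look each claim up by txid instead. A small illustrative sketch of that idea follows; FakeTx, txid and parent_txid are hypothetical stand-ins for rust-bitcoin's Transaction, Transaction::txid() and an input's previous output.

use std::collections::HashMap;

// Hypothetical stand-in for a broadcast transaction.
#[derive(Clone, Debug)]
struct FakeTx {
    txid: u32,
    parent_txid: u32,
}

fn main() {
    // The broadcaster may emit these in any order, so positional indexing
    // (claims[0], claims[1]) would make the assertion flaky.
    let broadcasted = vec![
        FakeTx { txid: 7, parent_txid: 1 },
        FakeTx { txid: 9, parent_txid: 2 },
    ];

    // Key the claims by txid instead of relying on their position, the same
    // idea as the test's revoked_claim_transaction_map.
    let claims: HashMap<u32, FakeTx> =
        broadcasted.into_iter().map(|tx| (tx.txid, tx)).collect();

    // A later spend references its parent claim by txid, making the lookup
    // independent of broadcast order.
    let spend_parent_txid: u32 = 9;
    let parent = claims.get(&spend_parent_txid).expect("claim should exist");
    assert_eq!(parent.parent_txid, 2);
    println!("spend's parent claim: {:?}", parent);
}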