Release write lock between monitor update iterations.

Previously, updating block data on a chain monitor
would acquire a write lock on all of its associated
channel monitors and not release it until the loop
completed.

Now, we instead acquire it on each iteration,
fixing #2470.
Arik Sosman 2023-08-25 12:31:33 -07:00
parent 3dffe54258
commit c7a4949a25
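
To illustrate the locking pattern this commit moves to, here is a minimal standalone sketch (not LDK code: OutPoint is simplified, and MonitorState, process_monitor, sync_holding_lock, and sync_releasing_lock are hypothetical stand-ins for ChainMonitor internals). The first function holds the map lock across the whole loop; the second snapshots the keys, re-acquires the lock per iteration, and finishes with a follow-up pass for monitors added in between, mirroring the shape of the change in the diff below.

use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

// All names below are illustrative stand-ins, not LDK's real types.
type OutPoint = u64;
struct MonitorState;

fn process_monitor(_outpoint: &OutPoint, _state: &MonitorState) {
    // Placeholder for the per-monitor chain-sync work (persisting, output registration).
}

// Before: the lock over the monitor map is taken once and held for the whole loop,
// so a writer (e.g. a thread registering a new channel monitor) blocks until every
// existing monitor has been processed.
fn sync_holding_lock(monitors: &RwLock<HashMap<OutPoint, MonitorState>>) {
    let guard = monitors.write().unwrap();
    for (outpoint, state) in guard.iter() {
        process_monitor(outpoint, state); // lock stays held across every iteration
    }
}

// After: snapshot the keys, then take (and drop) the lock on each iteration so writers
// can interleave, and finish with one pass that catches monitors added in between.
fn sync_releasing_lock(monitors: &RwLock<HashMap<OutPoint, MonitorState>>) {
    let seen: HashSet<OutPoint> = monitors.read().unwrap().keys().cloned().collect();
    for outpoint in &seen {
        let guard = monitors.read().unwrap(); // released at the end of each iteration
        if let Some(state) = guard.get(outpoint) {
            process_monitor(outpoint, state);
        }
    }
    // Follow-up cleanup pass for any monitors registered while the loop above ran.
    let guard = monitors.write().unwrap();
    for (outpoint, state) in guard.iter() {
        if !seen.contains(outpoint) {
            process_monitor(outpoint, state);
        }
    }
}

fn main() {
    let monitors = RwLock::new(HashMap::from([(1, MonitorState), (2, MonitorState)]));
    sync_holding_lock(&monitors);
    sync_releasing_lock(&monitors);
}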

@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::iter::FromIterator;
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
}
}
// do some followup cleanup if any funding outpoints were added in between iterations
let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
}
}
if let Some(height) = best_height {
// If the best block height is being updated, update highest_chain_height under the
// monitors write lock.
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
self.highest_chain_height.store(new_height, Ordering::Release);
}
}
}
for (funding_outpoint, monitor_state) in monitor_states.iter() {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are not ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
// InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
self.event_notifier.notify();
},
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
fn update_monitor_with_chain_data<FN>(
	&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData,
	process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are not ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
// InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
chain_source.register_output(output)
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
self.event_notifier.notify();
}
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
}
}
}
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
chain_source.register_output(output)
}
}
}
@@ -976,7 +992,7 @@ mod tests {
assert!(err.contains("ChannelMonitor storage failure")));
check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
check_closed_broadcast!(nodes[0], true);
check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
[nodes[1].node.get_our_node_id()], 100000);
// However, as the ChainMonitor is still waiting for the original persistence to complete,