From 5be29c6add4cb37619a506caea2296d9472028c1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 1 Feb 2023 20:54:10 +0000 Subject: [PATCH 01/11] Fix the err msg provided when a send fails due to peer disconnected We haven't had a `MonitorUpdateInProgress` check in `Channel::is_live` for some time. --- fuzz/src/chanmon_consistency.rs | 2 +- lightning/src/ln/channelmanager.rs | 6 +++--- lightning/src/ln/payment_tests.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 39887e838..41b10fb5a 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -295,7 +295,7 @@ fn check_api_err(api_err: APIError) { // all others. If you hit this panic, the list of acceptable errors // is probably just stale and you should add new messages here. match err.as_str() { - "Peer for first hop currently disconnected/pending monitor update!" => {}, + "Peer for first hop currently disconnected" => {}, _ if err.starts_with("Cannot push more than their max accepted HTLCs ") => {}, _ if err.starts_with("Cannot send value that would put us over the max HTLC value in flight our peer will accept ") => {}, _ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {}, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d47fbe0ba..04158da3e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2390,10 +2390,10 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { + if !chan.get().is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()}); + } match { - if !chan.get().is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()}); - } break_chan_entry!(self, chan.get_mut().send_htlc_and_commit( htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { path: path.clone(), diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 4c228a322..ef9a9d375 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -258,7 +258,7 @@ fn no_pending_leak_on_initial_send_failure() { unwrap_send_err!(nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)), true, APIError::ChannelUnavailable { ref err }, - assert_eq!(err, "Peer for first hop currently disconnected/pending monitor update!")); + assert_eq!(err, "Peer for first hop currently disconnected")); assert!(!nodes[0].node.has_pending_payments()); } From 8aa518f23d48a6749e23c590700648ea01f6a4df Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 1 Dec 2022 00:25:32 +0000 Subject: [PATCH 02/11] Add an infallible no-sign version of send_commitment_no_status_check In the coming commits we'll move to async `ChannelMonitorUpdate` application, which means we'll want to generate a `ChannelMonitorUpdate` (including a new counterparty commitment transaction) before we actually send it to our counterparty. To do that today we'd have to actually sign the commitment transaction by calling the signer, then drop it, apply the `ChannelMonitorUpdate`, then re-sign the commitment transaction to send it to our peer. 
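As a rough sketch of the shape this is heading towards (hypothetical, heavily simplified stand-ins rather than the real `Channel`, signer, or `ChannelMonitorUpdate` types), the idea is an infallible build step that records the new counterparty commitment state and queues its monitor update, with the fallible signer call deferred to a separate send step:

    // Hypothetical, simplified stand-ins, not LDK's actual API.
    #[derive(Clone, Debug)]
    struct ChannelMonitorUpdate { update_id: u64 }

    #[derive(Debug)]
    struct CommitmentSigned { signature: [u8; 64] }

    #[derive(Default)]
    struct Channel {
        latest_monitor_update_id: u64,
        pending_monitor_updates: Vec<ChannelMonitorUpdate>,
    }

    impl Channel {
        // Infallible: queue the monitor update for the new counterparty
        // commitment without touching the signer.
        fn build_commitment(&mut self) -> &ChannelMonitorUpdate {
            self.latest_monitor_update_id += 1;
            self.pending_monitor_updates.push(ChannelMonitorUpdate { update_id: self.latest_monitor_update_id });
            self.pending_monitor_updates.last().unwrap()
        }

        // Fallible: only ask the signer for a signature when we actually send.
        fn send_commitment(&self) -> Result<CommitmentSigned, &'static str> {
            // A real signer could refuse; this stand-in always succeeds.
            Ok(CommitmentSigned { signature: [0u8; 64] })
        }
    }

    fn main() {
        let mut chan = Channel::default();
        let update_id = chan.build_commitment().update_id;
        // ...apply/persist the ChannelMonitorUpdate (possibly asynchronously)...
        let msg = chan.send_commitment().expect("signer rejected the commitment");
        println!("queued monitor update {} before sending {:?}", update_id, msg);
    }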
In this commit we instead split `send_commitment_no_status_check` and `send_commitment_no_state_update` into `build_` and `send_` variants, allowing us to generate new counterparty commitment transactions without actually signing, then build them for sending, with signatures, later. --- lightning/src/ln/channel.rs | 44 ++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 62a2b7b51..73a1b5695 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -5791,8 +5791,16 @@ impl Channel { Ok(Some(res)) } - /// Only fails in case of bad keys + /// Only fails in case of signer rejection. fn send_commitment_no_status_check(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger { + let monitor_update = self.build_commitment_no_status_check(logger); + match self.send_commitment_no_state_update(logger) { + Ok((commitment_signed, _)) => Ok((commitment_signed, monitor_update)), + Err(e) => Err(e), + } + } + + fn build_commitment_no_status_check(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger { log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed..."); // We can upgrade the status of some HTLCs that are waiting on a commitment, even if we // fail to generate this, we still are at least at a position where upgrading their status @@ -5825,15 +5833,9 @@ impl Channel { } self.resend_order = RAACommitmentOrder::RevokeAndACKFirst; - let (res, counterparty_commitment_txid, htlcs) = match self.send_commitment_no_state_update(logger) { - Ok((res, (counterparty_commitment_tx, mut htlcs))) => { - // Update state now that we've passed all the can-fail calls... - let htlcs_no_ref: Vec<(HTLCOutputInCommitment, Option>)> = - htlcs.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect(); - (res, counterparty_commitment_tx, htlcs_no_ref) - }, - Err(e) => return Err(e), - }; + let (counterparty_commitment_txid, mut htlcs_ref) = self.build_commitment_no_state_update(logger); + let htlcs: Vec<(HTLCOutputInCommitment, Option>)> = + htlcs_ref.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect(); if self.announcement_sigs_state == AnnouncementSigsState::MessageSent { self.announcement_sigs_state = AnnouncementSigsState::Committed; @@ -5850,16 +5852,13 @@ impl Channel { }] }; self.channel_state |= ChannelState::AwaitingRemoteRevoke as u32; - Ok((res, monitor_update)) + monitor_update } - /// Only fails in case of bad keys. Used for channel_reestablish commitment_signed generation - /// when we shouldn't change HTLC/channel state. 
- fn send_commitment_no_state_update(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger { + fn build_commitment_no_state_update(&self, logger: &L) -> (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>) where L::Target: Logger { let counterparty_keys = self.build_remote_transaction_keys(); let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger); let counterparty_commitment_txid = commitment_stats.tx.trust().txid(); - let (signature, htlc_signatures); #[cfg(any(test, fuzzing))] { @@ -5879,6 +5878,21 @@ impl Channel { } } + (counterparty_commitment_txid, commitment_stats.htlcs_included) + } + + /// Only fails in case of signer rejection. Used for channel_reestablish commitment_signed + /// generation when we shouldn't change HTLC/channel state. + fn send_commitment_no_state_update(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger { + // Get the fee tests from `build_commitment_no_state_update` + #[cfg(any(test, fuzzing))] + self.build_commitment_no_state_update(logger); + + let counterparty_keys = self.build_remote_transaction_keys(); + let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger); + let counterparty_commitment_txid = commitment_stats.tx.trust().txid(); + let (signature, htlc_signatures); + { let mut htlcs = Vec::with_capacity(commitment_stats.htlcs_included.len()); for &(ref htlc, _) in commitment_stats.htlcs_included.iter() { From 56f979be3e88957b1adbfd108e1f75692bd364b5 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 30 Nov 2022 18:49:44 +0000 Subject: [PATCH 03/11] Track actions to execute after a `ChannelMonitor` is updated When a `ChannelMonitor` update completes, we may need to take some further action, such as exposing an `Event` to the user initiating another `ChannelMonitor` update, etc. This commit adds the basic structure to track such actions and serialize them as required. Note that while this does introduce a new map which is written as an even value which users cannot opt out of, the map is only filled in when users use the asynchronous `ChannelMonitor` updates. As these are still considered beta, breaking downgrades for such users is considered acceptable here. --- lightning/src/ln/channelmanager.rs | 53 +++++++++++++++++++++++++++--- lightning/src/util/ser.rs | 1 + 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 04158da3e..9b277ff1e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -65,6 +65,8 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe use crate::util::logger::{Level, Logger}; use crate::util::errors::APIError; +use alloc::collections::BTreeMap; + use crate::io; use crate::prelude::*; use core::{cmp, mem}; @@ -474,6 +476,11 @@ pub(crate) enum MonitorUpdateCompletionAction { EmitEvent { event: events::Event }, } +impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, + (0, PaymentClaimed) => { (0, payment_hash, required) }, + (2, EmitEvent) => { (0, event, ignorable) }, +); + /// State we hold per-peer. 
pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. @@ -487,6 +494,21 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from a specific channel to some action(s) that should be taken when all pending + /// [`ChannelMonitorUpdate`]s for the channel complete updating. + /// + /// Note that because we generally only have one entry here a HashMap is pretty overkill. A + /// BTreeMap currently stores more than ten elements per leaf node, so even up to a few + /// channels with a peer this will just be one allocation and will amount to a linear list of + /// channels to walk, avoiding the whole hashing rigmarole. + /// + /// Note that the channel may no longer exist. For example, if a channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. While a malicious peer could construct a second channel with the + /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior + /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure + /// duplicates do not occur, so such channels should fail without a monitor update completing. + monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. @@ -501,7 +523,7 @@ impl PeerState { if require_disconnected && self.is_connected { return false } - self.channel_by_id.len() == 0 + self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() } } @@ -6319,6 +6341,7 @@ where channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + monitor_update_blocked_actions: BTreeMap::new(), is_connected: true, })); }, @@ -6946,10 +6969,14 @@ where htlc_purposes.push(purpose); } + let mut monitor_update_blocked_actions_per_peer = None; + let mut peer_states = Vec::new(); + for (_, peer_state_mutex) in per_peer_state.iter() { + peer_states.push(peer_state_mutex.lock().unwrap()); + } + (serializable_peer_count).write(writer)?; - for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() { - let peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &*peer_state_lock; + for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { // Peers which we have no channels to should be dropped once disconnected. As we // disconnect all peers when shutting down and serializing the ChannelManager, we // consider all peers as disconnected here. 
There's therefore no need write peers with @@ -6957,6 +6984,11 @@ where if !peer_state.ok_to_remove(false) { peer_pubkey.write(writer)?; peer_state.latest_features.write(writer)?; + if !peer_state.monitor_update_blocked_actions.is_empty() { + monitor_update_blocked_actions_per_peer + .get_or_insert_with(Vec::new) + .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions)); + } } } @@ -7044,6 +7076,7 @@ where (3, pending_outbound_payments, required), (4, pending_claiming_payments, option), (5, self.our_network_pubkey, required), + (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), (9, htlc_purposes, vec_type), (11, self.probing_cookie_secret, required), @@ -7356,6 +7389,7 @@ where channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), + monitor_update_blocked_actions: BTreeMap::new(), is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); @@ -7412,12 +7446,14 @@ where let mut probing_cookie_secret: Option<[u8; 32]> = None; let mut claimable_htlc_purposes = None; let mut pending_claiming_payments = Some(HashMap::new()); + let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), (3, pending_outbound_payments, option), (4, pending_claiming_payments, option), (5, received_network_pubkey, option), + (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), (9, claimable_htlc_purposes, vec_type), (11, probing_cookie_secret, option), @@ -7683,6 +7719,15 @@ where } } + for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() { + if let Some(peer_state) = per_peer_state.get_mut(&node_id) { + peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions; + } else { + log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id); + return Err(DecodeError::InvalidValue); + } + } + let channel_manager = ChannelManager { genesis_hash, fee_estimator: bounded_fee_estimator, diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 928cc6194..847005e32 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -770,6 +770,7 @@ impl Readable for Vec { } impl_for_vec!(ecdsa::Signature); +impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction); impl_for_vec!((A, B), A, B); impl Writeable for Script { From 34218cc4ee68b825ba8f4f12a93b7b0ed0f9898e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 19 Dec 2022 20:41:42 +0000 Subject: [PATCH 04/11] Add storage for `ChannelMonitorUpdate`s in `Channel`s In order to support fully async `ChannelMonitor` updating, we need to ensure that we can replay `ChannelMonitorUpdate`s if we shut down after persisting a `ChannelManager` but without completing a `ChannelMonitorUpdate` persistence. In order to support that we (obviously) have to store the `ChannelMonitorUpdate`s in the `ChannelManager`, which we do here inside the `Channel`. We do so now because in the coming commits we will start using the async persistence flow for all updates, and while we won't yet support fully async monitor updating it's nice to get some of the foundational structures in place now. 
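As a rough, self-contained sketch of the replay idea (simplified stand-in types; only the `pending_monitor_updates` field and the `get_next_monitor_update` accessor mirror what this patch actually adds, while `queue_update` and `update_completed` are purely illustrative names): updates are queued on the channel as they are generated, serialized along with it, and on restart the oldest still-pending update is handed back to the persister until each one completes.

    // Simplified stand-ins, not the real LDK types.
    #[derive(Clone, Debug)]
    struct ChannelMonitorUpdate { update_id: u64 }

    #[derive(Default)]
    struct Channel { pending_monitor_updates: Vec<ChannelMonitorUpdate> }

    impl Channel {
        fn queue_update(&mut self, update: ChannelMonitorUpdate) {
            self.pending_monitor_updates.push(update);
        }
        // The oldest update the persister has not yet confirmed, if any.
        fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
            self.pending_monitor_updates.first()
        }
        // Called once the persister reports `update_id` as durably stored.
        fn update_completed(&mut self, update_id: u64) {
            self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
        }
    }

    fn main() {
        let mut chan = Channel::default();
        chan.queue_update(ChannelMonitorUpdate { update_id: 1 });
        chan.queue_update(ChannelMonitorUpdate { update_id: 2 });
        // ...the ChannelManager (including this queue) is persisted, then we crash and restart...
        while let Some(next) = chan.get_next_monitor_update().cloned() {
            // Replay `next` against the ChannelMonitor, then mark it complete.
            chan.update_completed(next.update_id);
        }
        assert!(chan.get_next_monitor_update().is_none());
    }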
--- lightning/src/ln/channel.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 73a1b5695..13d51624f 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -743,6 +743,12 @@ pub(super) struct Channel { /// The unique identifier used to re-derive the private key material for the channel through /// [`SignerProvider::derive_channel_signer`]. channel_keys_id: [u8; 32], + + /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately. + /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence + /// completes we still need to be able to complete the persistence. Thus, we have to keep a + /// copy of the [`ChannelMonitorUpdate`] here until it is complete. + pending_monitor_updates: Vec, } #[cfg(any(test, fuzzing))] @@ -1112,6 +1118,8 @@ impl Channel { channel_type, channel_keys_id, + + pending_monitor_updates: Vec::new(), }) } @@ -1458,6 +1466,8 @@ impl Channel { channel_type, channel_keys_id, + + pending_monitor_updates: Vec::new(), }; Ok(chan) @@ -4891,6 +4901,10 @@ impl Channel { (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 } + pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> { + self.pending_monitor_updates.first() + } + /// Returns true if funding_created was sent/received. pub fn is_funding_initiated(&self) -> bool { self.channel_state >= ChannelState::FundingSent as u32 @@ -6891,6 +6905,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch channel_type: channel_type.unwrap(), channel_keys_id, + + pending_monitor_updates: Vec::new(), }) } } From ba07622d0573e30d1343ef9c50aa5bc8ed8ef61e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 3 Dec 2022 04:25:37 +0000 Subject: [PATCH 05/11] Add a new monitor update result handling macro Over the next few commits, this macro will replace the `handle_monitor_update_res` macro. It takes a different approach - instead of receiving the message(s) that need to be re-sent after the monitor update completes and pushing them back into the channel, we'll not get the messages from the channel at all until we're ready for them. This will unify our message sending into only actually fetching + sending messages in the common monitor-update-completed code, rather than both there *and* in the functions that call `Channel` when new messages are originated. --- lightning/src/ln/channelmanager.rs | 75 ++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 9b277ff1e..6b9faa564 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1476,6 +1476,81 @@ macro_rules! emit_channel_ready_event { } } +macro_rules! handle_new_monitor_update { + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in + // any case so that it won't deadlock. 
+ debug_assert!($self.id_to_peer.try_lock().is_ok()); + match $update_res { + ChannelMonitorUpdateStatus::InProgress => { + log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", + log_bytes!($chan.channel_id()[..])); + Ok(()) + }, + ChannelMonitorUpdateStatus::PermanentFailure => { + log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", + log_bytes!($chan.channel_id()[..])); + update_maps_on_chan_removal!($self, $chan); + let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + "ChannelMonitor storage failure".to_owned(), $chan.channel_id(), + $chan.get_user_id(), $chan.force_shutdown(false), + $self.get_channel_update_for_broadcast(&$chan).ok())); + $remove; + res + }, + ChannelMonitorUpdateStatus::Completed => { + if ($update_id == 0 || $chan.get_next_monitor_update() + .expect("We can't be processing a monitor update if it isn't queued") + .update_id == $update_id) && + $chan.get_latest_monitor_update_id() == $update_id + { + let mut updates = $chan.monitor_updating_restored(&$self.logger, + &$self.node_signer, $self.genesis_hash, &$self.default_configuration, + $self.best_block.read().unwrap().height()); + let counterparty_node_id = $chan.get_counterparty_node_id(); + let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { + // We only send a channel_update in the case where we are just now sending a + // channel_ready and the channel is in a usable state. We may re-send a + // channel_update later through the announcement_signatures process for public + // channels, but there's no reason not to just inform our counterparty of our fees + // now. + if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { + Some(events::MessageSendEvent::SendChannelUpdate { + node_id: counterparty_node_id, + msg, + }) + } else { None } + } else { None }; + let htlc_forwards = $self.handle_channel_resumption( + &mut $peer_state.pending_msg_events, $chan, updates.raa, + updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.funding_broadcastable, updates.channel_ready, + updates.announcement_sigs); + if let Some(upd) = channel_update { + $peer_state.pending_msg_events.push(upd); + } + + let channel_id = $chan.channel_id(); + core::mem::drop($peer_state_lock); + + if let Some(forwards) = htlc_forwards { + $self.forward_htlcs(&mut [forwards][..]); + } + $self.finalize_claims(updates.finalized_claimed_htlcs); + for failure in updates.failed_htlcs.drain(..) { + let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; + $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + } + } + Ok(()) + }, + } + } }; + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + } +} + impl ChannelManager where M::Target: chain::Watch<::Signer>, From 9802afa53bac30e8bb831d267868b2aaabb61668 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 27 Jan 2023 06:14:18 +0000 Subject: [PATCH 06/11] Handle `MonitorUpdateCompletionAction`s after monitor update sync In a previous PR, we added a `MonitorUpdateCompletionAction` enum which described actions to take after a `ChannelMonitorUpdate` persistence completes. 
At the time, it was only used to execute actions in-line, however in the next commit we'll start (correctly) leaving the existing actions until after monitor updates complete. --- lightning/src/ln/channelmanager.rs | 176 ++++++++++++++--------------- 1 file changed, 84 insertions(+), 92 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6b9faa564..7e9dc77ad 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -466,6 +466,7 @@ enum BackgroundEvent { ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), } +#[derive(Debug)] pub(crate) enum MonitorUpdateCompletionAction { /// Indicates that a payment ultimately destined for us was claimed and we should emit an /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for @@ -1476,6 +1477,54 @@ macro_rules! emit_channel_ready_event { } } +macro_rules! handle_monitor_update_completion { + ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { { + let mut updates = $chan.monitor_updating_restored(&$self.logger, + &$self.node_signer, $self.genesis_hash, &$self.default_configuration, + $self.best_block.read().unwrap().height()); + let counterparty_node_id = $chan.get_counterparty_node_id(); + let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { + // We only send a channel_update in the case where we are just now sending a + // channel_ready and the channel is in a usable state. We may re-send a + // channel_update later through the announcement_signatures process for public + // channels, but there's no reason not to just inform our counterparty of our fees + // now. + if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { + Some(events::MessageSendEvent::SendChannelUpdate { + node_id: counterparty_node_id, + msg, + }) + } else { None } + } else { None }; + + let update_actions = $peer_state.monitor_update_blocked_actions + .remove(&$chan.channel_id()).unwrap_or(Vec::new()); + + let htlc_forwards = $self.handle_channel_resumption( + &mut $peer_state.pending_msg_events, $chan, updates.raa, + updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.funding_broadcastable, updates.channel_ready, + updates.announcement_sigs); + if let Some(upd) = channel_update { + $peer_state.pending_msg_events.push(upd); + } + + let channel_id = $chan.channel_id(); + core::mem::drop($peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + + if let Some(forwards) = htlc_forwards { + $self.forward_htlcs(&mut [forwards][..]); + } + $self.finalize_claims(updates.finalized_claimed_htlcs); + for failure in updates.failed_htlcs.drain(..) { + let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; + $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + } + } } +} + macro_rules! handle_new_monitor_update { ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in @@ -1504,43 +1553,7 @@ macro_rules! 
handle_new_monitor_update { .update_id == $update_id) && $chan.get_latest_monitor_update_id() == $update_id { - let mut updates = $chan.monitor_updating_restored(&$self.logger, - &$self.node_signer, $self.genesis_hash, &$self.default_configuration, - $self.best_block.read().unwrap().height()); - let counterparty_node_id = $chan.get_counterparty_node_id(); - let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { - // We only send a channel_update in the case where we are just now sending a - // channel_ready and the channel is in a usable state. We may re-send a - // channel_update later through the announcement_signatures process for public - // channels, but there's no reason not to just inform our counterparty of our fees - // now. - if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { - Some(events::MessageSendEvent::SendChannelUpdate { - node_id: counterparty_node_id, - msg, - }) - } else { None } - } else { None }; - let htlc_forwards = $self.handle_channel_resumption( - &mut $peer_state.pending_msg_events, $chan, updates.raa, - updates.commitment_update, updates.order, updates.accepted_htlcs, - updates.funding_broadcastable, updates.channel_ready, - updates.announcement_sigs); - if let Some(upd) = channel_update { - $peer_state.pending_msg_events.push(upd); - } - - let channel_id = $chan.channel_id(); - core::mem::drop($peer_state_lock); - - if let Some(forwards) = htlc_forwards { - $self.forward_htlcs(&mut [forwards][..]); - } - $self.finalize_claims(updates.finalized_claimed_htlcs); - for failure in updates.failed_htlcs.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; - $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); - } + handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan); } Ok(()) }, @@ -4208,6 +4221,14 @@ where pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> { + log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_bytes!(channel.channel_id()), + if raa.is_some() { "an" } else { "no" }, + if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if funding_broadcastable.is_some() { "" } else { "not " }, + if channel_ready.is_some() { "sending" } else { "without" }, + if announcement_sigs.is_some() { "sending" } else { "without" }); + let mut htlc_forwards = None; let counterparty_node_id = channel.get_counterparty_node_id(); @@ -4266,65 +4287,36 @@ where fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let htlc_forwards; - let (mut pending_failures, finalized_claims, counterparty_node_id) = { - let counterparty_node_id = match counterparty_node_id { - Some(cp_id) => cp_id.clone(), - None => { - // TODO: Once we can rely on the counterparty_node_id from the - // monitor event, this and the id_to_peer map should be removed. 
- let id_to_peer = self.id_to_peer.lock().unwrap(); - match id_to_peer.get(&funding_txo.to_channel_id()) { - Some(cp_id) => cp_id.clone(), - None => return, - } + let counterparty_node_id = match counterparty_node_id { + Some(cp_id) => cp_id.clone(), + None => { + // TODO: Once we can rely on the counterparty_node_id from the + // monitor event, this and the id_to_peer map should be removed. + let id_to_peer = self.id_to_peer.lock().unwrap(); + match id_to_peer.get(&funding_txo.to_channel_id()) { + Some(cp_id) => cp_id.clone(), + None => return, } - }; - let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state_lock; - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { return } - peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let mut channel = { - match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, - } - }; - if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { - return; } - - let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height()); - let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() { - // We only send a channel_update in the case where we are just now sending a - // channel_ready and the channel is in a usable state. We may re-send a - // channel_update later through the announcement_signatures process for public - // channels, but there's no reason not to just inform our counterparty of our fees - // now. - if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) { - Some(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.get().get_counterparty_node_id(), - msg, - }) - } else { None } - } else { None }; - htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); - if let Some(upd) = channel_update { - peer_state.pending_msg_events.push(upd); - } - - (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id) }; - if let Some(forwards) = htlc_forwards { - self.forward_htlcs(&mut [forwards][..]); - } - self.finalize_claims(finalized_claims); - for failure in pending_failures.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() }; - self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock; + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { return } + peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let mut channel = { + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ + hash_map::Entry::Occupied(chan) => chan, + hash_map::Entry::Vacant(_) => return, + } + }; + log_trace!(self.logger, "ChannelMonitor updated to {}. 
Current highest is {}", + highest_applied_update_id, channel.get().get_latest_monitor_update_id()); + if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { + return; } + handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. From 46c6fb7f91b180fcae8358c82228008fa4eab2c4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 3 Dec 2022 05:38:24 +0000 Subject: [PATCH 07/11] Move TODO from `handle_monitor_update_res` into `Channel` The TODO mentioned in `handle_monitor_update_res` about how we might forget about HTLCs in case of permanent monitor update failure still applies in spite of all our changes. If a channel is drop'd in general, monitor-pending updates may be lost if the monitor update failed to persist. This was always the case, and is ultimately the general form of the specific TODO, so we simply leave comments there. --- lightning/src/ln/channel.rs | 5 +++++ lightning/src/ln/channelmanager.rs | 9 --------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 13d51624f..896c47504 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -558,6 +558,11 @@ pub(super) struct Channel { monitor_pending_channel_ready: bool, monitor_pending_revoke_and_ack: bool, monitor_pending_commitment_signed: bool, + + // TODO: If a channel is drop'd, we don't know whether the `ChannelMonitor` is ultimately + // responsible for some of the HTLCs here or not - we don't know whether the update in question + // completed or not. We currently ignore these fields entirely when force-closing a channel, + // but need to handle this somehow or we run the risk of losing HTLCs! monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>, monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, monitor_pending_finalized_fulfills: Vec, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7e9dc77ad..ee3b50936 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1374,15 +1374,6 @@ macro_rules! handle_monitor_update_res { ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..])); update_maps_on_chan_removal!($self, $chan); - // TODO: $failed_fails is dropped here, which will cause other channels to hit the - // chain in a confused state! We need to move them into the ChannelMonitor which - // will be responsible for failing backwards once things confirm on-chain. - // It's ok that we drop $failed_forwards here - at this point we'd rather they - // broadcast HTLC-Timeout and pay the associated fees to get their funds back than - // us bother trying to claim it just to forward on to another peer. If we're - // splitting hairs we'd prefer to claim payments that were to us, but we haven't - // given up the preimage yet, so might as well just wait until the payment is - // retried, avoiding the on-chain fees.
let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(), $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() )); (res, true) From 4e002dcf5c97f506cf8e79e9fab02d71808cae5f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 11 Jan 2023 21:37:57 +0000 Subject: [PATCH 08/11] Always process `ChannelMonitorUpdate`s asynchronously We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline across all the places we generate them. --- lightning/src/chain/chainmonitor.rs | 20 +- lightning/src/ln/chanmon_update_fail_tests.rs | 61 ++- lightning/src/ln/channel.rs | 248 +++++----- lightning/src/ln/channelmanager.rs | 443 ++++++------------ lightning/src/ln/functional_tests.rs | 14 +- 5 files changed, 318 insertions(+), 468 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 3a2077209..9a74f891b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -796,7 +796,7 @@ mod tests { use crate::ln::functional_test_utils::*; use crate::ln::msgs::ChannelMessageHandler; use crate::util::errors::APIError; - use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; + use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -819,10 +819,8 @@ mod tests { nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); @@ -850,8 +848,24 @@ mod tests { .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 2); + match claim_events[0] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. 
} => { + assert_eq!(payment_hash_1, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + match claim_events[1] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_2, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + // Now manually walk the commitment signed dance - because we claimed two payments // back-to-back it doesn't fit into the neat walk commitment_signed_dance does. diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index fdb66cc4d..b1d43ca07 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Check that even though the persister is returning a InProgress, // because the update is bogus, ultimately the error that's returned // should be a PermanentFailure. @@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 1); @@ -1628,6 +1627,7 @@ fn test_monitor_update_fail_claim() { let events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 0); commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true); + expect_pending_htlcs_forwardable_ignore!(nodes[1]); let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]); nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap(); @@ -1645,6 +1645,7 @@ fn test_monitor_update_fail_claim() { let channel_id = chan_1.2; let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1653,7 +1654,7 @@ fn test_monitor_update_fail_claim() { expect_payment_sent!(nodes[0], payment_preimage_1); // Get the payment forwards, note that they were batched into one commitment update. 
- expect_pending_htlcs_forwardable!(nodes[1]); + nodes[1].node.process_pending_htlc_forwards(); check_added_monitors!(nodes[1], 1); let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]); @@ -1739,7 +1740,6 @@ fn test_monitor_update_on_pending_forwards() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]); check_added_monitors!(nodes[1], 1); - assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); @@ -1753,17 +1753,17 @@ fn test_monitor_update_on_pending_forwards() { let events = nodes[0].node.get_and_clear_pending_events(); assert_eq!(events.len(), 3); - if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] { + if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] { assert_eq!(payment_hash, payment_hash_1); assert!(payment_failed_permanently); } else { panic!("Unexpected event!"); } - match events[1] { + match events[2] { Event::PaymentFailed { payment_hash, .. } => { assert_eq!(payment_hash, payment_hash_1); }, _ => panic!("Unexpected event"), } - match events[2] { + match events[0] { Event::PendingHTLCsForwardable { .. } => { }, _ => panic!("Unexpected event"), }; @@ -1803,7 +1803,6 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1811,6 +1810,7 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -2290,7 +2290,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[0].node.claim_funds(payment_preimage_0); check_added_monitors!(nodes[0], 1); - expect_payment_claimed!(nodes[0], payment_hash_0, 100_000); nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]); nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg); @@ -2353,6 +2352,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, 
mon_id); + expect_payment_claimed!(nodes[0], payment_hash_0, 100_000); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). @@ -2606,7 +2606,15 @@ fn test_permanent_error_during_sending_shutdown() { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure); assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok()); - check_closed_broadcast!(nodes[0], true); + + // We always send the `shutdown` response when initiating a shutdown, even if we immediately + // close the channel thereafter. + let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(msg_events.len(), 3); + if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); } + if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); } + if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); } + check_added_monitors!(nodes[0], 2); check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }); } @@ -2629,7 +2637,15 @@ fn test_permanent_error_during_handling_shutdown() { assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok()); let shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()); nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &shutdown); - check_closed_broadcast!(nodes[1], true); + + // We always send the `shutdown` response when receiving a shutdown, even if we immediately + // close the channel thereafter. + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(msg_events.len(), 3); + if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); } + if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); } + if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); } + check_added_monitors!(nodes[1], 2); check_closed_event!(nodes[1], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }); } @@ -2651,7 +2667,6 @@ fn double_temp_error() { // `claim_funds` results in a ChannelMonitorUpdate. nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); @@ -2659,7 +2674,6 @@ fn double_temp_error() { // which had some asserts that prevented it from being called twice. nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); @@ -2668,11 +2682,24 @@ fn double_temp_error() { check_added_monitors!(nodes[1], 0); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2); - // Complete the first HTLC. - let events = nodes[1].node.get_and_clear_pending_msg_events(); - assert_eq!(events.len(), 1); + // Complete the first HTLC. 
Note that as a side-effect we handle the monitor update completions + // and get both PaymentClaimed events at once. + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + + let events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 2); + match events[0] { + Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1), + _ => panic!("Unexpected Event: {:?}", events[0]), + } + match events[1] { + Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2), + _ => panic!("Unexpected Event: {:?}", events[1]), + } + + assert_eq!(msg_events.len(), 1); let (update_fulfill_1, commitment_signed_b1, node_id) = { - match &events[0] { + match &msg_events[0] { &MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 896c47504..53e8201ff 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -393,35 +393,21 @@ enum UpdateFulfillFetch { } /// The return type of get_update_fulfill_htlc_and_commit. -pub enum UpdateFulfillCommitFetch { +pub enum UpdateFulfillCommitFetch<'a> { /// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed /// it in the holding cell, or re-generated the update_fulfill message after the same claim was /// previously placed in the holding cell (and has since been removed). NewClaim { /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor - monitor_update: ChannelMonitorUpdate, + monitor_update: &'a ChannelMonitorUpdate, /// The value of the HTLC which was claimed, in msat. htlc_value_msat: u64, - /// The update_fulfill message and commitment_signed message (if the claim was not placed - /// in the holding cell). - msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>, }, /// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell /// or has been forgotten (presumably previously claimed). DuplicateClaim {}, } -/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC -/// state. 
-pub(super) struct RAAUpdates { - pub commitment_update: Option, - pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>, - pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, - pub finalized_claimed_htlcs: Vec, - pub monitor_update: ChannelMonitorUpdate, - pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>, -} - /// The return value of `monitor_updating_restored` pub(super) struct MonitorRestoreUpdates { pub raa: Option, @@ -1979,22 +1965,30 @@ impl Channel { } } - pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result where L::Target: Logger { + pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger { match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { - UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => { - let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) { - Err(e) => return Err((e, monitor_update)), - Ok(res) => res - }; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => { + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) }) + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + UpdateFulfillCommitFetch::NewClaim { + monitor_update: self.pending_monitor_updates.last().unwrap(), + htlc_value_msat, + } }, - UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => - Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }), - UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}), + UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => { + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + UpdateFulfillCommitFetch::NewClaim { + monitor_update: self.pending_monitor_updates.last().unwrap(), + htlc_value_msat, + } + } + UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {}, } } @@ -3049,17 +3043,17 @@ impl Channel { Ok(()) } - pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option, ChannelMonitorUpdate), (Option, ChannelError)> + pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError> where L::Target: Logger { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { - return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()))); + return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())); } if self.channel_state & 
(ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { - return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()))); + return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())); } if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() { - return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()))); + return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())); } let funding_script = self.get_funding_redeemscript(); @@ -3077,7 +3071,7 @@ impl Channel { log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction), log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id())); if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) { - return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned()))); + return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned())); } bitcoin_tx.txid }; @@ -3092,7 +3086,7 @@ impl Channel { debug_assert!(!self.is_outbound()); let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000; if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat { - return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()))); + return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())); } } #[cfg(any(test, fuzzing))] @@ -3114,7 +3108,7 @@ impl Channel { } if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs { - return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)))); + return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. 
It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))); } // TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update @@ -3132,7 +3126,7 @@ impl Channel { log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()), encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id())); if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) { - return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()))); + return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())); } htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source)); } else { @@ -3148,10 +3142,8 @@ impl Channel { self.counterparty_funding_pubkey() ); - let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx); self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages) - .map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?; - let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1); + .map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?; // Update state now that we've passed all the can-fail calls... let mut need_commitment = false; @@ -3196,7 +3188,7 @@ impl Channel { self.cur_holder_commitment_transaction_number -= 1; // Note that if we need_commitment & !AwaitingRemoteRevoke we'll call - // send_commitment_no_status_check() next which will reset this to RAAFirst. + // build_commitment_no_status_check() next which will reset this to RAAFirst. self.resend_order = RAACommitmentOrder::CommitmentFirst; if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 { @@ -3208,52 +3200,50 @@ impl Channel { // the corresponding HTLC status updates so that get_last_commitment_update // includes the right HTLCs. self.monitor_pending_commitment_signed = true; - let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. 
self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); } log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id)); - return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned()))); + self.pending_monitor_updates.push(monitor_update); + return Ok(self.pending_monitor_updates.last().unwrap()); } - let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { + let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok - // we'll send one right away when we get the revoke_and_ack when we // free_holding_cell_htlcs(). - let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Some(msg) - } else { None }; + true + } else { false }; log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.", - log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" }); - - Ok((msgs::RevokeAndACK { - channel_id: self.channel_id, - per_commitment_secret, - next_per_commitment_point, - }, commitment_signed, monitor_update)) + log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" }); + self.pending_monitor_updates.push(monitor_update); + self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new()); + return Ok(self.pending_monitor_updates.last().unwrap()); } /// Public version of the below, checking relevant preconditions first. /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and /// returns `(None, Vec::new())`. - pub fn maybe_free_holding_cell_htlcs(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger { + pub fn maybe_free_holding_cell_htlcs(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { if self.channel_state >= ChannelState::ChannelReady as u32 && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 { self.free_holding_cell_htlcs(logger) - } else { Ok((None, Vec::new())) } + } else { (None, Vec::new()) } } /// Frees any pending commitment updates in the holding cell, generating the relevant messages /// for our counterparty. 
- fn free_holding_cell_htlcs(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger { + fn free_holding_cell_htlcs(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0); if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() { log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(), @@ -3334,7 +3324,7 @@ impl Channel { } } if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() { - return Ok((None, htlcs_to_fail)); + return (None, htlcs_to_fail); } let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() { self.send_update_fee(feerate, false, logger) @@ -3342,8 +3332,8 @@ impl Channel { None }; - let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?; - // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id // but we want them to be strictly increasing by one, so reset it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); @@ -3352,16 +3342,11 @@ impl Channel { log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" }, update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len()); - Ok((Some((msgs::CommitmentUpdate { - update_add_htlcs, - update_fulfill_htlcs, - update_fail_htlcs, - update_fail_malformed_htlcs: Vec::new(), - update_fee, - commitment_signed, - }, monitor_update)), htlcs_to_fail)) + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail) } else { - Ok((None, Vec::new())) + (None, Vec::new()) } } @@ -3370,7 +3355,7 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError> where L::Target: Logger, { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { @@ -3557,8 +3542,8 @@ impl Channel { // When the monitor updating is restored we'll call get_last_commitment_update(), // which does not update state, but we're definitely now awaiting a remote revoke // before we can step forward any more, so set it here. 
- let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); @@ -3567,71 +3552,41 @@ impl Channel { self.monitor_pending_failures.append(&mut revoked_htlcs); self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs); log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id())); - return Ok(RAAUpdates { - commitment_update: None, finalized_claimed_htlcs: Vec::new(), - accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(), - monitor_update, - holding_cell_failed_htlcs: Vec::new() - }); + self.pending_monitor_updates.push(monitor_update); + return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap())); } - match self.free_holding_cell_htlcs(logger)? { - (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => { - commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len()); - for fail_msg in update_fail_htlcs.drain(..) { - commitment_update.update_fail_htlcs.push(fail_msg); - } - commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len()); - for fail_msg in update_fail_malformed_htlcs.drain(..) { - commitment_update.update_fail_malformed_htlcs.push(fail_msg); - } - + match self.free_holding_cell_htlcs(logger) { + (Some(_), htlcs_to_fail) => { + let mut additional_update = self.pending_monitor_updates.pop().unwrap(); // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok(RAAUpdates { - commitment_update: Some(commitment_update), - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, - failed_htlcs: revoked_htlcs, - monitor_update, - holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) }, (None, htlcs_to_fail) => { if require_commitment { - let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?; + let mut additional_update = self.build_commitment_no_status_check(logger); - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); log_debug!(logger, "Received a valid revoke_and_ack for channel {}. 
Responding with a commitment update with {} HTLCs failed.", log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len()); - Ok(RAAUpdates { - commitment_update: Some(msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs, - update_fail_malformed_htlcs, - update_fee: None, - commitment_signed - }), - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs, - monitor_update, holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) } else { log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id())); - Ok(RAAUpdates { - commitment_update: None, - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs, - monitor_update, holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) } } } @@ -3818,6 +3773,7 @@ impl Channel { { assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32); self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32); + self.pending_monitor_updates.clear(); // If we're past (or at) the FundingSent stage on an outbound channel, try to // (re-)broadcast the funding transaction as we may have declined to broadcast it when we @@ -4295,7 +4251,7 @@ impl Channel { pub fn shutdown( &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown - ) -> Result<(Option, Option, Vec<(HTLCSource, PaymentHash)>), ChannelError> + ) -> Result<(Option, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where SP::Target: SignerProvider { if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { @@ -4351,12 +4307,15 @@ impl Channel { let monitor_update = if update_shutdown_script { self.latest_monitor_update_id += 1; - Some(ChannelMonitorUpdate { + let monitor_update = ChannelMonitorUpdate { update_id: self.latest_monitor_update_id, updates: vec![ChannelMonitorUpdateStep::ShutdownScript { scriptpubkey: self.get_closing_scriptpubkey(), }], - }) + }; + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Some(self.pending_monitor_updates.last().unwrap()) } else { None }; let shutdown = if send_shutdown { Some(msgs::Shutdown { @@ -5810,15 +5769,6 @@ impl Channel { Ok(Some(res)) } - /// Only fails in case of signer rejection. 
- fn send_commitment_no_status_check(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger { - let monitor_update = self.build_commitment_no_status_check(logger); - match self.send_commitment_no_state_update(logger) { - Ok((commitment_signed, _)) => Ok((commitment_signed, monitor_update)), - Err(e) => Err(e), - } - } - fn build_commitment_no_status_check(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger { log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed..."); // We can upgrade the status of some HTLCs that are waiting on a commitment, even if we @@ -5944,16 +5894,20 @@ impl Channel { }, (counterparty_commitment_txid, commitment_stats.htlcs_included))) } - /// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction - /// to send to the remote peer in one go. + /// Adds a pending outbound HTLC to this channel, and builds a new remote commitment + /// transaction and generates the corresponding [`ChannelMonitorUpdate`] in one go. /// /// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on - /// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info. - pub fn send_htlc_and_commit(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result, ChannelError> where L::Target: Logger { - match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? { - Some(update_add_htlc) => { - let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?; - Ok(Some((update_add_htlc, commitment_signed, monitor_update))) + /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info. + pub fn send_htlc_and_commit(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result, ChannelError> where L::Target: Logger { + let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger); + if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } } + match send_res? { + Some(_) => { + let monitor_update = self.build_commitment_no_status_check(logger); + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Ok(Some(self.pending_monitor_updates.last().unwrap())) }, None => Ok(None) } @@ -5979,8 +5933,12 @@ impl Channel { /// Begins the shutdown process, getting a message for the remote peer and returning all /// holding cell HTLCs for payment failure. - pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option) - -> Result<(msgs::Shutdown, Option, Vec<(HTLCSource, PaymentHash)>), APIError> + /// + /// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no + /// [`ChannelMonitorUpdate`] will be returned). 
+ pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures, + target_feerate_sats_per_kw: Option) + -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError> where SP::Target: SignerProvider { for htlc in self.pending_outbound_htlcs.iter() { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { @@ -6023,12 +5981,15 @@ impl Channel { let monitor_update = if update_shutdown_script { self.latest_monitor_update_id += 1; - Some(ChannelMonitorUpdate { + let monitor_update = ChannelMonitorUpdate { update_id: self.latest_monitor_update_id, updates: vec![ChannelMonitorUpdateStep::ShutdownScript { scriptpubkey: self.get_closing_scriptpubkey(), }], - }) + }; + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Some(self.pending_monitor_updates.last().unwrap()) } else { None }; let shutdown = msgs::Shutdown { channel_id: self.channel_id, @@ -6049,6 +6010,9 @@ impl Channel { } }); + debug_assert!(!self.is_shutdown() || monitor_update.is_none(), + "we can't both complete shutdown and return a monitor update"); + Ok((shutdown, monitor_update, dropped_outbound_htlcs)) } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index ee3b50936..675122cfb 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -345,17 +345,6 @@ impl MsgHandleErrInternal { } } #[inline] - fn ignore_no_close(err: String) -> Self { - Self { - err: LightningError { - err, - action: msgs::ErrorAction::IgnoreError, - }, - chan_id: None, - shutdown_finish: None, - } - } - #[inline] fn from_no_close(err: msgs::LightningError) -> Self { Self { err, chan_id: None, shutdown_finish: None } } @@ -1869,25 +1858,27 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?; + let funding_txo_opt = chan_entry.get().get_funding_txo(); + let their_features = &peer_state.latest_features; + let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?; failed_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update { - let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update); - let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); - if is_permanent { - remove_channel!(self, chan_entry); - break result; - } - } - + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, - msg: shutdown_msg + msg: shutdown_msg, }); + // Update the monitor with the shutdown script if necessary. 
+ if let Some(monitor_update) = monitor_update_opt.take() { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry); + } + if chan_entry.get().is_shutdown() { let channel = remove_channel!(self, chan_entry); if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { @@ -2494,51 +2485,32 @@ where if !chan.get().is_live() { return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()}); } - match { - break_chan_entry!(self, chan.get_mut().send_htlc_and_commit( - htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - path: path.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - payment_id, - payment_secret: payment_secret.clone(), - payment_params: payment_params.clone(), - }, onion_packet, &self.logger), - chan) - } { - Some((update_add, commitment_signed, monitor_update)) => { - let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update); - let chan_id = chan.get().channel_id(); - match (update_err, - handle_monitor_update_res!(self, update_err, chan, - RAACommitmentOrder::CommitmentFirst, false, true)) - { - (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e), - (ChannelMonitorUpdateStatus::Completed, Ok(())) => {}, - (ChannelMonitorUpdateStatus::InProgress, Err(_)) => { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); - }, - _ => unreachable!(), + let funding_txo = chan.get().get_funding_txo().unwrap(); + let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), + htlc_cltv, HTLCSource::OutboundRoute { + path: path.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + payment_id, + payment_secret: payment_secret.clone(), + payment_params: payment_params.clone(), + }, onion_packet, &self.logger); + match break_chan_entry!(self, send_res, chan) { + Some(monitor_update) => { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); + if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) { + break Err(e); + } + if update_res == ChannelMonitorUpdateStatus::InProgress { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. 
+ return Err(APIError::MonitorUpdateInProgress); } - - log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id)); - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: path.first().unwrap().pubkey, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - }, - }); }, None => { }, } @@ -4037,7 +4009,6 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); - let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()), None => None @@ -4049,99 +4020,57 @@ where ) ).unwrap_or(None); - if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id)) - { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } - } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) + if let Some(mut peer_state_lock) = peer_state_opt.take() { + let peer_state = &mut *peer_state_lock; + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger); + + if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res { + if let Some(action) = completion_action(Some(htlc_value_msat)) { + log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}", + log_bytes!(chan_id), action); + 
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - }, - Err((e, monitor_update)) => { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", - payment_preimage, e); - }, + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); + let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, + } + return Ok(()); } - } else { - let preimage_update = ChannelMonitorUpdate { - update_id: CLOSED_CHANNEL_UPDATE_ID, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage, - }], - }; - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - // Note that we do process the completion action here. This totally could be a - // duplicate claim, but we have no way of knowing without interrogating the - // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are - // generally always allowed to be duplicative (and it's specifically noted in - // `PaymentForwarded`). - self.handle_monitor_update_completion_actions(completion_action(None)); - Ok(()) } + let preimage_update = ChannelMonitorUpdate { + update_id: CLOSED_CHANNEL_UPDATE_ID, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage, + }], + }; + // We update the ChannelMonitor on the backward link, after + // receiving an `update_fulfill_htlc` from the forward link. 
+ let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); + if update_res != ChannelMonitorUpdateStatus::Completed { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same event and try + // again on restart. + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, update_res); + } + // Note that we do process the completion action here. This totally could be a + // duplicate claim, but we have no way of knowing without interrogating the + // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are + // generally always allowed to be duplicative (and it's specifically noted in + // `PaymentForwarded`). + self.handle_monitor_update_completion_actions(completion_action(None)); + Ok(()) } fn finalize_claims(&self, sources: Vec) { @@ -4670,27 +4599,27 @@ where if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); } - let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); + let funding_txo_opt = chan_entry.get().get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, + chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); dropped_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update { - let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update); - let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); - if is_permanent { - remove_channel!(self, chan_entry); - break result; - } - } - if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, msg, }); } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry); + } break Ok(()); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -4702,8 +4631,7 @@ where self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } - let _ = handle_error!(self, result, *counterparty_node_id); - Ok(()) + result } fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { @@ -4877,40 +4805,12 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - let (revoke_and_ack, commitment_signed, monitor_update) = - match chan.get_mut().commitment_signed(&msg, &self.logger) { - Err((None, e)) => try_chan_entry!(self, Err(e), chan), - Err((Some(update), e)) => { - assert!(chan.get().is_awaiting_monitor_update()); - let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update); - try_chan_entry!(self, Err(e), chan); - unreachable!(); - }, - Ok(res) => res - }; - let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update); - if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) { - return Err(e); - } - - peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: counterparty_node_id.clone(), - msg: revoke_and_ack, - }); - if let Some(msg) = commitment_signed { - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id.clone(), - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed: msg, - }, - }); - } - Ok(()) + let funding_txo = chan.get().get_funding_txo(); + let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5009,8 +4909,7 @@ where } fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { - let mut htlcs_to_fail = Vec::new(); - let res = loop { + let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5021,59 +4920,19 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update(); - let raa_updates = break_chan_entry!(self, - chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); - htlcs_to_fail = raa_updates.holding_cell_failed_htlcs; - let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update); - if was_paused_for_mon_update { - assert!(update_res != ChannelMonitorUpdateStatus::Completed); - assert!(raa_updates.commitment_update.is_none()); - assert!(raa_updates.accepted_htlcs.is_empty()); - assert!(raa_updates.failed_htlcs.is_empty()); - assert!(raa_updates.finalized_claimed_htlcs.is_empty()); - break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned())); - } - if update_res != ChannelMonitorUpdateStatus::Completed { - if let Err(e) = handle_monitor_update_res!(self, update_res, chan, - RAACommitmentOrder::CommitmentFirst, false, - raa_updates.commitment_update.is_some(), false, - raa_updates.accepted_htlcs, raa_updates.failed_htlcs, - raa_updates.finalized_claimed_htlcs) { - break Err(e); - } else { unreachable!(); } - } - if let Some(updates) = raa_updates.commitment_update { - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id.clone(), - updates, - }); - } - break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs, - raa_updates.finalized_claimed_htlcs, - chan.get().get_short_channel_id() - .unwrap_or(chan.get().outbound_scid_alias()), - chan.get().get_funding_txo().unwrap(), - chan.get().get_user_id())) + let funding_txo = chan.get().get_funding_txo(); + let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan); + (htlcs_to_fail, res) }, - hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id); - match res { - Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs, - short_channel_id, channel_outpoint, user_channel_id)) => - { - for failure in pending_failures.drain(..) 
{ - let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() }; - self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); - } - self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]); - self.finalize_claims(finalized_claim_htlcs); - Ok(()) - }, - Err(e) => Err(e) - } + res } fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { @@ -5315,49 +5174,37 @@ where let mut has_monitor_update = false; let mut failed_htlcs = Vec::new(); let mut handle_errors = Vec::new(); - { - let per_peer_state = self.per_peer_state.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); - for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + 'chan_loop: loop { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|channel_id, chan| { - match chan.maybe_free_holding_cell_htlcs(&self.logger) { - Ok((commitment_opt, holding_cell_failed_htlcs)) => { - if !holding_cell_failed_htlcs.is_empty() { - failed_htlcs.push(( - holding_cell_failed_htlcs, - *channel_id, - chan.get_counterparty_node_id() - )); - } - if let Some((commitment_update, monitor_update)) = commitment_opt { - match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => { - pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get_counterparty_node_id(), - updates: commitment_update, - }); - }, - e => { - has_monitor_update = true; - let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); - handle_errors.push((chan.get_counterparty_node_id(), res)); - if close_channel { return false; } - }, - } - } - true - }, - Err(e) => { - let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); - handle_errors.push((chan.get_counterparty_node_id(), Err(res))); - // ChannelClosed event is generated by handle_error for us - !close_channel - } + let peer_state: &mut PeerState<_> = &mut *peer_state_lock; + for (channel_id, chan) in peer_state.channel_by_id.iter_mut() { + let counterparty_node_id = chan.get_counterparty_node_id(); + let funding_txo = chan.get_funding_txo(); + let (monitor_opt, holding_cell_failed_htlcs) = + chan.maybe_free_holding_cell_htlcs(&self.logger); + if !holding_cell_failed_htlcs.is_empty() { + failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } - }); + if let Some(monitor_update) = monitor_opt { + has_monitor_update = true; + + let update_res = self.chain_monitor.update_channel( + funding_txo.expect("channel is live"), monitor_update); + let update_id = monitor_update.update_id; + let channel_id: [u8; 32] = *channel_id; + let res = handle_new_monitor_update!(self, update_res, update_id, + peer_state_lock, peer_state, chan, MANUALLY_REMOVING, + peer_state.channel_by_id.remove(&channel_id)); + if res.is_err() { + handle_errors.push((counterparty_node_id, res)); + } + continue 'chan_loop; + } + } + break 'chan_loop; } } @@ -7124,8 +6971,6 @@ where // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments // map. 
Thus, if there are no entries we skip writing a TLV for it. pending_claiming_payments = None; - } else { - debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR"); } write_tlv_fields!(writer, { diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 3da49ca7d..fbd0e133f 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -3618,22 +3618,22 @@ fn test_simple_peer_disconnect() { _ => panic!("Unexpected event"), } match events[1] { + Event::PaymentPathSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + match events[2] { Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } => { assert_eq!(payment_hash, payment_hash_5); assert!(payment_failed_permanently); }, _ => panic!("Unexpected event"), } - match events[2] { + match events[3] { Event::PaymentFailed { payment_hash, .. } => { assert_eq!(payment_hash, payment_hash_5); }, _ => panic!("Unexpected event"), } - match events[3] { - Event::PaymentPathSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } } claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4); @@ -8186,7 +8186,7 @@ fn test_update_err_monitor_lockdown() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); } else { assert!(false); } @@ -8280,7 +8280,7 @@ fn test_concurrent_monitor_claim() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Watchtower Alice should already have seen the block and reject the update assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); From c35253d6a88e504913ddb24c0ccdd1d3cb3206ab Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 6 Feb 2023 23:03:38 +0000 Subject: [PATCH 09/11] Use new monitor persistence flow in funding_signed handling In the previous commit, we moved all our `ChannelMonitorUpdate` pipelines to use a new async path via the `handle_new_monitor_update` macro. This avoids having two message sending pathways and simply sends messages in the "monitor update completed" flow, which is shared between sync and async monitor updates. Here we reuse the new macro for handling `funding_signed` messages when doing an initial `ChannelMonitor` persistence. This provides a similar benefit, simplifying the code a trivial amount, but importantly allows us to fully remove the original `handle_monitor_update_res` macro. 
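As a rough illustration of the shape this leaves us with, here is a minimal, self-contained sketch of the single "build update -> persist -> release messages on completion" pipeline described above. The names below (`MockPersister`, `build_update`, `update_completed`, `held_msgs`) are invented for the example and are not LDK's actual types or API; the real code routes both outcomes through `handle_new_monitor_update!` and `Channel::monitor_updating_restored`, so the synchronous and asynchronous cases differ only in when the queued messages are released.

    // Toy model only: a channel queues the monitor update and the messages it
    // implies, hands the update to a persister, and releases the messages from
    // one completion path whether persistence finished now or later.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum UpdateStatus { Completed, InProgress }

    #[derive(Debug)]
    struct MonitorUpdate { update_id: u64 }

    #[derive(Default)]
    struct Channel {
        latest_update_id: u64,
        pending_monitor_updates: Vec<MonitorUpdate>,
        // Messages built alongside the update but not yet safe to send.
        held_msgs: Vec<String>,
    }

    impl Channel {
        // Rough analogue of build_commitment_no_status_check(): create the
        // update and queue the outbound message without sending anything yet.
        fn build_update(&mut self, msg: &str) -> &MonitorUpdate {
            self.latest_update_id += 1;
            self.held_msgs.push(msg.to_owned());
            self.pending_monitor_updates.push(MonitorUpdate { update_id: self.latest_update_id });
            self.pending_monitor_updates.last().unwrap()
        }
        // Rough analogue of monitor_updating_restored(): once persistence
        // completes, release the held messages from this one shared path.
        fn update_completed(&mut self) -> Vec<String> {
            self.pending_monitor_updates.clear();
            std::mem::take(&mut self.held_msgs)
        }
    }

    struct MockPersister { synchronous: bool }
    impl MockPersister {
        fn update_channel(&self, _update: &MonitorUpdate) -> UpdateStatus {
            if self.synchronous { UpdateStatus::Completed } else { UpdateStatus::InProgress }
        }
    }

    fn main() {
        let persister = MockPersister { synchronous: true };
        let mut chan = Channel::default();
        let update = chan.build_update("commitment_signed");
        let update_id = update.update_id;
        let status = persister.update_channel(update);
        // Same call site for both outcomes; only the timing of release differs.
        let msgs = if status == UpdateStatus::Completed { chan.update_completed() } else { Vec::new() };
        println!("update {} -> {:?}, sending now: {:?}", update_id, status, msgs);
    }
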
--- lightning/src/ln/channel.rs | 8 +- lightning/src/ln/channelmanager.rs | 130 ++++++----------------------- 2 files changed, 31 insertions(+), 107 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 53e8201ff..5ae6f81a6 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -2356,9 +2356,9 @@ impl Channel { /// If this call is successful, broadcast the funding transaction (and not before!) pub fn funding_signed( &mut self, msg: &msgs::FundingSigned, best_block: BestBlock, signer_provider: &SP, logger: &L - ) -> Result<(ChannelMonitor<::Signer>, Transaction, Option), ChannelError> + ) -> Result, ChannelError> where - SP::Target: SignerProvider, + SP::Target: SignerProvider, L::Target: Logger { if !self.is_outbound() { @@ -2431,7 +2431,9 @@ impl Channel { log_info!(logger, "Received funding_signed from peer for channel {}", log_bytes!(self.channel_id())); - Ok((channel_monitor, self.funding_transaction.as_ref().cloned().unwrap(), self.check_get_channel_ready(0))) + let need_channel_ready = self.check_get_channel_ready(0).is_some(); + self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new()); + Ok(channel_monitor) } /// Handles a channel_ready message from our peer. If we've already sent our channel_ready diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 675122cfb..2071fd51b 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1357,69 +1357,6 @@ macro_rules! remove_channel { } } -macro_rules! handle_monitor_update_res { - ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => { - match $err { - ChannelMonitorUpdateStatus::PermanentFailure => { - log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..])); - update_maps_on_chan_removal!($self, $chan); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(), - $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() )); - (res, true) - }, - ChannelMonitorUpdateStatus::InProgress => { - log_info!($self.logger, "Disabling channel {} due to monitor update in progress. 
On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations", - log_bytes!($chan_id[..]), - if $resend_commitment && $resend_raa { - match $action_type { - RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" }, - RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" }, - } - } else if $resend_commitment { "commitment" } - else if $resend_raa { "RAA" } - else { "nothing" }, - (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(), - (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(), - (&$failed_finalized_fulfills as &Vec).len()); - if !$resend_commitment { - debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa); - } - if !$resend_raa { - debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment); - } - $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills); - (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false) - }, - ChannelMonitorUpdateStatus::Completed => { - (Ok(()), false) - }, - } - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { { - let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key()); - if drop { - $entry.remove_entry(); - } - res - } }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { { - debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst); - handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) - } }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new()) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new()) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new()) - }; -} - macro_rules! 
send_channel_ready { ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{ $pending_msg_events.push(events::MessageSendEvent::SendChannelReady { @@ -4492,49 +4429,34 @@ where } fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { - let funding_tx = { - let best_block = *self.best_block.read().unwrap(); - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex = per_peer_state.get(counterparty_node_id) - .ok_or_else(|| { - debug_assert!(false); - MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) - })?; + let best_block = *self.best_block.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) { - Ok(update) => update, - Err(e) => try_chan_entry!(self, Err(e), chan), - }; - match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED); - if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { - // We weren't able to watch the channel to begin with, so no updates should be made on - // it. Previously, full_stack_target found an (unreachable) panic when the - // monitor update contained within `shutdown_finish` was applied. - if let Some((ref mut shutdown_finish, _)) = shutdown_finish { - shutdown_finish.0.take(); - } - } - return res - }, + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { + hash_map::Entry::Occupied(mut chan) => { + let monitor = try_chan_entry!(self, + chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); + let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor); + let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan); + if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { + // We weren't able to watch the channel to begin with, so no updates should be made on + // it. Previously, full_stack_target found an (unreachable) panic when the + // monitor update contained within `shutdown_finish` was applied. + if let Some((ref mut shutdown_finish, _)) = shutdown_finish { + shutdown_finish.0.take(); } - if let Some(msg) = channel_ready { - send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg); - } - funding_tx - }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) - } - }; - log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid()); - self.tx_broadcaster.broadcast_transaction(&funding_tx); - Ok(()) + } + res + }, + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + } } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { From 685b08d8c13c62a3a4c4cf283c3d86b96fd3de23 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 3 Dec 2022 03:15:04 +0000 Subject: [PATCH 10/11] Use the new monitor persistence flow for `funding_created` handling Building on the previous commits, this finishes our transition to doing all message-sending in the monitor update completion pipeline, unifying our immediate- and async- `ChannelMonitor` update and persistence flows. --- lightning/src/ln/channel.rs | 19 ++++--- lightning/src/ln/channelmanager.rs | 75 ++++++++++++---------------- lightning/src/ln/functional_tests.rs | 6 +-- 3 files changed, 47 insertions(+), 53 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 5ae6f81a6..515e10264 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -2268,9 +2268,9 @@ impl Channel { pub fn funding_created( &mut self, msg: &msgs::FundingCreated, best_block: BestBlock, signer_provider: &SP, logger: &L - ) -> Result<(msgs::FundingSigned, ChannelMonitor<::Signer>, Option), ChannelError> + ) -> Result<(msgs::FundingSigned, ChannelMonitor), ChannelError> where - SP::Target: SignerProvider, + SP::Target: SignerProvider, L::Target: Logger { if self.is_outbound() { @@ -2346,10 +2346,13 @@ impl Channel { log_info!(logger, "Generated funding_signed for peer for channel {}", log_bytes!(self.channel_id())); + let need_channel_ready = self.check_get_channel_ready(0).is_some(); + self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new()); + Ok((msgs::FundingSigned { channel_id: self.channel_id, signature - }, channel_monitor, self.check_get_channel_ready(0))) + }, channel_monitor)) } /// Handles a funding_signed message from the remote end. @@ -3740,15 +3743,17 @@ impl Channel { } /// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted. - /// This must be called immediately after the [`chain::Watch`] call which returned - /// [`ChannelMonitorUpdateStatus::InProgress`]. + /// This must be called before we return the [`ChannelMonitorUpdate`] back to the + /// [`ChannelManager`], which will call [`Self::monitor_updating_restored`] once the monitor + /// update completes (potentially immediately). /// The messages which were generated with the monitor update must *not* have been sent to the /// remote end, and must instead have been dropped. They will be regenerated when /// [`Self::monitor_updating_restored`] is called. 
/// + /// [`ChannelManager`]: super::channelmanager::ChannelManager /// [`chain::Watch`]: crate::chain::Watch /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress - pub fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool, + fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>, mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, mut pending_finalized_claimed_htlcs: Vec @@ -7189,7 +7194,7 @@ mod tests { }]}; let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 }; let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap(); - let (funding_signed_msg, _, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap(); + let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap(); // Node B --> Node A: funding signed let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&keys_provider, &&logger); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2071fd51b..bdfd29530 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -4352,60 +4352,31 @@ where } fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { + let best_block = *self.best_block.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { debug_assert!(false); MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) })?; - let ((funding_msg, monitor, mut channel_ready), mut chan) = { - let best_block = *self.best_block.read().unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let ((funding_msg, monitor), chan) = match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } - }; - // Because we have exclusive ownership of the channel here we can release the peer_state - // lock before watch_channel - match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) { - ChannelMonitorUpdateStatus::Completed => {}, - ChannelMonitorUpdateStatus::PermanentFailure => { - // Note that we reply with the new channel_id in error messages if we gave up on the - // channel, not the temporary_channel_id. This is compatible with ourselves, but the - // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for - // any messages referencing a previously-closed channel anyway. 
- // We do not propagate the monitor update to the user as it would be for a monitor - // that we didn't manage to store (and that we don't care about - we don't respond - // with the funding_signed so the channel can never go on chain). - let (_monitor_update, failed_htlcs) = chan.force_shutdown(false); - assert!(failed_htlcs.is_empty()); - return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id)); - }, - ChannelMonitorUpdateStatus::InProgress => { - // There's no problem signing a counterparty's funding transaction if our monitor - // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't - // accepted payment from yet. We do, however, need to wait to send our channel_ready - // until we have persisted our monitor. - chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new()); - channel_ready = None; // Don't send the channel_ready now - }, - } - // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the - // peer exists, despite the inner PeerState potentially having no channels after removing - // the channel above. - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + }; + match peer_state.channel_by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) + Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) }, hash_map::Entry::Vacant(e) => { - let mut id_to_peer = self.id_to_peer.lock().unwrap(); - match id_to_peer.entry(chan.channel_id()) { + match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) { hash_map::Entry::Occupied(_) => { return Err(MsgHandleErrInternal::send_err_msg_no_close( "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(), @@ -4415,17 +4386,35 @@ where i_e.insert(chan.get_counterparty_node_id()); } } + + // There's no problem signing a counterparty's funding transaction if our monitor + // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't + // accepted payment from yet. We do, however, need to wait to send our channel_ready + // until we have persisted our monitor. + let new_channel_id = funding_msg.channel_id; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { node_id: counterparty_node_id.clone(), msg: funding_msg, }); - if let Some(msg) = channel_ready { - send_channel_ready!(self, peer_state.pending_msg_events, chan, msg); + + let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); + + let chan = e.insert(chan); + let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + + // Note that we reply with the new channel_id in error messages if we gave up on the + // channel, not the temporary_channel_id. This is compatible with ourselves, but the + // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for + // any messages referencing a previously-closed channel anyway. 
+ // We do not propagate the monitor update to the user as it would be for a monitor + // that we didn't manage to store (and that we don't care about - we don't respond + // with the funding_signed so the channel can never go on chain). + if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { + res.0 = None; } - e.insert(chan); + res } } - Ok(()) } fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index fbd0e133f..aa258b571 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -8736,9 +8736,9 @@ fn test_duplicate_chan_id() { }; check_added_monitors!(nodes[0], 0); nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created); - // At this point we'll try to add a duplicate channel monitor, which will be rejected, but - // still needs to be cleared here. - check_added_monitors!(nodes[1], 1); + // At this point we'll look up if the channel_id is present and immediately fail the channel + // without trying to persist the `ChannelMonitor`. + check_added_monitors!(nodes[1], 0); // ...still, nodes[1] will reject the duplicate channel. { From 2adb8eeb49ad2c7870984afdc49ed3d292113905 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 19 Feb 2023 00:13:51 +0000 Subject: [PATCH 11/11] Don't generate a `ChannelMonitorUpdate` for closed chans on shutdown The `Channel::get_shutdown` docs are very clear - if the channel jumps to `Shutdown` as a result of not being funded when we go to initiate shutdown we should not generate a `ChannelMonitorUpdate` as there's no need to bother with the shutdown script - we're force-closing anyway. However, this wasn't actually implemented, potentially causing a spurious monitor update for no reason. --- lightning/src/ln/channel.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 515e10264..e3641a620 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -5965,9 +5965,16 @@ impl Channel { return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?".to_owned()}); } + // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown + // script is set, we just force-close and call it a day. + let mut chan_closed = false; + if self.channel_state < ChannelState::FundingSent as u32 { + chan_closed = true; + } + let update_shutdown_script = match self.shutdown_scriptpubkey { Some(_) => false, - None => { + None if !chan_closed => { let shutdown_scriptpubkey = signer_provider.get_shutdown_scriptpubkey(); if !shutdown_scriptpubkey.is_compatible(their_features) { return Err(APIError::IncompatibleShutdownScript { script: shutdown_scriptpubkey.clone() }); @@ -5975,6 +5982,7 @@ impl Channel { self.shutdown_scriptpubkey = Some(shutdown_scriptpubkey); true }, + None => false, }; // From here on out, we may not fail!
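
As an aside on the shape of the check above: below is a minimal, compilable sketch of the decision this patch implements, using hypothetical names (`ChannelStage`, `ChannelSketch`, and `needs_shutdown_script` are illustrative stand-ins only, not LDK's actual types or API). The idea is that a channel which never reached `FundingSent` is simply force-closed on shutdown, so no shutdown script needs to be negotiated and no `ChannelMonitorUpdate` is built for it.

// Minimal sketch only: `ChannelStage`, `ChannelSketch`, and `needs_shutdown_script`
// are hypothetical stand-ins, not LDK types.
#[derive(PartialEq, PartialOrd)]
enum ChannelStage {
	NegotiatingFunding,
	FundingSent,
	ChannelReady,
}

struct ChannelSketch {
	stage: ChannelStage,
	shutdown_scriptpubkey: Option<Vec<u8>>,
}

impl ChannelSketch {
	// We only need to fetch and commit a shutdown script if the channel is funded
	// and no script was negotiated up front; an unfunded channel is force-closed,
	// so no script (and no monitor update) is required for it.
	fn needs_shutdown_script(&self) -> bool {
		self.shutdown_scriptpubkey.is_none() && self.stage >= ChannelStage::FundingSent
	}
}

fn main() {
	let unfunded = ChannelSketch { stage: ChannelStage::NegotiatingFunding, shutdown_scriptpubkey: None };
	let funded = ChannelSketch { stage: ChannelStage::FundingSent, shutdown_scriptpubkey: None };
	let ready = ChannelSketch { stage: ChannelStage::ChannelReady, shutdown_scriptpubkey: Some(vec![0x00; 22]) };
	assert!(!unfunded.needs_shutdown_script()); // force-close path: no monitor update
	assert!(funded.needs_shutdown_script());    // cooperative shutdown path
	assert!(!ready.needs_shutdown_script());    // script already negotiated up front
}

In the real `get_shutdown` the same condition feeds the `update_shutdown_script` flag shown in the hunk above, keeping the shutdown-script (and monitor-update) path reserved for funded channels.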