From 1433e9ee7bdbedd76f04d2ec79f5afc6a70674da Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Sun, 18 Jun 2023 21:18:03 +0000
Subject: [PATCH 1/9] Return owned `ChannelMonitorUpdate`s from `Channel`

In the coming commits we'll move to storing in-flight
`ChannelMonitorUpdate`s in the `ChannelManager` rather than in the
`Channel` (which will then only retain `ChannelMonitorUpdate`s which
have not yet been released, i.e. are blocked). This will simplify
handling of pending `ChannelMonitorUpdate`s after a channel has closed
by not having to move them into the `ChannelManager`.
---
 lightning/src/ln/channel.rs        | 38 +++++++++++++-----------------
 lightning/src/ln/channelmanager.rs | 16 ++++++-------
 2 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs
index 388ef6a8b..df0ebcbd7 100644
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@ -488,13 +488,13 @@ enum UpdateFulfillFetch {
 }
 
 /// The return type of get_update_fulfill_htlc_and_commit.
-pub enum UpdateFulfillCommitFetch<'a> {
+pub enum UpdateFulfillCommitFetch {
 	/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
 	/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
 	/// previously placed in the holding cell (and has since been removed).
 	NewClaim {
 		/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
-		monitor_update: &'a ChannelMonitorUpdate,
+		monitor_update: ChannelMonitorUpdate,
 		/// The value of the HTLC which was claimed, in msat.
 		htlc_value_msat: u64,
 	},
@@ -2305,8 +2305,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 			};
 			self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
 			UpdateFulfillCommitFetch::NewClaim {
-				monitor_update: &self.context.pending_monitor_updates.get(unblocked_update_pos)
-					.expect("We just pushed the monitor update").update,
+				monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
+					.expect("We just pushed the monitor update").update.clone(),
 				htlc_value_msat,
 			}
 		},
@@ -2798,7 +2798,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 		Ok(())
 	}
 
-	pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
+	pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
 		where L::Target: Logger
 	{
 		if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3022,7 +3022,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	/// Public version of the below, checking relevant preconditions first.
 	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
 	/// returns `(None, Vec::new())`.
-	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
 		   (self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 			self.free_holding_cell_htlcs(logger)
@@ -3031,7 +3031,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
 	/// Frees any pending commitment updates in the holding cell, generating the relevant messages
 	/// for our counterparty.
- fn free_holding_cell_htlcs(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { + fn free_holding_cell_htlcs(&mut self, logger: &L) -> (Option, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0); if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() { log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(), @@ -3147,7 +3147,7 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError> + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option), ChannelError> where L::Target: Logger, { if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { @@ -4075,7 +4075,7 @@ impl Channel { pub fn shutdown( &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown - ) -> Result<(Option, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError> + ) -> Result<(Option, Option, Vec<(HTLCSource, PaymentHash)>), ChannelError> where SP::Target: SignerProvider { if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { @@ -4141,9 +4141,7 @@ impl Channel { }], }; self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); - if self.push_blockable_mon_update(monitor_update) { - self.context.pending_monitor_updates.last().map(|upd| &upd.update) - } else { None } + self.push_ret_blockable_mon_update(monitor_update) } else { None }; let shutdown = if send_shutdown { Some(msgs::Shutdown { @@ -4440,11 +4438,11 @@ impl Channel { /// Returns the next blocked monitor update, if one exists, and a bool which indicates a /// further blocked monitor update exists after the next. - pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> { + pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> { for i in 0..self.context.pending_monitor_updates.len() { if self.context.pending_monitor_updates[i].blocked { self.context.pending_monitor_updates[i].blocked = false; - return Some((&self.context.pending_monitor_updates[i].update, + return Some((self.context.pending_monitor_updates[i].update.clone(), self.context.pending_monitor_updates.len() > i + 1)); } } @@ -4465,9 +4463,9 @@ impl Channel { /// it should be immediately given to the user for persisting or `None` if it should be held as /// blocked. 
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) - -> Option<&ChannelMonitorUpdate> { + -> Option { let release_monitor = self.push_blockable_mon_update(update); - if release_monitor { self.context.pending_monitor_updates.last().map(|upd| &upd.update) } else { None } + if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None } } pub fn no_monitor_updates_pending(&self) -> bool { @@ -5302,7 +5300,7 @@ impl Channel { pub fn send_htlc_and_commit( &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option, logger: &L - ) -> Result, ChannelError> where L::Target: Logger { + ) -> Result, ChannelError> where L::Target: Logger { let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, skimmed_fee_msat, logger); if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } } @@ -5336,7 +5334,7 @@ impl Channel { /// [`ChannelMonitorUpdate`] will be returned). pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option, override_shutdown_script: Option) - -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError> + -> Result<(msgs::Shutdown, Option, Vec<(HTLCSource, PaymentHash)>), APIError> where SP::Target: SignerProvider { for htlc in self.context.pending_outbound_htlcs.iter() { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { @@ -5407,9 +5405,7 @@ impl Channel { }], }; self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); - if self.push_blockable_mon_update(monitor_update) { - self.context.pending_monitor_updates.last().map(|upd| &upd.update) - } else { None } + self.push_ret_blockable_mon_update(monitor_update) } else { None }; let shutdown = msgs::Shutdown { channel_id: self.context.channel_id, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fcc672a86..8e5bc1a39 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2310,7 +2310,7 @@ where // Update the monitor with the shutdown script if necessary. 
if let Some(monitor_update) = monitor_update_opt.take() { let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), &monitor_update); break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); } @@ -3036,7 +3036,7 @@ where match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); + let update_res = self.chain_monitor.update_channel(funding_txo, &monitor_update); if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { break Err(e); } @@ -4678,7 +4678,7 @@ where peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &monitor_update); let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan); if let Err(e) = res { @@ -5347,7 +5347,7 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), &monitor_update); break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); } break Ok(()); @@ -5544,7 +5544,7 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), &monitor_update); let update_id = monitor_update.update_id; handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) @@ -5683,7 +5683,7 @@ where let funding_txo = chan.get().context.get_funding_txo(); let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), &monitor_update); let update_id = monitor_update.update_id; handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) @@ -5962,7 +5962,7 @@ where has_monitor_update = true; let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), monitor_update); + funding_txo.expect("channel is live"), &monitor_update); let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; let res = handle_new_monitor_update!(self, update_res, update_id, @@ -6307,7 +6307,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { 
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); + let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, &monitor_update); let update_id = monitor_update.update_id; if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lck, peer_state, per_peer_state, chan) From c843f68d5b3361b47b60bdf5dd4dbdbe06e6fdeb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 18 Jun 2023 21:55:30 +0000 Subject: [PATCH 2/9] Move most `handle_new_monitor_update` calls to pass the update Most of the calls to the `handle_new_monitor_update` macro had the exact same pattern - calling `update_monitor` followed by the macro. Given that common pattern will grow to first pushing the new monitor onto an in-flight set and then calling `update_monitor` unifying the pattern into a single macro now avoids more code churn in the coming commits. --- lightning/src/ln/channelmanager.rs | 51 +++++++++++++----------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 8e5bc1a39..9450bf842 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1860,7 +1860,7 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_ALREADY_APPLIED, $remove: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); @@ -1893,8 +1893,15 @@ macro_rules! 
handle_new_monitor_update { }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, ALREADY_APPLIED) => { + handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_ALREADY_APPLIED, $chan_entry.remove_entry()) + }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + let update_res = $self.chain_monitor.update_channel($funding_txo, &$update); + handle_new_monitor_update!($self, update_res, $update.update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, MANUALLY_REMOVING_ALREADY_APPLIED, $remove) + } }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) } } @@ -2309,9 +2316,7 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), &monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan_entry); } if chan_entry.get().is_shutdown() { @@ -3037,7 +3042,7 @@ where Some(monitor_update) => { let update_id = monitor_update.update_id; let update_res = self.chain_monitor.update_channel(funding_txo, &monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { + if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED) { break Err(e); } if update_res == ChannelMonitorUpdateStatus::InProgress { @@ -4091,7 +4096,7 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED) }, hash_map::Entry::Vacant(_) => Ok(()), } @@ -4677,9 +4682,7 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &monitor_update); - let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, 
peer_state_lock, peer_state, per_peer_state, chan); if let Err(e) = res { // TODO: This is a *critical* error - we probably updated the outbound edge @@ -5216,7 +5219,8 @@ where let chan = e.insert(chan); let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + per_peer_state, chan, MANUALLY_REMOVING_ALREADY_APPLIED, + { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the // channel, not the temporary_channel_id. This is compatible with ourselves, but the @@ -5249,7 +5253,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan); + let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -5346,9 +5350,7 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), &monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan_entry); } break Ok(()); }, @@ -5544,9 +5546,7 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), &monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan) } else { Ok(()) } }, @@ -5683,9 +5683,7 @@ where let funding_txo = chan.get().context.get_funding_txo(); let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), &monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan) } else { Ok(()) }; (htlcs_to_fail, res) @@ -5961,11 +5959,8 @@ where if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), &monitor_update); - let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; - let res = 
handle_new_monitor_update!(self, update_res, update_id, + let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, peer_state.channel_by_id.remove(&channel_id)); if res.is_err() { @@ -6307,9 +6302,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, &monitor_update); - let update_id = monitor_update.update_id; - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, peer_state_lck, peer_state, per_peer_state, chan) { errors.push((e, counterparty_node_id)); From 5e528ff6bb141ea1044114128406b6a6b194c4b1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 18 Jun 2023 23:18:34 +0000 Subject: [PATCH 3/9] Drop the now-unused `push_blockable_mon_update` --- lightning/src/ln/channel.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index df0ebcbd7..26b70f286 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -4449,22 +4449,14 @@ impl Channel { None } - /// Pushes a new monitor update into our monitor update queue, returning whether it should be - /// immediately given to the user for persisting or if it should be held as blocked. - fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool { - let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked); - self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { - update, blocked: !release_monitor - }); - release_monitor - } - - /// Pushes a new monitor update into our monitor update queue, returning a reference to it if - /// it should be immediately given to the user for persisting or `None` if it should be held as - /// blocked. + /// Pushes a new monitor update into our monitor update queue, returning it if it should be + /// immediately given to the user for persisting or `None` if it should be held as blocked. fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> Option { - let release_monitor = self.push_blockable_mon_update(update); + let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked); + self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { + update, blocked: !release_monitor, + }); if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None } } From 1c7b6925a2c3e078228091fd692bee72c75ddcd8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 18 Jun 2023 23:56:16 +0000 Subject: [PATCH 4/9] Simplify cases in `handle_new_monitor_update` macro By giving up on a tiny bit of parallelism and tweaking the return types, we can make the `handle_new_monitor_update` macro a bit clearer - now the only cases where its called after a monitor was updated was when the monitor was initially committed. 
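
As a rough sketch of the new convention (illustrative only, simplified from the
diff below; not additional code in this patch): the macro now returns a
`Result<bool, _>`, with `Ok(true)` meaning the monitor update completed and
`Ok(false)` meaning it is still in flight, so callers can branch on the result
rather than re-checking the `ChannelMonitorUpdateStatus` themselves:

    // Simplified caller in the payment-send path:
    match handle_new_monitor_update!(self, funding_txo, monitor_update,
        peer_state_lock, peer_state, per_peer_state, chan)
    {
        Err(e) => break Err(e),
        // The update is persisting asynchronously; the commitment update
        // will be resent once it completes, so don't retry the payment.
        Ok(false) => return Err(APIError::MonitorUpdateInProgress),
        // The update was persisted synchronously; continue immediately.
        Ok(true) => {},
    }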
--- lightning/src/ln/channelmanager.rs | 68 ++++++++++++++++-------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 9450bf842..f51de289b 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1860,7 +1860,7 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_ALREADY_APPLIED, $remove: expr) => { { + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); @@ -1871,13 +1871,13 @@ macro_rules! handle_new_monitor_update { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", log_bytes!($chan.context.channel_id()[..])); - Ok(()) + Ok(false) }, ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan.context.channel_id()[..])); update_maps_on_chan_removal!($self, &$chan.context); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + let res = Err(MsgHandleErrInternal::from_finish_shutdown( "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), $chan.context.get_user_id(), $chan.context.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok())); @@ -1889,16 +1889,16 @@ macro_rules! 
handle_new_monitor_update { if $chan.no_monitor_updates_pending() { handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); } - Ok(()) + Ok(true) }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, ALREADY_APPLIED) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_ALREADY_APPLIED, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { let update_res = $self.chain_monitor.update_channel($funding_txo, &$update); - handle_new_monitor_update!($self, update_res, $update.update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, MANUALLY_REMOVING_ALREADY_APPLIED, $remove) + handle_new_monitor_update!($self, update_res, $update.update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, MANUALLY_REMOVING_INITIAL_MONITOR, $remove) } }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) @@ -2316,7 +2316,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } if chan_entry.get().is_shutdown() { @@ -3040,19 +3041,18 @@ where }, onion_packet, None, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, &monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED) { - break Err(e); - } - if update_res == ChannelMonitorUpdateStatus::InProgress { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. 
- return Err(APIError::MonitorUpdateInProgress); + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, } }, None => { }, @@ -4087,8 +4087,7 @@ where let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { - let update_res = self.chain_monitor.update_channel(funding_txo, &update); - + let mut updated_chan = false; let res = { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { @@ -4096,12 +4095,18 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED) + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update, + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } } else { Ok(()) } }; + if !updated_chan { + // TODO: Track this as in-flight even though the channel is closed. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + } // TODO: If this channel has since closed, we're likely providing a payment // preimage update, which we must ensure is durable! We currently don't, // however, ensure that. @@ -5219,7 +5224,7 @@ where let chan = e.insert(chan); let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING_ALREADY_APPLIED, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the @@ -5232,7 +5237,7 @@ where if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { res.0 = None; } - res + res.map(|_| ()) } } } @@ -5253,7 +5258,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan, ALREADY_APPLIED); + let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. 
Previously, full_stack_target found an (unreachable) panic when the @@ -5262,7 +5267,7 @@ where shutdown_finish.0.take(); } } - res + res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5350,7 +5355,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } break Ok(()); }, @@ -5547,7 +5553,7 @@ where let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, - peer_state, per_peer_state, chan) + peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5684,7 +5690,7 @@ where let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan) + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) }; (htlcs_to_fail, res) }, From 4041f0899f86eaf6a0a4576a91918fa54026ac46 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 19 Jun 2023 06:26:39 +0000 Subject: [PATCH 5/9] Move in-flight `ChannelMonitorUpdate`s to `ChannelManager` Because `ChannelMonitorUpdate`s can be generated for a channel which is already closed, and must still be tracked through their completion, storing them in a `Channel` doesn't make sense - we'd have to have a redundant place to put them post-closure and handle both storage locations equivalently. Instead, here, we move to storing in-flight `ChannelMonitorUpdate`s to the `ChannelManager`, leaving blocked `ChannelMonitorUpdate`s in the `Channel` as they were. --- lightning/src/ln/channel.rs | 88 ++++------- lightning/src/ln/channelmanager.rs | 229 ++++++++++++++++++++++------- lightning/src/util/ser.rs | 1 + 3 files changed, 204 insertions(+), 114 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 26b70f286..d68108b31 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -2264,7 +2264,7 @@ impl Channel { } pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger { - let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked); + let release_cs_monitor = self.context.pending_monitor_updates.is_empty(); match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => { // Even if we aren't supposed to let new monitor updates with commitment state @@ -2272,26 +2272,17 @@ impl Channel { // matter what. 
Sadly, to push a new monitor update which flies before others // already queued, we have to insert it into the pending queue and update the // update_ids of all the following monitors. - let unblocked_update_pos = if release_cs_monitor && msg.is_some() { + if release_cs_monitor && msg.is_some() { let mut additional_update = self.build_commitment_no_status_check(logger); // build_commitment_no_status_check may bump latest_monitor_id but we want them // to be strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { - update: monitor_update, blocked: false, - }); - self.context.pending_monitor_updates.len() - 1 } else { - let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked) - .unwrap_or(self.context.pending_monitor_updates.len()); - let new_mon_id = self.context.pending_monitor_updates.get(insert_pos) + let new_mon_id = self.context.pending_monitor_updates.get(0) .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id); monitor_update.update_id = new_mon_id; - self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate { - update: monitor_update, blocked: false, - }); - for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) { + for held_update in self.context.pending_monitor_updates.iter_mut() { held_update.update.update_id += 1; } if msg.is_some() { @@ -2301,14 +2292,10 @@ impl Channel { update, blocked: true, }); } - insert_pos - }; - self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new()); - UpdateFulfillCommitFetch::NewClaim { - monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos) - .expect("We just pushed the monitor update").update.clone(), - htlc_value_msat, } + + self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new()); + UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, } }, UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {}, } @@ -3349,8 +3336,7 @@ impl Channel { } match self.free_holding_cell_htlcs(logger) { - (Some(_), htlcs_to_fail) => { - let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update; + (Some(mut additional_update), htlcs_to_fail) => { // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. 
self.context.latest_monitor_update_id = monitor_update.update_id; @@ -3566,12 +3552,9 @@ impl Channel { { assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32); self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32); - let mut found_blocked = false; - self.context.pending_monitor_updates.retain(|upd| { - if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); } - if upd.blocked { found_blocked = true; } - upd.blocked - }); + for upd in self.context.pending_monitor_updates.iter() { + debug_assert!(upd.blocked); + } // If we're past (or at) the FundingSent stage on an outbound channel, try to // (re-)broadcast the funding transaction as we may have declined to broadcast it when we @@ -4439,48 +4422,31 @@ impl Channel { /// Returns the next blocked monitor update, if one exists, and a bool which indicates a /// further blocked monitor update exists after the next. pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> { - for i in 0..self.context.pending_monitor_updates.len() { - if self.context.pending_monitor_updates[i].blocked { - self.context.pending_monitor_updates[i].blocked = false; - return Some((self.context.pending_monitor_updates[i].update.clone(), - self.context.pending_monitor_updates.len() > i + 1)); - } + for upd in self.context.pending_monitor_updates.iter() { + debug_assert!(upd.blocked); } - None + if self.context.pending_monitor_updates.is_empty() { return None; } + Some((self.context.pending_monitor_updates.remove(0).update, + !self.context.pending_monitor_updates.is_empty())) } /// Pushes a new monitor update into our monitor update queue, returning it if it should be /// immediately given to the user for persisting or `None` if it should be held as blocked. fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> Option { - let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked); - self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { - update, blocked: !release_monitor, - }); - if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None } + let release_monitor = self.context.pending_monitor_updates.is_empty(); + if !release_monitor { + self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { + update, blocked: true, + }); + None + } else { + Some(update) + } } - pub fn no_monitor_updates_pending(&self) -> bool { - self.context.pending_monitor_updates.is_empty() - } - - pub fn complete_all_mon_updates_through(&mut self, update_id: u64) { - self.context.pending_monitor_updates.retain(|upd| { - if upd.update.update_id <= update_id { - assert!(!upd.blocked, "Completed update must have flown"); - false - } else { true } - }); - } - - pub fn complete_one_mon_update(&mut self, update_id: u64) { - self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id); - } - - /// Returns an iterator over all unblocked monitor updates which have not yet completed. - pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator { - self.context.pending_monitor_updates.iter() - .filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) }) + pub fn blocked_monitor_updates_pending(&self) -> usize { + self.context.pending_monitor_updates.len() } /// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor. 
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f51de289b..0aca388ea 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -633,6 +633,13 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the + /// user but which have not yet completed. + /// + /// Note that the channel may no longer exist. For example if the channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. + in_flight_monitor_updates: BTreeMap>, /// Map from a specific channel to some action(s) that should be taken when all pending /// [`ChannelMonitorUpdate`]s for the channel complete updating. /// @@ -668,6 +675,7 @@ impl PeerState { return false } self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + && self.in_flight_monitor_updates.is_empty() } // Returns a count of all channels we have with this peer, including pending channels. @@ -1860,7 +1868,7 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); @@ -1885,20 +1893,40 @@ macro_rules! 
handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - $chan.complete_one_mon_update($update_id); - if $chan.no_monitor_updates_pending() { - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); - } + $completed; Ok(true) }, } } }; + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + }; ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { - let update_res = $self.chain_monitor.update_channel($funding_txo, &$update); - handle_new_monitor_update!($self, update_res, $update.update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, MANUALLY_REMOVING_INITIAL_MONITOR, $remove) + let update_id = $update.update_id; + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + .or_insert_with(Vec::new); + // During startup, we push monitor updates as background events through to here in + // order to replay updates that were in-flight when we shut down. Thus, we have to + // filter for uniqueness here. + let idx = in_flight_updates.iter().position(|upd| upd == &$update) + .unwrap_or_else(|| { + in_flight_updates.push($update); + in_flight_updates.len() - 1 + }); + let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); + handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { + handle_monitor_update_completion!($self, update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + } + }) } }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) @@ -4096,7 +4124,7 @@ where match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { updated_chan = true; - handle_new_monitor_update!(self, funding_txo, update, + handle_new_monitor_update!(self, funding_txo, update.clone(), peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), @@ -4895,8 +4923,14 @@ where hash_map::Entry::Vacant(_) => return, } }; - log_trace!(self.logger, "ChannelMonitor updated to {}. 
Current highest is {}", - highest_applied_update_id, channel.get().context.get_latest_monitor_update_id()); + let remaining_in_flight = + if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { + pending.retain(|upd| upd.update_id > highest_applied_update_id); + pending.len() + } else { 0 }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", + highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(), + remaining_in_flight); if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { return; } @@ -7029,6 +7063,7 @@ where inbound_v1_channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, @@ -7860,6 +7895,16 @@ where pending_claiming_payments = None; } + let mut in_flight_monitor_updates: Option>> = None; + for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { + for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); } + in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates); + } + } + } + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -7870,6 +7915,7 @@ where (7, self.fake_scid_rand_bytes, required), (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), (9, htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), }); @@ -8086,7 +8132,7 @@ where let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); - let mut pending_background_events = Vec::new(); + let mut close_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -8094,17 +8140,7 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! 
This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); - } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { @@ -8115,7 +8151,7 @@ where log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update }); } @@ -8148,7 +8184,6 @@ where log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id()); - channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } @@ -8195,7 +8230,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8224,20 +8259,27 @@ where claimable_htlcs_list.push((payment_hash, previous_hops)); } + let peer_state_from_chans = |channel_by_id| { + PeerState { + channel_by_id, + outbound_v1_channel_by_id: HashMap::new(), + inbound_v1_channel_by_id: HashMap::new(), + latest_features: InitFeatures::empty(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: false, + } + }; + let peer_count: u64 = Readable::read(reader)?; let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 
MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); for _ in 0..peer_count { let peer_pubkey = Readable::read(reader)?; - let peer_state = PeerState { - channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), - outbound_v1_channel_by_id: HashMap::new(), - inbound_v1_channel_by_id: HashMap::new(), - latest_features: Readable::read(reader)?, - pending_msg_events: Vec::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: false, - }; + let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let mut peer_state = peer_state_from_chans(peer_chans); + peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -8265,24 +8307,6 @@ where } } - for (node_id, peer_mtx) in per_peer_state.iter() { - let peer_state = peer_mtx.lock().unwrap(); - for (_, chan) in peer_state.channel_by_id.iter() { - for update in chan.uncompleted_unblocked_mon_updates() { - if let Some(funding_txo) = chan.context.get_funding_txo() { - log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", - update.update_id, log_bytes!(funding_txo.to_channel_id())); - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: *node_id, funding_txo, update: update.clone(), - }); - } else { - return Err(DecodeError::InvalidValue); - } - } - } - } - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -8319,6 +8343,7 @@ where let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; + let mut in_flight_monitor_updates: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -8329,6 +8354,7 @@ where (7, fake_scid_rand_bytes, option), (8, events_override, option), (9, claimable_htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), }); @@ -8362,6 +8388,103 @@ where retry_lock: Mutex::new(()) }; + // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`) + // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to + // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we + // replayed, and for each monitor update we have to replay we have to ensure there's a + // `ChannelMonitor` for it. + // + // In order to do so we first walk all of our live channels (so that we can check their + // state immediately after doing the update replays, when we have the `update_id`s + // available) and then walk any remaining in-flight updates. + // + // Because the actual handling of the in-flight updates is the same, it's macro'ized here: + let mut pending_background_events = Vec::new(); + macro_rules! 
handle_in_flight_updates { + ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr, + $monitor: expr, $peer_state: expr, $channel_info_log: expr + ) => { { + let mut max_in_flight_update_id = 0; + $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); + for update in $chan_in_flight_upds.iter() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", + update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id())); + max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + update: update.clone(), + }); + } + if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { + log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!"); + return Err(DecodeError::InvalidValue); + } + max_in_flight_update_id + } } + } + + for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, chan) in peer_state.channel_by_id.iter() { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } + } + if chan.get_latest_complete_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! 
This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_complete_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + if let Some(in_flight_upds) = in_flight_monitor_updates { + for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds { + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + // Now that we've removed all the in-flight monitor updates for channels that are + // still open, we need to replay any monitor updates that are for closed channels, + // creating the necessary peer_state entries as we go. + let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| { + Mutex::new(peer_state_from_chans(HashMap::new())) + }); + let mut peer_state = peer_state_mutex.lock().unwrap(); + handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, + funding_txo, monitor, peer_state, "closed "); + } else { + log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is missing.", + log_bytes!(funding_txo.to_channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + // Note that we have to do the above replays before we push new monitor updates.
+ pending_background_events.append(&mut close_background_events); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 6d66d353f..0e510760e 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -848,6 +848,7 @@ impl Readable for Vec { } impl_for_vec!(ecdsa::Signature); +impl_for_vec!(crate::chain::channelmonitor::ChannelMonitorUpdate); impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction); impl_for_vec!((A, B), A, B); impl_writeable_for_vec!(&crate::routing::router::BlindedTail); From e186593687ee99d68773214b6b63aae0647dd327 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 19 Jun 2023 05:35:34 +0000 Subject: [PATCH 6/9] Drop the now-unused update_id param to monitor update macros --- lightning/src/ln/channelmanager.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 0aca388ea..c7c16e077 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1819,7 +1819,7 @@ macro_rules! emit_channel_ready_event { } macro_rules! handle_monitor_update_completion { - ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { + ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let mut updates = $chan.monitor_updating_restored(&$self.logger, &$self.node_signer, $self.genesis_hash, &$self.default_configuration, $self.best_block.read().unwrap().height()); @@ -1898,16 +1898,15 @@ macro_rules! 
handle_new_monitor_update { }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, _internal, $remove, - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { - let update_id = $update.update_id; let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) .or_insert_with(Vec::new); // During startup, we push monitor updates as background events through to here in @@ -1924,7 +1923,7 @@ macro_rules! handle_new_monitor_update { { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { - handle_monitor_update_completion!($self, update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); } }) } }; @@ -4934,7 +4933,7 @@ where if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { return; } - handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. 
@@ -5257,7 +5256,7 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { peer_state.channel_by_id.remove(&new_channel_id) }); @@ -5292,7 +5291,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the From 60e671bb9388610dab2518806f352b6fc68bb501 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 19 Jun 2023 06:31:43 +0000 Subject: [PATCH 7/9] Remove the `blocked` flag on `ChannelMonitorUpdate`s in `Channel` Now that all `ChannelMonitorUpdate`s stored in `Channel` are blocked, we don't need a bool to track it. --- lightning/src/ln/channel.rs | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index d68108b31..2ff6da801 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -588,17 +588,10 @@ pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2; struct PendingChannelMonitorUpdate { update: ChannelMonitorUpdate, - /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an - /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is - /// blocked on some external event and the [`ChannelManager`] will update us when we're ready. - /// - /// [`ChannelManager`]: super::channelmanager::ChannelManager - blocked: bool, } impl_writeable_tlv_based!(PendingChannelMonitorUpdate, { (0, update, required), - (2, blocked, required), }); /// Contains everything about the channel including state, and various flags. @@ -2289,7 +2282,7 @@ impl Channel { debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set"); let update = self.build_commitment_no_status_check(logger); self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { - update, blocked: true, + update, }); } } @@ -3552,9 +3545,6 @@ impl Channel { { assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32); self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32); - for upd in self.context.pending_monitor_updates.iter() { - debug_assert!(upd.blocked); - } // If we're past (or at) the FundingSent stage on an outbound channel, try to // (re-)broadcast the funding transaction as we may have declined to broadcast it when we @@ -4422,9 +4412,6 @@ impl Channel { /// Returns the next blocked monitor update, if one exists, and a bool which indicates a /// further blocked monitor update exists after the next.
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> { - for upd in self.context.pending_monitor_updates.iter() { - debug_assert!(upd.blocked); - } if self.context.pending_monitor_updates.is_empty() { return None; } Some((self.context.pending_monitor_updates.remove(0).update, !self.context.pending_monitor_updates.is_empty())) @@ -4437,7 +4424,7 @@ let release_monitor = self.context.pending_monitor_updates.is_empty(); if !release_monitor { self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { - update, blocked: true, + update, }); None } else { From 7884e687d2e6cae1da5bbc514546c284359f703c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 20 Jun 2023 02:16:03 +0000 Subject: [PATCH 8/9] Rename Channel's latest-monitor-update fetch method for clarity `Channel::get_latest_complete_monitor_update_id` no longer refers to complete updates, but rather to ones which were passed to the `ChannelManager` and which the `Channel` no longer knows about. Thus, we rename it `get_latest_unblocked_monitor_update_id`. --- lightning/src/ln/channel.rs | 3 ++- lightning/src/ln/channelmanager.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 2ff6da801..4745ac80e 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -4404,7 +4404,8 @@ impl Channel { (self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 } - pub fn get_latest_complete_monitor_update_id(&self) -> u64 { + /// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight. + pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 { if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); } self.context.pending_monitor_updates[0].update.update_id - 1 } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index c7c16e077..f38c6c13d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -8441,12 +8441,12 @@ where funding_txo, monitor, peer_state, "")); } } - if chan.get_latest_complete_monitor_update_id() > max_in_flight_update_id { + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { // If the channel is ahead of the monitor, return InvalidValue: log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager!
This indicates a potentially-critical violation of the chain::Watch API!"); log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); - log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_complete_monitor_update_id()); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); return Err(DecodeError::InvalidValue); From 9c3ad28fa854ffa43eff9a367028920ec3d8f294 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 20 Jun 2023 23:27:55 +0000 Subject: [PATCH 9/9] Rename `Channel::pending_monitor_updates` to `blocked_monitor_updates` This differentiates the blocked updates held by `Channel` from the in-flight updates pending completion, which are now tracked by the `ChannelManager`. --- lightning/src/ln/channel.rs | 44 ++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 4745ac80e..69a69e7a1 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -862,11 +862,9 @@ pub(super) struct ChannelContext { /// [`SignerProvider::derive_channel_signer`]. channel_keys_id: [u8; 32], - /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately. - /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence - /// completes we still need to be able to complete the persistence. Thus, we have to keep a - /// copy of the [`ChannelMonitorUpdate`] here until it is complete. - pending_monitor_updates: Vec<PendingChannelMonitorUpdate>, + /// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we + /// store it here and only release it to the `ChannelManager` once it asks for it.
+ blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>, } impl ChannelContext { @@ -2257,7 +2255,7 @@ impl Channel { } pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger { - let release_cs_monitor = self.context.pending_monitor_updates.is_empty(); + let release_cs_monitor = self.context.blocked_monitor_updates.is_empty(); match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => { // Even if we aren't supposed to let new monitor updates with commitment state @@ -2272,16 +2270,16 @@ impl Channel { self.context.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); } else { - let new_mon_id = self.context.pending_monitor_updates.get(0) + let new_mon_id = self.context.blocked_monitor_updates.get(0) .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id); monitor_update.update_id = new_mon_id; - for held_update in self.context.pending_monitor_updates.iter_mut() { + for held_update in self.context.blocked_monitor_updates.iter_mut() { held_update.update.update_id += 1; } if msg.is_some() { debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set"); let update = self.build_commitment_no_status_check(logger); - self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { + self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { update, }); } @@ -4406,25 +4404,25 @@ impl Channel { /// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight. pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 { - if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); } - self.context.pending_monitor_updates[0].update.update_id - 1 + if self.context.blocked_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); } + self.context.blocked_monitor_updates[0].update.update_id - 1 } /// Returns the next blocked monitor update, if one exists, and a bool which indicates a /// further blocked monitor update exists after the next. pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> { - if self.context.pending_monitor_updates.is_empty() { return None; } - Some((self.context.pending_monitor_updates.remove(0).update, - !self.context.pending_monitor_updates.is_empty())) + if self.context.blocked_monitor_updates.is_empty() { return None; } + Some((self.context.blocked_monitor_updates.remove(0).update, + !self.context.blocked_monitor_updates.is_empty())) } /// Pushes a new monitor update into our monitor update queue, returning it if it should be /// immediately given to the user for persisting or `None` if it should be held as blocked.
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> Option<ChannelMonitorUpdate> { - let release_monitor = self.context.pending_monitor_updates.is_empty(); + let release_monitor = self.context.blocked_monitor_updates.is_empty(); if !release_monitor { - self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate { + self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { update, }); None @@ -4434,7 +4432,7 @@ } } pub fn blocked_monitor_updates_pending(&self) -> usize { - self.context.pending_monitor_updates.len() + self.context.blocked_monitor_updates.len() } /// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor. @@ -5590,7 +5588,7 @@ impl OutboundV1Channel { channel_type, channel_keys_id, - pending_monitor_updates: Vec::new(), + blocked_monitor_updates: Vec::new(), } }) } @@ -6220,7 +6218,7 @@ impl InboundV1Channel { channel_type, channel_keys_id, - pending_monitor_updates: Vec::new(), + blocked_monitor_updates: Vec::new(), } }; @@ -6806,7 +6804,7 @@ impl Writeable for Channel { (28, holder_max_accepted_htlcs, option), (29, self.context.temporary_channel_id, option), (31, channel_pending_event_emitted, option), - (33, self.context.pending_monitor_updates, vec_type), + (33, self.context.blocked_monitor_updates, vec_type), (35, pending_outbound_skimmed_fees, optional_vec), (37, holding_cell_skimmed_fees, optional_vec), }); @@ -7087,7 +7085,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch let mut temporary_channel_id: Option<[u8; 32]> = None; let mut holder_max_accepted_htlcs: Option<u16> = None; - let mut pending_monitor_updates = Some(Vec::new()); + let mut blocked_monitor_updates = Some(Vec::new()); let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None; let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None; @@ -7114,7 +7112,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch (28, holder_max_accepted_htlcs, option), (29, temporary_channel_id, option), (31, channel_pending_event_emitted, option), - (33, pending_monitor_updates, vec_type), + (33, blocked_monitor_updates, vec_type), (35, pending_outbound_skimmed_fees_opt, optional_vec), (37, holding_cell_skimmed_fees_opt, optional_vec), }); @@ -7307,7 +7305,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch channel_type: channel_type.unwrap(), channel_keys_id, - pending_monitor_updates: pending_monitor_updates.unwrap(), + blocked_monitor_updates: blocked_monitor_updates.unwrap(), } }) }
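
Taken together, the series leaves `Channel` holding only blocked `ChannelMonitorUpdate`s, which it hands to the `ChannelManager` one at a time and strictly in order. The standalone Rust sketch below is illustrative only and is not taken from the patches: `Update` and `BlockedQueue` are simplified stand-ins for `ChannelMonitorUpdate` and the `blocked_monitor_updates` queue, and it omits the update-ID renumbering the real fulfill path performs while updates are blocked. It mirrors the release-or-block semantics of `push_ret_blockable_mon_update`, `unblock_next_blocked_monitor_update` and `get_latest_unblocked_monitor_update_id`.

/// Simplified stand-in for `ChannelMonitorUpdate`: only the monotonically increasing ID matters here.
#[derive(Clone, Debug, PartialEq)]
struct Update { update_id: u64 }

/// Simplified stand-in for a channel's `blocked_monitor_updates` plus its latest released update ID.
struct BlockedQueue {
    latest_released_id: u64,
    blocked: Vec<Update>,
}

impl BlockedQueue {
    fn new() -> Self { Self { latest_released_id: 0, blocked: Vec::new() } }

    /// Mirrors `push_ret_blockable_mon_update`: if nothing is blocked the update is released to
    /// the caller immediately, otherwise it is queued behind the existing blocked updates.
    fn push_ret_blockable(&mut self, update: Update) -> Option<Update> {
        if self.blocked.is_empty() {
            self.latest_released_id = update.update_id;
            Some(update)
        } else {
            self.blocked.push(update);
            None
        }
    }

    /// Mirrors `unblock_next_blocked_monitor_update`: pops the oldest blocked update and reports
    /// whether further blocked updates remain behind it.
    fn unblock_next(&mut self) -> Option<(Update, bool)> {
        if self.blocked.is_empty() { return None; }
        let update = self.blocked.remove(0);
        self.latest_released_id = update.update_id;
        Some((update, !self.blocked.is_empty()))
    }

    /// Mirrors `get_latest_unblocked_monitor_update_id`: everything strictly before the first
    /// blocked update has been released, so report the ID just below it, or the latest released
    /// ID if nothing is blocked.
    fn latest_unblocked_id(&self) -> u64 {
        match self.blocked.first() {
            Some(first) => first.update_id - 1,
            None => self.latest_released_id,
        }
    }
}

fn main() {
    let mut queue = BlockedQueue::new();
    // With nothing blocked, updates are handed back for immediate persistence.
    assert_eq!(queue.push_ret_blockable(Update { update_id: 1 }), Some(Update { update_id: 1 }));
    // Simulate an update that must wait on an external action by queueing it directly.
    queue.blocked.push(Update { update_id: 2 });
    // Later updates now queue behind it rather than being released out of order.
    assert_eq!(queue.push_ret_blockable(Update { update_id: 3 }), None);
    assert_eq!(queue.latest_unblocked_id(), 1);
    // Releasing in order: update 2 comes out first, with one more still blocked behind it.
    assert_eq!(queue.unblock_next(), Some((Update { update_id: 2 }, true)));
    assert_eq!(queue.unblock_next(), Some((Update { update_id: 3 }, false)));
    assert_eq!(queue.latest_unblocked_id(), 3);
}

The property the example demonstrates is that releases stay in update_id order: a later update is never handed out for persistence ahead of an earlier blocked one, which keeps the startup replay of in-flight updates by ascending update_id sound.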