diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 06d03fd0e..4386302f8 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -295,7 +295,7 @@ fn check_api_err(api_err: APIError) { // all others. If you hit this panic, the list of acceptable errors // is probably just stale and you should add new messages here. match err.as_str() { - "Peer for first hop currently disconnected/pending monitor update!" => {}, + "Peer for first hop currently disconnected" => {}, _ if err.starts_with("Cannot push more than their max accepted HTLCs ") => {}, _ if err.starts_with("Cannot send value that would put us over the max HTLC value in flight our peer will accept ") => {}, _ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {}, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 3a2077209..9a74f891b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -796,7 +796,7 @@ mod tests { use crate::ln::functional_test_utils::*; use crate::ln::msgs::ChannelMessageHandler; use crate::util::errors::APIError; - use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; + use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -819,10 +819,8 @@ mod tests { nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); @@ -850,8 +848,24 @@ mod tests { .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 2); + match claim_events[0] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_1, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + match claim_events[1] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_2, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + // Now manually walk the commitment signed dance - because we claimed two payments // back-to-back it doesn't fit into the neat walk commitment_signed_dance does. diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index abc4f4a14..1532884fa 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Check that even though the persister is returning a InProgress, // because the update is bogus, ultimately the error that's returned // should be a PermanentFailure. @@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 1); @@ -1628,6 +1627,7 @@ fn test_monitor_update_fail_claim() { let events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 0); commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true); + expect_pending_htlcs_forwardable_ignore!(nodes[1]); let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]); nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap(); @@ -1645,6 +1645,7 @@ fn test_monitor_update_fail_claim() { let channel_id = chan_1.2; let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1653,7 +1654,7 @@ fn test_monitor_update_fail_claim() { expect_payment_sent!(nodes[0], payment_preimage_1); // Get the payment forwards, note that they were batched into one commitment update. - expect_pending_htlcs_forwardable!(nodes[1]); + nodes[1].node.process_pending_htlc_forwards(); check_added_monitors!(nodes[1], 1); let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]); @@ -1739,7 +1740,6 @@ fn test_monitor_update_on_pending_forwards() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]); check_added_monitors!(nodes[1], 1); - assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); @@ -1753,17 +1753,17 @@ fn test_monitor_update_on_pending_forwards() { let events = nodes[0].node.get_and_clear_pending_events(); assert_eq!(events.len(), 3); - if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] { + if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] { assert_eq!(payment_hash, payment_hash_1); assert!(payment_failed_permanently); } else { panic!("Unexpected event!"); } - match events[1] { + match events[2] { Event::PaymentFailed { payment_hash, .. } => { assert_eq!(payment_hash, payment_hash_1); }, _ => panic!("Unexpected event"), } - match events[2] { + match events[0] { Event::PendingHTLCsForwardable { .. } => { }, _ => panic!("Unexpected event"), }; @@ -1803,7 +1803,6 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1811,6 +1810,7 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -2290,7 +2290,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[0].node.claim_funds(payment_preimage_0); check_added_monitors!(nodes[0], 1); - expect_payment_claimed!(nodes[0], payment_hash_0, 100_000); nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]); nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg); @@ -2353,6 +2352,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id); + expect_payment_claimed!(nodes[0], payment_hash_0, 100_000); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). @@ -2606,7 +2606,15 @@ fn test_permanent_error_during_sending_shutdown() { chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure); assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok()); - check_closed_broadcast!(nodes[0], true); + + // We always send the `shutdown` response when initiating a shutdown, even if we immediately + // close the channel thereafter. + let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(msg_events.len(), 3); + if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); } + if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); } + if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); } + check_added_monitors!(nodes[0], 2); check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }); } @@ -2629,7 +2637,15 @@ fn test_permanent_error_during_handling_shutdown() { assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok()); let shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()); nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &shutdown); - check_closed_broadcast!(nodes[1], true); + + // We always send the `shutdown` response when receiving a shutdown, even if we immediately + // close the channel thereafter. + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(msg_events.len(), 3); + if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); } + if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); } + if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); } + check_added_monitors!(nodes[1], 2); check_closed_event!(nodes[1], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }); } @@ -2651,7 +2667,6 @@ fn double_temp_error() { // `claim_funds` results in a ChannelMonitorUpdate. nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); @@ -2659,7 +2674,6 @@ fn double_temp_error() { // which had some asserts that prevented it from being called twice. nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); @@ -2668,11 +2682,24 @@ fn double_temp_error() { check_added_monitors!(nodes[1], 0); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2); - // Complete the first HTLC. - let events = nodes[1].node.get_and_clear_pending_msg_events(); - assert_eq!(events.len(), 1); + // Complete the first HTLC. Note that as a side-effect we handle the monitor update completions + // and get both PaymentClaimed events at once. + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + + let events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 2); + match events[0] { + Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1), + _ => panic!("Unexpected Event: {:?}", events[0]), + } + match events[1] { + Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2), + _ => panic!("Unexpected Event: {:?}", events[1]), + } + + assert_eq!(msg_events.len(), 1); let (update_fulfill_1, commitment_signed_b1, node_id) = { - match &events[0] { + match &msg_events[0] { &MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 62a2b7b51..e3641a620 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -393,35 +393,21 @@ enum UpdateFulfillFetch { } /// The return type of get_update_fulfill_htlc_and_commit. -pub enum UpdateFulfillCommitFetch { +pub enum UpdateFulfillCommitFetch<'a> { /// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed /// it in the holding cell, or re-generated the update_fulfill message after the same claim was /// previously placed in the holding cell (and has since been removed). NewClaim { /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor - monitor_update: ChannelMonitorUpdate, + monitor_update: &'a ChannelMonitorUpdate, /// The value of the HTLC which was claimed, in msat. htlc_value_msat: u64, - /// The update_fulfill message and commitment_signed message (if the claim was not placed - /// in the holding cell). - msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>, }, /// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell /// or has been forgotten (presumably previously claimed). DuplicateClaim {}, } -/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC -/// state. -pub(super) struct RAAUpdates { - pub commitment_update: Option, - pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>, - pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, - pub finalized_claimed_htlcs: Vec, - pub monitor_update: ChannelMonitorUpdate, - pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>, -} - /// The return value of `monitor_updating_restored` pub(super) struct MonitorRestoreUpdates { pub raa: Option, @@ -558,6 +544,11 @@ pub(super) struct Channel { monitor_pending_channel_ready: bool, monitor_pending_revoke_and_ack: bool, monitor_pending_commitment_signed: bool, + + // TODO: If a channel is drop'd, we don't know whether the `ChannelMonitor` is ultimately + // responsible for some of the HTLCs here or not - we don't know whether the update in question + // completed or not. We currently ignore these fields entirely when force-closing a channel, + // but need to handle this somehow or we run the risk of losing HTLCs! monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>, monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, monitor_pending_finalized_fulfills: Vec, @@ -743,6 +734,12 @@ pub(super) struct Channel { /// The unique identifier used to re-derive the private key material for the channel through /// [`SignerProvider::derive_channel_signer`]. channel_keys_id: [u8; 32], + + /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately. + /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence + /// completes we still need to be able to complete the persistence. Thus, we have to keep a + /// copy of the [`ChannelMonitorUpdate`] here until it is complete. + pending_monitor_updates: Vec, } #[cfg(any(test, fuzzing))] @@ -1112,6 +1109,8 @@ impl Channel { channel_type, channel_keys_id, + + pending_monitor_updates: Vec::new(), }) } @@ -1458,6 +1457,8 @@ impl Channel { channel_type, channel_keys_id, + + pending_monitor_updates: Vec::new(), }; Ok(chan) @@ -1964,22 +1965,30 @@ impl Channel { } } - pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result where L::Target: Logger { + pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger { match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { - UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => { - let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) { - Err(e) => return Err((e, monitor_update)), - Ok(res) => res - }; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => { + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) }) + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + UpdateFulfillCommitFetch::NewClaim { + monitor_update: self.pending_monitor_updates.last().unwrap(), + htlc_value_msat, + } }, - UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => - Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }), - UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}), + UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => { + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + UpdateFulfillCommitFetch::NewClaim { + monitor_update: self.pending_monitor_updates.last().unwrap(), + htlc_value_msat, + } + } + UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {}, } } @@ -2259,9 +2268,9 @@ impl Channel { pub fn funding_created( &mut self, msg: &msgs::FundingCreated, best_block: BestBlock, signer_provider: &SP, logger: &L - ) -> Result<(msgs::FundingSigned, ChannelMonitor<::Signer>, Option), ChannelError> + ) -> Result<(msgs::FundingSigned, ChannelMonitor), ChannelError> where - SP::Target: SignerProvider, + SP::Target: SignerProvider, L::Target: Logger { if self.is_outbound() { @@ -2337,19 +2346,22 @@ impl Channel { log_info!(logger, "Generated funding_signed for peer for channel {}", log_bytes!(self.channel_id())); + let need_channel_ready = self.check_get_channel_ready(0).is_some(); + self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new()); + Ok((msgs::FundingSigned { channel_id: self.channel_id, signature - }, channel_monitor, self.check_get_channel_ready(0))) + }, channel_monitor)) } /// Handles a funding_signed message from the remote end. /// If this call is successful, broadcast the funding transaction (and not before!) pub fn funding_signed( &mut self, msg: &msgs::FundingSigned, best_block: BestBlock, signer_provider: &SP, logger: &L - ) -> Result<(ChannelMonitor<::Signer>, Transaction, Option), ChannelError> + ) -> Result, ChannelError> where - SP::Target: SignerProvider, + SP::Target: SignerProvider, L::Target: Logger { if !self.is_outbound() { @@ -2422,7 +2434,9 @@ impl Channel { log_info!(logger, "Received funding_signed from peer for channel {}", log_bytes!(self.channel_id())); - Ok((channel_monitor, self.funding_transaction.as_ref().cloned().unwrap(), self.check_get_channel_ready(0))) + let need_channel_ready = self.check_get_channel_ready(0).is_some(); + self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new()); + Ok(channel_monitor) } /// Handles a channel_ready message from our peer. If we've already sent our channel_ready @@ -3034,17 +3048,17 @@ impl Channel { Ok(()) } - pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option, ChannelMonitorUpdate), (Option, ChannelError)> + pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError> where L::Target: Logger { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { - return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()))); + return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())); } if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { - return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()))); + return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())); } if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() { - return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()))); + return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())); } let funding_script = self.get_funding_redeemscript(); @@ -3062,7 +3076,7 @@ impl Channel { log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction), log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id())); if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) { - return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned()))); + return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned())); } bitcoin_tx.txid }; @@ -3077,7 +3091,7 @@ impl Channel { debug_assert!(!self.is_outbound()); let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000; if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat { - return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()))); + return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())); } } #[cfg(any(test, fuzzing))] @@ -3099,7 +3113,7 @@ impl Channel { } if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs { - return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)))); + return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))); } // TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update @@ -3117,7 +3131,7 @@ impl Channel { log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()), encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id())); if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) { - return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()))); + return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())); } htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source)); } else { @@ -3133,10 +3147,8 @@ impl Channel { self.counterparty_funding_pubkey() ); - let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx); self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages) - .map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?; - let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1); + .map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?; // Update state now that we've passed all the can-fail calls... let mut need_commitment = false; @@ -3181,7 +3193,7 @@ impl Channel { self.cur_holder_commitment_transaction_number -= 1; // Note that if we need_commitment & !AwaitingRemoteRevoke we'll call - // send_commitment_no_status_check() next which will reset this to RAAFirst. + // build_commitment_no_status_check() next which will reset this to RAAFirst. self.resend_order = RAACommitmentOrder::CommitmentFirst; if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 { @@ -3193,52 +3205,50 @@ impl Channel { // the corresponding HTLC status updates so that get_last_commitment_update // includes the right HTLCs. self.monitor_pending_commitment_signed = true; - let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); } log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id)); - return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned()))); + self.pending_monitor_updates.push(monitor_update); + return Ok(self.pending_monitor_updates.last().unwrap()); } - let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { + let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok - // we'll send one right away when we get the revoke_and_ack when we // free_holding_cell_htlcs(). - let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Some(msg) - } else { None }; + true + } else { false }; log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.", - log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" }); - - Ok((msgs::RevokeAndACK { - channel_id: self.channel_id, - per_commitment_secret, - next_per_commitment_point, - }, commitment_signed, monitor_update)) + log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" }); + self.pending_monitor_updates.push(monitor_update); + self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new()); + return Ok(self.pending_monitor_updates.last().unwrap()); } /// Public version of the below, checking relevant preconditions first. /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and /// returns `(None, Vec::new())`. - pub fn maybe_free_holding_cell_htlcs(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger { + pub fn maybe_free_holding_cell_htlcs(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { if self.channel_state >= ChannelState::ChannelReady as u32 && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 { self.free_holding_cell_htlcs(logger) - } else { Ok((None, Vec::new())) } + } else { (None, Vec::new()) } } /// Frees any pending commitment updates in the holding cell, generating the relevant messages /// for our counterparty. - fn free_holding_cell_htlcs(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger { + fn free_holding_cell_htlcs(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger { assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0); if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() { log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(), @@ -3319,7 +3329,7 @@ impl Channel { } } if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() { - return Ok((None, htlcs_to_fail)); + return (None, htlcs_to_fail); } let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() { self.send_update_fee(feerate, false, logger) @@ -3327,8 +3337,8 @@ impl Channel { None }; - let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?; - // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id // but we want them to be strictly increasing by one, so reset it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); @@ -3337,16 +3347,11 @@ impl Channel { log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" }, update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len()); - Ok((Some((msgs::CommitmentUpdate { - update_add_htlcs, - update_fulfill_htlcs, - update_fail_htlcs, - update_fail_malformed_htlcs: Vec::new(), - update_fee, - commitment_signed, - }, monitor_update)), htlcs_to_fail)) + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail) } else { - Ok((None, Vec::new())) + (None, Vec::new()) } } @@ -3355,7 +3360,7 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError> where L::Target: Logger, { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { @@ -3542,8 +3547,8 @@ impl Channel { // When the monitor updating is restored we'll call get_last_commitment_update(), // which does not update state, but we're definitely now awaiting a remote revoke // before we can step forward any more, so set it here. - let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?; - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); @@ -3552,71 +3557,41 @@ impl Channel { self.monitor_pending_failures.append(&mut revoked_htlcs); self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs); log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id())); - return Ok(RAAUpdates { - commitment_update: None, finalized_claimed_htlcs: Vec::new(), - accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(), - monitor_update, - holding_cell_failed_htlcs: Vec::new() - }); + self.pending_monitor_updates.push(monitor_update); + return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap())); } - match self.free_holding_cell_htlcs(logger)? { - (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => { - commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len()); - for fail_msg in update_fail_htlcs.drain(..) { - commitment_update.update_fail_htlcs.push(fail_msg); - } - commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len()); - for fail_msg in update_fail_malformed_htlcs.drain(..) { - commitment_update.update_fail_malformed_htlcs.push(fail_msg); - } - + match self.free_holding_cell_htlcs(logger) { + (Some(_), htlcs_to_fail) => { + let mut additional_update = self.pending_monitor_updates.pop().unwrap(); // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok(RAAUpdates { - commitment_update: Some(commitment_update), - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, - failed_htlcs: revoked_htlcs, - monitor_update, - holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) }, (None, htlcs_to_fail) => { if require_commitment { - let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?; + let mut additional_update = self.build_commitment_no_status_check(logger); - // send_commitment_no_status_check may bump latest_monitor_id but we want them to be + // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.", log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len()); - Ok(RAAUpdates { - commitment_update: Some(msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs, - update_fail_malformed_htlcs, - update_fee: None, - commitment_signed - }), - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs, - monitor_update, holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) } else { log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id())); - Ok(RAAUpdates { - commitment_update: None, - finalized_claimed_htlcs, - accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs, - monitor_update, holding_cell_failed_htlcs: htlcs_to_fail - }) + self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); + self.pending_monitor_updates.push(monitor_update); + Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) } } } @@ -3768,15 +3743,17 @@ impl Channel { } /// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted. - /// This must be called immediately after the [`chain::Watch`] call which returned - /// [`ChannelMonitorUpdateStatus::InProgress`]. + /// This must be called before we return the [`ChannelMonitorUpdate`] back to the + /// [`ChannelManager`], which will call [`Self::monitor_updating_restored`] once the monitor + /// update completes (potentially immediately). /// The messages which were generated with the monitor update must *not* have been sent to the /// remote end, and must instead have been dropped. They will be regenerated when /// [`Self::monitor_updating_restored`] is called. /// + /// [`ChannelManager`]: super::channelmanager::ChannelManager /// [`chain::Watch`]: crate::chain::Watch /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress - pub fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool, + fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>, mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, mut pending_finalized_claimed_htlcs: Vec @@ -3803,6 +3780,7 @@ impl Channel { { assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32); self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32); + self.pending_monitor_updates.clear(); // If we're past (or at) the FundingSent stage on an outbound channel, try to // (re-)broadcast the funding transaction as we may have declined to broadcast it when we @@ -4280,7 +4258,7 @@ impl Channel { pub fn shutdown( &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown - ) -> Result<(Option, Option, Vec<(HTLCSource, PaymentHash)>), ChannelError> + ) -> Result<(Option, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where SP::Target: SignerProvider { if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { @@ -4336,12 +4314,15 @@ impl Channel { let monitor_update = if update_shutdown_script { self.latest_monitor_update_id += 1; - Some(ChannelMonitorUpdate { + let monitor_update = ChannelMonitorUpdate { update_id: self.latest_monitor_update_id, updates: vec![ChannelMonitorUpdateStep::ShutdownScript { scriptpubkey: self.get_closing_scriptpubkey(), }], - }) + }; + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Some(self.pending_monitor_updates.last().unwrap()) } else { None }; let shutdown = if send_shutdown { Some(msgs::Shutdown { @@ -4891,6 +4872,10 @@ impl Channel { (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 } + pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> { + self.pending_monitor_updates.first() + } + /// Returns true if funding_created was sent/received. pub fn is_funding_initiated(&self) -> bool { self.channel_state >= ChannelState::FundingSent as u32 @@ -5791,8 +5776,7 @@ impl Channel { Ok(Some(res)) } - /// Only fails in case of bad keys - fn send_commitment_no_status_check(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger { + fn build_commitment_no_status_check(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger { log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed..."); // We can upgrade the status of some HTLCs that are waiting on a commitment, even if we // fail to generate this, we still are at least at a position where upgrading their status @@ -5825,15 +5809,9 @@ impl Channel { } self.resend_order = RAACommitmentOrder::RevokeAndACKFirst; - let (res, counterparty_commitment_txid, htlcs) = match self.send_commitment_no_state_update(logger) { - Ok((res, (counterparty_commitment_tx, mut htlcs))) => { - // Update state now that we've passed all the can-fail calls... - let htlcs_no_ref: Vec<(HTLCOutputInCommitment, Option>)> = - htlcs.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect(); - (res, counterparty_commitment_tx, htlcs_no_ref) - }, - Err(e) => return Err(e), - }; + let (counterparty_commitment_txid, mut htlcs_ref) = self.build_commitment_no_state_update(logger); + let htlcs: Vec<(HTLCOutputInCommitment, Option>)> = + htlcs_ref.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect(); if self.announcement_sigs_state == AnnouncementSigsState::MessageSent { self.announcement_sigs_state = AnnouncementSigsState::Committed; @@ -5850,16 +5828,13 @@ impl Channel { }] }; self.channel_state |= ChannelState::AwaitingRemoteRevoke as u32; - Ok((res, monitor_update)) + monitor_update } - /// Only fails in case of bad keys. Used for channel_reestablish commitment_signed generation - /// when we shouldn't change HTLC/channel state. - fn send_commitment_no_state_update(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger { + fn build_commitment_no_state_update(&self, logger: &L) -> (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>) where L::Target: Logger { let counterparty_keys = self.build_remote_transaction_keys(); let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger); let counterparty_commitment_txid = commitment_stats.tx.trust().txid(); - let (signature, htlc_signatures); #[cfg(any(test, fuzzing))] { @@ -5879,6 +5854,21 @@ impl Channel { } } + (counterparty_commitment_txid, commitment_stats.htlcs_included) + } + + /// Only fails in case of signer rejection. Used for channel_reestablish commitment_signed + /// generation when we shouldn't change HTLC/channel state. + fn send_commitment_no_state_update(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger { + // Get the fee tests from `build_commitment_no_state_update` + #[cfg(any(test, fuzzing))] + self.build_commitment_no_state_update(logger); + + let counterparty_keys = self.build_remote_transaction_keys(); + let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger); + let counterparty_commitment_txid = commitment_stats.tx.trust().txid(); + let (signature, htlc_signatures); + { let mut htlcs = Vec::with_capacity(commitment_stats.htlcs_included.len()); for &(ref htlc, _) in commitment_stats.htlcs_included.iter() { @@ -5911,16 +5901,20 @@ impl Channel { }, (counterparty_commitment_txid, commitment_stats.htlcs_included))) } - /// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction - /// to send to the remote peer in one go. + /// Adds a pending outbound HTLC to this channel, and builds a new remote commitment + /// transaction and generates the corresponding [`ChannelMonitorUpdate`] in one go. /// /// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on - /// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info. - pub fn send_htlc_and_commit(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result, ChannelError> where L::Target: Logger { - match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? { - Some(update_add_htlc) => { - let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?; - Ok(Some((update_add_htlc, commitment_signed, monitor_update))) + /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info. + pub fn send_htlc_and_commit(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result, ChannelError> where L::Target: Logger { + let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger); + if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } } + match send_res? { + Some(_) => { + let monitor_update = self.build_commitment_no_status_check(logger); + self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Ok(Some(self.pending_monitor_updates.last().unwrap())) }, None => Ok(None) } @@ -5946,8 +5940,12 @@ impl Channel { /// Begins the shutdown process, getting a message for the remote peer and returning all /// holding cell HTLCs for payment failure. - pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option) - -> Result<(msgs::Shutdown, Option, Vec<(HTLCSource, PaymentHash)>), APIError> + /// + /// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no + /// [`ChannelMonitorUpdate`] will be returned). + pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures, + target_feerate_sats_per_kw: Option) + -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError> where SP::Target: SignerProvider { for htlc in self.pending_outbound_htlcs.iter() { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { @@ -5967,9 +5965,16 @@ impl Channel { return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?".to_owned()}); } + // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown + // script is set, we just force-close and call it a day. + let mut chan_closed = false; + if self.channel_state < ChannelState::FundingSent as u32 { + chan_closed = true; + } + let update_shutdown_script = match self.shutdown_scriptpubkey { Some(_) => false, - None => { + None if !chan_closed => { let shutdown_scriptpubkey = signer_provider.get_shutdown_scriptpubkey(); if !shutdown_scriptpubkey.is_compatible(their_features) { return Err(APIError::IncompatibleShutdownScript { script: shutdown_scriptpubkey.clone() }); @@ -5977,6 +5982,7 @@ impl Channel { self.shutdown_scriptpubkey = Some(shutdown_scriptpubkey); true }, + None => false, }; // From here on out, we may not fail! @@ -5990,12 +5996,15 @@ impl Channel { let monitor_update = if update_shutdown_script { self.latest_monitor_update_id += 1; - Some(ChannelMonitorUpdate { + let monitor_update = ChannelMonitorUpdate { update_id: self.latest_monitor_update_id, updates: vec![ChannelMonitorUpdateStep::ShutdownScript { scriptpubkey: self.get_closing_scriptpubkey(), }], - }) + }; + self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); + self.pending_monitor_updates.push(monitor_update); + Some(self.pending_monitor_updates.last().unwrap()) } else { None }; let shutdown = msgs::Shutdown { channel_id: self.channel_id, @@ -6016,6 +6025,9 @@ impl Channel { } }); + debug_assert!(!self.is_shutdown() || monitor_update.is_none(), + "we can't both complete shutdown and return a monitor update"); + Ok((shutdown, monitor_update, dropped_outbound_htlcs)) } @@ -6877,6 +6889,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch channel_type: channel_type.unwrap(), channel_keys_id, + + pending_monitor_updates: Vec::new(), }) } } @@ -7188,7 +7202,7 @@ mod tests { }]}; let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 }; let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap(); - let (funding_signed_msg, _, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap(); + let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap(); // Node B --> Node A: funding signed let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&keys_provider, &&logger); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 45f45e138..61de296c1 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -65,6 +65,8 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe use crate::util::logger::{Level, Logger}; use crate::util::errors::APIError; +use alloc::collections::BTreeMap; + use crate::io; use crate::prelude::*; use core::{cmp, mem}; @@ -343,17 +345,6 @@ impl MsgHandleErrInternal { } } #[inline] - fn ignore_no_close(err: String) -> Self { - Self { - err: LightningError { - err, - action: msgs::ErrorAction::IgnoreError, - }, - chan_id: None, - shutdown_finish: None, - } - } - #[inline] fn from_no_close(err: msgs::LightningError) -> Self { Self { err, chan_id: None, shutdown_finish: None } } @@ -464,6 +455,7 @@ enum BackgroundEvent { ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), } +#[derive(Debug)] pub(crate) enum MonitorUpdateCompletionAction { /// Indicates that a payment ultimately destined for us was claimed and we should emit an /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for @@ -474,6 +466,11 @@ pub(crate) enum MonitorUpdateCompletionAction { EmitEvent { event: events::Event }, } +impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, + (0, PaymentClaimed) => { (0, payment_hash, required) }, + (2, EmitEvent) => { (0, event, ignorable) }, +); + /// State we hold per-peer. pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. @@ -487,6 +484,21 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from a specific channel to some action(s) that should be taken when all pending + /// [`ChannelMonitorUpdate`]s for the channel complete updating. + /// + /// Note that because we generally only have one entry here a HashMap is pretty overkill. A + /// BTreeMap currently stores more than ten elements per leaf node, so even up to a few + /// channels with a peer this will just be one allocation and will amount to a linear list of + /// channels to walk, avoiding the whole hashing rigmarole. + /// + /// Note that the channel may no longer exist. For example, if a channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. While a malicious peer could construct a second channel with the + /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior + /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure + /// duplicates do not occur, so such channels should fail without a monitor update completing. + monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. @@ -501,7 +513,7 @@ impl PeerState { if require_disconnected && self.is_connected { return false } - self.channel_by_id.len() == 0 + self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() } } @@ -1367,78 +1379,6 @@ macro_rules! remove_channel { } } -macro_rules! handle_monitor_update_res { - ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => { - match $err { - ChannelMonitorUpdateStatus::PermanentFailure => { - log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..])); - update_maps_on_chan_removal!($self, $chan); - // TODO: $failed_fails is dropped here, which will cause other channels to hit the - // chain in a confused state! We need to move them into the ChannelMonitor which - // will be responsible for failing backwards once things confirm on-chain. - // It's ok that we drop $failed_forwards here - at this point we'd rather they - // broadcast HTLC-Timeout and pay the associated fees to get their funds back than - // us bother trying to claim it just to forward on to another peer. If we're - // splitting hairs we'd prefer to claim payments that were to us, but we haven't - // given up the preimage yet, so might as well just wait until the payment is - // retried, avoiding the on-chain fees. - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(), - $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() )); - (res, true) - }, - ChannelMonitorUpdateStatus::InProgress => { - log_info!($self.logger, "Disabling channel {} due to monitor update in progress. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations", - log_bytes!($chan_id[..]), - if $resend_commitment && $resend_raa { - match $action_type { - RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" }, - RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" }, - } - } else if $resend_commitment { "commitment" } - else if $resend_raa { "RAA" } - else { "nothing" }, - (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(), - (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(), - (&$failed_finalized_fulfills as &Vec).len()); - if !$resend_commitment { - debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa); - } - if !$resend_raa { - debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment); - } - $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills); - (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false) - }, - ChannelMonitorUpdateStatus::Completed => { - (Ok(()), false) - }, - } - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { { - let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key()); - if drop { - $entry.remove_entry(); - } - res - } }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { { - debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst); - handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) - } }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new()) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new()) - }; - ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { - handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new()) - }; -} - macro_rules! send_channel_ready { ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{ $pending_msg_events.push(events::MessageSendEvent::SendChannelReady { @@ -1476,6 +1416,93 @@ macro_rules! emit_channel_ready_event { } } +macro_rules! handle_monitor_update_completion { + ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { { + let mut updates = $chan.monitor_updating_restored(&$self.logger, + &$self.node_signer, $self.genesis_hash, &$self.default_configuration, + $self.best_block.read().unwrap().height()); + let counterparty_node_id = $chan.get_counterparty_node_id(); + let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { + // We only send a channel_update in the case where we are just now sending a + // channel_ready and the channel is in a usable state. We may re-send a + // channel_update later through the announcement_signatures process for public + // channels, but there's no reason not to just inform our counterparty of our fees + // now. + if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { + Some(events::MessageSendEvent::SendChannelUpdate { + node_id: counterparty_node_id, + msg, + }) + } else { None } + } else { None }; + + let update_actions = $peer_state.monitor_update_blocked_actions + .remove(&$chan.channel_id()).unwrap_or(Vec::new()); + + let htlc_forwards = $self.handle_channel_resumption( + &mut $peer_state.pending_msg_events, $chan, updates.raa, + updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.funding_broadcastable, updates.channel_ready, + updates.announcement_sigs); + if let Some(upd) = channel_update { + $peer_state.pending_msg_events.push(upd); + } + + let channel_id = $chan.channel_id(); + core::mem::drop($peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + + if let Some(forwards) = htlc_forwards { + $self.forward_htlcs(&mut [forwards][..]); + } + $self.finalize_claims(updates.finalized_claimed_htlcs); + for failure in updates.failed_htlcs.drain(..) { + let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; + $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + } + } } +} + +macro_rules! handle_new_monitor_update { + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in + // any case so that it won't deadlock. + debug_assert!($self.id_to_peer.try_lock().is_ok()); + match $update_res { + ChannelMonitorUpdateStatus::InProgress => { + log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", + log_bytes!($chan.channel_id()[..])); + Ok(()) + }, + ChannelMonitorUpdateStatus::PermanentFailure => { + log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", + log_bytes!($chan.channel_id()[..])); + update_maps_on_chan_removal!($self, $chan); + let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + "ChannelMonitor storage failure".to_owned(), $chan.channel_id(), + $chan.get_user_id(), $chan.force_shutdown(false), + $self.get_channel_update_for_broadcast(&$chan).ok())); + $remove; + res + }, + ChannelMonitorUpdateStatus::Completed => { + if ($update_id == 0 || $chan.get_next_monitor_update() + .expect("We can't be processing a monitor update if it isn't queued") + .update_id == $update_id) && + $chan.get_latest_monitor_update_id() == $update_id + { + handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan); + } + Ok(()) + }, + } + } }; + ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + } +} + impl ChannelManager where M::Target: chain::Watch<::Signer>, @@ -1790,25 +1817,27 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?; + let funding_txo_opt = chan_entry.get().get_funding_txo(); + let their_features = &peer_state.latest_features; + let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?; failed_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update { - let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update); - let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); - if is_permanent { - remove_channel!(self, chan_entry); - break result; - } - } - + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, - msg: shutdown_msg + msg: shutdown_msg, }); + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt.take() { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry); + } + if chan_entry.get().is_shutdown() { let channel = remove_channel!(self, chan_entry); if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { @@ -2412,54 +2441,35 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { - match { - if !chan.get().is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()}); - } - break_chan_entry!(self, chan.get_mut().send_htlc_and_commit( - htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - path: path.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - payment_id, - payment_secret: payment_secret.clone(), - payment_params: payment_params.clone(), - }, onion_packet, &self.logger), - chan) - } { - Some((update_add, commitment_signed, monitor_update)) => { - let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update); - let chan_id = chan.get().channel_id(); - match (update_err, - handle_monitor_update_res!(self, update_err, chan, - RAACommitmentOrder::CommitmentFirst, false, true)) - { - (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e), - (ChannelMonitorUpdateStatus::Completed, Ok(())) => {}, - (ChannelMonitorUpdateStatus::InProgress, Err(_)) => { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); - }, - _ => unreachable!(), + if !chan.get().is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()}); + } + let funding_txo = chan.get().get_funding_txo().unwrap(); + let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), + htlc_cltv, HTLCSource::OutboundRoute { + path: path.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + payment_id, + payment_secret: payment_secret.clone(), + payment_params: payment_params.clone(), + }, onion_packet, &self.logger); + match break_chan_entry!(self, send_res, chan) { + Some(monitor_update) => { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); + if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) { + break Err(e); + } + if update_res == ChannelMonitorUpdateStatus::InProgress { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); } - - log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id)); - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: path.first().unwrap().pubkey, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - }, - }); }, None => { }, } @@ -3957,7 +3967,6 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); - let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()), None => None @@ -3969,99 +3978,57 @@ where ) ).unwrap_or(None); - if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id)) - { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } - } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) + if let Some(mut peer_state_lock) = peer_state_opt.take() { + let peer_state = &mut *peer_state_lock; + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger); + + if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res { + if let Some(action) = completion_action(Some(htlc_value_msat)) { + log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}", + log_bytes!(chan_id), action); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - }, - Err((e, monitor_update)) => { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", - payment_preimage, e); - }, + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); + let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, + } + return Ok(()); } - } else { - let preimage_update = ChannelMonitorUpdate { - update_id: CLOSED_CHANNEL_UPDATE_ID, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage, - }], - }; - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state); - // Note that we do process the completion action here. This totally could be a - // duplicate claim, but we have no way of knowing without interrogating the - // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are - // generally always allowed to be duplicative (and it's specifically noted in - // `PaymentForwarded`). - self.handle_monitor_update_completion_actions(completion_action(None)); - Ok(()) } + let preimage_update = ChannelMonitorUpdate { + update_id: CLOSED_CHANNEL_UPDATE_ID, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage, + }], + }; + // We update the ChannelMonitor on the backward link, after + // receiving an `update_fulfill_htlc` from the forward link. + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); + if update_res != ChannelMonitorUpdateStatus::Completed { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same event and try + // again on restart. + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, update_res); + } + // Note that we do process the completion action here. This totally could be a + // duplicate claim, but we have no way of knowing without interrogating the + // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are + // generally always allowed to be duplicative (and it's specifically noted in + // `PaymentForwarded`). + self.handle_monitor_update_completion_actions(completion_action(None)); + Ok(()) } fn finalize_claims(&self, sources: Vec) { @@ -4132,6 +4099,14 @@ where pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> { + log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_bytes!(channel.channel_id()), + if raa.is_some() { "an" } else { "no" }, + if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if funding_broadcastable.is_some() { "" } else { "not " }, + if channel_ready.is_some() { "sending" } else { "without" }, + if announcement_sigs.is_some() { "sending" } else { "without" }); + let mut htlc_forwards = None; let counterparty_node_id = channel.get_counterparty_node_id(); @@ -4190,65 +4165,36 @@ where fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let htlc_forwards; - let (mut pending_failures, finalized_claims, counterparty_node_id) = { - let counterparty_node_id = match counterparty_node_id { - Some(cp_id) => cp_id.clone(), - None => { - // TODO: Once we can rely on the counterparty_node_id from the - // monitor event, this and the id_to_peer map should be removed. - let id_to_peer = self.id_to_peer.lock().unwrap(); - match id_to_peer.get(&funding_txo.to_channel_id()) { - Some(cp_id) => cp_id.clone(), - None => return, - } + let counterparty_node_id = match counterparty_node_id { + Some(cp_id) => cp_id.clone(), + None => { + // TODO: Once we can rely on the counterparty_node_id from the + // monitor event, this and the id_to_peer map should be removed. + let id_to_peer = self.id_to_peer.lock().unwrap(); + match id_to_peer.get(&funding_txo.to_channel_id()) { + Some(cp_id) => cp_id.clone(), + None => return, } - }; - let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state_lock; - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { return } - peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let mut channel = { - match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, - } - }; - if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { - return; } - - let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height()); - let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() { - // We only send a channel_update in the case where we are just now sending a - // channel_ready and the channel is in a usable state. We may re-send a - // channel_update later through the announcement_signatures process for public - // channels, but there's no reason not to just inform our counterparty of our fees - // now. - if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) { - Some(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.get().get_counterparty_node_id(), - msg, - }) - } else { None } - } else { None }; - htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); - if let Some(upd) = channel_update { - peer_state.pending_msg_events.push(upd); - } - - (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id) }; - if let Some(forwards) = htlc_forwards { - self.forward_htlcs(&mut [forwards][..]); - } - self.finalize_claims(finalized_claims); - for failure in pending_failures.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() }; - self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock; + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { return } + peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let mut channel = { + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ + hash_map::Entry::Occupied(chan) => chan, + hash_map::Entry::Vacant(_) => return, + } + }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", + highest_applied_update_id, channel.get().get_latest_monitor_update_id()); + if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { + return; } + handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -4506,60 +4452,31 @@ where } fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { + let best_block = *self.best_block.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { debug_assert!(false); MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) })?; - let ((funding_msg, monitor, mut channel_ready), mut chan) = { - let best_block = *self.best_block.read().unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let ((funding_msg, monitor), chan) = match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } - }; - // Because we have exclusive ownership of the channel here we can release the peer_state - // lock before watch_channel - match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) { - ChannelMonitorUpdateStatus::Completed => {}, - ChannelMonitorUpdateStatus::PermanentFailure => { - // Note that we reply with the new channel_id in error messages if we gave up on the - // channel, not the temporary_channel_id. This is compatible with ourselves, but the - // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for - // any messages referencing a previously-closed channel anyway. - // We do not propagate the monitor update to the user as it would be for a monitor - // that we didn't manage to store (and that we don't care about - we don't respond - // with the funding_signed so the channel can never go on chain). - let (_monitor_update, failed_htlcs) = chan.force_shutdown(false); - assert!(failed_htlcs.is_empty()); - return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id)); - }, - ChannelMonitorUpdateStatus::InProgress => { - // There's no problem signing a counterparty's funding transaction if our monitor - // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't - // accepted payment from yet. We do, however, need to wait to send our channel_ready - // until we have persisted our monitor. - chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new()); - channel_ready = None; // Don't send the channel_ready now - }, - } - // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the - // peer exists, despite the inner PeerState potentially having no channels after removing - // the channel above. - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + }; + match peer_state.channel_by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) + Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) }, hash_map::Entry::Vacant(e) => { - let mut id_to_peer = self.id_to_peer.lock().unwrap(); - match id_to_peer.entry(chan.channel_id()) { + match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) { hash_map::Entry::Occupied(_) => { return Err(MsgHandleErrInternal::send_err_msg_no_close( "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(), @@ -4569,63 +4486,66 @@ where i_e.insert(chan.get_counterparty_node_id()); } } + + // There's no problem signing a counterparty's funding transaction if our monitor + // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't + // accepted payment from yet. We do, however, need to wait to send our channel_ready + // until we have persisted our monitor. + let new_channel_id = funding_msg.channel_id; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { node_id: counterparty_node_id.clone(), msg: funding_msg, }); - if let Some(msg) = channel_ready { - send_channel_ready!(self, peer_state.pending_msg_events, chan, msg); + + let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); + + let chan = e.insert(chan); + let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + + // Note that we reply with the new channel_id in error messages if we gave up on the + // channel, not the temporary_channel_id. This is compatible with ourselves, but the + // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for + // any messages referencing a previously-closed channel anyway. + // We do not propagate the monitor update to the user as it would be for a monitor + // that we didn't manage to store (and that we don't care about - we don't respond + // with the funding_signed so the channel can never go on chain). + if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { + res.0 = None; } - e.insert(chan); + res } } - Ok(()) } fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { - let funding_tx = { - let best_block = *self.best_block.read().unwrap(); - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex = per_peer_state.get(counterparty_node_id) - .ok_or_else(|| { - debug_assert!(false); - MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) - })?; + let best_block = *self.best_block.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) { - Ok(update) => update, - Err(e) => try_chan_entry!(self, Err(e), chan), - }; - match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED); - if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { - // We weren't able to watch the channel to begin with, so no updates should be made on - // it. Previously, full_stack_target found an (unreachable) panic when the - // monitor update contained within `shutdown_finish` was applied. - if let Some((ref mut shutdown_finish, _)) = shutdown_finish { - shutdown_finish.0.take(); - } - } - return res - }, + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { + hash_map::Entry::Occupied(mut chan) => { + let monitor = try_chan_entry!(self, + chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); + let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor); + let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan); + if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { + // We weren't able to watch the channel to begin with, so no updates should be made on + // it. Previously, full_stack_target found an (unreachable) panic when the + // monitor update contained within `shutdown_finish` was applied. + if let Some((ref mut shutdown_finish, _)) = shutdown_finish { + shutdown_finish.0.take(); } - if let Some(msg) = channel_ready { - send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg); - } - funding_tx - }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) - } - }; - log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid()); - self.tx_broadcaster.broadcast_transaction(&funding_tx); - Ok(()) + } + res + }, + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + } } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { @@ -4690,27 +4610,27 @@ where if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); } - let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); + let funding_txo_opt = chan_entry.get().get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, + chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); dropped_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update { - let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update); - let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); - if is_permanent { - remove_channel!(self, chan_entry); - break result; - } - } - if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, msg, }); } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + let update_id = monitor_update.update_id; + let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); + break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry); + } break Ok(()); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -4722,8 +4642,7 @@ where self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } - let _ = handle_error!(self, result, *counterparty_node_id); - Ok(()) + result } fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { @@ -4897,40 +4816,12 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - let (revoke_and_ack, commitment_signed, monitor_update) = - match chan.get_mut().commitment_signed(&msg, &self.logger) { - Err((None, e)) => try_chan_entry!(self, Err(e), chan), - Err((Some(update), e)) => { - assert!(chan.get().is_awaiting_monitor_update()); - let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update); - try_chan_entry!(self, Err(e), chan); - unreachable!(); - }, - Ok(res) => res - }; - let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update); - if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) { - return Err(e); - } - - peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: counterparty_node_id.clone(), - msg: revoke_and_ack, - }); - if let Some(msg) = commitment_signed { - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id.clone(), - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed: msg, - }, - }); - } - Ok(()) + let funding_txo = chan.get().get_funding_txo(); + let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5034,8 +4925,7 @@ where } fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { - let mut htlcs_to_fail = Vec::new(); - let res = loop { + let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5046,59 +4936,19 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update(); - let raa_updates = break_chan_entry!(self, - chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); - htlcs_to_fail = raa_updates.holding_cell_failed_htlcs; - let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update); - if was_paused_for_mon_update { - assert!(update_res != ChannelMonitorUpdateStatus::Completed); - assert!(raa_updates.commitment_update.is_none()); - assert!(raa_updates.accepted_htlcs.is_empty()); - assert!(raa_updates.failed_htlcs.is_empty()); - assert!(raa_updates.finalized_claimed_htlcs.is_empty()); - break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned())); - } - if update_res != ChannelMonitorUpdateStatus::Completed { - if let Err(e) = handle_monitor_update_res!(self, update_res, chan, - RAACommitmentOrder::CommitmentFirst, false, - raa_updates.commitment_update.is_some(), false, - raa_updates.accepted_htlcs, raa_updates.failed_htlcs, - raa_updates.finalized_claimed_htlcs) { - break Err(e); - } else { unreachable!(); } - } - if let Some(updates) = raa_updates.commitment_update { - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id.clone(), - updates, - }); - } - break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs, - raa_updates.finalized_claimed_htlcs, - chan.get().get_short_channel_id() - .unwrap_or(chan.get().outbound_scid_alias()), - chan.get().get_funding_txo().unwrap(), - chan.get().get_user_id())) + let funding_txo = chan.get().get_funding_txo(); + let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, chan); + (htlcs_to_fail, res) }, - hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id); - match res { - Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs, - short_channel_id, channel_outpoint, user_channel_id)) => - { - for failure in pending_failures.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() }; - self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); - } - self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]); - self.finalize_claims(finalized_claim_htlcs); - Ok(()) - }, - Err(e) => Err(e) - } + res } fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { @@ -5340,49 +5190,37 @@ where let mut has_monitor_update = false; let mut failed_htlcs = Vec::new(); let mut handle_errors = Vec::new(); - { - let per_peer_state = self.per_peer_state.read().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); - for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + 'chan_loop: loop { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|channel_id, chan| { - match chan.maybe_free_holding_cell_htlcs(&self.logger) { - Ok((commitment_opt, holding_cell_failed_htlcs)) => { - if !holding_cell_failed_htlcs.is_empty() { - failed_htlcs.push(( - holding_cell_failed_htlcs, - *channel_id, - chan.get_counterparty_node_id() - )); - } - if let Some((commitment_update, monitor_update)) = commitment_opt { - match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => { - pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get_counterparty_node_id(), - updates: commitment_update, - }); - }, - e => { - has_monitor_update = true; - let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); - handle_errors.push((chan.get_counterparty_node_id(), res)); - if close_channel { return false; } - }, - } - } - true - }, - Err(e) => { - let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); - handle_errors.push((chan.get_counterparty_node_id(), Err(res))); - // ChannelClosed event is generated by handle_error for us - !close_channel - } + let peer_state: &mut PeerState<_> = &mut *peer_state_lock; + for (channel_id, chan) in peer_state.channel_by_id.iter_mut() { + let counterparty_node_id = chan.get_counterparty_node_id(); + let funding_txo = chan.get_funding_txo(); + let (monitor_opt, holding_cell_failed_htlcs) = + chan.maybe_free_holding_cell_htlcs(&self.logger); + if !holding_cell_failed_htlcs.is_empty() { + failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } - }); + if let Some(monitor_update) = monitor_opt { + has_monitor_update = true; + + let update_res = self.chain_monitor.update_channel( + funding_txo.expect("channel is live"), monitor_update); + let update_id = monitor_update.update_id; + let channel_id: [u8; 32] = *channel_id; + let res = handle_new_monitor_update!(self, update_res, update_id, + peer_state_lock, peer_state, chan, MANUALLY_REMOVING, + peer_state.channel_by_id.remove(&channel_id)); + if res.is_err() { + handle_errors.push((counterparty_node_id, res)); + } + continue 'chan_loop; + } + } + break 'chan_loop; } } @@ -6432,6 +6270,7 @@ where channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + monitor_update_blocked_actions: BTreeMap::new(), is_connected: true, })); }, @@ -7069,10 +6908,14 @@ where htlc_purposes.push(purpose); } + let mut monitor_update_blocked_actions_per_peer = None; + let mut peer_states = Vec::new(); + for (_, peer_state_mutex) in per_peer_state.iter() { + peer_states.push(peer_state_mutex.lock().unwrap()); + } + (serializable_peer_count).write(writer)?; - for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() { - let peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &*peer_state_lock; + for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { // Peers which we have no channels to should be dropped once disconnected. As we // disconnect all peers when shutting down and serializing the ChannelManager, we // consider all peers as disconnected here. There's therefore no need write peers with @@ -7080,6 +6923,11 @@ where if !peer_state.ok_to_remove(false) { peer_pubkey.write(writer)?; peer_state.latest_features.write(writer)?; + if !peer_state.monitor_update_blocked_actions.is_empty() { + monitor_update_blocked_actions_per_peer + .get_or_insert_with(Vec::new) + .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions)); + } } } @@ -7157,8 +7005,6 @@ where // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments // map. Thus, if there are no entries we skip writing a TLV for it. pending_claiming_payments = None; - } else { - debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR"); } write_tlv_fields!(writer, { @@ -7167,6 +7013,7 @@ where (3, pending_outbound_payments, required), (4, pending_claiming_payments, option), (5, self.our_network_pubkey, required), + (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), (9, htlc_purposes, vec_type), (11, self.probing_cookie_secret, required), @@ -7479,6 +7326,7 @@ where channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), + monitor_update_blocked_actions: BTreeMap::new(), is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); @@ -7535,12 +7383,14 @@ where let mut probing_cookie_secret: Option<[u8; 32]> = None; let mut claimable_htlc_purposes = None; let mut pending_claiming_payments = Some(HashMap::new()); + let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), (3, pending_outbound_payments, option), (4, pending_claiming_payments, option), (5, received_network_pubkey, option), + (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), (9, claimable_htlc_purposes, vec_type), (11, probing_cookie_secret, option), @@ -7807,6 +7657,15 @@ where } } + for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() { + if let Some(peer_state) = per_peer_state.get_mut(&node_id) { + peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions; + } else { + log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id); + return Err(DecodeError::InvalidValue); + } + } + let channel_manager = ChannelManager { genesis_hash, fee_estimator: bounded_fee_estimator, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 65f27da13..d6d2972d0 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -3618,22 +3618,22 @@ fn test_simple_peer_disconnect() { _ => panic!("Unexpected event"), } match events[1] { + Event::PaymentPathSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + match events[2] { Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } => { assert_eq!(payment_hash, payment_hash_5); assert!(payment_failed_permanently); }, _ => panic!("Unexpected event"), } - match events[2] { + match events[3] { Event::PaymentFailed { payment_hash, .. } => { assert_eq!(payment_hash, payment_hash_5); }, _ => panic!("Unexpected event"), } - match events[3] { - Event::PaymentPathSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } } claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4); @@ -8186,7 +8186,7 @@ fn test_update_err_monitor_lockdown() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); } else { assert!(false); } @@ -8280,7 +8280,7 @@ fn test_concurrent_monitor_claim() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Watchtower Alice should already have seen the block and reject the update assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); @@ -8736,9 +8736,9 @@ fn test_duplicate_chan_id() { }; check_added_monitors!(nodes[0], 0); nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created); - // At this point we'll try to add a duplicate channel monitor, which will be rejected, but - // still needs to be cleared here. - check_added_monitors!(nodes[1], 1); + // At this point we'll look up if the channel_id is present and immediately fail the channel + // without trying to persist the `ChannelMonitor`. + check_added_monitors!(nodes[1], 0); // ...still, nodes[1] will reject the duplicate channel. { diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 249051c40..96a04f417 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -258,7 +258,7 @@ fn no_pending_leak_on_initial_send_failure() { unwrap_send_err!(nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)), true, APIError::ChannelUnavailable { ref err }, - assert_eq!(err, "Peer for first hop currently disconnected/pending monitor update!")); + assert_eq!(err, "Peer for first hop currently disconnected")); assert!(!nodes[0].node.has_pending_payments()); } diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 928cc6194..847005e32 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -770,6 +770,7 @@ impl Readable for Vec { } impl_for_vec!(ecdsa::Signature); +impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction); impl_for_vec!((A, B), A, B); impl Writeable for Script {