Merge pull request #1897 from TheBlueMatt/2022-11-monitor-updates-always-async

Always process `ChannelMonitorUpdate`s asynchronously
Matt Corallo, 2023-02-22 19:12:31 +00:00 (committed by GitHub)
commit 96c8507fbf
8 changed files with 624 additions and 709 deletions
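The thread running through every file below is that `Channel` methods now queue each `ChannelMonitorUpdate` internally and hand the caller only a reference, and the caller treats persistence of that update as potentially asynchronous: messages and events that depend on the update are held back until completion is signalled. A minimal, self-contained sketch of that contract follows; the type and method names are illustrative stand-ins, not LDK's real `chain::Watch`/persister API.

```rust
// Minimal stand-ins; `UpdateStatus` mirrors the `ChannelMonitorUpdateStatus` values
// used in the tests below, but none of these are LDK's real types.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum UpdateStatus { Completed, InProgress, PermanentFailure }

#[derive(Default)]
struct PendingUpdates { queued: Vec<u64> } // update ids not yet durably persisted

impl PendingUpdates {
    // Reporting InProgress means the caller must hold back any messages or events
    // that depend on this update until completion is signalled.
    fn update_channel(&mut self, update_id: u64) -> UpdateStatus {
        self.queued.push(update_id);
        UpdateStatus::InProgress
    }
    // Persistence finished: the blocked messages/events for this id may be released.
    fn channel_monitor_updated(&mut self, update_id: u64) {
        self.queued.retain(|id| *id != update_id);
    }
}

fn main() {
    let mut pending = PendingUpdates::default();
    assert_eq!(pending.update_channel(1), UpdateStatus::InProgress);
    pending.channel_monitor_updated(1);
    assert!(pending.queued.is_empty());
}
```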


@ -295,7 +295,7 @@ fn check_api_err(api_err: APIError) {
// all others. If you hit this panic, the list of acceptable errors
// is probably just stale and you should add new messages here.
match err.as_str() {
"Peer for first hop currently disconnected/pending monitor update!" => {},
"Peer for first hop currently disconnected" => {},
_ if err.starts_with("Cannot push more than their max accepted HTLCs ") => {},
_ if err.starts_with("Cannot send value that would put us over the max HTLC value in flight our peer will accept ") => {},
_ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {},


@ -796,7 +796,7 @@ mod tests {
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::ChannelMessageHandler;
use crate::util::errors::APIError;
use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};
#[test]
fn test_async_ooo_offchain_updates() {
@ -819,10 +819,8 @@ mod tests {
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
assert_eq!(persistences.len(), 1);
@ -850,8 +848,24 @@ mod tests {
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
let claim_events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(claim_events.len(), 2);
match claim_events[0] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_1, *payment_hash);
},
_ => panic!("Unexpected event"),
}
match claim_events[1] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_2, *payment_hash);
},
_ => panic!("Unexpected event"),
}
// Now manually walk the commitment signed dance - because we claimed two payments
// back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
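The updated `test_async_ooo_offchain_updates` above now expects the two `PaymentClaimed` events only after the queued monitor updates are completed via `channel_monitor_updated`, rather than immediately after `claim_funds`. Below is a rough sketch of the bookkeeping a test persister can use to support that kind of deferred, out-of-order completion; the field name `offchain_monitor_updates` comes from the hunk above, everything else is a simplified stand-in, not the real `TestPersister`.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

// Simplified stand-ins for a funding outpoint and a monitor update id.
type FundingOutpoint = ([u8; 32], u16);
type UpdateId = u64;

#[derive(Default)]
struct TestPersisterSketch {
    // Updates we were asked to persist but deliberately left pending, keyed by channel.
    offchain_monitor_updates: Mutex<HashMap<FundingOutpoint, HashSet<UpdateId>>>,
}

impl TestPersisterSketch {
    // Called when the node asks us to persist an update; we record it and report
    // the equivalent of `ChannelMonitorUpdateStatus::InProgress`.
    fn defer_update(&self, txo: FundingOutpoint, update_id: UpdateId) {
        self.offchain_monitor_updates.lock().unwrap().entry(txo).or_default().insert(update_id);
    }
    // The test later pops pending ids (in whatever order it wants) and tells the
    // chain monitor each one completed, which is when held-back events are released.
    fn take_one(&self, txo: FundingOutpoint) -> Option<UpdateId> {
        let mut map = self.offchain_monitor_updates.lock().unwrap();
        let set = map.get_mut(&txo)?;
        let id = *set.iter().next()?;
        set.remove(&id);
        Some(id)
    }
}

fn main() {
    let persister = TestPersisterSketch::default();
    let txo: FundingOutpoint = ([0u8; 32], 0);
    persister.defer_update(txo, 1);
    persister.defer_update(txo, 2);
    while let Some(id) = persister.take_one(txo) {
        println!("completing monitor update {}", id);
    }
}
```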


@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() {
let mut node_0_per_peer_lock;
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Check that even though the persister is returning an InProgress,
// because the update is bogus, ultimately the error that's returned
// should be a PermanentFailure.
@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() {
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
check_added_monitors!(nodes[1], 1);
@ -1628,6 +1627,7 @@ fn test_monitor_update_fail_claim() {
let events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 0);
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
expect_pending_htlcs_forwardable_ignore!(nodes[1]);
let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@ -1645,6 +1645,7 @@ fn test_monitor_update_fail_claim() {
let channel_id = chan_1.2;
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 0);
let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1653,7 +1654,7 @@ fn test_monitor_update_fail_claim() {
expect_payment_sent!(nodes[0], payment_preimage_1);
// Get the payment forwards, note that they were batched into one commitment update.
expect_pending_htlcs_forwardable!(nodes[1]);
nodes[1].node.process_pending_htlc_forwards();
check_added_monitors!(nodes[1], 1);
let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@ -1739,7 +1740,6 @@ fn test_monitor_update_on_pending_forwards() {
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@ -1753,17 +1753,17 @@ fn test_monitor_update_on_pending_forwards() {
let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 3);
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
assert_eq!(payment_hash, payment_hash_1);
assert!(payment_failed_permanently);
} else { panic!("Unexpected event!"); }
match events[1] {
match events[2] {
Event::PaymentFailed { payment_hash, .. } => {
assert_eq!(payment_hash, payment_hash_1);
},
_ => panic!("Unexpected event"),
}
match events[2] {
match events[0] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
};
@ -1803,7 +1803,6 @@ fn monitor_update_claim_fail_no_response() {
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
@ -1811,6 +1810,7 @@ fn monitor_update_claim_fail_no_response() {
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 0);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
@ -2290,7 +2290,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[0].node.claim_funds(payment_preimage_0);
check_added_monitors!(nodes[0], 1);
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@ -2353,6 +2352,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
// New outbound messages should be generated immediately upon a call to
// get_and_clear_pending_msg_events (but not before).
@ -2606,7 +2606,15 @@ fn test_permanent_error_during_sending_shutdown() {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
check_closed_broadcast!(nodes[0], true);
// We always send the `shutdown` response when initiating a shutdown, even if we immediately
// close the channel thereafter.
let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 3);
if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
check_added_monitors!(nodes[0], 2);
check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
}
@ -2629,7 +2637,15 @@ fn test_permanent_error_during_handling_shutdown() {
assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
let shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &shutdown);
check_closed_broadcast!(nodes[1], true);
// We always send the `shutdown` response when receiving a shutdown, even if we immediately
// close the channel thereafter.
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 3);
if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
check_added_monitors!(nodes[1], 2);
check_closed_event!(nodes[1], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
}
@ -2651,7 +2667,6 @@ fn double_temp_error() {
// `claim_funds` results in a ChannelMonitorUpdate.
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
@ -2659,7 +2674,6 @@ fn double_temp_error() {
// which had some asserts that prevented it from being called twice.
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@ -2668,11 +2682,24 @@ fn double_temp_error() {
check_added_monitors!(nodes[1], 0);
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);
// Complete the first HTLC.
let events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
// Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
// and get both PaymentClaimed events at once.
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
let events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(events.len(), 2);
match events[0] {
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
_ => panic!("Unexpected Event: {:?}", events[0]),
}
match events[1] {
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
_ => panic!("Unexpected Event: {:?}", events[1]),
}
assert_eq!(msg_events.len(), 1);
let (update_fulfill_1, commitment_signed_b1, node_id) = {
match &events[0] {
match &msg_events[0] {
&MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
assert!(update_add_htlcs.is_empty());
assert_eq!(update_fulfill_htlcs.len(), 1);


@ -393,35 +393,21 @@ enum UpdateFulfillFetch {
}
/// The return type of get_update_fulfill_htlc_and_commit.
pub enum UpdateFulfillCommitFetch {
pub enum UpdateFulfillCommitFetch<'a> {
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
/// previously placed in the holding cell (and has since been removed).
NewClaim {
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
monitor_update: ChannelMonitorUpdate,
monitor_update: &'a ChannelMonitorUpdate,
/// The value of the HTLC which was claimed, in msat.
htlc_value_msat: u64,
/// The update_fulfill message and commitment_signed message (if the claim was not placed
/// in the holding cell).
msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
},
/// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
/// or has been forgotten (presumably previously claimed).
DuplicateClaim {},
}
/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
/// state.
pub(super) struct RAAUpdates {
pub commitment_update: Option<msgs::CommitmentUpdate>,
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub monitor_update: ChannelMonitorUpdate,
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
}
/// The return value of `monitor_updating_restored`
pub(super) struct MonitorRestoreUpdates {
pub raa: Option<msgs::RevokeAndACK>,
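Because `NewClaim` now carries a borrowed `monitor_update` (already queued inside the channel) and no longer returns fulfill/commitment messages, the claim path reduces to handing that reference to the chain watcher. Here is a hedged sketch of what a call site looks like, using stand-in types rather than the real `ChannelManager` code (whose diff is suppressed further down).

```rust
// Stand-in types shaped like the new `UpdateFulfillCommitFetch<'a>` above; this is
// not the real channel or channel-manager code.
struct ChannelMonitorUpdate { update_id: u64 }

enum UpdateFulfillCommitFetch<'a> {
    NewClaim { monitor_update: &'a ChannelMonitorUpdate, htlc_value_msat: u64 },
    DuplicateClaim {},
}

fn persist_update(update: &ChannelMonitorUpdate) {
    // Hand the borrowed update to the chain watcher; whether it completes now or
    // later, the fulfill/commitment messages are generated on completion, not here.
    println!("persisting monitor update {}", update.update_id);
}

fn handle_claim(fetch: UpdateFulfillCommitFetch<'_>) {
    match fetch {
        UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat } => {
            persist_update(monitor_update);
            println!("claimed {} msat", htlc_value_msat);
        },
        UpdateFulfillCommitFetch::DuplicateClaim {} => {
            // Nothing new to persist; the preimage was already claimed or is sitting
            // in the holding cell.
        },
    }
}

fn main() {
    let queued = ChannelMonitorUpdate { update_id: 1 };
    handle_claim(UpdateFulfillCommitFetch::NewClaim { monitor_update: &queued, htlc_value_msat: 1_000_000 });
    handle_claim(UpdateFulfillCommitFetch::DuplicateClaim {});
}
```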
@ -558,6 +544,11 @@ pub(super) struct Channel<Signer: ChannelSigner> {
monitor_pending_channel_ready: bool,
monitor_pending_revoke_and_ack: bool,
monitor_pending_commitment_signed: bool,
// TODO: If a channel is drop'd, we don't know whether the `ChannelMonitor` is ultimately
// responsible for some of the HTLCs here or not - we don't know whether the update in question
// completed or not. We currently ignore these fields entirely when force-closing a channel,
// but need to handle this somehow or we run the risk of losing HTLCs!
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
monitor_pending_finalized_fulfills: Vec<HTLCSource>,
@ -743,6 +734,12 @@ pub(super) struct Channel<Signer: ChannelSigner> {
/// The unique identifier used to re-derive the private key material for the channel through
/// [`SignerProvider::derive_channel_signer`].
channel_keys_id: [u8; 32],
/// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
pending_monitor_updates: Vec<ChannelMonitorUpdate>,
}
#[cfg(any(test, fuzzing))]
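The new `pending_monitor_updates` vector is what makes returning `&ChannelMonitorUpdate` from the methods below safe: the channel retains an owned copy of every update that may not have been persisted yet, so a `ChannelManager` written to disk before persistence completes can still finish the job, and the buffer is cleared once updates are restored. A minimal sketch of that retain-until-complete pattern with placeholder types:

```rust
// Placeholder types illustrating the retain-until-complete pattern.
struct MonitorUpdate { update_id: u64 }

#[derive(Default)]
struct ChannelSketch {
    latest_monitor_update_id: u64,
    pending_monitor_updates: Vec<MonitorUpdate>,
}

impl ChannelSketch {
    // Queue a new update and hand back a borrow. The caller persists it, but the
    // channel keeps the owned copy so that a manager serialized before persistence
    // completes can still finish the update after a restart.
    fn queue_update(&mut self) -> &MonitorUpdate {
        self.latest_monitor_update_id += 1;
        self.pending_monitor_updates.push(MonitorUpdate { update_id: self.latest_monitor_update_id });
        self.pending_monitor_updates.last().unwrap()
    }

    // Mirrors the `pending_monitor_updates.clear()` added to `monitor_updating_restored`
    // below: once persistence has caught up, the retained copies are no longer needed.
    fn monitor_updating_restored(&mut self) {
        self.pending_monitor_updates.clear();
    }
}

fn main() {
    let mut chan = ChannelSketch::default();
    let update = chan.queue_update();
    println!("persist update {}", update.update_id);
    chan.monitor_updating_restored();
    assert!(chan.pending_monitor_updates.is_empty());
}
```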
@ -1112,6 +1109,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
channel_type,
channel_keys_id,
pending_monitor_updates: Vec::new(),
})
}
@ -1458,6 +1457,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
channel_type,
channel_keys_id,
pending_monitor_updates: Vec::new(),
};
Ok(chan)
@ -1964,22 +1965,30 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
}
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
Err(e) => return Err((e, monitor_update)),
Ok(res) => res
};
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
UpdateFulfillCommitFetch::NewClaim {
monitor_update: self.pending_monitor_updates.last().unwrap(),
htlc_value_msat,
}
},
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
UpdateFulfillCommitFetch::NewClaim {
monitor_update: self.pending_monitor_updates.last().unwrap(),
htlc_value_msat,
}
}
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
}
@ -2259,9 +2268,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
pub fn funding_created<SP: Deref, L: Deref>(
&mut self, msg: &msgs::FundingCreated, best_block: BestBlock, signer_provider: &SP, logger: &L
) -> Result<(msgs::FundingSigned, ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Option<msgs::ChannelReady>), ChannelError>
) -> Result<(msgs::FundingSigned, ChannelMonitor<Signer>), ChannelError>
where
SP::Target: SignerProvider,
SP::Target: SignerProvider<Signer = Signer>,
L::Target: Logger
{
if self.is_outbound() {
@ -2337,19 +2346,22 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_info!(logger, "Generated funding_signed for peer for channel {}", log_bytes!(self.channel_id()));
let need_channel_ready = self.check_get_channel_ready(0).is_some();
self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
Ok((msgs::FundingSigned {
channel_id: self.channel_id,
signature
}, channel_monitor, self.check_get_channel_ready(0)))
}, channel_monitor))
}
/// Handles a funding_signed message from the remote end.
/// If this call is successful, broadcast the funding transaction (and not before!)
pub fn funding_signed<SP: Deref, L: Deref>(
&mut self, msg: &msgs::FundingSigned, best_block: BestBlock, signer_provider: &SP, logger: &L
) -> Result<(ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Transaction, Option<msgs::ChannelReady>), ChannelError>
) -> Result<ChannelMonitor<Signer>, ChannelError>
where
SP::Target: SignerProvider,
SP::Target: SignerProvider<Signer = Signer>,
L::Target: Logger
{
if !self.is_outbound() {
@ -2422,7 +2434,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_info!(logger, "Received funding_signed from peer for channel {}", log_bytes!(self.channel_id()));
Ok((channel_monitor, self.funding_transaction.as_ref().cloned().unwrap(), self.check_get_channel_ready(0)))
let need_channel_ready = self.check_get_channel_ready(0).is_some();
self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
Ok(channel_monitor)
}
/// Handles a channel_ready message from our peer. If we've already sent our channel_ready
@ -3034,17 +3048,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(())
}
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
where L::Target: Logger
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
}
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
}
if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
}
let funding_script = self.get_funding_redeemscript();
@ -3062,7 +3076,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
}
bitcoin_tx.txid
};
@ -3077,7 +3091,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
debug_assert!(!self.is_outbound());
let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
}
}
#[cfg(any(test, fuzzing))]
@ -3099,7 +3113,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
}
// TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
@ -3117,7 +3131,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
}
htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
} else {
@ -3133,10 +3147,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
self.counterparty_funding_pubkey()
);
let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
.map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
.map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
// Update state now that we've passed all the can-fail calls...
let mut need_commitment = false;
@ -3181,7 +3193,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
self.cur_holder_commitment_transaction_number -= 1;
// Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
// send_commitment_no_status_check() next which will reset this to RAAFirst.
// build_commitment_no_status_check() next which will reset this to RAAFirst.
self.resend_order = RAACommitmentOrder::CommitmentFirst;
if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
@ -3193,52 +3205,50 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// the corresponding HTLC status updates so that get_last_commitment_update
// includes the right HTLCs.
self.monitor_pending_commitment_signed = true;
let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
}
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
log_bytes!(self.channel_id));
return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
self.pending_monitor_updates.push(monitor_update);
return Ok(self.pending_monitor_updates.last().unwrap());
}
let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
// If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
// we'll send one right away when we get the revoke_and_ack when we
// free_holding_cell_htlcs().
let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
Some(msg)
} else { None };
true
} else { false };
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
Ok((msgs::RevokeAndACK {
channel_id: self.channel_id,
per_commitment_secret,
next_per_commitment_point,
}, commitment_signed, monitor_update))
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
self.pending_monitor_updates.push(monitor_update);
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
return Ok(self.pending_monitor_updates.last().unwrap());
}
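With the slimmer signature, `commitment_signed` either queues a monitor update and returns a borrow of it, or fails with a plain `ChannelError` and queues nothing; the caller no longer has to reason about a partially-built `Option<ChannelMonitorUpdate>` on the error path. A small sketch of that contract with placeholder types (the watchtower tests later in this diff use the `Ok` branch directly, passing the borrow to `update_channel`):

```rust
// Placeholder types; the point is the error contract of the new signature above.
struct ChannelMonitorUpdate { update_id: u64 }
enum ChannelError { Close(String) }

fn on_commitment_signed(res: Result<&ChannelMonitorUpdate, ChannelError>) {
    match res {
        Ok(update) => {
            // The update is already queued in the channel; persist it, and regenerate
            // the revoke_and_ack (and possibly commitment_signed) once it completes.
            println!("persisting monitor update {}", update.update_id);
        }
        Err(ChannelError::Close(reason)) => {
            // No monitor update was queued; fail the channel with the given reason.
            println!("closing channel: {}", reason);
        }
    }
}

fn main() {
    let queued = ChannelMonitorUpdate { update_id: 7 };
    on_commitment_signed(Ok(&queued));
    on_commitment_signed(Err(ChannelError::Close("bad signature".to_owned())));
}
```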
/// Public version of the below, checking relevant preconditions first.
/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
/// returns `(None, Vec::new())`.
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
if self.channel_state >= ChannelState::ChannelReady as u32 &&
(self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
self.free_holding_cell_htlcs(logger)
} else { Ok((None, Vec::new())) }
} else { (None, Vec::new()) }
}
/// Frees any pending commitment updates in the holding cell, generating the relevant messages
/// for our counterparty.
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@ -3319,7 +3329,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
}
if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
return Ok((None, htlcs_to_fail));
return (None, htlcs_to_fail);
}
let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
self.send_update_fee(feerate, false, logger)
@ -3327,8 +3337,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
None
};
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
// send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
// but we want them to be strictly increasing by one, so reset it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
@ -3337,16 +3347,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
Ok((Some((msgs::CommitmentUpdate {
update_add_htlcs,
update_fulfill_htlcs,
update_fail_htlcs,
update_fail_malformed_htlcs: Vec::new(),
update_fee,
commitment_signed,
}, monitor_update)), htlcs_to_fail))
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
} else {
Ok((None, Vec::new()))
(None, Vec::new())
}
}
@ -3355,7 +3360,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@ -3542,8 +3547,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// When the monitor updating is restored we'll call get_last_commitment_update(),
// which does not update state, but we're definitely now awaiting a remote revoke
// before we can step forward any more, so set it here.
let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
@ -3552,71 +3557,41 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
return Ok(RAAUpdates {
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
monitor_update,
holding_cell_failed_htlcs: Vec::new()
});
self.pending_monitor_updates.push(monitor_update);
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
}
match self.free_holding_cell_htlcs(logger)? {
(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
for fail_msg in update_fail_htlcs.drain(..) {
commitment_update.update_fail_htlcs.push(fail_msg);
}
commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
for fail_msg in update_fail_malformed_htlcs.drain(..) {
commitment_update.update_fail_malformed_htlcs.push(fail_msg);
}
match self.free_holding_cell_htlcs(logger) {
(Some(_), htlcs_to_fail) => {
let mut additional_update = self.pending_monitor_updates.pop().unwrap();
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
Ok(RAAUpdates {
commitment_update: Some(commitment_update),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos,
failed_htlcs: revoked_htlcs,
monitor_update,
holding_cell_failed_htlcs: htlcs_to_fail
})
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
},
(None, htlcs_to_fail) => {
if require_commitment {
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
let mut additional_update = self.build_commitment_no_status_check(logger);
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
Ok(RAAUpdates {
commitment_update: Some(msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs,
update_fail_malformed_htlcs,
update_fee: None,
commitment_signed
}),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
Ok(RAAUpdates {
commitment_update: None,
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
}
}
}
@ -3768,15 +3743,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
/// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted.
/// This must be called immediately after the [`chain::Watch`] call which returned
/// [`ChannelMonitorUpdateStatus::InProgress`].
/// This must be called before we return the [`ChannelMonitorUpdate`] back to the
/// [`ChannelManager`], which will call [`Self::monitor_updating_restored`] once the monitor
/// update completes (potentially immediately).
/// The messages which were generated with the monitor update must *not* have been sent to the
/// remote end, and must instead have been dropped. They will be regenerated when
/// [`Self::monitor_updating_restored`] is called.
///
/// [`ChannelManager`]: super::channelmanager::ChannelManager
/// [`chain::Watch`]: crate::chain::Watch
/// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
pub fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
resend_channel_ready: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
@ -3803,6 +3780,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
{
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
self.pending_monitor_updates.clear();
// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@ -4280,7 +4258,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
pub fn shutdown<SP: Deref>(
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
where SP::Target: SignerProvider
{
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
@ -4336,12 +4314,15 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
let monitor_update = if update_shutdown_script {
self.latest_monitor_update_id += 1;
Some(ChannelMonitorUpdate {
let monitor_update = ChannelMonitorUpdate {
update_id: self.latest_monitor_update_id,
updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
scriptpubkey: self.get_closing_scriptpubkey(),
}],
})
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Some(self.pending_monitor_updates.last().unwrap())
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
@ -4891,6 +4872,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}
pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
self.pending_monitor_updates.first()
}
/// Returns true if funding_created was sent/received.
pub fn is_funding_initiated(&self) -> bool {
self.channel_state >= ChannelState::FundingSent as u32
@ -5791,8 +5776,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(Some(res))
}
/// Only fails in case of bad keys
fn send_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger {
fn build_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger {
log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed...");
// We can upgrade the status of some HTLCs that are waiting on a commitment, even if we
// fail to generate this, we still are at least at a position where upgrading their status
@ -5825,15 +5809,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
self.resend_order = RAACommitmentOrder::RevokeAndACKFirst;
let (res, counterparty_commitment_txid, htlcs) = match self.send_commitment_no_state_update(logger) {
Ok((res, (counterparty_commitment_tx, mut htlcs))) => {
// Update state now that we've passed all the can-fail calls...
let htlcs_no_ref: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
htlcs.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
(res, counterparty_commitment_tx, htlcs_no_ref)
},
Err(e) => return Err(e),
};
let (counterparty_commitment_txid, mut htlcs_ref) = self.build_commitment_no_state_update(logger);
let htlcs: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
htlcs_ref.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
if self.announcement_sigs_state == AnnouncementSigsState::MessageSent {
self.announcement_sigs_state = AnnouncementSigsState::Committed;
@ -5850,16 +5828,13 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}]
};
self.channel_state |= ChannelState::AwaitingRemoteRevoke as u32;
Ok((res, monitor_update))
monitor_update
}
/// Only fails in case of bad keys. Used for channel_reestablish commitment_signed generation
/// when we shouldn't change HTLC/channel state.
fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
fn build_commitment_no_state_update<L: Deref>(&self, logger: &L) -> (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>) where L::Target: Logger {
let counterparty_keys = self.build_remote_transaction_keys();
let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
let (signature, htlc_signatures);
#[cfg(any(test, fuzzing))]
{
@ -5879,6 +5854,21 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
}
(counterparty_commitment_txid, commitment_stats.htlcs_included)
}
/// Only fails in case of signer rejection. Used for channel_reestablish commitment_signed
/// generation when we shouldn't change HTLC/channel state.
fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
// Get the fee tests from `build_commitment_no_state_update`
#[cfg(any(test, fuzzing))]
self.build_commitment_no_state_update(logger);
let counterparty_keys = self.build_remote_transaction_keys();
let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
let (signature, htlc_signatures);
{
let mut htlcs = Vec::with_capacity(commitment_stats.htlcs_included.len());
for &(ref htlc, _) in commitment_stats.htlcs_included.iter() {
@ -5911,16 +5901,20 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}, (counterparty_commitment_txid, commitment_stats.htlcs_included)))
}
/// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction
/// to send to the remote peer in one go.
/// Adds a pending outbound HTLC to this channel, and builds a new remote commitment
/// transaction and generates the corresponding [`ChannelMonitorUpdate`] in one go.
///
/// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
/// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info.
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
Some(update_add_htlc) => {
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))
/// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info.
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger);
if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
match send_res? {
Some(_) => {
let monitor_update = self.build_commitment_no_status_check(logger);
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Ok(Some(self.pending_monitor_updates.last().unwrap()))
},
None => Ok(None)
}
@ -5946,8 +5940,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// Begins the shutdown process, getting a message for the remote peer and returning all
/// holding cell HTLCs for payment failure.
pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
///
/// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]), in which case no
/// [`ChannelMonitorUpdate`] will be returned.
pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
target_feerate_sats_per_kw: Option<u32>)
-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
where SP::Target: SignerProvider {
for htlc in self.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@ -5967,9 +5965,16 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?".to_owned()});
}
// If we haven't funded the channel yet, we don't need to bother ensuring the shutdown
// script is set, we just force-close and call it a day.
let mut chan_closed = false;
if self.channel_state < ChannelState::FundingSent as u32 {
chan_closed = true;
}
let update_shutdown_script = match self.shutdown_scriptpubkey {
Some(_) => false,
None => {
None if !chan_closed => {
let shutdown_scriptpubkey = signer_provider.get_shutdown_scriptpubkey();
if !shutdown_scriptpubkey.is_compatible(their_features) {
return Err(APIError::IncompatibleShutdownScript { script: shutdown_scriptpubkey.clone() });
@ -5977,6 +5982,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
self.shutdown_scriptpubkey = Some(shutdown_scriptpubkey);
true
},
None => false,
};
// From here on out, we may not fail!
@ -5990,12 +5996,15 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
let monitor_update = if update_shutdown_script {
self.latest_monitor_update_id += 1;
Some(ChannelMonitorUpdate {
let monitor_update = ChannelMonitorUpdate {
update_id: self.latest_monitor_update_id,
updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
scriptpubkey: self.get_closing_scriptpubkey(),
}],
})
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Some(self.pending_monitor_updates.last().unwrap())
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.channel_id,
@ -6016,6 +6025,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
});
debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
"we can't both complete shutdown and return a monitor update");
Ok((shutdown, monitor_update, dropped_outbound_htlcs))
}
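Below is a sketch of how a caller can treat the new `get_shutdown` return value, including the invariant from the `debug_assert!` just above that a channel which jumps straight to fully-shut-down never hands back a monitor update. All type names are placeholders, not the real `ChannelManager` code.

```rust
// Placeholder types, not the real `ChannelManager` shutdown path.
struct ChannelMonitorUpdate { update_id: u64 }
struct Shutdown;
struct FailedHtlc;

fn begin_shutdown(
    res: (Shutdown, Option<&ChannelMonitorUpdate>, Vec<FailedHtlc>),
    channel_fully_shut_down: bool,
) {
    let (_shutdown_msg, monitor_update_opt, dropped_htlcs) = res;
    // Matches the debug_assert above: a channel that jumped straight to fully shut
    // down (e.g. it was never funded) never hands back a monitor update.
    if channel_fully_shut_down { assert!(monitor_update_opt.is_none()); }
    if let Some(update) = monitor_update_opt {
        // Persist the shutdown-script update before using the new script in messages.
        println!("persisting monitor update {}", update.update_id);
    }
    for _htlc in dropped_htlcs {
        // Fail each holding-cell HTLC back to its source.
    }
}

fn main() {
    let update = ChannelMonitorUpdate { update_id: 3 };
    begin_shutdown((Shutdown, Some(&update), Vec::new()), false);
    begin_shutdown((Shutdown, None, vec![FailedHtlc]), true);
}
```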
@ -6877,6 +6889,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_type: channel_type.unwrap(),
channel_keys_id,
pending_monitor_updates: Vec::new(),
})
}
}
@ -7188,7 +7202,7 @@ mod tests {
}]};
let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap();
let (funding_signed_msg, _, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
// Node B --> Node A: funding signed
let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&keys_provider, &&logger);

File diff suppressed because it is too large.


@ -3618,22 +3618,22 @@ fn test_simple_peer_disconnect() {
_ => panic!("Unexpected event"),
}
match events[1] {
Event::PaymentPathSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
match events[2] {
Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } => {
assert_eq!(payment_hash, payment_hash_5);
assert!(payment_failed_permanently);
},
_ => panic!("Unexpected event"),
}
match events[2] {
match events[3] {
Event::PaymentFailed { payment_hash, .. } => {
assert_eq!(payment_hash, payment_hash_5);
},
_ => panic!("Unexpected event"),
}
match events[3] {
Event::PaymentPathSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
}
claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4);
@ -8186,7 +8186,7 @@ fn test_update_err_monitor_lockdown() {
let mut node_0_per_peer_lock;
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
@ -8280,7 +8280,7 @@ fn test_concurrent_monitor_claim() {
let mut node_0_per_peer_lock;
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Watchtower Alice should already have seen the block and reject the update
assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
@ -8736,9 +8736,9 @@ fn test_duplicate_chan_id() {
};
check_added_monitors!(nodes[0], 0);
nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
// At this point we'll try to add a duplicate channel monitor, which will be rejected, but
// still needs to be cleared here.
check_added_monitors!(nodes[1], 1);
// At this point we'll look up if the channel_id is present and immediately fail the channel
// without trying to persist the `ChannelMonitor`.
check_added_monitors!(nodes[1], 0);
// ...still, nodes[1] will reject the duplicate channel.
{


@ -258,7 +258,7 @@ fn no_pending_leak_on_initial_send_failure() {
unwrap_send_err!(nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)),
true, APIError::ChannelUnavailable { ref err },
assert_eq!(err, "Peer for first hop currently disconnected/pending monitor update!"));
assert_eq!(err, "Peer for first hop currently disconnected"));
assert!(!nodes[0].node.has_pending_payments());
}


@ -770,6 +770,7 @@ impl Readable for Vec<u8> {
}
impl_for_vec!(ecdsa::Signature);
impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction);
impl_for_vec!((A, B), A, B);
impl Writeable for Script {
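The new `impl_for_vec!((A, B), A, B)` line extends the existing vector serialization macro to tuples, so a `Vec<(A, B)>` of two serializable types can be written and read like any other collection. The sketch below is only a rough illustration of that idea in plain `std::io` terms, not LDK's actual `Writeable`/`Readable` wire format.

```rust
// Rough illustration only: length-prefix the vector, then write each element's
// fields in order, and read them back symmetrically.
use std::io::{self, Read, Write};

fn write_vec_pairs<W: Write>(w: &mut W, v: &[(u32, u64)]) -> io::Result<()> {
    w.write_all(&(v.len() as u64).to_be_bytes())?;
    for (a, b) in v {
        w.write_all(&a.to_be_bytes())?;
        w.write_all(&b.to_be_bytes())?;
    }
    Ok(())
}

fn read_vec_pairs<R: Read>(r: &mut R) -> io::Result<Vec<(u32, u64)>> {
    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let len = u64::from_be_bytes(len_buf) as usize;
    let mut out = Vec::with_capacity(len);
    for _ in 0..len {
        let (mut a, mut b) = ([0u8; 4], [0u8; 8]);
        r.read_exact(&mut a)?;
        r.read_exact(&mut b)?;
        out.push((u32::from_be_bytes(a), u64::from_be_bytes(b)));
    }
    Ok(out)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_vec_pairs(&mut buf, &[(1, 100), (2, 200)])?;
    let round_tripped = read_vec_pairs(&mut buf.as_slice())?;
    assert_eq!(round_tripped, vec![(1, 100), (2, 200)]);
    Ok(())
}
```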