Merge pull request #3588 from wpaulino/quiescence

Implement quiescence protocol
Matt Corallo 2025-02-21 14:52:31 +00:00 committed by GitHub
commit 609f89de43
7 changed files with 1252 additions and 128 deletions

View file

@ -978,7 +978,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
lock_fundings!(nodes);
let chan_a = nodes[0].list_usable_channels()[0].short_channel_id.unwrap();
let chan_a_id = nodes[0].list_usable_channels()[0].channel_id;
let chan_b = nodes[2].list_usable_channels()[0].short_channel_id.unwrap();
let chan_b_id = nodes[2].list_usable_channels()[0].channel_id;
let mut p_id: u8 = 0;
let mut p_idx: u64 = 0;
@ -1039,6 +1041,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
*node_id == a_id
},
events::MessageSendEvent::SendStfu { ref node_id, .. } => {
if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
*node_id == a_id
},
events::MessageSendEvent::SendChannelReady { .. } => continue,
events::MessageSendEvent::SendAnnouncementSignatures { .. } => continue,
events::MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
@ -1101,7 +1107,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == node_id {
for update_add in update_add_htlcs.iter() {
out.locked_write(format!("Delivering update_add_htlc from node {} to node {}.\n", $node, idx).as_bytes());
if !$corrupt_forward {
dest.handle_update_add_htlc(nodes[$node].get_our_node_id(), update_add);
} else {
@ -1116,19 +1122,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}
}
for update_fulfill in update_fulfill_htlcs.iter() {
out.locked_write(format!("Delivering update_fulfill_htlc from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_update_fulfill_htlc(nodes[$node].get_our_node_id(), update_fulfill);
}
for update_fail in update_fail_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_htlc from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_update_fail_htlc(nodes[$node].get_our_node_id(), update_fail);
}
for update_fail_malformed in update_fail_malformed_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_malformed_htlc from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_update_fail_malformed_htlc(nodes[$node].get_our_node_id(), update_fail_malformed);
}
if let Some(msg) = update_fee {
out.locked_write(format!("Delivering update_fee from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_update_fee(nodes[$node].get_our_node_id(), &msg);
}
let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() ||
@ -1145,7 +1151,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
} });
break;
}
out.locked_write(format!("Delivering commitment_signed from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_commitment_signed(nodes[$node].get_our_node_id(), &commitment_signed);
break;
}
@ -1154,7 +1160,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering revoke_and_ack from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_revoke_and_ack(nodes[$node].get_our_node_id(), msg);
}
}
@ -1162,11 +1168,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering channel_reestablish from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_channel_reestablish(nodes[$node].get_our_node_id(), msg);
}
}
},
events::MessageSendEvent::SendStfu { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering stfu from node {} to node {}.\n", $node, idx).as_bytes());
dest.handle_stfu(nodes[$node].get_our_node_id(), msg);
}
}
}
events::MessageSendEvent::SendChannelReady { .. } => {
// Can be generated as a reestablish response
},
@ -1219,6 +1233,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::UpdateHTLCs { .. } => {},
events::MessageSendEvent::SendRevokeAndACK { .. } => {},
events::MessageSendEvent::SendChannelReestablish { .. } => {},
events::MessageSendEvent::SendStfu { .. } => {},
events::MessageSendEvent::SendChannelReady { .. } => {},
events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
@ -1245,6 +1260,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::UpdateHTLCs { .. } => {},
events::MessageSendEvent::SendRevokeAndACK { .. } => {},
events::MessageSendEvent::SendChannelReestablish { .. } => {},
events::MessageSendEvent::SendStfu { .. } => {},
events::MessageSendEvent::SendChannelReady { .. } => {},
events::MessageSendEvent::SendAnnouncementSignatures { .. } => {},
events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => {
@ -1688,6 +1704,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
nodes[2].maybe_update_chan_fees();
},
0xa0 => {
nodes[0].maybe_propose_quiescence(&nodes[1].get_our_node_id(), &chan_a_id).unwrap()
},
0xa1 => {
nodes[1].maybe_propose_quiescence(&nodes[0].get_our_node_id(), &chan_a_id).unwrap()
},
0xa2 => {
nodes[1].maybe_propose_quiescence(&nodes[2].get_our_node_id(), &chan_b_id).unwrap()
},
0xa3 => {
nodes[2].maybe_propose_quiescence(&nodes[1].get_our_node_id(), &chan_b_id).unwrap()
},
0xf0 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_id, &Vec::pop),
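For orientation, each of the four new command bytes maps one (proposer, peer, channel) triple onto maybe_propose_quiescence. A hedged sketch of that dispatch in isolation; command_byte is a hypothetical name for the fuzz input byte, while nodes, chan_a_id and chan_b_id mirror the fuzz target's locals above:

    // Sketch only: the 0xa0..0xa3 dispatch above, written as a lookup.
    let (proposer, peer, chan_id) = match command_byte {
        0xa0 => (0, 1, &chan_a_id),
        0xa1 => (1, 0, &chan_a_id),
        0xa2 => (1, 2, &chan_b_id),
        0xa3 => (2, 1, &chan_b_id),
        _ => return,
    };
    nodes[proposer].maybe_propose_quiescence(&nodes[peer].get_our_node_id(), chan_id).unwrap();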
@ -1753,34 +1782,49 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
chan_b_disconnected = false;
}
macro_rules! process_all_events {
() => {
for i in 0..std::usize::MAX {
if i == 100 {
panic!("It may take many iterations to settle the state, but it should not take forever");
}
// Then, make sure any current forwards make their way to their destination
if process_msg_events!(0, false, ProcessMessages::AllMessages) {
continue;
}
if process_msg_events!(1, false, ProcessMessages::AllMessages) {
continue;
}
if process_msg_events!(2, false, ProcessMessages::AllMessages) {
continue;
}
// ...making sure any pending PendingHTLCsForwardable events are handled and
// payments claimed.
if process_events!(0, false) {
continue;
}
if process_events!(1, false) {
continue;
}
if process_events!(2, false) {
continue;
}
break;
}
};
}
// At this point, we may be pending quiescence, so we'll process all messages to
// ensure we can complete its handshake. We'll then exit quiescence and process all
// messages again, to resolve any pending HTLCs (only irrevocably committed ones)
// before attempting to send more payments.
process_all_events!();
nodes[0].exit_quiescence(&nodes[1].get_our_node_id(), &chan_a_id).unwrap();
nodes[1].exit_quiescence(&nodes[0].get_our_node_id(), &chan_a_id).unwrap();
nodes[1].exit_quiescence(&nodes[2].get_our_node_id(), &chan_b_id).unwrap();
nodes[2].exit_quiescence(&nodes[1].get_our_node_id(), &chan_b_id).unwrap();
process_all_events!();
// Finally, make sure that at least one end of each channel can make a substantial payment
assert!(
send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx) send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx)

View file

@ -74,6 +74,8 @@
//! (see [bLIP 32](https://github.com/lightning/blips/blob/master/blip-0032.md) for more information).
//! - `ProvideStorage` - Indicates that we offer the capability to store data of our peers
//! (see https://github.com/lightning/bolts/pull/1110 for more info).
//! - `Quiescence` - protocol to quiesce a channel by indicating that "SomeThing Fundamental is Underway"
//! (see [BOLT-2](https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#channel-quiescence) for more information).
//!
//! LDK knows about the following features, but does not support them:
//! - `AnchorsNonzeroFeeHtlcTx` - the initial version of anchor outputs, which was later found to be
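As a usage sketch (not part of this commit), a node advertises the new feature with the optional setter and checks the peer's init features before proposing quiescence; set_quiescence_optional and supports_quiescence are the helpers generated by the define_feature! block added later in this file, while peer_init_features is a hypothetical binding for the peer's InitFeatures:

    // Sketch: advertise the optional feature bit and check the peer's before use.
    let mut our_features = InitFeatures::empty();
    our_features.set_quiescence_optional();
    if peer_init_features.supports_quiescence() {
        // Only now is it reasonable to send `stfu` on a channel with this peer.
    }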
@ -152,7 +154,7 @@ mod sealed {
// Byte 3
RouteBlinding | ShutdownAnySegwit | DualFund | Taproot,
// Byte 4
Quiescence | OnionMessages,
// Byte 5
ProvideStorage | ChannelType | SCIDPrivacy,
// Byte 6
@ -173,7 +175,7 @@ mod sealed {
// Byte 3
RouteBlinding | ShutdownAnySegwit | DualFund | Taproot,
// Byte 4
Quiescence | OnionMessages,
// Byte 5
ProvideStorage | ChannelType | SCIDPrivacy,
// Byte 6
@ -536,6 +538,16 @@ mod sealed {
supports_taproot,
requires_taproot
);
define_feature!(
35,
Quiescence,
[InitContext, NodeContext],
"Feature flags for `option_quiesce`.",
set_quiescence_optional,
set_quiescence_required,
supports_quiescence,
requires_quiescence
);
define_feature!(
39,
OnionMessages,
@ -1195,6 +1207,7 @@ mod tests {
init_features.set_channel_type_optional();
init_features.set_scid_privacy_optional();
init_features.set_zero_conf_optional();
init_features.set_quiescence_optional();
assert!(init_features.initial_routing_sync());
assert!(!init_features.supports_upfront_shutdown_script());
@ -1215,7 +1228,7 @@ mod tests {
assert_eq!(node_features.flags[1], 0b01010001);
assert_eq!(node_features.flags[2], 0b10001010);
assert_eq!(node_features.flags[3], 0b00001010);
assert_eq!(node_features.flags[4], 0b10001000);
assert_eq!(node_features.flags[5], 0b10100000);
assert_eq!(node_features.flags[6], 0b00001000);
}
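The flags[4] change follows directly from the feature-bit assignment: the optional `option_quiesce` bit is 35, which lands in byte 4 (bits 32..39) as mask 1 << (35 % 8) = 0b0000_1000, alongside OnionMessages at bit 39 (0b1000_0000). A small check of that arithmetic, assuming the same byte-indexed layout the test asserts:

    // Sketch: byte index and mask for feature bit 35, and the combined byte 4 value.
    let bit = 35u8;
    let (byte, mask) = ((bit / 8) as usize, 1u8 << (bit % 8));
    assert_eq!((byte, mask), (4, 0b0000_1000));
    assert_eq!(0b1000_0000 | mask, 0b1000_1000);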

View file

@ -474,6 +474,10 @@ mod state_flags {
pub const LOCAL_SHUTDOWN_SENT: u32 = 1 << 11;
pub const SHUTDOWN_COMPLETE: u32 = 1 << 12;
pub const WAITING_FOR_BATCH: u32 = 1 << 13;
pub const AWAITING_QUIESCENCE: u32 = 1 << 14;
pub const LOCAL_STFU_SENT: u32 = 1 << 15;
pub const REMOTE_STFU_SENT: u32 = 1 << 16;
pub const QUIESCENT: u32 = 1 << 17;
}
define_state_flags!(
@ -532,7 +536,26 @@ define_state_flags!(
messages as we'd be unable to determine which HTLCs they included in their `revoke_and_ack` \
implicit ACK, so instead we have to hold them away temporarily to be sent later.",
AWAITING_REMOTE_REVOKE, state_flags::AWAITING_REMOTE_REVOKE,
is_awaiting_remote_revoke, set_awaiting_remote_revoke, clear_awaiting_remote_revoke),
("Indicates a local request has been made for the channel to become quiescent. Both nodes \
must send `stfu` for the channel to become quiescent. This flag will be cleared and we \
will no longer attempt quiescence if either node requests a shutdown.",
AWAITING_QUIESCENCE, state_flags::AWAITING_QUIESCENCE,
is_awaiting_quiescence, set_awaiting_quiescence, clear_awaiting_quiescence),
("Indicates we have sent a `stfu` message to the counterparty. This message can only be sent \
if either `AWAITING_QUIESCENCE` or `REMOTE_STFU_SENT` is set. Shutdown requests are \
rejected if this flag is set.",
LOCAL_STFU_SENT, state_flags::LOCAL_STFU_SENT,
is_local_stfu_sent, set_local_stfu_sent, clear_local_stfu_sent),
("Indicates we have received a `stfu` message from the counterparty. Shutdown requests are \
rejected if this flag is set.",
REMOTE_STFU_SENT, state_flags::REMOTE_STFU_SENT,
is_remote_stfu_sent, set_remote_stfu_sent, clear_remote_stfu_sent),
("Indicates the quiescence handshake has completed and the channel is now quiescent. \
Updates are not allowed while this flag is set, and any outbound updates will go \
directly into the holding cell.",
QUIESCENT, state_flags::QUIESCENT,
is_quiescent, set_quiescent, clear_quiescent)
]
);
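Read together, the four flags describe one handshake: a local request sets AWAITING_QUIESCENCE, sending stfu trades that for LOCAL_STFU_SENT, a counterparty stfu sets REMOTE_STFU_SENT, and once stfu has gone out in both directions the in-flight flags collapse into QUIESCENT. A hedged sketch of that progression reduced to two booleans (illustration only, not the flag machinery itself; the pre-send AWAITING_QUIESCENCE state is not modeled):

    // Sketch: which quiescence phase a channel is in given whose `stfu` has been exchanged.
    #[derive(Debug, PartialEq)]
    enum QuiescencePhase { Idle, LocalStfuSent, RemoteStfuSent, Quiescent }

    fn phase(we_sent_stfu: bool, they_sent_stfu: bool) -> QuiescencePhase {
        match (we_sent_stfu, they_sent_stfu) {
            (false, false) => QuiescencePhase::Idle,
            (true, false) => QuiescencePhase::LocalStfuSent,
            (false, true) => QuiescencePhase::RemoteStfuSent,
            // Once `stfu` has been sent in both directions the channel is quiescent.
            (true, true) => QuiescencePhase::Quiescent,
        }
    }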
@ -646,6 +669,8 @@ impl ChannelState {
match self {
ChannelState::ChannelReady(flags) =>
!flags.is_set(ChannelReadyFlags::AWAITING_REMOTE_REVOKE) &&
!flags.is_set(ChannelReadyFlags::LOCAL_STFU_SENT) &&
!flags.is_set(ChannelReadyFlags::QUIESCENT) &&
!flags.is_set(FundedStateFlags::MONITOR_UPDATE_IN_PROGRESS.into()) &&
!flags.is_set(FundedStateFlags::PEER_DISCONNECTED.into()),
_ => {
@ -663,6 +688,10 @@ impl ChannelState {
impl_state_flag!(is_their_channel_ready, set_their_channel_ready, clear_their_channel_ready, AwaitingChannelReady);
impl_state_flag!(is_waiting_for_batch, set_waiting_for_batch, clear_waiting_for_batch, AwaitingChannelReady);
impl_state_flag!(is_awaiting_remote_revoke, set_awaiting_remote_revoke, clear_awaiting_remote_revoke, ChannelReady);
impl_state_flag!(is_awaiting_quiescence, set_awaiting_quiescence, clear_awaiting_quiescence, ChannelReady);
impl_state_flag!(is_local_stfu_sent, set_local_stfu_sent, clear_local_stfu_sent, ChannelReady);
impl_state_flag!(is_remote_stfu_sent, set_remote_stfu_sent, clear_remote_stfu_sent, ChannelReady);
impl_state_flag!(is_quiescent, set_quiescent, clear_quiescent, ChannelReady);
}
pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
@ -713,6 +742,7 @@ pub const MIN_THEIR_CHAN_RESERVE_SATOSHIS: u64 = 1000;
pub(super) enum ChannelError {
Ignore(String),
Warn(String),
WarnAndDisconnect(String),
Close((String, ClosureReason)),
SendError(String),
}
@ -720,10 +750,11 @@ pub(super) enum ChannelError {
impl fmt::Debug for ChannelError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
&ChannelError::Ignore(ref e) => write!(f, "Ignore: {}", e),
&ChannelError::Warn(ref e) => write!(f, "Warn: {}", e),
&ChannelError::WarnAndDisconnect(ref e) => write!(f, "Disconnecting with warning: {}", e),
&ChannelError::Close((ref e, _)) => write!(f, "Close: {}", e),
&ChannelError::SendError(ref e) => write!(f, "Not Found: {}", e),
}
}
}
@ -733,6 +764,7 @@ impl fmt::Display for ChannelError {
match self {
&ChannelError::Ignore(ref e) => write!(f, "{}", e),
&ChannelError::Warn(ref e) => write!(f, "{}", e),
&ChannelError::WarnAndDisconnect(ref e) => write!(f, "{}", e),
&ChannelError::Close((ref e, _)) => write!(f, "{}", e),
&ChannelError::SendError(ref e) => write!(f, "{}", e),
}
@ -1112,9 +1144,8 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
/// The number of ticks that may elapse while we're waiting for a response before we attempt to
/// disconnect them.
///
/// See [`ChannelContext::sent_message_awaiting_response`] for more information.
pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;
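In other words, the counter starts when we begin waiting for the peer, is bumped once per timer tick, and trips after two ticks. A minimal free-standing sketch of that policy, assuming the Option<usize> shape of the sent_message_awaiting_response field described later in this diff:

    // Sketch: the tick policy behind DISCONNECT_PEER_AWAITING_RESPONSE_TICKS.
    const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;

    fn should_disconnect(awaiting: &mut Option<usize>) -> bool {
        match awaiting {
            Some(ticks) => {
                // One more timer tick has passed while we were waiting on the peer.
                *ticks += 1;
                *ticks >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS
            }
            // Not waiting on anything: never disconnect for unresponsiveness.
            None => false,
        }
    }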
@ -1842,16 +1873,14 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
pub workaround_lnd_bug_4006: Option<msgs::ChannelReady>,
/// An option set when we wish to track how many ticks have elapsed while waiting for a response
/// from our counterparty after entering specific states. If the peer has yet to respond after
/// reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`, a reconnection should be attempted to
/// try to unblock the state machine.
///
/// This behavior was initially motivated by a lnd bug in which we don't receive a message we
/// expect to in a timely manner, which may lead to channels becoming unusable and/or
/// force-closed. An example of such can be found at
/// <https://github.com/lightningnetwork/lnd/issues/7682>.
sent_message_awaiting_response: Option<usize>,
/// This channel's type, as negotiated during channel open
@ -1897,6 +1926,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
/// store it here and only release it to the `ChannelManager` once it asks for it.
blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,

// The `next_funding_txid` field allows peers to finalize the signing steps of an interactive
// transaction construction, or safely abort that transaction if it was not signed by one of the
// peers, who has thus already removed it from its state.
@ -1912,6 +1942,10 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
// TODO(dual_funding): Persist this when we actually contribute funding inputs. For now we always
// send an empty witnesses array in `tx_signatures` as a V2 channel acceptor
next_funding_txid: Option<Txid>,
/// Only set when a counterparty `stfu` has been processed to track which node is allowed to
/// propose "something fundamental" upon becoming quiescent.
is_holder_quiescence_initiator: Option<bool>,
}
/// A channel struct implementing this trait can receive an initial counterparty commitment
@ -2603,6 +2637,8 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
is_manual_broadcast: false,
next_funding_txid: None,
is_holder_quiescence_initiator: None,
};
Ok((funding, channel_context))
@ -2832,6 +2868,8 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
local_initiated_shutdown: None,
is_manual_broadcast: false,
next_funding_txid: None,
is_holder_quiescence_initiator: None,
};
Ok((funding, channel_context))
@ -2921,6 +2959,57 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
}
}
/// Checks whether the channel has any HTLC additions, HTLC removals, or fee updates that have
/// been sent by either side but not yet irrevocably committed on both commitments. Holding cell
/// updates are not considered because they haven't been sent to the peer yet.
///
/// This can be used to satisfy quiescence's requirement when sending `stfu`:
/// - MUST NOT send `stfu` if any of the sender's htlc additions, htlc removals
/// or fee updates are pending for either peer.
fn has_pending_channel_update(&self) -> bool {
// An update from the local/remote node may be pending on the remote/local commitment since
// they are not tracked within our state, so we rely on whether any `commitment_signed` or
// `revoke_and_ack` messages are owed.
//
// We check these flags first as they are more likely to be set.
if self.channel_state.is_awaiting_remote_revoke() || self.expecting_peer_commitment_signed
|| self.monitor_pending_revoke_and_ack || self.signer_pending_revoke_and_ack
|| self.monitor_pending_commitment_signed || self.signer_pending_commitment_update
{
return true;
}
// A fee update is pending on either commitment.
if self.pending_update_fee.is_some() {
return true;
}
if self.pending_inbound_htlcs.iter()
.any(|htlc| match htlc.state {
InboundHTLCState::Committed => false,
// An HTLC removal from the local node is pending on the remote commitment.
InboundHTLCState::LocalRemoved(_) => true,
// An HTLC add from the remote node is pending on the local commitment.
InboundHTLCState::RemoteAnnounced(_)
| InboundHTLCState::AwaitingRemoteRevokeToAnnounce(_)
| InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) => true,
})
{
return true;
}
self.pending_outbound_htlcs.iter()
.any(|htlc| match htlc.state {
OutboundHTLCState::Committed => false,
// An HTLC add from the local node is pending on the remote commitment.
OutboundHTLCState::LocalAnnounced(_) => true,
// An HTLC removal from the remote node is pending on the local commitment.
OutboundHTLCState::RemoteRemoved(_)
| OutboundHTLCState::AwaitingRemoteRevokeToRemove(_)
| OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) => true,
})
}
// Public utilities:
pub fn channel_id(&self) -> ChannelId {
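This predicate is exactly what gates sending stfu later in the commit: send_stfu refuses while it returns true, and the stfu handler disconnects if a counterparty update is still pending on our side. A hedged sketch of how it is consulted, with ctx standing in for this ChannelContext:

    // Sketch: the rule from the doc comment above, as send_stfu() (added later in
    // this commit) applies it.
    let can_send_stfu = !ctx.has_pending_channel_update();
    // In addition, the real code only sends `stfu` while AWAITING_QUIESCENCE or
    // REMOTE_STFU_SENT is set and the channel is otherwise live.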
@ -5140,6 +5229,9 @@ impl<SP: Deref> FundedChannel<SP> where
pub fn update_add_htlc<F: Deref>( pub fn update_add_htlc<F: Deref>(
&mut self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator<F>, &mut self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator<F>,
) -> Result<(), ChannelError> where F::Target: FeeEstimator { ) -> Result<(), ChannelError> where F::Target: FeeEstimator {
if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got add HTLC message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got add HTLC message when channel was not in an operational state".to_owned())); return Err(ChannelError::close("Got add HTLC message when channel was not in an operational state".to_owned()));
} }
@ -5284,6 +5376,9 @@ impl<SP: Deref> FundedChannel<SP> where
} }
pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64, Option<u64>), ChannelError> { pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64, Option<u64>), ChannelError> {
if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got fulfill HTLC message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got fulfill HTLC message when channel was not in an operational state".to_owned())); return Err(ChannelError::close("Got fulfill HTLC message when channel was not in an operational state".to_owned()));
} }
@ -5295,6 +5390,9 @@ impl<SP: Deref> FundedChannel<SP> where
} }
pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> {
if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got fail HTLC message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got fail HTLC message when channel was not in an operational state".to_owned())); return Err(ChannelError::close("Got fail HTLC message when channel was not in an operational state".to_owned()));
} }
@ -5307,6 +5405,9 @@ impl<SP: Deref> FundedChannel<SP> where
} }
pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> {
if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got fail malformed HTLC message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got fail malformed HTLC message when channel was not in an operational state".to_owned())); return Err(ChannelError::close("Got fail malformed HTLC message when channel was not in an operational state".to_owned()));
} }
@ -5358,6 +5459,9 @@ impl<SP: Deref> FundedChannel<SP> where
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError> pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger where L::Target: Logger
{ {
if self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got commitment_signed message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got commitment signed message when channel was not in an operational state".to_owned())); return Err(ChannelError::close("Got commitment signed message when channel was not in an operational state".to_owned()));
} }
@ -5607,7 +5711,9 @@ impl<SP: Deref> FundedChannel<SP> where
) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) ) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>)
where F::Target: FeeEstimator, L::Target: Logger where F::Target: FeeEstimator, L::Target: Logger
{ {
assert!(matches!(self.context.channel_state, ChannelState::ChannelReady(_)));
assert!(!self.context.channel_state.is_monitor_update_in_progress()); assert!(!self.context.channel_state.is_monitor_update_in_progress());
assert!(!self.context.channel_state.is_quiescent());
if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() { if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(), log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
if self.context.holding_cell_update_fee.is_some() { " and a fee update" } else { "" }, &self.context.channel_id()); if self.context.holding_cell_update_fee.is_some() { " and a fee update" } else { "" }, &self.context.channel_id());
@ -5640,7 +5746,16 @@ impl<SP: Deref> FundedChannel<SP> where
amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(),
false, skimmed_fee_msat, blinding_point, fee_estimator, logger
) {
Ok(update_add_msg_opt) => {
// `send_htlc` only returns `Ok(None)`, when an update goes into
// the holding cell, but since we're currently freeing it, we should
// always expect to see the `update_add` go out.
debug_assert!(
update_add_msg_opt.is_some(),
"Must generate new update if we're freeing the holding cell"
);
update_add_count += 1;
},
Err(e) => {
match e {
ChannelError::Ignore(ref msg) => {
@ -5743,6 +5858,9 @@ impl<SP: Deref> FundedChannel<SP> where
) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
where F::Target: FeeEstimator, L::Target: Logger,
{
if self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got revoke_and_ack message while quiescent".to_owned()));
}
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::close("Got revoke/ACK message when channel was not in an operational state".to_owned()));
}
@ -5808,7 +5926,7 @@ impl<SP: Deref> FundedChannel<SP> where
// OK, we step the channel here and *then* if the new generation fails we can fail the
// channel based on that, but stepping stuff here should be safe either way.
self.context.channel_state.clear_awaiting_remote_revoke();
self.mark_response_received();
self.context.counterparty_prev_commitment_point = self.context.counterparty_cur_commitment_point;
self.context.counterparty_cur_commitment_point = Some(msg.next_per_commitment_point);
self.context.cur_counterparty_commitment_transaction_number -= 1;
@ -5960,29 +6078,7 @@ impl<SP: Deref> FundedChannel<SP> where
self.context.monitor_pending_update_adds.append(&mut pending_update_adds);
if self.context.channel_state.is_monitor_update_in_progress() {
// We can't actually generate a new commitment transaction (incl by freeing holding
// cells) while we can't update the monitor, so we just return what we have.
if require_commitment {
self.context.monitor_pending_commitment_signed = true;
// When the monitor updating is restored we'll call
// get_last_commitment_update_for_send(), which does not update state, but we're
// definitely now awaiting a remote revoke before we can step forward any more, so
// set it here.
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
}
self.context.monitor_pending_forwards.append(&mut to_forward_infos);
self.context.monitor_pending_failures.append(&mut revoked_htlcs);
self.context.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", &self.context.channel_id());
return_with_htlcs_to_fail!(Vec::new());
}
match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) {
(Some(mut additional_update), htlcs_to_fail) => {
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
// strictly increasing by one, so decrement it here. // strictly increasing by one, so decrement it here.
@ -5997,6 +6093,11 @@ impl<SP: Deref> FundedChannel<SP> where
}, },
(None, htlcs_to_fail) => { (None, htlcs_to_fail) => {
if require_commitment { if require_commitment {
// We can't generate a new commitment transaction yet so we just return what we
// have. When the monitor updating is restored we'll call
// get_last_commitment_update_for_send(), which does not update state, but we're
// definitely now awaiting a remote revoke before we can step forward any more,
// so set it here.
let mut additional_update = self.build_commitment_no_status_check(logger); let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
@ -6004,10 +6105,24 @@ impl<SP: Deref> FundedChannel<SP> where
self.context.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. {} monitor update.",
&self.context.channel_id(), release_state_str);
if self.context.channel_state.can_generate_new_commitment() {
log_debug!(logger, "Responding with a commitment update with {} HTLCs failed for channel {}",
update_fail_htlcs.len() + update_fail_malformed_htlcs.len(),
&self.context.channel_id);
} else {
debug_assert!(htlcs_to_fail.is_empty());
let reason = if self.context.channel_state.is_local_stfu_sent() {
"exits quiescence"
} else if self.context.channel_state.is_monitor_update_in_progress() {
"completes pending monitor update"
} else {
"can continue progress"
};
log_debug!(logger, "Holding back commitment update until channel {} {}",
&self.context.channel_id, reason);
}
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
return_with_htlcs_to_fail!(htlcs_to_fail); return_with_htlcs_to_fail!(htlcs_to_fail);
@ -6144,7 +6259,10 @@ impl<SP: Deref> FundedChannel<SP> where
return None;
}
// Some of the checks of `can_generate_new_commitment` have already been done above, but
// it's much more brittle to not use it in favor of checking the remaining flags left, as it
// gives us one less code path to update if the method changes.
if !self.context.channel_state.can_generate_new_commitment() {
force_holding_cell = true;
}
@ -6174,6 +6292,10 @@ impl<SP: Deref> FundedChannel<SP> where
return Err(()) return Err(())
} }
// We only clear `peer_disconnected` if we were able to reestablish the channel. We always
// reset our awaiting response in case we failed reestablishment and are disconnecting.
self.context.sent_message_awaiting_response = None;
if self.context.channel_state.is_peer_disconnected() { if self.context.channel_state.is_peer_disconnected() {
// While the below code should be idempotent, it's simpler to just return early, as // While the below code should be idempotent, it's simpler to just return early, as
// redundant disconnect events can fire, though they should be rare. // redundant disconnect events can fire, though they should be rare.
@ -6234,7 +6356,14 @@ impl<SP: Deref> FundedChannel<SP> where
} }
} }
// Reset any quiescence-related state as it is implicitly terminated once disconnected.
if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
self.context.channel_state.clear_awaiting_quiescence();
self.context.channel_state.clear_local_stfu_sent();
self.context.channel_state.clear_remote_stfu_sent();
self.context.channel_state.clear_quiescent();
self.context.is_holder_quiescence_initiator.take();
}
self.context.channel_state.set_peer_disconnected();
log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, &self.context.channel_id());
@ -6351,10 +6480,6 @@ impl<SP: Deref> FundedChannel<SP> where
commitment_update = None; commitment_update = None;
} }
if commitment_update.is_some() {
self.mark_awaiting_response();
}
self.context.monitor_pending_revoke_and_ack = false; self.context.monitor_pending_revoke_and_ack = false;
self.context.monitor_pending_commitment_signed = false; self.context.monitor_pending_commitment_signed = false;
let order = self.context.resend_order.clone(); let order = self.context.resend_order.clone();
@ -6397,6 +6522,9 @@ impl<SP: Deref> FundedChannel<SP> where
if self.context.channel_state.is_peer_disconnected() { if self.context.channel_state.is_peer_disconnected() {
return Err(ChannelError::close("Peer sent update_fee when we needed a channel_reestablish".to_owned())); return Err(ChannelError::close("Peer sent update_fee when we needed a channel_reestablish".to_owned()));
} }
if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() {
return Err(ChannelError::WarnAndDisconnect("Got fee update message while quiescent".to_owned()));
}
FundedChannel::<SP>::check_remote_fee(&self.context.channel_type, fee_estimator, msg.feerate_per_kw, Some(self.context.feerate_per_kw), logger)?; FundedChannel::<SP>::check_remote_fee(&self.context.channel_type, fee_estimator, msg.feerate_per_kw, Some(self.context.feerate_per_kw), logger)?;
self.context.pending_update_fee = Some((msg.feerate_per_kw, FeeUpdateState::RemoteAnnounced)); self.context.pending_update_fee = Some((msg.feerate_per_kw, FeeUpdateState::RemoteAnnounced));
@ -6708,7 +6836,7 @@ impl<SP: Deref> FundedChannel<SP> where
// Go ahead and unmark PeerDisconnected as various calls we may make check for it (and all
// remaining cases either succeed or ErrorMessage-fail).
self.context.channel_state.clear_peer_disconnected();
self.mark_response_received();
let shutdown_msg = self.get_outbound_shutdown();
@ -6764,9 +6892,6 @@ impl<SP: Deref> FundedChannel<SP> where
// AwaitingRemoteRevoke set, which indicates we sent a commitment_signed but haven't gotten // AwaitingRemoteRevoke set, which indicates we sent a commitment_signed but haven't gotten
// the corresponding revoke_and_ack back yet. // the corresponding revoke_and_ack back yet.
let is_awaiting_remote_revoke = self.context.channel_state.is_awaiting_remote_revoke(); let is_awaiting_remote_revoke = self.context.channel_state.is_awaiting_remote_revoke();
if is_awaiting_remote_revoke && !self.is_awaiting_monitor_update() {
self.mark_awaiting_response();
}
let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.context.cur_counterparty_commitment_transaction_number + if is_awaiting_remote_revoke { 1 } else { 0 }; let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.context.cur_counterparty_commitment_transaction_number + if is_awaiting_remote_revoke { 1 } else { 0 };
let channel_ready = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.holder_commitment_point.transaction_number() == 1 { let channel_ready = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.holder_commitment_point.transaction_number() == 1 {
@ -6951,26 +7076,38 @@ impl<SP: Deref> FundedChannel<SP> where
Ok((closing_signed, None, None)) Ok((closing_signed, None, None))
} }
fn mark_response_received(&mut self) {
self.context.sent_message_awaiting_response = None;
}
/// Determines whether we should disconnect the counterparty due to not receiving a response
/// within our expected timeframe.
///
/// This should be called for peers with an active socket on every
/// [`super::channelmanager::ChannelManager::timer_tick_occurred`].
#[allow(clippy::assertions_on_constants)]
pub fn should_disconnect_peer_awaiting_response(&mut self) -> bool {
if let Some(ticks_elapsed) = self.context.sent_message_awaiting_response.as_mut() {
*ticks_elapsed += 1;
*ticks_elapsed >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS
} else if
// Cleared upon receiving `channel_reestablish`.
self.context.channel_state.is_peer_disconnected()
// Cleared upon receiving `stfu`.
|| self.context.channel_state.is_local_stfu_sent()
// Cleared upon receiving a message that triggers the end of quiescence.
|| self.context.channel_state.is_quiescent()
// Cleared upon receiving `revoke_and_ack`.
|| self.context.has_pending_channel_update()
{
// This is the first tick we've seen after expecting to make forward progress.
self.context.sent_message_awaiting_response = Some(1);
debug_assert!(DISCONNECT_PEER_AWAITING_RESPONSE_TICKS > 1);
false
} else {
// Don't disconnect when we're not waiting on a response.
false
}
}
pub fn shutdown(
@ -6993,6 +7130,14 @@ impl<SP: Deref> FundedChannel<SP> where
} }
assert!(!matches!(self.context.channel_state, ChannelState::ShutdownComplete)); assert!(!matches!(self.context.channel_state, ChannelState::ShutdownComplete));
// TODO: The spec is pretty vague regarding the handling of shutdown within quiescence.
if self.context.channel_state.is_local_stfu_sent()
|| self.context.channel_state.is_remote_stfu_sent()
|| self.context.channel_state.is_quiescent()
{
return Err(ChannelError::WarnAndDisconnect("Got shutdown request while quiescent".to_owned()));
}
if !script::is_bolt2_compliant(&msg.scriptpubkey, their_features) { if !script::is_bolt2_compliant(&msg.scriptpubkey, their_features) {
return Err(ChannelError::Warn(format!("Got a nonstandard scriptpubkey ({}) from remote peer", msg.scriptpubkey.to_hex_string()))); return Err(ChannelError::Warn(format!("Got a nonstandard scriptpubkey ({}) from remote peer", msg.scriptpubkey.to_hex_string())));
} }
@ -7029,6 +7174,11 @@ impl<SP: Deref> FundedChannel<SP> where
// From here on out, we may not fail! // From here on out, we may not fail!
self.context.channel_state.set_remote_shutdown_sent(); self.context.channel_state.set_remote_shutdown_sent();
if self.context.channel_state.is_awaiting_quiescence() {
// We haven't been able to send `stfu` yet, and there's no point in attempting
// quiescence anymore since the counterparty wishes to close the channel.
self.context.channel_state.clear_awaiting_quiescence();
}
self.context.update_time_counter += 1; self.context.update_time_counter += 1;
let monitor_update = if update_shutdown_script { let monitor_update = if update_shutdown_script {
@ -8120,7 +8270,6 @@ impl<SP: Deref> FundedChannel<SP> where
log_info!(logger, "Sending a data_loss_protect with no previous remote per_commitment_secret for channel {}", &self.context.channel_id()); log_info!(logger, "Sending a data_loss_protect with no previous remote per_commitment_secret for channel {}", &self.context.channel_id());
[0;32] [0;32]
}; };
self.mark_awaiting_response();
msgs::ChannelReestablish { msgs::ChannelReestablish {
channel_id: self.context.channel_id(), channel_id: self.context.channel_id(),
// The protocol has two different commitment number concepts - the "commitment // The protocol has two different commitment number concepts - the "commitment
@ -8481,6 +8630,12 @@ impl<SP: Deref> FundedChannel<SP> where
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>) target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError> -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
{ {
if self.context.channel_state.is_local_stfu_sent()
|| self.context.channel_state.is_remote_stfu_sent()
|| self.context.channel_state.is_quiescent()
{
return Err(APIError::APIMisuseError { err: "Cannot begin shutdown while quiescent".to_owned() });
}
for htlc in self.context.pending_outbound_htlcs.iter() { for htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
return Err(APIError::APIMisuseError{err: "Cannot begin shutdown with pending HTLCs. Process pending events first".to_owned()}); return Err(APIError::APIMisuseError{err: "Cannot begin shutdown with pending HTLCs. Process pending events first".to_owned()});
@ -8525,6 +8680,9 @@ impl<SP: Deref> FundedChannel<SP> where
// From here on out, we may not fail! // From here on out, we may not fail!
self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw; self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw;
self.context.channel_state.set_local_shutdown_sent(); self.context.channel_state.set_local_shutdown_sent();
if self.context.channel_state.is_awaiting_quiescence() {
self.context.channel_state.clear_awaiting_quiescence();
}
self.context.local_initiated_shutdown = Some(()); self.context.local_initiated_shutdown = Some(());
self.context.update_time_counter += 1; self.context.update_time_counter += 1;
@ -8590,6 +8748,200 @@ impl<SP: Deref> FundedChannel<SP> where
self.context.counterparty_max_htlc_value_in_flight_msat self.context.counterparty_max_htlc_value_in_flight_msat
); );
} }
#[cfg(any(test, fuzzing))]
pub fn propose_quiescence<L: Deref>(
&mut self, logger: &L,
) -> Result<Option<msgs::Stfu>, ChannelError>
where
L::Target: Logger,
{
log_debug!(logger, "Attempting to initiate quiescence");
if !self.context.is_live() {
return Err(ChannelError::Ignore(
"Channel is not in a live state to propose quiescence".to_owned()
));
}
if self.context.channel_state.is_quiescent() {
return Err(ChannelError::Ignore("Channel is already quiescent".to_owned()));
}
if self.context.channel_state.is_awaiting_quiescence()
|| self.context.channel_state.is_local_stfu_sent()
{
return Ok(None);
}
self.context.channel_state.set_awaiting_quiescence();
Ok(Some(self.send_stfu(logger)?))
}
// Assumes we are either awaiting quiescence or our counterparty has requested quiescence.
pub fn send_stfu<L: Deref>(&mut self, logger: &L) -> Result<msgs::Stfu, ChannelError>
where
L::Target: Logger,
{
debug_assert!(!self.context.channel_state.is_local_stfu_sent());
// Either state being set implies the channel is live.
debug_assert!(
self.context.channel_state.is_awaiting_quiescence()
|| self.context.channel_state.is_remote_stfu_sent()
);
debug_assert!(self.context.is_live());
if self.context.has_pending_channel_update() {
return Err(ChannelError::Ignore(
"We cannot send `stfu` while state machine is pending".to_owned()
));
}
let initiator = if self.context.channel_state.is_remote_stfu_sent() {
// We may have also attempted to initiate quiescence.
self.context.channel_state.clear_awaiting_quiescence();
self.context.channel_state.clear_remote_stfu_sent();
self.context.channel_state.set_quiescent();
if let Some(initiator) = self.context.is_holder_quiescence_initiator.as_ref() {
log_debug!(
logger,
"Responding to counterparty stfu with our own, channel is now quiescent and we are{} the initiator",
if !initiator { " not" } else { "" }
);
*initiator
} else {
debug_assert!(false, "Quiescence initiator must have been set when we received stfu");
false
}
} else {
log_debug!(logger, "Sending stfu as quiescence initiator");
debug_assert!(self.context.channel_state.is_awaiting_quiescence());
self.context.channel_state.clear_awaiting_quiescence();
self.context.channel_state.set_local_stfu_sent();
true
};
Ok(msgs::Stfu { channel_id: self.context.channel_id, initiator })
}
pub fn stfu<L: Deref>(
&mut self, msg: &msgs::Stfu, logger: &L
) -> Result<Option<msgs::Stfu>, ChannelError> where L::Target: Logger {
if self.context.channel_state.is_quiescent() {
return Err(ChannelError::Warn("Channel is already quiescent".to_owned()));
}
if self.context.channel_state.is_remote_stfu_sent() {
return Err(ChannelError::Warn(
"Peer sent `stfu` when they already sent it and we've yet to become quiescent".to_owned()
));
}
if !self.context.is_live() {
return Err(ChannelError::Warn(
"Peer sent `stfu` when we were not in a live state".to_owned()
));
}
if self.context.channel_state.is_awaiting_quiescence()
|| !self.context.channel_state.is_local_stfu_sent()
{
if !msg.initiator {
return Err(ChannelError::WarnAndDisconnect(
"Peer sent unexpected `stfu` without signaling as initiator".to_owned()
));
}
// We don't check `has_pending_channel_update` prior to setting the flag because it
// considers pending updates from either node. This means we may accept a counterparty
// `stfu` while they had pending updates, but that's fine as we won't send ours until
// _all_ pending updates complete, allowing the channel to become quiescent then.
self.context.channel_state.set_remote_stfu_sent();
let is_holder_initiator = if self.context.channel_state.is_awaiting_quiescence() {
// We were also planning to propose quiescence, let the tie-breaker decide the
// initiator.
self.context.is_outbound()
} else {
false
};
self.context.is_holder_quiescence_initiator = Some(is_holder_initiator);
log_debug!(logger, "Received counterparty stfu proposing quiescence");
return self.send_stfu(logger).map(|stfu| Some(stfu));
}
// We already sent `stfu` and are now processing theirs. It may be in response to ours, or
// we happened to both send `stfu` at the same time and a tie-break is needed.
let is_holder_quiescence_initiator = !msg.initiator || self.context.is_outbound();
self.context.is_holder_quiescence_initiator = Some(is_holder_quiescence_initiator);
// We were expecting to receive `stfu` because we already sent ours.
self.mark_response_received();
if self.context.has_pending_channel_update() {
// Since we've already sent `stfu`, it should not be possible for one of our updates to
// be pending, so anything pending currently must be from a counterparty update.
return Err(ChannelError::WarnAndDisconnect(
"Received counterparty stfu while having pending counterparty updates".to_owned()
));
}
self.context.channel_state.clear_local_stfu_sent();
self.context.channel_state.set_quiescent();
log_debug!(
logger,
"Received counterparty stfu, channel is now quiescent and we are{} the initiator",
if !is_holder_quiescence_initiator { " not" } else { "" }
);
Ok(None)
}
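// Not part of the diff: a standalone sketch of the initiator tie-break applied in `stfu` above,
// under the assumption stated there that simultaneous `stfu`s are broken in favor of the channel
// funder (the outbound side). `holder_is_initiator` is a hypothetical helper, not an LDK API.
fn holder_is_initiator(counterparty_claims_initiator: bool, holder_is_funder: bool) -> bool {
	// Mirrors `!msg.initiator || self.context.is_outbound()`: if the counterparty did not claim
	// the initiator role we take it; if both sides claimed it, the funder wins the tie-break.
	!counterparty_claims_initiator || holder_is_funder
}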
pub fn try_send_stfu<L: Deref>(
&mut self, logger: &L,
) -> Result<Option<msgs::Stfu>, ChannelError>
where
L::Target: Logger,
{
// We must never see both stfu flags set; we always set the quiescent flag instead.
debug_assert!(
!(self.context.channel_state.is_local_stfu_sent()
&& self.context.channel_state.is_remote_stfu_sent())
);
// We need to send our `stfu`, either because we're trying to initiate quiescence, or the
// counterparty is and we've yet to send ours.
if self.context.channel_state.is_awaiting_quiescence()
|| (self.context.channel_state.is_remote_stfu_sent()
&& !self.context.channel_state.is_local_stfu_sent())
{
return self.send_stfu(logger).map(|stfu| Some(stfu));
}
// We're either:
// - already quiescent
// - in a state where quiescence is not possible
// - not currently trying to become quiescent
Ok(None)
}
#[cfg(any(test, fuzzing))]
pub fn exit_quiescence(&mut self) -> bool {
// Make sure we either finished the quiescence handshake and are quiescent, or we never
// attempted to initiate quiescence at all.
debug_assert!(!self.context.channel_state.is_awaiting_quiescence());
debug_assert!(!self.context.channel_state.is_local_stfu_sent());
debug_assert!(!self.context.channel_state.is_remote_stfu_sent());
if self.context.channel_state.is_quiescent() {
self.mark_response_received();
self.context.channel_state.clear_quiescent();
self.context.is_holder_quiescence_initiator.take().expect("Must always be set while quiescent")
} else {
false
}
}
} }
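// Not part of the diff: a minimal standalone model of the handshake driven by the methods above,
// using only the four flags they reference (awaiting quiescence, local/remote `stfu` sent,
// quiescent). `QuiescencePhase` and the transition helpers are hypothetical illustrations, not
// LDK's `ChannelState` bitflags.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum QuiescencePhase {
	Idle,
	AwaitingQuiescence, // `propose_quiescence` ran but our `stfu` is still held back
	LocalStfuSent,      // we sent `stfu` first and await the counterparty's
	RemoteStfuSent,     // counterparty sent `stfu` first; ours waits on pending updates
	Quiescent,          // both `stfu`s exchanged; cleared again by `exit_quiescence`
}
fn on_send_stfu(phase: QuiescencePhase) -> QuiescencePhase {
	match phase {
		// Replying to the counterparty's `stfu` completes the handshake.
		QuiescencePhase::RemoteStfuSent => QuiescencePhase::Quiescent,
		// Initiating: our `stfu` goes out and we wait for theirs.
		QuiescencePhase::AwaitingQuiescence => QuiescencePhase::LocalStfuSent,
		other => other,
	}
}
fn on_receive_stfu(phase: QuiescencePhase) -> QuiescencePhase {
	match phase {
		// Their `stfu` answers ours, so both sides are now quiescent.
		QuiescencePhase::LocalStfuSent => QuiescencePhase::Quiescent,
		// They initiated; we record it and reply via `on_send_stfu` once no updates are pending.
		QuiescencePhase::Idle | QuiescencePhase::AwaitingQuiescence => QuiescencePhase::RemoteStfuSent,
		other => other,
	}
}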
/// A not-yet-funded outbound (from holder) channel using V1 channel establishment. /// A not-yet-funded outbound (from holder) channel using V1 channel establishment.
@ -9581,11 +9933,17 @@ impl<SP: Deref> Writeable for FundedChannel<SP> where SP::Target: SignerProvider
self.context.channel_id.write(writer)?; self.context.channel_id.write(writer)?;
{ {
let mut channel_state = self.context.channel_state; let mut channel_state = self.context.channel_state;
-if matches!(channel_state, ChannelState::AwaitingChannelReady(_)|ChannelState::ChannelReady(_)) {
-	channel_state.set_peer_disconnected();
-} else {
-	debug_assert!(false, "Pre-funded/shutdown channels should not be written");
-}
+match channel_state {
+	ChannelState::AwaitingChannelReady(_) => {},
+	ChannelState::ChannelReady(_) => {
+		channel_state.clear_awaiting_quiescence();
+		channel_state.clear_local_stfu_sent();
+		channel_state.clear_remote_stfu_sent();
+		channel_state.clear_quiescent();
+	},
+	_ => debug_assert!(false, "Pre-funded/shutdown channels should not be written"),
+}
+channel_state.set_peer_disconnected();
channel_state.to_u32().write(writer)?; channel_state.to_u32().write(writer)?;
} }
self.funding.channel_value_satoshis.write(writer)?; self.funding.channel_value_satoshis.write(writer)?;
@ -10497,6 +10855,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
blocked_monitor_updates: blocked_monitor_updates.unwrap(), blocked_monitor_updates: blocked_monitor_updates.unwrap(),
is_manual_broadcast: is_manual_broadcast.unwrap_or(false), is_manual_broadcast: is_manual_broadcast.unwrap_or(false),
// TODO(dual_funding): Instead of getting this from persisted value, figure it out based on the // TODO(dual_funding): Instead of getting this from persisted value, figure it out based on the
// funding transaction and other channel state. // funding transaction and other channel state.
// //
@ -10504,6 +10863,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
// during a signing session, but have not received `tx_signatures` we MUST set `next_funding_txid` // during a signing session, but have not received `tx_signatures` we MUST set `next_funding_txid`
// to the txid of that interactive transaction, else we MUST NOT set it. // to the txid of that interactive transaction, else we MUST NOT set it.
next_funding_txid: None, next_funding_txid: None,
is_holder_quiescence_initiator: None,
}, },
interactive_tx_signing_session: None, interactive_tx_signing_session: None,
holder_commitment_point, holder_commitment_point,
View file
@ -840,6 +840,15 @@ impl MsgHandleErrInternal {
log_level: Level::Warn, log_level: Level::Warn,
}, },
}, },
ChannelError::WarnAndDisconnect(msg) => LightningError {
err: msg.clone(),
action: msgs::ErrorAction::DisconnectPeerWithWarning {
msg: msgs::WarningMessage {
channel_id,
data: msg
},
},
},
ChannelError::Ignore(msg) => LightningError { ChannelError::Ignore(msg) => LightningError {
err: msg, err: msg,
action: msgs::ErrorAction::IgnoreError, action: msgs::ErrorAction::IgnoreError,
@ -3069,6 +3078,9 @@ macro_rules! convert_channel_err {
ChannelError::Warn(msg) => { ChannelError::Warn(msg) => {
(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), *$channel_id)) (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), *$channel_id))
}, },
ChannelError::WarnAndDisconnect(msg) => {
(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::WarnAndDisconnect(msg), *$channel_id))
},
ChannelError::Ignore(msg) => { ChannelError::Ignore(msg) => {
(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), *$channel_id)) (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), *$channel_id))
}, },
@ -6620,19 +6632,21 @@ where
funded_chan.context.maybe_expire_prev_config(); funded_chan.context.maybe_expire_prev_config();
-if funded_chan.should_disconnect_peer_awaiting_response() {
-	let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None);
-	log_debug!(logger, "Disconnecting peer {} due to not making any progress on channel {}",
-		counterparty_node_id, chan_id);
-	pending_msg_events.push(MessageSendEvent::HandleError {
-		node_id: counterparty_node_id,
-		action: msgs::ErrorAction::DisconnectPeerWithWarning {
-			msg: msgs::WarningMessage {
-				channel_id: *chan_id,
-				data: "Disconnecting due to timeout awaiting response".to_owned(),
-			},
-		},
-	});
-}
+if peer_state.is_connected {
+	if funded_chan.should_disconnect_peer_awaiting_response() {
+		let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None);
+		log_debug!(logger, "Disconnecting peer {} due to not making any progress on channel {}",
+			counterparty_node_id, chan_id);
+		pending_msg_events.push(MessageSendEvent::HandleError {
+			node_id: counterparty_node_id,
+			action: msgs::ErrorAction::DisconnectPeerWithWarning {
+				msg: msgs::WarningMessage {
+					channel_id: *chan_id,
+					data: "Disconnecting due to timeout awaiting response".to_owned(),
+				},
+			},
+		});
+	}
+}
true true
@ -9214,6 +9228,58 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
Ok(()) Ok(())
} }
fn internal_stfu(&self, counterparty_node_id: &PublicKey, msg: &msgs::Stfu) -> Result<bool, MsgHandleErrInternal> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id).ok_or_else(|| {
debug_assert!(false);
MsgHandleErrInternal::send_err_msg_no_close(
format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id),
msg.channel_id
)
})?;
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
if !self.init_features().supports_quiescence() {
return Err(MsgHandleErrInternal::from_chan_no_close(
ChannelError::Warn("Quiescense not supported".to_string()), msg.channel_id
));
}
let mut sent_stfu = false;
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan_entry) => {
if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
let logger = WithContext::from(
&self.logger, Some(*counterparty_node_id), Some(msg.channel_id), None
);
if let Some(stfu) = try_channel_entry!(
self, peer_state, chan.stfu(&msg, &&logger), chan_entry
) {
sent_stfu = true;
peer_state.pending_msg_events.push(MessageSendEvent::SendStfu {
node_id: *counterparty_node_id,
msg: stfu,
});
}
} else {
let msg = "Peer sent `stfu` for an unfunded channel";
let err = Err(ChannelError::Close(
(msg.into(), ClosureReason::ProcessingError { err: msg.into() })
));
return try_channel_entry!(self, peer_state, err, chan_entry);
}
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(
format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id),
msg.channel_id
))
}
Ok(sent_stfu)
}
fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> {
let per_peer_state = self.per_peer_state.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id) let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@ -9739,6 +9805,118 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
has_update has_update
} }
fn maybe_send_stfu(&self) {
let per_peer_state = self.per_peer_state.read().unwrap();
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
for (channel_id, chan) in &mut peer_state.channel_by_id {
if let Some(funded_chan) = chan.as_funded_mut() {
let logger = WithContext::from(
&self.logger, Some(*counterparty_node_id), Some(*channel_id), None
);
match funded_chan.try_send_stfu(&&logger) {
Ok(None) => {},
Ok(Some(stfu)) => {
pending_msg_events.push(events::MessageSendEvent::SendStfu {
node_id: chan.context().get_counterparty_node_id(),
msg: stfu,
});
},
Err(e) => {
log_debug!(logger, "Could not advance quiescence handshake: {}", e);
}
}
}
}
}
}
#[cfg(any(test, fuzzing))]
pub fn maybe_propose_quiescence(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) -> Result<(), APIError> {
let mut result = Ok(());
PersistenceNotifierGuard::optionally_notify(self, || {
let mut notify = NotifyOption::SkipPersistNoEvents;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if peer_state_mutex_opt.is_none() {
result = Err(APIError::ChannelUnavailable {
err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id)
});
return notify;
}
let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap();
if !peer_state.latest_features.supports_quiescence() {
result = Err(APIError::ChannelUnavailable { err: "Peer does not support quiescence".to_owned() });
return notify;
}
match peer_state.channel_by_id.entry(channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_entry) => {
if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
let logger = WithContext::from(
&self.logger, Some(*counterparty_node_id), Some(*channel_id), None
);
match chan.propose_quiescence(&&logger) {
Ok(None) => {},
Ok(Some(stfu)) => {
peer_state.pending_msg_events.push(MessageSendEvent::SendStfu {
node_id: *counterparty_node_id, msg: stfu
});
notify = NotifyOption::SkipPersistHandleEvents;
},
Err(msg) => log_trace!(logger, "{}", msg),
}
} else {
result = Err(APIError::APIMisuseError {
err: format!("Unfunded channel {} cannot be quiescent", channel_id),
});
}
},
hash_map::Entry::Vacant(_) => {
result = Err(APIError::ChannelUnavailable {
err: format!("Channel with id {} not found for the passed counterparty node_id {}",
channel_id, counterparty_node_id),
});
},
}
notify
});
result
}
#[cfg(any(test, fuzzing))]
pub fn exit_quiescence(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) -> Result<bool, APIError> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| APIError::ChannelUnavailable {
err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id)
})?;
let mut peer_state = peer_state_mutex.lock().unwrap();
let initiator = match peer_state.channel_by_id.entry(*channel_id) {
hash_map::Entry::Occupied(mut chan_entry) => {
if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
chan.exit_quiescence()
} else {
return Err(APIError::APIMisuseError {
err: format!("Unfunded channel {} cannot be quiescent", channel_id),
})
}
},
hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable {
err: format!("Channel with id {} not found for the passed counterparty node_id {}",
channel_id, counterparty_node_id),
}),
};
Ok(initiator)
}
/// Utility for creating a BOLT11 invoice that can be verified by [`ChannelManager`] without /// Utility for creating a BOLT11 invoice that can be verified by [`ChannelManager`] without
/// storing any additional state. It achieves this by including a [`PaymentSecret`] in the /// storing any additional state. It achieves this by including a [`PaymentSecret`] in the
/// invoice which it uses to verify that the invoice has not expired and the payment amount is /// invoice which it uses to verify that the invoice has not expired and the payment amount is
@ -10927,6 +11105,9 @@ where
result = NotifyOption::DoPersist; result = NotifyOption::DoPersist;
} }
// Quiescence is an in-memory protocol, so we don't have to persist because of it.
self.maybe_send_stfu();
let mut is_any_peer_connected = false; let mut is_any_peer_connected = false;
let mut pending_events = Vec::new(); let mut pending_events = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap();
@ -11547,9 +11728,20 @@ where
} }
fn handle_stfu(&self, counterparty_node_id: PublicKey, msg: &msgs::Stfu) { fn handle_stfu(&self, counterparty_node_id: PublicKey, msg: &msgs::Stfu) {
-let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
-	"Quiescence not supported".to_owned(),
-	msg.channel_id.clone())), counterparty_node_id);
+let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+	let res = self.internal_stfu(&counterparty_node_id, msg);
+	let persist = match &res {
+		Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+		Err(_) => NotifyOption::SkipPersistHandleEvents,
+		Ok(sent_stfu) => if *sent_stfu {
+			NotifyOption::SkipPersistHandleEvents
+		} else {
+			NotifyOption::SkipPersistNoEvents
+		},
+	};
+	let _ = handle_error!(self, res, counterparty_node_id);
+	persist
+});
} }
#[cfg(splicing)] #[cfg(splicing)]
@ -12558,6 +12750,10 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
} }
#[cfg(dual_funding)] #[cfg(dual_funding)]
features.set_dual_fund_optional(); features.set_dual_fund_optional();
// Only signal quiescence support in tests for now, as we don't yet support any
// quiescence-dependent protocols (e.g., splicing).
#[cfg(any(test, fuzzing))]
features.set_quiescence_optional();
features features
} }
View file
@ -89,6 +89,8 @@ mod monitor_tests;
#[allow(unused_mut)] #[allow(unused_mut)]
mod shutdown_tests; mod shutdown_tests;
#[cfg(test)] #[cfg(test)]
mod quiescence_tests;
#[cfg(test)]
#[allow(unused_mut)] #[allow(unused_mut)]
mod async_signer_tests; mod async_signer_tests;
#[cfg(test)] #[cfg(test)]
View file
@ -455,8 +455,8 @@ pub type SerialId = u64;
pub struct Stfu { pub struct Stfu {
/// The channel ID where quiescence is intended /// The channel ID where quiescence is intended
pub channel_id: ChannelId, pub channel_id: ChannelId,
-/// Initiator flag, 1 if initiating, 0 if replying to an stfu.
-pub initiator: u8,
+/// Initiator flag, true if initiating, false if replying to an stfu.
+pub initiator: bool,
} }
/// A `splice_init` message to be sent by or received from the stfu initiator (splice initiator). /// A `splice_init` message to be sent by or received from the stfu initiator (splice initiator).
@ -4112,10 +4112,17 @@ mod tests {
fn encoding_stfu() { fn encoding_stfu() {
let stfu = msgs::Stfu { let stfu = msgs::Stfu {
channel_id: ChannelId::from_bytes([2; 32]), channel_id: ChannelId::from_bytes([2; 32]),
-initiator: 1,
+initiator: true,
}; };
let encoded_value = stfu.encode(); let encoded_value = stfu.encode();
assert_eq!(encoded_value.as_hex().to_string(), "020202020202020202020202020202020202020202020202020202020202020201"); assert_eq!(encoded_value.as_hex().to_string(), "020202020202020202020202020202020202020202020202020202020202020201");
let stfu = msgs::Stfu {
channel_id: ChannelId::from_bytes([3; 32]),
initiator: false,
};
let encoded_value = stfu.encode();
assert_eq!(encoded_value.as_hex().to_string(), "030303030303030303030303030303030303030303030303030303030303030300");
} }
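// Not part of the diff: a standalone sketch of the payload the vectors above exercise, assuming
// only the layout visible there: a 32-byte channel id followed by one initiator byte (0x01 when
// initiating, 0x00 when replying). `encode_stfu_payload` is a hypothetical helper, not an LDK API.
fn encode_stfu_payload(channel_id: [u8; 32], initiator: bool) -> Vec<u8> {
	let mut bytes = Vec::with_capacity(33);
	bytes.extend_from_slice(&channel_id);
	bytes.push(if initiator { 1 } else { 0 });
	// e.g. encode_stfu_payload([2; 32], true) reproduces the "0202...01" vector asserted above.
	bytes
}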
#[test] #[test]
View file
@ -0,0 +1,501 @@
use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::Event;
use crate::events::HTLCDestination;
use crate::events::MessageSendEvent;
use crate::events::MessageSendEventsProvider;
use crate::ln::channel::DISCONNECT_PEER_AWAITING_RESPONSE_TICKS;
use crate::ln::channelmanager::PaymentId;
use crate::ln::channelmanager::RecipientOnionFields;
use crate::ln::functional_test_utils::*;
use crate::ln::msgs;
use crate::ln::msgs::{ChannelMessageHandler, ErrorAction};
use crate::util::errors::APIError;
use crate::util::test_channel_signer::SignerOp;
#[test]
fn test_quiescence_tie() {
// Test that both nodes proposing quiescence at the same time results in the channel funder
// becoming the quiescence initiator.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
nodes[1].node.maybe_propose_quiescence(&nodes[0].node.get_our_node_id(), &chan_id).unwrap();
let stfu_node_0 =
get_event_msg!(nodes[0], MessageSendEvent::SendStfu, nodes[1].node.get_our_node_id());
nodes[1].node.handle_stfu(nodes[0].node.get_our_node_id(), &stfu_node_0);
let stfu_node_1 =
get_event_msg!(nodes[1], MessageSendEvent::SendStfu, nodes[0].node.get_our_node_id());
nodes[0].node.handle_stfu(nodes[1].node.get_our_node_id(), &stfu_node_1);
assert!(stfu_node_0.initiator && stfu_node_1.initiator);
assert!(nodes[0].node.exit_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap());
assert!(!nodes[1].node.exit_quiescence(&nodes[0].node.get_our_node_id(), &chan_id).unwrap());
}
#[test]
fn test_quiescence_shutdown_ignored() {
// Test that a shutdown sent/received during quiescence is ignored.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
let _ = get_event_msg!(nodes[0], MessageSendEvent::SendStfu, nodes[1].node.get_our_node_id());
if let Err(e) = nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()) {
assert_eq!(
e,
APIError::APIMisuseError { err: "Cannot begin shutdown while quiescent".to_owned() }
);
} else {
panic!("Expected shutdown to be ignored while quiescent");
}
nodes[1].node.close_channel(&chan_id, &nodes[0].node.get_our_node_id()).unwrap();
let shutdown =
get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
nodes[0].node.handle_shutdown(nodes[1].node.get_our_node_id(), &shutdown);
let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
match msg_events[0] {
MessageSendEvent::HandleError {
action: ErrorAction::DisconnectPeerWithWarning { ref msg, .. },
..
} => {
assert_eq!(msg.data, "Got shutdown request while quiescent".to_owned());
},
_ => panic!(),
}
}
#[test]
fn test_allow_shutdown_while_awaiting_quiescence() {
allow_shutdown_while_awaiting_quiescence(false);
allow_shutdown_while_awaiting_quiescence(true);
}
fn allow_shutdown_while_awaiting_quiescence(local_shutdown: bool) {
// Test that a shutdown sent/received while we're still awaiting quiescence (stfu has not been
// sent yet) is honored and the channel is closed cooperatively.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let local_node = &nodes[0];
let remote_node = &nodes[1];
let local_node_id = local_node.node.get_our_node_id();
let remote_node_id = remote_node.node.get_our_node_id();
let payment_amount = 1_000_000;
let (route, payment_hash, _, payment_secret) =
get_route_and_payment_hash!(local_node, remote_node, payment_amount);
let onion = RecipientOnionFields::secret_only(payment_secret);
let payment_id = PaymentId(payment_hash.0);
local_node.node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap();
check_added_monitors!(local_node, 1);
// Attempt to send an HTLC, but don't fully commit it yet.
let update_add = get_htlc_update_msgs!(local_node, remote_node_id);
remote_node.node.handle_update_add_htlc(local_node_id, &update_add.update_add_htlcs[0]);
remote_node.node.handle_commitment_signed(local_node_id, &update_add.commitment_signed);
let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(remote_node, local_node_id);
local_node.node.handle_revoke_and_ack(remote_node_id, &revoke_and_ack);
check_added_monitors(local_node, 1);
// Request the local node to propose quiescence, and immediately try to close the channel. Since
// we haven't sent `stfu` yet (the state machine is still pending), we should forget about our
// quiescence attempt.
local_node.node.maybe_propose_quiescence(&remote_node_id, &chan_id).unwrap();
assert!(local_node.node.get_and_clear_pending_msg_events().is_empty());
let (closer_node, closee_node) =
if local_shutdown { (local_node, remote_node) } else { (remote_node, local_node) };
let closer_node_id = closer_node.node.get_our_node_id();
let closee_node_id = closee_node.node.get_our_node_id();
closer_node.node.close_channel(&chan_id, &closee_node_id).unwrap();
check_added_monitors(&remote_node, 1);
let shutdown_initiator =
get_event_msg!(closer_node, MessageSendEvent::SendShutdown, closee_node_id);
closee_node.node.handle_shutdown(closer_node_id, &shutdown_initiator);
let shutdown_responder =
get_event_msg!(closee_node, MessageSendEvent::SendShutdown, closer_node_id);
closer_node.node.handle_shutdown(closee_node_id, &shutdown_responder);
// Continue exchanging messages until the HTLC is irrevocably committed and eventually failed
// back as we are shutting down.
local_node.node.handle_commitment_signed(remote_node_id, &commit_sig);
check_added_monitors(local_node, 1);
let last_revoke_and_ack =
get_event_msg!(local_node, MessageSendEvent::SendRevokeAndACK, remote_node_id);
remote_node.node.handle_revoke_and_ack(local_node_id, &last_revoke_and_ack);
check_added_monitors(remote_node, 1);
expect_pending_htlcs_forwardable!(remote_node);
expect_htlc_handling_failed_destinations!(
remote_node.node.get_and_clear_pending_events(),
&[HTLCDestination::FailedPayment { payment_hash }]
);
check_added_monitors(remote_node, 1);
let update_fail = get_htlc_update_msgs!(remote_node, local_node_id);
local_node.node.handle_update_fail_htlc(remote_node_id, &update_fail.update_fail_htlcs[0]);
local_node.node.handle_commitment_signed(remote_node_id, &update_fail.commitment_signed);
let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(local_node, remote_node_id);
remote_node.node.handle_revoke_and_ack(local_node_id, &revoke_and_ack);
check_added_monitors(remote_node, 1);
remote_node.node.handle_commitment_signed(local_node_id, &commit_sig);
check_added_monitors(remote_node, 1);
let last_revoke_and_ack =
get_event_msg!(remote_node, MessageSendEvent::SendRevokeAndACK, local_node_id);
local_node.node.handle_revoke_and_ack(remote_node_id, &last_revoke_and_ack);
expect_payment_failed_conditions(
local_node,
payment_hash,
true,
PaymentFailedConditions::new(),
);
// Now that the state machine is no longer pending, and `closing_signed` is ready to be sent,
// make sure we're still not waiting for the quiescence handshake to complete.
local_node.node.exit_quiescence(&remote_node_id, &chan_id).unwrap();
let _ = get_event_msg!(local_node, MessageSendEvent::SendClosingSigned, remote_node_id);
check_added_monitors(local_node, 2); // One for the last revoke_and_ack, another for closing_signed
}
#[test]
fn test_quiescence_tracks_monitor_update_in_progress_and_waits_for_async_signer() {
// Test that quiescence:
// a) considers an async signer when determining whether a pending channel update exists
// b) tracks in-progress monitor updates until no longer quiescent
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let node_id_0 = nodes[0].node.get_our_node_id();
let node_id_1 = nodes[1].node.get_our_node_id();
let payment_amount = 1_000_000;
let (preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], payment_amount);
nodes[1].node.claim_funds(preimage);
check_added_monitors(&nodes[1], 1);
let update = get_htlc_update_msgs!(&nodes[1], node_id_0);
nodes[0].node.handle_update_fulfill_htlc(node_id_1, &update.update_fulfill_htlcs[0]);
nodes[0].node.handle_commitment_signed(node_id_1, &update.commitment_signed);
check_added_monitors(&nodes[0], 1);
// While settling back the payment, propose quiescence from nodes[1]. We won't see its `stfu` go
// out yet as the `update_fulfill` is still pending on both sides.
nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
// Disable releasing commitment secrets on nodes[1], to hold back their `stfu` until the
// `revoke_and_ack` goes out, and drive the state machine forward.
nodes[1].disable_channel_signer_op(&node_id_0, &chan_id, SignerOp::ReleaseCommitmentSecret);
let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(&nodes[0], node_id_1);
nodes[1].node.handle_revoke_and_ack(node_id_0, &revoke_and_ack);
check_added_monitors(&nodes[1], 1);
nodes[1].node.handle_commitment_signed(node_id_0, &commit_sig);
check_added_monitors(&nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
// Resume the signer. We should now expect to see both messages.
nodes[1].enable_channel_signer_op(&node_id_0, &chan_id, SignerOp::ReleaseCommitmentSecret);
nodes[1].node.signer_unblocked(Some((node_id_0, chan_id)));
expect_payment_claimed!(&nodes[1], payment_hash, payment_amount);
macro_rules! find_msg {
($events: expr, $msg: ident) => {{
$events
.iter()
.find_map(|event| {
if let MessageSendEvent::$msg { ref msg, .. } = event {
Some(msg)
} else {
None
}
})
.unwrap()
}};
}
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 2);
let revoke_and_ack = find_msg!(msg_events, SendRevokeAndACK);
let stfu = find_msg!(msg_events, SendStfu);
// While handling the last `revoke_and_ack` on nodes[0], we'll hold the monitor update and
// become quiescent.
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[0].node.handle_revoke_and_ack(node_id_1, &revoke_and_ack);
nodes[0].node.handle_stfu(node_id_1, &stfu);
let stfu = get_event_msg!(&nodes[0], MessageSendEvent::SendStfu, node_id_1);
nodes[1].node.handle_stfu(node_id_0, &stfu);
nodes[0].node.exit_quiescence(&node_id_1, &chan_id).unwrap();
nodes[1].node.exit_quiescence(&node_id_0, &chan_id).unwrap();
// After exiting quiescence, we should be able to resume payments from nodes[0], but the monitor
// update has yet to complete. Attempting to send a payment now will be delayed until the
// monitor update completes.
{
let (route, payment_hash, _, payment_secret) =
get_route_and_payment_hash!(&nodes[0], &nodes[1], payment_amount);
let onion = RecipientOnionFields::secret_only(payment_secret);
let payment_id = PaymentId(payment_hash.0);
nodes[0].node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap();
}
check_added_monitors(&nodes[0], 0);
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
// We have two updates pending:
{
let chain_monitor = &nodes[0].chain_monitor;
let (_, latest_update) =
chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
let chain_monitor = &nodes[0].chain_monitor.chain_monitor;
// One for the latest commitment transaction update from the last `revoke_and_ack`
chain_monitor.channel_monitor_updated(chan_id, latest_update - 1).unwrap();
expect_payment_sent(&nodes[0], preimage, None, true, true);
// One for the commitment secret update from the last `revoke_and_ack`
chain_monitor.channel_monitor_updated(chan_id, latest_update).unwrap();
}
// With the pending monitor updates complete, we'll see a new monitor update go out when freeing
// the holding cells to send out the new HTLC.
nodes[0].chain_monitor.complete_sole_pending_chan_update(&chan_id);
let _ = get_htlc_update_msgs!(&nodes[0], node_id_1);
check_added_monitors(&nodes[0], 1);
}
#[test]
fn test_quiescence_updates_go_to_holding_cell() {
quiescence_updates_go_to_holding_cell(false);
quiescence_updates_go_to_holding_cell(true);
}
fn quiescence_updates_go_to_holding_cell(fail_htlc: bool) {
// Test that any updates made to a channel while quiescent go to the holding cell.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let node_id_0 = nodes[0].node.get_our_node_id();
let node_id_1 = nodes[1].node.get_our_node_id();
// Send enough to be able to pay from both directions.
let payment_amount = 1_000_000;
send_payment(&nodes[0], &[&nodes[1]], payment_amount * 4);
// Propose quiescence from nodes[1], and immediately try to send a payment. Since its `stfu` has
// already gone out first, the outbound HTLC will go into the holding cell.
nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
let stfu = get_event_msg!(&nodes[1], MessageSendEvent::SendStfu, node_id_0);
let (route1, payment_hash1, payment_preimage1, payment_secret1) =
get_route_and_payment_hash!(&nodes[1], &nodes[0], payment_amount);
let onion1 = RecipientOnionFields::secret_only(payment_secret1);
let payment_id1 = PaymentId(payment_hash1.0);
nodes[1].node.send_payment_with_route(route1, payment_hash1, onion1, payment_id1).unwrap();
check_added_monitors!(&nodes[1], 0);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
// Send a payment in the opposite direction. Since nodes[0] hasn't sent its own `stfu` yet, it's
// allowed to make updates.
let (route2, payment_hash2, payment_preimage2, payment_secret2) =
get_route_and_payment_hash!(&nodes[0], &nodes[1], payment_amount);
let onion2 = RecipientOnionFields::secret_only(payment_secret2);
let payment_id2 = PaymentId(payment_hash2.0);
nodes[0].node.send_payment_with_route(route2, payment_hash2, onion2, payment_id2).unwrap();
check_added_monitors!(&nodes[0], 1);
let update_add = get_htlc_update_msgs!(&nodes[0], node_id_1);
nodes[1].node.handle_update_add_htlc(node_id_0, &update_add.update_add_htlcs[0]);
commitment_signed_dance!(&nodes[1], &nodes[0], update_add.commitment_signed, false);
expect_pending_htlcs_forwardable!(&nodes[1]);
expect_payment_claimable!(nodes[1], payment_hash2, payment_secret2, payment_amount);
// Have nodes[1] attempt to fail/claim nodes[0]'s payment. Since nodes[1] already sent out
// `stfu`, the `update_fail/fulfill` will go into the holding cell.
if fail_htlc {
nodes[1].node.fail_htlc_backwards(&payment_hash2);
let failed_payment = HTLCDestination::FailedPayment { payment_hash: payment_hash2 };
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(&nodes[1], vec![failed_payment]);
} else {
nodes[1].node.claim_funds(payment_preimage2);
check_added_monitors(&nodes[1], 1);
}
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
// Finish the quiescence handshake.
nodes[0].node.handle_stfu(node_id_1, &stfu);
let stfu = get_event_msg!(&nodes[0], MessageSendEvent::SendStfu, node_id_1);
nodes[1].node.handle_stfu(node_id_0, &stfu);
nodes[0].node.exit_quiescence(&node_id_1, &chan_id).unwrap();
nodes[1].node.exit_quiescence(&node_id_0, &chan_id).unwrap();
// Now that quiescence is over, nodes are allowed to make updates again. nodes[1] will have its
// outbound HTLC finally go out, along with the fail/claim of nodes[0]'s payment.
let update = get_htlc_update_msgs!(&nodes[1], node_id_0);
check_added_monitors(&nodes[1], 1);
nodes[0].node.handle_update_add_htlc(node_id_1, &update.update_add_htlcs[0]);
if fail_htlc {
nodes[0].node.handle_update_fail_htlc(node_id_1, &update.update_fail_htlcs[0]);
} else {
nodes[0].node.handle_update_fulfill_htlc(node_id_1, &update.update_fulfill_htlcs[0]);
}
commitment_signed_dance!(&nodes[0], &nodes[1], update.commitment_signed, false);
if !fail_htlc {
expect_payment_claimed!(nodes[1], payment_hash2, payment_amount);
}
// The payment from nodes[0] should now be seen as failed/successful.
let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 3);
assert!(events.iter().find(|e| matches!(e, Event::PendingHTLCsForwardable { .. })).is_some());
if fail_htlc {
assert!(events.iter().find(|e| matches!(e, Event::PaymentFailed { .. })).is_some());
assert!(events.iter().find(|e| matches!(e, Event::PaymentPathFailed { .. })).is_some());
} else {
assert!(events.iter().find(|e| matches!(e, Event::PaymentSent { .. })).is_some());
assert!(events.iter().find(|e| matches!(e, Event::PaymentPathSuccessful { .. })).is_some());
check_added_monitors(&nodes[0], 1);
}
nodes[0].node.process_pending_htlc_forwards();
expect_payment_claimable!(nodes[0], payment_hash1, payment_secret1, payment_amount);
// Have nodes[0] fail/claim nodes[1]'s payment.
if fail_htlc {
nodes[0].node.fail_htlc_backwards(&payment_hash1);
let failed_payment = HTLCDestination::FailedPayment { payment_hash: payment_hash1 };
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(&nodes[0], vec![failed_payment]);
} else {
nodes[0].node.claim_funds(payment_preimage1);
}
check_added_monitors(&nodes[0], 1);
let update = get_htlc_update_msgs!(&nodes[0], node_id_1);
if fail_htlc {
nodes[1].node.handle_update_fail_htlc(node_id_0, &update.update_fail_htlcs[0]);
} else {
nodes[1].node.handle_update_fulfill_htlc(node_id_0, &update.update_fulfill_htlcs[0]);
}
commitment_signed_dance!(&nodes[1], &nodes[0], update.commitment_signed, false);
// The payment from nodes[1] should now be seen as failed/successful.
if fail_htlc {
let conditions = PaymentFailedConditions::new();
expect_payment_failed_conditions(&nodes[1], payment_hash1, true, conditions);
} else {
expect_payment_claimed!(nodes[0], payment_hash1, payment_amount);
expect_payment_sent(&nodes[1], payment_preimage1, None, true, true);
}
}
#[test]
fn test_quiescence_timeout() {
// Test that we'll disconnect if we remain quiescent for `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let node_id_0 = nodes[0].node.get_our_node_id();
let node_id_1 = nodes[1].node.get_our_node_id();
nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
let stfu_initiator = get_event_msg!(nodes[0], MessageSendEvent::SendStfu, node_id_1);
nodes[1].node.handle_stfu(node_id_0, &stfu_initiator);
let stfu_responder = get_event_msg!(nodes[1], MessageSendEvent::SendStfu, node_id_0);
nodes[0].node.handle_stfu(node_id_1, &stfu_responder);
assert!(stfu_initiator.initiator && !stfu_responder.initiator);
for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
nodes[0].node.timer_tick_occurred();
nodes[1].node.timer_tick_occurred();
}
let f = |event| {
if let MessageSendEvent::HandleError { action, .. } = event {
if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
Some(())
} else {
None
}
} else {
None
}
};
assert!(nodes[0].node.get_and_clear_pending_msg_events().into_iter().find_map(f).is_some());
assert!(nodes[1].node.get_and_clear_pending_msg_events().into_iter().find_map(f).is_some());
}
#[test]
fn test_quiescence_timeout_while_waiting_for_counterparty_stfu() {
// Test that we'll disconnect if the counterparty does not send their stfu within a reasonable
// time if we've already sent ours.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
let node_id_0 = nodes[0].node.get_our_node_id();
nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
let _ = get_event_msg!(nodes[1], MessageSendEvent::SendStfu, node_id_0);
// Route a payment in between to ensure that expecting to receive `revoke_and_ack` doesn't override
// the expectation of receiving `stfu` as well.
let _ = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
nodes[0].node.timer_tick_occurred();
nodes[1].node.timer_tick_occurred();
}
// nodes[0] hasn't received stfu from nodes[1], so it's not enforcing any timeouts.
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
// nodes[1] didn't receive nodes[0]'s stfu within the timeout so it'll disconnect.
let f = |&ref event| {
if let MessageSendEvent::HandleError { action, .. } = event {
if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
Some(())
} else {
None
}
} else {
None
}
};
assert!(nodes[1].node.get_and_clear_pending_msg_events().iter().find_map(f).is_some());
}