Finish closing channel after async closing signed

In addressing a follow-up to test reconnection during closing negotiation
with async signing, we change things to only return a `ShutdownResult`
when we actually finish shutting down the channel, i.e. we have the
signature ready to send the final closing signed. This slightly
simplifies the logic where we would shutdown our channel
prematurely before we got the final signature. This also means
that we don't push multiple `ChannelClosed` events if we receive closing
signed, reconnect, and receive closing signed again.
This commit is contained in:
Alec Chen 2024-09-05 18:58:01 -05:00
parent d35239ca62
commit 1fa7bf9391
4 changed files with 102 additions and 59 deletions

View file

@ -854,11 +854,13 @@ fn test_async_holder_signatures_remote_commitment_anchors() {
#[test]
fn test_closing_signed() {
	// Exercise all four combinations of (extra_closing_signed, reconnect);
	// the two-argument form matches the updated `do_test_closing_signed`
	// signature, so the stale one-argument calls are removed.
	do_test_closing_signed(false, false);
	do_test_closing_signed(true, false);
	do_test_closing_signed(false, true);
	do_test_closing_signed(true, true);
}
fn do_test_closing_signed(extra_closing_signed: bool) {
fn do_test_closing_signed(extra_closing_signed: bool, reconnect: bool) {
// Based off of `expect_channel_shutdown_state`.
// Test that we can asynchronously sign closing transactions.
let chanmon_cfgs = create_chanmon_cfgs(2);
@ -867,6 +869,9 @@ fn do_test_closing_signed(extra_closing_signed: bool) {
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1);
// Avoid extra channel ready message upon reestablish later
send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
expect_channel_shutdown_state!(nodes[0], chan_1.2, ChannelShutdownState::NotShuttingDown);
nodes[0].node.close_channel(&chan_1.2, &nodes[1].node.get_our_node_id()).unwrap();
@ -914,7 +919,7 @@ fn do_test_closing_signed(extra_closing_signed: bool) {
let mut node_1_closing_signed_2 = node_1_closing_signed.clone();
let holder_script = nodes[0].keys_manager.get_shutdown_scriptpubkey().unwrap();
let counterparty_script = nodes[1].keys_manager.get_shutdown_scriptpubkey().unwrap();
let funding_outpoint = bitcoin::OutPoint { txid: chan_1.3.txid(), vout: 0 };
let funding_outpoint = bitcoin::OutPoint { txid: chan_1.3.compute_txid(), vout: 0 };
let closing_tx_2 = ClosingTransaction::new(50000, 0, holder_script.into(),
counterparty_script.into(), funding_outpoint);
@ -941,9 +946,45 @@ fn do_test_closing_signed(extra_closing_signed: bool) {
};
}
if reconnect {
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
*nodes[0].fee_estimator.sat_per_kw.lock().unwrap() *= 8;
*nodes[1].fee_estimator.sat_per_kw.lock().unwrap() *= 8;
connect_nodes(&nodes[0], &nodes[1]);
let node_0_reestablish = get_chan_reestablish_msgs!(nodes[0], nodes[1]).pop().unwrap();
let node_1_reestablish = get_chan_reestablish_msgs!(nodes[1], nodes[0]).pop().unwrap();
nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &node_0_reestablish);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &node_1_reestablish);
let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(node_0_msgs.len(), 2);
let node_0_2nd_shutdown = match node_0_msgs[0] {
MessageSendEvent::SendShutdown { ref msg, .. } => {
msg.clone()
},
_ => panic!(),
};
let node_0_2nd_closing_signed = match node_0_msgs[1] {
MessageSendEvent::SendClosingSigned { ref msg, .. } => {
msg.clone()
},
_ => panic!(),
};
let node_1_2nd_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_2nd_shutdown);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_2nd_shutdown);
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed);
let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id());
nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed);
}
nodes[0].node.signer_unblocked(None);
let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap());
let (_, node_1_closing_signed) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
assert!(node_1_closing_signed.is_none());
@ -952,4 +993,5 @@ fn do_test_closing_signed(extra_closing_signed: bool) {
assert!(nodes[1].node.list_channels().is_empty());
check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 100000);
check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 100000);
}

View file

@ -909,6 +909,7 @@ pub(super) struct SignerResumeUpdates {
pub order: RAACommitmentOrder,
pub closing_signed: Option<msgs::ClosingSigned>,
pub signed_closing_tx: Option<Transaction>,
pub shutdown_result: Option<ShutdownResult>,
}
/// The return value of `channel_reestablish`
@ -5508,7 +5509,7 @@ impl<SP: Deref> Channel<SP> where
commitment_update = None;
}
let (closing_signed, signed_closing_tx) = if self.context.signer_pending_closing {
let (closing_signed, signed_closing_tx, shutdown_result) = if self.context.signer_pending_closing {
debug_assert!(self.context.last_sent_closing_fee.is_some());
if let Some((fee, skip_remote_output, fee_range, holder_sig)) = self.context.last_sent_closing_fee.clone() {
debug_assert!(holder_sig.is_none());
@ -5524,19 +5525,21 @@ impl<SP: Deref> Channel<SP> where
&self.context.get_counterparty_pubkeys().funding_pubkey).is_ok());
Some(self.build_signed_closing_transaction(&closing_tx, &counterparty_sig, signature))
} else { None };
(closing_signed, signed_tx)
} else { (None, None) }
} else { (None, None) };
let shutdown_result = signed_tx.as_ref().map(|_| self.shutdown_result_coop_close());
(closing_signed, signed_tx, shutdown_result)
} else { (None, None, None) }
} else { (None, None, None) };
log_trace!(logger, "Signer unblocked with {} commitment_update, {} revoke_and_ack, with resend order {:?}, {} funding_signed, {} channel_ready,
{} closing_signed, and {} signed_closing_tx",
{} closing_signed, {} signed_closing_tx, and {} shutdown result",
if commitment_update.is_some() { "a" } else { "no" },
if revoke_and_ack.is_some() { "a" } else { "no" },
self.context.resend_order,
if funding_signed.is_some() { "a" } else { "no" },
if channel_ready.is_some() { "a" } else { "no" },
if closing_signed.is_some() { "a" } else { "no" },
if signed_closing_tx.is_some() { "a" } else { "no" });
if signed_closing_tx.is_some() { "a" } else { "no" },
if shutdown_result.is_some() { "a" } else { "no" });
SignerResumeUpdates {
commitment_update,
@ -5546,6 +5549,7 @@ impl<SP: Deref> Channel<SP> where
order: self.context.resend_order.clone(),
closing_signed,
signed_closing_tx,
shutdown_result,
}
}
@ -6170,6 +6174,27 @@ impl<SP: Deref> Channel<SP> where
})
}
/// Builds the `ShutdownResult` describing a completed cooperative close of
/// this channel, attributing the closure to whichever side initiated the
/// shutdown.
///
/// Used both from `closing_signed` handling and when the signer is unblocked
/// with a pending closing signature, so the same `ShutdownResult` shape is
/// produced in either path.
fn shutdown_result_coop_close(&self) -> ShutdownResult {
// Attribute the close to the side that sent the initial `shutdown` message.
let closure_reason = if self.initiated_shutdown() {
ClosureReason::LocallyInitiatedCooperativeClosure
} else {
ClosureReason::CounterpartyInitiatedCooperativeClosure
};
ShutdownResult {
closure_reason,
// A clean cooperative close needs no final monitor update and has no
// outbound HTLCs left to fail back.
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
channel_id: self.context.channel_id,
user_channel_id: self.context.user_id,
channel_capacity_satoshis: self.context.channel_value_satoshis,
counterparty_node_id: self.context.counterparty_node_id,
// Present only if the funding tx was never broadcast (e.g. batch open
// aborted before broadcast) — NOTE(review): inferred from the helper
// name; confirm against `unbroadcasted_funding`'s definition.
unbroadcasted_funding_tx: self.context.unbroadcasted_funding(),
is_manual_broadcast: self.context.is_manual_broadcast,
channel_funding_txo: self.context.get_funding_txo(),
}
}
pub fn closing_signed<F: Deref, L: Deref>(
&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, msg: &msgs::ClosingSigned, logger: &L)
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
@ -6226,28 +6251,10 @@ impl<SP: Deref> Channel<SP> where
}
}
let closure_reason = if self.initiated_shutdown() {
ClosureReason::LocallyInitiatedCooperativeClosure
} else {
ClosureReason::CounterpartyInitiatedCooperativeClosure
};
assert!(self.context.shutdown_scriptpubkey.is_some());
if let Some((last_fee, _, _, Some(sig))) = self.context.last_sent_closing_fee {
if last_fee == msg.fee_satoshis {
let shutdown_result = ShutdownResult {
closure_reason,
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
channel_id: self.context.channel_id,
user_channel_id: self.context.user_id,
channel_capacity_satoshis: self.context.channel_value_satoshis,
counterparty_node_id: self.context.counterparty_node_id,
unbroadcasted_funding_tx: self.context.unbroadcasted_funding(),
is_manual_broadcast: self.context.is_manual_broadcast,
channel_funding_txo: self.context.get_funding_txo(),
};
let shutdown_result = self.shutdown_result_coop_close();
let tx = self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &sig);
self.context.channel_state = ChannelState::ShutdownComplete;
self.context.update_time_counter += 1;
@ -6268,19 +6275,8 @@ impl<SP: Deref> Channel<SP> where
let closing_signed = self.get_closing_signed_msg(&closing_tx, skip_remote_output, used_fee, our_min_fee, our_max_fee, logger);
let (signed_tx, shutdown_result) = if $new_fee == msg.fee_satoshis {
let shutdown_result = ShutdownResult {
closure_reason,
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
channel_id: self.context.channel_id,
user_channel_id: self.context.user_id,
channel_capacity_satoshis: self.context.channel_value_satoshis,
counterparty_node_id: self.context.counterparty_node_id,
unbroadcasted_funding_tx: self.context.unbroadcasted_funding(),
is_manual_broadcast: self.context.is_manual_broadcast,
channel_funding_txo: self.context.get_funding_txo(),
};
let shutdown_result = closing_signed.as_ref()
.map(|_| self.shutdown_result_coop_close());
if closing_signed.is_some() {
self.context.channel_state = ChannelState::ShutdownComplete;
}
@ -6288,7 +6284,7 @@ impl<SP: Deref> Channel<SP> where
self.context.last_received_closing_sig = Some(msg.signature.clone());
let tx = closing_signed.as_ref().map(|ClosingSigned { signature, .. }|
self.build_signed_closing_transaction(&closing_tx, &msg.signature, signature));
(tx, Some(shutdown_result))
(tx, shutdown_result)
} else {
(None, None)
};

View file

@ -7827,7 +7827,7 @@ where
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
let (closing_signed, tx, shutdown_result) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg, &&logger), chan_phase_entry);
debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown() || chan.is_shutdown_pending_signature());
debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());
if let Some(msg) = closing_signed {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id: counterparty_node_id.clone(),
@ -8650,7 +8650,7 @@ where
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
// Returns whether we should remove this channel as it's just been closed.
let unblock_chan = |phase: &mut ChannelPhase<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| -> bool {
let unblock_chan = |phase: &mut ChannelPhase<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| -> Option<ShutdownResult> {
let node_id = phase.context().get_counterparty_node_id();
match phase {
ChannelPhase::Funded(chan) => {
@ -8703,11 +8703,8 @@ where
msg: update
});
}
// We should return true to remove the channel if we just
// broadcasted the closing transaction.
true
} else { false }
}
msgs.shutdown_result
}
ChannelPhase::UnfundedOutboundV1(chan) => {
if let Some(msg) = chan.signer_maybe_unblocked(&self.logger) {
@ -8716,12 +8713,13 @@ where
msg,
});
}
false
None
}
ChannelPhase::UnfundedInboundV1(_) => false,
ChannelPhase::UnfundedInboundV1(_) => None,
}
};
let mut shutdown_results = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap();
let per_peer_state_iter = per_peer_state.iter().filter(|(cp_id, _)| {
if let Some((counterparty_node_id, _)) = channel_opt {
@ -8732,16 +8730,23 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
peer_state.channel_by_id.retain(|_, chan| {
let should_remove = match channel_opt {
Some((_, channel_id)) if chan.context().channel_id() != channel_id => false,
let shutdown_result = match channel_opt {
Some((_, channel_id)) if chan.context().channel_id() != channel_id => None,
_ => unblock_chan(chan, &mut peer_state.pending_msg_events),
};
if should_remove {
if let Some(shutdown_result) = shutdown_result {
log_trace!(self.logger, "Removing channel after unblocking signer");
shutdown_results.push(shutdown_result);
false
} else {
true
}
!should_remove
});
}
drop(per_peer_state);
for shutdown_result in shutdown_results.drain(..) {
self.finish_close_channel(shutdown_result);
}
}
/// Check whether any channels have finished removing all pending updates after a shutdown
@ -8770,8 +8775,8 @@ where
node_id: chan.context.get_counterparty_node_id(), msg,
});
}
debug_assert_eq!(shutdown_result_opt.is_some(), chan.is_shutdown());
if let Some(shutdown_result) = shutdown_result_opt {
debug_assert!(chan.is_shutdown() || chan.is_shutdown_pending_signature());
shutdown_results.push(shutdown_result);
}
if let Some(tx) = tx_opt {

View file

@ -3295,7 +3295,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
nodes
}
fn connect_nodes<'a, 'b: 'a, 'c: 'b>(node_a: &Node<'a, 'b, 'c>, node_b: &Node<'a, 'b, 'c>) {
pub fn connect_nodes<'a, 'b: 'a, 'c: 'b>(node_a: &Node<'a, 'b, 'c>, node_b: &Node<'a, 'b, 'c>) {
let node_id_a = node_a.node.get_our_node_id();
let node_id_b = node_b.node.get_our_node_id();