Merge pull request #568 from jkczyz/2020-03-handle-error-deadlock

Fix deadlock in ChannelManager's handle_error!()
This commit is contained in:
Matt Corallo 2020-04-02 20:06:00 +00:00 committed by GitHub
commit f0b037ce14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 107 deletions

View file

@ -467,21 +467,41 @@ pub struct ChannelDetails {
}
macro_rules! handle_error {
($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => {
($self: ident, $internal: expr, $their_node_id: expr) => {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish }) => {
#[cfg(debug_assertions)]
{
// In testing, ensure there are no deadlocks where the lock is already held upon
// entering the macro.
assert!($self.channel_state.try_lock().is_ok());
}
let mut msg_events = Vec::with_capacity(2);
if let Some((shutdown_res, update_option)) = shutdown_finish {
$self.finish_force_close_channel(shutdown_res);
if let Some(update) = update_option {
$locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
}
log_error!($self, "{}", err.err);
if let msgs::ErrorAction::IgnoreError = err.action {
} else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); }
} else {
msg_events.push(events::MessageSendEvent::HandleError {
node_id: $their_node_id,
action: err.action.clone()
});
}
if !msg_events.is_empty() {
$self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events);
}
// Return error in case higher-API need one
Err(err)
},
@ -1191,9 +1211,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
let _ = self.total_consistency_lock.read().unwrap();
let mut channel_lock = self.channel_state.lock().unwrap();
let err: Result<(), _> = loop {
let mut channel_lock = self.channel_state.lock().unwrap();
let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
Some(id) => id.clone(),
@ -1242,7 +1261,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return Ok(());
};
match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) {
match handle_error!(self, err, route.hops.first().unwrap().pubkey) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) }
}
@ -1261,8 +1280,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
let _ = self.total_consistency_lock.read().unwrap();
let (mut chan, msg, chan_monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) {
let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
Some(mut chan) => {
(chan.get_outbound_funding_created(funding_txo)
.map_err(|e| if let ChannelError::Close(msg) = e {
@ -1272,7 +1290,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
},
None => return
};
match handle_error!(self, res, chan.get_their_node_id(), channel_state) {
match handle_error!(self, res, chan.get_their_node_id()) {
Ok(funding_msg) => {
(chan, funding_msg.0, funding_msg.1)
},
@ -1284,13 +1302,10 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
if let Err(e) = self.monitor.add_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
match e {
ChannelMonitorUpdateErr::PermanentFailure => {
{
let mut channel_state = self.channel_state.lock().unwrap();
match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id(), channel_state) {
match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id()) {
Err(_) => { return; },
Ok(()) => unreachable!(),
}
}
},
ChannelMonitorUpdateErr::TemporaryFailure => {
// Its completely fine to continue with a FundingCreated until the monitor
@ -1520,10 +1535,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
},
ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
};
match handle_error!(self, err, their_node_id, channel_state) {
Ok(_) => unreachable!(),
Err(_) => { continue; },
}
handle_errors.push((their_node_id, err));
continue;
}
};
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
@ -1579,11 +1592,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
};
}
if handle_errors.len() > 0 {
let mut channel_state_lock = self.channel_state.lock().unwrap();
for (their_node_id, err) in handle_errors.drain(..) {
let _ = handle_error!(self, err, their_node_id, channel_state_lock);
}
let _ = handle_error!(self, err, their_node_id);
}
if new_events.is_empty() { return }
@ -1835,7 +1845,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return;
};
let _ = handle_error!(self, err, their_node_id, channel_state_lock);
mem::drop(channel_state_lock);
let _ = handle_error!(self, err, their_node_id);
}
/// Gets the node_id held by this ChannelManager
@ -2579,9 +2590,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
#[doc(hidden)]
pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
let _ = self.total_consistency_lock.read().unwrap();
let mut channel_state_lock = self.channel_state.lock().unwrap();
let their_node_id;
let err: Result<(), _> = loop {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
match channel_state.by_id.entry(channel_id) {
@ -2620,7 +2631,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return Ok(())
};
match handle_error!(self, err, their_node_id, channel_state_lock) {
match handle_error!(self, err, their_node_id) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
}
@ -2830,146 +2841,82 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
{
fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_open_channel(their_node_id, their_features, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id);
}
fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_accept_channel(their_node_id, their_features, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id);
}
fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_created(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id);
}
fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id);
}
fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_locked(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id);
}
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_shutdown(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id);
}
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_closing_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id);
}
fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_add_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fulfill_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fail_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fail_malformed_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id);
}
fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_commitment_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id);
}
fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_revoke_and_ack(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id);
}
fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fee(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id);
}
fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_announcement_signatures(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id);
}
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_channel_reestablish(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id);
}
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {

View file

@ -2988,6 +2988,76 @@ fn test_commitment_revoked_fail_backward_exhaustive_b() {
do_test_commitment_revoked_fail_backward_exhaustive(true, false, true);
}
#[test]
fn fail_backward_pending_htlc_upon_channel_failure() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::supported(), InitFeatures::supported());
// Alice -> Bob: Route a payment but without Bob sending revoke_and_ack.
{
let (_, payment_hash) = get_payment_preimage_hash!(nodes[0]);
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
nodes[0].node.send_payment(route, payment_hash).unwrap();
check_added_monitors!(nodes[0], 1);
let payment_event = {
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
SendEvent::from_event(events.remove(0))
};
assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
assert_eq!(payment_event.msgs.len(), 1);
}
// Alice -> Bob: Route another payment but now Alice waits for Bob's earlier revoke_and_ack.
let (_, failed_payment_hash) = get_payment_preimage_hash!(nodes[0]);
{
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
nodes[0].node.send_payment(route, failed_payment_hash).unwrap();
check_added_monitors!(nodes[0], 0);
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
}
// Alice <- Bob: Send a malformed update_add_htlc so Alice fails the channel.
{
let route = nodes[1].router.get_route(&nodes[0].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
let (_, payment_hash) = get_payment_preimage_hash!(nodes[1]);
let secp_ctx = Secp256k1::new();
let session_priv = {
let mut session_key = [0; 32];
let mut rng = thread_rng();
rng.fill_bytes(&mut session_key);
SecretKey::from_slice(&session_key).expect("RNG is bad!")
};
let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route, current_height).unwrap();
let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route, &session_priv).unwrap();
let onion_routing_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash);
// Send a 0-msat update_add_htlc to fail the channel.
let update_add_htlc = msgs::UpdateAddHTLC {
channel_id: chan.2,
htlc_id: 0,
amount_msat: 0,
payment_hash,
cltv_expiry,
onion_routing_packet,
};
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &update_add_htlc);
}
// Check that Alice fails backward the pending HTLC from the second payment.
expect_payment_failed!(nodes[0], failed_payment_hash, true);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);
}
#[test]
fn test_htlc_ignore_latest_remote_commitment() {
// Test that HTLC transactions spending the latest remote commitment transaction are simply