Merge pull request #2293 from wpaulino/disconnect-peers-timer-tick

Disconnect peers on timer ticks to unblock channel state machine
Matt Corallo 2023-05-30 18:30:49 +00:00 committed by GitHub
commit eec5ec6b50
6 changed files with 299 additions and 23 deletions
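
`ChannelManager::timer_tick_occurred` is documented to be driven roughly once per minute, so the new `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS = 2` bound amounts to roughly a two-minute window for the counterparty to respond. A minimal sketch of a driver loop under that assumption — the `tick` callback stands in for `|| channel_manager.timer_tick_occurred()`, and real deployments typically let `lightning-background-processor` handle this:

```rust
use std::time::Duration;

/// Hypothetical driver: invokes `tick` once per minute, matching the
/// cadence `timer_tick_occurred` documents.
fn drive_timer_ticks<F: FnMut()>(mut tick: F) -> ! {
    loop {
        std::thread::sleep(Duration::from_secs(60));
        // With DISCONNECT_PEER_AWAITING_RESPONSE_TICKS = 2, a peer that has
        // stalled a RevokeAndACK/ChannelReestablish exchange is disconnected
        // after roughly two minutes; the subsequent reconnection retries the
        // exchange and can unblock the channel state machine.
        tick();
    }
}
```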

lightning/src/ln/channel.rs

@@ -479,6 +479,13 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
/// The number of ticks that may elapse while we're waiting for a response to a
/// [`msgs::RevokeAndACK`] or [`msgs::ChannelReestablish`] message before we attempt to disconnect
/// the peer.
///
/// See [`Channel::sent_message_awaiting_response`] for more information.
pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;
struct PendingChannelMonitorUpdate {
update: ChannelMonitorUpdate,
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
@@ -715,6 +722,19 @@ pub(super) struct Channel<Signer: ChannelSigner> {
/// See-also <https://github.com/lightningnetwork/lnd/issues/4006>
pub workaround_lnd_bug_4006: Option<msgs::ChannelReady>,
/// An option set when we wish to track how many ticks have elapsed while waiting for a response
/// from our counterparty after sending a message. If the peer has yet to respond after reaching
/// `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`, a reconnection should be attempted to try to
/// unblock the state machine.
///
/// This behavior is mostly motivated by an lnd bug in which we don't receive a message we expect
/// to in a timely manner, which may lead to channels becoming unusable and/or force-closed. An
/// example of such can be found at <https://github.com/lightningnetwork/lnd/issues/7682>.
///
/// This is currently only used when waiting for a [`msgs::ChannelReestablish`] or
/// [`msgs::RevokeAndACK`] message from the counterparty.
sent_message_awaiting_response: Option<usize>,
#[cfg(any(test, fuzzing))]
// When we receive an HTLC fulfill on an outbound path, we may immediately fulfill the
// corresponding HTLC on the inbound path. If, then, the outbound path channel is
@@ -1130,6 +1150,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
workaround_lnd_bug_4006: None,
sent_message_awaiting_response: None,
latest_inbound_scid_alias: None,
outbound_scid_alias,
@@ -1489,6 +1510,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
workaround_lnd_bug_4006: None,
sent_message_awaiting_response: None,
latest_inbound_scid_alias: None,
outbound_scid_alias,
@@ -3526,6 +3548,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// OK, we step the channel here and *then* if the new generation fails we can fail the
// channel based on that, but stepping stuff here should be safe either way.
self.channel_state &= !(ChannelState::AwaitingRemoteRevoke as u32);
self.sent_message_awaiting_response = None;
self.counterparty_prev_commitment_point = self.counterparty_cur_commitment_point;
self.counterparty_cur_commitment_point = Some(msg.next_per_commitment_point);
self.cur_counterparty_commitment_transaction_number -= 1;
@@ -3841,6 +3864,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
}
self.sent_message_awaiting_response = None;
self.channel_state |= ChannelState::PeerDisconnected as u32;
log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, log_bytes!(self.channel_id()));
}
@@ -3943,6 +3968,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Some(self.get_last_revoke_and_ack())
} else { None };
let commitment_update = if self.monitor_pending_commitment_signed {
self.mark_awaiting_response();
Some(self.get_last_commitment_update(logger))
} else { None };
@@ -4132,6 +4158,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// Go ahead and unmark PeerDisconnected as various calls we may make check for it (and all
// remaining cases either succeed or ErrorMessage-fail).
self.channel_state &= !(ChannelState::PeerDisconnected as u32);
self.sent_message_awaiting_response = None;
let shutdown_msg = if self.channel_state & (ChannelState::LocalShutdownSent as u32) != 0 {
assert!(self.shutdown_scriptpubkey.is_some());
@@ -4192,7 +4219,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// revoke_and_ack, not on sending commitment_signed, so we add one if we have
// AwaitingRemoteRevoke set, which indicates we sent a commitment_signed but haven't gotten
// the corresponding revoke_and_ack back yet.
-let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_counterparty_commitment_transaction_number + if (self.channel_state & ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 };
+let is_awaiting_remote_revoke = self.channel_state & ChannelState::AwaitingRemoteRevoke as u32 != 0;
+if is_awaiting_remote_revoke && !self.is_awaiting_monitor_update() {
+self.mark_awaiting_response();
+}
+let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_counterparty_commitment_transaction_number + if is_awaiting_remote_revoke { 1 } else { 0 };
let channel_ready = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.cur_holder_commitment_transaction_number == 1 {
// We should never have to worry about MonitorUpdateInProgress resending ChannelReady
@@ -4361,6 +4392,28 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}), None))
}
/// Marks a channel as waiting for a response from the counterparty. If a response is not
/// received within [`DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`] ticks of sending our own
/// message, then we'll attempt a reconnection.
fn mark_awaiting_response(&mut self) {
self.sent_message_awaiting_response = Some(0);
}
/// Determines whether we should disconnect the counterparty due to not receiving a response
/// within our expected timeframe.
///
/// This should be called on every [`super::channelmanager::ChannelManager::timer_tick_occurred`].
pub fn should_disconnect_peer_awaiting_response(&mut self) -> bool {
let ticks_elapsed = if let Some(ticks_elapsed) = self.sent_message_awaiting_response.as_mut() {
ticks_elapsed
} else {
// Don't disconnect when we're not waiting on a response.
return false;
};
*ticks_elapsed += 1;
*ticks_elapsed >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS
}
pub fn shutdown<SP: Deref>(
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
@@ -5733,7 +5786,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// May panic if called on a channel that wasn't immediately-previously
/// self.remove_uncommitted_htlcs_and_mark_paused()'d
-pub fn get_channel_reestablish<L: Deref>(&self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
+pub fn get_channel_reestablish<L: Deref>(&mut self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::PeerDisconnected as u32, ChannelState::PeerDisconnected as u32);
assert_ne!(self.cur_counterparty_commitment_transaction_number, INITIAL_COMMITMENT_NUMBER);
// Prior to static_remotekey, my_current_per_commitment_point was critical to claiming
@@ -5752,6 +5805,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_info!(logger, "Sending a data_loss_protect with no previous remote per_commitment_secret for channel {}", log_bytes!(self.channel_id()));
[0;32]
};
self.mark_awaiting_response();
msgs::ChannelReestablish {
channel_id: self.channel_id(),
// The protocol has two different commitment number concepts - the "commitment
@@ -7090,6 +7144,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
workaround_lnd_bug_4006: None,
sent_message_awaiting_response: None,
latest_inbound_scid_alias,
// Later in the ChannelManager deserialization phase we scan for channels and assign scid aliases if it's missing
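
One subtlety in the `channel.rs` changes above: `should_disconnect_peer_awaiting_response` increments the counter before comparing with `>=`, so with `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS = 2` the disconnect triggers on the second tick after `mark_awaiting_response`, and any state-advancing message (e.g. `revoke_and_ack`) clears the counter. A standalone sketch of the same counter logic, using a simplified stand-in rather than LDK's `Channel`:

```rust
const DISCONNECT_TICKS: usize = 2; // mirrors DISCONNECT_PEER_AWAITING_RESPONSE_TICKS

struct AwaitingResponse(Option<usize>);

impl AwaitingResponse {
    fn mark(&mut self) { self.0 = Some(0); }  // sent a message that needs a reply
    fn clear(&mut self) { self.0 = None; }    // counterparty made progress
    fn should_disconnect_on_tick(&mut self) -> bool {
        match self.0.as_mut() {
            None => false, // not waiting on a response
            Some(ticks) => {
                *ticks += 1;
                *ticks >= DISCONNECT_TICKS
            }
        }
    }
}

fn main() {
    let mut state = AwaitingResponse(None);
    state.mark();
    assert!(!state.should_disconnect_on_tick()); // tick 1: keep waiting
    assert!(state.should_disconnect_on_tick());  // tick 2: disconnect
    state.clear(); // a response arrived; further ticks no longer disconnect
    assert!(!state.should_disconnect_on_tick());
}
```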

lightning/src/ln/channelmanager.rs

@@ -3921,6 +3921,20 @@ where
chan.maybe_expire_prev_config();
if chan.should_disconnect_peer_awaiting_response() {
log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}",
counterparty_node_id, log_bytes!(*chan_id));
pending_msg_events.push(MessageSendEvent::HandleError {
node_id: counterparty_node_id,
action: msgs::ErrorAction::DisconnectPeerWithWarning {
msg: msgs::WarningMessage {
channel_id: *chan_id,
data: "Disconnecting due to timeout awaiting response".to_owned(),
},
},
});
}
true
});
if peer_state.ok_to_remove(true) {

lightning/src/ln/functional_tests.rs

@@ -22,7 +22,7 @@ use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFail
use crate::ln::{PaymentPreimage, PaymentSecret, PaymentHash};
use crate::ln::channel::{commitment_tx_base_weight, COMMITMENT_TX_WEIGHT_PER_HTLC, CONCURRENT_INBOUND_HTLC_FEE_BUFFER, FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE, MIN_AFFORDABLE_HTLC_COUNT};
use crate::ln::channelmanager::{self, PaymentId, RAACommitmentOrder, PaymentSendFailure, RecipientOnionFields, BREAKDOWN_TIMEOUT, ENABLE_GOSSIP_TICKS, DISABLE_GOSSIP_TICKS, MIN_CLTV_EXPIRY_DELTA};
-use crate::ln::channel::{Channel, ChannelError};
+use crate::ln::channel::{DISCONNECT_PEER_AWAITING_RESPONSE_TICKS, Channel, ChannelError};
use crate::ln::{chan_utils, onion_utils};
use crate::ln::chan_utils::{OFFERED_HTLC_SCRIPT_WEIGHT, htlc_success_tx_weight, htlc_timeout_tx_weight, HTLCOutputInCommitment};
use crate::routing::gossip::{NetworkGraph, NetworkUpdate};
@@ -9955,3 +9955,128 @@ fn test_payment_with_custom_min_cltv_expiry_delta() {
do_payment_with_custom_min_final_cltv_expiry(true, false);
do_payment_with_custom_min_final_cltv_expiry(true, true);
}
#[test]
fn test_disconnects_peer_awaiting_response_ticks() {
// Tests that nodes which are waiting on a response critical for channel responsiveness
// disconnect their counterparty after `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
let mut chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
// Asserts a disconnect event is queued to the user.
let check_disconnect_event = |node: &Node, should_disconnect: bool| {
let disconnect_event = node.node.get_and_clear_pending_msg_events().iter().find_map(|event|
if let MessageSendEvent::HandleError { action, .. } = event {
if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
Some(())
} else {
None
}
} else {
None
}
);
assert_eq!(disconnect_event.is_some(), should_disconnect);
};
// Fires timer ticks ensuring we only attempt to disconnect peers after reaching
// `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
let check_disconnect = |node: &Node| {
// No disconnect without any timer ticks.
check_disconnect_event(node, false);
// No disconnect with 1 timer tick less than required.
for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS - 1 {
node.node.timer_tick_occurred();
check_disconnect_event(node, false);
}
// Disconnect after reaching the required ticks.
node.node.timer_tick_occurred();
check_disconnect_event(node, true);
// Disconnect again on the next tick if the peer hasn't been disconnected yet.
node.node.timer_tick_occurred();
check_disconnect_event(node, true);
};
create_chan_between_nodes(&nodes[0], &nodes[1]);
// We'll start by performing a fee update with Alice (nodes[0]) on the channel.
*nodes[0].fee_estimator.sat_per_kw.lock().unwrap() *= 2;
nodes[0].node.timer_tick_occurred();
check_added_monitors!(&nodes[0], 1);
let alice_fee_update = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id());
nodes[1].node.handle_update_fee(&nodes[0].node.get_our_node_id(), alice_fee_update.update_fee.as_ref().unwrap());
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &alice_fee_update.commitment_signed);
check_added_monitors!(&nodes[1], 1);
// This will prompt Bob (nodes[1]) to respond with his `CommitmentSigned` and `RevokeAndACK`.
let (bob_revoke_and_ack, bob_commitment_signed) = get_revoke_commit_msgs!(&nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bob_revoke_and_ack);
check_added_monitors!(&nodes[0], 1);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bob_commitment_signed);
check_added_monitors(&nodes[0], 1);
// Alice then needs to send her final `RevokeAndACK` to complete the commitment dance. We
// pretend Bob hasn't received the message and check whether he'll disconnect Alice after
// reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
let alice_revoke_and_ack = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
check_disconnect(&nodes[1]);
// Now, we'll reconnect them to test awaiting a `ChannelReestablish` message.
//
// Note that since the commitment dance didn't complete above, Alice is expected to resend her
// final `RevokeAndACK` to Bob to complete it.
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
let bob_init = msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None };
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &bob_init, true).unwrap();
let alice_init = msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None };
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &alice_init, true).unwrap();
// Upon reconnection, Alice sends her `ChannelReestablish` to Bob. Alice, however, hasn't
// received Bob's yet, so she should disconnect him after reaching
// `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
let alice_channel_reestablish = get_event_msg!(
nodes[0], MessageSendEvent::SendChannelReestablish, nodes[1].node.get_our_node_id()
);
nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &alice_channel_reestablish);
check_disconnect(&nodes[0]);
// Bob now sends his `ChannelReestablish` to Alice to resume the channel and consider it "live".
let bob_channel_reestablish = nodes[1].node.get_and_clear_pending_msg_events().iter().find_map(|event|
if let MessageSendEvent::SendChannelReestablish { node_id, msg } = event {
assert_eq!(*node_id, nodes[0].node.get_our_node_id());
Some(msg.clone())
} else {
None
}
).unwrap();
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bob_channel_reestablish);
// Sanity check that Alice won't disconnect Bob since she's no longer waiting for any messages.
for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
nodes[0].node.timer_tick_occurred();
check_disconnect_event(&nodes[0], false);
}
// However, Bob is still waiting on Alice's `RevokeAndACK`, so he should disconnect her after
// reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
check_disconnect(&nodes[1]);
// Finally, have Bob process the last message.
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &alice_revoke_and_ack);
check_added_monitors(&nodes[1], 1);
// At this point, neither node should attempt to disconnect each other, since they aren't
// waiting on any messages.
for node in &nodes {
for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
node.node.timer_tick_occurred();
check_disconnect_event(node, false);
}
}
}

lightning/src/ln/msgs.rs

@@ -1137,6 +1137,11 @@ pub enum ErrorAction {
/// An error message which we should make an effort to send before we disconnect.
msg: Option<ErrorMessage>
},
/// The peer did something incorrect. Tell them without closing any channels and disconnect them.
DisconnectPeerWithWarning {
/// A warning message which we should make an effort to send before we disconnect.
msg: WarningMessage,
},
/// The peer did something harmless that we weren't able to process, just log and ignore
// New code should *not* use this. New code must use IgnoreAndLog, below!
IgnoreError,
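
The choice of a warning here matters: per BOLT #1, a warning (unlike an error) does not instruct the receiver to fail channels, so `DisconnectPeerWithWarning` only severs the transport and leaves channel state intact for the reconnection to retry. A simplified dispatch sketch with stand-in types (not LDK's actual definitions):

```rust
struct WarningMessage { data: String }
struct ErrorMessage { data: String }

enum ErrorAction {
    DisconnectPeer { msg: Option<ErrorMessage> },
    DisconnectPeerWithWarning { msg: WarningMessage },
}

fn describe(action: &ErrorAction) -> String {
    match action {
        // An error message may cause the counterparty to force-close channels.
        ErrorAction::DisconnectPeer { msg } => match msg {
            Some(m) => format!("disconnect, sending error: {}", m.data),
            None => "disconnect silently".to_owned(),
        },
        // A warning is advisory: best-effort send, then disconnect, keeping
        // all channels open so the reconnection can retry the handshake.
        ErrorAction::DisconnectPeerWithWarning { msg } => {
            format!("disconnect, sending warning: {}", msg.data)
        }
    }
}
```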

lightning/src/ln/peer_handler.rs

@@ -26,7 +26,7 @@ use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager
use crate::util::ser::{VecWriter, Writeable, Writer};
use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use crate::ln::wire;
-use crate::ln::wire::Encode;
+use crate::ln::wire::{Encode, Type};
use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
use crate::util::atomic_counter::AtomicCounter;
@@ -1230,8 +1230,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
Ok(x) => x,
Err(e) => {
match e.action {
-msgs::ErrorAction::DisconnectPeer { msg: _ } => {
-//TODO: Try to push msg
+msgs::ErrorAction::DisconnectPeer { .. } => {
+// We may have an `ErrorMessage` to send to the peer,
+// but writing to the socket while reading can lead to
+// re-entrant code and possibly unexpected behavior. The
+// message send is optimistic anyway, and in this case
+// we immediately disconnect the peer.
log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
return Err(PeerHandleError { });
},
msgs::ErrorAction::DisconnectPeerWithWarning { .. } => {
// We have a `WarningMessage` to send to the peer, but
// writing to the socket while reading can lead to
// re-entrant code and possibly unexpected behavior. The
// message send is optimistic anyway, and in this case
// we immediately disconnect the peer.
log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
return Err(PeerHandleError { });
},
@@ -1362,7 +1375,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
Ok(x) => x,
Err(e) => {
match e {
-// Note that to avoid recursion we never call
+// Note that to avoid re-entrancy we never call
// `do_attempt_write_data` from here, causing
// the messages enqueued here to not actually
// be sent before the peer is disconnected.
@@ -1383,9 +1396,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
});
continue;
}
-(msgs::DecodeError::UnknownRequiredFeature, ty) => {
-log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
-self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) });
+(msgs::DecodeError::UnknownRequiredFeature, _) => {
+log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
return Err(PeerHandleError { });
}
(msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
@@ -2065,32 +2077,48 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
-MessageSendEvent::HandleError { ref node_id, ref action } => {
-match *action {
-msgs::ErrorAction::DisconnectPeer { ref msg } => {
+MessageSendEvent::HandleError { node_id, action } => {
+match action {
+msgs::ErrorAction::DisconnectPeer { msg } => {
if let Some(msg) = msg.as_ref() {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id), msg.data);
} else {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
log_pubkey!(node_id));
}
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
// processing most messages.
-peers_to_disconnect.insert(*node_id, msg.clone());
+let msg = msg.map(|msg| wire::Message::<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
+peers_to_disconnect.insert(node_id, msg);
},
msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id), msg.data);
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
// processing most messages.
peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg)));
},
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::IgnoreDuplicateGossip => {},
msgs::ErrorAction::IgnoreError => {
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::SendErrorMessage { ref msg } => {
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
-self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
-self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
},
}
},
@@ -2140,9 +2168,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if let Some(peer_mutex) = peers.remove(&descriptor) {
let mut peer = peer_mutex.lock().unwrap();
if let Some(msg) = msg {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *peer, &msg);
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
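
The comments in this file describe a single discipline: a final message to a disconnecting peer is delivered best-effort, and the socket is never written from the read path, which avoids re-entrancy into the message handler. A hedged sketch of that buffer discipline with illustrative names (not the actual `peer_handler` internals):

```rust
/// Illustrative only: enqueue a parting message if the outbound buffer has
/// room, then let the caller tear down the connection either way.
fn enqueue_final_message(out_buf: &mut Vec<u8>, msg: &[u8], capacity: usize) {
    if out_buf.len() + msg.len() <= capacity {
        // There is enough free room in the send buffer; the message goes out
        // only if the normal write path runs before the socket is closed.
        out_buf.extend_from_slice(msg);
    }
    // No immediate flush: writing here could re-enter the handler, which is
    // exactly what the comments above guard against.
}
```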

lightning/src/ln/wire.rs

@@ -96,9 +96,59 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
Custom(T),
}
-impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
+impl<T> Writeable for Message<T> where T: core::fmt::Debug + Type + TestEq {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
match self {
&Message::Init(ref msg) => msg.write(writer),
&Message::Error(ref msg) => msg.write(writer),
&Message::Warning(ref msg) => msg.write(writer),
&Message::Ping(ref msg) => msg.write(writer),
&Message::Pong(ref msg) => msg.write(writer),
&Message::OpenChannel(ref msg) => msg.write(writer),
&Message::OpenChannelV2(ref msg) => msg.write(writer),
&Message::AcceptChannel(ref msg) => msg.write(writer),
&Message::AcceptChannelV2(ref msg) => msg.write(writer),
&Message::FundingCreated(ref msg) => msg.write(writer),
&Message::FundingSigned(ref msg) => msg.write(writer),
&Message::TxAddInput(ref msg) => msg.write(writer),
&Message::TxAddOutput(ref msg) => msg.write(writer),
&Message::TxRemoveInput(ref msg) => msg.write(writer),
&Message::TxRemoveOutput(ref msg) => msg.write(writer),
&Message::TxComplete(ref msg) => msg.write(writer),
&Message::TxSignatures(ref msg) => msg.write(writer),
&Message::TxInitRbf(ref msg) => msg.write(writer),
&Message::TxAckRbf(ref msg) => msg.write(writer),
&Message::TxAbort(ref msg) => msg.write(writer),
&Message::ChannelReady(ref msg) => msg.write(writer),
&Message::Shutdown(ref msg) => msg.write(writer),
&Message::ClosingSigned(ref msg) => msg.write(writer),
&Message::OnionMessage(ref msg) => msg.write(writer),
&Message::UpdateAddHTLC(ref msg) => msg.write(writer),
&Message::UpdateFulfillHTLC(ref msg) => msg.write(writer),
&Message::UpdateFailHTLC(ref msg) => msg.write(writer),
&Message::UpdateFailMalformedHTLC(ref msg) => msg.write(writer),
&Message::CommitmentSigned(ref msg) => msg.write(writer),
&Message::RevokeAndACK(ref msg) => msg.write(writer),
&Message::UpdateFee(ref msg) => msg.write(writer),
&Message::ChannelReestablish(ref msg) => msg.write(writer),
&Message::AnnouncementSignatures(ref msg) => msg.write(writer),
&Message::ChannelAnnouncement(ref msg) => msg.write(writer),
&Message::NodeAnnouncement(ref msg) => msg.write(writer),
&Message::ChannelUpdate(ref msg) => msg.write(writer),
&Message::QueryShortChannelIds(ref msg) => msg.write(writer),
&Message::ReplyShortChannelIdsEnd(ref msg) => msg.write(writer),
&Message::QueryChannelRange(ref msg) => msg.write(writer),
&Message::ReplyChannelRange(ref msg) => msg.write(writer),
&Message::GossipTimestampFilter(ref msg) => msg.write(writer),
&Message::Unknown(_) => { Ok(()) },
&Message::Custom(ref msg) => msg.write(writer),
}
}
}
impl<T> Type for Message<T> where T: core::fmt::Debug + Type + TestEq {
/// Returns the type that was used to decode the message payload.
-pub fn type_id(&self) -> u16 {
+fn type_id(&self) -> u16 {
match self {
&Message::Init(ref msg) => msg.type_id(),
&Message::Error(ref msg) => msg.type_id(),
@@ -145,7 +195,9 @@ impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
&Message::Custom(ref msg) => msg.type_id(),
}
}
}
impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
/// Returns whether the message's type is even, indicating both endpoints must support it.
pub fn is_even(&self) -> bool {
(self.type_id() & 1) == 0
}
}
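
`is_even` encodes BOLT #1's "it's OK to be odd" rule: even message types must be understood by both endpoints, while odd-typed messages may be ignored if unknown. A standalone illustration using well-known type numbers from the spec:

```rust
fn is_even(type_id: u16) -> bool {
    (type_id & 1) == 0
}

fn main() {
    assert!(is_even(16));   // init (type 16): even, both peers must support it
    assert!(!is_even(1));   // warning (type 1): odd, ignorable if unknown
    assert!(!is_even(17));  // error (type 17): also odd per the spec
}
```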