Send channel_reestablish out-of-band to ensure ordered deliver

This commit is contained in:
Matt Corallo 2018-10-20 17:50:34 -04:00
parent e2de49ddc4
commit 249aa77550
5 changed files with 63 additions and 22 deletions

View file

@ -2676,9 +2676,10 @@ impl ChannelMessageHandler for ChannelManager {
} }
} }
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> { fn peer_connected(&self, their_node_id: &PublicKey) {
let mut res = Vec::new(); let mut channel_state_lock = self.channel_state.lock().unwrap();
let mut channel_state = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts();
let pending_msg_events = channel_state.pending_msg_events;
channel_state.by_id.retain(|_, chan| { channel_state.by_id.retain(|_, chan| {
if chan.get_their_node_id() == *their_node_id { if chan.get_their_node_id() == *their_node_id {
if !chan.have_received_message() { if !chan.have_received_message() {
@ -2688,13 +2689,15 @@ impl ChannelMessageHandler for ChannelManager {
// drop it. // drop it.
false false
} else { } else {
res.push(chan.get_channel_reestablish()); pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: chan.get_their_node_id(),
msg: chan.get_channel_reestablish(),
});
true true
} }
} else { true } } else { true }
}); });
//TODO: Also re-broadcast announcement_signatures //TODO: Also re-broadcast announcement_signatures
res
} }
fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) { fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@ -5197,6 +5200,23 @@ mod tests {
assert_eq!(channel_state.short_to_id.len(), 0); assert_eq!(channel_state.short_to_id.len(), 0);
} }
macro_rules! get_chan_reestablish_msgs {
($src_node: expr, $dst_node: expr) => {
{
let mut res = Vec::with_capacity(1);
for msg in $src_node.node.get_and_clear_pending_msg_events() {
if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
assert_eq!(*node_id, $dst_node.node.get_our_node_id());
res.push(msg.clone());
} else {
panic!("Unexpected event")
}
}
res
}
}
}
macro_rules! handle_chan_reestablish_msgs { macro_rules! handle_chan_reestablish_msgs {
($src_node: expr, $dst_node: expr) => { ($src_node: expr, $dst_node: expr) => {
{ {
@ -5255,8 +5275,10 @@ mod tests {
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas /// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
/// for claims/fails they are separated out. /// for claims/fails they are separated out.
fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) { fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id()); node_a.node.peer_connected(&node_b.node.get_our_node_id());
let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id()); let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
node_b.node.peer_connected(&node_a.node.get_our_node_id());
let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
let mut resp_1 = Vec::new(); let mut resp_1 = Vec::new();
for msg in reestablish_1 { for msg in reestablish_1 {
@ -5754,9 +5776,11 @@ mod tests {
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1); assert_eq!(reestablish_1.len(), 1);
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1); assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@ -6042,9 +6066,11 @@ mod tests {
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1); assert_eq!(reestablish_1.len(), 1);
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1); assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@ -6062,9 +6088,11 @@ mod tests {
assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1); assert_eq!(reestablish_1.len(), 1);
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1); assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();

View file

@ -308,14 +308,14 @@ pub struct UpdateFee {
pub(crate) feerate_per_kw: u32, pub(crate) feerate_per_kw: u32,
} }
#[derive(PartialEq)] #[derive(PartialEq, Clone)]
pub(crate) struct DataLossProtect { pub(crate) struct DataLossProtect {
pub(crate) your_last_per_commitment_secret: [u8; 32], pub(crate) your_last_per_commitment_secret: [u8; 32],
pub(crate) my_current_per_commitment_point: PublicKey, pub(crate) my_current_per_commitment_point: PublicKey,
} }
/// A channel_reestablish message to be sent or received from a peer /// A channel_reestablish message to be sent or received from a peer
#[derive(PartialEq)] #[derive(PartialEq, Clone)]
pub struct ChannelReestablish { pub struct ChannelReestablish {
pub(crate) channel_id: [u8; 32], pub(crate) channel_id: [u8; 32],
pub(crate) next_local_commitment_number: u64, pub(crate) next_local_commitment_number: u64,
@ -563,7 +563,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool); fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
/// Handle a peer reconnecting, possibly generating channel_reestablish message(s). /// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>; fn peer_connected(&self, their_node_id: &PublicKey);
/// Handle an incoming channel_reestablish message from the given peer. /// Handle an incoming channel_reestablish message from the given peer.
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>; fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;

View file

@ -520,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}, 16); }, 16);
} }
for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) { self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
encode_and_send_msg!(msg, 136);
}
}, },
17 => { 17 => {
let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@ -834,6 +832,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
Self::do_attempt_write_data(&mut descriptor, peer); Self::do_attempt_write_data(&mut descriptor, peer);
}, },
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
Self::do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {

View file

@ -175,6 +175,13 @@ pub enum MessageSendEvent {
/// The message which should be sent. /// The message which should be sent.
msg: msgs::Shutdown, msg: msgs::Shutdown,
}, },
/// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id.
SendChannelReestablish {
/// The node_id of the node which should receive this message
node_id: PublicKey,
/// The message which should be sent.
msg: msgs::ChannelReestablish,
},
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all /// Used to indicate that a channel_announcement and channel_update should be broadcast to all
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2). /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
BroadcastChannelAnnouncement { BroadcastChannelAnnouncement {

View file

@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
Err(HandleError { err: "", action: None }) Err(HandleError { err: "", action: None })
} }
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> { fn peer_connected(&self, _their_node_id: &PublicKey) {}
Vec::new()
}
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
} }