Basic error handling framework in peer_handler

This commit is contained in:
Matt Corallo 2018-04-01 19:23:09 -04:00
parent 4fbc0b3768
commit 91b964ae1f
3 changed files with 101 additions and 46 deletions

View file

@ -1481,6 +1481,37 @@ impl ChannelMessageHandler for ChannelManager {
pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update }); pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update });
Ok(()) Ok(())
} }
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = channel_state_lock.borrow_parts();
let short_to_id = channel_state.short_to_id;
if no_connection_possible {
channel_state.by_id.retain(move |_, chan| {
if chan.get_their_node_id() == *their_node_id {
match chan.get_short_channel_id() {
Some(short_id) => {
short_to_id.remove(&short_id);
},
None => {},
}
//TODO: get the latest commitment tx, any HTLC txn built on top of it, etc out
//of the channel and throw those into the announcement blackhole.
false
} else {
true
}
});
} else {
for chan in channel_state.by_id {
if chan.1.get_their_node_id() == *their_node_id {
//TODO: mark channel disabled (and maybe announce such after a timeout). Also
//fail and wipe any uncommitted outbound HTLCs as those are considered after
//reconnect.
}
}
}
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -381,6 +381,13 @@ pub trait ChannelMessageHandler : events::EventsProvider {
// Channel-to-announce: // Channel-to-announce:
fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &AnnouncementSignatures) -> Result<(), HandleError>; fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &AnnouncementSignatures) -> Result<(), HandleError>;
// Informational:
/// Indicates a connection to the peer failed/an existing connection was lost. If no connection
/// is believed to be possible in the future (eg they're sending us messages we don't
/// understand or indicate they require unknown feature bits), no_connection_possible is set
/// and any outstanding channels should be failed.
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
} }
pub trait RoutingMessageHandler { pub trait RoutingMessageHandler {

View file

@ -223,7 +223,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
match self.do_read_event(peer_descriptor, data) { match self.do_read_event(peer_descriptor, data) {
Ok(res) => Ok(res), Ok(res) => Ok(res),
Err(e) => { Err(e) => {
self.disconnect_event(peer_descriptor); self.disconnect_event_internal(peer_descriptor, e.no_connection_possible);
Err(e) Err(e)
} }
} }
@ -238,38 +238,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
assert!(peer.pending_read_buffer.len() > 0); assert!(peer.pending_read_buffer.len() > 0);
assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos); assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
macro_rules! try_potential_handleerror {
($thing: expr) => {
match $thing {
Ok(x) => x,
Err(_e) => {
//TODO: Handle e appropriately!
return Err(PeerHandleError{});
}
};
}
}
macro_rules! try_potential_decodeerror {
($thing: expr) => {
match $thing {
Ok(x) => x,
Err(_e) => {
//TODO: Handle e?
return Err(PeerHandleError{});
}
};
}
}
macro_rules! encode_and_send_msg {
($msg: expr, $msg_code: expr) => {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
}
}
let mut insert_node_id = None; let mut insert_node_id = None;
let mut read_pos = 0; let mut read_pos = 0;
while read_pos < data.len() { while read_pos < data.len() {
{ {
@ -278,7 +247,52 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
read_pos += data_to_copy; read_pos += data_to_copy;
peer.pending_read_buffer_pos += data_to_copy; peer.pending_read_buffer_pos += data_to_copy;
} }
if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() { if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() {
peer.pending_read_buffer_pos = 0;
macro_rules! encode_and_send_msg {
($msg: expr, $msg_code: expr) => {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
}
}
macro_rules! try_potential_handleerror {
($thing: expr) => {
match $thing {
Ok(x) => x,
Err(e) => {
// TODO: Log e.err
if let Some(action) = e.msg {
match action {
msgs::ErrorAction::UpdateFailHTLC { msg } => {
encode_and_send_msg!(msg, 131);
continue;
},
msgs::ErrorAction::DisconnectPeer {} => {
return Err(PeerHandleError{ no_connection_possible: false });
},
}
} else {
return Err(PeerHandleError{ no_connection_possible: false });
}
}
};
}
}
macro_rules! try_potential_decodeerror {
($thing: expr) => {
match $thing {
Ok(x) => x,
Err(_e) => {
//TODO: Handle e?
return Err(PeerHandleError{ no_connection_possible: false });
}
};
}
}
let next_step = peer.channel_encryptor.get_noise_step(); let next_step = peer.channel_encryptor.get_noise_step();
match next_step { match next_step {
NextNoiseStep::ActOne => { NextNoiseStep::ActOne => {
@ -311,27 +325,31 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16); peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16);
peer.pending_read_buffer.resize(msg_len as usize + 16, 0); peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
if msg_len < 2 { // Need at least the message type tag if msg_len < 2 { // Need at least the message type tag
return Err(PeerHandleError{}); return Err(PeerHandleError{ no_connection_possible: false });
} }
peer.pending_read_is_header = false; peer.pending_read_is_header = false;
} else { } else {
let msg_data = try_potential_handleerror!(peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..])); let msg_data = try_potential_handleerror!(peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
assert!(msg_data.len() >= 2); assert!(msg_data.len() >= 2);
// Reset read buffer
peer.pending_read_buffer = [0; 18].to_vec();
peer.pending_read_is_header = true;
let msg_type = byte_utils::slice_to_be16(&msg_data[0..2]); let msg_type = byte_utils::slice_to_be16(&msg_data[0..2]);
if msg_type != 16 && peer.their_global_features.is_none() { if msg_type != 16 && peer.their_global_features.is_none() {
// Need an init message as first message // Need an init message as first message
return Err(PeerHandleError{}); return Err(PeerHandleError{ no_connection_possible: false });
} }
match msg_type { match msg_type {
// Connection control: // Connection control:
16 => { 16 => {
let msg = try_potential_decodeerror!(msgs::Init::decode(&msg_data[2..])); let msg = try_potential_decodeerror!(msgs::Init::decode(&msg_data[2..]));
if msg.global_features.requires_unknown_bits() { if msg.global_features.requires_unknown_bits() {
return Err(PeerHandleError{}); return Err(PeerHandleError{ no_connection_possible: true });
} }
if msg.local_features.requires_unknown_bits() { if msg.local_features.requires_unknown_bits() {
return Err(PeerHandleError{}); return Err(PeerHandleError{ no_connection_possible: true });
} }
peer.their_global_features = Some(msg.global_features); peer.their_global_features = Some(msg.global_features);
peer.their_local_features = Some(msg.local_features); peer.their_local_features = Some(msg.local_features);
@ -460,18 +478,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}, },
_ => { _ => {
if (msg_type & 1) == 0 { if (msg_type & 1) == 0 {
//TODO: Fail all channels. Kill the peer! return Err(PeerHandleError{ no_connection_possible: true });
return Err(PeerHandleError{});
} }
}, },
} }
peer.pending_read_buffer = [0; 18].to_vec();
peer.pending_read_is_header = true;
} }
} }
} }
peer.pending_read_buffer_pos = 0;
} }
} }
@ -623,18 +636,22 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
/// but must NOT be called if a PeerHandleError was provided out of a new_*_connection event! /// but must NOT be called if a PeerHandleError was provided out of a new_*_connection event!
/// Panics if the descriptor was not previously registered in a successful new_*_connection event. /// Panics if the descriptor was not previously registered in a successful new_*_connection event.
pub fn disconnect_event(&self, descriptor: &Descriptor) { pub fn disconnect_event(&self, descriptor: &Descriptor) {
self.disconnect_event_internal(descriptor, false);
}
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
let mut peers = self.peers.lock().unwrap(); let mut peers = self.peers.lock().unwrap();
let peer_option = peers.peers.remove(descriptor); let peer_option = peers.peers.remove(descriptor);
match peer_option { match peer_option {
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"), None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
Some(peer) => { Some(peer) => {
match peer.their_node_id { match peer.their_node_id {
Some(node_id) => { peers.node_id_to_descriptor.remove(&node_id); }, Some(node_id) => {
peers.node_id_to_descriptor.remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
},
None => {} None => {}
} }
//TODO: Notify the chan_handler that this node disconnected, and do something about
//handling response messages that were queued for sending (maybe the send buffer
//needs to be unencrypted?)
} }
}; };
} }