Split Event, move MessageSendEvent push() inside channel_state lock

This commit is contained in:
Matt Corallo 2018-10-19 16:25:32 -04:00
parent 5180686b1d
commit e397cb9960
6 changed files with 328 additions and 415 deletions

View file

@ -460,7 +460,7 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
_ => return,
}
loss_detector.handler.process_events();
for event in loss_detector.handler.get_and_clear_pending_events() {
for event in loss_detector.manager.get_and_clear_pending_events() {
match event {
Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, output_script, .. } => {
pending_funding_generation.push((temporary_channel_id, channel_value_satoshis, output_script));
@ -473,11 +473,10 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
},
Event::PaymentSent {..} => {},
Event::PaymentFailed {..} => {},
Event::PendingHTLCsForwardable {..} => {
should_forward = true;
},
_ => panic!("Unknown event"),
Event::SpendableOutputs {..} => {},
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -521,7 +521,7 @@ pub enum RAACommitmentOrder {
///
/// Messages MAY be called in parallel when they originate from different their_node_ids, however
/// they MUST NOT be called in parallel when the two calls have the same their_node_id.
pub trait ChannelMessageHandler : events::EventsProvider + Send + Sync {
pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Sync {
//Channel init:
/// Handle an incoming open_channel message from the given peer.
fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &OpenChannel) -> Result<AcceptChannel, HandleError>;

View file

@ -12,13 +12,13 @@ use ln::msgs;
use util::ser::{Writeable, Writer, Readable};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use util::byte_utils;
use util::events::{EventsProvider,Event};
use util::events::{MessageSendEvent};
use util::logger::Logger;
use std::collections::{HashMap,hash_map,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,mem,hash,fmt};
use std::{cmp,error,hash,fmt};
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler {
@ -127,7 +127,6 @@ impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
pub struct PeerManager<Descriptor: SocketDescriptor> {
message_handler: MessageHandler,
peers: Mutex<PeerHolder<Descriptor>>,
pending_events: Mutex<Vec<Event>>,
our_node_secret: SecretKey,
initial_syncs_sent: AtomicUsize,
logger: Arc<Logger>,
@ -164,7 +163,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
PeerManager {
message_handler: message_handler,
peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
pending_events: Mutex::new(Vec::new()),
our_node_secret: our_node_secret,
initial_syncs_sent: AtomicUsize::new(0),
logger,
@ -757,13 +755,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
/// Checks for any events generated by our handlers and processes them. May be needed after eg
/// calls to ChannelManager::process_pending_htlc_forward.
pub fn process_events(&self) {
let mut upstream_events = Vec::new();
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
let mut peers = self.peers.lock().unwrap();
for event in events_generated.drain(..) {
macro_rules! get_peer_for_forwarding {
@ -790,15 +787,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
}
match event {
Event::FundingGenerationReady {..} => { /* Hand upstream */ },
Event::FundingBroadcastSafe {..} => { /* Hand upstream */ },
Event::PaymentReceived {..} => { /* Hand upstream */ },
Event::PaymentSent {..} => { /* Hand upstream */ },
Event::PaymentFailed {..} => { /* Hand upstream */ },
Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ },
Event::SpendableOutputs { .. } => { /* Hand upstream */ },
Event::SendOpenChannel { ref node_id, ref msg } => {
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
@ -807,9 +796,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::SendFundingCreated { ref node_id, ref msg } => {
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id),
@ -820,9 +808,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}",
log_pubkey!(node_id),
if announcement_sigs.is_some() { " with announcement sigs" } else { "" },
@ -836,9 +823,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
&None => {},
}
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_pubkey!(node_id),
update_add_htlcs.len(),
@ -865,9 +851,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::SendRevokeAndACK { ref node_id, ref msg } => {
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
@ -876,9 +861,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::SendShutdown { ref node_id, ref msg } => {
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
@ -887,9 +871,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
Self::do_attempt_write_data(&mut descriptor, peer);
continue;
},
Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
let encoded_msg = encode_msg!(msg, 256);
@ -912,9 +895,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
continue;
},
Event::BroadcastChannelUpdate { ref msg } => {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
let encoded_msg = encode_msg!(msg, 258);
@ -927,13 +909,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
continue;
},
Event::PaymentFailureNetworkUpdate { ref update } => {
MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
continue;
},
Event::HandleError { ref node_id, ref action } => {
MessageSendEvent::HandleError { ref node_id, ref action } => {
if let Some(ref action) = *action {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
@ -955,9 +935,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
},
msgs::ErrorAction::IgnoreError => {
continue;
},
msgs::ErrorAction::IgnoreError => {},
msgs::ErrorAction::SendErrorMessage { ref msg } => {
log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
@ -972,18 +950,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
} else {
log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id));
}
continue;
}
}
upstream_events.push(event);
}
}
let mut pending_events = self.pending_events.lock().unwrap();
for event in upstream_events.drain(..) {
pending_events.push(event);
}
}
/// Indicates that the given socket descriptor's connection is now closed.
@ -1014,15 +984,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
}
impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
fn get_and_clear_pending_events(&self) -> Vec<Event> {
let mut pending_events = self.pending_events.lock().unwrap();
let mut ret = Vec::new();
mem::swap(&mut ret, &mut *pending_events);
ret
}
}
#[cfg(test)]
mod tests {
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@ -1094,7 +1055,7 @@ mod tests {
let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
let chan_handler = test_utils::TestChannelMessageHandler::new();
chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError {
chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
node_id: their_id,
action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }),
});

View file

@ -24,7 +24,6 @@ use std::time::Instant;
/// An Event which you should probably take some action in response to.
pub enum Event {
// Events a user will probably have to handle
/// Used to indicate that the client should generate a funding transaction with the given
/// parameters and then call ChannelManager::funding_transaction_generated.
/// Generated in ChannelManager message handling.
@ -97,13 +96,14 @@ pub enum Event {
/// The outputs which you should store as spendable by you.
outputs: Vec<SpendableOutputDescriptor>,
},
}
// Events indicating the network loop should send a message to a peer:
// TODO: Move these into a separate struct and make a top-level enum
/// An event generated by ChannelManager which indicates a message should be sent to a peer (or
/// broadcast to most peers).
/// These events are handled by PeerManager::process_events if you are using a PeerManager.
pub enum MessageSendEvent {
/// Used to indicate that we've initiated a channel open and should send the open_channel
/// message provided to the given peer.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
SendOpenChannel {
/// The node_id of the node which should receive this message
node_id: PublicKey,
@ -111,8 +111,6 @@ pub enum Event {
msg: msgs::OpenChannel,
},
/// Used to indicate that a funding_created message should be sent to the peer with the given node_id.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
SendFundingCreated {
/// The node_id of the node which should receive this message
node_id: PublicKey,
@ -120,8 +118,6 @@ pub enum Event {
msg: msgs::FundingCreated,
},
/// Used to indicate that a funding_locked message should be sent to the peer with the given node_id.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
SendFundingLocked {
/// The node_id of the node which should receive these message(s)
node_id: PublicKey,
@ -132,8 +128,6 @@ pub enum Event {
},
/// Used to indicate that a series of HTLC update messages, as well as a commitment_signed
/// message should be sent to the peer with the given node_id.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
UpdateHTLCs {
/// The node_id of the node which should receive these message(s)
node_id: PublicKey,
@ -141,8 +135,6 @@ pub enum Event {
updates: msgs::CommitmentUpdate,
},
/// Used to indicate that a revoke_and_ack message should be sent to the peer with the given node_id.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
SendRevokeAndACK {
/// The node_id of the node which should receive this message
node_id: PublicKey,
@ -150,8 +142,6 @@ pub enum Event {
msg: msgs::RevokeAndACK,
},
/// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
SendShutdown {
/// The node_id of the node which should receive this message
node_id: PublicKey,
@ -160,8 +150,6 @@ pub enum Event {
},
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
BroadcastChannelAnnouncement {
/// The channel_announcement which should be sent.
msg: msgs::ChannelAnnouncement,
@ -169,17 +157,11 @@ pub enum Event {
update_msg: msgs::ChannelUpdate,
},
/// Used to indicate that a channel_update should be broadcast to all peers.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
BroadcastChannelUpdate {
/// The channel_update which should be sent.
msg: msgs::ChannelUpdate,
},
//Error handling
/// Broadcast an error downstream to be handled
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
HandleError {
/// The node_id of the node which should receive this message
node_id: PublicKey,
@ -188,14 +170,19 @@ pub enum Event {
},
/// When a payment fails we may receive updates back from the hop where it failed. In such
/// cases this event is generated so that we can inform the router of this information.
///
/// This event is handled by PeerManager::process_events if you are using a PeerManager.
PaymentFailureNetworkUpdate {
/// The channel/node update which should be sent to router
update: msgs::HTLCFailChannelUpdate,
}
}
/// A trait indicating an object may generate message send events
pub trait MessageSendEventsProvider {
/// Gets the list of pending events which were generated by previous actions, clearing the list
/// in the process.
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
}
/// A trait indicating an object may generate events
pub trait EventsProvider {
/// Gets the list of pending events which were generated by previous actions, clearing the list

View file

@ -74,7 +74,7 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {
}
pub struct TestChannelMessageHandler {
pub pending_events: Mutex<Vec<events::Event>>,
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
}
impl TestChannelMessageHandler {
@ -141,8 +141,8 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
}
impl events::EventsProvider for TestChannelMessageHandler {
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
impl events::MessageSendEventsProvider for TestChannelMessageHandler {
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
let mut pending_events = self.pending_events.lock().unwrap();
let mut ret = Vec::new();
mem::swap(&mut ret, &mut *pending_events);