mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-25 07:17:40 +01:00
Process messages from peers in parallel in PeerManager
.
This adds the required locking to process messages from different peers simultaneously in `PeerManager`. Note that channel messages are still processed under a global lock in `ChannelManager`, and most work is still processed under a global lock in gossip message handling, but parallelizing message deserialization and message decryption is somewhat helpful.
This commit is contained in:
parent
6418c9ef0d
commit
7c8b098698
1 changed files with 107 additions and 94 deletions
|
@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
|
|||
use prelude::*;
|
||||
use io;
|
||||
use alloc::collections::LinkedList;
|
||||
use sync::{Arc, Mutex};
|
||||
use sync::{Arc, Mutex, RwLock};
|
||||
use core::{cmp, hash, fmt, mem};
|
||||
use core::ops::Deref;
|
||||
use core::convert::Infallible;
|
||||
|
@ -376,9 +376,7 @@ impl Peer {
|
|||
}
|
||||
|
||||
struct PeerHolder<Descriptor: SocketDescriptor> {
|
||||
peers: HashMap<Descriptor, Peer>,
|
||||
/// Only add to this set when noise completes:
|
||||
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
|
||||
peers: HashMap<Descriptor, Mutex<Peer>>,
|
||||
}
|
||||
|
||||
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
|
||||
|
@ -425,7 +423,12 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
|
|||
L::Target: Logger,
|
||||
CMH::Target: CustomMessageHandler {
|
||||
message_handler: MessageHandler<CM, RM>,
|
||||
peers: Mutex<PeerHolder<Descriptor>>,
|
||||
peers: RwLock<PeerHolder<Descriptor>>,
|
||||
/// Only add to this set when noise completes.
|
||||
/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
|
||||
/// lock held. Entries may be added with only the `peers` read lock held (though the
|
||||
/// `Descriptor` value must already exist in `peers`).
|
||||
node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
|
||||
our_node_secret: SecretKey,
|
||||
ephemeral_key_midstate: Sha256Engine,
|
||||
custom_message_handler: CMH,
|
||||
|
@ -553,10 +556,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
PeerManager {
|
||||
message_handler,
|
||||
peers: Mutex::new(PeerHolder {
|
||||
peers: RwLock::new(PeerHolder {
|
||||
peers: HashMap::new(),
|
||||
node_id_to_descriptor: HashMap::new()
|
||||
}),
|
||||
node_id_to_descriptor: Mutex::new(HashMap::new()),
|
||||
our_node_secret,
|
||||
ephemeral_key_midstate,
|
||||
peer_counter: AtomicCounter::new(),
|
||||
|
@ -571,8 +574,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
/// new_outbound_connection, however entries will only appear once the initial handshake has
|
||||
/// completed and we are sure the remote peer has the private key for the given node_id.
|
||||
pub fn get_peer_node_ids(&self) -> Vec<PublicKey> {
|
||||
let peers = self.peers.lock().unwrap();
|
||||
peers.peers.values().filter_map(|p| {
|
||||
let peers = self.peers.read().unwrap();
|
||||
peers.peers.values().filter_map(|peer_mutex| {
|
||||
let p = peer_mutex.lock().unwrap();
|
||||
if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
@ -608,8 +612,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
let res = peer_encryptor.get_act_one().to_vec();
|
||||
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
|
||||
|
||||
let mut peers = self.peers.lock().unwrap();
|
||||
if peers.peers.insert(descriptor, Peer {
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
if peers.peers.insert(descriptor, Mutex::new(Peer {
|
||||
channel_encryptor: peer_encryptor,
|
||||
their_node_id: None,
|
||||
their_features: None,
|
||||
|
@ -629,7 +633,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
awaiting_pong_timer_tick_intervals: 0,
|
||||
received_message_since_timer_tick: false,
|
||||
sent_gossip_timestamp_filter: false,
|
||||
}).is_some() {
|
||||
})).is_some() {
|
||||
panic!("PeerManager driver duplicated descriptors!");
|
||||
};
|
||||
Ok(res)
|
||||
|
@ -655,8 +659,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret);
|
||||
let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
|
||||
|
||||
let mut peers = self.peers.lock().unwrap();
|
||||
if peers.peers.insert(descriptor, Peer {
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
if peers.peers.insert(descriptor, Mutex::new(Peer {
|
||||
channel_encryptor: peer_encryptor,
|
||||
their_node_id: None,
|
||||
their_features: None,
|
||||
|
@ -676,7 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
awaiting_pong_timer_tick_intervals: 0,
|
||||
received_message_since_timer_tick: false,
|
||||
sent_gossip_timestamp_filter: false,
|
||||
}).is_some() {
|
||||
})).is_some() {
|
||||
panic!("PeerManager driver duplicated descriptors!");
|
||||
};
|
||||
Ok(())
|
||||
|
@ -766,17 +770,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
/// [`send_data`]: SocketDescriptor::send_data
|
||||
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
|
||||
pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
|
||||
let mut peers = self.peers.lock().unwrap();
|
||||
match peers.peers.get_mut(descriptor) {
|
||||
let peers = self.peers.read().unwrap();
|
||||
match peers.peers.get(descriptor) {
|
||||
None => {
|
||||
// This is most likely a simple race condition where the user found that the socket
|
||||
// was writeable, then we told the user to `disconnect_socket()`, then they called
|
||||
// this method. Return an error to make sure we get disconnected.
|
||||
return Err(PeerHandleError { no_connection_possible: false });
|
||||
},
|
||||
Some(peer) => {
|
||||
Some(peer_mutex) => {
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
peer.awaiting_write_event = false;
|
||||
self.do_attempt_write_data(descriptor, peer);
|
||||
self.do_attempt_write_data(descriptor, &mut peer);
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
|
@ -828,18 +833,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
|
||||
let pause_read = {
|
||||
let mut peers_lock = self.peers.lock().unwrap();
|
||||
let peers = &mut *peers_lock;
|
||||
let peers = self.peers.read().unwrap();
|
||||
let mut msgs_to_forward = Vec::new();
|
||||
let mut peer_node_id = None;
|
||||
let pause_read = match peers.peers.get_mut(peer_descriptor) {
|
||||
let pause_read = match peers.peers.get(peer_descriptor) {
|
||||
None => {
|
||||
// This is most likely a simple race condition where the user read some bytes
|
||||
// from the socket, then we told the user to `disconnect_socket()`, then they
|
||||
// called this method. Return an error to make sure we get disconnected.
|
||||
return Err(PeerHandleError { no_connection_possible: false });
|
||||
},
|
||||
Some(peer) => {
|
||||
Some(peer_mutex) => {
|
||||
let mut peer_lock = peer_mutex.lock().unwrap();
|
||||
let peer = &mut *peer_lock;
|
||||
|
||||
assert!(peer.pending_read_buffer.len() > 0);
|
||||
assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
|
||||
|
||||
|
@ -893,7 +900,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
macro_rules! insert_node_id {
|
||||
() => {
|
||||
match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
|
||||
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) {
|
||||
hash_map::Entry::Occupied(_) => {
|
||||
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
|
||||
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
|
||||
|
@ -1023,7 +1030,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
};
|
||||
|
||||
for msg in msgs_to_forward.drain(..) {
|
||||
self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref());
|
||||
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref());
|
||||
}
|
||||
|
||||
pause_read
|
||||
|
@ -1242,13 +1249,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
Ok(should_forward)
|
||||
}
|
||||
|
||||
fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
|
||||
fn forward_broadcast_msg(&self, peers: &PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
|
||||
match msg {
|
||||
wire::Message::ChannelAnnouncement(ref msg) => {
|
||||
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
|
||||
let encoded_msg = encode_msg!(msg);
|
||||
|
||||
for (_, peer) in peers.peers.iter_mut() {
|
||||
for (_, peer_mutex) in peers.peers.iter() {
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
|
||||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
|
||||
continue
|
||||
|
@ -1266,14 +1274,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(peer, &encoded_msg);
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
wire::Message::NodeAnnouncement(ref msg) => {
|
||||
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
|
||||
let encoded_msg = encode_msg!(msg);
|
||||
|
||||
for (_, peer) in peers.peers.iter_mut() {
|
||||
for (_, peer_mutex) in peers.peers.iter() {
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
|
||||
!peer.should_forward_node_announcement(msg.contents.node_id) {
|
||||
continue
|
||||
|
@ -1290,14 +1299,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(peer, &encoded_msg);
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
wire::Message::ChannelUpdate(ref msg) => {
|
||||
log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
|
||||
let encoded_msg = encode_msg!(msg);
|
||||
|
||||
for (_, peer) in peers.peers.iter_mut() {
|
||||
for (_, peer_mutex) in peers.peers.iter() {
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
|
||||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
|
||||
continue
|
||||
|
@ -1311,7 +1321,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(peer, &encoded_msg);
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
|
||||
|
@ -1337,20 +1347,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
// buffer by doing things like announcing channels on another node. We should be willing to
|
||||
// drop optional-ish messages when send buffers get full!
|
||||
|
||||
let mut peers_lock = self.peers.lock().unwrap();
|
||||
let mut peers_lock = self.peers.write().unwrap();
|
||||
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
|
||||
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
|
||||
let peers = &mut *peers_lock;
|
||||
macro_rules! get_peer_for_forwarding {
|
||||
($node_id: expr) => {
|
||||
{
|
||||
match peers.node_id_to_descriptor.get($node_id) {
|
||||
match self.node_id_to_descriptor.lock().unwrap().get($node_id) {
|
||||
Some(descriptor) => match peers.peers.get_mut(&descriptor) {
|
||||
Some(peer) => {
|
||||
if peer.their_features.is_none() {
|
||||
Some(peer_mutex) => {
|
||||
let peer_lock = peer_mutex.lock().unwrap();
|
||||
if peer_lock.their_features.is_none() {
|
||||
continue;
|
||||
}
|
||||
peer
|
||||
peer_lock
|
||||
},
|
||||
None => panic!("Inconsistent peers set state!"),
|
||||
},
|
||||
|
@ -1367,13 +1378,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.temporary_channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.temporary_channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
|
||||
|
@ -1382,25 +1393,25 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
|
||||
// TODO: If the peer is gone we should generate a DiscardFunding event
|
||||
// indicating to the wallet that they should just throw away this funding transaction
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
|
||||
log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
|
||||
|
@ -1409,47 +1420,47 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
update_fulfill_htlcs.len(),
|
||||
update_fail_htlcs.len(),
|
||||
log_bytes!(commitment_signed.channel_id));
|
||||
let peer = get_peer_for_forwarding!(node_id);
|
||||
let mut peer = get_peer_for_forwarding!(node_id);
|
||||
for msg in update_add_htlcs {
|
||||
self.enqueue_message(peer, msg);
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
}
|
||||
for msg in update_fulfill_htlcs {
|
||||
self.enqueue_message(peer, msg);
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
}
|
||||
for msg in update_fail_htlcs {
|
||||
self.enqueue_message(peer, msg);
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
}
|
||||
for msg in update_fail_malformed_htlcs {
|
||||
self.enqueue_message(peer, msg);
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
}
|
||||
if let &Some(ref msg) = update_fee {
|
||||
self.enqueue_message(peer, msg);
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
}
|
||||
self.enqueue_message(peer, commitment_signed);
|
||||
self.enqueue_message(&mut *peer, commitment_signed);
|
||||
},
|
||||
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
|
||||
log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id),
|
||||
log_bytes!(msg.channel_id));
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
|
||||
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
|
||||
|
@ -1483,22 +1494,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
|
||||
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
|
||||
log_pubkey!(node_id), msg.contents.short_channel_id);
|
||||
let peer = get_peer_for_forwarding!(node_id);
|
||||
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::HandleError { ref node_id, ref action } => {
|
||||
match *action {
|
||||
msgs::ErrorAction::DisconnectPeer { ref msg } => {
|
||||
if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
|
||||
if let Some(mut peer) = peers.peers.remove(&descriptor) {
|
||||
// Note that since we are holding the peers *write* lock we can
|
||||
// remove from node_id_to_descriptor immediately (as no other
|
||||
// thread can be holding the peer lock if we have the global write
|
||||
// lock).
|
||||
if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) {
|
||||
if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
|
||||
if let Some(ref msg) = *msg {
|
||||
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
|
||||
log_pubkey!(node_id),
|
||||
msg.data);
|
||||
self.enqueue_message(&mut peer, msg);
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
self.enqueue_message(&mut *peer, msg);
|
||||
// This isn't guaranteed to work, but if there is enough free
|
||||
// room in the send buffer, put the error message there...
|
||||
self.do_attempt_write_data(&mut descriptor, &mut peer);
|
||||
self.do_attempt_write_data(&mut descriptor, &mut *peer);
|
||||
} else {
|
||||
log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
|
||||
}
|
||||
|
@ -1518,21 +1533,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
|
||||
log_pubkey!(node_id),
|
||||
msg.data);
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
|
||||
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
|
||||
log_pubkey!(node_id),
|
||||
msg.data);
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
}
|
||||
},
|
||||
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
},
|
||||
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
}
|
||||
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
|
||||
log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
|
||||
|
@ -1541,20 +1556,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
msg.first_blocknum,
|
||||
msg.number_of_blocks,
|
||||
msg.sync_complete);
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
}
|
||||
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
|
||||
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
|
||||
self.enqueue_message(get_peer_for_forwarding!(&node_id), &msg);
|
||||
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
|
||||
}
|
||||
|
||||
for (descriptor, peer) in peers.peers.iter_mut() {
|
||||
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
|
||||
for (descriptor, peer_mutex) in peers.peers.iter_mut() {
|
||||
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1565,7 +1580,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
}
|
||||
|
||||
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
|
||||
let mut peers = self.peers.lock().unwrap();
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
let peer_option = peers.peers.remove(descriptor);
|
||||
match peer_option {
|
||||
None => {
|
||||
|
@ -1573,13 +1588,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
// was disconnected, then we told the user to `disconnect_socket()`, then they
|
||||
// called this method. Either way we're disconnected, return.
|
||||
},
|
||||
Some(peer) => {
|
||||
Some(peer_lock) => {
|
||||
let peer = peer_lock.lock().unwrap();
|
||||
match peer.their_node_id {
|
||||
Some(node_id) => {
|
||||
log_trace!(self.logger,
|
||||
"Handling disconnection of peer {}, with {}future connection to the peer possible.",
|
||||
log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
|
||||
peers.node_id_to_descriptor.remove(&node_id);
|
||||
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
|
||||
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
|
||||
},
|
||||
None => {}
|
||||
|
@ -1598,8 +1614,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
///
|
||||
/// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
|
||||
pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) {
|
||||
let mut peers_lock = self.peers.lock().unwrap();
|
||||
if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) {
|
||||
let mut peers_lock = self.peers.write().unwrap();
|
||||
if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
|
||||
log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
|
||||
peers_lock.peers.remove(&descriptor);
|
||||
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
|
||||
|
@ -1611,17 +1627,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
/// an indication that TCP sockets have stalled even if we weren't around to time them out
|
||||
/// using regular ping/pongs.
|
||||
pub fn disconnect_all_peers(&self) {
|
||||
let mut peers_lock = self.peers.lock().unwrap();
|
||||
let mut peers_lock = self.peers.write().unwrap();
|
||||
self.node_id_to_descriptor.lock().unwrap().clear();
|
||||
let peers = &mut *peers_lock;
|
||||
for (mut descriptor, peer) in peers.peers.drain() {
|
||||
if let Some(node_id) = peer.their_node_id {
|
||||
if let Some(node_id) = peer.lock().unwrap().their_node_id {
|
||||
log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
|
||||
peers.node_id_to_descriptor.remove(&node_id);
|
||||
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
|
||||
}
|
||||
descriptor.disconnect_socket();
|
||||
}
|
||||
debug_assert!(peers.node_id_to_descriptor.is_empty());
|
||||
}
|
||||
|
||||
/// This is called when we're blocked on sending additional gossip messages until we receive a
|
||||
|
@ -1650,15 +1665,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
///
|
||||
/// [`send_data`]: SocketDescriptor::send_data
|
||||
pub fn timer_tick_occurred(&self) {
|
||||
let mut peers_lock = self.peers.lock().unwrap();
|
||||
let mut peers_lock = self.peers.write().unwrap();
|
||||
{
|
||||
let peers = &mut *peers_lock;
|
||||
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
|
||||
let peers = &mut peers.peers;
|
||||
let mut descriptors_needing_disconnect = Vec::new();
|
||||
let peer_count = peers.len();
|
||||
let peer_count = peers_lock.peers.len();
|
||||
|
||||
peers.retain(|descriptor, peer| {
|
||||
peers_lock.peers.retain(|descriptor, peer_mutex| {
|
||||
let mut peer = peer_mutex.lock().unwrap();
|
||||
let mut do_disconnect_peer = false;
|
||||
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
|
||||
// The peer needs to complete its handshake before we can exchange messages. We
|
||||
|
@ -1689,7 +1702,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
match peer.their_node_id {
|
||||
Some(node_id) => {
|
||||
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
|
||||
node_id_to_descriptor.remove(&node_id);
|
||||
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
|
||||
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
|
||||
}
|
||||
None => {},
|
||||
|
@ -1708,7 +1721,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
ponglen: 0,
|
||||
byteslen: 64,
|
||||
};
|
||||
self.enqueue_message(peer, &ping);
|
||||
self.enqueue_message(&mut *peer, &ping);
|
||||
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
|
||||
|
||||
true
|
||||
|
@ -1834,7 +1847,7 @@ mod tests {
|
|||
let chan_handler = test_utils::TestChannelMessageHandler::new();
|
||||
let mut peers = create_network(2, &cfgs);
|
||||
establish_connection(&peers[0], &peers[1]);
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
|
||||
|
||||
let secp_ctx = Secp256k1::new();
|
||||
let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
|
||||
|
@ -1847,7 +1860,7 @@ mod tests {
|
|||
peers[0].message_handler.chan_handler = &chan_handler;
|
||||
|
||||
peers[0].process_events();
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1856,17 +1869,17 @@ mod tests {
|
|||
let cfgs = create_peermgr_cfgs(2);
|
||||
let peers = create_network(2, &cfgs);
|
||||
establish_connection(&peers[0], &peers[1]);
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
|
||||
|
||||
// peers[0] awaiting_pong is set to true, but the Peer is still connected
|
||||
peers[0].timer_tick_occurred();
|
||||
peers[0].process_events();
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
|
||||
|
||||
// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
|
||||
peers[0].timer_tick_occurred();
|
||||
peers[0].process_events();
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1928,9 +1941,9 @@ mod tests {
|
|||
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
|
||||
|
||||
// If we get a single timer tick before completion, that's fine
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
|
||||
peers[0].timer_tick_occurred();
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
|
||||
|
||||
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
|
||||
peers[0].process_events();
|
||||
|
@ -1939,7 +1952,7 @@ mod tests {
|
|||
|
||||
// ...but if we get a second timer tick, we should disconnect the peer
|
||||
peers[0].timer_tick_occurred();
|
||||
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
|
||||
assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
|
||||
|
||||
assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue