Avoid the peers write lock unless we need it in timer_tick_occurred

Similar to the previous commit, this avoids "blocking the world" on
every timer tick unless we need to disconnect peers.
This commit is contained in:
Matt Corallo 2021-10-06 06:10:01 +00:00
parent a5adda18dc
commit 4f50a94a3f

View file

@ -1720,55 +1720,44 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
///
/// [`send_data`]: SocketDescriptor::send_data
pub fn timer_tick_occurred(&self) {
let mut peers_lock = self.peers.write().unwrap();
let mut descriptors_needing_disconnect = Vec::new();
{
let mut descriptors_needing_disconnect = Vec::new();
let peer_count = peers_lock.peers.len();
let peers_lock = self.peers.read().unwrap();
peers_lock.peers.retain(|descriptor, peer_mutex| {
for (descriptor, peer_mutex) in peers_lock.peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
let mut do_disconnect_peer = false;
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
// The peer needs to complete its handshake before we can exchange messages. We
// give peers one timer tick to complete handshake, reusing
// `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
// for handshake completion.
if peer.awaiting_pong_timer_tick_intervals != 0 {
do_disconnect_peer = true;
descriptors_needing_disconnect.push(descriptor.clone());
} else {
peer.awaiting_pong_timer_tick_intervals = 1;
return true;
}
continue;
}
if peer.awaiting_pong_timer_tick_intervals == -1 {
// Magic value set in `maybe_send_extra_ping`.
peer.awaiting_pong_timer_tick_intervals = 1;
peer.received_message_since_timer_tick = false;
return true;
continue;
}
if do_disconnect_peer
|| (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|| peer.awaiting_pong_timer_tick_intervals as u64 >
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64
{
descriptors_needing_disconnect.push(descriptor.clone());
match peer.their_node_id {
Some(node_id) => {
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
None => {},
}
return false;
continue;
}
peer.received_message_since_timer_tick = false;
if peer.awaiting_pong_timer_tick_intervals > 0 {
peer.awaiting_pong_timer_tick_intervals += 1;
return true;
continue;
}
peer.awaiting_pong_timer_tick_intervals = 1;
@ -1778,9 +1767,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
};
self.enqueue_message(&mut *peer, &ping);
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
}
}
true
});
if !descriptors_needing_disconnect.is_empty() {
{
let mut peers_lock = self.peers.write().unwrap();
for descriptor in descriptors_needing_disconnect.iter() {
if let Some(peer) = peers_lock.peers.remove(&descriptor) {
if let Some(node_id) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
}
}
}
for mut descriptor in descriptors_needing_disconnect.drain(..) {
descriptor.disconnect_socket();