Mirror of https://github.com/lightningdevkit/rust-lightning.git
Give peers which are sending us messages longer to respond to ping
See comment for rationale.
commit ed4a39fe1e (parent 3f9a7de188)
1 changed file with 100 additions and 20 deletions
@@ -298,6 +298,28 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the ping.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensure we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
+
 struct Peer {
 	channel_encryptor: PeerChannelEncryptor,
 	their_node_id: Option<PublicKey>,
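
To make the arithmetic in the new doc comment concrete, here is a minimal standalone sketch (not code from the diff; the five-second tick length is taken from the comment itself) of how the allowed ping-response window scales with the number of connected peers:

// Standalone sketch: the response window implied by MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER,
// assuming the five-second timer tick interval mentioned in the doc comment above.
const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: u64 = 6;
const TIMER_TICK_SECS: u64 = 5; // assumed tick length, per the comment

fn max_response_secs(connected_peers: u64) -> u64 {
    MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER * connected_peers * TIMER_TICK_SECS
}

fn main() {
    assert_eq!(max_response_secs(1), 30);  // roughly 30 seconds with a single peer, as documented
    assert_eq!(max_response_secs(4), 120); // the budget grows with each connected peer
}
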
@@ -313,7 +335,9 @@ struct Peer {

 	sync_status: InitSyncTracker,

-	awaiting_pong: bool,
+	msgs_sent_since_pong: usize,
+	awaiting_pong_timer_tick_intervals: i8,
+	received_message_since_timer_tick: bool,
 }

 impl Peer {
@@ -555,7 +579,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 			sync_status: InitSyncTracker::NoSyncRequested,

-			awaiting_pong: false,
+			msgs_sent_since_pong: 0,
+			awaiting_pong_timer_tick_intervals: 0,
+			received_message_since_timer_tick: false,
 		}).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
@@ -593,7 +619,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 			sync_status: InitSyncTracker::NoSyncRequested,

-			awaiting_pong: false,
+			msgs_sent_since_pong: 0,
+			awaiting_pong_timer_tick_intervals: 0,
+			received_message_since_timer_tick: false,
 		}).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
@@ -602,7 +630,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 		while !peer.awaiting_write_event {
-			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
 				match peer.sync_status {
 					InitSyncTracker::NoSyncRequested => {},
 					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
@@ -647,6 +675,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					},
 				}
 			}
+			if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+				self.maybe_send_extra_ping(peer);
+			}

 			if {
 				let next_buff = match peer.pending_outbound_buffer.front() {
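
The two hunks above gate the write path: new sync/gossip messages are only pulled while the outbound buffer is below the read-pause limit and fewer than BUFFER_DRAIN_MSGS_PER_TICK messages have gone out since the last pong; once that budget is hit, an extra ping is requested. A self-contained sketch of that gate (illustrative only, not the PeerManager code itself):

// Standalone sketch of the gate added to `do_attempt_write_data`.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

fn may_queue_more_gossip(pending_outbound: usize, msgs_sent_since_pong: usize) -> bool {
    pending_outbound < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
        && msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}

fn main() {
    assert!(may_queue_more_gossip(3, 10));
    // Once 32 messages have gone out since the last pong we stop and instead ask
    // for a ping (`maybe_send_extra_ping`, defined later in this diff).
    assert!(!may_queue_more_gossip(3, 32));
}
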
@@ -724,6 +755,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 	/// Append a message to a peer's pending outbound/write buffer
 	fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+		peer.msgs_sent_since_pong += 1;
 		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
 	}

@@ -926,6 +958,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
 	) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
 		log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+		peer.received_message_since_timer_tick = true;

 		// Need an Init as first message
 		if let wire::Message::Init(_) = message {
@@ -989,7 +1022,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				}
 			},
 			wire::Message::Pong(_msg) => {
-				peer.awaiting_pong = false;
+				peer.awaiting_pong_timer_tick_intervals = 0;
+				peer.msgs_sent_since_pong = 0;
 			},

 			// Channel messages:
@@ -1110,7 +1144,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						   !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 							continue
 						}
-						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+							|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+						{
 							log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 							continue;
 						}
@@ -1133,7 +1169,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						   !peer.should_forward_node_announcement(msg.contents.node_id) {
 							continue
 						}
-						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+							|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+						{
 							log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 							continue;
 						}
@@ -1155,7 +1193,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						   !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 							continue
 						}
-						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+						if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+							|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+						{
 							log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 							continue;
 						}
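
All three broadcast paths above now skip a peer on either of two conditions: its outbound buffer is already long, or too many messages have gone out since its last pong. A standalone sketch of the combined test; note that FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO is not shown in this diff, so its value of 2 here is an assumption:

// Standalone sketch of the broadened "skip this peer" check used when forwarding gossip.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; // assumed value; not part of this diff
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
    OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

fn skip_gossip_broadcast(pending_outbound: usize, msgs_sent_since_pong: usize) -> bool {
    pending_outbound > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
        || msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}

fn main() {
    assert!(!skip_gossip_broadcast(5, 10));
    assert!(skip_gossip_broadcast(25, 10)); // outbound buffer already long
    assert!(skip_gossip_broadcast(5, 65));  // far behind on acknowledging pings
}
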
@@ -1455,6 +1495,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		debug_assert!(peers.node_id_to_descriptor.is_empty());
 	}

+	/// This is called when we're blocked on sending additional gossip messages until we receive a
+	/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+	/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
+	fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+		if peer.awaiting_pong_timer_tick_intervals == 0 {
+			peer.awaiting_pong_timer_tick_intervals = -1;
+			let ping = msgs::Ping {
+				ponglen: 0,
+				byteslen: 64,
+			};
+			self.enqueue_message(peer, &ping);
+		}
+	}
+
 	/// Send pings to each peer and disconnect those which did not respond to the last round of
 	/// pings.
 	///
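
The -1 written by maybe_send_extra_ping is only a marker: an extra ping is sent solely when no ping is outstanding, and (per the timer-tick hunk below) the next tick replaces the marker with a count of 1 rather than treating the ping as overdue. A small illustrative sketch of that guard, with simplified names rather than the real Peer struct:

// Standalone sketch of the "only ping when idle" guard; the caller would enqueue the ping.
fn send_extra_ping_if_idle(awaiting_pong_timer_tick_intervals: &mut i8) -> bool {
    if *awaiting_pong_timer_tick_intervals == 0 {
        *awaiting_pong_timer_tick_intervals = -1; // magic value, see the diff
        true
    } else {
        false
    }
}

fn main() {
    let mut ticks = 0i8;
    assert!(send_extra_ping_if_idle(&mut ticks));
    assert_eq!(ticks, -1);
    // A ping is already in flight: no second extra ping is requested.
    assert!(!send_extra_ping_if_idle(&mut ticks));
}
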
@@ -1473,9 +1527,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
 			let peers = &mut peers.peers;
 			let mut descriptors_needing_disconnect = Vec::new();
+			let peer_count = peers.len();

 			peers.retain(|descriptor, peer| {
-				if peer.awaiting_pong {
+				if !peer.channel_encryptor.is_ready_for_encryption() {
+					// The peer needs to complete its handshake before we can exchange messages
+					return true;
+				}
+
+				if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+					|| peer.awaiting_pong_timer_tick_intervals as u64 >
+						MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
+				{
 					descriptors_needing_disconnect.push(descriptor.clone());
 					match peer.their_node_id {
 						Some(node_id) => {
@@ -1492,21 +1555,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					return false;
 				}

-				if !peer.channel_encryptor.is_ready_for_encryption() {
-					// The peer needs to complete its handshake before we can exchange messages
+				peer.received_message_since_timer_tick = false;
+				if peer.awaiting_pong_timer_tick_intervals == -1 {
+					// Magic value set in `maybe_send_extra_ping`.
+					peer.awaiting_pong_timer_tick_intervals = 1;
 					return true;
 				}

+				if peer.awaiting_pong_timer_tick_intervals > 0 {
+					peer.awaiting_pong_timer_tick_intervals += 1;
+					return true;
+				}
+
+				peer.awaiting_pong_timer_tick_intervals = 1;
 				let ping = msgs::Ping {
 					ponglen: 0,
 					byteslen: 64,
 				};
 				self.enqueue_message(peer, &ping);
+				self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);

-				let mut descriptor_clone = descriptor.clone();
-				self.do_attempt_write_data(&mut descriptor_clone, peer);
-
-				peer.awaiting_pong = true;
 				true
 			});

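
Putting the timer-tick hunks above together, a peer is dropped either when it is awaiting a pong and sent us nothing at all during the last tick, or when it has been awaiting a pong for more than MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER ticks per connected peer. A standalone sketch of just that predicate (non-negative tick counts only; the -1 sentinel is handled separately in the diff):

// Standalone sketch of the disconnect rule in the timer-tick handler.
const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;

fn should_disconnect(
    awaiting_pong_ticks: i8,
    received_message_since_timer_tick: bool,
    peer_count: u64,
) -> bool {
    (awaiting_pong_ticks > 0 && !received_message_since_timer_tick)
        || awaiting_pong_ticks as u64
            > MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count
}

fn main() {
    // Peer that is awaiting a pong and sent nothing during the tick: disconnect.
    assert!(should_disconnect(1, false, 1));
    // Peer still sending us messages: tolerated for up to 6 ticks per connected peer.
    assert!(!should_disconnect(6, true, 1));
    assert!(should_disconnect(7, true, 1));
    // With two connected peers the budget doubles.
    assert!(!should_disconnect(12, true, 2));
}
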
@@ -1665,11 +1733,23 @@ mod tests {
 		// than can fit into a peer's buffer).
 		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

-		// Make each peer to read the messages that the other peer just wrote to them.
-		peers[0].process_events();
-		peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
-		peers[1].process_events();
-		peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+		// Make each peer to read the messages that the other peer just wrote to them. Note that
+		// due to the max-messages-before-ping limits this may take a few iterations to complete.
+		for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
+			peers[0].process_events();
+			let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+			assert!(!b_read_data.is_empty());
+
+			peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+			peers[1].process_events();
+
+			let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+			assert!(!a_read_data.is_empty());
+			peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+			peers[1].process_events();
+			assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+		}
+
 		// Check that each peer has received the expected number of channel updates and channel
 		// announcements.
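
The loop bound in the updated test follows from the per-ping budget: with the test's figure of 150 queued messages and 32 messages allowed per ping round, integer division plus one gives five read/process iterations. A one-line check of that arithmetic:

fn main() {
    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; // mirrors the constant added above
    assert_eq!(150 / BUFFER_DRAIN_MSGS_PER_TICK + 1, 5);
}
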