mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-25 07:17:40 +01:00
Separate gossip broadcasts into their own queue in PeerManager
This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply, since they won't be competing with gossip broadcasts for space in the main message queue.
This commit is contained in:
parent
ab149dc9d5
commit
47e818f198
1 changed files with 37 additions and 17 deletions
|
@ -337,6 +337,9 @@ struct Peer {
|
|||
|
||||
pending_outbound_buffer: LinkedList<Vec<u8>>,
|
||||
pending_outbound_buffer_first_msg_offset: usize,
|
||||
// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
|
||||
// channel messages over them.
|
||||
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
|
||||
awaiting_write_event: bool,
|
||||
|
||||
pending_read_buffer: Vec<u8>,
|
||||
|
@ -389,17 +392,26 @@ impl Peer {
|
|||
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
|
||||
}
|
||||
|
||||
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
|
||||
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
|
||||
/// been drained.
|
||||
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
|
||||
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
|
||||
fn should_buffer_gossip_backfill(&self) -> bool {
|
||||
self.pending_outbound_buffer.is_empty() &&
|
||||
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
||||
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
|
||||
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
||||
}
|
||||
|
||||
/// Returns whether this peer's buffer is full and we should drop gossip messages.
|
||||
/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
|
||||
/// buffer. This is checked every time the peer's buffer may have been drained.
|
||||
fn should_buffer_gossip_broadcast(&self) -> bool {
|
||||
self.pending_outbound_buffer.is_empty()
|
||||
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
||||
}
|
||||
|
||||
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
|
||||
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
|
||||
self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
|
||||
let total_outbound_buffered =
|
||||
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
|
||||
|
||||
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
|
||||
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
|
||||
}
|
||||
}
|
||||
|
@ -668,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
pending_outbound_buffer: LinkedList::new(),
|
||||
pending_outbound_buffer_first_msg_offset: 0,
|
||||
gossip_broadcast_buffer: LinkedList::new(),
|
||||
awaiting_write_event: false,
|
||||
|
||||
pending_read_buffer,
|
||||
|
@ -714,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
pending_outbound_buffer: LinkedList::new(),
|
||||
pending_outbound_buffer_first_msg_offset: 0,
|
||||
gossip_broadcast_buffer: LinkedList::new(),
|
||||
awaiting_write_event: false,
|
||||
|
||||
pending_read_buffer,
|
||||
|
@ -734,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
|
||||
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
|
||||
while !peer.awaiting_write_event {
|
||||
if peer.should_buffer_gossip_broadcast() {
|
||||
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
|
||||
peer.pending_outbound_buffer.push_back(msg);
|
||||
}
|
||||
}
|
||||
if peer.should_buffer_gossip_backfill() {
|
||||
match peer.sync_status {
|
||||
InitSyncTracker::NoSyncRequested => {},
|
||||
|
@ -848,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
}
|
||||
}
|
||||
|
||||
/// Append a message to a peer's pending outbound/write buffer
|
||||
fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
|
||||
peer.msgs_sent_since_pong += 1;
|
||||
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
|
||||
}
|
||||
|
||||
/// Append a message to a peer's pending outbound/write buffer
|
||||
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
|
||||
let mut buffer = VecWriter(Vec::with_capacity(2048));
|
||||
|
@ -864,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
} else {
|
||||
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
|
||||
}
|
||||
self.enqueue_encoded_message(peer, &buffer.0);
|
||||
peer.msgs_sent_since_pong += 1;
|
||||
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
|
||||
}
|
||||
|
||||
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
|
||||
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
|
||||
peer.msgs_sent_since_pong += 1;
|
||||
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
|
||||
}
|
||||
|
||||
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
|
||||
|
@ -1333,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
wire::Message::NodeAnnouncement(ref msg) => {
|
||||
|
@ -1356,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
wire::Message::ChannelUpdate(ref msg) => {
|
||||
|
@ -1376,7 +1396,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
|||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
|
||||
continue;
|
||||
}
|
||||
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
|
||||
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
|
||||
}
|
||||
},
|
||||
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
|
||||
|
|
Loading…
Add table
Reference in a new issue