Backfill gossip without buffering directly in LDK

Instead of backfilling gossip by buffering (up to) ten messages at
a time, only buffer one message at a time, as the peers' outbound
socket buffer drains. This moves the outbound backfill messages out
of `PeerHandler` and into the operating system buffer, where it
arguably belongs.

Not buffering causes us to walk the gossip B-Trees somewhat more
often, but avoids allocating vecs for the responses. While its
probably (without having benchmarked it) a net performance loss, it
simplifies buffer tracking and leaves us with more room to play
with the buffer sizing constants as we add onion message forwarding
which is an important win.

Note that because we change how often we check if we're out of
messages to send before pinging, we slightly change how many
messages are exchanged at once, impacting the
`test_do_attempt_write_data` constants.
This commit is contained in:
Matt Corallo 2022-08-09 21:26:16 +00:00
parent 4f6da92c4e
commit 7717fa23a8
6 changed files with 84 additions and 109 deletions

View file

@ -562,8 +562,8 @@ mod tests {
fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }

View file

@ -318,20 +318,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
);
let mut chan_progress = 0;
loop {
let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
let orig_announcements = self.gossip_sync.get_next_channel_announcement(chan_progress);
let deserialized_announcements = gossip_sync.get_next_channel_announcement(chan_progress);
assert!(orig_announcements == deserialized_announcements);
chan_progress = match orig_announcements.last() {
chan_progress = match orig_announcements {
Some(announcement) => announcement.0.contents.short_channel_id + 1,
None => break,
};
}
let mut node_progress = None;
loop {
let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
let orig_announcements = self.gossip_sync.get_next_node_announcement(node_progress.as_ref());
let deserialized_announcements = gossip_sync.get_next_node_announcement(node_progress.as_ref());
assert!(orig_announcements == deserialized_announcements);
node_progress = match orig_announcements.last() {
node_progress = match orig_announcements {
Some(announcement) => Some(announcement.contents.node_id),
None => break,
};

View file

@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
/// Handle an incoming channel_update message, returning true if it should be forwarded on,
/// false or returning an Err otherwise.
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
/// Gets a subset of the channel announcements and updates required to dump our routing table
/// to a remote node, starting at the short_channel_id indicated by starting_point and
/// including the batch_amount entries immediately higher in numerical value than starting_point.
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
/// Gets a subset of the node announcements required to dump our routing table to a remote node,
/// starting at the node *after* the provided publickey and including batch_amount entries
/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// Gets channel announcements and updates required to dump our routing table to a remote node,
/// starting at the short_channel_id indicated by starting_point and including announcements
/// for a single channel.
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
/// Gets a node announcement required to dump our routing table to a remote node, starting at
/// the node *after* the provided pubkey and including up to one announcement immediately
/// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// If None is provided for starting_point, we start at the first node.
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
/// Called when a connection is established with a peer. This can be used to
/// perform routing table synchronization using a strategy defined by the
/// implementor.

View file

@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
fn get_next_channel_announcement(&self, _starting_point: u64) ->
Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@ -383,19 +383,17 @@ impl Peer {
}
}
/// Returns the number of gossip messages we can fit in this peer's buffer.
fn gossip_buffer_slots_available(&self) -> usize {
OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
}
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
/// buffer still has space and we don't need to pause reads to get some writes out.
fn should_read(&self) -> bool {
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
}
fn should_backfill_gossip(&self) -> bool {
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
/// been drained.
fn should_buffer_gossip_backfill(&self) -> bool {
self.pending_outbound_buffer.is_empty() &&
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}
@ -739,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
if peer.should_backfill_gossip() {
if peer.should_buffer_gossip_backfill() {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
self.enqueue_message(peer, announce);
if let &Some(ref update_a) = update_a_option {
self.enqueue_message(peer, update_a);
if let Some((announce, update_a_option, update_b_option)) =
self.message_handler.route_handler.get_next_channel_announcement(c)
{
self.enqueue_message(peer, &announce);
if let Some(update_a) = update_a_option {
self.enqueue_message(peer, &update_a);
}
if let &Some(ref update_b) = update_b_option {
self.enqueue_message(peer, update_b);
if let Some(update_b) = update_b_option {
self.enqueue_message(peer, &update_b);
}
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
} else {
peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
}
},
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
let steps = peer.gossip_buffer_slots_available() as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
for msg in all_messages.iter() {
self.enqueue_message(peer, msg);
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
self.enqueue_message(peer, &msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
} else {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
InitSyncTracker::NodesSyncing(key) => {
let steps = peer.gossip_buffer_slots_available() as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
for msg in all_messages.iter() {
self.enqueue_message(peer, msg);
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
self.enqueue_message(peer, &msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
} else {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
@ -2082,10 +2073,10 @@ mod tests {
// Check that each peer has received the expected number of channel updates and channel
// announcements.
assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
}
#[test]

View file

@ -318,11 +318,10 @@ where C::Target: chain::Access, L::Target: Logger
Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
let mut result = Vec::with_capacity(batch_amount as usize);
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
let channels = self.network_graph.channels.read().unwrap();
let mut iter = channels.range(starting_point..);
while result.len() < batch_amount as usize {
loop {
if let Some((_, ref chan)) = iter.next() {
if chan.announcement_message.is_some() {
let chan_announcement = chan.announcement_message.clone().unwrap();
@ -334,20 +333,18 @@ where C::Target: chain::Access, L::Target: Logger
if let Some(two_to_one) = chan.two_to_one.as_ref() {
two_to_one_announcement = two_to_one.last_update_message.clone();
}
result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
} else {
// TODO: We may end up sending un-announced channel_updates if we are sending
// initial sync data while receiving announce/updates for this channel.
}
} else {
return result;
return None;
}
}
result
}
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
let mut result = Vec::with_capacity(batch_amount as usize);
fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
let nodes = self.network_graph.nodes.read().unwrap();
let mut iter = if let Some(pubkey) = starting_point {
let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
@ -356,18 +353,17 @@ where C::Target: chain::Access, L::Target: Logger
} else {
nodes.range::<NodeId, _>(..)
};
while result.len() < batch_amount as usize {
loop {
if let Some((_, ref node)) = iter.next() {
if let Some(node_info) = node.announcement_info.as_ref() {
if node_info.announcement_message.is_some() {
result.push(node_info.announcement_message.clone().unwrap());
if let Some(msg) = node_info.announcement_message.clone() {
return Some(msg);
}
}
} else {
return result;
return None;
}
}
result
}
/// Initiates a stateless sync of routing gossip information with a peer
@ -2412,8 +2408,8 @@ mod tests {
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
// Channels were not announced yet.
let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
assert_eq!(channels_with_announcements.len(), 0);
let channels_with_announcements = gossip_sync.get_next_channel_announcement(0);
assert!(channels_with_announcements.is_none());
let short_channel_id;
{
@ -2427,17 +2423,15 @@ mod tests {
}
// Contains initial channel announcement now.
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
assert_eq!(channels_with_announcements.len(), 1);
if let Some(channel_announcements) = channels_with_announcements.first() {
let &(_, ref update_1, ref update_2) = channel_announcements;
let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
if let Some(channel_announcements) = channels_with_announcements {
let (_, ref update_1, ref update_2) = channel_announcements;
assert_eq!(update_1, &None);
assert_eq!(update_2, &None);
} else {
panic!();
}
{
// Valid channel update
let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
@ -2450,10 +2444,9 @@ mod tests {
}
// Now contains an initial announcement and an update.
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
assert_eq!(channels_with_announcements.len(), 1);
if let Some(channel_announcements) = channels_with_announcements.first() {
let &(_, ref update_1, ref update_2) = channel_announcements;
let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
if let Some(channel_announcements) = channels_with_announcements {
let (_, ref update_1, ref update_2) = channel_announcements;
assert_ne!(update_1, &None);
assert_eq!(update_2, &None);
} else {
@ -2473,10 +2466,9 @@ mod tests {
}
// Test that announcements with excess data won't be returned
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
assert_eq!(channels_with_announcements.len(), 1);
if let Some(channel_announcements) = channels_with_announcements.first() {
let &(_, ref update_1, ref update_2) = channel_announcements;
let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
if let Some(channel_announcements) = channels_with_announcements {
let (_, ref update_1, ref update_2) = channel_announcements;
assert_eq!(update_1, &None);
assert_eq!(update_2, &None);
} else {
@ -2484,8 +2476,8 @@ mod tests {
}
// Further starting point have no channels after it
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
assert_eq!(channels_with_announcements.len(), 0);
let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id + 1000);
assert!(channels_with_announcements.is_none());
}
#[test]
@ -2497,8 +2489,8 @@ mod tests {
let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
// No nodes yet.
let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
assert_eq!(next_announcements.len(), 0);
let next_announcements = gossip_sync.get_next_node_announcement(None);
assert!(next_announcements.is_none());
{
// Announce a channel to add 2 nodes
@ -2509,10 +2501,9 @@ mod tests {
};
}
// Nodes were never announced
let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
assert_eq!(next_announcements.len(), 0);
let next_announcements = gossip_sync.get_next_node_announcement(None);
assert!(next_announcements.is_none());
{
let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
@ -2528,12 +2519,12 @@ mod tests {
};
}
let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
assert_eq!(next_announcements.len(), 2);
let next_announcements = gossip_sync.get_next_node_announcement(None);
assert!(next_announcements.is_some());
// Skip the first node.
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
assert_eq!(next_announcements.len(), 1);
let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
assert!(next_announcements.is_some());
{
// Later announcement which should not be relayed (excess data) prevent us from sharing a node
@ -2547,8 +2538,8 @@ mod tests {
};
}
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
assert_eq!(next_announcements.len(), 0);
let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
assert!(next_announcements.is_none());
}
#[test]

View file

@ -45,7 +45,7 @@ use prelude::*;
use core::time::Duration;
use sync::{Mutex, Arc};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::{cmp, mem};
use core::mem;
use bitcoin::bech32::u5;
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
@ -445,23 +445,16 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
}
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
let mut chan_anns = Vec::new();
const TOTAL_UPDS: u64 = 50;
let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
for i in starting_point..end {
let chan_upd_1 = get_dummy_channel_update(i);
let chan_upd_2 = get_dummy_channel_update(i);
let chan_ann = get_dummy_channel_announcement(i);
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
let chan_upd_1 = get_dummy_channel_update(starting_point);
let chan_upd_2 = get_dummy_channel_update(starting_point);
let chan_ann = get_dummy_channel_announcement(starting_point);
chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
}
chan_anns
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
}
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
Vec::new()
fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> {
None
}
fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {