Change routing table sync to use gossip_queries

This commit changes outbound routing table sync to use gossip_queries
instead of the effectively deprecated initial_routing_sync feature.

This change removes setting of initial_routing_sync in our outbound Init
message. Instead we now call sync_routing_table after receiving an Init
message from a peer. If the peer supports gossip_queries and
should_request_full_sync returns true, we initiate a full gossip_queries
sync.
This commit is contained in:
bmancini55 2020-12-03 12:48:40 -05:00
parent 7a4a29ffe0
commit e742894492
4 changed files with 55 additions and 67 deletions

View file

@ -484,6 +484,7 @@ impl<T: sealed::GossipQueries> Features<T> {
pub(crate) fn supports_gossip_queries(&self) -> bool {
<T as sealed::GossipQueries>::supports_feature(&self.flags)
}
#[cfg(test)]
pub(crate) fn clear_gossip_queries(mut self) -> Self {
<T as sealed::GossipQueries>::clear_bits(&mut self.flags);
self
@ -514,6 +515,10 @@ impl<T: sealed::InitialRoutingSync> Features<T> {
pub(crate) fn initial_routing_sync(&self) -> bool {
<T as sealed::InitialRoutingSync>::supports_feature(&self.flags)
}
// We are no longer setting initial_routing_sync now that gossip_queries
// is enabled. This feature is ignored by a peer when gossip_queries has
// been negotiated.
#[cfg(test)]
pub(crate) fn clear_initial_routing_sync(&mut self) {
<T as sealed::InitialRoutingSync>::clear_bits(&mut self.flags)
}

View file

@ -804,6 +804,12 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
}
/// A trait to describe an object which can receive routing messages.
///
/// # Implementor DoS Warnings
///
/// For `gossip_queries` messages there are potential DoS vectors when handling
/// inbound queries. Implementors using an on-disk network graph should be aware of
/// repeated disk I/O for queries accessing different parts of the network graph.
pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
/// false or returning an Err otherwise.
@ -841,14 +847,10 @@ pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvide
/// gossip messages.
fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
/// Handles when a peer asks us to send a list of short_channel_ids
/// for the requested range of blocks. There are potential DoS vectors when
/// handling inbound queries. Handling requests with first_blocknum very far
/// away may trigger repeated disk I/O if the NetworkGraph is not fully in-memory.
/// for the requested range of blocks.
fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>;
/// Handles when a peer asks us to send routing gossip messages for a
/// list of short_channel_ids. There are potential DoS vectors when handling
/// inbound queries. Handling requests with first_blocknum very far away may
/// trigger repeated disk I/O if the NetworkGraph is not fully in-memory.
/// list of short_channel_ids.
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
}

View file

@ -584,11 +584,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
peer.their_node_id = Some(their_node_id);
insert_node_id!();
let mut features = InitFeatures::known().clear_gossip_queries();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}
let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
},
@ -713,15 +709,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
}
if !peer.outbound {
let mut features = InitFeatures::known().clear_gossip_queries();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}
let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
}
self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
peer.their_features = Some(msg.features);
},
@ -1329,13 +1323,6 @@ mod tests {
(fd_a.clone(), fd_b.clone())
}
fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
(fd_a.clone(), fd_b.clone())
}
#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@ -1404,41 +1391,4 @@ mod tests {
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
}
#[test]
fn limit_initial_routing_sync_requests() {
// Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
{
let cfgs = create_peermgr_cfgs(2);
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
let peer_0 = peers[0].peers.lock().unwrap();
let peer_1 = peers[1].peers.lock().unwrap();
let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
assert!(peer_0_features.unwrap().initial_routing_sync());
assert!(!peer_1_features.unwrap().initial_routing_sync());
}
// Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
{
let cfgs = create_peermgr_cfgs(2);
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
let peer_0 = peers[0].peers.lock().unwrap();
let peer_1 = peers[1].peers.lock().unwrap();
let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
assert!(!peer_0_features.unwrap().initial_routing_sync());
assert!(peer_1_features.unwrap().initial_routing_sync());
}
}
}

View file

@ -219,17 +219,26 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
}
/// Initiates a stateless sync of routing gossip information with a peer
/// by calling query_channel_range. The default strategy used by this
/// implementation is to sync for the full block range with several peers.
/// using gossip_queries. The default strategy used by this implementation
/// is to sync the full block range with several peers.
///
/// We should expect one or more reply_channel_range messages in response
/// to our query. Each reply will enqueue a query_scid message to request
/// gossip messages for each channel. The sync is considered complete when
/// the final reply_scids_end message is received, though we are not
/// to our query_channel_range. Each reply will enqueue a query_scid message
/// to request gossip messages for each channel. The sync is considered complete
/// when the final reply_scids_end message is received, though we are not
/// tracking this directly.
fn sync_routing_table(&self, their_node_id: &PublicKey, init_msg: &Init) {
// We will only perform a sync with peers that support gossip_queries.
if !init_msg.features.supports_gossip_queries() {
return ();
}
// Check if we need to perform a full synchronization with this peer
if !self.should_request_full_sync(their_node_id) {
return ();
}
let first_blocknum = 0;
let number_of_blocks = 0xffffffff;
log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks);
@ -249,7 +258,10 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
/// stateless, it does not validate the sequencing of replies for multi-
/// reply ranges. It does not validate whether the reply(ies) cover the
/// queried range. It also does not filter SCIDs to only those in the
/// original query range.
/// original query range. We also do not validate that the chain_hash
/// matches the chain_hash of the NetworkGraph. Any chan_ann message that
/// does not match our chain_hash will be rejected when the announcement is
/// processed.
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),);
@ -1967,6 +1979,26 @@ mod tests {
_ => panic!("Expected MessageSendEvent::SendChannelRangeQuery")
};
}
// It should not enqueue a query when should_request_full_sync return false.
// The initial implementation allows syncing with the first 5 peers after
// which should_request_full_sync will return false
{
let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
let init_msg = Init { features: InitFeatures::known() };
for n in 1..7 {
let node_privkey = &SecretKey::from_slice(&[n; 32]).unwrap();
let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
net_graph_msg_handler.sync_routing_table(&node_id, &init_msg);
let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
if n <= 5 {
assert_eq!(events.len(), 1);
} else {
assert_eq!(events.len(), 0);
}
}
}
}
#[test]
@ -1980,7 +2012,6 @@ mod tests {
// Test receipt of a single reply that should enqueue an SCID query
// matching the SCIDs in the reply
{
// Handle a single successful reply that encompasses the queried channel range
let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
chain_hash,
full_information: true,