mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-25 15:20:24 +01:00
Merge pull request #1452 from tnull/2022-04-honor-gossip-timestamp-filters
Initiate gossip sync only after receiving GossipTimestampFilter.
This commit is contained in:
commit
171dfeee07
2 changed files with 71 additions and 15 deletions
|
@ -339,6 +339,7 @@ struct Peer {
|
||||||
msgs_sent_since_pong: usize,
|
msgs_sent_since_pong: usize,
|
||||||
awaiting_pong_timer_tick_intervals: i8,
|
awaiting_pong_timer_tick_intervals: i8,
|
||||||
received_message_since_timer_tick: bool,
|
received_message_since_timer_tick: bool,
|
||||||
|
sent_gossip_timestamp_filter: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
|
@ -348,7 +349,11 @@ impl Peer {
|
||||||
/// announcements/updates for the given channel_id then we will send it when we get to that
|
/// announcements/updates for the given channel_id then we will send it when we get to that
|
||||||
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
|
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
|
||||||
/// sent the old versions, we should send the update, and so return true here.
|
/// sent the old versions, we should send the update, and so return true here.
|
||||||
fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
|
fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
|
||||||
|
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
||||||
|
!self.sent_gossip_timestamp_filter {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
match self.sync_status {
|
match self.sync_status {
|
||||||
InitSyncTracker::NoSyncRequested => true,
|
InitSyncTracker::NoSyncRequested => true,
|
||||||
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
|
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
|
||||||
|
@ -358,6 +363,10 @@ impl Peer {
|
||||||
|
|
||||||
/// Similar to the above, but for node announcements indexed by node_id.
|
/// Similar to the above, but for node announcements indexed by node_id.
|
||||||
fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
|
fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
|
||||||
|
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
||||||
|
!self.sent_gossip_timestamp_filter {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
match self.sync_status {
|
match self.sync_status {
|
||||||
InitSyncTracker::NoSyncRequested => true,
|
InitSyncTracker::NoSyncRequested => true,
|
||||||
InitSyncTracker::ChannelsSyncing(_) => false,
|
InitSyncTracker::ChannelsSyncing(_) => false,
|
||||||
|
@ -619,6 +628,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
msgs_sent_since_pong: 0,
|
msgs_sent_since_pong: 0,
|
||||||
awaiting_pong_timer_tick_intervals: 0,
|
awaiting_pong_timer_tick_intervals: 0,
|
||||||
received_message_since_timer_tick: false,
|
received_message_since_timer_tick: false,
|
||||||
|
sent_gossip_timestamp_filter: false,
|
||||||
}).is_some() {
|
}).is_some() {
|
||||||
panic!("PeerManager driver duplicated descriptors!");
|
panic!("PeerManager driver duplicated descriptors!");
|
||||||
};
|
};
|
||||||
|
@ -665,6 +675,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
msgs_sent_since_pong: 0,
|
msgs_sent_since_pong: 0,
|
||||||
awaiting_pong_timer_tick_intervals: 0,
|
awaiting_pong_timer_tick_intervals: 0,
|
||||||
received_message_since_timer_tick: false,
|
received_message_since_timer_tick: false,
|
||||||
|
sent_gossip_timestamp_filter: false,
|
||||||
}).is_some() {
|
}).is_some() {
|
||||||
panic!("PeerManager driver duplicated descriptors!");
|
panic!("PeerManager driver duplicated descriptors!");
|
||||||
};
|
};
|
||||||
|
@ -1058,7 +1069,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
|
|
||||||
log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);
|
log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);
|
||||||
|
|
||||||
if msg.features.initial_routing_sync() {
|
// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
|
||||||
|
if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
|
||||||
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
||||||
}
|
}
|
||||||
if !msg.features.supports_static_remote_key() {
|
if !msg.features.supports_static_remote_key() {
|
||||||
|
@ -1205,7 +1217,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
|
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
|
||||||
},
|
},
|
||||||
wire::Message::GossipTimestampFilter(_msg) => {
|
wire::Message::GossipTimestampFilter(_msg) => {
|
||||||
// TODO: handle message
|
// When supporting gossip messages, start inital gossip sync only after we receive
|
||||||
|
// a GossipTimestampFilter
|
||||||
|
if peer.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
||||||
|
!peer.sent_gossip_timestamp_filter {
|
||||||
|
peer.sent_gossip_timestamp_filter = true;
|
||||||
|
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Unknown messages:
|
// Unknown messages:
|
||||||
|
@ -1803,6 +1821,8 @@ mod tests {
|
||||||
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
|
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
|
||||||
peer_b.process_events();
|
peer_b.process_events();
|
||||||
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
|
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
|
||||||
|
peer_a.process_events();
|
||||||
|
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
|
||||||
(fd_a.clone(), fd_b.clone())
|
(fd_a.clone(), fd_b.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1866,21 +1886,21 @@ mod tests {
|
||||||
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
|
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
|
||||||
|
|
||||||
// Make each peer to read the messages that the other peer just wrote to them. Note that
|
// Make each peer to read the messages that the other peer just wrote to them. Note that
|
||||||
// due to the max-messagse-before-ping limits this may take a few iterations to complete.
|
// due to the max-message-before-ping limits this may take a few iterations to complete.
|
||||||
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
|
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
|
||||||
peers[0].process_events();
|
|
||||||
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
||||||
assert!(!b_read_data.is_empty());
|
|
||||||
|
|
||||||
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
|
|
||||||
peers[1].process_events();
|
peers[1].process_events();
|
||||||
|
|
||||||
let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
||||||
assert!(!a_read_data.is_empty());
|
assert!(!a_read_data.is_empty());
|
||||||
peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
|
|
||||||
|
|
||||||
peers[1].process_events();
|
peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
|
||||||
assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
|
peers[0].process_events();
|
||||||
|
|
||||||
|
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
||||||
|
assert!(!b_read_data.is_empty());
|
||||||
|
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
|
||||||
|
|
||||||
|
peers[0].process_events();
|
||||||
|
assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that each peer has received the expected number of channel updates and channel
|
// Check that each peer has received the expected number of channel updates and channel
|
||||||
|
|
|
@ -49,6 +49,9 @@ use core::{cmp, mem};
|
||||||
use bitcoin::bech32::u5;
|
use bitcoin::bech32::u5;
|
||||||
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
|
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
pub struct TestVecWriter(pub Vec<u8>);
|
pub struct TestVecWriter(pub Vec<u8>);
|
||||||
impl Writer for TestVecWriter {
|
impl Writer for TestVecWriter {
|
||||||
fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> {
|
fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> {
|
||||||
|
@ -341,6 +344,7 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
|
||||||
pub struct TestRoutingMessageHandler {
|
pub struct TestRoutingMessageHandler {
|
||||||
pub chan_upds_recvd: AtomicUsize,
|
pub chan_upds_recvd: AtomicUsize,
|
||||||
pub chan_anns_recvd: AtomicUsize,
|
pub chan_anns_recvd: AtomicUsize,
|
||||||
|
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
|
||||||
pub request_full_sync: AtomicBool,
|
pub request_full_sync: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,6 +353,7 @@ impl TestRoutingMessageHandler {
|
||||||
TestRoutingMessageHandler {
|
TestRoutingMessageHandler {
|
||||||
chan_upds_recvd: AtomicUsize::new(0),
|
chan_upds_recvd: AtomicUsize::new(0),
|
||||||
chan_anns_recvd: AtomicUsize::new(0),
|
chan_anns_recvd: AtomicUsize::new(0),
|
||||||
|
pending_events: Mutex::new(vec![]),
|
||||||
request_full_sync: AtomicBool::new(false),
|
request_full_sync: AtomicBool::new(false),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -384,7 +389,35 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
|
fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {
|
||||||
|
if !init_msg.features.supports_gossip_queries() {
|
||||||
|
return ();
|
||||||
|
}
|
||||||
|
|
||||||
|
let should_request_full_sync = self.request_full_sync.load(Ordering::Acquire);
|
||||||
|
|
||||||
|
#[allow(unused_mut, unused_assignments)]
|
||||||
|
let mut gossip_start_time = 0;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
{
|
||||||
|
gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs();
|
||||||
|
if should_request_full_sync {
|
||||||
|
gossip_start_time -= 60 * 60 * 24 * 7 * 2; // 2 weeks ago
|
||||||
|
} else {
|
||||||
|
gossip_start_time -= 60 * 60; // an hour ago
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut pending_events = self.pending_events.lock().unwrap();
|
||||||
|
pending_events.push(events::MessageSendEvent::SendGossipTimestampFilter {
|
||||||
|
node_id: their_node_id.clone(),
|
||||||
|
msg: msgs::GossipTimestampFilter {
|
||||||
|
chain_hash: genesis_block(Network::Testnet).header.block_hash(),
|
||||||
|
first_timestamp: gossip_start_time as u32,
|
||||||
|
timestamp_range: u32::max_value(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
|
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -405,7 +438,10 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
|
||||||
|
|
||||||
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
|
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
|
||||||
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
|
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
|
||||||
vec![]
|
let mut ret = Vec::new();
|
||||||
|
let mut pending_events = self.pending_events.lock().unwrap();
|
||||||
|
core::mem::swap(&mut ret, &mut pending_events);
|
||||||
|
ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue