Mirror of https://github.com/lightningdevkit/rust-lightning.git (synced 2025-02-24 23:08:36 +01:00)

Merge pull request #1137 from TheBlueMatt/2021-10-ping-fixes
Give peers which are sending us messages/receiving messages from us longer to respond to ping

Commit 070e22bf09: 3 changed files with 199 additions and 47 deletions
@@ -14,8 +14,7 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::chain::keysinterface::{Sign, KeysInterface};
 use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
-use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
-use lightning::ln::peer_handler::CustomMessageHandler;
+use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::network_graph::NetGraphMsgHandler;
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
@@ -236,8 +235,7 @@ impl BackgroundProcessor {
 // timer, we should have disconnected all sockets by now (and they're probably
 // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice.
 log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers.");
-peer_manager.timer_tick_occurred();
-peer_manager.timer_tick_occurred();
+peer_manager.disconnect_all_peers();
 last_ping_call = Instant::now();
 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
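The old recovery path simulated two timer ticks to force stale peers out; with the per-peer tick accounting introduced below, two immediate ticks no longer guarantee a disconnect, so the background processor now calls the new `disconnect_all_peers()` instead. A minimal, self-contained sketch of the wake-up decision; the 5-second `PING_TIMER` value and the enum are illustrative assumptions, only the branch structure comes from the hunk above:

const PING_TIMER: u64 = 5; // assumed tick length in seconds, not taken from the diff

#[derive(Debug, PartialEq)]
enum WakeupAction { Nothing, TimerTick, DisconnectAll }

// Mirrors the branch structure above: sleeping through more than twice the ping
// timer means peers have almost certainly timed us out already.
fn wakeup_action(secs_since_last_tick: u64) -> WakeupAction {
    if secs_since_last_tick > PING_TIMER * 2 {
        WakeupAction::DisconnectAll
    } else if secs_since_last_tick > PING_TIMER {
        WakeupAction::TimerTick
    } else {
        WakeupAction::Nothing
    }
}

fn main() {
    assert_eq!(wakeup_action(3), WakeupAction::Nothing);
    assert_eq!(wakeup_action(7), WakeupAction::TimerTick);
    assert_eq!(wakeup_action(60), WakeupAction::DisconnectAll);
}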
@@ -284,6 +284,10 @@ enum InitSyncTracker{
 NodesSyncing(PublicKey),
 }

+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
|
@@ -291,7 +295,29 @@ enum InitSyncTracker{
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the ping.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
+
 struct Peer {
 channel_encryptor: PeerChannelEncryptor,
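Together these constants bound how long a missing pong is tolerated: a peer gets `MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER` timer ticks per connected peer, provided it keeps sending at least one message per tick. A standalone sketch of the scaling; the multiplication mirrors the check added to `timer_tick_occurred` later in this diff, and the 5-second tick length comes from the doc comment above:

const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;

fn max_ticks_awaiting_pong(peer_count: usize) -> u64 {
    MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
}

fn main() {
    for peer_count in [1usize, 2, 10] {
        let ticks = max_ticks_awaiting_pong(peer_count);
        // With ~5 second ticks, 6 ticks per peer is roughly 30 seconds per connected peer.
        println!("{} peer(s): up to {} ticks (~{} s) before a silent peer is disconnected",
            peer_count, ticks, ticks * 5);
    }
}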
|
@@ -308,7 +334,9 @@ struct Peer {

 sync_status: InitSyncTracker,

-awaiting_pong: bool,
+msgs_sent_since_pong: usize,
+awaiting_pong_timer_tick_intervals: i8,
+received_message_since_timer_tick: bool,
 }

 impl Peer {
|
@@ -455,6 +483,17 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor,
 }
 }

+/// A simple wrapper that optionally prints " from <pubkey>" for an optional pubkey.
+/// This works around `format!()` taking a reference to each argument, preventing
+/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
+/// due to lifetime errors.
+struct OptionalFromDebugger<'a>(&'a Option<PublicKey>);
+impl core::fmt::Display for OptionalFromDebugger<'_> {
+fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) }
+}
+}
+
 impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 CM::Target: ChannelMessageHandler,
 RM::Target: RoutingMessageHandler,
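`OptionalFromDebugger` lets the error-handling log lines later in this diff append " from <pubkey>" only when the peer's node id is known, without branching over `format!` arguments. A dependency-free sketch of the same pattern, using `String` in place of `PublicKey`/`log_pubkey!`:

use std::fmt;

struct OptionalFrom<'a>(&'a Option<String>);

impl fmt::Display for OptionalFrom<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Print " from <id>" only when an id is present, otherwise print nothing.
        if let Some(id) = self.0 { write!(f, " from {}", id) } else { Ok(()) }
    }
}

fn main() {
    let known = Some("02aa...".to_string());
    let unknown: Option<String> = None;
    println!("Error handling message{}; ignoring", OptionalFrom(&known));
    println!("Error handling message{}; ignoring", OptionalFrom(&unknown));
}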
|
@@ -534,7 +573,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 sync_status: InitSyncTracker::NoSyncRequested,

-awaiting_pong: false,
+msgs_sent_since_pong: 0,
+awaiting_pong_timer_tick_intervals: 0,
+received_message_since_timer_tick: false,
 }).is_some() {
 panic!("PeerManager driver duplicated descriptors!");
 };
|
@@ -572,7 +613,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 sync_status: InitSyncTracker::NoSyncRequested,

-awaiting_pong: false,
+msgs_sent_since_pong: 0,
+awaiting_pong_timer_tick_intervals: 0,
+received_message_since_timer_tick: false,
 }).is_some() {
 panic!("PeerManager driver duplicated descriptors!");
 };
|
@@ -581,7 +624,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

 fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 while !peer.awaiting_write_event {
-if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
 match peer.sync_status {
 InitSyncTracker::NoSyncRequested => {},
 InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
|
@@ -626,6 +669,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 },
 }
 }
+if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+self.maybe_send_extra_ping(peer);
+}

 if {
 let next_buff = match peer.pending_outbound_buffer.front() {
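After this hunk, `do_attempt_write_data` stops pulling further initial-sync messages once either the outbound buffer reaches `OUTBOUND_BUFFER_LIMIT_READ_PAUSE` or `BUFFER_DRAIN_MSGS_PER_TICK` messages have been sent since the last pong, and in the latter case `maybe_send_extra_ping` queues a ping so the peer can acknowledge the backlog. A standalone sketch of the two checks, with constant values copied from the hunks above:

const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

// May we enqueue another sync/gossip message for this peer right now?
fn may_queue_more_sync(pending_outbound: usize, msgs_sent_since_pong: usize) -> bool {
    pending_outbound < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
        && msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}

// Once the per-pong budget is used up, an extra ping is queued instead.
fn should_send_extra_ping(msgs_sent_since_pong: usize) -> bool {
    msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK
}

fn main() {
    assert!(may_queue_more_sync(3, 10));
    assert!(!may_queue_more_sync(12, 10)); // raw buffer limit, unchanged behaviour
    assert!(!may_queue_more_sync(3, 32));  // new: per-pong budget exhausted
    assert!(should_send_extra_ping(32));
}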
|
@@ -701,14 +747,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 }
 }

-/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
+/// Append a message to a peer's pending outbound/write buffer
+fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+peer.msgs_sent_since_pong += 1;
+peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+}
+
+/// Append a message to a peer's pending outbound/write buffer
 fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
 let mut buffer = VecWriter(Vec::with_capacity(2048));
 wire::write(message, &mut buffer).unwrap(); // crash if the write failed
-let encoded_message = buffer.0;
-
 log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
-peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+self.enqueue_encoded_message(peer, &buffer.0);
 }

 fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
|
@@ -748,19 +798,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 match e.action {
 msgs::ErrorAction::DisconnectPeer { msg: _ } => {
 //TODO: Try to push msg
-log_debug!(self.logger, "Error handling message; disconnecting peer with: {}", e.err);
+log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
 return Err(PeerHandleError{ no_connection_possible: false });
 },
 msgs::ErrorAction::IgnoreAndLog(level) => {
-log_given_level!(self.logger, level, "Error handling message; ignoring: {}", e.err);
+log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
 continue
 },
 msgs::ErrorAction::IgnoreError => {
-log_debug!(self.logger, "Error handling message; ignoring: {}", e.err);
+log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
 continue;
 },
 msgs::ErrorAction::SendErrorMessage { msg } => {
-log_debug!(self.logger, "Error handling message; sending error message with: {}", e.err);
+log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
 self.enqueue_message(peer, &msg);
 continue;
 },
|
@@ -804,6 +854,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 let features = InitFeatures::known();
 let resp = msgs::Init { features };
 self.enqueue_message(peer, &resp);
+peer.awaiting_pong_timer_tick_intervals = 0;
 },
 NextNoiseStep::ActThree => {
 let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
||||||
|
@ -814,6 +865,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
let features = InitFeatures::known();
|
let features = InitFeatures::known();
|
||||||
let resp = msgs::Init { features };
|
let resp = msgs::Init { features };
|
||||||
self.enqueue_message(peer, &resp);
|
self.enqueue_message(peer, &resp);
|
||||||
|
peer.awaiting_pong_timer_tick_intervals = 0;
|
||||||
},
|
},
|
||||||
NextNoiseStep::NoiseComplete => {
|
NextNoiseStep::NoiseComplete => {
|
||||||
if peer.pending_read_is_header {
|
if peer.pending_read_is_header {
|
||||||
|
@@ -902,6 +954,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
 ) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
 log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+peer.received_message_since_timer_tick = true;

 // Need an Init as first message
 if let wire::Message::Init(_) = message {
|
@@ -923,7 +976,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 return Err(PeerHandleError{ no_connection_possible: false }.into());
 }

-log_info!(self.logger, "Received peer Init message: {}", msg.features);
+log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);

 if msg.features.initial_routing_sync() {
 peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
@@ -965,7 +1018,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 }
 },
 wire::Message::Pong(_msg) => {
-peer.awaiting_pong = false;
+peer.awaiting_pong_timer_tick_intervals = 0;
+peer.msgs_sent_since_pong = 0;
 },

 // Channel messages:
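A pong now clears two counters at once: the tick counter that would eventually disconnect the peer, and the per-pong message budget that throttles further sync and gossip sends. A minimal stand-in for that bookkeeping; the field names follow the diff, the struct itself is illustrative:

struct PeerCounters {
    msgs_sent_since_pong: usize,
    awaiting_pong_timer_tick_intervals: i8,
    received_message_since_timer_tick: bool,
}

impl PeerCounters {
    fn on_message_sent(&mut self) { self.msgs_sent_since_pong += 1; }
    fn on_message_received(&mut self) { self.received_message_since_timer_tick = true; }
    fn on_pong(&mut self) {
        // Matches the Pong arm above: both counters go back to zero.
        self.awaiting_pong_timer_tick_intervals = 0;
        self.msgs_sent_since_pong = 0;
    }
}

fn main() {
    let mut peer = PeerCounters {
        msgs_sent_since_pong: 40,
        awaiting_pong_timer_tick_intervals: 3,
        received_message_since_timer_tick: false,
    };
    peer.on_message_sent();
    peer.on_message_received();
    peer.on_pong();
    assert_eq!(peer.msgs_sent_since_pong, 0);
    assert_eq!(peer.awaiting_pong_timer_tick_intervals, 0);
    assert!(peer.received_message_since_timer_tick);
}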
|
@@ -1086,7 +1140,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 continue
 }
-if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+{
 log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 continue;
 }
|
@@ -1097,7 +1153,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 continue;
 }
-peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+self.enqueue_encoded_message(peer, &encoded_msg);
 }
 },
 wire::Message::NodeAnnouncement(ref msg) => {
|
@@ -1109,7 +1165,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 !peer.should_forward_node_announcement(msg.contents.node_id) {
 continue
 }
-if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+{
 log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 continue;
 }
|
@@ -1119,7 +1177,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 continue;
 }
-peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+self.enqueue_encoded_message(peer, &encoded_msg);
 }
 },
 wire::Message::ChannelUpdate(ref msg) => {
|
@@ -1131,14 +1189,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 continue
 }
-if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+{
 log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 continue;
 }
 if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 continue;
 }
-peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+self.enqueue_encoded_message(peer, &encoded_msg);
 }
 },
 _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
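All three gossip-forwarding arms (channel_announcement, node_announcement, channel_update) now skip a peer not only when its outbound buffer exceeds `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP`, but also when more than two ticks' worth of messages have gone out without a pong. A standalone sketch of the combined test, with constant values copied from earlier hunks:

const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
    OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

fn skip_gossip_broadcast(pending_outbound: usize, msgs_sent_since_pong: usize) -> bool {
    pending_outbound > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
        || msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}

fn main() {
    assert!(!skip_gossip_broadcast(5, 10));
    assert!(skip_gossip_broadcast(25, 10)); // raw buffer limit, unchanged behaviour
    assert!(skip_gossip_broadcast(5, 70));  // new: too many un-acked sends since the last pong
}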
|
@@ -1414,6 +1474,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 }
 }

+/// Disconnects all currently-connected peers. This is useful on platforms where there may be
+/// an indication that TCP sockets have stalled even if we weren't around to time them out
+/// using regular ping/pongs.
+pub fn disconnect_all_peers(&self) {
+let mut peers_lock = self.peers.lock().unwrap();
+let peers = &mut *peers_lock;
+for (mut descriptor, peer) in peers.peers.drain() {
+if let Some(node_id) = peer.their_node_id {
+log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+peers.node_id_to_descriptor.remove(&node_id);
+self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+}
+descriptor.disconnect_socket();
+}
+debug_assert!(peers.node_id_to_descriptor.is_empty());
+}
+
+/// This is called when we're blocked on sending additional gossip messages until we receive a
+/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
+fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+if peer.awaiting_pong_timer_tick_intervals == 0 {
+peer.awaiting_pong_timer_tick_intervals = -1;
+let ping = msgs::Ping {
+ponglen: 0,
+byteslen: 64,
+};
+self.enqueue_message(peer, &ping);
+}
+}
+
 /// Send pings to each peer and disconnect those which did not respond to the last round of
 /// pings.
 ///
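Of the two new methods, `disconnect_all_peers` is the hook the background-processor hunk at the top of this diff calls after detecting a long suspension, and `maybe_send_extra_ping` is what the write path calls when the per-pong budget is exhausted. The `-1` value is a sentinel meaning "extra ping sent between ticks", so the next `timer_tick_occurred` treats it as the first tick of waiting rather than as a missed pong. A small standalone sketch of that sentinel handling:

// Returns whether a ping should be enqueued. `-1` marks "extra ping sent between ticks";
// the next timer tick converts it to 1 (see the `timer_tick_occurred` hunks below).
fn maybe_mark_extra_ping(awaiting_pong_timer_tick_intervals: &mut i8) -> bool {
    if *awaiting_pong_timer_tick_intervals == 0 {
        *awaiting_pong_timer_tick_intervals = -1;
        true // caller then enqueues msgs::Ping { ponglen: 0, byteslen: 64 }
    } else {
        false // a ping is already outstanding; don't stack more
    }
}

fn main() {
    let mut ticks: i8 = 0;
    assert!(maybe_mark_extra_ping(&mut ticks));
    assert_eq!(ticks, -1);
    assert!(!maybe_mark_extra_ping(&mut ticks));
}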
|
@ -1432,9 +1523,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
|
||||||
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
|
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
|
||||||
let peers = &mut peers.peers;
|
let peers = &mut peers.peers;
|
||||||
let mut descriptors_needing_disconnect = Vec::new();
|
let mut descriptors_needing_disconnect = Vec::new();
|
||||||
|
let peer_count = peers.len();
|
||||||
|
|
||||||
peers.retain(|descriptor, peer| {
|
peers.retain(|descriptor, peer| {
|
||||||
if peer.awaiting_pong {
|
let mut do_disconnect_peer = false;
|
||||||
|
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
|
||||||
|
// The peer needs to complete its handshake before we can exchange messages. We
|
||||||
|
// give peers one timer tick to complete handshake, reusing
|
||||||
|
// `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
|
||||||
|
// for handshake completion.
|
||||||
|
if peer.awaiting_pong_timer_tick_intervals != 0 {
|
||||||
|
do_disconnect_peer = true;
|
||||||
|
} else {
|
||||||
|
peer.awaiting_pong_timer_tick_intervals = 1;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if peer.awaiting_pong_timer_tick_intervals == -1 {
|
||||||
|
// Magic value set in `maybe_send_extra_ping`.
|
||||||
|
peer.awaiting_pong_timer_tick_intervals = 1;
|
||||||
|
peer.received_message_since_timer_tick = false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if do_disconnect_peer
|
||||||
|
|| (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|
||||||
|
|| peer.awaiting_pong_timer_tick_intervals as u64 >
|
||||||
|
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
|
||||||
|
{
|
||||||
descriptors_needing_disconnect.push(descriptor.clone());
|
descriptors_needing_disconnect.push(descriptor.clone());
|
||||||
match peer.their_node_id {
|
match peer.their_node_id {
|
||||||
Some(node_id) => {
|
Some(node_id) => {
|
||||||
|
@@ -1442,30 +1559,25 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 node_id_to_descriptor.remove(&node_id);
 self.message_handler.chan_handler.peer_disconnected(&node_id, false);
 }
-None => {
-// This can't actually happen as we should have hit
-// is_ready_for_encryption() previously on this same peer.
-unreachable!();
-},
+None => {},
 }
 return false;
 }
+peer.received_message_since_timer_tick = false;

-if !peer.channel_encryptor.is_ready_for_encryption() {
-// The peer needs to complete its handshake before we can exchange messages
+if peer.awaiting_pong_timer_tick_intervals > 0 {
+peer.awaiting_pong_timer_tick_intervals += 1;
 return true;
 }

+peer.awaiting_pong_timer_tick_intervals = 1;
 let ping = msgs::Ping {
 ponglen: 0,
 byteslen: 64,
 };
 self.enqueue_message(peer, &ping);
+self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);

-let mut descriptor_clone = descriptor.clone();
-self.do_attempt_write_data(&mut descriptor_clone, peer);
-
-peer.awaiting_pong = true;
 true
 });
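The retain/drop decision above reads as a small state machine per tick: unfinished handshakes get one tick of grace, the `-1` sentinel converts into a normal one-tick wait, silent peers awaiting a pong are dropped, and peers that keep talking are allowed up to `MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER` ticks per connected peer. A self-contained reconstruction of that decision, with the real ping enqueueing and socket handling left out:

const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;

struct PeerTickState {
    handshake_complete: bool,
    awaiting_pong_timer_tick_intervals: i8,
    received_message_since_timer_tick: bool,
}

/// Returns true if the peer should be kept for another tick.
fn keep_peer_on_tick(peer: &mut PeerTickState, peer_count: usize) -> bool {
    if !peer.handshake_complete {
        // One full tick is allowed for the noise handshake; a second tick disconnects.
        if peer.awaiting_pong_timer_tick_intervals != 0 { return false; }
        peer.awaiting_pong_timer_tick_intervals = 1;
        return true;
    }
    if peer.awaiting_pong_timer_tick_intervals == -1 {
        // An extra ping was already sent between ticks (see `maybe_send_extra_ping`).
        peer.awaiting_pong_timer_tick_intervals = 1;
        peer.received_message_since_timer_tick = false;
        return true;
    }
    if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
        || peer.awaiting_pong_timer_tick_intervals as u64
            > MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
    {
        return false; // silent or far too slow: disconnect
    }
    peer.received_message_since_timer_tick = false;
    if peer.awaiting_pong_timer_tick_intervals > 0 {
        peer.awaiting_pong_timer_tick_intervals += 1;
        return true;
    }
    peer.awaiting_pong_timer_tick_intervals = 1;
    // The real code also enqueues a fresh Ping and flushes the write buffer here.
    true
}

fn main() {
    let mut p = PeerTickState {
        handshake_complete: true,
        awaiting_pong_timer_tick_intervals: 0,
        received_message_since_timer_tick: false,
    };
    assert!(keep_peer_on_tick(&mut p, 1));  // first tick: ping sent, counter becomes 1
    assert!(!keep_peer_on_tick(&mut p, 1)); // nothing heard since: disconnected
}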
|
@@ -1624,11 +1736,23 @@ mod tests {
 // than can fit into a peer's buffer).
 let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

-// Make each peer to read the messages that the other peer just wrote to them.
+// Make each peer to read the messages that the other peer just wrote to them. Note that
+// due to the max-messagse-before-ping limits this may take a few iterations to complete.
+for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
 peers[0].process_events();
-peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+assert!(!b_read_data.is_empty());
+
+peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
 peers[1].process_events();
-peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+
+let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+assert!(!a_read_data.is_empty());
+peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+peers[1].process_events();
+assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+}

 // Check that each peer has received the expected number of channel updates and channel
 // announcements.
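The loop bound follows from how much gossip the test pushes: the test routing handler (see the test_utils hunks below) supplies 50 channel announcements with two channel updates each, roughly 150 messages, while at most `BUFFER_DRAIN_MSGS_PER_TICK` of them can be flushed per ping round trip. The arithmetic, as a quick check:

fn main() {
    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
    // ~50 channel_announcements + 100 channel_updates queued by the test's routing handler.
    let total_msgs = 150usize;
    let iterations = total_msgs / BUFFER_DRAIN_MSGS_PER_TICK + 1;
    assert_eq!(iterations, 5);
}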
|
@@ -1637,4 +1761,37 @@ mod tests {
 assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
 assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
 }
+
+#[test]
+fn test_handshake_timeout() {
+// Tests that we time out a peer still waiting on handshake completion after a full timer
+// tick.
+let cfgs = create_peermgr_cfgs(2);
+cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+let peers = create_network(2, &cfgs);
+
+let secp_ctx = Secp256k1::new();
+let a_id = PublicKey::from_secret_key(&secp_ctx, &peers[0].our_node_secret);
+let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone()).unwrap();
+peers[0].new_inbound_connection(fd_a.clone()).unwrap();
+
+// If we get a single timer tick before completion, that's fine
+assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+peers[0].timer_tick_occurred();
+assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+
+assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
+peers[0].process_events();
+assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+peers[1].process_events();
+
+// ...but if we get a second timer tick, we should disconnect the peer
+peers[0].timer_tick_occurred();
+assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+
+assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+}
 }
@@ -320,7 +320,6 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
 pub struct TestRoutingMessageHandler {
 pub chan_upds_recvd: AtomicUsize,
 pub chan_anns_recvd: AtomicUsize,
-pub chan_anns_sent: AtomicUsize,
 pub request_full_sync: AtomicBool,
 }
|
@@ -329,7 +328,6 @@ impl TestRoutingMessageHandler {
 TestRoutingMessageHandler {
 chan_upds_recvd: AtomicUsize::new(0),
 chan_anns_recvd: AtomicUsize::new(0),
-chan_anns_sent: AtomicUsize::new(0),
 request_full_sync: AtomicBool::new(false),
 }
 }
|
@@ -348,8 +346,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
 }
 fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
 let mut chan_anns = Vec::new();
-const TOTAL_UPDS: u64 = 100;
-let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
+const TOTAL_UPDS: u64 = 50;
+let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
 for i in starting_point..end {
 let chan_upd_1 = get_dummy_channel_update(i);
 let chan_upd_2 = get_dummy_channel_update(i);
|
@@ -358,7 +356,6 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
 chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
 }

-self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
 chan_anns
 }
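With `chan_anns_sent` removed, the test handler's batching becomes a pure function of the caller's `starting_point`, capped at a fixed total of 50 announcements. A sketch of the resulting window logic:

fn batch_range(starting_point: u64, batch_amount: u8) -> std::ops::Range<u64> {
    const TOTAL_UPDS: u64 = 50;
    let end = std::cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
    starting_point..end
}

fn main() {
    assert_eq!(batch_range(0, 64), 0..50);  // one oversized request is capped at 50
    assert_eq!(batch_range(40, 8), 40..48); // later batches resume from starting_point
    assert_eq!(batch_range(48, 8), 48..50);
}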
|
|