2020-08-10 15:00:09 -04:00
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
2018-09-19 17:39:43 -04:00
//! Top level peer message handling and socket handling logic lives here.
2018-09-20 12:57:47 -04:00
//!
2018-09-19 17:39:43 -04:00
//! Instead of actually servicing sockets ourselves we require that you implement the
//! SocketDescriptor interface and use that to receive actions which you should perform on the
//! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
2020-05-06 19:04:44 -04:00
//! call into the provided message handlers (probably a ChannelManager and NetGraphMsgHandler) with messages
2018-09-19 17:39:43 -04:00
//! they should handle, and encoding/sending response messages.
2022-05-05 17:59:38 +02:00
use bitcoin ::secp256k1 ::{ SecretKey , PublicKey } ;
2017-12-25 01:05:27 -05:00
2020-01-06 17:54:02 -05:00
use ln ::features ::InitFeatures ;
2017-12-25 01:05:27 -05:00
use ln ::msgs ;
2022-03-13 00:40:35 +05:30
use ln ::msgs ::{ ChannelMessageHandler , LightningError , NetAddress , RoutingMessageHandler } ;
2020-01-16 13:26:38 -05:00
use ln ::channelmanager ::{ SimpleArcChannelManager , SimpleRefChannelManager } ;
2021-08-05 14:51:17 +09:00
use util ::ser ::{ VecWriter , Writeable , Writer } ;
2017-12-25 01:05:27 -05:00
use ln ::peer_channel_encryptor ::{ PeerChannelEncryptor , NextNoiseStep } ;
2020-01-21 15:26:21 -08:00
use ln ::wire ;
2021-11-22 18:19:08 +01:00
use ln ::wire ::Encode ;
2021-10-08 22:54:32 +00:00
use util ::atomic_counter ::AtomicCounter ;
2020-01-16 13:26:38 -05:00
use util ::events ::{ MessageSendEvent , MessageSendEventsProvider } ;
2021-06-29 23:44:31 +00:00
use util ::logger ::Logger ;
2021-11-01 13:14:14 -05:00
use routing ::network_graph ::{ NetworkGraph , NetGraphMsgHandler } ;
2017-12-25 01:05:27 -05:00
2021-05-19 04:21:39 +00:00
use prelude ::* ;
2021-08-01 18:22:06 +02:00
use io ;
2021-05-19 04:21:39 +00:00
use alloc ::collections ::LinkedList ;
2021-07-19 15:01:58 +02:00
use sync ::{ Arc , Mutex } ;
2021-05-23 23:22:46 +00:00
use core ::{ cmp , hash , fmt , mem } ;
use core ::ops ::Deref ;
Use Infallible for the unconstructable default custom message type
When we landed custom messages, we used the empty tuple for the
custom message type for `IgnoringMessageHandler`. This was fine,
except that we also implemented `Writeable` to panic when writing
a `()`. Later, we added support for anchor output construction in
CommitmentTransaction, signified by setting a field to `Some(())`,
which is serialized as-is.
This causes us to panic when writing a `CommitmentTransaction`
with `opt_anchors` set. Note that we never set it inside of LDK,
but downstream users may.
Instead, we implement `Writeable` to write nothing for `()` and use
`core::convert::Infallible` for the default custom message type as
it is, appropriately, unconstructable.
This also makes it easier to implement various things in bindings,
as we can always assume `Infallible`-conversion logic is
unreachable.
2021-09-22 19:00:30 +00:00
use core ::convert ::Infallible ;
2021-08-01 18:22:06 +02:00
#[ cfg(feature = " std " ) ] use std ::error ;
2017-12-25 01:05:27 -05:00
2020-04-27 16:41:54 +02:00
use bitcoin ::hashes ::sha256 ::Hash as Sha256 ;
use bitcoin ::hashes ::sha256 ::HashEngine as Sha256Engine ;
use bitcoin ::hashes ::{ HashEngine , Hash } ;
2019-07-18 22:17:36 -04:00
2021-08-05 14:51:17 +09:00
/// A handler for custom peer-to-peer wire messages, layered on top of the message
/// deserialization provided by [`wire::CustomMessageReader`].
pub trait CustomMessageHandler: wire::CustomMessageReader {
	/// Called after a custom message has been decoded by the [`wire::CustomMessageReader`]
	/// implementation, with the message and the node id of the peer which sent it.
	/// Returns a `LightningError` if the message could not be handled.
	fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;

	/// Gets the list of pending messages which were generated by the custom message
	/// handler, clearing the list in the process. The first tuple element of each entry is
	/// the node id of the intended recipient; if no connection to that node exists, the
	/// message is simply not sent.
	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
}
2021-03-01 15:10:59 -05:00
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
// Zero-sized: it carries no state at all, every handler method is a no-op.
pub struct IgnoringMessageHandler{}
2021-03-01 15:10:59 -05:00
impl MessageSendEventsProvider for IgnoringMessageHandler {
	/// This handler never queues events, so the returned list is always empty.
	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { vec![] }
}
impl RoutingMessageHandler for IgnoringMessageHandler {
	// Announcements and updates are acknowledged (no error) but never stored or rebroadcast
	// (`Ok(false)` indicates the message should not be forwarded on).
	fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
	fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
	fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
	// We hold no graph data, so there is never anything to hand out for initial sync.
	fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
		Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { vec![] }
	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { vec![] }
	fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
	// Gossip-query messages are silently accepted and dropped.
	fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
	fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
	fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
	fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
}
// Deref-to-self lets an owned IgnoringMessageHandler satisfy the `RM: Deref` style bounds
// used by MessageHandler/PeerManager without wrapping it in a reference or Arc.
impl Deref for IgnoringMessageHandler {
	type Target = IgnoringMessageHandler;
	fn deref(&self) -> &Self::Target { self }
}
Use Infallible for the unconstructable default custom message type
When we landed custom messages, we used the empty tuple for the
custom message type for `IgnoringMessageHandler`. This was fine,
except that we also implemented `Writeable` to panic when writing
a `()`. Later, we added support for anchor output construction in
CommitmentTransaction, signified by setting a field to `Some(())`,
which is serialized as-is.
This causes us to panic when writing a `CommitmentTransaction`
with `opt_anchors` set. Note that we never set it inside of LDK,
but downstream users may.
Instead, we implement `Writeable` to write nothing for `()` and use
`core::convert::Infallible` for the default custom message type as
it is, appropriately, unconstructable.
This also makes it easier to implement various things in bindings,
as we can always assume `Infallible`-conversion logic is
unreachable.
2021-09-22 19:00:30 +00:00
// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
// method that takes self for it.
impl wire ::Type for Infallible {
2021-08-18 10:41:01 -05:00
fn type_id ( & self ) -> u16 {
2021-08-05 14:51:17 +09:00
unreachable! ( ) ;
}
}
Use Infallible for the unconstructable default custom message type
When we landed custom messages, we used the empty tuple for the
custom message type for `IgnoringMessageHandler`. This was fine,
except that we also implemented `Writeable` to panic when writing
a `()`. Later, we added support for anchor output construction in
CommitmentTransaction, signified by setting a field to `Some(())`,
which is serialized as-is.
This causes us to panic when writing a `CommitmentTransaction`
with `opt_anchors` set. Note that we never set it inside of LDK,
but downstream users may.
Instead, we implement `Writeable` to write nothing for `()` and use
`core::convert::Infallible` for the default custom message type as
it is, appropriately, unconstructable.
This also makes it easier to implement various things in bindings,
as we can always assume `Infallible`-conversion logic is
unreachable.
2021-09-22 19:00:30 +00:00
impl Writeable for Infallible {
2021-08-05 14:51:17 +09:00
fn write < W : Writer > ( & self , _ : & mut W ) -> Result < ( ) , io ::Error > {
unreachable! ( ) ;
}
}
impl wire ::CustomMessageReader for IgnoringMessageHandler {
Use Infallible for the unconstructable default custom message type
When we landed custom messages, we used the empty tuple for the
custom message type for `IgnoringMessageHandler`. This was fine,
except that we also implemented `Writeable` to panic when writing
a `()`. Later, we added support for anchor output construction in
CommitmentTransaction, signified by setting a field to `Some(())`,
which is serialized as-is.
This causes us to panic when writing a `CommitmentTransaction`
with `opt_anchors` set. Note that we never set it inside of LDK,
but downstream users may.
Instead, we implement `Writeable` to write nothing for `()` and use
`core::convert::Infallible` for the default custom message type as
it is, appropriately, unconstructable.
This also makes it easier to implement various things in bindings,
as we can always assume `Infallible`-conversion logic is
unreachable.
2021-09-22 19:00:30 +00:00
type CustomMessage = Infallible ;
2021-08-05 14:51:17 +09:00
fn read < R : io ::Read > ( & self , _message_type : u16 , _buffer : & mut R ) -> Result < Option < Self ::CustomMessage > , msgs ::DecodeError > {
Ok ( None )
}
}
impl CustomMessageHandler for IgnoringMessageHandler {
2021-09-23 04:02:58 +00:00
fn handle_custom_message ( & self , _msg : Infallible , _sender_node_id : & PublicKey ) -> Result < ( ) , LightningError > {
2021-08-05 14:51:17 +09:00
// Since we always return `None` in the read the handle method should never be called.
unreachable! ( ) ;
}
fn get_and_clear_pending_msg ( & self ) -> Vec < ( PublicKey , Self ::CustomMessage ) > { Vec ::new ( ) }
}
2021-03-01 15:10:59 -05:00
/// A dummy struct which implements `ChannelMessageHandler` without having any channels.
/// You can provide one of these as the chan_handler in a MessageHandler.
pub struct ErroringMessageHandler {
	// Error events queued by `push_error`, drained via
	// `MessageSendEventsProvider::get_and_clear_pending_msg_events`.
	message_queue: Mutex<Vec<MessageSendEvent>>
}
impl ErroringMessageHandler {
/// Constructs a new ErroringMessageHandler
pub fn new ( ) -> Self {
Self { message_queue : Mutex ::new ( Vec ::new ( ) ) }
}
fn push_error ( & self , node_id : & PublicKey , channel_id : [ u8 ; 32 ] ) {
self . message_queue . lock ( ) . unwrap ( ) . push ( MessageSendEvent ::HandleError {
action : msgs ::ErrorAction ::SendErrorMessage {
msg : msgs ::ErrorMessage { channel_id , data : " We do not support channel messages, sorry. " . to_owned ( ) } ,
} ,
node_id : node_id . clone ( ) ,
} ) ;
}
}
impl MessageSendEventsProvider for ErroringMessageHandler {
	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
		// Take ownership of the queued events, leaving a fresh empty Vec behind.
		mem::take(&mut *self.message_queue.lock().unwrap())
	}
}
impl ChannelMessageHandler for ErroringMessageHandler {
// Any messages which are related to a specific channel generate an error message to let the
// peer know we don't care about channels.
fn handle_open_channel ( & self , their_node_id : & PublicKey , _their_features : InitFeatures , msg : & msgs ::OpenChannel ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . temporary_channel_id ) ;
}
fn handle_accept_channel ( & self , their_node_id : & PublicKey , _their_features : InitFeatures , msg : & msgs ::AcceptChannel ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . temporary_channel_id ) ;
}
fn handle_funding_created ( & self , their_node_id : & PublicKey , msg : & msgs ::FundingCreated ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . temporary_channel_id ) ;
}
fn handle_funding_signed ( & self , their_node_id : & PublicKey , msg : & msgs ::FundingSigned ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_funding_locked ( & self , their_node_id : & PublicKey , msg : & msgs ::FundingLocked ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_shutdown ( & self , their_node_id : & PublicKey , _their_features : & InitFeatures , msg : & msgs ::Shutdown ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_closing_signed ( & self , their_node_id : & PublicKey , msg : & msgs ::ClosingSigned ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_update_add_htlc ( & self , their_node_id : & PublicKey , msg : & msgs ::UpdateAddHTLC ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_update_fulfill_htlc ( & self , their_node_id : & PublicKey , msg : & msgs ::UpdateFulfillHTLC ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_update_fail_htlc ( & self , their_node_id : & PublicKey , msg : & msgs ::UpdateFailHTLC ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_update_fail_malformed_htlc ( & self , their_node_id : & PublicKey , msg : & msgs ::UpdateFailMalformedHTLC ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_commitment_signed ( & self , their_node_id : & PublicKey , msg : & msgs ::CommitmentSigned ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_revoke_and_ack ( & self , their_node_id : & PublicKey , msg : & msgs ::RevokeAndACK ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_update_fee ( & self , their_node_id : & PublicKey , msg : & msgs ::UpdateFee ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_announcement_signatures ( & self , their_node_id : & PublicKey , msg : & msgs ::AnnouncementSignatures ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
fn handle_channel_reestablish ( & self , their_node_id : & PublicKey , msg : & msgs ::ChannelReestablish ) {
ErroringMessageHandler ::push_error ( self , their_node_id , msg . channel_id ) ;
}
2021-03-12 15:25:56 -05:00
// msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
fn handle_channel_update ( & self , _their_node_id : & PublicKey , _msg : & msgs ::ChannelUpdate ) { }
2021-03-01 15:10:59 -05:00
fn peer_disconnected ( & self , _their_node_id : & PublicKey , _no_connection_possible : bool ) { }
fn peer_connected ( & self , _their_node_id : & PublicKey , _msg : & msgs ::Init ) { }
fn handle_error ( & self , _their_node_id : & PublicKey , _msg : & msgs ::ErrorMessage ) { }
}
// Deref-to-self so an owned ErroringMessageHandler can be used where PeerManager expects a
// `CM: Deref` channel handler.
impl Deref for ErroringMessageHandler {
	type Target = ErroringMessageHandler;
	fn deref(&self) -> &Self::Target { self }
}
2018-09-19 17:39:43 -04:00
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler<CM: Deref, RM: Deref> where
		CM::Target: ChannelMessageHandler,
		RM::Target: RoutingMessageHandler {
	/// A message handler which handles messages specific to channels. Usually this is just a
	/// [`ChannelManager`] object or an [`ErroringMessageHandler`].
	///
	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
	pub chan_handler: CM,
	/// A message handler which handles messages updating our knowledge of the network channel
	/// graph. Usually this is just a [`NetGraphMsgHandler`] object or an
	/// [`IgnoringMessageHandler`].
	///
	/// [`NetGraphMsgHandler`]: crate::routing::network_graph::NetGraphMsgHandler
	pub route_handler: RM,
}
/// Provides an object which can be used to send data to and which uniquely identifies a connection
/// to a remote host. You will need to be able to generate multiple of these which meet Eq and
/// implement Hash to meet the PeerManager API.
///
/// For efficiency, Clone should be relatively cheap for this type.
///
/// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original
/// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it
/// having triggered the disconnection or a call to [`PeerManager::socket_disconnected`]), and no
/// further calls to the [`PeerManager`] related to the original socket occur. This allows you to
/// use a file descriptor for your SocketDescriptor directly, however for simplicity you may wish
/// to simply use another value which is guaranteed to be globally unique instead.
pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
	/// Attempts to send some data from the given slice to the peer.
	///
	/// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
	/// Note that in the disconnected case, [`PeerManager::socket_disconnected`] must still be
	/// called and further write attempts may occur until that time.
	///
	/// If the returned size is smaller than `data.len()`, a
	/// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
	/// written. Additionally, until a `send_data` event completes fully, no further
	/// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
	/// prevent denial-of-service issues, you should not read or buffer any data from the socket
	/// until then.
	///
	/// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
	/// (indicating that read events should be paused to prevent DoS in the send buffer),
	/// `resume_read` may be set indicating that read events on this descriptor should resume. A
	/// `resume_read` of false carries no meaning, and should not cause any action.
	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
	/// Disconnect the socket pointed to by this SocketDescriptor.
	///
	/// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
	/// call (doing so is a noop).
	fn disconnect_socket(&mut self);
}
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
/// descriptor.
#[derive(Clone)]
pub struct PeerHandleError {
	/// Used to indicate that we probably can't make any future connections to this peer, implying
	/// we should go ahead and force-close any channels we have with it.
	pub no_connection_possible: bool,
}
2018-03-19 17:26:29 -04:00
impl fmt ::Debug for PeerHandleError {
fn fmt ( & self , formatter : & mut fmt ::Formatter ) -> Result < ( ) , fmt ::Error > {
2018-04-01 19:21:26 -04:00
formatter . write_str ( " Peer Sent Invalid Data " )
2018-03-19 17:26:29 -04:00
}
}
2018-06-14 18:03:16 -04:00
impl fmt::Display for PeerHandleError {
	// Same fixed message as the Debug impl, for user-facing display.
	fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
		write!(formatter, "Peer Sent Invalid Data")
	}
}
2021-08-01 18:22:06 +02:00
// `std::error::Error` is only available with the `std` feature (this crate also builds no-std).
#[cfg(feature = "std")]
impl error::Error for PeerHandleError {
	// NOTE(review): `description` is deprecated in favor of `Display`; kept here for
	// compatibility with callers/compilers that still use it.
	fn description(&self) -> &str {
		"Peer Sent Invalid Data"
	}
}
2017-12-25 01:05:27 -05:00
2018-10-02 16:02:17 +02:00
// Tracks progress of an initial routing-table sync to a peer; used as a cursor by
// `Peer::should_forward_channel_announcement`/`should_forward_node_announcement`.
enum InitSyncTracker {
	// No sync in progress for this peer.
	NoSyncRequested,
	// Syncing channel announcements; the u64 is the short-channel-id cursor position.
	ChannelsSyncing(u64),
	// Channels done; syncing node announcements, with the node-id cursor position.
	NodesSyncing(PublicKey),
}
2021-10-25 17:31:34 +00:00
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
/// forwarding gossip messages to peers altogether.
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
/// refilled as we send bytes.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
/// the socket receive buffer before receiving the ping.
///
/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
/// including any network delays, outbound traffic, or the same for messages from other peers.
///
/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
/// per connected peer to respond to a ping, as long as they send us at least one message during
/// each tick, ensuring we aren't actually just disconnected.
/// With a timer tick interval of ten seconds, this translates to about 40 seconds per connected
/// peer.
///
/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
/// two connected peers, assuming most LDK-running systems have at least two cores.
const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;

/// This is the minimum number of messages we expect a peer to be able to handle within one timer
/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
/// process before the next ping.
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
2021-06-10 18:26:57 +00:00
2017-12-25 01:05:27 -05:00
// Per-connection state for a single peer.
struct Peer {
	// Noise handshake/transport state for this connection.
	channel_encryptor: PeerChannelEncryptor,
	// The peer's node id; None until it is learned (presumably once noise completes —
	// see the `node_id_to_descriptor` comment on PeerHolder).
	their_node_id: Option<PublicKey>,
	// Features the peer advertised in its Init message; None until Init is received.
	their_features: Option<InitFeatures>,
	// The network address the peer connected from/to, if known.
	their_net_address: Option<NetAddress>,

	// Queue of fully-encoded messages awaiting write to the socket, plus how far into the
	// first queued message we've already written.
	pending_outbound_buffer: LinkedList<Vec<u8>>,
	pending_outbound_buffer_first_msg_offset: usize,
	// True when a prior send_data returned a partial write and we're waiting for
	// write_buffer_space_avail before writing more.
	awaiting_write_event: bool,

	// Partially-read inbound data: buffer, position within it, and whether we're currently
	// reading a message header (vs a message body).
	pending_read_buffer: Vec<u8>,
	pending_read_buffer_pos: usize,
	pending_read_is_header: bool,

	// Progress cursor for the initial routing-table sync, if any.
	sync_status: InitSyncTracker,

	// Ping/liveness accounting, see MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER and
	// BUFFER_DRAIN_MSGS_PER_TICK above.
	msgs_sent_since_pong: usize,
	awaiting_pong_timer_tick_intervals: i8,
	received_message_since_timer_tick: bool,
}
impl Peer {
	/// Returns true if the channel announcements/updates for the given channel should be
	/// forwarded to this peer.
	/// If we are sending our routing table to this peer and we have not yet sent channel
	/// announcements/updates for the given channel_id then we will send it when we get to that
	/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
	/// sent the old versions, we should send the update, and so return true here.
	fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
		match self.sync_status {
			InitSyncTracker::NoSyncRequested => true,
			InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
			InitSyncTracker::NodesSyncing(_) => true,
		}
	}

	/// Similar to the above, but for node announcements indexed by node_id.
	fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
		match self.sync_status {
			InitSyncTracker::NoSyncRequested => true,
			// Still streaming channel announcements: node announcements haven't started yet.
			InitSyncTracker::ChannelsSyncing(_) => false,
			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
		}
	}
}
// All connected peers, behind the PeerManager's single Mutex.
struct PeerHolder<Descriptor: SocketDescriptor> {
	// All connections, keyed by their socket descriptor.
	peers: HashMap<Descriptor, Peer>,
	/// Only add to this set when noise completes:
	node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
2020-01-16 13:26:38 -05:00
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
///
/// (C-not exported) as Arcs don't make sense in bindings
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;

/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
///
/// (C-not exported) as Arcs don't make sense in bindings
pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g NetworkGraph, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
2020-01-16 13:26:38 -05:00
2021-06-17 22:54:46 +00:00
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
/// socket events into messages which it passes on to its [`MessageHandler`].
///
/// Locks are taken internally, so you must never assume that reentrancy from a
/// [`SocketDescriptor`] call back into [`PeerManager`] methods will not deadlock.
///
/// Calls to [`read_event`] will decode relevant messages and pass them to the
/// [`ChannelMessageHandler`], likely doing message processing in-line. Thus, the primary form of
/// parallelism in Rust-Lightning is in calls to [`read_event`]. Note, however, that calls to any
/// [`PeerManager`] functions related to the same connection must occur only in serial, making new
/// calls only after previous ones have returned.
///
/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager
/// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but
/// essentially you should default to using a SimpleRefPeerManager, and use a
/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
/// you're using lightning-net-tokio.
///
/// [`read_event`]: PeerManager::read_event
pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> where
		CM::Target: ChannelMessageHandler,
		RM::Target: RoutingMessageHandler,
		L::Target: Logger,
		CMH::Target: CustomMessageHandler {
	// The channel and routing message handlers which process decoded messages.
	message_handler: MessageHandler<CM, RM>,
	// All per-peer connection state, behind a single Mutex.
	peers: Mutex<PeerHolder<Descriptor>>,
	// Our node's secret key, used for the noise handshake.
	our_node_secret: SecretKey,
	// SHA-256 midstate seeded from caller-provided random data; used to derive the
	// per-connection ephemeral keys mentioned in the constructor docs.
	ephemeral_key_midstate: Sha256Engine,
	// Handler for custom (non-standard) wire messages.
	custom_message_handler: CMH,

	// Monotonic counter; presumably used to assign unique per-peer values — the consuming
	// code is not visible in this chunk.
	peer_counter: AtomicCounter,

	logger: L,
}
2020-05-22 13:03:49 -07:00
// Internal error type unifying the two failure modes of message handling: fatal peer errors
// (disconnect required) and recoverable LightningErrors.
enum MessageHandlingError {
	PeerHandleError(PeerHandleError),
	LightningError(LightningError),
}
impl From < PeerHandleError > for MessageHandlingError {
fn from ( error : PeerHandleError ) -> Self {
MessageHandlingError ::PeerHandleError ( error )
}
}
impl From < LightningError > for MessageHandlingError {
fn from ( error : LightningError ) -> Self {
MessageHandlingError ::LightningError ( error )
}
}
2017-12-25 01:05:27 -05:00
// Serializes a wire message (including its type prefix) into a fresh Vec<u8>.
// The unwrap is safe: writing into an in-memory VecWriter cannot fail.
macro_rules! encode_msg {
	($msg: expr) => {{
		let mut buffer = VecWriter(Vec::new());
		wire::write($msg, &mut buffer).unwrap();
		buffer.0
	}}
}
2021-08-05 14:51:17 +09:00
impl < Descriptor : SocketDescriptor , CM : Deref , L : Deref > PeerManager < Descriptor , CM , IgnoringMessageHandler , L , IgnoringMessageHandler > where
2021-03-01 18:04:20 -05:00
CM ::Target : ChannelMessageHandler ,
L ::Target : Logger {
/// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
/// handler is used and network graph messages are ignored.
///
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
///
/// (C-not exported) as we can't export a PeerManager with a dummy route handler
pub fn new_channel_only ( channel_message_handler : CM , our_node_secret : SecretKey , ephemeral_random_data : & [ u8 ; 32 ] , logger : L ) -> Self {
Self ::new ( MessageHandler {
chan_handler : channel_message_handler ,
route_handler : IgnoringMessageHandler { } ,
2021-08-05 14:51:17 +09:00
} , our_node_secret , ephemeral_random_data , logger , IgnoringMessageHandler { } )
2021-03-01 18:04:20 -05:00
}
}
2021-08-05 14:51:17 +09:00
impl < Descriptor : SocketDescriptor , RM : Deref , L : Deref > PeerManager < Descriptor , ErroringMessageHandler , RM , L , IgnoringMessageHandler > where
2021-03-01 18:04:20 -05:00
RM ::Target : RoutingMessageHandler ,
L ::Target : Logger {
/// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
/// handler is used and messages related to channels will be ignored (or generate error
/// messages). Note that some other lightning implementations time-out connections after some
/// time if no channel is built with the peer.
///
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
///
/// (C-not exported) as we can't export a PeerManager with a dummy channel handler
pub fn new_routing_only ( routing_message_handler : RM , our_node_secret : SecretKey , ephemeral_random_data : & [ u8 ; 32 ] , logger : L ) -> Self {
Self ::new ( MessageHandler {
chan_handler : ErroringMessageHandler ::new ( ) ,
route_handler : routing_message_handler ,
2021-08-05 14:51:17 +09:00
} , our_node_secret , ephemeral_random_data , logger , IgnoringMessageHandler { } )
2021-03-01 18:04:20 -05:00
}
}
2021-10-27 20:06:13 +00:00
/// A simple wrapper that optionally prints " from <pubkey>" for an optional pubkey.
/// This works around `format!()` taking a reference to each argument, preventing
/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
/// due to lifetime errors.
struct OptionalFromDebugger < ' a > ( & ' a Option < PublicKey > ) ;
impl core ::fmt ::Display for OptionalFromDebugger < '_ > {
fn fmt ( & self , f : & mut core ::fmt ::Formatter < '_ > ) -> Result < ( ) , core ::fmt ::Error > {
if let Some ( node_id ) = self . 0 { write! ( f , " from {} " , log_pubkey! ( node_id ) ) } else { Ok ( ( ) ) }
}
}
2022-03-13 00:40:35 +05:30
/// A function used to filter out local or private addresses
/// <https://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.xhtml>
/// <https://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xhtml>
///
/// Returns `None` for addresses in reserved or otherwise non-globally-routable ranges
/// so we never echo them back to a peer in our `Init` message; publicly routable
/// addresses (and non-IP address types) pass through unchanged.
fn filter_addresses(ip_address: Option<NetAddress>) -> Option<NetAddress> {
	match ip_address {
		// For IPv4 range 10.0.0.0 - 10.255.255.255 (10/8)
		Some(NetAddress::IPv4{addr: [10, _, _, _], port: _}) => None,
		// For IPv4 range 0.0.0.0 - 0.255.255.255 (0/8)
		Some(NetAddress::IPv4{addr: [0, _, _, _], port: _}) => None,
		// For IPv4 range 100.64.0.0 - 100.127.255.255 (100.64/10)
		Some(NetAddress::IPv4{addr: [100, 64..=127, _, _], port: _}) => None,
		// For IPv4 range 127.0.0.0 - 127.255.255.255 (127/8)
		Some(NetAddress::IPv4{addr: [127, _, _, _], port: _}) => None,
		// For IPv4 range 169.254.0.0 - 169.254.255.255 (169.254/16)
		Some(NetAddress::IPv4{addr: [169, 254, _, _], port: _}) => None,
		// For IPv4 range 172.16.0.0 - 172.31.255.255 (172.16/12)
		Some(NetAddress::IPv4{addr: [172, 16..=31, _, _], port: _}) => None,
		// For IPv4 range 192.168.0.0 - 192.168.255.255 (192.168/16)
		Some(NetAddress::IPv4{addr: [192, 168, _, _], port: _}) => None,
		// For IPv4 range 192.88.99.0 - 192.88.99.255 (192.88.99/24)
		Some(NetAddress::IPv4{addr: [192, 88, 99, _], port: _}) => None,
		// For IPv6 range 2000:0000:0000:0000:0000:0000:0000:0000 - 3fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff (2000::/3)
		// The first byte alone determines membership in 2000::/3.
		Some(NetAddress::IPv6{addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], port: _}) => ip_address,
		// For remaining addresses
		Some(NetAddress::IPv6{addr: _, port: _}) => None,
		// OnionV2/OnionV3 and any other non-IP address types are passed through as-is.
		Some(..) => ip_address,
		None => None,
	}
}
2021-08-05 14:51:17 +09:00
impl < Descriptor : SocketDescriptor , CM : Deref , RM : Deref , L : Deref , CMH : Deref > PeerManager < Descriptor , CM , RM , L , CMH > where
2020-05-12 13:31:20 -04:00
CM ::Target : ChannelMessageHandler ,
RM ::Target : RoutingMessageHandler ,
2021-08-05 14:51:17 +09:00
L ::Target : Logger ,
2021-09-22 23:45:27 +00:00
CMH ::Target : CustomMessageHandler {
2018-09-19 17:39:43 -04:00
	/// Constructs a new PeerManager with the given message handlers and node_id secret key
	/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
	/// cryptographically secure random bytes.
	pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
		// Pre-hash the caller's entropy into a SHA-256 midstate; each connection later
		// derives its own ephemeral key by cloning this midstate and mixing in a unique
		// counter value (see `get_ephemeral_key`).
		let mut ephemeral_key_midstate = Sha256::engine();
		ephemeral_key_midstate.input(ephemeral_random_data);

		PeerManager {
			message_handler,
			// Both maps start empty; entries are added as connections are registered.
			peers: Mutex::new(PeerHolder {
				peers: HashMap::new(),
				node_id_to_descriptor: HashMap::new()
			}),
			our_node_secret,
			ephemeral_key_midstate,
			peer_counter: AtomicCounter::new(),
			logger,
			custom_message_handler,
		}
	}
2018-07-22 13:11:58 -04:00
/// Get the list of node ids for peers which have completed the initial handshake.
2018-09-20 12:57:47 -04:00
///
2018-07-22 13:11:58 -04:00
/// For outbound connections, this will be the same as the their_node_id parameter passed in to
/// new_outbound_connection, however entries will only appear once the initial handshake has
/// completed and we are sure the remote peer has the private key for the given node_id.
2018-07-21 04:04:28 +09:00
pub fn get_peer_node_ids ( & self ) -> Vec < PublicKey > {
let peers = self . peers . lock ( ) . unwrap ( ) ;
2018-09-08 13:57:20 -04:00
peers . peers . values ( ) . filter_map ( | p | {
2019-12-23 17:52:58 -05:00
if ! p . channel_encryptor . is_ready_for_encryption ( ) | | p . their_features . is_none ( ) {
2018-09-08 13:57:20 -04:00
return None ;
}
p . their_node_id
} ) . collect ( )
2018-07-21 04:04:28 +09:00
}
2019-07-18 22:17:36 -04:00
	/// Derives a fresh ephemeral key for a new connection's noise handshake.
	///
	/// Clones the pre-hashed entropy midstate (built in `new`) and mixes in a unique,
	/// monotonically increasing counter value so every connection gets a distinct key
	/// without re-hashing the original entropy.
	fn get_ephemeral_key(&self) -> SecretKey {
		let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
		let counter = self.peer_counter.get_increment();
		ephemeral_hash.input(&counter.to_le_bytes());
		// A SHA-256 output is essentially always a valid secp256k1 scalar.
		SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
	}
2022-03-13 00:40:35 +05:30
/// Indicates a new outbound connection has been established to a node with the given node_id
/// and an optional remote network address.
2022-03-16 04:03:09 +05:30
///
2022-03-13 00:40:35 +05:30
/// The remote network address adds the option to report a remote IP address back to a connecting
2022-03-16 04:03:09 +05:30
/// peer using the init message.
/// The user should pass the remote network address of the host they are connected to.
2022-03-13 00:40:35 +05:30
///
2020-02-20 15:12:42 -05:00
/// Note that if an Err is returned here you MUST NOT call socket_disconnected for the new
2017-12-25 01:05:27 -05:00
/// descriptor but must disconnect the connection immediately.
2018-09-20 12:57:47 -04:00
///
2018-10-20 18:17:19 -04:00
/// Returns a small number of bytes to send to the remote node (currently always 50).
2018-09-20 12:57:47 -04:00
///
2021-06-17 22:54:46 +00:00
/// Panics if descriptor is duplicative with some other descriptor which has not yet been
/// [`socket_disconnected()`].
///
/// [`socket_disconnected()`]: PeerManager::socket_disconnected
2022-03-13 00:40:35 +05:30
pub fn new_outbound_connection ( & self , their_node_id : PublicKey , descriptor : Descriptor , remote_network_address : Option < NetAddress > ) -> Result < Vec < u8 > , PeerHandleError > {
2019-07-18 22:17:36 -04:00
let mut peer_encryptor = PeerChannelEncryptor ::new_outbound ( their_node_id . clone ( ) , self . get_ephemeral_key ( ) ) ;
2017-12-25 01:05:27 -05:00
let res = peer_encryptor . get_act_one ( ) . to_vec ( ) ;
let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act two is 50 bytes
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
if peers . peers . insert ( descriptor , Peer {
channel_encryptor : peer_encryptor ,
2018-11-08 10:36:34 +10:30
their_node_id : None ,
2019-12-23 17:52:58 -05:00
their_features : None ,
2022-03-13 00:40:35 +05:30
their_net_address : remote_network_address ,
2017-12-25 01:05:27 -05:00
pending_outbound_buffer : LinkedList ::new ( ) ,
pending_outbound_buffer_first_msg_offset : 0 ,
awaiting_write_event : false ,
2020-10-06 16:47:23 -07:00
pending_read_buffer ,
2017-12-25 01:05:27 -05:00
pending_read_buffer_pos : 0 ,
pending_read_is_header : false ,
2018-10-02 16:02:17 +02:00
sync_status : InitSyncTracker ::NoSyncRequested ,
2019-09-20 11:16:45 -04:00
2021-10-11 04:24:08 +00:00
msgs_sent_since_pong : 0 ,
awaiting_pong_timer_tick_intervals : 0 ,
received_message_since_timer_tick : false ,
2017-12-25 01:05:27 -05:00
} ) . is_some ( ) {
panic! ( " PeerManager driver duplicated descriptors! " ) ;
} ;
Ok ( res )
}
2022-03-13 00:40:35 +05:30
	/// Indicates a new inbound connection has been established to a node with an optional remote
	/// network address.
	///
	/// The remote network address adds the option to report a remote IP address back to a connecting
	/// peer using the init message.
	/// The user should pass the remote network address of the host they are connected to.
	///
	/// May refuse the connection by returning an Err, but will never write bytes to the remote end
	/// (outbound connector always speaks first). Note that if an Err is returned here you MUST NOT
	/// call socket_disconnected for the new descriptor but must disconnect the connection
	/// immediately.
	///
	/// Panics if descriptor is duplicative with some other descriptor which has not yet been
	/// [`socket_disconnected()`].
	///
	/// [`socket_disconnected()`]: PeerManager::socket_disconnected
	pub fn new_inbound_connection(&self, descriptor: Descriptor, remote_network_address: Option<NetAddress>) -> Result<(), PeerHandleError> {
		let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret);
		let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
		let mut peers = self.peers.lock().unwrap();
		if peers.peers.insert(descriptor, Peer {
			channel_encryptor: peer_encryptor,
			their_node_id: None, // Not known until the noise handshake completes
			their_features: None, // Not known until we receive the peer's Init message
			their_net_address: remote_network_address,

			pending_outbound_buffer: LinkedList::new(),
			pending_outbound_buffer_first_msg_offset: 0,
			awaiting_write_event: false,

			pending_read_buffer,
			pending_read_buffer_pos: 0,
			pending_read_is_header: false,

			sync_status: InitSyncTracker::NoSyncRequested,

			msgs_sent_since_pong: 0,
			awaiting_pong_timer_tick_intervals: 0,
			received_message_since_timer_tick: false,
		}).is_some() {
			panic!("PeerManager driver duplicated descriptors!");
		};
		Ok(())
	}
2018-10-02 16:02:17 +02:00
	/// Drains as much of the peer's pending outbound buffer into the socket as the
	/// descriptor will accept, opportunistically refilling the buffer with
	/// initial-sync gossip messages and extra pings while there is room.
	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
		while !peer.awaiting_write_event {
			// Only top the buffer up with gossip backfill when it is reasonably empty and
			// the peer has been responsive (few messages sent since the last pong).
			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
				match peer.sync_status {
					InitSyncTracker::NoSyncRequested => {},
					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
						// Each channel announcement may come with up to two channel updates
						// (three messages total), so divide the remaining buffer space by 3.
						let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
						let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
						for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
							self.enqueue_message(peer, announce);
							if let &Some(ref update_a) = update_a_option {
								self.enqueue_message(peer, update_a);
							}
							if let &Some(ref update_b) = update_b_option {
								self.enqueue_message(peer, update_b);
							}
							// Resume the scan after the last SCID we delivered.
							peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
						}
						if all_messages.is_empty() || all_messages.len() != steps as usize {
							// Fewer results than requested: channel announcements are exhausted,
							// move on to node announcements (sentinel value u64::MAX).
							peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
						}
					},
					InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
						let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
						for msg in all_messages.iter() {
							self.enqueue_message(peer, msg);
							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
						}
						if all_messages.is_empty() || all_messages.len() != steps as usize {
							peer.sync_status = InitSyncTracker::NoSyncRequested;
						}
					},
					InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
					InitSyncTracker::NodesSyncing(key) => {
						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
						let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
						for msg in all_messages.iter() {
							self.enqueue_message(peer, msg);
							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
						}
						if all_messages.is_empty() || all_messages.len() != steps as usize {
							peer.sync_status = InitSyncTracker::NoSyncRequested;
						}
					},
				}
			}
			if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
				self.maybe_send_extra_ping(peer);
			}

			// The block below evaluates to true iff the front-most message was fully written.
			if {
				let next_buff = match peer.pending_outbound_buffer.front() {
					None => return,
					Some(buff) => buff,
				};

				let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
				let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
				let data_sent = descriptor.send_data(pending, should_be_reading);
				peer.pending_outbound_buffer_first_msg_offset += data_sent;
				if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
			} {
				peer.pending_outbound_buffer_first_msg_offset = 0;
				peer.pending_outbound_buffer.pop_front();
			} else {
				// Partial write: stop and wait for the driver to signal writeability again.
				peer.awaiting_write_event = true;
			}
		}
	}
/// Indicates that there is room to write data to the given socket descriptor.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// May return an Err to indicate that the connection should be closed.
2018-09-20 12:57:47 -04:00
///
2021-06-17 22:54:46 +00:00
/// May call [`send_data`] on the descriptor passed in (or an equal descriptor) before
/// returning. Thus, be very careful with reentrancy issues! The invariants around calling
/// [`write_buffer_space_avail`] in case a write did not fully complete must still hold - be
/// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
/// sufficient!
///
/// [`send_data`]: SocketDescriptor::send_data
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
2020-02-20 15:12:42 -05:00
pub fn write_buffer_space_avail ( & self , descriptor : & mut Descriptor ) -> Result < ( ) , PeerHandleError > {
2017-12-25 01:05:27 -05:00
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
match peers . peers . get_mut ( descriptor ) {
2021-06-17 22:30:09 +00:00
None = > {
// This is most likely a simple race condition where the user found that the socket
// was writeable, then we told the user to `disconnect_socket()`, then they called
// this method. Return an error to make sure we get disconnected.
return Err ( PeerHandleError { no_connection_possible : false } ) ;
} ,
2017-12-25 01:05:27 -05:00
Some ( peer ) = > {
peer . awaiting_write_event = false ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( descriptor , peer ) ;
2017-12-25 01:05:27 -05:00
}
} ;
Ok ( ( ) )
}
/// Indicates that data was read from the given socket descriptor.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// May return an Err to indicate that the connection should be closed.
2018-09-20 12:57:47 -04:00
///
2021-06-17 22:54:46 +00:00
/// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
/// Thus, however, you should call [`process_events`] after any `read_event` to generate
/// [`send_data`] calls to handle responses.
2018-09-20 12:57:47 -04:00
///
2021-06-17 22:54:46 +00:00
/// If `Ok(true)` is returned, further read_events should not be triggered until a
/// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
/// send buffer).
2018-09-20 12:57:47 -04:00
///
2021-06-17 22:54:46 +00:00
/// [`send_data`]: SocketDescriptor::send_data
/// [`process_events`]: PeerManager::process_events
2020-01-31 20:57:01 -05:00
pub fn read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
2018-03-19 17:26:29 -04:00
match self . do_read_event ( peer_descriptor , data ) {
Ok ( res ) = > Ok ( res ) ,
Err ( e ) = > {
2021-08-17 00:03:31 +00:00
log_trace! ( self . logger , " Peer sent invalid data or we decided to disconnect due to a protocol error " ) ;
2018-04-01 19:23:09 -04:00
self . disconnect_event_internal ( peer_descriptor , e . no_connection_possible ) ;
2018-03-19 17:26:29 -04:00
Err ( e )
}
}
}
2021-10-28 17:43:58 +00:00
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_encoded_message ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
2021-10-11 04:24:08 +00:00
peer . msgs_sent_since_pong + = 1 ;
2021-10-28 17:43:58 +00:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encoded_message [ .. ] ) ) ;
}
/// Append a message to a peer's pending outbound/write buffer
2021-09-22 23:45:27 +00:00
fn enqueue_message < M : wire ::Type > ( & self , peer : & mut Peer , message : & M ) {
2021-08-29 05:26:39 +00:00
let mut buffer = VecWriter ( Vec ::with_capacity ( 2048 ) ) ;
2020-05-18 10:55:28 -07:00
wire ::write ( message , & mut buffer ) . unwrap ( ) ; // crash if the write failed
2021-11-22 18:19:08 +01:00
if is_gossip_msg ( message . type_id ( ) ) {
log_gossip! ( self . logger , " Enqueueing message {:?} to {} " , message , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
} else {
log_trace! ( self . logger , " Enqueueing message {:?} to {} " , message , log_pubkey! ( peer . their_node_id . unwrap ( ) ) )
}
2021-10-28 17:43:58 +00:00
self . enqueue_encoded_message ( peer , & buffer . 0 ) ;
2020-05-18 10:55:28 -07:00
}
2020-01-31 20:57:01 -05:00
	/// Processes `data` freshly read from `peer_descriptor`'s socket: copies it into the
	/// peer's read buffer, advances the noise handshake while it is still in progress,
	/// and, once encrypted transport is established, decrypts and dispatches each
	/// complete wire message.
	///
	/// Returns `Ok(true)` when the peer's outbound buffer has grown large enough that
	/// the caller should pause reading from this socket (backpressure).
	fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
		let pause_read = {
			let mut peers_lock = self.peers.lock().unwrap();
			let peers = &mut *peers_lock;
			// Broadcast-worthy messages are collected here and forwarded after we release
			// the per-peer mutable borrow, since forwarding needs the whole peer map.
			let mut msgs_to_forward = Vec::new();
			let mut peer_node_id = None;
			let pause_read = match peers.peers.get_mut(peer_descriptor) {
				None => {
					// This is most likely a simple race condition where the user read some bytes
					// from the socket, then we told the user to `disconnect_socket()`, then they
					// called this method. Return an error to make sure we get disconnected.
					return Err(PeerHandleError { no_connection_possible: false });
				},
				Some(peer) => {
					assert!(peer.pending_read_buffer.len() > 0);
					assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);

					let mut read_pos = 0;
					while read_pos < data.len() {
						{
							// Copy as much as fits into the current (fixed-size) read target.
							let data_to_copy = cmp::min(peer.pending_read_buffer.len() - peer.pending_read_buffer_pos, data.len() - read_pos);
							peer.pending_read_buffer[peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy].copy_from_slice(&data[read_pos..read_pos + data_to_copy]);
							read_pos += data_to_copy;
							peer.pending_read_buffer_pos += data_to_copy;
						}

						// Only act once the current read target (handshake act, length header,
						// or message body) has been completely filled.
						if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() {
							peer.pending_read_buffer_pos = 0;

							// Maps a LightningError's prescribed action onto control flow for
							// this loop: disconnect, ignore (optionally logging), or reply
							// with an error/warning message and continue.
							macro_rules! try_potential_handleerror {
								($thing: expr) => {
									match $thing {
										Ok(x) => x,
										Err(e) => {
											match e.action {
												msgs::ErrorAction::DisconnectPeer { msg: _ } => {
													//TODO: Try to push msg
													log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
													return Err(PeerHandleError{ no_connection_possible: false });
												},
												msgs::ErrorAction::IgnoreAndLog(level) => {
													log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
													continue
												},
												msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
												msgs::ErrorAction::IgnoreError => {
													log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
													continue;
												},
												msgs::ErrorAction::SendErrorMessage { msg } => {
													log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
													self.enqueue_message(peer, &msg);
													continue;
												},
												msgs::ErrorAction::SendWarningMessage { msg, log_level } => {
													log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
													self.enqueue_message(peer, &msg);
													continue;
												},
											}
										}
									}
								}
							}

							// Registers the now-known node_id in the node_id -> descriptor map,
							// rejecting a second simultaneous connection from the same node.
							macro_rules! insert_node_id {
								() => {
									match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
										hash_map::Entry::Occupied(_) => {
											log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
											peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
											return Err(PeerHandleError{ no_connection_possible: false })
										},
										hash_map::Entry::Vacant(entry) => {
											log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
											entry.insert(peer_descriptor.clone())
										},
									};
								}
							}

							let next_step = peer.channel_encryptor.get_noise_step();
							match next_step {
								// We are the responder and just received the initiator's act one.
								NextNoiseStep::ActOne => {
									let act_two = try_potential_handleerror!(peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec();
									peer.pending_outbound_buffer.push_back(act_two);
									peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
								},
								// We are the initiator and just received the responder's act two;
								// the handshake completes on our side and we send our Init.
								NextNoiseStep::ActTwo => {
									let (act_three, their_node_id) = try_potential_handleerror!(peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret));
									peer.pending_outbound_buffer.push_back(act_three.to_vec());
									peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
									peer.pending_read_is_header = true;

									peer.their_node_id = Some(their_node_id);
									insert_node_id!();
									let features = InitFeatures::known();
									// Only echo the peer's address back if it is publicly routable.
									let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) };
									self.enqueue_message(peer, &resp);
									peer.awaiting_pong_timer_tick_intervals = 0;
								},
								// We are the responder and just received act three; handshake done.
								NextNoiseStep::ActThree => {
									let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
									peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
									peer.pending_read_is_header = true;
									peer.their_node_id = Some(their_node_id);
									insert_node_id!();
									let features = InitFeatures::known();
									let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) };
									self.enqueue_message(peer, &resp);
									peer.awaiting_pong_timer_tick_intervals = 0;
								},
								// Encrypted transport established: we alternate between reading
								// an 18-byte length header and a full message body.
								NextNoiseStep::NoiseComplete => {
									if peer.pending_read_is_header {
										let msg_len = try_potential_handleerror!(peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..]));
										// +16 for the poly1305 tag appended to the ciphertext.
										peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16);
										peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
										if msg_len < 2 { // Need at least the message type tag
											return Err(PeerHandleError{ no_connection_possible: false });
										}
										peer.pending_read_is_header = false;
									} else {
										let msg_data = try_potential_handleerror!(peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
										assert!(msg_data.len() >= 2);

										// Reset read buffer
										peer.pending_read_buffer = [0; 18].to_vec();
										peer.pending_read_is_header = true;

										let mut reader = io::Cursor::new(&msg_data[..]);
										let message_result = wire::read(&mut reader, &*self.custom_message_handler);
										let message = match message_result {
											Ok(x) => x,
											Err(e) => {
												match e {
													// Note that to avoid recursion we never call
													// `do_attempt_write_data` from here, causing
													// the messages enqueued here to not actually
													// be sent before the peer is disconnected.
													(msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => {
														log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
														continue;
													}
													(msgs::DecodeError::UnsupportedCompression, _) => {
														log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
														self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unsupported message compression: zlib".to_owned() });
														continue;
													}
													(_, Some(ty)) if is_gossip_msg(ty) => {
														log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message");
														self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unreadable/bogus gossip message".to_owned() });
														continue;
													}
													(msgs::DecodeError::UnknownRequiredFeature, ty) => {
														log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
														self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) });
														return Err(PeerHandleError { no_connection_possible: false });
													}
													(msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }),
													(msgs::DecodeError::InvalidValue, _) => {
														log_debug!(self.logger, "Got an invalid value while deserializing message");
														return Err(PeerHandleError { no_connection_possible: false });
													}
													(msgs::DecodeError::ShortRead, _) => {
														log_debug!(self.logger, "Deserialization failed due to shortness of message");
														return Err(PeerHandleError { no_connection_possible: false });
													}
													(msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }),
													(msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { no_connection_possible: false }),
												}
											}
										};

										match self.handle_message(peer, message) {
											Err(handling_error) => match handling_error {
												MessageHandlingError::PeerHandleError(e) => { return Err(e) },
												MessageHandlingError::LightningError(e) => {
													try_potential_handleerror!(Err(e));
												},
											},
											Ok(Some(msg)) => {
												// Message should be broadcast to all other peers.
												peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set"));
												msgs_to_forward.push(msg);
											},
											Ok(None) => {},
										}
									}
								}
							}
						}
					}

					peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read
				}
			};

			// Forward any broadcastable messages now that the per-peer borrow is released.
			for msg in msgs_to_forward.drain(..) {
				self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref());
			}

			pause_read
		};
		Ok(pause_read)
	}
2020-05-22 13:03:49 -07:00
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
2021-06-10 18:20:16 +00:00
/// Returns the message back if it needs to be broadcasted to all other peers.
2021-08-05 14:51:17 +09:00
fn handle_message (
& self ,
peer : & mut Peer ,
message : wire ::Message < < < CMH as core ::ops ::Deref > ::Target as wire ::CustomMessageReader > ::CustomMessage >
) -> Result < Option < wire ::Message < < < CMH as core ::ops ::Deref > ::Target as wire ::CustomMessageReader > ::CustomMessage > > , MessageHandlingError > {
2021-11-22 18:19:08 +01:00
if is_gossip_msg ( message . type_id ( ) ) {
log_gossip! ( self . logger , " Received message {:?} from {} " , message , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
} else {
log_trace! ( self . logger , " Received message {:?} from {} " , message , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
}
2021-10-11 04:24:08 +00:00
peer . received_message_since_timer_tick = true ;
2020-05-22 13:03:49 -07:00
// Need an Init as first message
if let wire ::Message ::Init ( _ ) = message {
} else if peer . their_features . is_none ( ) {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Peer {} sent non-Init first message " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
2020-05-22 13:03:49 -07:00
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
}
2021-06-10 18:20:16 +00:00
let mut should_forward = None ;
2020-05-22 13:03:49 -07:00
match message {
// Setup and Control messages:
wire ::Message ::Init ( msg ) = > {
if msg . features . requires_unknown_bits ( ) {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Peer features required unknown version bits " ) ;
2020-05-22 13:03:49 -07:00
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
if peer . their_features . is_some ( ) {
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
}
2021-10-27 20:06:13 +00:00
log_info! ( self . logger , " Received peer Init message from {}: {} " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) , msg . features ) ;
2020-05-22 13:03:49 -07:00
if msg . features . initial_routing_sync ( ) {
peer . sync_status = InitSyncTracker ::ChannelsSyncing ( 0 ) ;
}
if ! msg . features . supports_static_remote_key ( ) {
log_debug! ( self . logger , " Peer {} does not support static remote key, disconnecting with no_connection_possible " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
2022-03-17 22:04:48 +00:00
self . message_handler . route_handler . peer_connected ( & peer . their_node_id . unwrap ( ) , & msg ) ;
2020-12-03 12:48:40 -05:00
2020-05-22 13:03:49 -07:00
self . message_handler . chan_handler . peer_connected ( & peer . their_node_id . unwrap ( ) , & msg ) ;
peer . their_features = Some ( msg . features ) ;
} ,
wire ::Message ::Error ( msg ) = > {
let mut data_is_printable = true ;
for b in msg . data . bytes ( ) {
if b < 32 | | b > 126 {
data_is_printable = false ;
break ;
}
}
if data_is_printable {
log_debug! ( self . logger , " Got Err message from {}: {} " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) , msg . data ) ;
} else {
log_debug! ( self . logger , " Got Err message from {} with non-ASCII error message " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
}
self . message_handler . chan_handler . handle_error ( & peer . their_node_id . unwrap ( ) , & msg ) ;
if msg . channel_id = = [ 0 ; 32 ] {
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
} ,
2021-07-22 16:05:48 +00:00
wire ::Message ::Warning ( msg ) = > {
let mut data_is_printable = true ;
for b in msg . data . bytes ( ) {
if b < 32 | | b > 126 {
data_is_printable = false ;
break ;
}
}
if data_is_printable {
log_debug! ( self . logger , " Got warning message from {}: {} " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) , msg . data ) ;
} else {
log_debug! ( self . logger , " Got warning message from {} with non-ASCII error message " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
}
} ,
2020-05-22 13:03:49 -07:00
wire ::Message ::Ping ( msg ) = > {
if msg . ponglen < 65532 {
let resp = msgs ::Pong { byteslen : msg . ponglen } ;
2021-06-10 18:46:24 +00:00
self . enqueue_message ( peer , & resp ) ;
2020-05-22 13:03:49 -07:00
}
} ,
wire ::Message ::Pong ( _msg ) = > {
2021-10-11 04:24:08 +00:00
peer . awaiting_pong_timer_tick_intervals = 0 ;
peer . msgs_sent_since_pong = 0 ;
2020-05-22 13:03:49 -07:00
} ,
// Channel messages:
wire ::Message ::OpenChannel ( msg ) = > {
self . message_handler . chan_handler . handle_open_channel ( & peer . their_node_id . unwrap ( ) , peer . their_features . clone ( ) . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::AcceptChannel ( msg ) = > {
self . message_handler . chan_handler . handle_accept_channel ( & peer . their_node_id . unwrap ( ) , peer . their_features . clone ( ) . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingCreated ( msg ) = > {
self . message_handler . chan_handler . handle_funding_created ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingSigned ( msg ) = > {
self . message_handler . chan_handler . handle_funding_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingLocked ( msg ) = > {
self . message_handler . chan_handler . handle_funding_locked ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::Shutdown ( msg ) = > {
2021-02-05 15:14:12 +01:00
self . message_handler . chan_handler . handle_shutdown ( & peer . their_node_id . unwrap ( ) , peer . their_features . as_ref ( ) . unwrap ( ) , & msg ) ;
2020-05-22 13:03:49 -07:00
} ,
wire ::Message ::ClosingSigned ( msg ) = > {
self . message_handler . chan_handler . handle_closing_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
// Commitment messages:
wire ::Message ::UpdateAddHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_add_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFulfillHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fulfill_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFailHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fail_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFailMalformedHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fail_malformed_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::CommitmentSigned ( msg ) = > {
self . message_handler . chan_handler . handle_commitment_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::RevokeAndACK ( msg ) = > {
self . message_handler . chan_handler . handle_revoke_and_ack ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFee ( msg ) = > {
self . message_handler . chan_handler . handle_update_fee ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::ChannelReestablish ( msg ) = > {
self . message_handler . chan_handler . handle_channel_reestablish ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
// Routing messages:
wire ::Message ::AnnouncementSignatures ( msg ) = > {
self . message_handler . chan_handler . handle_announcement_signatures ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::ChannelAnnouncement ( msg ) = > {
2021-06-17 15:31:05 +00:00
if self . message_handler . route_handler . handle_channel_announcement ( & msg )
. map_err ( | e | -> MessageHandlingError { e . into ( ) } ) ? {
2021-06-10 18:20:16 +00:00
should_forward = Some ( wire ::Message ::ChannelAnnouncement ( msg ) ) ;
2020-05-22 13:03:49 -07:00
}
} ,
wire ::Message ::NodeAnnouncement ( msg ) = > {
2021-06-17 15:31:05 +00:00
if self . message_handler . route_handler . handle_node_announcement ( & msg )
. map_err ( | e | -> MessageHandlingError { e . into ( ) } ) ? {
2021-06-10 18:20:16 +00:00
should_forward = Some ( wire ::Message ::NodeAnnouncement ( msg ) ) ;
2020-05-22 13:03:49 -07:00
}
} ,
wire ::Message ::ChannelUpdate ( msg ) = > {
2021-03-12 15:25:56 -05:00
self . message_handler . chan_handler . handle_channel_update ( & peer . their_node_id . unwrap ( ) , & msg ) ;
2021-06-17 15:31:05 +00:00
if self . message_handler . route_handler . handle_channel_update ( & msg )
. map_err ( | e | -> MessageHandlingError { e . into ( ) } ) ? {
2021-06-10 18:20:16 +00:00
should_forward = Some ( wire ::Message ::ChannelUpdate ( msg ) ) ;
2020-05-22 13:03:49 -07:00
}
} ,
2020-10-22 10:51:54 -04:00
wire ::Message ::QueryShortChannelIds ( msg ) = > {
2020-12-03 11:52:54 -05:00
self . message_handler . route_handler . handle_query_short_channel_ids ( & peer . their_node_id . unwrap ( ) , msg ) ? ;
2020-10-21 17:20:26 -04:00
} ,
2020-10-22 10:51:54 -04:00
wire ::Message ::ReplyShortChannelIdsEnd ( msg ) = > {
2020-12-03 11:52:54 -05:00
self . message_handler . route_handler . handle_reply_short_channel_ids_end ( & peer . their_node_id . unwrap ( ) , msg ) ? ;
2020-10-21 17:20:26 -04:00
} ,
2020-10-22 10:51:54 -04:00
wire ::Message ::QueryChannelRange ( msg ) = > {
2020-12-03 11:52:54 -05:00
self . message_handler . route_handler . handle_query_channel_range ( & peer . their_node_id . unwrap ( ) , msg ) ? ;
2020-10-21 17:20:26 -04:00
} ,
2020-10-22 10:51:54 -04:00
wire ::Message ::ReplyChannelRange ( msg ) = > {
2020-12-03 11:52:54 -05:00
self . message_handler . route_handler . handle_reply_channel_range ( & peer . their_node_id . unwrap ( ) , msg ) ? ;
2020-10-21 17:20:26 -04:00
} ,
wire ::Message ::GossipTimestampFilter ( _msg ) = > {
// TODO: handle message
} ,
2020-05-22 13:03:49 -07:00
// Unknown messages:
2021-08-18 10:41:01 -05:00
wire ::Message ::Unknown ( type_id ) if message . is_even ( ) = > {
log_debug! ( self . logger , " Received unknown even message of type {}, disconnecting peer! " , type_id ) ;
2020-05-22 13:03:49 -07:00
// Fail the channel if message is an even, unknown type as per BOLT #1.
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
} ,
2021-08-18 10:41:01 -05:00
wire ::Message ::Unknown ( type_id ) = > {
log_trace! ( self . logger , " Received unknown odd message of type {}, ignoring " , type_id ) ;
2021-08-05 14:51:17 +09:00
} ,
wire ::Message ::Custom ( custom ) = > {
2021-09-14 15:40:10 +09:00
self . custom_message_handler . handle_custom_message ( custom , & peer . their_node_id . unwrap ( ) ) ? ;
2021-08-05 14:51:17 +09:00
} ,
2020-05-22 13:03:49 -07:00
} ;
2021-06-10 18:20:16 +00:00
Ok ( should_forward )
2020-05-22 13:03:49 -07:00
}
2021-08-05 14:51:17 +09:00
fn forward_broadcast_msg ( & self , peers : & mut PeerHolder < Descriptor > , msg : & wire ::Message < < < CMH as core ::ops ::Deref > ::Target as wire ::CustomMessageReader > ::CustomMessage > , except_node : Option < & PublicKey > ) {
2021-06-10 18:48:59 +00:00
match msg {
wire ::Message ::ChannelAnnouncement ( ref msg ) = > {
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Sending message to all peers except {:?} or the announced channel's counterparties: {:?} " , except_node , msg ) ;
2021-06-10 18:48:59 +00:00
let encoded_msg = encode_msg! ( msg ) ;
2021-06-10 18:46:24 +00:00
for ( _ , peer ) in peers . peers . iter_mut ( ) {
2021-06-10 18:48:59 +00:00
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
! peer . should_forward_channel_announcement ( msg . contents . short_channel_id ) {
continue
}
2021-10-11 04:24:08 +00:00
if peer . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
| | peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Skipping broadcast message to {:?} as its outbound buffer is full " , peer . their_node_id ) ;
2021-06-10 18:26:57 +00:00
continue ;
}
2021-06-10 18:48:59 +00:00
if peer . their_node_id . as_ref ( ) = = Some ( & msg . contents . node_id_1 ) | |
peer . their_node_id . as_ref ( ) = = Some ( & msg . contents . node_id_2 ) {
continue ;
}
if except_node . is_some ( ) & & peer . their_node_id . as_ref ( ) = = except_node {
continue ;
}
2021-10-28 17:43:58 +00:00
self . enqueue_encoded_message ( peer , & encoded_msg ) ;
2021-06-10 18:48:59 +00:00
}
} ,
wire ::Message ::NodeAnnouncement ( ref msg ) = > {
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Sending message to all peers except {:?} or the announced node: {:?} " , except_node , msg ) ;
2021-06-10 18:48:59 +00:00
let encoded_msg = encode_msg! ( msg ) ;
2021-06-10 18:46:24 +00:00
for ( _ , peer ) in peers . peers . iter_mut ( ) {
2021-06-10 18:48:59 +00:00
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
! peer . should_forward_node_announcement ( msg . contents . node_id ) {
continue
}
2021-10-11 04:24:08 +00:00
if peer . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
| | peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Skipping broadcast message to {:?} as its outbound buffer is full " , peer . their_node_id ) ;
2021-06-10 18:26:57 +00:00
continue ;
}
2021-06-10 18:48:59 +00:00
if peer . their_node_id . as_ref ( ) = = Some ( & msg . contents . node_id ) {
continue ;
}
if except_node . is_some ( ) & & peer . their_node_id . as_ref ( ) = = except_node {
continue ;
}
2021-10-28 17:43:58 +00:00
self . enqueue_encoded_message ( peer , & encoded_msg ) ;
2021-06-10 18:48:59 +00:00
}
} ,
wire ::Message ::ChannelUpdate ( ref msg ) = > {
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Sending message to all peers except {:?}: {:?} " , except_node , msg ) ;
2021-06-10 18:48:59 +00:00
let encoded_msg = encode_msg! ( msg ) ;
2021-06-10 18:46:24 +00:00
for ( _ , peer ) in peers . peers . iter_mut ( ) {
2021-06-10 18:48:59 +00:00
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
! peer . should_forward_channel_announcement ( msg . contents . short_channel_id ) {
continue
}
2021-10-11 04:24:08 +00:00
if peer . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
| | peer . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
{
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Skipping broadcast message to {:?} as its outbound buffer is full " , peer . their_node_id ) ;
2021-06-10 18:26:57 +00:00
continue ;
}
2021-06-10 18:48:59 +00:00
if except_node . is_some ( ) & & peer . their_node_id . as_ref ( ) = = except_node {
continue ;
}
2021-10-28 17:43:58 +00:00
self . enqueue_encoded_message ( peer , & encoded_msg ) ;
2021-06-10 18:48:59 +00:00
}
} ,
_ = > debug_assert! ( false , " We shouldn't attempt to forward anything but gossip messages " ) ,
}
}
2018-10-20 18:17:19 -04:00
/// Checks for any events generated by our handlers and processes them. Includes sending most
/// response messages as well as messages generated by calls to handler functions directly (eg
2021-06-17 22:54:46 +00:00
/// functions like [`ChannelManager::process_pending_htlc_forwards`] or [`send_payment`]).
///
/// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
/// issues!
///
2021-09-21 00:35:26 +05:30
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
/// or one of the other clients provided in our language bindings.
///
2021-06-17 22:54:46 +00:00
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
2018-03-27 11:17:40 -04:00
pub fn process_events ( & self ) {
{
2017-12-25 01:05:27 -05:00
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
2021-04-21 21:50:41 +00:00
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
2018-10-19 16:25:32 -04:00
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
2020-10-22 10:51:54 -04:00
events_generated . append ( & mut self . message_handler . route_handler . get_and_clear_pending_msg_events ( ) ) ;
2020-01-16 10:48:16 -08:00
let peers = & mut * peers_lock ;
2021-08-05 14:51:17 +09:00
macro_rules ! get_peer_for_forwarding {
( $node_id : expr ) = > {
{
match peers . node_id_to_descriptor . get ( $node_id ) {
Some ( descriptor ) = > match peers . peers . get_mut ( & descriptor ) {
Some ( peer ) = > {
if peer . their_features . is_none ( ) {
continue ;
}
peer
2017-12-25 01:05:27 -05:00
} ,
2021-08-05 14:51:17 +09:00
None = > panic! ( " Inconsistent peers set state! " ) ,
} ,
None = > {
continue ;
} ,
2017-12-25 01:05:27 -05:00
}
}
}
2021-08-05 14:51:17 +09:00
}
for event in events_generated . drain ( .. ) {
2017-12-25 01:05:27 -05:00
match event {
2018-10-19 16:49:12 -04:00
MessageSendEvent ::SendAcceptChannel { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendAcceptChannel event in peer_handler for node {} for channel {} " ,
2018-10-19 16:49:12 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-19 16:49:12 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendOpenChannel { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendOpenChannel event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-07-06 17:29:34 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendFundingCreated { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {}) " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ,
log_funding_channel_id! ( msg . funding_txid , msg . funding_output_index ) ) ;
2021-06-10 16:26:33 +00:00
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2017-12-25 01:05:27 -05:00
} ,
2018-10-19 17:06:40 -04:00
MessageSendEvent ::SendFundingSigned { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendFundingSigned event in peer_handler for node {} for channel {} " ,
2018-10-19 17:06:40 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-19 17:06:40 -04:00
} ,
2018-10-19 17:30:52 -04:00
MessageSendEvent ::SendFundingLocked { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendFundingLocked event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-19 17:30:52 -04:00
} ,
MessageSendEvent ::SendAnnouncementSignatures { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {}) " ,
2018-10-19 17:30:52 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2017-12-25 01:05:27 -05:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::UpdateHTLCs { ref node_id , updates : msgs ::CommitmentUpdate { ref update_add_htlcs , ref update_fulfill_htlcs , ref update_fail_htlcs , ref update_fail_malformed_htlcs , ref update_fee , ref commitment_signed } } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
2018-08-22 12:09:11 -04:00
update_add_htlcs . len ( ) ,
update_fulfill_htlcs . len ( ) ,
update_fail_htlcs . len ( ) ,
log_bytes! ( commitment_signed . channel_id ) ) ;
2021-06-21 18:48:51 +00:00
let peer = get_peer_for_forwarding! ( node_id ) ;
2018-08-22 12:09:11 -04:00
for msg in update_add_htlcs {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , msg ) ;
2017-12-25 01:05:27 -05:00
}
2018-08-22 12:09:11 -04:00
for msg in update_fulfill_htlcs {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , msg ) ;
2018-08-22 12:09:11 -04:00
}
for msg in update_fail_htlcs {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , msg ) ;
2018-08-22 12:09:11 -04:00
}
2018-08-26 16:30:01 -04:00
for msg in update_fail_malformed_htlcs {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , msg ) ;
2018-08-26 16:30:01 -04:00
}
2018-09-26 19:54:28 -04:00
if let & Some ( ref msg ) = update_fee {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , msg ) ;
2018-09-26 19:54:28 -04:00
}
2021-06-22 01:30:19 +00:00
self . enqueue_message ( peer , commitment_signed ) ;
2018-03-20 19:11:27 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendRevokeAndACK { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendRevokeAndACK event in peer_handler for node {} for channel {} " ,
2018-10-17 11:35:26 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-17 11:35:26 -04:00
} ,
2018-10-19 21:50:16 -04:00
MessageSendEvent ::SendClosingSigned { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendClosingSigned event in peer_handler for node {} for channel {} " ,
2018-10-19 21:50:16 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-19 21:50:16 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendShutdown { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling Shutdown event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-07-22 23:03:31 -04:00
} ,
2018-10-20 17:50:34 -04:00
MessageSendEvent ::SendChannelReestablish { ref node_id , ref msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling SendChannelReestablish event in peer_handler for node {} for channel {} " ,
2018-10-20 17:50:34 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-10-20 17:50:34 -04:00
} ,
2021-06-10 18:48:59 +00:00
MessageSendEvent ::BroadcastChannelAnnouncement { msg , update_msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {} " , msg . contents . short_channel_id ) ;
2021-11-13 01:54:54 +00:00
match self . message_handler . route_handler . handle_channel_announcement ( & msg ) {
Ok ( _ ) | Err ( LightningError { action : msgs ::ErrorAction ::IgnoreDuplicateGossip , .. } ) = >
self . forward_broadcast_msg ( peers , & wire ::Message ::ChannelAnnouncement ( msg ) , None ) ,
_ = > { } ,
2021-11-13 00:27:05 +00:00
}
2021-11-13 01:54:54 +00:00
match self . message_handler . route_handler . handle_channel_update ( & update_msg ) {
Ok ( _ ) | Err ( LightningError { action : msgs ::ErrorAction ::IgnoreDuplicateGossip , .. } ) = >
self . forward_broadcast_msg ( peers , & wire ::Message ::ChannelUpdate ( update_msg ) , None ) ,
_ = > { } ,
2018-04-24 20:40:22 -04:00
}
} ,
2021-06-10 18:48:59 +00:00
MessageSendEvent ::BroadcastNodeAnnouncement { msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling BroadcastNodeAnnouncement event in peer_handler " ) ;
2021-11-13 01:54:54 +00:00
match self . message_handler . route_handler . handle_node_announcement ( & msg ) {
Ok ( _ ) | Err ( LightningError { action : msgs ::ErrorAction ::IgnoreDuplicateGossip , .. } ) = >
self . forward_broadcast_msg ( peers , & wire ::Message ::NodeAnnouncement ( msg ) , None ) ,
_ = > { } ,
2020-01-02 20:32:37 -05:00
}
} ,
2021-06-10 18:48:59 +00:00
MessageSendEvent ::BroadcastChannelUpdate { msg } = > {
2021-06-22 03:36:36 +00:00
log_debug! ( self . logger , " Handling BroadcastChannelUpdate event in peer_handler for short channel id {} " , msg . contents . short_channel_id ) ;
2021-11-13 01:54:54 +00:00
match self . message_handler . route_handler . handle_channel_update ( & msg ) {
Ok ( _ ) | Err ( LightningError { action : msgs ::ErrorAction ::IgnoreDuplicateGossip , .. } ) = >
self . forward_broadcast_msg ( peers , & wire ::Message ::ChannelUpdate ( msg ) , None ) ,
_ = > { } ,
2017-12-25 01:05:27 -05:00
}
} ,
2021-06-12 21:58:50 +00:00
MessageSendEvent ::SendChannelUpdate { ref node_id , ref msg } = > {
log_trace! ( self . logger , " Handling SendChannelUpdate event in peer_handler for node {} for channel {} " ,
log_pubkey! ( node_id ) , msg . contents . short_channel_id ) ;
let peer = get_peer_for_forwarding! ( node_id ) ;
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::HandleError { ref node_id , ref action } = > {
2019-11-04 19:54:43 -05:00
match * action {
msgs ::ErrorAction ::DisconnectPeer { ref msg } = > {
if let Some ( mut descriptor ) = peers . node_id_to_descriptor . remove ( node_id ) {
if let Some ( mut peer ) = peers . peers . remove ( & descriptor ) {
if let Some ( ref msg ) = * msg {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling DisconnectPeer HandleError event in peer_handler for node {} with message {} " ,
2019-11-04 19:54:43 -05:00
log_pubkey! ( node_id ) ,
msg . data ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( & mut peer , msg ) ;
2019-11-04 19:54:43 -05:00
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self . do_attempt_write_data ( & mut descriptor , & mut peer ) ;
} else {
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Handling DisconnectPeer HandleError event in peer_handler for node {} with no message " , log_pubkey! ( node_id ) ) ;
2018-08-01 16:34:03 +00:00
}
}
2019-11-04 19:54:43 -05:00
descriptor . disconnect_socket ( ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , false ) ;
}
} ,
2021-06-22 01:33:44 +00:00
msgs ::ErrorAction ::IgnoreAndLog ( level ) = > {
log_given_level! ( self . logger , level , " Received a HandleError event to be ignored for node {} " , log_pubkey! ( node_id ) ) ;
} ,
2021-11-13 01:54:54 +00:00
msgs ::ErrorAction ::IgnoreDuplicateGossip = > { } ,
2021-06-22 01:33:44 +00:00
msgs ::ErrorAction ::IgnoreError = > {
log_debug! ( self . logger , " Received a HandleError event to be ignored for node {} " , log_pubkey! ( node_id ) ) ;
} ,
2019-11-04 19:54:43 -05:00
msgs ::ErrorAction ::SendErrorMessage { ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendErrorMessage HandleError event in peer_handler for node {} with message {} " ,
2019-11-04 19:54:43 -05:00
log_pubkey! ( node_id ) ,
msg . data ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2021-07-22 16:05:48 +00:00
} ,
msgs ::ErrorAction ::SendWarningMessage { ref msg , ref log_level } = > {
log_given_level! ( self . logger , * log_level , " Handling SendWarningMessage HandleError event in peer_handler for node {} with message {} " ,
log_pubkey! ( node_id ) ,
msg . data ) ;
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2019-11-04 19:54:43 -05:00
} ,
2018-07-23 01:06:45 +00:00
}
2020-10-22 08:47:24 -04:00
} ,
MessageSendEvent ::SendChannelRangeQuery { ref node_id , ref msg } = > {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2020-10-22 08:47:24 -04:00
} ,
MessageSendEvent ::SendShortIdsQuery { ref node_id , ref msg } = > {
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2018-08-01 16:34:03 +00:00
}
2021-02-19 16:56:48 -05:00
MessageSendEvent ::SendReplyChannelRange { ref node_id , ref msg } = > {
2021-11-22 18:19:08 +01:00
log_gossip! ( self . logger , " Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={} " ,
2021-02-19 16:56:48 -05:00
log_pubkey! ( node_id ) ,
msg . short_channel_ids . len ( ) ,
msg . first_blocknum ,
msg . number_of_blocks ,
msg . sync_complete ) ;
2021-06-22 01:30:19 +00:00
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
2021-02-19 16:56:48 -05:00
}
2022-03-17 22:14:43 +00:00
MessageSendEvent ::SendGossipTimestampFilter { ref node_id , ref msg } = > {
self . enqueue_message ( get_peer_for_forwarding! ( node_id ) , msg ) ;
}
2017-12-25 01:05:27 -05:00
}
}
2018-10-20 18:17:19 -04:00
2021-08-05 14:51:17 +09:00
for ( node_id , msg ) in self . custom_message_handler . get_and_clear_pending_msg ( ) {
self . enqueue_message ( get_peer_for_forwarding! ( & node_id ) , & msg ) ;
}
2021-06-10 18:46:24 +00:00
for ( descriptor , peer ) in peers . peers . iter_mut ( ) {
self . do_attempt_write_data ( & mut ( * descriptor ) . clone ( ) , peer ) ;
2018-10-20 18:17:19 -04:00
}
2018-03-27 11:17:40 -04:00
}
2017-12-25 01:05:27 -05:00
}
/// Indicates that the given socket descriptor's connection is now closed.
///
/// Removes the peer's in-memory state and informs the channel message handler of the
/// disconnection, with `no_connection_possible` set to false (i.e. reconnecting later is
/// allowed).
///
/// NOTE(review): presumably the caller must not pass this descriptor to other PeerManager
/// methods after calling this -- confirm against the `SocketDescriptor` trait docs.
pub fn socket_disconnected(&self, descriptor: &Descriptor) {
	self.disconnect_event_internal(descriptor, false);
}
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
2017-12-25 01:05:27 -05:00
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
let peer_option = peers . peers . remove ( descriptor ) ;
match peer_option {
2021-06-17 22:30:09 +00:00
None = > {
// This is most likely a simple race condition where the user found that the socket
// was disconnected, then we told the user to `disconnect_socket()`, then they
// called this method. Either way we're disconnected, return.
} ,
2017-12-25 01:05:27 -05:00
Some ( peer ) = > {
match peer . their_node_id {
2018-04-01 19:23:09 -04:00
Some ( node_id ) = > {
2021-08-17 00:03:31 +00:00
log_trace! ( self . logger ,
" Handling disconnection of peer {}, with {}future connection to the peer possible. " ,
log_pubkey! ( node_id ) , if no_connection_possible { " no " } else { " " } ) ;
2018-04-01 19:23:09 -04:00
peers . node_id_to_descriptor . remove ( & node_id ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , no_connection_possible ) ;
} ,
2017-12-25 01:05:27 -05:00
None = > { }
}
}
} ;
}
2019-09-20 11:16:45 -04:00
2021-01-28 17:45:36 -05:00
/// Disconnect a peer given its node id.
///
2021-06-17 22:54:46 +00:00
/// Set `no_connection_possible` to true to prevent any further connection with this peer,
2021-01-28 17:45:36 -05:00
/// force-closing any channels we have with it.
///
2021-06-17 22:54:46 +00:00
/// If a peer is connected, this will call [`disconnect_socket`] on the descriptor for the
/// peer. Thus, be very careful about reentrancy issues.
///
/// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
2021-01-28 17:45:36 -05:00
pub fn disconnect_by_node_id ( & self , node_id : PublicKey , no_connection_possible : bool ) {
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
if let Some ( mut descriptor ) = peers_lock . node_id_to_descriptor . remove ( & node_id ) {
log_trace! ( self . logger , " Disconnecting peer with id {} due to client request " , node_id ) ;
peers_lock . peers . remove ( & descriptor ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , no_connection_possible ) ;
descriptor . disconnect_socket ( ) ;
}
}
2021-10-26 02:03:02 +00:00
/// Disconnects all currently-connected peers. This is useful on platforms where there may be
/// an indication that TCP sockets have stalled even if we weren't around to time them out
/// using regular ping/pongs.
pub fn disconnect_all_peers ( & self ) {
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
let peers = & mut * peers_lock ;
for ( mut descriptor , peer ) in peers . peers . drain ( ) {
if let Some ( node_id ) = peer . their_node_id {
log_trace! ( self . logger , " Disconnecting peer with id {} due to client request to disconnect all peers " , node_id ) ;
peers . node_id_to_descriptor . remove ( & node_id ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , false ) ;
}
descriptor . disconnect_socket ( ) ;
}
debug_assert! ( peers . node_id_to_descriptor . is_empty ( ) ) ;
}
2021-10-11 04:24:08 +00:00
/// This is called when we're blocked on sending additional gossip messages until we receive a
/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
fn maybe_send_extra_ping ( & self , peer : & mut Peer ) {
if peer . awaiting_pong_timer_tick_intervals = = 0 {
peer . awaiting_pong_timer_tick_intervals = - 1 ;
let ping = msgs ::Ping {
ponglen : 0 ,
byteslen : 64 ,
} ;
self . enqueue_message ( peer , & ping ) ;
}
}
/// Send pings to each peer and disconnect those which did not respond to the last round of
/// pings.
///
/// This may be called on any timescale you want, however, roughly once every ten seconds is
/// preferred. The call rate determines both how often we send a ping to our peers and how much
/// time they have to respond before we disconnect them.
///
/// May call [`send_data`] on all [`SocketDescriptor`]s. Thus, be very careful with reentrancy
/// issues!
///
/// [`send_data`]: SocketDescriptor::send_data
pub fn timer_tick_occurred(&self) {
	let mut peers_lock = self.peers.lock().unwrap();
	{
		let peers = &mut *peers_lock;
		// Split the borrows so the `retain` closure can touch the node-id index
		// while iterating the peer map.
		let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
		let peers = &mut peers.peers;
		// Descriptors we decide to drop are collected here and closed only after
		// `retain` completes, keeping socket callbacks out of the iteration.
		let mut descriptors_needing_disconnect = Vec::new();
		let peer_count = peers.len();

		peers.retain(|descriptor, peer| {
			let mut do_disconnect_peer = false;
			if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
				// The peer needs to complete its handshake before we can exchange messages. We
				// give peers one timer tick to complete handshake, reusing
				// `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
				// for handshake completion.
				if peer.awaiting_pong_timer_tick_intervals != 0 {
					do_disconnect_peer = true;
				} else {
					peer.awaiting_pong_timer_tick_intervals = 1;
					return true;
				}
			}

			if peer.awaiting_pong_timer_tick_intervals == -1 {
				// Magic value set in `maybe_send_extra_ping`.
				// An extra ping was already sent this interval; treat it like a
				// regular ping round rather than sending yet another one.
				peer.awaiting_pong_timer_tick_intervals = 1;
				peer.received_message_since_timer_tick = false;
				return true;
			}

			// Disconnect if: the handshake timed out (above), or we pinged last tick
			// and heard nothing back, or a busy gossip-draining peer has held off
			// its pong for longer than the buffer-drain allowance (scaled by peer
			// count, as total drain throughput is shared across peers).
			if do_disconnect_peer
				|| (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
				|| peer.awaiting_pong_timer_tick_intervals as u64 >
					MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
			{
				descriptors_needing_disconnect.push(descriptor.clone());
				match peer.their_node_id {
					Some(node_id) => {
						log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
						node_id_to_descriptor.remove(&node_id);
						self.message_handler.chan_handler.peer_disconnected(&node_id, false);
					}
					None => {},
				}
				// Returning false drops the peer from the map.
				return false;
			}
			peer.received_message_since_timer_tick = false;

			if peer.awaiting_pong_timer_tick_intervals > 0 {
				// Still waiting on a pong from a previous tick; just count the tick.
				peer.awaiting_pong_timer_tick_intervals += 1;
				return true;
			}

			// Not currently waiting on anything: start a new ping round.
			peer.awaiting_pong_timer_tick_intervals = 1;
			let ping = msgs::Ping {
				ponglen: 0,
				byteslen: 64,
			};
			self.enqueue_message(peer, &ping);
			self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);

			true
		});

		// Now that iteration is done, actually close the timed-out sockets.
		for mut descriptor in descriptors_needing_disconnect.drain(..) {
			descriptor.disconnect_socket();
		}
	}
}
2017-12-25 01:05:27 -05:00
}
2021-11-22 18:19:08 +01:00
fn is_gossip_msg ( type_id : u16 ) -> bool {
match type_id {
msgs ::ChannelAnnouncement ::TYPE |
msgs ::ChannelUpdate ::TYPE |
2022-04-14 02:12:12 +00:00
msgs ::NodeAnnouncement ::TYPE |
msgs ::QueryChannelRange ::TYPE |
msgs ::ReplyChannelRange ::TYPE |
msgs ::QueryShortChannelIds ::TYPE |
msgs ::ReplyShortChannelIdsEnd ::TYPE = > true ,
2021-11-22 18:19:08 +01:00
_ = > false
}
}
#[cfg(test)]
mod tests {
	use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
	use ln::msgs;
	use ln::msgs::NetAddress;
	use util::events;
	use util::test_utils;

	use bitcoin::secp256k1::Secp256k1;
	use bitcoin::secp256k1::{SecretKey, PublicKey};

	use prelude::*;
	use sync::{Arc, Mutex};
	use core::sync::atomic::Ordering;

	/// An in-memory stand-in for a socket: everything "sent" is captured in
	/// `outbound_data` so tests can shuttle bytes between two local peers.
	#[derive(Clone)]
	struct FileDescriptor {
		fd: u16,
		outbound_data: Arc<Mutex<Vec<u8>>>,
	}
	impl PartialEq for FileDescriptor {
		fn eq(&self, other: &Self) -> bool {
			// Identity is the fd number alone; buffers are ignored.
			self.fd == other.fd
		}
	}
	impl Eq for FileDescriptor { }
	impl core::hash::Hash for FileDescriptor {
		fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
			self.fd.hash(hasher)
		}
	}
	impl SocketDescriptor for FileDescriptor {
		fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
			// Accept everything; report the full length as written.
			self.outbound_data.lock().unwrap().extend_from_slice(data);
			data.len()
		}
		fn disconnect_socket(&mut self) {}
	}

	/// Per-peer handler set a `PeerManager` borrows in these tests.
	struct PeerManagerCfg {
		chan_handler: test_utils::TestChannelMessageHandler,
		routing_handler: test_utils::TestRoutingMessageHandler,
		logger: test_utils::TestLogger,
	}

	/// Builds `peer_count` independent handler/logger sets.
	fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
		(0..peer_count).map(|_| PeerManagerCfg {
			chan_handler: test_utils::TestChannelMessageHandler::new(),
			logger: test_utils::TestLogger::new(),
			routing_handler: test_utils::TestRoutingMessageHandler::new(),
		}).collect()
	}

	/// Builds one `PeerManager` per config, each with a distinct deterministic
	/// node secret and ephemeral seed.
	fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
		let mut peers = Vec::new();
		for i in 0..peer_count {
			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
			let ephemeral_bytes = [i as u8; 32];
			let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
			let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {});
			peers.push(peer);
		}
		peers
	}

	/// Drives a full noise handshake between `peer_a` (inbound side) and `peer_b`
	/// (outbound side) by relaying each side's captured bytes to the other, and
	/// returns the two connected descriptors.
	fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
		let secp_ctx = Secp256k1::new();
		let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
		let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
		peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
		assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
		peer_a.process_events();
		assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		peer_b.process_events();
		assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		(fd_a.clone(), fd_b.clone())
	}

	#[test]
	fn test_disconnect_peer() {
		// Bring two PeerManagers to NoiseState::Finished, then push a
		// DisconnectPeer event and verify the flagged node is removed.
		let cfgs = create_peermgr_cfgs(2);
		let chan_handler = test_utils::TestChannelMessageHandler::new();
		let mut peers = create_network(2, &cfgs);
		establish_connection(&peers[0], &peers[1]);
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		let secp_ctx = Secp256k1::new();
		let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
		chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
			node_id: their_id,
			action: msgs::ErrorAction::DisconnectPeer { msg: None },
		});
		assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1);
		peers[0].message_handler.chan_handler = &chan_handler;

		peers[0].process_events();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
	}

	#[test]
	fn test_timer_tick_occurred() {
		// Connect two peers; peers[0] should keep its connection through one silent
		// timer tick (merely flagging awaiting_pong) and drop it on the second.
		let cfgs = create_peermgr_cfgs(2);
		let peers = create_network(2, &cfgs);
		establish_connection(&peers[0], &peers[1]);
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		peers[0].timer_tick_occurred();
		peers[0].process_events();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		peers[0].timer_tick_occurred();
		peers[0].process_events();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
	}

	#[test]
	fn test_do_attempt_write_data() {
		// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		// Establishing the connection triggers do_attempt_write_data between the
		// peers. That function previously entered an infinite loop when more
		// channel messages were available than fit a peer's buffer — exactly the
		// situation these routing handlers create — so a regression shows up here.
		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

		// Shuttle the pending bytes back and forth until all gossip has flowed.
		// Due to the max-messages-before-ping limits this takes a few iterations.
		for _ in 0..150 / super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
			peers[0].process_events();
			let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
			assert!(!b_read_data.is_empty());

			peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
			peers[1].process_events();

			let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
			assert!(!a_read_data.is_empty());
			peers[0].read_event(&mut fd_a, &a_read_data).unwrap();

			peers[1].process_events();
			assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
		}

		// Check that each peer has received the expected number of channel updates
		// and channel announcements.
		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
	}

	#[test]
	fn test_handshake_timeout() {
		// Tests that we time out a peer still waiting on handshake completion after
		// a full timer tick.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		let secp_ctx = Secp256k1::new();
		let a_id = PublicKey::from_secret_key(&secp_ctx, &peers[0].our_node_secret);
		let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
		peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();

		// If we get a single timer tick before completion, that's fine
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
		peers[0].timer_tick_occurred();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
		peers[0].process_events();
		assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		peers[1].process_events();

		// ...but if we get a second timer tick, we should disconnect the peer
		peers[0].timer_tick_occurred();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);

		assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
	}

	#[test]
	fn test_filter_addresses() {
		// Addresses inside any of the RFC-reserved IPv4 ranges must be filtered
		// out (three samples per range: low bound, interior, high bound).
		let reserved_v4: [[u8; 4]; 24] = [
			[10, 0, 0, 0], [10, 0, 255, 201], [10, 255, 255, 255],        // 10/8
			[0, 0, 0, 0], [0, 0, 255, 187], [0, 255, 255, 255],           // 0/8
			[100, 64, 0, 0], [100, 78, 255, 0], [100, 127, 255, 255],     // 100.64/10
			[127, 0, 0, 0], [127, 65, 73, 0], [127, 255, 255, 255],       // 127/8
			[169, 254, 0, 0], [169, 254, 221, 101], [169, 254, 255, 255], // 169.254/16
			[172, 16, 0, 0], [172, 27, 101, 23], [172, 31, 255, 255],     // 172.16/12
			[192, 168, 0, 0], [192, 168, 205, 159], [192, 168, 255, 255], // 192.168/16
			[192, 88, 99, 0], [192, 88, 99, 140], [192, 88, 99, 255],     // 192.88.99/24
		];
		for addr in reserved_v4.iter() {
			let ip_address = NetAddress::IPv4 { addr: *addr, port: 1000 };
			assert_eq!(filter_addresses(Some(ip_address.clone())), None);
		}

		// Other (publicly routable) IPv4 addresses pass through unchanged.
		let routable_v4: [[u8; 4]; 3] = [[188, 255, 99, 0], [123, 8, 129, 14], [2, 88, 9, 255]];
		for addr in routable_v4.iter() {
			let ip_address = NetAddress::IPv4 { addr: *addr, port: 1000 };
			assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
		}

		// IPv6 addresses within 2000::/3 are kept...
		let routable_v6: [[u8; 16]; 3] = [
			[32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
			[45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0],
			[63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255],
		];
		for addr in routable_v6.iter() {
			let ip_address = NetAddress::IPv6 { addr: *addr, port: 1000 };
			assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
		}

		// ...while all other IPv6 addresses are filtered out.
		let filtered_v6: [[u8; 16]; 3] = [
			[24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0],
			[68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0],
			[101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0],
		];
		for addr in filtered_v6.iter() {
			let ip_address = NetAddress::IPv6 { addr: *addr, port: 1000 };
			assert_eq!(filter_addresses(Some(ip_address.clone())), None);
		}

		// No address at all stays no address.
		assert_eq!(filter_addresses(None), None);
	}
}