2020-08-10 15:00:09 -04:00
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
2018-09-19 17:39:43 -04:00
//! Top level peer message handling and socket handling logic lives here.
2018-09-20 12:57:47 -04:00
//!
2018-09-19 17:39:43 -04:00
//! Instead of actually servicing sockets ourselves we require that you implement the
//! SocketDescriptor interface and use that to receive actions which you should perform on the
//! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
2020-05-06 19:04:44 -04:00
//! call into the provided message handlers (probably a ChannelManager and NetGraphMsgHandler) with messages
2018-09-19 17:39:43 -04:00
//! they should handle, and encoding/sending response messages.
2020-04-27 16:51:59 +02:00
use bitcoin ::secp256k1 ::key ::{ SecretKey , PublicKey } ;
2017-12-25 01:05:27 -05:00
2020-01-06 17:54:02 -05:00
use ln ::features ::InitFeatures ;
2017-12-25 01:05:27 -05:00
use ln ::msgs ;
2020-05-22 13:03:49 -07:00
use ln ::msgs ::{ ChannelMessageHandler , LightningError , RoutingMessageHandler } ;
2020-01-16 13:26:38 -05:00
use ln ::channelmanager ::{ SimpleArcChannelManager , SimpleRefChannelManager } ;
2020-05-18 10:55:28 -07:00
use util ::ser ::{ VecWriter , Writeable } ;
2017-12-25 01:05:27 -05:00
use ln ::peer_channel_encryptor ::{ PeerChannelEncryptor , NextNoiseStep } ;
2020-01-21 15:26:21 -08:00
use ln ::wire ;
use ln ::wire ::Encode ;
2017-12-25 01:05:27 -05:00
use util ::byte_utils ;
2020-01-16 13:26:38 -05:00
use util ::events ::{ MessageSendEvent , MessageSendEventsProvider } ;
2018-08-20 12:56:17 -04:00
use util ::logger ::Logger ;
2020-05-12 13:31:20 -04:00
use routing ::network_graph ::NetGraphMsgHandler ;
2017-12-25 01:05:27 -05:00
2018-10-20 18:17:19 -04:00
use std ::collections ::{ HashMap , hash_map , HashSet , LinkedList } ;
2017-12-25 01:05:27 -05:00
use std ::sync ::{ Arc , Mutex } ;
2018-06-16 19:39:40 -04:00
use std ::sync ::atomic ::{ AtomicUsize , Ordering } ;
2018-10-19 16:25:32 -04:00
use std ::{ cmp , error , hash , fmt } ;
2020-01-16 13:26:38 -05:00
use std ::ops ::Deref ;
2017-12-25 01:05:27 -05:00
2020-04-27 16:41:54 +02:00
use bitcoin ::hashes ::sha256 ::Hash as Sha256 ;
use bitcoin ::hashes ::sha256 ::HashEngine as Sha256Engine ;
use bitcoin ::hashes ::{ HashEngine , Hash } ;
2019-07-18 22:17:36 -04:00
2018-09-19 17:39:43 -04:00
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler<CM: Deref, RM: Deref> where
        CM::Target: ChannelMessageHandler,
        RM::Target: RoutingMessageHandler {
    /// A message handler which handles messages specific to channels. Usually this is just a
    /// ChannelManager object.
    pub chan_handler: CM,
    /// A message handler which handles messages updating our knowledge of the network channel
    /// graph. Usually this is just a NetGraphMsgHandler object.
    pub route_handler: RM,
}
/// Provides an object which can be used to send data to and which uniquely identifies a connection
/// to a remote host. You will need to be able to generate multiple of these which meet Eq and
/// implement Hash to meet the PeerManager API.
///
/// For efficiency, Clone should be relatively cheap for this type.
///
/// You probably want to just extend an int and put a file descriptor in a struct and implement
/// send_data. Note that if you are using a higher-level net library that may call close() itself,
/// be careful to ensure you don't have races whereby you might register a new connection with an
/// fd which is the same as a previous one which has yet to be removed via
/// PeerManager::socket_disconnected().
pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
    /// Attempts to send some data from the given slice to the peer.
    ///
    /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
    /// Note that in the disconnected case, socket_disconnected must still fire and further write
    /// attempts may occur until that time.
    ///
    /// If the returned size is smaller than data.len(), a write_available event must
    /// trigger the next time more data can be written. Additionally, until a send_data event
    /// completes fully, no further read_events should trigger on the same peer!
    ///
    /// If a read_event on this descriptor had previously returned true (indicating that read
    /// events should be paused to prevent DoS in the send buffer), resume_read may be set
    /// indicating that read events on this descriptor should resume. A resume_read of false does
    /// *not* imply that further read events should be paused.
    fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
    /// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no
    /// more calls to write_buffer_space_avail, read_event or socket_disconnected may be made with
    /// this descriptor. No socket_disconnected call should be generated as a result of this call,
    /// though races may occur whereby disconnect_socket is called after a call to
    /// socket_disconnected but prior to socket_disconnected returning.
    fn disconnect_socket(&mut self);
}
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
/// generate no further read_event/write_buffer_space_avail calls for the descriptor, only
/// triggering a single socket_disconnected call (unless it was provided in response to a
/// new_*_connection event, in which case no such socket_disconnected() must be called and the
/// socket silently disconnected).
pub struct PeerHandleError {
    /// Used to indicate that we probably can't make any future connections to this peer, implying
    /// we should go ahead and force-close any channels we have with it.
    pub no_connection_possible: bool,
}
2018-03-19 17:26:29 -04:00
impl fmt ::Debug for PeerHandleError {
fn fmt ( & self , formatter : & mut fmt ::Formatter ) -> Result < ( ) , fmt ::Error > {
2018-04-01 19:21:26 -04:00
formatter . write_str ( " Peer Sent Invalid Data " )
2018-03-19 17:26:29 -04:00
}
}
2018-06-14 18:03:16 -04:00
impl fmt::Display for PeerHandleError {
    /// User-facing rendering; identical fixed string to the Debug impl.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        write!(formatter, "Peer Sent Invalid Data")
    }
}
impl error::Error for PeerHandleError {
    // NOTE(review): Error::description is deprecated in later Rust releases in favour of
    // Display; kept here for compatibility with the toolchain this file targets.
    fn description(&self) -> &str {
        "Peer Sent Invalid Data"
    }
}
2017-12-25 01:05:27 -05:00
2018-10-02 16:02:17 +02:00
/// Tracks progress of the initial routing-table dump we stream to a peer (see
/// do_attempt_write_data, which advances this state machine).
enum InitSyncTracker {
    /// No routing-table sync is in progress for this peer.
    NoSyncRequested,
    /// Streaming channel announcements; the u64 is the next short_channel_id to request from the
    /// route handler (last-sent id + 1). u64::MAX is used as a "channels done" sentinel.
    ChannelsSyncing(u64),
    /// Channel dump finished; streaming node announcements, keyed by the last node_id sent.
    NodesSyncing(PublicKey),
}
2017-12-25 01:05:27 -05:00
struct Peer {
    /// Drives the noise handshake and, once complete, encrypts/decrypts all messages.
    channel_encryptor: PeerChannelEncryptor,
    /// True if we initiated this connection (new_outbound_connection), false for inbound.
    outbound: bool,
    /// Set once the noise handshake reveals the peer's node_id (act two for outbound, act three
    /// for inbound); None until then.
    their_node_id: Option<PublicKey>,
    /// Features from the peer's Init message; None until it arrives. NOTE(review): the Init
    /// handling that sets this is outside this chunk — confirm.
    their_features: Option<InitFeatures>,

    /// Encrypted, fully-serialized messages queued for the socket.
    pending_outbound_buffer: LinkedList<Vec<u8>>,
    /// How many bytes of the front message in pending_outbound_buffer have already been sent.
    pending_outbound_buffer_first_msg_offset: usize,
    /// Set when send_data did a partial write; cleared by write_buffer_space_avail.
    awaiting_write_event: bool,

    /// Read buffer sized to exactly the next chunk we expect (50-byte noise acts one/two,
    /// 66-byte act three, 18-byte encrypted length header, then the message body).
    pending_read_buffer: Vec<u8>,
    /// Fill position within pending_read_buffer.
    pending_read_buffer_pos: usize,
    /// True when the next chunk to read is an 18-byte encrypted message-length header.
    pending_read_is_header: bool,

    /// Progress of the initial routing-table dump to this peer.
    sync_status: InitSyncTracker,
    /// Presumably set when a ping is sent and cleared on pong; the timer logic lives outside
    /// this chunk — TODO confirm.
    awaiting_pong: bool,
}
impl Peer {
2019-01-24 16:41:51 +02:00
/// Returns true if the channel announcements/updates for the given channel should be
2018-10-02 16:02:17 +02:00
/// forwarded to this peer.
/// If we are sending our routing table to this peer and we have not yet sent channel
/// announcements/updates for the given channel_id then we will send it when we get to that
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
/// sent the old versions, we should send the update, and so return true here.
2020-01-02 20:32:37 -05:00
fn should_forward_channel_announcement ( & self , channel_id : u64 ) ->bool {
2018-10-02 16:02:17 +02:00
match self . sync_status {
InitSyncTracker ::NoSyncRequested = > true ,
InitSyncTracker ::ChannelsSyncing ( i ) = > i < channel_id ,
InitSyncTracker ::NodesSyncing ( _ ) = > true ,
}
}
2020-01-02 20:32:37 -05:00
/// Similar to the above, but for node announcements indexed by node_id.
fn should_forward_node_announcement ( & self , node_id : PublicKey ) -> bool {
match self . sync_status {
InitSyncTracker ::NoSyncRequested = > true ,
InitSyncTracker ::ChannelsSyncing ( _ ) = > false ,
InitSyncTracker ::NodesSyncing ( pk ) = > pk < node_id ,
}
}
2017-12-25 01:05:27 -05:00
}
/// All per-connection state, kept together under a single Mutex in PeerManager.
struct PeerHolder<Descriptor: SocketDescriptor> {
    /// All connected peers, keyed by the socket descriptor the driver registered them under.
    peers: HashMap<Descriptor, Peer>,
    /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
    /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
    peers_needing_send: HashSet<Descriptor>,
    /// Only add to this set when noise completes:
    node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
2019-07-18 22:17:36 -04:00
// Compile-time guard: this item only exists on targets whose pointers are neither 32 nor 64
// bits, where the transmute between a pointer-sized type and [u8; 4] deliberately fails to
// compile. The ephemeral-key counter logic below relies on usize being at least 32 bits.
// NOTE(review): `mem` does not appear in the visible imports; since this fn is cfg'd out on all
// common targets that only matters on exotic ones — confirm std::mem is in scope there.
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
fn _check_usize_is_32_or_64() {
    // See below, less than 32 bit pointers may be unsafe here!
    unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); }
}
2020-01-16 13:26:38 -05:00
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>>;
2020-01-16 13:26:38 -05:00
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>;
2020-01-16 13:26:38 -05:00
2018-09-19 17:39:43 -04:00
/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
/// events into messages which it passes on to its MessageHandlers.
///
/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager
/// or a SimpleRefPeerManager, for conciseness. See their documentation for more details, but
/// essentially you should default to using a SimpleRefPeerManager, and use a
/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
/// you're using lightning-net-tokio.
pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> where
        CM::Target: ChannelMessageHandler,
        RM::Target: RoutingMessageHandler,
        L::Target: Logger {
    /// Channel and routing message handlers messages are dispatched to.
    message_handler: MessageHandler<CM, RM>,
    /// All per-peer state, behind a single lock.
    peers: Mutex<PeerHolder<Descriptor>>,
    /// Our node's secret key, used for the noise handshake.
    our_node_secret: SecretKey,
    /// SHA-256 midstate pre-seeded with caller-provided randomness; cloned per connection and
    /// mixed with the counters below to derive unique ephemeral keys (see get_ephemeral_key).
    ephemeral_key_midstate: Sha256Engine,
    // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64
    // bits we will never realistically count into high:
    peer_counter_low: AtomicUsize,
    peer_counter_high: AtomicUsize,
    logger: L,
}
2020-05-22 13:03:49 -07:00
/// Internal error type unifying the two failure flavours message handling can produce: a
/// connection-level PeerHandleError and a protocol-level LightningError.
enum MessageHandlingError {
    PeerHandleError(PeerHandleError),
    LightningError(LightningError),
}
impl From < PeerHandleError > for MessageHandlingError {
fn from ( error : PeerHandleError ) -> Self {
MessageHandlingError ::PeerHandleError ( error )
}
}
impl From < LightningError > for MessageHandlingError {
fn from ( error : LightningError ) -> Self {
MessageHandlingError ::LightningError ( error )
}
}
2017-12-25 01:05:27 -05:00
/// Serializes a wire message into a fresh Vec<u8> via wire::write; panics (unwrap) if
/// serialization fails, which indicates a bug rather than a runtime condition.
macro_rules! encode_msg {
    ($msg: expr) => {{
        let mut buffer = VecWriter(Vec::new());
        wire::write($msg, &mut buffer).unwrap();
        buffer.0
    }}
}
/// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
2020-02-20 15:12:42 -05:00
/// PeerIds may repeat, but only after socket_disconnected() has been called.
2020-05-12 13:31:20 -04:00
impl < Descriptor : SocketDescriptor , CM : Deref , RM : Deref , L : Deref > PeerManager < Descriptor , CM , RM , L > where
CM ::Target : ChannelMessageHandler ,
RM ::Target : RoutingMessageHandler ,
L ::Target : Logger {
2018-09-19 17:39:43 -04:00
/// Constructs a new PeerManager with the given message handlers and node_id secret key
2019-07-18 22:17:36 -04:00
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
2020-05-12 13:31:20 -04:00
pub fn new ( message_handler : MessageHandler < CM , RM > , our_node_secret : SecretKey , ephemeral_random_data : & [ u8 ; 32 ] , logger : L ) -> Self {
2019-07-18 22:17:36 -04:00
let mut ephemeral_key_midstate = Sha256 ::engine ( ) ;
ephemeral_key_midstate . input ( ephemeral_random_data ) ;
2017-12-25 01:05:27 -05:00
PeerManager {
2020-05-17 12:13:29 -04:00
message_handler ,
2018-10-20 18:17:19 -04:00
peers : Mutex ::new ( PeerHolder {
peers : HashMap ::new ( ) ,
peers_needing_send : HashSet ::new ( ) ,
node_id_to_descriptor : HashMap ::new ( )
} ) ,
2020-05-17 12:13:29 -04:00
our_node_secret ,
2019-07-18 22:17:36 -04:00
ephemeral_key_midstate ,
peer_counter_low : AtomicUsize ::new ( 0 ) ,
peer_counter_high : AtomicUsize ::new ( 0 ) ,
2018-07-25 02:34:51 +00:00
logger ,
2017-12-25 01:05:27 -05:00
}
}
2018-07-22 13:11:58 -04:00
/// Get the list of node ids for peers which have completed the initial handshake.
2018-09-20 12:57:47 -04:00
///
2018-07-22 13:11:58 -04:00
/// For outbound connections, this will be the same as the their_node_id parameter passed in to
/// new_outbound_connection, however entries will only appear once the initial handshake has
/// completed and we are sure the remote peer has the private key for the given node_id.
2018-07-21 04:04:28 +09:00
pub fn get_peer_node_ids ( & self ) -> Vec < PublicKey > {
let peers = self . peers . lock ( ) . unwrap ( ) ;
2018-09-08 13:57:20 -04:00
peers . peers . values ( ) . filter_map ( | p | {
2019-12-23 17:52:58 -05:00
if ! p . channel_encryptor . is_ready_for_encryption ( ) | | p . their_features . is_none ( ) {
2018-09-08 13:57:20 -04:00
return None ;
}
p . their_node_id
} ) . collect ( )
2018-07-21 04:04:28 +09:00
}
2019-07-18 22:17:36 -04:00
/// Derives a fresh per-connection ephemeral SecretKey by hashing the randomness-seeded
/// midstate together with a 128-bit (low, high) connection counter.
fn get_ephemeral_key(&self) -> SecretKey {
    let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
    // Bump the low counter for every key; when it wraps to 0, bump the high counter too so the
    // (low, high) pair never repeats. Requires usize >= 32 bits (see struct field comment).
    let low = self.peer_counter_low.fetch_add(1, Ordering::AcqRel);
    let high = if low == 0 {
        self.peer_counter_high.fetch_add(1, Ordering::AcqRel)
    } else {
        self.peer_counter_high.load(Ordering::Acquire)
    };
    ephemeral_hash.input(&byte_utils::le64_to_array(low as u64));
    ephemeral_hash.input(&byte_utils::le64_to_array(high as u64));
    // A SHA-256 digest is always a valid secp256k1 scalar except with negligible probability.
    SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
}
2017-12-25 01:05:27 -05:00
/// Indicates a new outbound connection has been established to a node with the given node_id.
2020-02-20 15:12:42 -05:00
/// Note that if an Err is returned here you MUST NOT call socket_disconnected for the new
2017-12-25 01:05:27 -05:00
/// descriptor but must disconnect the connection immediately.
2018-09-20 12:57:47 -04:00
///
2018-10-20 18:17:19 -04:00
/// Returns a small number of bytes to send to the remote node (currently always 50).
2018-09-20 12:57:47 -04:00
///
2020-02-20 15:12:42 -05:00
/// Panics if descriptor is duplicative with some other descriptor which has not yet had a
/// socket_disconnected().
2017-12-25 01:05:27 -05:00
pub fn new_outbound_connection ( & self , their_node_id : PublicKey , descriptor : Descriptor ) -> Result < Vec < u8 > , PeerHandleError > {
2019-07-18 22:17:36 -04:00
let mut peer_encryptor = PeerChannelEncryptor ::new_outbound ( their_node_id . clone ( ) , self . get_ephemeral_key ( ) ) ;
2017-12-25 01:05:27 -05:00
let res = peer_encryptor . get_act_one ( ) . to_vec ( ) ;
let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act two is 50 bytes
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
if peers . peers . insert ( descriptor , Peer {
channel_encryptor : peer_encryptor ,
2018-04-01 19:21:26 -04:00
outbound : true ,
2018-11-08 10:36:34 +10:30
their_node_id : None ,
2019-12-23 17:52:58 -05:00
their_features : None ,
2017-12-25 01:05:27 -05:00
pending_outbound_buffer : LinkedList ::new ( ) ,
pending_outbound_buffer_first_msg_offset : 0 ,
awaiting_write_event : false ,
2020-10-06 16:47:23 -07:00
pending_read_buffer ,
2017-12-25 01:05:27 -05:00
pending_read_buffer_pos : 0 ,
pending_read_is_header : false ,
2018-10-02 16:02:17 +02:00
sync_status : InitSyncTracker ::NoSyncRequested ,
2019-09-20 11:16:45 -04:00
awaiting_pong : false ,
2017-12-25 01:05:27 -05:00
} ) . is_some ( ) {
panic! ( " PeerManager driver duplicated descriptors! " ) ;
} ;
Ok ( res )
}
/// Indicates a new inbound connection has been established.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// May refuse the connection by returning an Err, but will never write bytes to the remote end
/// (outbound connector always speaks first). Note that if an Err is returned here you MUST NOT
2020-02-20 15:12:42 -05:00
/// call socket_disconnected for the new descriptor but must disconnect the connection
2017-12-25 01:05:27 -05:00
/// immediately.
2018-09-20 12:57:47 -04:00
///
2020-02-20 15:12:42 -05:00
/// Panics if descriptor is duplicative with some other descriptor which has not yet had
/// socket_disconnected called.
2017-12-25 01:05:27 -05:00
pub fn new_inbound_connection ( & self , descriptor : Descriptor ) -> Result < ( ) , PeerHandleError > {
let peer_encryptor = PeerChannelEncryptor ::new_inbound ( & self . our_node_secret ) ;
let pending_read_buffer = [ 0 ; 50 ] . to_vec ( ) ; // Noise act one is 50 bytes
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
if peers . peers . insert ( descriptor , Peer {
channel_encryptor : peer_encryptor ,
2018-04-01 19:21:26 -04:00
outbound : false ,
2017-12-25 01:05:27 -05:00
their_node_id : None ,
2019-12-23 17:52:58 -05:00
their_features : None ,
2017-12-25 01:05:27 -05:00
pending_outbound_buffer : LinkedList ::new ( ) ,
pending_outbound_buffer_first_msg_offset : 0 ,
awaiting_write_event : false ,
2020-10-06 16:47:23 -07:00
pending_read_buffer ,
2017-12-25 01:05:27 -05:00
pending_read_buffer_pos : 0 ,
pending_read_is_header : false ,
2018-10-02 16:02:17 +02:00
sync_status : InitSyncTracker ::NoSyncRequested ,
2019-09-20 11:16:45 -04:00
awaiting_pong : false ,
2017-12-25 01:05:27 -05:00
} ) . is_some ( ) {
panic! ( " PeerManager driver duplicated descriptors! " ) ;
} ;
Ok ( ( ) )
}
2018-10-02 16:02:17 +02:00
/// Drains peer.pending_outbound_buffer out through the descriptor, topping the buffer up with
/// initial-routing-sync gossip while a sync is in progress, until either the buffer is empty
/// or send_data reports a partial write (socket full), at which point we set
/// awaiting_write_event and wait for write_buffer_space_avail.
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
    // Encrypts + queues one sync message onto this peer's outbound buffer.
    macro_rules! encode_and_send_msg {
        ($msg: expr) => {
            {
                log_trace!(self.logger, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..]));
            }
        }
    }
    // Target depth of the outbound queue; we only fetch more gossip while below this.
    const MSG_BUFF_SIZE: usize = 10;
    while !peer.awaiting_write_event {
        if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
            match peer.sync_status {
                InitSyncTracker::NoSyncRequested => {},
                // Still dumping channels (u64::MAX is the "channels done" sentinel).
                InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
                    // Each channel can yield up to 3 messages (announcement + 2 updates),
                    // hence the divide-by-3 (rounded up) to fill the remaining buffer space.
                    let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
                    let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
                    for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
                        encode_and_send_msg!(announce);
                        if let &Some(ref update_a) = update_a_option {
                            encode_and_send_msg!(update_a);
                        }
                        if let &Some(ref update_b) = update_b_option {
                            encode_and_send_msg!(update_b);
                        }
                        // Resume the dump after the channel we just sent.
                        peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
                    }
                    // A short batch means the route handler ran out of channels: move on.
                    if all_messages.is_empty() || all_messages.len() != steps as usize {
                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
                    }
                },
                // Channels done; start the node-announcement dump from the beginning.
                InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
                    let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
                    let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
                    for msg in all_messages.iter() {
                        encode_and_send_msg!(msg);
                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
                    }
                    if all_messages.is_empty() || all_messages.len() != steps as usize {
                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                    }
                },
                // The two guards above cover every ChannelsSyncing value.
                InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                // Continue the node dump from the last node_id we sent.
                InitSyncTracker::NodesSyncing(key) => {
                    let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
                    let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
                    for msg in all_messages.iter() {
                        encode_and_send_msg!(msg);
                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
                    }
                    if all_messages.is_empty() || all_messages.len() != steps as usize {
                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                    }
                },
            }
        }
        // Try to push the front message's remaining bytes out the socket. The `if { ... }`
        // block evaluates to true when the front message was fully sent.
        if {
            let next_buff = match peer.pending_outbound_buffer.front() {
                None => return,
                Some(buff) => buff,
            };
            // Tell the driver to keep reading from this peer only while our queue is short.
            let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
            let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
            let data_sent = descriptor.send_data(pending, should_be_reading);
            peer.pending_outbound_buffer_first_msg_offset += data_sent;
            if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
        } {
            peer.pending_outbound_buffer_first_msg_offset = 0;
            peer.pending_outbound_buffer.pop_front();
        } else {
            // Partial write: stop until write_buffer_space_avail clears this flag.
            peer.awaiting_write_event = true;
        }
    }
}
/// Indicates that there is room to write data to the given socket descriptor.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// May return an Err to indicate that the connection should be closed.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// Will most likely call send_data on the descriptor passed in (or the descriptor handed into
2018-09-20 12:57:47 -04:00
/// new_*\_connection) before returning. Thus, be very careful with reentrancy issues! The
2020-02-20 15:12:42 -05:00
/// invariants around calling write_buffer_space_avail in case a write did not fully complete
/// must still hold - be ready to call write_buffer_space_avail again if a write call generated
/// here isn't sufficient! Panics if the descriptor was not previously registered in a
/// new_\*_connection event.
pub fn write_buffer_space_avail ( & self , descriptor : & mut Descriptor ) -> Result < ( ) , PeerHandleError > {
2017-12-25 01:05:27 -05:00
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
match peers . peers . get_mut ( descriptor ) {
None = > panic! ( " Descriptor for write_event is not already known to PeerManager " ) ,
Some ( peer ) = > {
peer . awaiting_write_event = false ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( descriptor , peer ) ;
2017-12-25 01:05:27 -05:00
}
} ;
Ok ( ( ) )
}
/// Indicates that data was read from the given socket descriptor.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// May return an Err to indicate that the connection should be closed.
2018-09-20 12:57:47 -04:00
///
2018-10-20 18:17:19 -04:00
/// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
/// Thus, however, you almost certainly want to call process_events() after any read_event to
/// generate send_data calls to handle responses.
2018-09-20 12:57:47 -04:00
///
2020-02-20 15:12:42 -05:00
/// If Ok(true) is returned, further read_events should not be triggered until a send_data call
/// on this file descriptor has resume_read set (preventing DoS issues in the send buffer).
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// Panics if the descriptor was not previously registered in a new_*_connection event.
2020-01-31 20:57:01 -05:00
pub fn read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
2018-03-19 17:26:29 -04:00
match self . do_read_event ( peer_descriptor , data ) {
Ok ( res ) = > Ok ( res ) ,
Err ( e ) = > {
2018-04-01 19:23:09 -04:00
self . disconnect_event_internal ( peer_descriptor , e . no_connection_possible ) ;
2018-03-19 17:26:29 -04:00
Err ( e )
}
}
}
2020-05-18 10:55:28 -07:00
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, descriptor: Descriptor, message: &M) {
    let mut writer = VecWriter(Vec::new());
    wire::write(message, &mut writer).unwrap(); // crash if the write failed
    log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
    // Encrypt the serialized bytes and queue them; the actual send happens later (see the
    // peers_needing_send comment on PeerHolder).
    let encrypted = peer.channel_encryptor.encrypt_message(&writer.0[..]);
    peer.pending_outbound_buffer.push_back(encrypted);
    peers_needing_send.insert(descriptor);
}
2020-01-31 20:57:01 -05:00
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
2017-12-25 01:05:27 -05:00
let pause_read = {
2018-09-08 13:56:45 -04:00
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
2020-01-16 10:48:16 -08:00
let peers = & mut * peers_lock ;
2018-09-08 13:56:45 -04:00
let pause_read = match peers . peers . get_mut ( peer_descriptor ) {
2017-12-25 01:05:27 -05:00
None = > panic! ( " Descriptor for read_event is not already known to PeerManager " ) ,
Some ( peer ) = > {
assert! ( peer . pending_read_buffer . len ( ) > 0 ) ;
assert! ( peer . pending_read_buffer . len ( ) > peer . pending_read_buffer_pos ) ;
let mut read_pos = 0 ;
while read_pos < data . len ( ) {
{
let data_to_copy = cmp ::min ( peer . pending_read_buffer . len ( ) - peer . pending_read_buffer_pos , data . len ( ) - read_pos ) ;
peer . pending_read_buffer [ peer . pending_read_buffer_pos .. peer . pending_read_buffer_pos + data_to_copy ] . copy_from_slice ( & data [ read_pos .. read_pos + data_to_copy ] ) ;
read_pos + = data_to_copy ;
peer . pending_read_buffer_pos + = data_to_copy ;
}
2018-04-01 19:23:09 -04:00
2017-12-25 01:05:27 -05:00
if peer . pending_read_buffer_pos = = peer . pending_read_buffer . len ( ) {
2018-04-01 19:23:09 -04:00
peer . pending_read_buffer_pos = 0 ;
macro_rules ! try_potential_handleerror {
( $thing : expr ) = > {
match $thing {
Ok ( x ) = > x ,
Err ( e ) = > {
2019-11-04 19:54:43 -05:00
match e . action {
msgs ::ErrorAction ::DisconnectPeer { msg : _ } = > {
//TODO: Try to push msg
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Got Err handling message, disconnecting peer because {} " , e . err ) ;
2019-11-04 19:54:43 -05:00
return Err ( PeerHandleError { no_connection_possible : false } ) ;
} ,
msgs ::ErrorAction ::IgnoreError = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Got Err handling message, ignoring because {} " , e . err ) ;
2019-11-04 19:54:43 -05:00
continue ;
} ,
msgs ::ErrorAction ::SendErrorMessage { msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Got Err handling message, sending Error message because {} " , e . err ) ;
2020-05-18 10:55:28 -07:00
self . enqueue_message ( & mut peers . peers_needing_send , peer , peer_descriptor . clone ( ) , & msg ) ;
2019-11-04 19:54:43 -05:00
continue ;
} ,
2018-04-01 19:23:09 -04:00
}
}
} ;
}
}
2018-09-08 13:56:45 -04:00
macro_rules ! insert_node_id {
( ) = > {
match peers . node_id_to_descriptor . entry ( peer . their_node_id . unwrap ( ) ) {
hash_map ::Entry ::Occupied ( _ ) = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Got second connection with {}, closing " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
2018-09-08 13:56:45 -04:00
peer . their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
return Err ( PeerHandleError { no_connection_possible : false } )
} ,
2018-11-02 10:45:29 -04:00
hash_map ::Entry ::Vacant ( entry ) = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Finished noise handshake for connection with {} " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
2018-11-02 10:45:29 -04:00
entry . insert ( peer_descriptor . clone ( ) )
} ,
2018-09-08 13:56:45 -04:00
} ;
}
}
2017-12-25 01:05:27 -05:00
let next_step = peer . channel_encryptor . get_noise_step ( ) ;
match next_step {
NextNoiseStep ::ActOne = > {
2019-07-18 22:17:36 -04:00
let act_two = try_potential_handleerror! ( peer . channel_encryptor . process_act_one_with_keys ( & peer . pending_read_buffer [ .. ] , & self . our_node_secret , self . get_ephemeral_key ( ) ) ) . to_vec ( ) ;
2017-12-25 01:05:27 -05:00
peer . pending_outbound_buffer . push_back ( act_two ) ;
peer . pending_read_buffer = [ 0 ; 66 ] . to_vec ( ) ; // act three is 66 bytes long
} ,
NextNoiseStep ::ActTwo = > {
2018-11-08 10:36:34 +10:30
let ( act_three , their_node_id ) = try_potential_handleerror! ( peer . channel_encryptor . process_act_two ( & peer . pending_read_buffer [ .. ] , & self . our_node_secret ) ) ;
peer . pending_outbound_buffer . push_back ( act_three . to_vec ( ) ) ;
2017-12-25 01:05:27 -05:00
peer . pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
2018-03-29 16:42:44 -04:00
peer . pending_read_is_header = true ;
2017-12-25 01:05:27 -05:00
2018-11-08 10:36:34 +10:30
peer . their_node_id = Some ( their_node_id ) ;
2018-09-08 13:56:45 -04:00
insert_node_id! ( ) ;
2020-04-15 17:16:45 -07:00
let mut features = InitFeatures ::known ( ) ;
2020-04-23 09:47:15 -07:00
if ! self . message_handler . route_handler . should_request_full_sync ( & peer . their_node_id . unwrap ( ) ) {
features . clear_initial_routing_sync ( ) ;
2018-06-16 19:39:40 -04:00
}
2020-01-21 15:26:21 -08:00
let resp = msgs ::Init { features } ;
2020-05-18 10:55:28 -07:00
self . enqueue_message ( & mut peers . peers_needing_send , peer , peer_descriptor . clone ( ) , & resp ) ;
2017-12-25 01:05:27 -05:00
} ,
NextNoiseStep ::ActThree = > {
2018-11-08 10:36:34 +10:30
let their_node_id = try_potential_handleerror! ( peer . channel_encryptor . process_act_three ( & peer . pending_read_buffer [ .. ] ) ) ;
2017-12-25 01:05:27 -05:00
peer . pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
peer . pending_read_is_header = true ;
peer . their_node_id = Some ( their_node_id ) ;
2018-09-08 13:56:45 -04:00
insert_node_id! ( ) ;
2017-12-25 01:05:27 -05:00
} ,
NextNoiseStep ::NoiseComplete = > {
if peer . pending_read_is_header {
let msg_len = try_potential_handleerror! ( peer . channel_encryptor . decrypt_length_header ( & peer . pending_read_buffer [ .. ] ) ) ;
peer . pending_read_buffer = Vec ::with_capacity ( msg_len as usize + 16 ) ;
peer . pending_read_buffer . resize ( msg_len as usize + 16 , 0 ) ;
2018-03-20 16:41:33 -04:00
if msg_len < 2 { // Need at least the message type tag
2018-04-01 19:23:09 -04:00
return Err ( PeerHandleError { no_connection_possible : false } ) ;
2017-12-25 01:05:27 -05:00
}
peer . pending_read_is_header = false ;
} else {
let msg_data = try_potential_handleerror! ( peer . channel_encryptor . decrypt_message ( & peer . pending_read_buffer [ .. ] ) ) ;
assert! ( msg_data . len ( ) > = 2 ) ;
2018-04-01 19:23:09 -04:00
// Reset read buffer
peer . pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ;
peer . pending_read_is_header = true ;
2020-01-21 15:26:21 -08:00
let mut reader = ::std ::io ::Cursor ::new ( & msg_data [ .. ] ) ;
2020-02-06 14:17:44 -08:00
let message_result = wire ::read ( & mut reader ) ;
let message = match message_result {
Ok ( x ) = > x ,
Err ( e ) = > {
match e {
msgs ::DecodeError ::UnknownVersion = > return Err ( PeerHandleError { no_connection_possible : false } ) ,
msgs ::DecodeError ::UnknownRequiredFeature = > {
2020-03-02 12:55:53 -05:00
log_debug! ( self . logger , " Got a channel/node announcement with an known required feature flag, you may want to update! " ) ;
2020-02-06 14:17:44 -08:00
continue ;
}
msgs ::DecodeError ::InvalidValue = > {
2020-03-02 12:55:53 -05:00
log_debug! ( self . logger , " Got an invalid value while deserializing message " ) ;
2020-02-06 14:17:44 -08:00
return Err ( PeerHandleError { no_connection_possible : false } ) ;
}
msgs ::DecodeError ::ShortRead = > {
2020-03-02 12:55:53 -05:00
log_debug! ( self . logger , " Deserialization failed due to shortness of message " ) ;
2020-02-06 14:17:44 -08:00
return Err ( PeerHandleError { no_connection_possible : false } ) ;
}
msgs ::DecodeError ::BadLengthDescriptor = > return Err ( PeerHandleError { no_connection_possible : false } ) ,
msgs ::DecodeError ::Io ( _ ) = > return Err ( PeerHandleError { no_connection_possible : false } ) ,
}
}
} ;
2020-05-22 13:03:49 -07:00
if let Err ( handling_error ) = self . handle_message ( & mut peers . peers_needing_send , peer , peer_descriptor . clone ( ) , message ) {
match handling_error {
MessageHandlingError ::PeerHandleError ( e ) = > { return Err ( e ) } ,
MessageHandlingError ::LightningError ( e ) = > {
try_potential_handleerror! ( Err ( e ) ) ;
} ,
}
2017-12-25 01:05:27 -05:00
}
}
}
}
}
}
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( peer_descriptor , peer ) ;
2017-12-25 01:05:27 -05:00
2018-09-08 13:56:45 -04:00
peer . pending_outbound_buffer . len ( ) > 10 // pause_read
2017-12-25 01:05:27 -05:00
}
} ;
2018-03-27 11:17:40 -04:00
pause_read
} ;
Ok ( pause_read )
}
2020-05-22 13:03:49 -07:00
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire ::Message ) -> Result < ( ) , MessageHandlingError > {
log_trace! ( self . logger , " Received message of type {} from {} " , message . type_id ( ) , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
// Need an Init as first message
if let wire ::Message ::Init ( _ ) = message {
} else if peer . their_features . is_none ( ) {
log_trace! ( self . logger , " Peer {} sent non-Init first message " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
}
match message {
// Setup and Control messages:
wire ::Message ::Init ( msg ) = > {
if msg . features . requires_unknown_bits ( ) {
log_info! ( self . logger , " Peer global features required unknown version bits " ) ;
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
if msg . features . requires_unknown_bits ( ) {
log_info! ( self . logger , " Peer local features required unknown version bits " ) ;
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
if peer . their_features . is_some ( ) {
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
}
log_info! (
self . logger , " Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {} " ,
if msg . features . supports_data_loss_protect ( ) { " supported " } else { " not supported " } ,
if msg . features . initial_routing_sync ( ) { " requested " } else { " not requested " } ,
if msg . features . supports_upfront_shutdown_script ( ) { " supported " } else { " not supported " } ,
if msg . features . supports_static_remote_key ( ) { " supported " } else { " not supported " } ,
if msg . features . supports_unknown_bits ( ) { " present " } else { " none " }
) ;
if msg . features . initial_routing_sync ( ) {
peer . sync_status = InitSyncTracker ::ChannelsSyncing ( 0 ) ;
peers_needing_send . insert ( peer_descriptor . clone ( ) ) ;
}
if ! msg . features . supports_static_remote_key ( ) {
log_debug! ( self . logger , " Peer {} does not support static remote key, disconnecting with no_connection_possible " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
if ! peer . outbound {
let mut features = InitFeatures ::known ( ) ;
if ! self . message_handler . route_handler . should_request_full_sync ( & peer . their_node_id . unwrap ( ) ) {
features . clear_initial_routing_sync ( ) ;
}
let resp = msgs ::Init { features } ;
self . enqueue_message ( peers_needing_send , peer , peer_descriptor . clone ( ) , & resp ) ;
}
self . message_handler . chan_handler . peer_connected ( & peer . their_node_id . unwrap ( ) , & msg ) ;
peer . their_features = Some ( msg . features ) ;
} ,
wire ::Message ::Error ( msg ) = > {
let mut data_is_printable = true ;
for b in msg . data . bytes ( ) {
if b < 32 | | b > 126 {
data_is_printable = false ;
break ;
}
}
if data_is_printable {
log_debug! ( self . logger , " Got Err message from {}: {} " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) , msg . data ) ;
} else {
log_debug! ( self . logger , " Got Err message from {} with non-ASCII error message " , log_pubkey! ( peer . their_node_id . unwrap ( ) ) ) ;
}
self . message_handler . chan_handler . handle_error ( & peer . their_node_id . unwrap ( ) , & msg ) ;
if msg . channel_id = = [ 0 ; 32 ] {
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
}
} ,
wire ::Message ::Ping ( msg ) = > {
if msg . ponglen < 65532 {
let resp = msgs ::Pong { byteslen : msg . ponglen } ;
self . enqueue_message ( peers_needing_send , peer , peer_descriptor . clone ( ) , & resp ) ;
}
} ,
wire ::Message ::Pong ( _msg ) = > {
peer . awaiting_pong = false ;
} ,
// Channel messages:
wire ::Message ::OpenChannel ( msg ) = > {
self . message_handler . chan_handler . handle_open_channel ( & peer . their_node_id . unwrap ( ) , peer . their_features . clone ( ) . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::AcceptChannel ( msg ) = > {
self . message_handler . chan_handler . handle_accept_channel ( & peer . their_node_id . unwrap ( ) , peer . their_features . clone ( ) . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingCreated ( msg ) = > {
self . message_handler . chan_handler . handle_funding_created ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingSigned ( msg ) = > {
self . message_handler . chan_handler . handle_funding_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::FundingLocked ( msg ) = > {
self . message_handler . chan_handler . handle_funding_locked ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::Shutdown ( msg ) = > {
self . message_handler . chan_handler . handle_shutdown ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::ClosingSigned ( msg ) = > {
self . message_handler . chan_handler . handle_closing_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
// Commitment messages:
wire ::Message ::UpdateAddHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_add_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFulfillHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fulfill_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFailHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fail_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFailMalformedHTLC ( msg ) = > {
self . message_handler . chan_handler . handle_update_fail_malformed_htlc ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::CommitmentSigned ( msg ) = > {
self . message_handler . chan_handler . handle_commitment_signed ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::RevokeAndACK ( msg ) = > {
self . message_handler . chan_handler . handle_revoke_and_ack ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::UpdateFee ( msg ) = > {
self . message_handler . chan_handler . handle_update_fee ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::ChannelReestablish ( msg ) = > {
self . message_handler . chan_handler . handle_channel_reestablish ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
// Routing messages:
wire ::Message ::AnnouncementSignatures ( msg ) = > {
self . message_handler . chan_handler . handle_announcement_signatures ( & peer . their_node_id . unwrap ( ) , & msg ) ;
} ,
wire ::Message ::ChannelAnnouncement ( msg ) = > {
let should_forward = match self . message_handler . route_handler . handle_channel_announcement ( & msg ) {
Ok ( v ) = > v ,
Err ( e ) = > { return Err ( e . into ( ) ) ; } ,
} ;
if should_forward {
// TODO: forward msg along to all our other peers!
}
} ,
wire ::Message ::NodeAnnouncement ( msg ) = > {
let should_forward = match self . message_handler . route_handler . handle_node_announcement ( & msg ) {
Ok ( v ) = > v ,
Err ( e ) = > { return Err ( e . into ( ) ) ; } ,
} ;
if should_forward {
// TODO: forward msg along to all our other peers!
}
} ,
wire ::Message ::ChannelUpdate ( msg ) = > {
let should_forward = match self . message_handler . route_handler . handle_channel_update ( & msg ) {
Ok ( v ) = > v ,
Err ( e ) = > { return Err ( e . into ( ) ) ; } ,
} ;
if should_forward {
// TODO: forward msg along to all our other peers!
}
} ,
// Unknown messages:
wire ::Message ::Unknown ( msg_type ) if msg_type . is_even ( ) = > {
log_debug! ( self . logger , " Received unknown even message of type {}, disconnecting peer! " , msg_type ) ;
// Fail the channel if message is an even, unknown type as per BOLT #1.
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
} ,
wire ::Message ::Unknown ( msg_type ) = > {
log_trace! ( self . logger , " Received unknown odd message of type {}, ignoring " , msg_type ) ;
}
} ;
Ok ( ( ) )
}
2018-10-20 18:17:19 -04:00
/// Checks for any events generated by our handlers and processes them. Includes sending most
/// response messages as well as messages generated by calls to handler functions directly (eg
/// functions like ChannelManager::process_pending_htlc_forward or send_payment).
2018-03-27 11:17:40 -04:00
pub fn process_events ( & self ) {
{
2017-12-25 01:05:27 -05:00
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
2018-10-19 16:25:32 -04:00
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
2018-10-20 18:17:19 -04:00
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
2020-01-16 10:48:16 -08:00
let peers = & mut * peers_lock ;
2017-12-25 01:05:27 -05:00
for event in events_generated . drain ( .. ) {
macro_rules ! get_peer_for_forwarding {
( $node_id : expr , $handle_no_such_peer : block ) = > {
{
let descriptor = match peers . node_id_to_descriptor . get ( $node_id ) {
Some ( descriptor ) = > descriptor . clone ( ) ,
None = > {
$handle_no_such_peer ;
continue ;
} ,
} ;
match peers . peers . get_mut ( & descriptor ) {
Some ( peer ) = > {
2019-12-23 17:52:58 -05:00
if peer . their_features . is_none ( ) {
2018-08-25 14:32:02 -04:00
$handle_no_such_peer ;
continue ;
}
2017-12-25 01:05:27 -05:00
( descriptor , peer )
} ,
None = > panic! ( " Inconsistent peers set state! " ) ,
}
}
}
}
match event {
2018-10-19 16:49:12 -04:00
MessageSendEvent ::SendAcceptChannel { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendAcceptChannel event in peer_handler for node {} for channel {} " ,
2018-10-19 16:49:12 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-19 16:49:12 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendOpenChannel { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendOpenChannel event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ) ;
2018-07-06 17:29:34 -04:00
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-07-06 17:29:34 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendFundingCreated { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {}) " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . temporary_channel_id ) ,
log_funding_channel_id! ( msg . funding_txid , msg . funding_output_index ) ) ;
2017-12-25 01:05:27 -05:00
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: generate a DiscardFunding event indicating to the wallet that
//they should just throw away this funding transaction
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2017-12-25 01:05:27 -05:00
} ,
2018-10-19 17:06:40 -04:00
MessageSendEvent ::SendFundingSigned { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendFundingSigned event in peer_handler for node {} for channel {} " ,
2018-10-19 17:06:40 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: generate a DiscardFunding event indicating to the wallet that
//they should just throw away this funding transaction
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-19 17:06:40 -04:00
} ,
2018-10-19 17:30:52 -04:00
MessageSendEvent ::SendFundingLocked { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendFundingLocked event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2017-12-25 01:05:27 -05:00
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-19 17:30:52 -04:00
} ,
MessageSendEvent ::SendAnnouncementSignatures { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {}) " ,
2018-10-19 17:30:52 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: generate a DiscardFunding event indicating to the wallet that
//they should just throw away this funding transaction
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2017-12-25 01:05:27 -05:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::UpdateHTLCs { ref node_id , updates : msgs ::CommitmentUpdate { ref update_add_htlcs , ref update_fulfill_htlcs , ref update_fail_htlcs , ref update_fail_malformed_htlcs , ref update_fee , ref commitment_signed } } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
2018-08-22 12:09:11 -04:00
update_add_htlcs . len ( ) ,
update_fulfill_htlcs . len ( ) ,
update_fail_htlcs . len ( ) ,
log_bytes! ( commitment_signed . channel_id ) ) ;
2017-12-25 01:05:27 -05:00
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2018-08-22 12:09:11 -04:00
for msg in update_add_htlcs {
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2017-12-25 01:05:27 -05:00
}
2018-08-22 12:09:11 -04:00
for msg in update_fulfill_htlcs {
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-08-22 12:09:11 -04:00
}
for msg in update_fail_htlcs {
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-08-22 12:09:11 -04:00
}
2018-08-26 16:30:01 -04:00
for msg in update_fail_malformed_htlcs {
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-08-26 16:30:01 -04:00
}
2018-09-26 19:54:28 -04:00
if let & Some ( ref msg ) = update_fee {
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-09-26 19:54:28 -04:00
}
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( commitment_signed ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-03-20 19:11:27 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendRevokeAndACK { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendRevokeAndACK event in peer_handler for node {} for channel {} " ,
2018-10-17 11:35:26 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-17 11:35:26 -04:00
} ,
2018-10-19 21:50:16 -04:00
MessageSendEvent ::SendClosingSigned { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendClosingSigned event in peer_handler for node {} for channel {} " ,
2018-10-19 21:50:16 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-19 21:50:16 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::SendShutdown { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling Shutdown event in peer_handler for node {} for channel {} " ,
2018-08-16 22:38:49 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
2018-07-22 23:03:31 -04:00
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-07-22 23:03:31 -04:00
} ,
2018-10-20 17:50:34 -04:00
MessageSendEvent ::SendChannelReestablish { ref node_id , ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendChannelReestablish event in peer_handler for node {} for channel {} " ,
2018-10-20 17:50:34 -04:00
log_pubkey! ( node_id ) ,
log_bytes! ( msg . channel_id ) ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
2018-10-20 17:50:34 -04:00
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::BroadcastChannelAnnouncement { ref msg , ref update_msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {} " , msg . contents . short_channel_id ) ;
2018-04-24 20:40:22 -04:00
if self . message_handler . route_handler . handle_channel_announcement ( msg ) . is_ok ( ) & & self . message_handler . route_handler . handle_channel_update ( update_msg ) . is_ok ( ) {
2020-01-21 15:26:21 -08:00
let encoded_msg = encode_msg! ( msg ) ;
let encoded_update_msg = encode_msg! ( update_msg ) ;
2017-12-25 01:05:27 -05:00
2018-04-24 20:40:22 -04:00
for ( ref descriptor , ref mut peer ) in peers . peers . iter_mut ( ) {
2019-12-23 17:52:58 -05:00
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
2020-01-02 20:32:37 -05:00
! peer . should_forward_channel_announcement ( msg . contents . short_channel_id ) {
2018-04-24 20:40:22 -04:00
continue
}
match peer . their_node_id {
None = > continue ,
Some ( their_node_id ) = > {
if their_node_id = = msg . contents . node_id_1 | | their_node_id = = msg . contents . node_id_2 {
continue
}
2017-12-25 01:05:27 -05:00
}
}
2018-04-24 20:40:22 -04:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encoded_msg [ .. ] ) ) ;
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encoded_update_msg [ .. ] ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut ( * descriptor ) . clone ( ) , peer ) ;
2018-04-24 20:40:22 -04:00
}
}
} ,
2020-01-02 20:32:37 -05:00
MessageSendEvent ::BroadcastNodeAnnouncement { ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling BroadcastNodeAnnouncement event in peer_handler " ) ;
2020-01-02 20:32:37 -05:00
if self . message_handler . route_handler . handle_node_announcement ( msg ) . is_ok ( ) {
let encoded_msg = encode_msg! ( msg ) ;
for ( ref descriptor , ref mut peer ) in peers . peers . iter_mut ( ) {
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
! peer . should_forward_node_announcement ( msg . contents . node_id ) {
continue
}
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encoded_msg [ .. ] ) ) ;
self . do_attempt_write_data ( & mut ( * descriptor ) . clone ( ) , peer ) ;
}
}
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::BroadcastChannelUpdate { ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling BroadcastChannelUpdate event in peer_handler for short channel id {} " , msg . contents . short_channel_id ) ;
2018-04-24 20:40:22 -04:00
if self . message_handler . route_handler . handle_channel_update ( msg ) . is_ok ( ) {
2020-01-21 15:26:21 -08:00
let encoded_msg = encode_msg! ( msg ) ;
2018-04-24 20:40:22 -04:00
for ( ref descriptor , ref mut peer ) in peers . peers . iter_mut ( ) {
2019-12-23 17:52:58 -05:00
if ! peer . channel_encryptor . is_ready_for_encryption ( ) | | peer . their_features . is_none ( ) | |
2020-01-02 20:32:37 -05:00
! peer . should_forward_channel_announcement ( msg . contents . short_channel_id ) {
2018-04-24 20:40:22 -04:00
continue
}
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encoded_msg [ .. ] ) ) ;
2018-10-02 16:02:17 +02:00
self . do_attempt_write_data ( & mut ( * descriptor ) . clone ( ) , peer ) ;
2017-12-25 01:05:27 -05:00
}
}
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::PaymentFailureNetworkUpdate { ref update } = > {
2018-10-22 11:12:44 -04:00
self . message_handler . route_handler . handle_htlc_fail_channel_update ( update ) ;
} ,
2018-10-19 16:25:32 -04:00
MessageSendEvent ::HandleError { ref node_id , ref action } = > {
2019-11-04 19:54:43 -05:00
match * action {
msgs ::ErrorAction ::DisconnectPeer { ref msg } = > {
if let Some ( mut descriptor ) = peers . node_id_to_descriptor . remove ( node_id ) {
peers . peers_needing_send . remove ( & descriptor ) ;
if let Some ( mut peer ) = peers . peers . remove ( & descriptor ) {
if let Some ( ref msg ) = * msg {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling DisconnectPeer HandleError event in peer_handler for node {} with message {} " ,
2019-11-04 19:54:43 -05:00
log_pubkey! ( node_id ) ,
msg . data ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2019-11-04 19:54:43 -05:00
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self . do_attempt_write_data ( & mut descriptor , & mut peer ) ;
} else {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling DisconnectPeer HandleError event in peer_handler for node {} with no message " , log_pubkey! ( node_id ) ) ;
2018-08-01 16:34:03 +00:00
}
}
2019-11-04 19:54:43 -05:00
descriptor . disconnect_socket ( ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , false ) ;
}
} ,
msgs ::ErrorAction ::IgnoreError = > { } ,
msgs ::ErrorAction ::SendErrorMessage { ref msg } = > {
2020-03-02 12:55:53 -05:00
log_trace! ( self . logger , " Handling SendErrorMessage HandleError event in peer_handler for node {} with message {} " ,
2019-11-04 19:54:43 -05:00
log_pubkey! ( node_id ) ,
msg . data ) ;
let ( mut descriptor , peer ) = get_peer_for_forwarding! ( node_id , {
//TODO: Do whatever we're gonna do for handling dropped messages
} ) ;
2020-01-21 15:26:21 -08:00
peer . pending_outbound_buffer . push_back ( peer . channel_encryptor . encrypt_message ( & encode_msg! ( msg ) ) ) ;
2019-11-04 19:54:43 -05:00
self . do_attempt_write_data ( & mut descriptor , peer ) ;
} ,
2018-07-23 01:06:45 +00:00
}
2018-08-01 16:34:03 +00:00
}
2017-12-25 01:05:27 -05:00
}
}
2018-10-20 18:17:19 -04:00
for mut descriptor in peers . peers_needing_send . drain ( ) {
match peers . peers . get_mut ( & descriptor ) {
2018-10-02 16:02:17 +02:00
Some ( peer ) = > self . do_attempt_write_data ( & mut descriptor , peer ) ,
2018-10-20 18:17:19 -04:00
None = > panic! ( " Inconsistent peers set state! " ) ,
}
}
2018-03-27 11:17:40 -04:00
}
2017-12-25 01:05:27 -05:00
}
/// Indicates that the given socket descriptor's connection is now closed.
2018-09-20 12:57:47 -04:00
///
2020-02-20 15:12:42 -05:00
/// This must only be called if the socket has been disconnected by the peer or your own
/// decision to disconnect it and must NOT be called in any case where other parts of this
/// library (eg PeerHandleError, explicit disconnect_socket calls) instruct you to disconnect
/// the peer.
2018-09-20 12:57:47 -04:00
///
2017-12-25 01:05:27 -05:00
/// Panics if the descriptor was not previously registered in a successful new_*_connection event.
2020-02-20 15:12:42 -05:00
pub fn socket_disconnected ( & self , descriptor : & Descriptor ) {
2018-04-01 19:23:09 -04:00
self . disconnect_event_internal ( descriptor , false ) ;
}
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
2017-12-25 01:05:27 -05:00
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
2018-10-20 18:17:19 -04:00
peers . peers_needing_send . remove ( descriptor ) ;
2017-12-25 01:05:27 -05:00
let peer_option = peers . peers . remove ( descriptor ) ;
match peer_option {
None = > panic! ( " Descriptor for disconnect_event is not already known to PeerManager " ) ,
Some ( peer ) = > {
match peer . their_node_id {
2018-04-01 19:23:09 -04:00
Some ( node_id ) = > {
peers . node_id_to_descriptor . remove ( & node_id ) ;
self . message_handler . chan_handler . peer_disconnected ( & node_id , no_connection_possible ) ;
} ,
2017-12-25 01:05:27 -05:00
None = > { }
}
}
} ;
}
2019-09-20 11:16:45 -04:00
	/// This function should be called roughly once every 30 seconds.
	/// It will send pings to each peer and disconnect those which did not respond to the last round of pings.
	/// Will most likely call send_data on all of the registered descriptors, thus, be very careful with reentrancy issues!
	pub fn timer_tick_occured(&self) {
		let mut peers_lock = self.peers.lock().unwrap();
		{
			// Split the holder borrow into disjoint field borrows so the `retain`
			// closure below can mutate the side maps while iterating `peers`.
			let peers = &mut *peers_lock;
			let peers_needing_send = &mut peers.peers_needing_send;
			let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
			let peers = &mut peers.peers;
			// Sockets to close are collected here and disconnected after `retain`
			// finishes, once the per-peer state has already been dropped.
			let mut descriptors_needing_disconnect = Vec::new();

			peers.retain(|descriptor, peer| {
				if peer.awaiting_pong {
					// No pong arrived since the ping we sent on the previous tick:
					// drop all state for this peer and queue its socket for closing.
					peers_needing_send.remove(descriptor);
					descriptors_needing_disconnect.push(descriptor.clone());
					match peer.their_node_id {
						Some(node_id) => {
							log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
							node_id_to_descriptor.remove(&node_id);
							self.message_handler.chan_handler.peer_disconnected(&node_id, false);
						}
						None => {
							// This can't actually happen as we should have hit
							// is_ready_for_encryption() previously on this same peer.
							unreachable!();
						},
					}
					// Returning false removes the peer from the map.
					return false;
				}
				if !peer.channel_encryptor.is_ready_for_encryption() {
					// The peer needs to complete its handshake before we can exchange messages
					return true;
				}

				let ping = msgs::Ping {
					ponglen: 0,
					byteslen: 64,
				};
				peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&ping)));

				let mut descriptor_clone = descriptor.clone();
				self.do_attempt_write_data(&mut descriptor_clone, peer);

				// If this flag is still set on the next tick the peer gets disconnected.
				peer.awaiting_pong = true;
				true
			});

			for mut descriptor in descriptors_needing_disconnect.drain(..) {
				descriptor.disconnect_socket();
			}
		}
	}
2017-12-25 01:05:27 -05:00
}
2018-07-23 01:06:45 +00:00
#[cfg(test)]
mod tests {
	use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
	use ln::msgs;
	use util::events;
	use util::test_utils;

	use bitcoin::secp256k1::Secp256k1;
	use bitcoin::secp256k1::key::{SecretKey, PublicKey};

	use std;
	use std::sync::{Arc, Mutex};
	use std::sync::atomic::Ordering;

	/// A fake socket for tests: `send_data` appends to a shared byte buffer which the
	/// test later feeds into the counterparty's `read_event`.
	#[derive(Clone)]
	struct FileDescriptor {
		fd: u16,
		// Arc<Mutex<..>> so that clones of this descriptor all share one outbound buffer.
		outbound_data: Arc<Mutex<Vec<u8>>>,
	}
	// Equality/hashing are by fd only, ignoring the buffer, so a clone compares equal
	// to its original when used as a map key inside PeerManager.
	impl PartialEq for FileDescriptor {
		fn eq(&self, other: &Self) -> bool {
			self.fd == other.fd
		}
	}
	impl Eq for FileDescriptor { }
	impl std::hash::Hash for FileDescriptor {
		fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
			self.fd.hash(hasher)
		}
	}
	impl SocketDescriptor for FileDescriptor {
		fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
			// Accept everything; never signals backpressure.
			self.outbound_data.lock().unwrap().extend_from_slice(data);
			data.len()
		}
		fn disconnect_socket(&mut self) {}
	}

	/// Per-node handler/logger state. Kept in a separate struct (outliving the
	/// PeerManagers) because PeerManager only borrows its handlers.
	struct PeerManagerCfg {
		chan_handler: test_utils::TestChannelMessageHandler,
		routing_handler: test_utils::TestRoutingMessageHandler,
		logger: test_utils::TestLogger,
	}

	// Builds one PeerManagerCfg per requested peer.
	fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
		let mut cfgs = Vec::new();
		for _ in 0..peer_count {
			cfgs.push(
				PeerManagerCfg{
					chan_handler: test_utils::TestChannelMessageHandler::new(),
					logger: test_utils::TestLogger::new(),
					routing_handler: test_utils::TestRoutingMessageHandler::new(),
				}
			);
		}
		cfgs
	}

	// Builds `peer_count` PeerManagers, each borrowing the handlers/logger from the
	// matching entry in `cfgs`, with a distinct node secret and ephemeral seed per node.
	fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>> {
		let mut peers = Vec::new();
		for i in 0..peer_count {
			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
			let ephemeral_bytes = [i as u8; 32];
			let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
			let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger);
			peers.push(peer);
		}
		peers
	}

	// Runs the noise handshake between peer_a (inbound) and peer_b (outbound) by
	// shuttling the three acts between the two fake descriptors, and returns each
	// side's descriptor. Post-handshake messages (e.g. init) may still be sitting in
	// the outbound buffers when this returns.
	fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
		let secp_ctx = Secp256k1::new();
		let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
		let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
		let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
		peer_a.new_inbound_connection(fd_a.clone()).unwrap();
		assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
		assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		(fd_a.clone(), fd_b.clone())
	}

	// Like establish_connection, but additionally delivers one more round of pending
	// outbound data in each direction so init messages have been processed.
	fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
		let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
		assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
		(fd_a.clone(), fd_b.clone())
	}

	#[test]
	fn test_disconnect_peer() {
		// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
		// push a DisconnectPeer event to remove the node flagged by id
		// NOTE: declaration order matters here — `chan_handler` must be declared before
		// `peers` so the `&chan_handler` stored into peers[0] below outlives it.
		let cfgs = create_peermgr_cfgs(2);
		let chan_handler = test_utils::TestChannelMessageHandler::new();
		let mut peers = create_network(2, &cfgs);
		establish_connection(&peers[0], &peers[1]);
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		let secp_ctx = Secp256k1::new();
		let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);

		// Queue a DisconnectPeer error event targeting peers[1]'s node id...
		chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
			node_id: their_id,
			action: msgs::ErrorAction::DisconnectPeer { msg: None },
		});
		assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1);
		// ...swap in the handler holding that event and process it.
		peers[0].message_handler.chan_handler = &chan_handler;

		peers[0].process_events();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
	}

	#[test]
	fn test_timer_tick_occurred() {
		// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
		let cfgs = create_peermgr_cfgs(2);
		let peers = create_network(2, &cfgs);
		establish_connection(&peers[0], &peers[1]);
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		// peers[0] awaiting_pong is set to true, but the Peer is still connected
		peers[0].timer_tick_occured();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);

		// Since timer_tick_occured() is called again when awaiting_pong is true, all Peers are disconnected
		peers[0].timer_tick_occured();
		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
	}

	#[test]
	fn test_do_attempt_write_data() {
		// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
		let cfgs = create_peermgr_cfgs(2);
		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
		let peers = create_network(2, &cfgs);

		// By calling establish_connect, we trigger do_attempt_write_data between
		// the peers. Previously this function would mistakenly enter an infinite loop
		// when there were more channel messages available than could fit into a peer's
		// buffer. This issue would now be detected by this test (because we use custom
		// RoutingMessageHandlers that intentionally return more channel messages
		// than can fit into a peer's buffer).
		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

		// Make each peer to read the messages that the other peer just wrote to them.
		peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
		peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();

		// Check that each peer has received the expected number of channel updates and channel
		// announcements.
		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
	}

	#[test]
	fn limit_initial_routing_sync_requests() {
		// Checks that the initial_routing_sync feature bit in the exchanged init
		// messages reflects each side's request_full_sync flag independently.
		// Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
		{
			let cfgs = create_peermgr_cfgs(2);
			cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
			let peers = create_network(2, &cfgs);
			let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
			// Each side records the *other* side's advertised features.
			let peer_0 = peers[0].peers.lock().unwrap();
			let peer_1 = peers[1].peers.lock().unwrap();
			let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
			let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
			assert!(peer_0_features.unwrap().initial_routing_sync());
			assert!(!peer_1_features.unwrap().initial_routing_sync());
		}
		// Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
		{
			let cfgs = create_peermgr_cfgs(2);
			cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
			let peers = create_network(2, &cfgs);
			let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
			let peer_0 = peers[0].peers.lock().unwrap();
			let peer_1 = peers[1].peers.lock().unwrap();
			let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
			let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
			assert!(!peer_0_features.unwrap().initial_routing_sync());
			assert!(peer_1_features.unwrap().initial_routing_sync());
		}
	}
}