mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-27 17:01:10 +01:00
In `fuzz_threaded_connections`, if one thread is being run while another is starved, and the running thread manages to call `timer_tick_ocurred` twice after the starved thread constructs the inbound connection but before it delivers the first bytes, we'll receive an immediate error and `unwrap` it, causing failure. The fix is trivial, simply remove the unwrap and return if we're already disconnected when we do the initial read. While we're here, we also reduce the frequency of the `timer_tick_ocurred` calls to give us a chance to occasionally deliver some additional messages. Fixes #2073
2675 lines
120 KiB
Rust
2675 lines
120 KiB
Rust
// This file is Copyright its original authors, visible in version control
|
|
// history.
|
|
//
|
|
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
|
|
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
|
|
// You may not use this file except in accordance with one or both of these
|
|
// licenses.
|
|
|
|
//! Top level peer message handling and socket handling logic lives here.
|
|
//!
|
|
//! Instead of actually servicing sockets ourselves we require that you implement the
|
|
//! SocketDescriptor interface and use that to receive actions which you should perform on the
|
|
//! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
|
|
//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
|
|
//! messages they should handle, and encoding/sending response messages.
|
|
|
|
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
|
|
|
|
use crate::chain::keysinterface::{KeysManager, NodeSigner, Recipient};
|
|
use crate::ln::features::{InitFeatures, NodeFeatures};
|
|
use crate::ln::msgs;
|
|
use crate::ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler};
|
|
use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
|
|
use crate::util::ser::{VecWriter, Writeable, Writer};
|
|
use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
|
|
use crate::ln::wire;
|
|
use crate::ln::wire::Encode;
|
|
use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
|
|
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId};
|
|
use crate::util::atomic_counter::AtomicCounter;
|
|
use crate::util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
|
|
use crate::util::logger::Logger;
|
|
|
|
use crate::prelude::*;
|
|
use crate::io;
|
|
use alloc::collections::LinkedList;
|
|
use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
|
|
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
|
use core::{cmp, hash, fmt, mem};
|
|
use core::ops::Deref;
|
|
use core::convert::Infallible;
|
|
#[cfg(feature = "std")] use std::error;
|
|
|
|
use bitcoin::hashes::sha256::Hash as Sha256;
|
|
use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
|
|
use bitcoin::hashes::{HashEngine, Hash};
|
|
|
|
/// A handler provided to [`PeerManager`] for reading and handling custom messages.
|
|
///
|
|
/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific
|
|
/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the
|
|
/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler.
|
|
///
|
|
/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
|
|
/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message
|
|
pub trait CustomMessageHandler: wire::CustomMessageReader {
|
|
/// Handles the given message sent from `sender_node_id`, possibly producing messages for
|
|
/// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`]
|
|
/// to send.
|
|
fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
|
|
|
|
/// Returns the list of pending messages that were generated by the handler, clearing the list
|
|
/// in the process. Each message is paired with the node id of the intended recipient. If no
|
|
/// connection to the node exists, then the message is simply not sent.
|
|
fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
|
|
}
|
|
|
|
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
|
|
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
|
|
pub struct IgnoringMessageHandler{}
|
|
impl MessageSendEventsProvider for IgnoringMessageHandler {
|
|
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
|
|
}
|
|
impl RoutingMessageHandler for IgnoringMessageHandler {
|
|
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
|
|
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
|
|
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
|
|
fn get_next_channel_announcement(&self, _starting_point: u64) ->
|
|
Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
|
|
fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> { None }
|
|
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
|
|
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
|
|
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
|
|
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
|
|
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
|
|
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
|
|
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
|
|
InitFeatures::empty()
|
|
}
|
|
fn processing_queue_high(&self) -> bool { false }
|
|
}
|
|
impl OnionMessageProvider for IgnoringMessageHandler {
|
|
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
|
|
}
|
|
impl OnionMessageHandler for IgnoringMessageHandler {
|
|
fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
|
|
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
|
|
fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
|
|
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
|
|
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
|
|
InitFeatures::empty()
|
|
}
|
|
}
|
|
impl CustomOnionMessageHandler for IgnoringMessageHandler {
|
|
type CustomMessage = Infallible;
|
|
fn handle_custom_message(&self, _msg: Infallible) {
|
|
// Since we always return `None` in the read the handle method should never be called.
|
|
unreachable!();
|
|
}
|
|
fn read_custom_message<R: io::Read>(&self, _msg_type: u64, _buffer: &mut R) -> Result<Option<Infallible>, msgs::DecodeError> where Self: Sized {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
impl CustomOnionMessageContents for Infallible {
|
|
fn tlv_type(&self) -> u64 { unreachable!(); }
|
|
}
|
|
|
|
impl Deref for IgnoringMessageHandler {
|
|
type Target = IgnoringMessageHandler;
|
|
fn deref(&self) -> &Self { self }
|
|
}
|
|
|
|
// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
|
|
// method that takes self for it.
|
|
impl wire::Type for Infallible {
|
|
fn type_id(&self) -> u16 {
|
|
unreachable!();
|
|
}
|
|
}
|
|
impl Writeable for Infallible {
|
|
fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
|
|
unreachable!();
|
|
}
|
|
}
|
|
|
|
impl wire::CustomMessageReader for IgnoringMessageHandler {
|
|
type CustomMessage = Infallible;
|
|
fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
impl CustomMessageHandler for IgnoringMessageHandler {
|
|
fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
|
|
// Since we always return `None` in the read the handle method should never be called.
|
|
unreachable!();
|
|
}
|
|
|
|
fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() }
|
|
}
|
|
|
|
/// A dummy struct which implements `ChannelMessageHandler` without having any channels.
|
|
/// You can provide one of these as the route_handler in a MessageHandler.
|
|
pub struct ErroringMessageHandler {
|
|
message_queue: Mutex<Vec<MessageSendEvent>>
|
|
}
|
|
impl ErroringMessageHandler {
|
|
/// Constructs a new ErroringMessageHandler
|
|
pub fn new() -> Self {
|
|
Self { message_queue: Mutex::new(Vec::new()) }
|
|
}
|
|
fn push_error(&self, node_id: &PublicKey, channel_id: [u8; 32]) {
|
|
self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError {
|
|
action: msgs::ErrorAction::SendErrorMessage {
|
|
msg: msgs::ErrorMessage { channel_id, data: "We do not support channel messages, sorry.".to_owned() },
|
|
},
|
|
node_id: node_id.clone(),
|
|
});
|
|
}
|
|
}
|
|
impl MessageSendEventsProvider for ErroringMessageHandler {
|
|
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
|
|
let mut res = Vec::new();
|
|
mem::swap(&mut res, &mut self.message_queue.lock().unwrap());
|
|
res
|
|
}
|
|
}
|
|
impl ChannelMessageHandler for ErroringMessageHandler {
|
|
// Any messages which are related to a specific channel generate an error message to let the
|
|
// peer know we don't care about channels.
|
|
fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
|
|
}
|
|
fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
|
|
}
|
|
fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
|
|
}
|
|
fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_channel_ready(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReady) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
|
|
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
|
|
}
|
|
// msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
|
|
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
|
|
fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
|
|
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
|
|
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
|
|
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
|
|
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
|
|
// Set a number of features which various nodes may require to talk to us. It's totally
|
|
// reasonable to indicate we "support" all kinds of channel features...we just reject all
|
|
// channels.
|
|
let mut features = InitFeatures::empty();
|
|
features.set_data_loss_protect_optional();
|
|
features.set_upfront_shutdown_script_optional();
|
|
features.set_variable_length_onion_optional();
|
|
features.set_static_remote_key_optional();
|
|
features.set_payment_secret_optional();
|
|
features.set_basic_mpp_optional();
|
|
features.set_wumbo_optional();
|
|
features.set_shutdown_any_segwit_optional();
|
|
features.set_channel_type_optional();
|
|
features.set_scid_privacy_optional();
|
|
features.set_zero_conf_optional();
|
|
features
|
|
}
|
|
}
|
|
impl Deref for ErroringMessageHandler {
|
|
type Target = ErroringMessageHandler;
|
|
fn deref(&self) -> &Self { self }
|
|
}
|
|
|
|
/// Provides references to trait impls which handle different types of messages.
|
|
pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref> where
|
|
CM::Target: ChannelMessageHandler,
|
|
RM::Target: RoutingMessageHandler,
|
|
OM::Target: OnionMessageHandler,
|
|
{
|
|
/// A message handler which handles messages specific to channels. Usually this is just a
|
|
/// [`ChannelManager`] object or an [`ErroringMessageHandler`].
|
|
///
|
|
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
|
|
pub chan_handler: CM,
|
|
/// A message handler which handles messages updating our knowledge of the network channel
|
|
/// graph. Usually this is just a [`P2PGossipSync`] object or an [`IgnoringMessageHandler`].
|
|
///
|
|
/// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
|
|
pub route_handler: RM,
|
|
|
|
/// A message handler which handles onion messages. For now, this can only be an
|
|
/// [`IgnoringMessageHandler`].
|
|
pub onion_message_handler: OM,
|
|
}
|
|
|
|
/// Provides an object which can be used to send data to and which uniquely identifies a connection
|
|
/// to a remote host. You will need to be able to generate multiple of these which meet Eq and
|
|
/// implement Hash to meet the PeerManager API.
|
|
///
|
|
/// For efficiency, Clone should be relatively cheap for this type.
|
|
///
|
|
/// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original
|
|
/// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it
|
|
/// having triggered the disconnection or a call to [`PeerManager::socket_disconnected`]), and no
|
|
/// further calls to the [`PeerManager`] related to the original socket occur. This allows you to
|
|
/// use a file descriptor for your SocketDescriptor directly, however for simplicity you may wish
|
|
/// to simply use another value which is guaranteed to be globally unique instead.
|
|
pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
|
|
/// Attempts to send some data from the given slice to the peer.
|
|
///
|
|
/// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
|
|
/// Note that in the disconnected case, [`PeerManager::socket_disconnected`] must still be
|
|
/// called and further write attempts may occur until that time.
|
|
///
|
|
/// If the returned size is smaller than `data.len()`, a
|
|
/// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
|
|
/// written. Additionally, until a `send_data` event completes fully, no further
|
|
/// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
|
|
/// prevent denial-of-service issues, you should not read or buffer any data from the socket
|
|
/// until then.
|
|
///
|
|
/// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
|
|
/// (indicating that read events should be paused to prevent DoS in the send buffer),
|
|
/// `resume_read` may be set indicating that read events on this descriptor should resume. A
|
|
/// `resume_read` of false carries no meaning, and should not cause any action.
|
|
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
|
|
/// Disconnect the socket pointed to by this SocketDescriptor.
|
|
///
|
|
/// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
|
|
/// call (doing so is a noop).
|
|
fn disconnect_socket(&mut self);
|
|
}
|
|
|
|
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
|
|
/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
|
|
/// descriptor.
|
|
#[derive(Clone)]
|
|
pub struct PeerHandleError { }
|
|
impl fmt::Debug for PeerHandleError {
|
|
fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
|
formatter.write_str("Peer Sent Invalid Data")
|
|
}
|
|
}
|
|
impl fmt::Display for PeerHandleError {
|
|
fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
|
formatter.write_str("Peer Sent Invalid Data")
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "std")]
|
|
impl error::Error for PeerHandleError {
|
|
fn description(&self) -> &str {
|
|
"Peer Sent Invalid Data"
|
|
}
|
|
}
|
|
|
|
enum InitSyncTracker{
|
|
NoSyncRequested,
|
|
ChannelsSyncing(u64),
|
|
NodesSyncing(NodeId),
|
|
}
|
|
|
|
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
|
|
/// forwarding gossip messages to peers altogether.
|
|
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
|
|
|
|
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
|
|
/// we have fewer than this many messages in the outbound buffer again.
|
|
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
|
|
/// refilled as we send bytes.
|
|
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
|
|
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
|
|
/// the peer.
|
|
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
|
|
|
|
/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
|
|
/// the socket receive buffer before receiving the ping.
|
|
///
|
|
/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
|
|
/// including any network delays, outbound traffic, or the same for messages from other peers.
|
|
///
|
|
/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
|
|
/// per connected peer to respond to a ping, as long as they send us at least one message during
|
|
/// each tick, ensuring we aren't actually just disconnected.
|
|
/// With a timer tick interval of ten seconds, this translates to about 40 seconds per connected
|
|
/// peer.
|
|
///
|
|
/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
|
|
/// two connected peers, assuming most LDK-running systems have at least two cores.
|
|
const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
|
|
|
|
/// This is the minimum number of messages we expect a peer to be able to handle within one timer
|
|
/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
|
|
/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
|
|
/// process before the next ping.
|
|
///
|
|
/// Note that we continue responding to other messages even after we've sent this many messages, so
|
|
/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
|
|
/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
|
|
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
|
|
|
|
struct Peer {
|
|
channel_encryptor: PeerChannelEncryptor,
|
|
/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
|
|
/// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field.
|
|
their_node_id: Option<(PublicKey, NodeId)>,
|
|
/// The features provided in the peer's [`msgs::Init`] message.
|
|
///
|
|
/// This is set only after we've processed the [`msgs::Init`] message and called relevant
|
|
/// `peer_connected` handler methods. Thus, this field is set *iff* we've finished our
|
|
/// handshake and can talk to this peer normally (though use [`Peer::handshake_complete`] to
|
|
/// check this.
|
|
their_features: Option<InitFeatures>,
|
|
their_net_address: Option<NetAddress>,
|
|
|
|
pending_outbound_buffer: LinkedList<Vec<u8>>,
|
|
pending_outbound_buffer_first_msg_offset: usize,
|
|
/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
|
|
/// prioritize channel messages over them.
|
|
///
|
|
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
|
|
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
|
|
awaiting_write_event: bool,
|
|
|
|
pending_read_buffer: Vec<u8>,
|
|
pending_read_buffer_pos: usize,
|
|
pending_read_is_header: bool,
|
|
|
|
sync_status: InitSyncTracker,
|
|
|
|
msgs_sent_since_pong: usize,
|
|
awaiting_pong_timer_tick_intervals: i8,
|
|
received_message_since_timer_tick: bool,
|
|
sent_gossip_timestamp_filter: bool,
|
|
|
|
/// Indicates we've received a `channel_announcement` since the last time we had
|
|
/// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
|
|
/// `channel_announcement` at all - we set this unconditionally but unset it every time we
|
|
/// check if we're gossip-processing-backlogged).
|
|
received_channel_announce_since_backlogged: bool,
|
|
|
|
inbound_connection: bool,
|
|
}
|
|
|
|
impl Peer {
|
|
/// True after we've processed the [`msgs::Init`] message and called relevant `peer_connected`
|
|
/// handler methods. Thus, this implies we've finished our handshake and can talk to this peer
|
|
/// normally.
|
|
fn handshake_complete(&self) -> bool {
|
|
self.their_features.is_some()
|
|
}
|
|
|
|
/// Returns true if the channel announcements/updates for the given channel should be
|
|
/// forwarded to this peer.
|
|
/// If we are sending our routing table to this peer and we have not yet sent channel
|
|
/// announcements/updates for the given channel_id then we will send it when we get to that
|
|
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
|
|
/// sent the old versions, we should send the update, and so return true here.
|
|
fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
|
|
if !self.handshake_complete() { return false; }
|
|
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
|
!self.sent_gossip_timestamp_filter {
|
|
return false;
|
|
}
|
|
match self.sync_status {
|
|
InitSyncTracker::NoSyncRequested => true,
|
|
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
|
|
InitSyncTracker::NodesSyncing(_) => true,
|
|
}
|
|
}
|
|
|
|
/// Similar to the above, but for node announcements indexed by node_id.
|
|
fn should_forward_node_announcement(&self, node_id: NodeId) -> bool {
|
|
if !self.handshake_complete() { return false; }
|
|
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
|
!self.sent_gossip_timestamp_filter {
|
|
return false;
|
|
}
|
|
match self.sync_status {
|
|
InitSyncTracker::NoSyncRequested => true,
|
|
InitSyncTracker::ChannelsSyncing(_) => false,
|
|
InitSyncTracker::NodesSyncing(sync_node_id) => sync_node_id.as_slice() < node_id.as_slice(),
|
|
}
|
|
}
|
|
|
|
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
|
|
/// buffer still has space and we don't need to pause reads to get some writes out.
|
|
fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
|
|
if !gossip_processing_backlogged {
|
|
self.received_channel_announce_since_backlogged = false;
|
|
}
|
|
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
|
|
(!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
|
|
}
|
|
|
|
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
|
|
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
|
|
fn should_buffer_gossip_backfill(&self) -> bool {
|
|
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
|
|
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
|
&& self.handshake_complete()
|
|
}
|
|
|
|
/// Determines if we should push an onion message onto a peer's outbound buffer. This is checked
|
|
/// every time the peer's buffer may have been drained.
|
|
fn should_buffer_onion_message(&self) -> bool {
|
|
self.pending_outbound_buffer.is_empty() && self.handshake_complete()
|
|
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
|
}
|
|
|
|
/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
|
|
/// buffer. This is checked every time the peer's buffer may have been drained.
|
|
fn should_buffer_gossip_broadcast(&self) -> bool {
|
|
self.pending_outbound_buffer.is_empty() && self.handshake_complete()
|
|
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
|
|
}
|
|
|
|
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
|
|
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
|
|
let total_outbound_buffered =
|
|
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
|
|
|
|
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
|
|
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
|
|
}
|
|
|
|
fn set_their_node_id(&mut self, node_id: PublicKey) {
|
|
self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id)));
|
|
}
|
|
}
|
|
|
|
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
|
|
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
|
|
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
|
|
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
|
|
/// issues such as overly long function definitions.
|
|
///
|
|
/// (C-not exported) as `Arc`s don't make sense in bindings.
|
|
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<SimpleArcOnionMessenger<L>>, Arc<L>, IgnoringMessageHandler, Arc<KeysManager>>;
|
|
|
|
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
|
|
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
|
|
/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as
|
|
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
|
|
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
|
|
/// helps with issues such as long function definitions.
|
|
///
|
|
/// (C-not exported) as general type aliases don't make sense in bindings.
|
|
pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, 'l, 'm, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'm, M, T, F, L>, &'f P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'i SimpleRefOnionMessenger<'j, 'k, L>, &'f L, IgnoringMessageHandler, &'c KeysManager>;
|
|
|
|
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
|
|
/// socket events into messages which it passes on to its [`MessageHandler`].
|
|
///
|
|
/// Locks are taken internally, so you must never assume that reentrancy from a
|
|
/// [`SocketDescriptor`] call back into [`PeerManager`] methods will not deadlock.
|
|
///
|
|
/// Calls to [`read_event`] will decode relevant messages and pass them to the
|
|
/// [`ChannelMessageHandler`], likely doing message processing in-line. Thus, the primary form of
|
|
/// parallelism in Rust-Lightning is in calls to [`read_event`]. Note, however, that calls to any
|
|
/// [`PeerManager`] functions related to the same connection must occur only in serial, making new
|
|
/// calls only after previous ones have returned.
|
|
///
|
|
/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager
|
|
/// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but
|
|
/// essentially you should default to using a SimpleRefPeerManager, and use a
|
|
/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
|
|
/// you're using lightning-net-tokio.
|
|
///
|
|
/// [`read_event`]: PeerManager::read_event
|
|
pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref> where
|
|
CM::Target: ChannelMessageHandler,
|
|
RM::Target: RoutingMessageHandler,
|
|
OM::Target: OnionMessageHandler,
|
|
L::Target: Logger,
|
|
CMH::Target: CustomMessageHandler,
|
|
NS::Target: NodeSigner {
|
|
message_handler: MessageHandler<CM, RM, OM>,
|
|
/// Connection state for each connected peer - we have an outer read-write lock which is taken
|
|
/// as read while we're doing processing for a peer and taken write when a peer is being added
|
|
/// or removed.
|
|
///
|
|
/// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold
|
|
/// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires
|
|
/// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to
|
|
/// the `MessageHandler`s for a given peer is already guaranteed.
|
|
peers: FairRwLock<HashMap<Descriptor, Mutex<Peer>>>,
|
|
/// Only add to this set when noise completes.
|
|
/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
|
|
/// lock held. Entries may be added with only the `peers` read lock held (though the
|
|
/// `Descriptor` value must already exist in `peers`).
|
|
node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
|
|
/// We can only have one thread processing events at once, but we don't usually need the full
|
|
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
|
|
/// `process_events`.
|
|
event_processing_lock: Mutex<()>,
|
|
/// Because event processing is global and always does all available work before returning,
|
|
/// there is no reason for us to have many event processors waiting on the lock at once.
|
|
/// Instead, we limit the total blocked event processors to always exactly one by setting this
|
|
/// when an event process call is waiting.
|
|
blocked_event_processors: AtomicBool,
|
|
|
|
/// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
|
|
/// value increases strictly since we don't assume access to a time source.
|
|
last_node_announcement_serial: AtomicU32,
|
|
|
|
ephemeral_key_midstate: Sha256Engine,
|
|
custom_message_handler: CMH,
|
|
|
|
peer_counter: AtomicCounter,
|
|
|
|
gossip_processing_backlogged: AtomicBool,
|
|
gossip_processing_backlog_lifted: AtomicBool,
|
|
|
|
node_signer: NS,
|
|
|
|
logger: L,
|
|
secp_ctx: Secp256k1<secp256k1::SignOnly>
|
|
}
|
|
|
|
enum MessageHandlingError {
|
|
PeerHandleError(PeerHandleError),
|
|
LightningError(LightningError),
|
|
}
|
|
|
|
impl From<PeerHandleError> for MessageHandlingError {
|
|
fn from(error: PeerHandleError) -> Self {
|
|
MessageHandlingError::PeerHandleError(error)
|
|
}
|
|
}
|
|
|
|
impl From<LightningError> for MessageHandlingError {
|
|
fn from(error: LightningError) -> Self {
|
|
MessageHandlingError::LightningError(error)
|
|
}
|
|
}
|
|
|
|
macro_rules! encode_msg {
|
|
($msg: expr) => {{
|
|
let mut buffer = VecWriter(Vec::new());
|
|
wire::write($msg, &mut buffer).unwrap();
|
|
buffer.0
|
|
}}
|
|
}
|
|
|
|
impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref, NS: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler, NS> where
|
|
CM::Target: ChannelMessageHandler,
|
|
OM::Target: OnionMessageHandler,
|
|
L::Target: Logger,
|
|
NS::Target: NodeSigner {
|
|
/// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and
|
|
/// `OnionMessageHandler`. No routing message handler is used and network graph messages are
|
|
/// ignored.
|
|
///
|
|
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
|
|
/// cryptographically secure random bytes.
|
|
///
|
|
/// `current_time` is used as an always-increasing counter that survives across restarts and is
|
|
/// incremented irregularly internally. In general it is best to simply use the current UNIX
|
|
/// timestamp, however if it is not available a persistent counter that increases once per
|
|
/// minute should suffice.
|
|
///
|
|
/// (C-not exported) as we can't export a PeerManager with a dummy route handler
|
|
pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS) -> Self {
|
|
Self::new(MessageHandler {
|
|
chan_handler: channel_message_handler,
|
|
route_handler: IgnoringMessageHandler{},
|
|
onion_message_handler,
|
|
}, current_time, ephemeral_random_data, logger, IgnoringMessageHandler{}, node_signer)
|
|
}
|
|
}
|
|
|
|
impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref, NS: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, IgnoringMessageHandler, L, IgnoringMessageHandler, NS> where
|
|
RM::Target: RoutingMessageHandler,
|
|
L::Target: Logger,
|
|
NS::Target: NodeSigner {
|
|
/// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message
|
|
/// handler or onion message handler is used and onion and channel messages will be ignored (or
|
|
/// generate error messages). Note that some other lightning implementations time-out connections
|
|
/// after some time if no channel is built with the peer.
|
|
///
|
|
/// `current_time` is used as an always-increasing counter that survives across restarts and is
|
|
/// incremented irregularly internally. In general it is best to simply use the current UNIX
|
|
/// timestamp, however if it is not available a persistent counter that increases once per
|
|
/// minute should suffice.
|
|
///
|
|
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
|
|
/// cryptographically secure random bytes.
|
|
///
|
|
/// (C-not exported) as we can't export a PeerManager with a dummy channel handler
|
|
pub fn new_routing_only(routing_message_handler: RM, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS) -> Self {
|
|
Self::new(MessageHandler {
|
|
chan_handler: ErroringMessageHandler::new(),
|
|
route_handler: routing_message_handler,
|
|
onion_message_handler: IgnoringMessageHandler{},
|
|
}, current_time, ephemeral_random_data, logger, IgnoringMessageHandler{}, node_signer)
|
|
}
|
|
}
|
|
|
|
/// A simple wrapper that optionally prints ` from <pubkey>` for an optional pubkey.
|
|
/// This works around `format!()` taking a reference to each argument, preventing
|
|
/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
|
|
/// due to lifetime errors.
|
|
struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>);
|
|
impl core::fmt::Display for OptionalFromDebugger<'_> {
|
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
|
|
if let Some((node_id, _)) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) }
|
|
}
|
|
}
|
|
|
|
/// A function used to filter out local or private addresses
|
|
/// <https://www.iana.org./assignments/ipv4-address-space/ipv4-address-space.xhtml>
|
|
/// <https://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xhtml>
|
|
fn filter_addresses(ip_address: Option<NetAddress>) -> Option<NetAddress> {
|
|
match ip_address{
|
|
// For IPv4 range 10.0.0.0 - 10.255.255.255 (10/8)
|
|
Some(NetAddress::IPv4{addr: [10, _, _, _], port: _}) => None,
|
|
// For IPv4 range 0.0.0.0 - 0.255.255.255 (0/8)
|
|
Some(NetAddress::IPv4{addr: [0, _, _, _], port: _}) => None,
|
|
// For IPv4 range 100.64.0.0 - 100.127.255.255 (100.64/10)
|
|
Some(NetAddress::IPv4{addr: [100, 64..=127, _, _], port: _}) => None,
|
|
// For IPv4 range 127.0.0.0 - 127.255.255.255 (127/8)
|
|
Some(NetAddress::IPv4{addr: [127, _, _, _], port: _}) => None,
|
|
// For IPv4 range 169.254.0.0 - 169.254.255.255 (169.254/16)
|
|
Some(NetAddress::IPv4{addr: [169, 254, _, _], port: _}) => None,
|
|
// For IPv4 range 172.16.0.0 - 172.31.255.255 (172.16/12)
|
|
Some(NetAddress::IPv4{addr: [172, 16..=31, _, _], port: _}) => None,
|
|
// For IPv4 range 192.168.0.0 - 192.168.255.255 (192.168/16)
|
|
Some(NetAddress::IPv4{addr: [192, 168, _, _], port: _}) => None,
|
|
// For IPv4 range 192.88.99.0 - 192.88.99.255 (192.88.99/24)
|
|
Some(NetAddress::IPv4{addr: [192, 88, 99, _], port: _}) => None,
|
|
// For IPv6 range 2000:0000:0000:0000:0000:0000:0000:0000 - 3fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff (2000::/3)
|
|
Some(NetAddress::IPv6{addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], port: _}) => ip_address,
|
|
// For remaining addresses
|
|
Some(NetAddress::IPv6{addr: _, port: _}) => None,
|
|
Some(..) => ip_address,
|
|
None => None,
|
|
}
|
|
}
|
|
|
|
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
|
|
CM::Target: ChannelMessageHandler,
|
|
RM::Target: RoutingMessageHandler,
|
|
OM::Target: OnionMessageHandler,
|
|
L::Target: Logger,
|
|
CMH::Target: CustomMessageHandler,
|
|
NS::Target: NodeSigner
|
|
{
|
|
/// Constructs a new PeerManager with the given message handlers and node_id secret key
|
|
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
|
|
/// cryptographically secure random bytes.
|
|
///
|
|
/// `current_time` is used as an always-increasing counter that survives across restarts and is
|
|
/// incremented irregularly internally. In general it is best to simply use the current UNIX
|
|
/// timestamp, however if it is not available a persistent counter that increases once per
|
|
/// minute should suffice.
|
|
pub fn new(message_handler: MessageHandler<CM, RM, OM>, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH, node_signer: NS) -> Self {
|
|
let mut ephemeral_key_midstate = Sha256::engine();
|
|
ephemeral_key_midstate.input(ephemeral_random_data);
|
|
|
|
let mut secp_ctx = Secp256k1::signing_only();
|
|
let ephemeral_hash = Sha256::from_engine(ephemeral_key_midstate.clone()).into_inner();
|
|
secp_ctx.seeded_randomize(&ephemeral_hash);
|
|
|
|
PeerManager {
|
|
message_handler,
|
|
peers: FairRwLock::new(HashMap::new()),
|
|
node_id_to_descriptor: Mutex::new(HashMap::new()),
|
|
event_processing_lock: Mutex::new(()),
|
|
blocked_event_processors: AtomicBool::new(false),
|
|
ephemeral_key_midstate,
|
|
peer_counter: AtomicCounter::new(),
|
|
gossip_processing_backlogged: AtomicBool::new(false),
|
|
gossip_processing_backlog_lifted: AtomicBool::new(false),
|
|
last_node_announcement_serial: AtomicU32::new(current_time),
|
|
logger,
|
|
custom_message_handler,
|
|
node_signer,
|
|
secp_ctx,
|
|
}
|
|
}
|
|
|
|
/// Get a list of tuples mapping from node id to network addresses for peers which have
|
|
/// completed the initial handshake.
|
|
///
|
|
/// For outbound connections, the [`PublicKey`] will be the same as the `their_node_id` parameter
|
|
/// passed in to [`Self::new_outbound_connection`], however entries will only appear once the initial
|
|
/// handshake has completed and we are sure the remote peer has the private key for the given
|
|
/// [`PublicKey`].
|
|
///
|
|
/// The returned `Option`s will only be `Some` if an address had been previously given via
|
|
/// [`Self::new_outbound_connection`] or [`Self::new_inbound_connection`].
|
|
pub fn get_peer_node_ids(&self) -> Vec<(PublicKey, Option<NetAddress>)> {
|
|
let peers = self.peers.read().unwrap();
|
|
peers.values().filter_map(|peer_mutex| {
|
|
let p = peer_mutex.lock().unwrap();
|
|
if !p.handshake_complete() {
|
|
return None;
|
|
}
|
|
Some((p.their_node_id.unwrap().0, p.their_net_address.clone()))
|
|
}).collect()
|
|
}
|
|
|
|
fn get_ephemeral_key(&self) -> SecretKey {
|
|
let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
|
|
let counter = self.peer_counter.get_increment();
|
|
ephemeral_hash.input(&counter.to_le_bytes());
|
|
SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
|
|
}
|
|
|
|
/// Indicates a new outbound connection has been established to a node with the given `node_id`
|
|
/// and an optional remote network address.
|
|
///
|
|
/// The remote network address adds the option to report a remote IP address back to a connecting
|
|
/// peer using the init message.
|
|
/// The user should pass the remote network address of the host they are connected to.
|
|
///
|
|
/// If an `Err` is returned here you must disconnect the connection immediately.
|
|
///
|
|
/// Returns a small number of bytes to send to the remote node (currently always 50).
|
|
///
|
|
/// Panics if descriptor is duplicative with some other descriptor which has not yet been
|
|
/// [`socket_disconnected()`].
|
|
///
|
|
/// [`socket_disconnected()`]: PeerManager::socket_disconnected
|
|
pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor, remote_network_address: Option<NetAddress>) -> Result<Vec<u8>, PeerHandleError> {
|
|
let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
|
|
let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec();
|
|
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
|
|
|
|
let mut peers = self.peers.write().unwrap();
|
|
match peers.entry(descriptor) {
|
|
hash_map::Entry::Occupied(_) => {
|
|
debug_assert!(false, "PeerManager driver duplicated descriptors!");
|
|
Err(PeerHandleError {})
|
|
},
|
|
hash_map::Entry::Vacant(e) => {
|
|
e.insert(Mutex::new(Peer {
|
|
channel_encryptor: peer_encryptor,
|
|
their_node_id: None,
|
|
their_features: None,
|
|
their_net_address: remote_network_address,
|
|
|
|
pending_outbound_buffer: LinkedList::new(),
|
|
pending_outbound_buffer_first_msg_offset: 0,
|
|
gossip_broadcast_buffer: LinkedList::new(),
|
|
awaiting_write_event: false,
|
|
|
|
pending_read_buffer,
|
|
pending_read_buffer_pos: 0,
|
|
pending_read_is_header: false,
|
|
|
|
sync_status: InitSyncTracker::NoSyncRequested,
|
|
|
|
msgs_sent_since_pong: 0,
|
|
awaiting_pong_timer_tick_intervals: 0,
|
|
received_message_since_timer_tick: false,
|
|
sent_gossip_timestamp_filter: false,
|
|
|
|
received_channel_announce_since_backlogged: false,
|
|
inbound_connection: false,
|
|
}));
|
|
Ok(res)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Indicates a new inbound connection has been established to a node with an optional remote
|
|
/// network address.
|
|
///
|
|
/// The remote network address adds the option to report a remote IP address back to a connecting
|
|
/// peer using the init message.
|
|
/// The user should pass the remote network address of the host they are connected to.
|
|
///
|
|
/// May refuse the connection by returning an Err, but will never write bytes to the remote end
|
|
/// (outbound connector always speaks first). If an `Err` is returned here you must disconnect
|
|
/// the connection immediately.
|
|
///
|
|
/// Panics if descriptor is duplicative with some other descriptor which has not yet been
|
|
/// [`socket_disconnected()`].
|
|
///
|
|
/// [`socket_disconnected()`]: PeerManager::socket_disconnected
|
|
pub fn new_inbound_connection(&self, descriptor: Descriptor, remote_network_address: Option<NetAddress>) -> Result<(), PeerHandleError> {
|
|
let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.node_signer);
|
|
let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
|
|
|
|
let mut peers = self.peers.write().unwrap();
|
|
match peers.entry(descriptor) {
|
|
hash_map::Entry::Occupied(_) => {
|
|
debug_assert!(false, "PeerManager driver duplicated descriptors!");
|
|
Err(PeerHandleError {})
|
|
},
|
|
hash_map::Entry::Vacant(e) => {
|
|
e.insert(Mutex::new(Peer {
|
|
channel_encryptor: peer_encryptor,
|
|
their_node_id: None,
|
|
their_features: None,
|
|
their_net_address: remote_network_address,
|
|
|
|
pending_outbound_buffer: LinkedList::new(),
|
|
pending_outbound_buffer_first_msg_offset: 0,
|
|
gossip_broadcast_buffer: LinkedList::new(),
|
|
awaiting_write_event: false,
|
|
|
|
pending_read_buffer,
|
|
pending_read_buffer_pos: 0,
|
|
pending_read_is_header: false,
|
|
|
|
sync_status: InitSyncTracker::NoSyncRequested,
|
|
|
|
msgs_sent_since_pong: 0,
|
|
awaiting_pong_timer_tick_intervals: 0,
|
|
received_message_since_timer_tick: false,
|
|
sent_gossip_timestamp_filter: false,
|
|
|
|
received_channel_announce_since_backlogged: false,
|
|
inbound_connection: true,
|
|
}));
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
|
|
fn peer_should_read(&self, peer: &mut Peer) -> bool {
|
|
peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
|
|
}
|
|
|
|
fn update_gossip_backlogged(&self) {
|
|
let new_state = self.message_handler.route_handler.processing_queue_high();
|
|
let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
|
|
if prev_state && !new_state {
|
|
self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
|
|
}
|
|
}
|
|
|
|
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
|
|
let mut have_written = false;
|
|
while !peer.awaiting_write_event {
|
|
if peer.should_buffer_onion_message() {
|
|
if let Some((peer_node_id, _)) = peer.their_node_id {
|
|
if let Some(next_onion_message) =
|
|
self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) {
|
|
self.enqueue_message(peer, &next_onion_message);
|
|
}
|
|
}
|
|
}
|
|
if peer.should_buffer_gossip_broadcast() {
|
|
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
|
|
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(&msg[..]));
|
|
}
|
|
}
|
|
if peer.should_buffer_gossip_backfill() {
|
|
match peer.sync_status {
|
|
InitSyncTracker::NoSyncRequested => {},
|
|
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
|
|
if let Some((announce, update_a_option, update_b_option)) =
|
|
self.message_handler.route_handler.get_next_channel_announcement(c)
|
|
{
|
|
self.enqueue_message(peer, &announce);
|
|
if let Some(update_a) = update_a_option {
|
|
self.enqueue_message(peer, &update_a);
|
|
}
|
|
if let Some(update_b) = update_b_option {
|
|
self.enqueue_message(peer, &update_b);
|
|
}
|
|
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
|
|
} else {
|
|
peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
|
|
}
|
|
},
|
|
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
|
|
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
|
|
self.enqueue_message(peer, &msg);
|
|
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
|
|
} else {
|
|
peer.sync_status = InitSyncTracker::NoSyncRequested;
|
|
}
|
|
},
|
|
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
|
|
InitSyncTracker::NodesSyncing(sync_node_id) => {
|
|
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&sync_node_id)) {
|
|
self.enqueue_message(peer, &msg);
|
|
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
|
|
} else {
|
|
peer.sync_status = InitSyncTracker::NoSyncRequested;
|
|
}
|
|
},
|
|
}
|
|
}
|
|
if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
|
|
self.maybe_send_extra_ping(peer);
|
|
}
|
|
|
|
let should_read = self.peer_should_read(peer);
|
|
let next_buff = match peer.pending_outbound_buffer.front() {
|
|
None => {
|
|
if force_one_write && !have_written {
|
|
if should_read {
|
|
let data_sent = descriptor.send_data(&[], should_read);
|
|
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
|
|
}
|
|
}
|
|
return
|
|
},
|
|
Some(buff) => buff,
|
|
};
|
|
|
|
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
|
|
let data_sent = descriptor.send_data(pending, should_read);
|
|
have_written = true;
|
|
peer.pending_outbound_buffer_first_msg_offset += data_sent;
|
|
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
|
|
peer.pending_outbound_buffer_first_msg_offset = 0;
|
|
peer.pending_outbound_buffer.pop_front();
|
|
} else {
|
|
peer.awaiting_write_event = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Indicates that there is room to write data to the given socket descriptor.
|
|
///
|
|
/// May return an Err to indicate that the connection should be closed.
|
|
///
|
|
/// May call [`send_data`] on the descriptor passed in (or an equal descriptor) before
|
|
/// returning. Thus, be very careful with reentrancy issues! The invariants around calling
|
|
/// [`write_buffer_space_avail`] in case a write did not fully complete must still hold - be
|
|
/// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
|
|
/// sufficient!
|
|
///
|
|
/// [`send_data`]: SocketDescriptor::send_data
|
|
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
|
|
pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
|
|
let peers = self.peers.read().unwrap();
|
|
match peers.get(descriptor) {
|
|
None => {
|
|
// This is most likely a simple race condition where the user found that the socket
|
|
// was writeable, then we told the user to `disconnect_socket()`, then they called
|
|
// this method. Return an error to make sure we get disconnected.
|
|
return Err(PeerHandleError { });
|
|
},
|
|
Some(peer_mutex) => {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
peer.awaiting_write_event = false;
|
|
self.do_attempt_write_data(descriptor, &mut peer, false);
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
/// Indicates that data was read from the given socket descriptor.
|
|
///
|
|
/// May return an Err to indicate that the connection should be closed.
|
|
///
|
|
/// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
|
|
/// Thus, however, you should call [`process_events`] after any `read_event` to generate
|
|
/// [`send_data`] calls to handle responses.
|
|
///
|
|
/// If `Ok(true)` is returned, further read_events should not be triggered until a
|
|
/// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
|
|
/// send buffer).
|
|
///
|
|
/// In order to avoid processing too many messages at once per peer, `data` should be on the
|
|
/// order of 4KiB.
|
|
///
|
|
/// [`send_data`]: SocketDescriptor::send_data
|
|
/// [`process_events`]: PeerManager::process_events
|
|
pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
|
|
match self.do_read_event(peer_descriptor, data) {
|
|
Ok(res) => Ok(res),
|
|
Err(e) => {
|
|
log_trace!(self.logger, "Peer sent invalid data or we decided to disconnect due to a protocol error");
|
|
self.disconnect_event_internal(peer_descriptor);
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Append a message to a peer's pending outbound/write buffer
|
|
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
|
|
if is_gossip_msg(message.type_id()) {
|
|
log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
|
|
} else {
|
|
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
|
|
}
|
|
peer.msgs_sent_since_pong += 1;
|
|
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
|
|
}
|
|
|
|
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
|
|
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
|
|
peer.msgs_sent_since_pong += 1;
|
|
peer.gossip_broadcast_buffer.push_back(encoded_message);
|
|
}
|
|
|
|
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
|
|
let mut pause_read = false;
|
|
let peers = self.peers.read().unwrap();
|
|
let mut msgs_to_forward = Vec::new();
|
|
let mut peer_node_id = None;
|
|
match peers.get(peer_descriptor) {
|
|
None => {
|
|
// This is most likely a simple race condition where the user read some bytes
|
|
// from the socket, then we told the user to `disconnect_socket()`, then they
|
|
// called this method. Return an error to make sure we get disconnected.
|
|
return Err(PeerHandleError { });
|
|
},
|
|
Some(peer_mutex) => {
|
|
let mut read_pos = 0;
|
|
while read_pos < data.len() {
|
|
macro_rules! try_potential_handleerror {
|
|
($peer: expr, $thing: expr) => {
|
|
match $thing {
|
|
Ok(x) => x,
|
|
Err(e) => {
|
|
match e.action {
|
|
msgs::ErrorAction::DisconnectPeer { msg: _ } => {
|
|
//TODO: Try to push msg
|
|
log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
|
|
return Err(PeerHandleError { });
|
|
},
|
|
msgs::ErrorAction::IgnoreAndLog(level) => {
|
|
log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
|
|
continue
|
|
},
|
|
msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
|
|
msgs::ErrorAction::IgnoreError => {
|
|
log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
|
|
continue;
|
|
},
|
|
msgs::ErrorAction::SendErrorMessage { msg } => {
|
|
log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
|
|
self.enqueue_message($peer, &msg);
|
|
continue;
|
|
},
|
|
msgs::ErrorAction::SendWarningMessage { msg, log_level } => {
|
|
log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
|
|
self.enqueue_message($peer, &msg);
|
|
continue;
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let mut peer_lock = peer_mutex.lock().unwrap();
|
|
let peer = &mut *peer_lock;
|
|
let mut msg_to_handle = None;
|
|
if peer_node_id.is_none() {
|
|
peer_node_id = peer.their_node_id.clone();
|
|
}
|
|
|
|
assert!(peer.pending_read_buffer.len() > 0);
|
|
assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
|
|
|
|
{
|
|
let data_to_copy = cmp::min(peer.pending_read_buffer.len() - peer.pending_read_buffer_pos, data.len() - read_pos);
|
|
peer.pending_read_buffer[peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy].copy_from_slice(&data[read_pos..read_pos + data_to_copy]);
|
|
read_pos += data_to_copy;
|
|
peer.pending_read_buffer_pos += data_to_copy;
|
|
}
|
|
|
|
if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() {
|
|
peer.pending_read_buffer_pos = 0;
|
|
|
|
macro_rules! insert_node_id {
|
|
() => {
|
|
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
|
|
hash_map::Entry::Occupied(e) => {
|
|
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
|
|
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
|
|
// Check that the peers map is consistent with the
|
|
// node_id_to_descriptor map, as this has been broken
|
|
// before.
|
|
debug_assert!(peers.get(e.get()).is_some());
|
|
return Err(PeerHandleError { })
|
|
},
|
|
hash_map::Entry::Vacant(entry) => {
|
|
log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
|
|
entry.insert(peer_descriptor.clone())
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
let next_step = peer.channel_encryptor.get_noise_step();
|
|
match next_step {
|
|
NextNoiseStep::ActOne => {
|
|
let act_two = try_potential_handleerror!(peer, peer.channel_encryptor
|
|
.process_act_one_with_keys(&peer.pending_read_buffer[..],
|
|
&self.node_signer, self.get_ephemeral_key(), &self.secp_ctx)).to_vec();
|
|
peer.pending_outbound_buffer.push_back(act_two);
|
|
peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
|
|
},
|
|
NextNoiseStep::ActTwo => {
|
|
let (act_three, their_node_id) = try_potential_handleerror!(peer,
|
|
peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..],
|
|
&self.node_signer));
|
|
peer.pending_outbound_buffer.push_back(act_three.to_vec());
|
|
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
|
|
peer.pending_read_is_header = true;
|
|
|
|
peer.set_their_node_id(their_node_id);
|
|
insert_node_id!();
|
|
let features = self.message_handler.chan_handler.provided_init_features(&their_node_id)
|
|
.or(self.message_handler.route_handler.provided_init_features(&their_node_id))
|
|
.or(self.message_handler.onion_message_handler.provided_init_features(&their_node_id));
|
|
let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) };
|
|
self.enqueue_message(peer, &resp);
|
|
peer.awaiting_pong_timer_tick_intervals = 0;
|
|
},
|
|
NextNoiseStep::ActThree => {
|
|
let their_node_id = try_potential_handleerror!(peer,
|
|
peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
|
|
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
|
|
peer.pending_read_is_header = true;
|
|
peer.set_their_node_id(their_node_id);
|
|
insert_node_id!();
|
|
let features = self.message_handler.chan_handler.provided_init_features(&their_node_id)
|
|
.or(self.message_handler.route_handler.provided_init_features(&their_node_id))
|
|
.or(self.message_handler.onion_message_handler.provided_init_features(&their_node_id));
|
|
let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) };
|
|
self.enqueue_message(peer, &resp);
|
|
peer.awaiting_pong_timer_tick_intervals = 0;
|
|
},
|
|
NextNoiseStep::NoiseComplete => {
|
|
if peer.pending_read_is_header {
|
|
let msg_len = try_potential_handleerror!(peer,
|
|
peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..]));
|
|
if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
|
|
peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
|
|
if msg_len < 2 { // Need at least the message type tag
|
|
return Err(PeerHandleError { });
|
|
}
|
|
peer.pending_read_is_header = false;
|
|
} else {
|
|
let msg_data = try_potential_handleerror!(peer,
|
|
peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
|
|
assert!(msg_data.len() >= 2);
|
|
|
|
// Reset read buffer
|
|
if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
|
|
peer.pending_read_buffer.resize(18, 0);
|
|
peer.pending_read_is_header = true;
|
|
|
|
let mut reader = io::Cursor::new(&msg_data[..]);
|
|
let message_result = wire::read(&mut reader, &*self.custom_message_handler);
|
|
let message = match message_result {
|
|
Ok(x) => x,
|
|
Err(e) => {
|
|
match e {
|
|
// Note that to avoid recursion we never call
|
|
// `do_attempt_write_data` from here, causing
|
|
// the messages enqueued here to not actually
|
|
// be sent before the peer is disconnected.
|
|
(msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => {
|
|
log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
|
|
continue;
|
|
}
|
|
(msgs::DecodeError::UnsupportedCompression, _) => {
|
|
log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
|
|
self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unsupported message compression: zlib".to_owned() });
|
|
continue;
|
|
}
|
|
(_, Some(ty)) if is_gossip_msg(ty) => {
|
|
log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message");
|
|
self.enqueue_message(peer, &msgs::WarningMessage {
|
|
channel_id: [0; 32],
|
|
data: format!("Unreadable/bogus gossip message of type {}", ty),
|
|
});
|
|
continue;
|
|
}
|
|
(msgs::DecodeError::UnknownRequiredFeature, ty) => {
|
|
log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
|
|
self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) });
|
|
return Err(PeerHandleError { });
|
|
}
|
|
(msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
|
|
(msgs::DecodeError::InvalidValue, _) => {
|
|
log_debug!(self.logger, "Got an invalid value while deserializing message");
|
|
return Err(PeerHandleError { });
|
|
}
|
|
(msgs::DecodeError::ShortRead, _) => {
|
|
log_debug!(self.logger, "Deserialization failed due to shortness of message");
|
|
return Err(PeerHandleError { });
|
|
}
|
|
(msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }),
|
|
(msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }),
|
|
}
|
|
}
|
|
};
|
|
|
|
msg_to_handle = Some(message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
pause_read = !self.peer_should_read(peer);
|
|
|
|
if let Some(message) = msg_to_handle {
|
|
match self.handle_message(&peer_mutex, peer_lock, message) {
|
|
Err(handling_error) => match handling_error {
|
|
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
|
|
MessageHandlingError::LightningError(e) => {
|
|
try_potential_handleerror!(&mut peer_mutex.lock().unwrap(), Err(e));
|
|
},
|
|
},
|
|
Ok(Some(msg)) => {
|
|
msgs_to_forward.push(msg);
|
|
},
|
|
Ok(None) => {},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for msg in msgs_to_forward.drain(..) {
|
|
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk));
|
|
}
|
|
|
|
Ok(pause_read)
|
|
}
|
|
|
|
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
|
|
/// Returns the message back if it needs to be broadcasted to all other peers.
|
|
fn handle_message(
|
|
&self,
|
|
peer_mutex: &Mutex<Peer>,
|
|
mut peer_lock: MutexGuard<Peer>,
|
|
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
|
|
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
|
|
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
|
|
peer_lock.received_message_since_timer_tick = true;
|
|
|
|
// Need an Init as first message
|
|
if let wire::Message::Init(msg) = message {
|
|
if msg.features.requires_unknown_bits() {
|
|
log_debug!(self.logger, "Peer features required unknown version bits");
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
if peer_lock.their_features.is_some() {
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
|
|
log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features);
|
|
|
|
// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
|
|
if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
|
|
peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
|
}
|
|
|
|
if let Err(()) = self.message_handler.route_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
|
|
log_debug!(self.logger, "Route Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
if let Err(()) = self.message_handler.chan_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
|
|
log_debug!(self.logger, "Channel Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
if let Err(()) = self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
|
|
log_debug!(self.logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
|
|
peer_lock.their_features = Some(msg.features);
|
|
return Ok(None);
|
|
} else if peer_lock.their_features.is_none() {
|
|
log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id));
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
|
|
if let wire::Message::GossipTimestampFilter(_msg) = message {
|
|
// When supporting gossip messages, start inital gossip sync only after we receive
|
|
// a GossipTimestampFilter
|
|
if peer_lock.their_features.as_ref().unwrap().supports_gossip_queries() &&
|
|
!peer_lock.sent_gossip_timestamp_filter {
|
|
peer_lock.sent_gossip_timestamp_filter = true;
|
|
peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
|
|
}
|
|
return Ok(None);
|
|
}
|
|
|
|
if let wire::Message::ChannelAnnouncement(ref _msg) = message {
|
|
peer_lock.received_channel_announce_since_backlogged = true;
|
|
}
|
|
|
|
mem::drop(peer_lock);
|
|
|
|
if is_gossip_msg(message.type_id()) {
|
|
log_gossip!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
|
|
} else {
|
|
log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
|
|
}
|
|
|
|
let mut should_forward = None;
|
|
|
|
match message {
|
|
// Setup and Control messages:
|
|
wire::Message::Init(_) => {
|
|
// Handled above
|
|
},
|
|
wire::Message::GossipTimestampFilter(_) => {
|
|
// Handled above
|
|
},
|
|
wire::Message::Error(msg) => {
|
|
let mut data_is_printable = true;
|
|
for b in msg.data.bytes() {
|
|
if b < 32 || b > 126 {
|
|
data_is_printable = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if data_is_printable {
|
|
log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), msg.data);
|
|
} else {
|
|
log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(their_node_id));
|
|
}
|
|
self.message_handler.chan_handler.handle_error(&their_node_id, &msg);
|
|
if msg.channel_id == [0; 32] {
|
|
return Err(PeerHandleError { }.into());
|
|
}
|
|
},
|
|
wire::Message::Warning(msg) => {
|
|
let mut data_is_printable = true;
|
|
for b in msg.data.bytes() {
|
|
if b < 32 || b > 126 {
|
|
data_is_printable = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if data_is_printable {
|
|
log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), msg.data);
|
|
} else {
|
|
log_debug!(self.logger, "Got warning message from {} with non-ASCII error message", log_pubkey!(their_node_id));
|
|
}
|
|
},
|
|
|
|
wire::Message::Ping(msg) => {
|
|
if msg.ponglen < 65532 {
|
|
let resp = msgs::Pong { byteslen: msg.ponglen };
|
|
self.enqueue_message(&mut *peer_mutex.lock().unwrap(), &resp);
|
|
}
|
|
},
|
|
wire::Message::Pong(_msg) => {
|
|
let mut peer_lock = peer_mutex.lock().unwrap();
|
|
peer_lock.awaiting_pong_timer_tick_intervals = 0;
|
|
peer_lock.msgs_sent_since_pong = 0;
|
|
},
|
|
|
|
// Channel messages:
|
|
wire::Message::OpenChannel(msg) => {
|
|
self.message_handler.chan_handler.handle_open_channel(&their_node_id, &msg);
|
|
},
|
|
wire::Message::AcceptChannel(msg) => {
|
|
self.message_handler.chan_handler.handle_accept_channel(&their_node_id, &msg);
|
|
},
|
|
|
|
wire::Message::FundingCreated(msg) => {
|
|
self.message_handler.chan_handler.handle_funding_created(&their_node_id, &msg);
|
|
},
|
|
wire::Message::FundingSigned(msg) => {
|
|
self.message_handler.chan_handler.handle_funding_signed(&their_node_id, &msg);
|
|
},
|
|
wire::Message::ChannelReady(msg) => {
|
|
self.message_handler.chan_handler.handle_channel_ready(&their_node_id, &msg);
|
|
},
|
|
|
|
wire::Message::Shutdown(msg) => {
|
|
self.message_handler.chan_handler.handle_shutdown(&their_node_id, &msg);
|
|
},
|
|
wire::Message::ClosingSigned(msg) => {
|
|
self.message_handler.chan_handler.handle_closing_signed(&their_node_id, &msg);
|
|
},
|
|
|
|
// Commitment messages:
|
|
wire::Message::UpdateAddHTLC(msg) => {
|
|
self.message_handler.chan_handler.handle_update_add_htlc(&their_node_id, &msg);
|
|
},
|
|
wire::Message::UpdateFulfillHTLC(msg) => {
|
|
self.message_handler.chan_handler.handle_update_fulfill_htlc(&their_node_id, &msg);
|
|
},
|
|
wire::Message::UpdateFailHTLC(msg) => {
|
|
self.message_handler.chan_handler.handle_update_fail_htlc(&their_node_id, &msg);
|
|
},
|
|
wire::Message::UpdateFailMalformedHTLC(msg) => {
|
|
self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&their_node_id, &msg);
|
|
},
|
|
|
|
wire::Message::CommitmentSigned(msg) => {
|
|
self.message_handler.chan_handler.handle_commitment_signed(&their_node_id, &msg);
|
|
},
|
|
wire::Message::RevokeAndACK(msg) => {
|
|
self.message_handler.chan_handler.handle_revoke_and_ack(&their_node_id, &msg);
|
|
},
|
|
wire::Message::UpdateFee(msg) => {
|
|
self.message_handler.chan_handler.handle_update_fee(&their_node_id, &msg);
|
|
},
|
|
wire::Message::ChannelReestablish(msg) => {
|
|
self.message_handler.chan_handler.handle_channel_reestablish(&their_node_id, &msg);
|
|
},
|
|
|
|
// Routing messages:
|
|
wire::Message::AnnouncementSignatures(msg) => {
|
|
self.message_handler.chan_handler.handle_announcement_signatures(&their_node_id, &msg);
|
|
},
|
|
wire::Message::ChannelAnnouncement(msg) => {
|
|
if self.message_handler.route_handler.handle_channel_announcement(&msg)
|
|
.map_err(|e| -> MessageHandlingError { e.into() })? {
|
|
should_forward = Some(wire::Message::ChannelAnnouncement(msg));
|
|
}
|
|
self.update_gossip_backlogged();
|
|
},
|
|
wire::Message::NodeAnnouncement(msg) => {
|
|
if self.message_handler.route_handler.handle_node_announcement(&msg)
|
|
.map_err(|e| -> MessageHandlingError { e.into() })? {
|
|
should_forward = Some(wire::Message::NodeAnnouncement(msg));
|
|
}
|
|
self.update_gossip_backlogged();
|
|
},
|
|
wire::Message::ChannelUpdate(msg) => {
|
|
self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
|
|
if self.message_handler.route_handler.handle_channel_update(&msg)
|
|
.map_err(|e| -> MessageHandlingError { e.into() })? {
|
|
should_forward = Some(wire::Message::ChannelUpdate(msg));
|
|
}
|
|
self.update_gossip_backlogged();
|
|
},
|
|
wire::Message::QueryShortChannelIds(msg) => {
|
|
self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
|
|
},
|
|
wire::Message::ReplyShortChannelIdsEnd(msg) => {
|
|
self.message_handler.route_handler.handle_reply_short_channel_ids_end(&their_node_id, msg)?;
|
|
},
|
|
wire::Message::QueryChannelRange(msg) => {
|
|
self.message_handler.route_handler.handle_query_channel_range(&their_node_id, msg)?;
|
|
},
|
|
wire::Message::ReplyChannelRange(msg) => {
|
|
self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?;
|
|
},
|
|
|
|
// Onion message:
|
|
wire::Message::OnionMessage(msg) => {
|
|
self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg);
|
|
},
|
|
|
|
// Unknown messages:
|
|
wire::Message::Unknown(type_id) if message.is_even() => {
|
|
log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
|
|
return Err(PeerHandleError { }.into());
|
|
},
|
|
wire::Message::Unknown(type_id) => {
|
|
log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
|
|
},
|
|
wire::Message::Custom(custom) => {
|
|
self.custom_message_handler.handle_custom_message(custom, &their_node_id)?;
|
|
},
|
|
};
|
|
Ok(should_forward)
|
|
}
|
|
|
|
fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
|
|
match msg {
|
|
wire::Message::ChannelAnnouncement(ref msg) => {
|
|
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
|
|
let encoded_msg = encode_msg!(msg);
|
|
|
|
for (_, peer_mutex) in peers.iter() {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if !peer.handshake_complete() ||
|
|
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
|
|
continue
|
|
}
|
|
debug_assert!(peer.their_node_id.is_some());
|
|
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
|
|
if peer.buffer_full_drop_gossip_broadcast() {
|
|
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
|
|
continue;
|
|
}
|
|
if let Some((_, their_node_id)) = peer.their_node_id {
|
|
if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
|
|
continue;
|
|
}
|
|
}
|
|
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
|
|
continue;
|
|
}
|
|
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
|
|
}
|
|
},
|
|
wire::Message::NodeAnnouncement(ref msg) => {
|
|
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
|
|
let encoded_msg = encode_msg!(msg);
|
|
|
|
for (_, peer_mutex) in peers.iter() {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if !peer.handshake_complete() ||
|
|
!peer.should_forward_node_announcement(msg.contents.node_id) {
|
|
continue
|
|
}
|
|
debug_assert!(peer.their_node_id.is_some());
|
|
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
|
|
if peer.buffer_full_drop_gossip_broadcast() {
|
|
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
|
|
continue;
|
|
}
|
|
if let Some((_, their_node_id)) = peer.their_node_id {
|
|
if their_node_id == msg.contents.node_id {
|
|
continue;
|
|
}
|
|
}
|
|
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
|
|
continue;
|
|
}
|
|
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
|
|
}
|
|
},
|
|
wire::Message::ChannelUpdate(ref msg) => {
|
|
log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
|
|
let encoded_msg = encode_msg!(msg);
|
|
|
|
for (_, peer_mutex) in peers.iter() {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if !peer.handshake_complete() ||
|
|
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
|
|
continue
|
|
}
|
|
debug_assert!(peer.their_node_id.is_some());
|
|
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
|
|
if peer.buffer_full_drop_gossip_broadcast() {
|
|
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
|
|
continue;
|
|
}
|
|
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
|
|
continue;
|
|
}
|
|
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
|
|
}
|
|
},
|
|
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
|
|
}
|
|
}
|
|
|
|
/// Checks for any events generated by our handlers and processes them. Includes sending most
|
|
/// response messages as well as messages generated by calls to handler functions directly (eg
|
|
/// functions like [`ChannelManager::process_pending_htlc_forwards`] or [`send_payment`]).
|
|
///
|
|
/// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
|
|
/// issues!
|
|
///
|
|
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
|
|
/// or one of the other clients provided in our language bindings.
|
|
///
|
|
/// Note that if there are any other calls to this function waiting on lock(s) this may return
|
|
/// without doing any work. All available events that need handling will be handled before the
|
|
/// other calls return.
|
|
///
|
|
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
|
|
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
|
|
/// [`send_data`]: SocketDescriptor::send_data
|
|
pub fn process_events(&self) {
|
|
let mut _single_processor_lock = self.event_processing_lock.try_lock();
|
|
if _single_processor_lock.is_err() {
|
|
// While we could wake the older sleeper here with a CV and make more even waiting
|
|
// times, that would be a lot of overengineering for a simple "reduce total waiter
|
|
// count" goal.
|
|
match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
|
|
Err(val) => {
|
|
debug_assert!(val, "compare_exchange failed spuriously?");
|
|
return;
|
|
},
|
|
Ok(val) => {
|
|
debug_assert!(!val, "compare_exchange succeeded spuriously?");
|
|
// We're the only waiter, as the running process_events may have emptied the
|
|
// pending events "long" ago and there are new events for us to process, wait until
|
|
// its done and process any leftover events before returning.
|
|
_single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
|
|
self.blocked_event_processors.store(false, Ordering::Release);
|
|
}
|
|
}
|
|
}
|
|
|
|
self.update_gossip_backlogged();
|
|
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
|
|
|
|
let mut peers_to_disconnect = HashMap::new();
|
|
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
|
|
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
|
|
|
|
{
|
|
// TODO: There are some DoS attacks here where you can flood someone's outbound send
|
|
// buffer by doing things like announcing channels on another node. We should be willing to
|
|
// drop optional-ish messages when send buffers get full!
|
|
|
|
let peers_lock = self.peers.read().unwrap();
|
|
let peers = &*peers_lock;
|
|
macro_rules! get_peer_for_forwarding {
|
|
($node_id: expr) => {
|
|
{
|
|
if peers_to_disconnect.get($node_id).is_some() {
|
|
// If we've "disconnected" this peer, do not send to it.
|
|
continue;
|
|
}
|
|
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
|
|
match descriptor_opt {
|
|
Some(descriptor) => match peers.get(&descriptor) {
|
|
Some(peer_mutex) => {
|
|
let peer_lock = peer_mutex.lock().unwrap();
|
|
if !peer_lock.handshake_complete() {
|
|
continue;
|
|
}
|
|
peer_lock
|
|
},
|
|
None => {
|
|
debug_assert!(false, "Inconsistent peers set state!");
|
|
continue;
|
|
}
|
|
},
|
|
None => {
|
|
continue;
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
for event in events_generated.drain(..) {
|
|
match event {
|
|
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.temporary_channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.temporary_channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.temporary_channel_id),
|
|
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
|
|
// TODO: If the peer is gone we should generate a DiscardFunding event
|
|
// indicating to the wallet that they should just throw away this funding transaction
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
|
|
log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
|
|
log_pubkey!(node_id),
|
|
update_add_htlcs.len(),
|
|
update_fulfill_htlcs.len(),
|
|
update_fail_htlcs.len(),
|
|
log_bytes!(commitment_signed.channel_id));
|
|
let mut peer = get_peer_for_forwarding!(node_id);
|
|
for msg in update_add_htlcs {
|
|
self.enqueue_message(&mut *peer, msg);
|
|
}
|
|
for msg in update_fulfill_htlcs {
|
|
self.enqueue_message(&mut *peer, msg);
|
|
}
|
|
for msg in update_fail_htlcs {
|
|
self.enqueue_message(&mut *peer, msg);
|
|
}
|
|
for msg in update_fail_malformed_htlcs {
|
|
self.enqueue_message(&mut *peer, msg);
|
|
}
|
|
if let &Some(ref msg) = update_fee {
|
|
self.enqueue_message(&mut *peer, msg);
|
|
}
|
|
self.enqueue_message(&mut *peer, commitment_signed);
|
|
},
|
|
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
|
|
log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id),
|
|
log_bytes!(msg.channel_id));
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
|
|
log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
|
|
log_pubkey!(node_id),
|
|
msg.contents.short_channel_id);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
|
|
},
|
|
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
|
|
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
|
|
match self.message_handler.route_handler.handle_channel_announcement(&msg) {
|
|
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
|
|
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
|
|
_ => {},
|
|
}
|
|
if let Some(msg) = update_msg {
|
|
match self.message_handler.route_handler.handle_channel_update(&msg) {
|
|
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
|
|
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
|
|
_ => {},
|
|
}
|
|
}
|
|
},
|
|
MessageSendEvent::BroadcastChannelUpdate { msg } => {
|
|
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
|
|
match self.message_handler.route_handler.handle_channel_update(&msg) {
|
|
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
|
|
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
|
|
_ => {},
|
|
}
|
|
},
|
|
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
|
|
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
|
|
match self.message_handler.route_handler.handle_node_announcement(&msg) {
|
|
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
|
|
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
|
|
_ => {},
|
|
}
|
|
},
|
|
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
|
|
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
|
|
log_pubkey!(node_id), msg.contents.short_channel_id);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::HandleError { ref node_id, ref action } => {
|
|
match *action {
|
|
msgs::ErrorAction::DisconnectPeer { ref msg } => {
|
|
// We do not have the peers write lock, so we just store that we're
|
|
// about to disconenct the peer and do it after we finish
|
|
// processing most messages.
|
|
peers_to_disconnect.insert(*node_id, msg.clone());
|
|
},
|
|
msgs::ErrorAction::IgnoreAndLog(level) => {
|
|
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
|
|
},
|
|
msgs::ErrorAction::IgnoreDuplicateGossip => {},
|
|
msgs::ErrorAction::IgnoreError => {
|
|
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
|
|
},
|
|
msgs::ErrorAction::SendErrorMessage { ref msg } => {
|
|
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
|
|
log_pubkey!(node_id),
|
|
msg.data);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
|
|
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
|
|
log_pubkey!(node_id),
|
|
msg.data);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
}
|
|
},
|
|
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
},
|
|
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
}
|
|
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
|
|
log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
|
|
log_pubkey!(node_id),
|
|
msg.short_channel_ids.len(),
|
|
msg.first_blocknum,
|
|
msg.number_of_blocks,
|
|
msg.sync_complete);
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
}
|
|
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
|
|
if peers_to_disconnect.get(&node_id).is_some() { continue; }
|
|
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
|
|
}
|
|
|
|
for (descriptor, peer_mutex) in peers.iter() {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
|
|
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
|
|
}
|
|
}
|
|
if !peers_to_disconnect.is_empty() {
|
|
let mut peers_lock = self.peers.write().unwrap();
|
|
let peers = &mut *peers_lock;
|
|
for (node_id, msg) in peers_to_disconnect.drain() {
|
|
// Note that since we are holding the peers *write* lock we can
|
|
// remove from node_id_to_descriptor immediately (as no other
|
|
// thread can be holding the peer lock if we have the global write
|
|
// lock).
|
|
|
|
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
|
|
if let Some(mut descriptor) = descriptor_opt {
|
|
if let Some(peer_mutex) = peers.remove(&descriptor) {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if let Some(msg) = msg {
|
|
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
|
|
log_pubkey!(node_id),
|
|
msg.data);
|
|
self.enqueue_message(&mut *peer, &msg);
|
|
// This isn't guaranteed to work, but if there is enough free
|
|
// room in the send buffer, put the error message there...
|
|
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
|
|
}
|
|
self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
|
|
} else { debug_assert!(false, "Missing connection for peer"); }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Indicates that the given socket descriptor's connection is now closed.
|
|
pub fn socket_disconnected(&self, descriptor: &Descriptor) {
|
|
self.disconnect_event_internal(descriptor);
|
|
}
|
|
|
|
fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) {
|
|
if !peer.handshake_complete() {
|
|
log_trace!(self.logger, "Disconnecting peer which hasn't completed handshake due to {}", reason);
|
|
descriptor.disconnect_socket();
|
|
return;
|
|
}
|
|
|
|
debug_assert!(peer.their_node_id.is_some());
|
|
if let Some((node_id, _)) = peer.their_node_id {
|
|
log_trace!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason);
|
|
self.message_handler.chan_handler.peer_disconnected(&node_id);
|
|
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
|
|
}
|
|
descriptor.disconnect_socket();
|
|
}
|
|
|
|
fn disconnect_event_internal(&self, descriptor: &Descriptor) {
|
|
let mut peers = self.peers.write().unwrap();
|
|
let peer_option = peers.remove(descriptor);
|
|
match peer_option {
|
|
None => {
|
|
// This is most likely a simple race condition where the user found that the socket
|
|
// was disconnected, then we told the user to `disconnect_socket()`, then they
|
|
// called this method. Either way we're disconnected, return.
|
|
},
|
|
Some(peer_lock) => {
|
|
let peer = peer_lock.lock().unwrap();
|
|
if let Some((node_id, _)) = peer.their_node_id {
|
|
log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
|
|
let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
|
|
debug_assert!(removed.is_some(), "descriptor maps should be consistent");
|
|
if !peer.handshake_complete() { return; }
|
|
self.message_handler.chan_handler.peer_disconnected(&node_id);
|
|
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Disconnect a peer given its node id.
|
|
///
|
|
/// If a peer is connected, this will call [`disconnect_socket`] on the descriptor for the
|
|
/// peer. Thus, be very careful about reentrancy issues.
|
|
///
|
|
/// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
|
|
pub fn disconnect_by_node_id(&self, node_id: PublicKey) {
|
|
let mut peers_lock = self.peers.write().unwrap();
|
|
if let Some(descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
|
|
let peer_opt = peers_lock.remove(&descriptor);
|
|
if let Some(peer_mutex) = peer_opt {
|
|
self.do_disconnect(descriptor, &*peer_mutex.lock().unwrap(), "client request");
|
|
} else { debug_assert!(false, "node_id_to_descriptor thought we had a peer"); }
|
|
}
|
|
}
|
|
|
|
/// Disconnects all currently-connected peers. This is useful on platforms where there may be
|
|
/// an indication that TCP sockets have stalled even if we weren't around to time them out
|
|
/// using regular ping/pongs.
|
|
pub fn disconnect_all_peers(&self) {
|
|
let mut peers_lock = self.peers.write().unwrap();
|
|
self.node_id_to_descriptor.lock().unwrap().clear();
|
|
let peers = &mut *peers_lock;
|
|
for (descriptor, peer_mutex) in peers.drain() {
|
|
self.do_disconnect(descriptor, &*peer_mutex.lock().unwrap(), "client request to disconnect all peers");
|
|
}
|
|
}
|
|
|
|
/// This is called when we're blocked on sending additional gossip messages until we receive a
|
|
/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
|
|
/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
|
|
fn maybe_send_extra_ping(&self, peer: &mut Peer) {
|
|
if peer.awaiting_pong_timer_tick_intervals == 0 {
|
|
peer.awaiting_pong_timer_tick_intervals = -1;
|
|
let ping = msgs::Ping {
|
|
ponglen: 0,
|
|
byteslen: 64,
|
|
};
|
|
self.enqueue_message(peer, &ping);
|
|
}
|
|
}
|
|
|
|
/// Send pings to each peer and disconnect those which did not respond to the last round of
|
|
/// pings.
|
|
///
|
|
/// This may be called on any timescale you want, however, roughly once every ten seconds is
|
|
/// preferred. The call rate determines both how often we send a ping to our peers and how much
|
|
/// time they have to respond before we disconnect them.
|
|
///
|
|
/// May call [`send_data`] on all [`SocketDescriptor`]s. Thus, be very careful with reentrancy
|
|
/// issues!
|
|
///
|
|
/// [`send_data`]: SocketDescriptor::send_data
|
|
pub fn timer_tick_occurred(&self) {
|
|
let mut descriptors_needing_disconnect = Vec::new();
|
|
{
|
|
let peers_lock = self.peers.read().unwrap();
|
|
|
|
self.update_gossip_backlogged();
|
|
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
|
|
|
|
for (descriptor, peer_mutex) in peers_lock.iter() {
|
|
let mut peer = peer_mutex.lock().unwrap();
|
|
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
|
|
|
|
if !peer.handshake_complete() {
|
|
// The peer needs to complete its handshake before we can exchange messages. We
|
|
// give peers one timer tick to complete handshake, reusing
|
|
// `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
|
|
// for handshake completion.
|
|
if peer.awaiting_pong_timer_tick_intervals != 0 {
|
|
descriptors_needing_disconnect.push(descriptor.clone());
|
|
} else {
|
|
peer.awaiting_pong_timer_tick_intervals = 1;
|
|
}
|
|
continue;
|
|
}
|
|
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
|
|
debug_assert!(peer.their_node_id.is_some());
|
|
|
|
loop { // Used as a `goto` to skip writing a Ping message.
|
|
if peer.awaiting_pong_timer_tick_intervals == -1 {
|
|
// Magic value set in `maybe_send_extra_ping`.
|
|
peer.awaiting_pong_timer_tick_intervals = 1;
|
|
peer.received_message_since_timer_tick = false;
|
|
break;
|
|
}
|
|
|
|
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|
|
|| peer.awaiting_pong_timer_tick_intervals as u64 >
|
|
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
|
|
{
|
|
descriptors_needing_disconnect.push(descriptor.clone());
|
|
break;
|
|
}
|
|
peer.received_message_since_timer_tick = false;
|
|
|
|
if peer.awaiting_pong_timer_tick_intervals > 0 {
|
|
peer.awaiting_pong_timer_tick_intervals += 1;
|
|
break;
|
|
}
|
|
|
|
peer.awaiting_pong_timer_tick_intervals = 1;
|
|
let ping = msgs::Ping {
|
|
ponglen: 0,
|
|
byteslen: 64,
|
|
};
|
|
self.enqueue_message(&mut *peer, &ping);
|
|
break;
|
|
}
|
|
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled);
|
|
}
|
|
}
|
|
|
|
if !descriptors_needing_disconnect.is_empty() {
|
|
{
|
|
let mut peers_lock = self.peers.write().unwrap();
|
|
for descriptor in descriptors_needing_disconnect {
|
|
if let Some(peer_mutex) = peers_lock.remove(&descriptor) {
|
|
let peer = peer_mutex.lock().unwrap();
|
|
if let Some((node_id, _)) = peer.their_node_id {
|
|
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
|
|
}
|
|
self.do_disconnect(descriptor, &*peer, "ping timeout");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
// Messages of up to 64KB should never end up more than half full with addresses, as that would
|
|
// be absurd. We ensure this by checking that at least 100 (our stated public contract on when
|
|
// broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
|
|
// message...
|
|
const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
|
|
#[deny(const_err)]
|
|
#[allow(dead_code)]
|
|
// ...by failing to compile if the number of addresses that would be half of a message is
|
|
// smaller than 100:
|
|
const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 100;
|
|
|
|
/// Generates a signed node_announcement from the given arguments, sending it to all connected
|
|
/// peers. Note that peers will likely ignore this message unless we have at least one public
|
|
/// channel which has at least six confirmations on-chain.
|
|
///
|
|
/// `rgb` is a node "color" and `alias` is a printable human-readable string to describe this
|
|
/// node to humans. They carry no in-protocol meaning.
|
|
///
|
|
/// `addresses` represent the set (possibly empty) of socket addresses on which this node
|
|
/// accepts incoming connections. These will be included in the node_announcement, publicly
|
|
/// tying these addresses together and to this node. If you wish to preserve user privacy,
|
|
/// addresses should likely contain only Tor Onion addresses.
|
|
///
|
|
/// Panics if `addresses` is absurdly large (more than 100).
|
|
///
|
|
/// [`get_and_clear_pending_msg_events`]: MessageSendEventsProvider::get_and_clear_pending_msg_events
|
|
pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec<NetAddress>) {
|
|
if addresses.len() > 100 {
|
|
panic!("More than half the message size was taken up by public addresses!");
|
|
}
|
|
|
|
// While all existing nodes handle unsorted addresses just fine, the spec requires that
|
|
// addresses be sorted for future compatibility.
|
|
addresses.sort_by_key(|addr| addr.get_id());
|
|
|
|
let features = self.message_handler.chan_handler.provided_node_features()
|
|
.or(self.message_handler.route_handler.provided_node_features())
|
|
.or(self.message_handler.onion_message_handler.provided_node_features());
|
|
let announcement = msgs::UnsignedNodeAnnouncement {
|
|
features,
|
|
timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel),
|
|
node_id: NodeId::from_pubkey(&self.node_signer.get_node_id(Recipient::Node).unwrap()),
|
|
rgb, alias, addresses,
|
|
excess_address_data: Vec::new(),
|
|
excess_data: Vec::new(),
|
|
};
|
|
let node_announce_sig = match self.node_signer.sign_gossip_message(
|
|
msgs::UnsignedGossipMessage::NodeAnnouncement(&announcement)
|
|
) {
|
|
Ok(sig) => sig,
|
|
Err(_) => {
|
|
log_error!(self.logger, "Failed to generate signature for node_announcement");
|
|
return;
|
|
},
|
|
};
|
|
|
|
let msg = msgs::NodeAnnouncement {
|
|
signature: node_announce_sig,
|
|
contents: announcement
|
|
};
|
|
|
|
log_debug!(self.logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler.");
|
|
let _ = self.message_handler.route_handler.handle_node_announcement(&msg);
|
|
self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None);
|
|
}
|
|
}
|
|
|
|
fn is_gossip_msg(type_id: u16) -> bool {
|
|
match type_id {
|
|
msgs::ChannelAnnouncement::TYPE |
|
|
msgs::ChannelUpdate::TYPE |
|
|
msgs::NodeAnnouncement::TYPE |
|
|
msgs::QueryChannelRange::TYPE |
|
|
msgs::ReplyChannelRange::TYPE |
|
|
msgs::QueryShortChannelIds::TYPE |
|
|
msgs::ReplyShortChannelIdsEnd::TYPE => true,
|
|
_ => false
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use crate::chain::keysinterface::{NodeSigner, Recipient};
|
|
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
|
|
use crate::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
|
|
use crate::ln::{msgs, wire};
|
|
use crate::ln::msgs::NetAddress;
|
|
use crate::util::events;
|
|
use crate::util::test_utils;
|
|
|
|
use bitcoin::secp256k1::SecretKey;
|
|
|
|
use crate::prelude::*;
|
|
use crate::sync::{Arc, Mutex};
|
|
use core::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
#[derive(Clone)]
|
|
struct FileDescriptor {
|
|
fd: u16,
|
|
outbound_data: Arc<Mutex<Vec<u8>>>,
|
|
disconnect: Arc<AtomicBool>,
|
|
}
|
|
impl PartialEq for FileDescriptor {
|
|
fn eq(&self, other: &Self) -> bool {
|
|
self.fd == other.fd
|
|
}
|
|
}
|
|
impl Eq for FileDescriptor { }
|
|
impl core::hash::Hash for FileDescriptor {
|
|
fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
|
|
self.fd.hash(hasher)
|
|
}
|
|
}
|
|
|
|
impl SocketDescriptor for FileDescriptor {
|
|
fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
|
|
self.outbound_data.lock().unwrap().extend_from_slice(data);
|
|
data.len()
|
|
}
|
|
|
|
fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); }
|
|
}
|
|
|
|
struct PeerManagerCfg {
|
|
chan_handler: test_utils::TestChannelMessageHandler,
|
|
routing_handler: test_utils::TestRoutingMessageHandler,
|
|
logger: test_utils::TestLogger,
|
|
node_signer: test_utils::TestNodeSigner,
|
|
}
|
|
|
|
fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
|
|
let mut cfgs = Vec::new();
|
|
for i in 0..peer_count {
|
|
let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
|
|
cfgs.push(
|
|
PeerManagerCfg{
|
|
chan_handler: test_utils::TestChannelMessageHandler::new(),
|
|
logger: test_utils::TestLogger::new(),
|
|
routing_handler: test_utils::TestRoutingMessageHandler::new(),
|
|
node_signer: test_utils::TestNodeSigner::new(node_secret),
|
|
}
|
|
);
|
|
}
|
|
|
|
cfgs
|
|
}
|
|
|
|
fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>> {
|
|
let mut peers = Vec::new();
|
|
for i in 0..peer_count {
|
|
let ephemeral_bytes = [i as u8; 32];
|
|
let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} };
|
|
let peer = PeerManager::new(msg_handler, 0, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {}, &cfgs[i].node_signer);
|
|
peers.push(peer);
|
|
}
|
|
|
|
peers
|
|
}
|
|
|
|
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
|
|
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
|
|
let mut fd_a = FileDescriptor {
|
|
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
|
|
let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
|
|
let mut fd_b = FileDescriptor {
|
|
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
|
|
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
|
|
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
|
|
assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
|
|
peer_a.process_events();
|
|
|
|
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
|
|
|
|
peer_b.process_events();
|
|
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
|
assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
|
|
|
|
peer_a.process_events();
|
|
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
|
|
|
|
assert!(peer_a.get_peer_node_ids().contains(&(id_b, Some(addr_b))));
|
|
assert!(peer_b.get_peer_node_ids().contains(&(id_a, Some(addr_a))));
|
|
|
|
(fd_a.clone(), fd_b.clone())
|
|
}
|
|
|
|
#[test]
|
|
#[cfg(feature = "std")]
|
|
fn fuzz_threaded_connections() {
|
|
// Spawn two threads which repeatedly connect two peers together, leading to "got second
|
|
// connection with peer" disconnections and rapid reconnect. This previously found an issue
|
|
// with our internal map consistency, and is a generally good smoke test of disconnection.
|
|
let cfgs = Arc::new(create_peermgr_cfgs(2));
|
|
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
|
|
let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));
|
|
|
|
let start_time = std::time::Instant::now();
|
|
macro_rules! spawn_thread { ($id: expr) => { {
|
|
let peers = Arc::clone(&peers);
|
|
let cfgs = Arc::clone(&cfgs);
|
|
std::thread::spawn(move || {
|
|
let mut ctr = 0;
|
|
while start_time.elapsed() < std::time::Duration::from_secs(1) {
|
|
let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
|
|
let mut fd_a = FileDescriptor {
|
|
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
|
|
let mut fd_b = FileDescriptor {
|
|
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
|
|
let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
|
|
peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
|
|
if peers[0].read_event(&mut fd_a, &initial_data).is_err() { break; }
|
|
|
|
while start_time.elapsed() < std::time::Duration::from_secs(1) {
|
|
peers[0].process_events();
|
|
if fd_a.disconnect.load(Ordering::Acquire) { break; }
|
|
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; }
|
|
|
|
peers[1].process_events();
|
|
if fd_b.disconnect.load(Ordering::Acquire) { break; }
|
|
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
|
if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; }
|
|
|
|
cfgs[0].chan_handler.pending_events.lock().unwrap()
|
|
.push(crate::util::events::MessageSendEvent::SendShutdown {
|
|
node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
|
|
msg: msgs::Shutdown {
|
|
channel_id: [0; 32],
|
|
scriptpubkey: bitcoin::Script::new(),
|
|
},
|
|
});
|
|
cfgs[1].chan_handler.pending_events.lock().unwrap()
|
|
.push(crate::util::events::MessageSendEvent::SendShutdown {
|
|
node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
|
|
msg: msgs::Shutdown {
|
|
channel_id: [0; 32],
|
|
scriptpubkey: bitcoin::Script::new(),
|
|
},
|
|
});
|
|
|
|
if ctr % 2 == 0 {
|
|
peers[0].timer_tick_occurred();
|
|
peers[1].timer_tick_occurred();
|
|
}
|
|
}
|
|
|
|
peers[0].socket_disconnected(&fd_a);
|
|
peers[1].socket_disconnected(&fd_b);
|
|
ctr += 1;
|
|
std::thread::sleep(std::time::Duration::from_micros(1));
|
|
}
|
|
})
|
|
} } }
|
|
let thrd_a = spawn_thread!(1);
|
|
let thrd_b = spawn_thread!(2);
|
|
|
|
thrd_a.join().unwrap();
|
|
thrd_b.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_disconnect_peer() {
|
|
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
|
|
// push a DisconnectPeer event to remove the node flagged by id
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
let peers = create_network(2, &cfgs);
|
|
establish_connection(&peers[0], &peers[1]);
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
|
|
cfgs[0].chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
|
|
node_id: their_id,
|
|
action: msgs::ErrorAction::DisconnectPeer { msg: None },
|
|
});
|
|
|
|
peers[0].process_events();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_send_simple_msg() {
|
|
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
|
|
// push a message from one peer to another.
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
let a_chan_handler = test_utils::TestChannelMessageHandler::new();
|
|
let b_chan_handler = test_utils::TestChannelMessageHandler::new();
|
|
let mut peers = create_network(2, &cfgs);
|
|
let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
|
|
|
|
let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() };
|
|
a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown {
|
|
node_id: their_id, msg: msg.clone()
|
|
});
|
|
peers[0].message_handler.chan_handler = &a_chan_handler;
|
|
|
|
b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg));
|
|
peers[1].message_handler.chan_handler = &b_chan_handler;
|
|
|
|
peers[0].process_events();
|
|
|
|
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
|
|
}
|
|
|
|
#[test]
|
|
fn test_non_init_first_msg() {
|
|
// Simple test of the first message received over a connection being something other than
|
|
// Init. This results in an immediate disconnection, which previously included a spurious
|
|
// peer_disconnected event handed to event handlers (which would panic in
|
|
// `TestChannelMessageHandler` here).
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
let peers = create_network(2, &cfgs);
|
|
|
|
let mut fd_dup = FileDescriptor {
|
|
fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003};
|
|
let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
|
|
peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
|
|
|
|
let mut dup_encryptor = PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
|
|
let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
|
|
assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false);
|
|
peers[0].process_events();
|
|
|
|
let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
|
|
let (act_three, _) =
|
|
dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
|
|
assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false);
|
|
|
|
let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
|
|
let msg_bytes = dup_encryptor.encrypt_message(¬_init_msg);
|
|
assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err());
|
|
}
|
|
|
|
#[test]
|
|
fn test_disconnect_all_peer() {
|
|
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
|
|
// then calls disconnect_all_peers
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
let peers = create_network(2, &cfgs);
|
|
establish_connection(&peers[0], &peers[1]);
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
peers[0].disconnect_all_peers();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_timer_tick_occurred() {
|
|
// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
let peers = create_network(2, &cfgs);
|
|
establish_connection(&peers[0], &peers[1]);
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
// peers[0] awaiting_pong is set to true, but the Peer is still connected
|
|
peers[0].timer_tick_occurred();
|
|
peers[0].process_events();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
|
|
peers[0].timer_tick_occurred();
|
|
peers[0].process_events();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_do_attempt_write_data() {
|
|
// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
|
|
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
|
|
let peers = create_network(2, &cfgs);
|
|
|
|
// By calling establish_connect, we trigger do_attempt_write_data between
|
|
// the peers. Previously this function would mistakenly enter an infinite loop
|
|
// when there were more channel messages available than could fit into a peer's
|
|
// buffer. This issue would now be detected by this test (because we use custom
|
|
// RoutingMessageHandlers that intentionally return more channel messages
|
|
// than can fit into a peer's buffer).
|
|
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
|
|
|
|
// Make each peer to read the messages that the other peer just wrote to them. Note that
|
|
// due to the max-message-before-ping limits this may take a few iterations to complete.
|
|
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
|
|
peers[1].process_events();
|
|
let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
|
assert!(!a_read_data.is_empty());
|
|
|
|
peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
|
|
peers[0].process_events();
|
|
|
|
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
assert!(!b_read_data.is_empty());
|
|
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
|
|
|
|
peers[0].process_events();
|
|
assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages");
|
|
}
|
|
|
|
// Check that each peer has received the expected number of channel updates and channel
|
|
// announcements.
|
|
assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
|
|
assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
|
|
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
|
|
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
|
|
}
|
|
|
|
#[test]
|
|
fn test_handshake_timeout() {
|
|
// Tests that we time out a peer still waiting on handshake completion after a full timer
|
|
// tick.
|
|
let cfgs = create_peermgr_cfgs(2);
|
|
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
|
|
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
|
|
let peers = create_network(2, &cfgs);
|
|
|
|
let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
|
|
let mut fd_a = FileDescriptor {
|
|
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let mut fd_b = FileDescriptor {
|
|
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
|
|
disconnect: Arc::new(AtomicBool::new(false)),
|
|
};
|
|
let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
|
|
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
|
|
|
|
// If we get a single timer tick before completion, that's fine
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
peers[0].timer_tick_occurred();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 1);
|
|
|
|
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
|
|
peers[0].process_events();
|
|
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
|
|
assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
|
|
peers[1].process_events();
|
|
|
|
// ...but if we get a second timer tick, we should disconnect the peer
|
|
peers[0].timer_tick_occurred();
|
|
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
|
|
|
|
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
|
|
assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
|
|
}
|
|
|
|
#[test]
|
|
fn test_filter_addresses(){
|
|
// Tests the filter_addresses function.
|
|
|
|
// For (10/8)
|
|
let ip_address = NetAddress::IPv4{addr: [10, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [10, 0, 255, 201], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [10, 255, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (0/8)
|
|
let ip_address = NetAddress::IPv4{addr: [0, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [0, 0, 255, 187], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [0, 255, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (100.64/10)
|
|
let ip_address = NetAddress::IPv4{addr: [100, 64, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [100, 78, 255, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [100, 127, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (127/8)
|
|
let ip_address = NetAddress::IPv4{addr: [127, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [127, 65, 73, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [127, 255, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (169.254/16)
|
|
let ip_address = NetAddress::IPv4{addr: [169, 254, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [169, 254, 221, 101], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [169, 254, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (172.16/12)
|
|
let ip_address = NetAddress::IPv4{addr: [172, 16, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [172, 27, 101, 23], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [172, 31, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (192.168/16)
|
|
let ip_address = NetAddress::IPv4{addr: [192, 168, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [192, 168, 205, 159], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [192, 168, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (192.88.99/24)
|
|
let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 140], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For other IPv4 addresses
|
|
let ip_address = NetAddress::IPv4{addr: [188, 255, 99, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
let ip_address = NetAddress::IPv4{addr: [123, 8, 129, 14], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
let ip_address = NetAddress::IPv4{addr: [2, 88, 9, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
|
|
// For (2000::/3)
|
|
let ip_address = NetAddress::IPv6{addr: [32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
let ip_address = NetAddress::IPv6{addr: [45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
let ip_address = NetAddress::IPv6{addr: [63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone()));
|
|
|
|
// For other IPv6 addresses
|
|
let ip_address = NetAddress::IPv6{addr: [24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv6{addr: [68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
let ip_address = NetAddress::IPv6{addr: [101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0], port: 1000};
|
|
assert_eq!(filter_addresses(Some(ip_address.clone())), None);
|
|
|
|
// For (None)
|
|
assert_eq!(filter_addresses(None), None);
|
|
}
|
|
}
|