Merge pull request #1980 from TheBlueMatt/2023-01-async-utxo-lookups

This commit is contained in:
wpaulino 2023-02-10 19:57:11 -08:00 committed by GitHub
commit be4bb58573
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1191 additions and 201 deletions

View file

@ -51,9 +51,9 @@ At a high level, some of the common interfaces fit together as follows:
--------------- / (as EventsProvider) ^ | |
| PeerManager |- \ | | |
--------------- \ | (is-a) | |
| ----------------- \ _---------------- / /
| | chain::Access | \ / | ChainMonitor |---------------
| ----------------- \ / ----------------
| -------------- \ _---------------- / /
| | UtxoLookup | \ / | ChainMonitor |---------------
| -------------- \ / ----------------
| ^ \ / |
(as RoutingMessageHandler) | v v
\ ----------------- --------- -----------------

View file

@ -41,6 +41,7 @@ use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,Ig
use lightning::ln::msgs::{self, DecodeError};
use lightning::ln::script::ShutdownScript;
use lightning::routing::gossip::{P2PGossipSync, NetworkGraph};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::{find_route, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, Router};
use lightning::routing::scoring::FixedPenaltyScorer;
use lightning::util::config::UserConfig;
@ -183,7 +184,7 @@ impl<'a> std::hash::Hash for Peer<'a> {
type ChannelMan<'a> = ChannelManager<
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<KeyProvider>, Arc<KeyProvider>, Arc<FuzzEstimator>, &'a FuzzRouter, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan<'a>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler, Arc<KeyProvider>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan<'a>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn UtxoLookup>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler, Arc<KeyProvider>>;
struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan<'a>>,

View file

@ -11,11 +11,11 @@ use bitcoin::blockdata::script::Builder;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::hash_types::BlockHash;
use lightning::chain;
use lightning::chain::transaction::OutPoint;
use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
use lightning::ln::msgs;
use lightning::routing::gossip::{NetworkGraph, RoutingFees};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
use lightning::routing::scoring::FixedPenaltyScorer;
use lightning::util::config::UserConfig;
@ -81,17 +81,36 @@ impl InputData {
}
}
struct FuzzChainSource {
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
input: Arc<InputData>,
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
}
impl chain::Access for FuzzChainSource {
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, chain::AccessError> {
match self.input.get_slice(2) {
Some(&[0, _]) => Err(chain::AccessError::UnknownChain),
Some(&[1, _]) => Err(chain::AccessError::UnknownTx),
Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
None => Err(chain::AccessError::UnknownTx),
_ => unreachable!(),
impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
let input_slice = self.input.get_slice(2);
if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); }
let input_slice = input_slice.unwrap();
let txo_res = TxOut {
value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
};
match input_slice {
&[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
&[2, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Ok(txo_res));
UtxoResult::Async(future.clone())
},
&[3, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx));
UtxoResult::Async(future.clone())
},
&[4, _] => {
UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
},
&[..] => UtxoResult::Sync(Ok(txo_res)),
}
}
}
@ -171,6 +190,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
let our_pubkey = get_pubkey!();
let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
let chain_source = FuzzChainSource {
input: Arc::clone(&input),
net_graph: &net_graph,
};
let mut node_pks = HashSet::new();
let mut scid = 42;
@ -191,13 +214,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
let _ = net_graph.update_channel_from_unsigned_announcement::
<&FuzzChainSource<'_, '_, Out>>(&msg, &None);
},
2 => {
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
},
3 => {
let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));

View file

@ -30,6 +30,7 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::Router;
use lightning::routing::scoring::{Score, WriteableScore};
use lightning::util::events::{Event, EventHandler, EventsProvider};
@ -116,13 +117,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, A, L>>,
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
U: Deref,
L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
where U::Target: UtxoLookup, L::Target: Logger {
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
P2P(P),
/// Rapid gossip sync from a trusted server.
@ -132,13 +133,13 @@ where A::Target: chain::Access, L::Target: Logger {
}
impl<
P: Deref<Target = P2PGossipSync<G, A, L>>,
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
U: Deref,
L: Deref,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
> GossipSync<P, R, G, U, L>
where U::Target: UtxoLookup, L::Target: Logger {
fn network_graph(&self) -> Option<&G> {
match self {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
@ -163,10 +164,10 @@ where A::Target: chain::Access, L::Target: Logger {
}
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
A::Target: chain::Access,
U::Target: UtxoLookup,
L::Target: Logger,
{
/// Initializes a new [`GossipSync::P2P`] variant.
@ -178,10 +179,10 @@ where
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
GossipSync<
&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
R,
G,
&'a (dyn chain::Access + Send + Sync),
&'a (dyn UtxoLookup + Send + Sync),
L,
>
where
@ -196,10 +197,10 @@ where
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
GossipSync<
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
&RapidGossipSync<&'a NetworkGraph<L>, L>,
&'a NetworkGraph<L>,
&'a (dyn chain::Access + Send + Sync),
&'a (dyn UtxoLookup + Send + Sync),
L,
>
where
@ -397,7 +398,7 @@ macro_rules! define_run_body {
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
CA: 'static + Deref + Send + Sync,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
@ -418,7 +419,7 @@ pub async fn process_events_async<
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
@ -428,11 +429,11 @@ pub async fn process_events_async<
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), io::Error>
where
CA::Target: 'static + chain::Access,
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
@ -531,7 +532,7 @@ impl BackgroundProcessor {
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
pub fn start<
'a,
CA: 'static + Deref + Send + Sync,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
@ -551,7 +552,7 @@ impl BackgroundProcessor {
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
@ -559,10 +560,10 @@ impl BackgroundProcessor {
SC: for <'b> WriteableScore<'b>,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
) -> Self
where
CA::Target: 'static + chain::Access,
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,

View file

@ -33,12 +33,12 @@
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
//! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
//! type NodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
//! type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::chainmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, UtxoLookup, Logger>>;
//!
//! // Connect to node with pubkey their_node_id at addr:
//! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
@ -176,8 +176,9 @@ impl Connection {
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];
// 4KiB is nice and big without handling too many messages all at once, giving other peers
// a chance to do some work.
let mut buf = [0; 4096];
let mut our_descriptor = SocketDescriptor::new(us.clone());
// An enum describing why we did/are disconnecting:
@ -623,6 +624,7 @@ mod tests {
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
fn processing_queue_high(&self) -> bool { false }
}
impl ChannelMessageHandler for MsgHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}

View file

@ -12,7 +12,6 @@
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::script::Script;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::PublicKey;
@ -60,26 +59,6 @@ impl BestBlock {
pub fn height(&self) -> u32 { self.height }
}
/// An error when accessing the chain via [`Access`].
#[derive(Clone, Debug)]
pub enum AccessError {
/// The requested chain is unknown.
UnknownChain,
/// The requested transaction doesn't exist or hasn't confirmed.
UnknownTx,
}
/// The `Access` trait defines behavior for accessing chain data and state, such as blocks and
/// UTXOs.
pub trait Access {
/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
/// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
/// is unknown.
///
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError>;
}
/// The `Listen` trait is used to notify when blocks have been connected or disconnected from the
/// chain.

View file

@ -5044,7 +5044,7 @@ where
), chan),
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(),
update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()),
});
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@ -5973,7 +5973,7 @@ where
msg: announcement,
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: self.get_channel_update_for_broadcast(channel).unwrap(),
update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()),
});
}
}
@ -6289,6 +6289,7 @@ where
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::HandleError { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,

View file

@ -621,6 +621,9 @@ pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &Vec<
MessageSendEvent::BroadcastChannelUpdate { .. } => {
false
},
MessageSendEvent::BroadcastNodeAnnouncement { .. } => {
false
},
MessageSendEvent::SendChannelUpdate { node_id, .. } => {
node_id == msg_node_id
},
@ -1010,7 +1013,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b,
assert_eq!(events_7.len(), 1);
let (announcement, bs_update) = match events_7[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
(msg, update_msg)
(msg, update_msg.clone().unwrap())
},
_ => panic!("Unexpected event"),
};
@ -1021,6 +1024,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b,
let as_update = match events_8[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
assert!(*announcement == *msg);
let update_msg = update_msg.clone().unwrap();
assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id);
assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id);
update_msg
@ -1031,7 +1035,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b,
*node_a.network_chan_count.borrow_mut() += 1;
expect_channel_ready_event(&node_b, &node_a.node.get_our_node_id());
((*announcement).clone(), (*as_update).clone(), (*bs_update).clone())
((*announcement).clone(), as_update, bs_update)
}
pub fn create_announced_chan_between_nodes<'a, 'b, 'c, 'd>(nodes: &'a Vec<Node<'b, 'c, 'd>>, a: usize, b: usize) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {

View file

@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
/// list of `short_channel_id`s.
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
// Handler queueing status:
/// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages
/// pending some async action. While there is no guarantee of the rate of future messages, the
/// caller should seek to reduce the rate of new gossip messages handled, especially
/// [`ChannelAnnouncement`]s.
fn processing_queue_high(&self) -> bool;
// Handler information:
/// Gets the node feature flags which this handler itself supports. All available handlers are
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]

View file

@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
InitFeatures::empty()
}
fn processing_queue_high(&self) -> bool { false }
}
impl OnionMessageProvider for IgnoringMessageHandler {
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
@ -414,6 +415,12 @@ struct Peer {
awaiting_pong_timer_tick_intervals: i8,
received_message_since_timer_tick: bool,
sent_gossip_timestamp_filter: bool,
/// Indicates we've received a `channel_announcement` since the last time we had
/// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
/// `channel_announcement` at all - we set this unconditionally but unset it every time we
/// check if we're gossip-processing-backlogged).
received_channel_announce_since_backlogged: bool,
}
impl Peer {
@ -450,8 +457,12 @@ impl Peer {
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
/// buffer still has space and we don't need to pause reads to get some writes out.
fn should_read(&self) -> bool {
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
if !gossip_processing_backlogged {
self.received_channel_announce_since_backlogged = false;
}
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
(!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
}
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
@ -568,6 +579,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
peer_counter: AtomicCounter,
gossip_processing_backlogged: AtomicBool,
gossip_processing_backlog_lifted: AtomicBool,
node_signer: NS,
logger: L,
@ -726,6 +740,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
blocked_event_processors: AtomicBool::new(false),
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
gossip_processing_backlogged: AtomicBool::new(false),
gossip_processing_backlog_lifted: AtomicBool::new(false),
last_node_announcement_serial: AtomicU32::new(current_time),
logger,
custom_message_handler,
@ -805,6 +821,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
received_channel_announce_since_backlogged: false,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
@ -852,13 +870,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
received_channel_announce_since_backlogged: false,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
}
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
fn peer_should_read(&self, peer: &mut Peer) -> bool {
peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
}
fn update_gossip_backlogged(&self) {
let new_state = self.message_handler.route_handler.processing_queue_high();
let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
if prev_state && !new_state {
self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
}
}
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
let mut have_written = false;
while !peer.awaiting_write_event {
if peer.should_buffer_onion_message() {
if let Some((peer_node_id, _)) = peer.their_node_id {
@ -915,13 +948,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.maybe_send_extra_ping(peer);
}
let should_read = self.peer_should_read(peer);
let next_buff = match peer.pending_outbound_buffer.front() {
None => return,
None => {
if force_one_write && !have_written {
if should_read {
let data_sent = descriptor.send_data(&[], should_read);
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
}
}
return
},
Some(buff) => buff,
};
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
let data_sent = descriptor.send_data(pending, peer.should_read());
let data_sent = descriptor.send_data(pending, should_read);
have_written = true;
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
peer.pending_outbound_buffer_first_msg_offset = 0;
@ -956,7 +999,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
Some(peer_mutex) => {
let mut peer = peer_mutex.lock().unwrap();
peer.awaiting_write_event = false;
self.do_attempt_write_data(descriptor, &mut peer);
self.do_attempt_write_data(descriptor, &mut peer, false);
}
};
Ok(())
@ -974,6 +1017,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
/// send buffer).
///
/// In order to avoid processing too many messages at once per peer, `data` should be on the
/// order of 4KiB.
///
/// [`send_data`]: SocketDescriptor::send_data
/// [`process_events`]: PeerManager::process_events
pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@ -1203,7 +1249,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
}
}
pause_read = !peer.should_read();
pause_read = !self.peer_should_read(peer);
if let Some(message) = msg_to_handle {
match self.handle_message(&peer_mutex, peer_lock, message) {
@ -1289,6 +1335,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
return Ok(None);
}
if let wire::Message::ChannelAnnouncement(ref _msg) = message {
peer_lock.received_channel_announce_since_backlogged = true;
}
mem::drop(peer_lock);
if is_gossip_msg(message.type_id()) {
@ -1415,12 +1465,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
.map_err(|e| -> MessageHandlingError { e.into() })? {
should_forward = Some(wire::Message::ChannelAnnouncement(msg));
}
self.update_gossip_backlogged();
},
wire::Message::NodeAnnouncement(msg) => {
if self.message_handler.route_handler.handle_node_announcement(&msg)
.map_err(|e| -> MessageHandlingError { e.into() })? {
should_forward = Some(wire::Message::NodeAnnouncement(msg));
}
self.update_gossip_backlogged();
},
wire::Message::ChannelUpdate(msg) => {
self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
@ -1428,6 +1480,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
.map_err(|e| -> MessageHandlingError { e.into() })? {
should_forward = Some(wire::Message::ChannelUpdate(msg));
}
self.update_gossip_backlogged();
},
wire::Message::QueryShortChannelIds(msg) => {
self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
@ -1578,6 +1631,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
}
self.update_gossip_backlogged();
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
@ -1722,10 +1778,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
_ => {},
}
match self.message_handler.route_handler.handle_channel_update(&update_msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None),
_ => {},
if let Some(msg) = update_msg {
match self.message_handler.route_handler.handle_channel_update(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
_ => {},
}
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
@ -1736,6 +1794,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
_ => {},
}
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
match self.message_handler.route_handler.handle_node_announcement(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
_ => {},
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
@ -1797,7 +1863,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
for (descriptor, peer_mutex) in peers.iter() {
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
let mut peer = peer_mutex.lock().unwrap();
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
}
}
if !peers_to_disconnect.is_empty() {
@ -1819,7 +1887,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.enqueue_message(&mut *peer, &msg);
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self.do_attempt_write_data(&mut descriptor, &mut *peer);
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
} else {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
@ -1927,8 +1995,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
{
let peers_lock = self.peers.read().unwrap();
self.update_gossip_backlogged();
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
for (descriptor, peer_mutex) in peers_lock.iter() {
let mut peer = peer_mutex.lock().unwrap();
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
// The peer needs to complete its handshake before we can exchange messages. We
// give peers one timer tick to complete handshake, reusing
@ -1942,34 +2015,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
continue;
}
if peer.awaiting_pong_timer_tick_intervals == -1 {
// Magic value set in `maybe_send_extra_ping`.
peer.awaiting_pong_timer_tick_intervals = 1;
loop { // Used as a `goto` to skip writing a Ping message.
if peer.awaiting_pong_timer_tick_intervals == -1 {
// Magic value set in `maybe_send_extra_ping`.
peer.awaiting_pong_timer_tick_intervals = 1;
peer.received_message_since_timer_tick = false;
break;
}
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|| peer.awaiting_pong_timer_tick_intervals as u64 >
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
{
descriptors_needing_disconnect.push(descriptor.clone());
break;
}
peer.received_message_since_timer_tick = false;
continue;
}
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|| peer.awaiting_pong_timer_tick_intervals as u64 >
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
{
descriptors_needing_disconnect.push(descriptor.clone());
continue;
}
peer.received_message_since_timer_tick = false;
if peer.awaiting_pong_timer_tick_intervals > 0 {
peer.awaiting_pong_timer_tick_intervals += 1;
break;
}
if peer.awaiting_pong_timer_tick_intervals > 0 {
peer.awaiting_pong_timer_tick_intervals += 1;
continue;
peer.awaiting_pong_timer_tick_intervals = 1;
let ping = msgs::Ping {
ponglen: 0,
byteslen: 64,
};
self.enqueue_message(&mut *peer, &ping);
break;
}
peer.awaiting_pong_timer_tick_intervals = 1;
let ping = msgs::Ping {
ponglen: 0,
byteslen: 64,
};
self.enqueue_message(&mut *peer, &ping);
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled);
}
}

View file

@ -184,14 +184,14 @@ fn do_test_1_conf_open(connect_style: ConnectStyle) {
msg.clone()
} else { panic!("Unexpected event"); };
let (bs_announcement, bs_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = bs_announce_events[1] {
(msg.clone(), update_msg.clone())
(msg.clone(), update_msg.clone().unwrap())
} else { panic!("Unexpected event"); };
nodes[0].node.handle_announcement_signatures(&nodes[1].node.get_our_node_id(), &bs_announcement_sigs);
let as_announce_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(as_announce_events.len(), 1);
let (announcement, as_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = as_announce_events[0] {
(msg.clone(), update_msg.clone())
(msg.clone(), update_msg.clone().unwrap())
} else { panic!("Unexpected event"); };
assert_eq!(announcement, bs_announcement);
@ -757,7 +757,7 @@ fn test_public_0conf_channel() {
match bs_announcement[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
announcement = msg.clone();
bs_update = update_msg.clone();
bs_update = update_msg.clone().unwrap();
},
_ => panic!("Unexpected event"),
};
@ -767,6 +767,7 @@ fn test_public_0conf_channel() {
match as_announcement[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
assert!(announcement == *msg);
let update_msg = update_msg.as_ref().unwrap();
assert_eq!(update_msg.contents.short_channel_id, scid);
assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id);
assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id);

View file

@ -140,7 +140,7 @@ fn test_funding_peer_disconnect() {
assert_eq!(events_7.len(), 1);
let (chan_announcement, as_update) = match events_7[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
(msg.clone(), update_msg.clone())
(msg.clone(), update_msg.clone().unwrap())
},
_ => panic!("Unexpected event {:?}", events_7[0]),
};
@ -153,7 +153,7 @@ fn test_funding_peer_disconnect() {
let bs_update = match events_8[0] {
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
assert_eq!(*msg, chan_announcement);
update_msg.clone()
update_msg.clone().unwrap()
},
_ => panic!("Unexpected event {:?}", events_8[0]),
};

View file

@ -7,7 +7,7 @@
// You may not use this file except in accordance with one or both of these
// licenses.
//! The top-level network map tracking logic lives here.
//! The [`NetworkGraph`] stores the network gossip and [`P2PGossipSync`] fetches it from peers
use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE;
use bitcoin::secp256k1::PublicKey;
@ -16,17 +16,14 @@ use bitcoin::secp256k1;
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use bitcoin::hashes::Hash;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::hash_types::BlockHash;
use crate::chain;
use crate::chain::Access;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::features::{ChannelFeatures, NodeFeatures, InitFeatures};
use crate::ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, GossipTimestampFilter};
use crate::ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
use crate::ln::msgs;
use crate::routing::utxo::{self, UtxoLookup};
use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, MaybeReadable};
use crate::util::logger::{Logger, Level};
use crate::util::events::{MessageSendEvent, MessageSendEventsProvider};
@ -43,7 +40,6 @@ use crate::sync::{RwLock, RwLockReadGuard};
use core::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::Mutex;
use core::ops::{Bound, Deref};
use bitcoin::hashes::hex::ToHex;
#[cfg(feature = "std")]
use std::time::{SystemTime, UNIX_EPOCH};
@ -159,6 +155,8 @@ pub struct NetworkGraph<L: Deref> where L::Target: Logger {
/// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
/// that once some time passes, we can potentially resync it from gossip again.
removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
/// Announcement messages which are awaiting an on-chain lookup to be processed.
pub(super) pending_checks: utxo::PendingChecks,
}
/// A read-only view of [`NetworkGraph`].
@ -218,31 +216,30 @@ impl_writeable_tlv_based_enum_upgradable!(NetworkUpdate,
/// This network graph is then used for routing payments.
/// Provides interface to help with initial routing sync by
/// serving historical announcements.
pub struct P2PGossipSync<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref>
where C::Target: chain::Access, L::Target: Logger
pub struct P2PGossipSync<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref>
where U::Target: UtxoLookup, L::Target: Logger
{
network_graph: G,
chain_access: Option<C>,
utxo_lookup: Option<U>,
#[cfg(feature = "std")]
full_syncs_requested: AtomicUsize,
pending_events: Mutex<Vec<MessageSendEvent>>,
logger: L,
}
impl<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref> P2PGossipSync<G, C, L>
where C::Target: chain::Access, L::Target: Logger
impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> P2PGossipSync<G, U, L>
where U::Target: UtxoLookup, L::Target: Logger
{
/// Creates a new tracker of the actual state of the network of channels and nodes,
/// assuming an existing Network Graph.
/// Chain monitor is used to make sure announced channels exist on-chain,
/// channel data is correct, and that the announcement is signed with
/// channel owners' keys.
pub fn new(network_graph: G, chain_access: Option<C>, logger: L) -> Self {
/// UTXO lookup is used to make sure announced channels exist on-chain, channel data is
/// correct, and the announcement is signed with channel owners' keys.
pub fn new(network_graph: G, utxo_lookup: Option<U>, logger: L) -> Self {
P2PGossipSync {
network_graph,
#[cfg(feature = "std")]
full_syncs_requested: AtomicUsize::new(0),
chain_access,
utxo_lookup,
pending_events: Mutex::new(vec![]),
logger,
}
@ -251,8 +248,8 @@ where C::Target: chain::Access, L::Target: Logger
/// Adds a provider used to check new announcements. Does not affect
/// existing announcements unless they are updated.
/// Add, update or remove the provider would replace the current one.
pub fn add_chain_access(&mut self, chain_access: Option<C>) {
self.chain_access = chain_access;
pub fn add_utxo_lookup(&mut self, utxo_lookup: Option<U>) {
self.utxo_lookup = utxo_lookup;
}
/// Gets a reference to the underlying [`NetworkGraph`] which was provided in
@ -275,6 +272,36 @@ where C::Target: chain::Access, L::Target: Logger
false
}
}
/// Used to broadcast forward gossip messages which were validated async.
///
/// Note that this will ignore events other than `Broadcast*` or messages with too much excess
/// data.
pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) {
match &mut ev {
MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
if update_msg.as_ref()
.map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY
{
*update_msg = None;
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY
{
return;
}
},
_ => return,
}
self.pending_events.lock().unwrap().push(ev);
}
}
impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
@ -342,8 +369,8 @@ macro_rules! get_pubkey_from_node_id {
}
}
impl<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync<G, C, L>
where C::Target: chain::Access, L::Target: Logger
impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync<G, U, L>
where U::Target: UtxoLookup, L::Target: Logger
{
fn handle_node_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> {
self.network_graph.update_node_from_announcement(msg)?;
@ -353,8 +380,7 @@ where C::Target: chain::Access, L::Target: Logger
}
fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> {
self.network_graph.update_channel_from_announcement(msg, &self.chain_access)?;
log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !msg.contents.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" });
self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?;
Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
@ -630,11 +656,15 @@ where C::Target: chain::Access, L::Target: Logger
features.set_gossip_queries_optional();
features
}
fn processing_queue_high(&self) -> bool {
self.network_graph.pending_checks.too_many_checks_pending()
}
}
impl<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, C, L>
impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, U, L>
where
C::Target: chain::Access,
U::Target: UtxoLookup,
L::Target: Logger,
{
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
@ -1205,6 +1235,7 @@ impl<L: Deref> ReadableArgs<L> for NetworkGraph<L> where L::Target: Logger {
last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
removed_nodes: Mutex::new(HashMap::new()),
removed_channels: Mutex::new(HashMap::new()),
pending_checks: utxo::PendingChecks::new(),
})
}
}
@ -1244,6 +1275,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
last_rapid_gossip_sync_timestamp: Mutex::new(None),
removed_channels: Mutex::new(HashMap::new()),
removed_nodes: Mutex::new(HashMap::new()),
pending_checks: utxo::PendingChecks::new(),
}
}
@ -1299,8 +1331,13 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
}
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
match self.nodes.write().unwrap().get_mut(&msg.node_id) {
None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
let mut nodes = self.nodes.write().unwrap();
match nodes.get_mut(&msg.node_id) {
None => {
core::mem::drop(nodes);
self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError})
},
Some(node) => {
if let Some(node_info) = node.announcement_info.as_ref() {
// The timestamp field is somewhat of a misnomer - the BOLTs use it to order
@ -1337,35 +1374,35 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
/// RoutingMessageHandler implementation to call it indirectly. This may be useful to accept
/// routing messages from a source using a protocol other than the lightning P2P protocol.
///
/// If a `chain::Access` object is provided via `chain_access`, it will be called to verify
/// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify
/// the corresponding UTXO exists on chain and is correctly-formatted.
pub fn update_channel_from_announcement<C: Deref>(
&self, msg: &msgs::ChannelAnnouncement, chain_access: &Option<C>,
pub fn update_channel_from_announcement<U: Deref>(
&self, msg: &msgs::ChannelAnnouncement, utxo_lookup: &Option<U>,
) -> Result<(), LightningError>
where
C::Target: chain::Access,
U::Target: UtxoLookup,
{
let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_1, &get_pubkey_from_node_id!(msg.contents.node_id_1, "channel_announcement"), "channel_announcement");
secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_2, &get_pubkey_from_node_id!(msg.contents.node_id_2, "channel_announcement"), "channel_announcement");
secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_1, "channel_announcement"), "channel_announcement");
secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_2, "channel_announcement"), "channel_announcement");
self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), chain_access)
self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), utxo_lookup)
}
/// Store or update channel info from a channel announcement without verifying the associated
/// signatures. Because we aren't given the associated signatures here we cannot relay the
/// channel announcement to any of our peers.
///
/// If a `chain::Access` object is provided via `chain_access`, it will be called to verify
/// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify
/// the corresponding UTXO exists on chain and is correctly-formatted.
pub fn update_channel_from_unsigned_announcement<C: Deref>(
&self, msg: &msgs::UnsignedChannelAnnouncement, chain_access: &Option<C>
pub fn update_channel_from_unsigned_announcement<U: Deref>(
&self, msg: &msgs::UnsignedChannelAnnouncement, utxo_lookup: &Option<U>
) -> Result<(), LightningError>
where
C::Target: chain::Access,
U::Target: UtxoLookup,
{
self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access)
self.update_channel_from_unsigned_announcement_intern(msg, None, utxo_lookup)
}
/// Update channel from partial announcement data received via rapid gossip sync
@ -1444,11 +1481,11 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
Ok(())
}
fn update_channel_from_unsigned_announcement_intern<C: Deref>(
&self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option<C>
fn update_channel_from_unsigned_announcement_intern<U: Deref>(
&self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, utxo_lookup: &Option<U>
) -> Result<(), LightningError>
where
C::Target: chain::Access,
U::Target: UtxoLookup,
{
if msg.node_id_1 == msg.node_id_2 || msg.bitcoin_key_1 == msg.bitcoin_key_2 {
return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError});
@ -1476,7 +1513,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
action: ErrorAction::IgnoreDuplicateGossip
});
}
} else if chain_access.is_none() {
} else if utxo_lookup.is_none() {
// Similarly, if we can't check the chain right now anyway, ignore the
// duplicate announcement without bothering to take the channels write lock.
return Err(LightningError {
@ -1499,32 +1536,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
}
}
let utxo_value = match &chain_access {
&None => {
// Tentatively accept, potentially exposing us to DoS attacks
None
},
&Some(ref chain_access) => {
match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) {
Ok(TxOut { value, script_pubkey }) => {
let expected_script =
make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
if script_pubkey != expected_script {
return Err(LightningError{err: format!("Channel announcement key ({}) didn't match on-chain script ({})", expected_script.to_hex(), script_pubkey.to_hex()), action: ErrorAction::IgnoreError});
}
//TODO: Check if value is worth storing, use it to inform routing, and compare it
//to the new HTLC max field in channel_update
Some(value)
},
Err(chain::AccessError::UnknownChain) => {
return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError});
},
Err(chain::AccessError::UnknownTx) => {
return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError});
},
}
},
};
let utxo_value = self.pending_checks.check_channel_announcement(
utxo_lookup, msg, full_msg)?;
#[allow(unused_mut, unused_assignments)]
let mut announcement_received_time = 0;
@ -1545,7 +1558,10 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
announcement_received_time,
};
self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)
self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)?;
log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.short_channel_id, if !msg.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" });
Ok(())
}
/// Marks a channel in the graph as failed if a corresponding HTLC fail was sent.
@ -1749,7 +1765,11 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
let mut channels = self.channels.write().unwrap();
match channels.get_mut(&msg.short_channel_id) {
None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),
None => {
core::mem::drop(channels);
self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
},
Some(channel) => {
if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
return Err(LightningError{err:
@ -1903,13 +1923,13 @@ impl ReadOnlyNetworkGraph<'_> {
}
#[cfg(test)]
mod tests {
use crate::chain;
pub(crate) mod tests {
use crate::ln::channelmanager;
use crate::ln::chan_utils::make_funding_redeemscript;
#[cfg(feature = "std")]
use crate::ln::features::InitFeatures;
use crate::routing::gossip::{P2PGossipSync, NetworkGraph, NetworkUpdate, NodeAlias, MAX_EXCESS_BYTES_FOR_RELAY, NodeId, RoutingFees, ChannelUpdateInfo, ChannelInfo, NodeAnnouncementInfo, NodeInfo};
use crate::routing::utxo::{UtxoLookupError, UtxoResult};
use crate::ln::msgs::{RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate,
ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT};
@ -1970,7 +1990,7 @@ mod tests {
assert!(!gossip_sync.should_request_full_sync(&node_id));
}
fn get_signed_node_announcement<F: Fn(&mut UnsignedNodeAnnouncement)>(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> NodeAnnouncement {
pub(crate) fn get_signed_node_announcement<F: Fn(&mut UnsignedNodeAnnouncement)>(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> NodeAnnouncement {
let node_id = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_key));
let mut unsigned_announcement = UnsignedNodeAnnouncement {
features: channelmanager::provided_node_features(&UserConfig::default()),
@ -1990,7 +2010,7 @@ mod tests {
}
}
fn get_signed_channel_announcement<F: Fn(&mut UnsignedChannelAnnouncement)>(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> ChannelAnnouncement {
pub(crate) fn get_signed_channel_announcement<F: Fn(&mut UnsignedChannelAnnouncement)>(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> ChannelAnnouncement {
let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_key);
let node_id_2 = PublicKey::from_secret_key(&secp_ctx, node_2_key);
let node_1_btckey = &SecretKey::from_slice(&[40; 32]).unwrap();
@ -2017,14 +2037,14 @@ mod tests {
}
}
fn get_channel_script(secp_ctx: &Secp256k1<secp256k1::All>) -> Script {
pub(crate) fn get_channel_script(secp_ctx: &Secp256k1<secp256k1::All>) -> Script {
let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
make_funding_redeemscript(&PublicKey::from_secret_key(secp_ctx, &node_1_btckey),
&PublicKey::from_secret_key(secp_ctx, &node_2_btckey)).to_v0_p2wsh()
}
fn get_signed_channel_update<F: Fn(&mut UnsignedChannelUpdate)>(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> ChannelUpdate {
pub(crate) fn get_signed_channel_update<F: Fn(&mut UnsignedChannelUpdate)>(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1<secp256k1::All>) -> ChannelUpdate {
let mut unsigned_channel_update = UnsignedChannelUpdate {
chain_hash: genesis_block(Network::Testnet).header.block_hash(),
short_channel_id: 0,
@ -2141,7 +2161,7 @@ mod tests {
// Test if an associated transaction were not on-chain (or not confirmed).
let chain_source = test_utils::TestChainSource::new(Network::Testnet);
*chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx);
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
let network_graph = NetworkGraph::new(genesis_hash, &logger);
gossip_sync = P2PGossipSync::new(&network_graph, Some(&chain_source), &logger);
@ -2154,7 +2174,8 @@ mod tests {
};
// Now test if the transaction is found in the UTXO set and the script is correct.
*chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script.clone() });
*chain_source.utxo_ret.lock().unwrap() =
UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script.clone() }));
let valid_announcement = get_signed_channel_announcement(|unsigned_announcement| {
unsigned_announcement.short_channel_id += 2;
}, node_1_privkey, node_2_privkey, &secp_ctx);
@ -2172,7 +2193,8 @@ mod tests {
// If we receive announcement for the same channel, once we've validated it against the
// chain, we simply ignore all new (duplicate) announcements.
*chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script });
*chain_source.utxo_ret.lock().unwrap() =
UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script }));
match gossip_sync.handle_channel_announcement(&valid_announcement) {
Ok(_) => panic!(),
Err(e) => assert_eq!(e.err, "Already have chain-validated channel")
@ -2246,7 +2268,8 @@ mod tests {
{
// Announce a channel we will update
let good_script = get_channel_script(&secp_ctx);
*chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() });
*chain_source.utxo_ret.lock().unwrap() =
UtxoResult::Sync(Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() }));
let valid_channel_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
short_channel_id = valid_channel_announcement.contents.short_channel_id;

View file

@ -9,6 +9,7 @@
//! Structs and impls for receiving messages about the network and storing the topology live here.
pub mod utxo;
pub mod gossip;
pub mod router;
pub mod scoring;

View file

@ -7,10 +7,7 @@
// You may not use this file except in accordance with one or both of these
// licenses.
//! The top-level routing/network map tracking logic lives here.
//!
//! You probably want to create a P2PGossipSync and use that as your RoutingMessageHandler and then
//! interrogate it to get routes for your own payments.
//! The router finds paths within a [`NetworkGraph`] for a payment.
use bitcoin::secp256k1::PublicKey;
use bitcoin::hashes::Hash;
@ -2115,6 +2112,7 @@ fn build_route_from_hops_internal<L: Deref>(
#[cfg(test)]
mod tests {
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, EffectiveCapacity};
use crate::routing::utxo::UtxoResult;
use crate::routing::router::{get_route, build_route_from_hops_internal, add_random_cltv_offset, default_node_features,
PaymentParameters, Route, RouteHint, RouteHintHop, RouteHop, RoutingFees,
DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA, MAX_PATH_LENGTH_ESTIMATE};
@ -3529,8 +3527,9 @@ mod tests {
.push_opcode(opcodes::all::OP_PUSHNUM_2)
.push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_v0_p2wsh();
*chain_monitor.utxo_ret.lock().unwrap() = Ok(TxOut { value: 15, script_pubkey: good_script.clone() });
gossip_sync.add_chain_access(Some(chain_monitor));
*chain_monitor.utxo_ret.lock().unwrap() =
UtxoResult::Sync(Ok(TxOut { value: 15, script_pubkey: good_script.clone() }));
gossip_sync.add_utxo_lookup(Some(chain_monitor));
add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333);
update_channel(&gossip_sync, &secp_ctx, &privkeys[0], UnsignedChannelUpdate {

View file

@ -0,0 +1,861 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
//! This module contains traits for LDK to access UTXOs to check gossip data is correct.
//!
//! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
//! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
//! order to announce a channel. This module handles that checking.
use bitcoin::{BlockHash, TxOut};
use bitcoin::hashes::hex::ToHex;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::msgs::{self, LightningError, ErrorAction};
use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use crate::util::events::MessageSendEvent;
use crate::util::logger::{Level, Logger};
use crate::util::ser::Writeable;
use crate::prelude::*;
use alloc::sync::{Arc, Weak};
use crate::sync::Mutex;
use core::ops::Deref;
/// An error when accessing the chain via [`UtxoLookup`].
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
/// The requested chain is unknown.
UnknownChain,
/// The requested transaction doesn't exist or hasn't confirmed.
UnknownTx,
}
/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
/// variant.
#[derive(Clone)]
pub enum UtxoResult {
/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
/// requested or a [`UtxoLookupError`].
Sync(Result<TxOut, UtxoLookupError>),
/// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
/// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
///
/// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
/// but only fairly loosely. Because a pending checks block all message processing, leaving
/// checks pending for an extended time may cause DoS of other functions. It is recommended you
/// keep a tight timeout on lookups, on the order of a few seconds.
Async(UtxoFuture),
}
/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
pub trait UtxoLookup {
/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
/// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
/// is unknown.
///
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult;
}
enum ChannelAnnouncement {
Full(msgs::ChannelAnnouncement),
Unsigned(msgs::UnsignedChannelAnnouncement),
}
impl ChannelAnnouncement {
fn node_id_1(&self) -> &NodeId {
match self {
ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
}
}
}
enum NodeAnnouncement {
Full(msgs::NodeAnnouncement),
Unsigned(msgs::UnsignedNodeAnnouncement),
}
impl NodeAnnouncement {
fn timestamp(&self) -> u32 {
match self {
NodeAnnouncement::Full(msg) => msg.contents.timestamp,
NodeAnnouncement::Unsigned(msg) => msg.timestamp,
}
}
}
enum ChannelUpdate {
Full(msgs::ChannelUpdate),
Unsigned(msgs::UnsignedChannelUpdate),
}
impl ChannelUpdate {
fn timestamp(&self) -> u32 {
match self {
ChannelUpdate::Full(msg) => msg.contents.timestamp,
ChannelUpdate::Unsigned(msg) => msg.timestamp,
}
}
}
struct UtxoMessages {
complete: Option<Result<TxOut, UtxoLookupError>>,
channel_announce: Option<ChannelAnnouncement>,
latest_node_announce_a: Option<NodeAnnouncement>,
latest_node_announce_b: Option<NodeAnnouncement>,
latest_channel_update_a: Option<ChannelUpdate>,
latest_channel_update_b: Option<ChannelUpdate>,
}
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
///
/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
#[derive(Clone)]
pub struct UtxoFuture {
state: Arc<Mutex<UtxoMessages>>,
}
/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
/// once we have a concrete resolution of a request.
struct UtxoResolver(Result<TxOut, UtxoLookupError>);
impl UtxoLookup for UtxoResolver {
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
UtxoResult::Sync(self.0.clone())
}
}
impl UtxoFuture {
/// Builds a new future for later resolution.
pub fn new() -> Self {
Self { state: Arc::new(Mutex::new(UtxoMessages {
complete: None,
channel_announce: None,
latest_node_announce_a: None,
latest_node_announce_b: None,
latest_channel_update_a: None,
latest_channel_update_b: None,
}))}
}
/// Resolves this future against the given `graph` and with the given `result`.
///
/// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
/// forwarding the validated gossip message onwards to peers.
///
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
/// after this.
///
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
pub fn resolve_without_forwarding<L: Deref>(&self,
graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
where L::Target: Logger {
self.do_resolve(graph, result);
}
/// Resolves this future against the given `graph` and with the given `result`.
///
/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
/// have available buffer space.
///
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
/// after this.
///
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
) where L::Target: Logger, U::Target: UtxoLookup {
let mut res = self.do_resolve(graph, result);
for msg_opt in res.iter_mut() {
if let Some(msg) = msg_opt.take() {
gossip.forward_gossip_msg(msg);
}
}
}
fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
let (announcement, node_a, node_b, update_a, update_b) = {
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
let mut async_messages = self.state.lock().unwrap();
if async_messages.channel_announce.is_none() {
// We raced returning to `check_channel_announcement` which hasn't updated
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
// check once it gets control again.
async_messages.complete = Some(result);
return [None, None, None, None, None];
}
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
ChannelAnnouncement::Unsigned(msg) => &msg,
};
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
(async_messages.channel_announce.take().unwrap(),
async_messages.latest_node_announce_a.take(),
async_messages.latest_node_announce_b.take(),
async_messages.latest_channel_update_a.take(),
async_messages.latest_channel_update_b.take())
};
let mut res = [None, None, None, None, None];
let mut res_idx = 0;
// Now that we've updated our internal state, pass the pending messages back through the
// network graph with a different `UtxoLookup` which will resolve immediately.
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
// with them.
let resolver = UtxoResolver(result);
match announcement {
ChannelAnnouncement::Full(signed_msg) => {
if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
msg: signed_msg, update_msg: None,
});
res_idx += 1;
}
},
ChannelAnnouncement::Unsigned(msg) => {
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
},
}
for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
match announce {
Some(NodeAnnouncement::Full(signed_msg)) => {
if graph.update_node_from_announcement(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(NodeAnnouncement::Unsigned(msg)) => {
let _ = graph.update_node_from_unsigned_announcement(&msg);
},
None => {},
}
}
for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
match update {
Some(ChannelUpdate::Full(signed_msg)) => {
if graph.update_channel(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(ChannelUpdate::Unsigned(msg)) => {
let _ = graph.update_channel_unsigned(&msg);
},
None => {},
}
}
res
}
}
struct PendingChecksContext {
channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}
impl PendingChecksContext {
fn lookup_completed(&mut self,
msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
) {
if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
if Weak::ptr_eq(e.get(), &completed_state) {
e.remove();
}
}
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
if e.get().is_empty() { e.remove(); }
}
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
if e.get().is_empty() { e.remove(); }
}
}
}
/// A set of messages which are pending UTXO lookups for processing.
pub(super) struct PendingChecks {
internal: Mutex<PendingChecksContext>,
}
impl PendingChecks {
pub(super) fn new() -> Self {
PendingChecks { internal: Mutex::new(PendingChecksContext {
channels: HashMap::new(), nodes: HashMap::new(),
}) }
}
/// Checks if there is a pending `channel_update` UTXO validation for the given channel,
/// and, if so, stores the channel message for handling later and returns an `Err`.
pub(super) fn check_hold_pending_channel_update(
&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
) -> Result<(), LightningError> {
let mut pending_checks = self.internal.lock().unwrap();
if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
let is_from_a = (msg.flags & 1) == 1;
match Weak::upgrade(e.get()) {
Some(msgs_ref) => {
let mut messages = msgs_ref.lock().unwrap();
let latest_update = if is_from_a {
&mut messages.latest_channel_update_a
} else {
&mut messages.latest_channel_update_b
};
if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
// If the messages we got has a higher timestamp, just blindly assume the
// signatures on the new message are correct and drop the old message. This
// may cause us to end up dropping valid `channel_update`s if a peer is
// malicious, but we should get the correct ones when the node updates them.
*latest_update = Some(
if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
else { ChannelUpdate::Unsigned(msg.clone()) });
}
return Err(LightningError {
err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
});
},
None => { e.remove(); },
}
}
Ok(())
}
/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
/// given node and, if so, stores the channel message for handling later and returns an `Err`.
pub(super) fn check_hold_pending_node_announcement(
&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
) -> Result<(), LightningError> {
let mut pending_checks = self.internal.lock().unwrap();
if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
let mut found_at_least_one_chan = false;
e.get_mut().retain(|node_msgs| {
match Weak::upgrade(&node_msgs) {
Some(chan_mtx) => {
let mut chan_msgs = chan_mtx.lock().unwrap();
if let Some(chan_announce) = &chan_msgs.channel_announce {
let latest_announce =
if *chan_announce.node_id_1() == msg.node_id {
&mut chan_msgs.latest_node_announce_a
} else {
&mut chan_msgs.latest_node_announce_b
};
if latest_announce.is_none() ||
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
{
*latest_announce = Some(
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
else { NodeAnnouncement::Unsigned(msg.clone()) });
}
found_at_least_one_chan = true;
true
} else {
debug_assert!(false, "channel_announce is set before struct is added to node map");
false
}
},
None => false,
}
});
if e.get().is_empty() { e.remove(); }
if found_at_least_one_chan {
return Err(LightningError {
err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
});
}
}
Ok(())
}
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
) -> Result<(), msgs::LightningError> {
match pending_channels.entry(msg.short_channel_id) {
hash_map::Entry::Occupied(mut e) => {
// There's already a pending lookup for the given SCID. Check if the messages
// are the same and, if so, return immediately (don't bother spawning another
// lookup if we haven't gotten that far yet).
match Weak::upgrade(&e.get()) {
Some(pending_msgs) => {
let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
None => {
// This shouldn't actually be reachable. We set the
// `channel_announce` field under the same lock as setting the
// channel map entry. Still, we can just treat it as
// non-matching and let the new request fly.
debug_assert!(false);
false
},
};
if pending_matches {
return Err(LightningError {
err: "Channel announcement is already being checked".to_owned(),
action: ErrorAction::IgnoreDuplicateGossip,
});
} else {
// The earlier lookup is a different message. If we have another
// request in-flight now replace the original.
// Note that in the replace case whether to replace is somewhat
// arbitrary - both results will be handled, we're just updating the
// value that will be compared to future lookups with the same SCID.
if let Some(item) = replacement {
*e.get_mut() = item;
}
}
},
None => {
// The earlier lookup already resolved. We can't be sure its the same
// so just remove/replace it and move on.
if let Some(item) = replacement {
*e.get_mut() = item;
} else { e.remove(); }
},
}
},
hash_map::Entry::Vacant(v) => {
if let Some(item) = replacement { v.insert(item); }
},
}
Ok(())
}
pub(super) fn check_channel_announcement<U: Deref>(&self,
utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
full_msg: Option<&msgs::ChannelAnnouncement>
) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
let handle_result = |res| {
match res {
Ok(TxOut { value, script_pubkey }) => {
let expected_script =
make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
if script_pubkey != expected_script {
return Err(LightningError{
err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
expected_script.to_hex(), script_pubkey.to_hex()),
action: ErrorAction::IgnoreError
});
}
Ok(Some(value))
},
Err(UtxoLookupError::UnknownChain) => {
Err(LightningError {
err: format!("Channel announced on an unknown chain ({})",
msg.chain_hash.encode().to_hex()),
action: ErrorAction::IgnoreError
})
},
Err(UtxoLookupError::UnknownTx) => {
Err(LightningError {
err: "Channel announced without corresponding UTXO entry".to_owned(),
action: ErrorAction::IgnoreError
})
},
}
};
Self::check_replace_previous_entry(msg, full_msg, None,
&mut self.internal.lock().unwrap().channels)?;
match utxo_lookup {
&None => {
// Tentatively accept, potentially exposing us to DoS attacks
Ok(None)
},
&Some(ref utxo_lookup) => {
match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
UtxoResult::Sync(res) => handle_result(res),
UtxoResult::Async(future) => {
let mut pending_checks = self.internal.lock().unwrap();
let mut async_messages = future.state.lock().unwrap();
if let Some(res) = async_messages.complete.take() {
// In the unlikely event the future resolved before we managed to get it,
// handle the result in-line.
handle_result(res)
} else {
Self::check_replace_previous_entry(msg, full_msg,
Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
async_messages.channel_announce = Some(
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
else { ChannelAnnouncement::Unsigned(msg.clone()) });
pending_checks.nodes.entry(msg.node_id_1)
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
pending_checks.nodes.entry(msg.node_id_2)
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
Err(LightningError {
err: "Channel being checked async".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
})
}
},
}
}
}
}
/// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
/// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
/// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
/// which we'll have to process. With a socket buffer of 4KB and a minimum
/// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
/// count` messages to process beyond this limit. Because we'll probably have a few peers,
/// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
/// checks should be more than enough for decent parallelism.
const MAX_PENDING_LOOKUPS: usize = 32;
/// Returns true if there are a large number of async checks pending and future
/// `channel_announcement` messages should be delayed. Note that this is only a hint and
/// messages already in-flight may still have to be handled for various reasons.
pub(super) fn too_many_checks_pending(&self) -> bool {
let mut pending_checks = self.internal.lock().unwrap();
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
// If we have many channel checks pending, ensure we don't have any dangling checks
// (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
// instead) before we commit to applying backpressure.
pending_checks.channels.retain(|_, chan| {
Weak::upgrade(&chan).is_some()
});
pending_checks.nodes.retain(|_, channels| {
channels.retain(|chan| Weak::upgrade(&chan).is_some());
!channels.is_empty()
});
pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::routing::gossip::tests::*;
use crate::util::test_utils::{TestChainSource, TestLogger};
use crate::ln::msgs;
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::secp256k1::{Secp256k1, SecretKey};
use core::sync::atomic::Ordering;
fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
let logger = Box::new(TestLogger::new());
let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash();
let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
let network_graph = NetworkGraph::new(genesis_hash, logger);
(chain_source, network_graph)
}
fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
NetworkGraph<Box<TestLogger>>, bitcoin::Script, msgs::NodeAnnouncement,
msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
{
let secp_ctx = Secp256k1::new();
let (chain_source, network_graph) = get_network();
let good_script = get_channel_script(&secp_ctx);
let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
// Note that we have to set the "direction" flag correctly on both messages
let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
let chan_update_c = get_signed_channel_update(|msg| {
msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
(valid_announcement, chain_source, network_graph, good_script, node_a_announce,
node_b_announce, chan_update_a, chan_update_b, chan_update_c)
}
#[test]
fn test_fast_async_lookup() {
// Check that async lookups which resolve quicker than the future is returned to the
// `get_utxo` call can read it still resolve properly.
let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
let future = UtxoFuture::new();
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
}
#[test]
fn test_async_lookup() {
// Test a simple async lookup
let (valid_announcement, chain_source, network_graph, good_script,
node_a_announce, node_b_announce, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 0, script_pubkey: good_script }));
network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
.unwrap().announcement_info.is_none());
network_graph.update_node_from_announcement(&node_a_announce).unwrap();
network_graph.update_node_from_announcement(&node_b_announce).unwrap();
assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
.unwrap().announcement_info.is_some());
}
#[test]
fn test_invalid_async_lookup() {
// Test an async lookup which returns an incorrect script
let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() }));
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
}
#[test]
fn test_failing_async_lookup() {
// Test an async lookup which returns an error
let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
}
#[test]
fn test_updates_async_lookup() {
// Test async lookups will process pending channel_update/node_announcements once they
// complete.
let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
assert_eq!(
network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
"Awaiting channel_announcement validation to accept node_announcement");
assert_eq!(
network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
"Awaiting channel_announcement validation to accept node_announcement");
assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
"Awaiting channel_announcement validation to accept channel_update");
assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
"Awaiting channel_announcement validation to accept channel_update");
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
assert!(network_graph.read_only().channels()
.get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
assert!(network_graph.read_only().channels()
.get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
.unwrap().announcement_info.is_some());
assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
.unwrap().announcement_info.is_some());
}
#[test]
fn test_latest_update_async_lookup() {
// Test async lookups will process the latest channel_update if two are received while
// awaiting an async UTXO lookup.
let (valid_announcement, chain_source, network_graph, good_script, _,
_, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
"Awaiting channel_announcement validation to accept channel_update");
assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
"Awaiting channel_announcement validation to accept channel_update");
assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
"Awaiting channel_announcement validation to accept channel_update");
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
assert!(network_graph.read_only().channels()
.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
.one_to_two.as_ref().unwrap().last_update !=
network_graph.read_only().channels()
.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
.two_to_one.as_ref().unwrap().last_update);
}
#[test]
fn test_no_double_lookups() {
// Test that a pending async lookup will prevent a second async lookup from flying, but
// only if the channel_announcement message is identical.
let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
// If we make a second request with the same message, the call count doesn't increase...
let future_b = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
assert_eq!(
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel announcement is already being checked");
assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
// But if we make a third request with a tweaked message, we should get a second call
// against our new future...
let secp_ctx = Secp256k1::new();
let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
assert_eq!(
network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
"Channel being checked async");
assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
// Still, if we resolve the original future, the original channel will be accepted.
future.resolve_without_forwarding(&network_graph,
Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
assert!(!network_graph.read_only().channels()
.get(&valid_announcement.contents.short_channel_id).unwrap()
.announcement_message.as_ref().unwrap()
.contents.features.supports_unknown_test_feature());
}
#[test]
fn test_checks_backpressure() {
// Test that too_many_checks_pending returns true when there are many checks pending, and
// returns false once they complete.
let secp_ctx = Secp256k1::new();
let (chain_source, network_graph) = get_network();
// We cheat and use a single future for all the lookups to complete them all at once.
let future = UtxoFuture::new();
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
let valid_announcement = get_signed_channel_announcement(
|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
assert!(!network_graph.pending_checks.too_many_checks_pending());
}
let valid_announcement = get_signed_channel_announcement(
|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
assert!(network_graph.pending_checks.too_many_checks_pending());
// Once the future completes the "too many checks" flag should reset.
future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
assert!(!network_graph.pending_checks.too_many_checks_pending());
}
#[test]
fn test_checks_backpressure_drop() {
// Test that too_many_checks_pending returns true when there are many checks pending, and
// returns false if we drop some of the futures without completion.
let secp_ctx = Secp256k1::new();
let (chain_source, network_graph) = get_network();
// We cheat and use a single future for all the lookups to complete them all at once.
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
let valid_announcement = get_signed_channel_announcement(
|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
assert!(!network_graph.pending_checks.too_many_checks_pending());
}
let valid_announcement = get_signed_channel_announcement(
|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
assert!(network_graph.pending_checks.too_many_checks_pending());
// Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
// should reset to false.
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
assert!(!network_graph.pending_checks.too_many_checks_pending());
}
}

View file

@ -1612,13 +1612,18 @@ pub enum MessageSendEvent {
/// The channel_announcement which should be sent.
msg: msgs::ChannelAnnouncement,
/// The followup channel_update which should be sent.
update_msg: msgs::ChannelUpdate,
update_msg: Option<msgs::ChannelUpdate>,
},
/// Used to indicate that a channel_update should be broadcast to all peers.
BroadcastChannelUpdate {
/// The channel_update which should be sent.
msg: msgs::ChannelUpdate,
},
/// Used to indicate that a node_announcement should be broadcast to all peers.
BroadcastNodeAnnouncement {
/// The node_announcement which should be sent.
msg: msgs::NodeAnnouncement,
},
/// Used to indicate that a channel_update should be sent to a single peer.
/// In contrast to [`Self::BroadcastChannelUpdate`], this is used when the channel is a
/// private channel and we shouldn't be informing all of our peers of channel parameters.

View file

@ -22,8 +22,8 @@ use crate::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
use crate::ln::{msgs, wire};
use crate::ln::msgs::LightningError;
use crate::ln::script::ShutdownScript;
use crate::routing::gossip::NetworkGraph;
use crate::routing::gossip::NodeId;
use crate::routing::gossip::{NetworkGraph, NodeId};
use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult};
use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs};
use crate::routing::scoring::FixedPenaltyScorer;
use crate::util::config::UserConfig;
@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
features.set_gossip_queries_optional();
features
}
fn processing_queue_high(&self) -> bool { false }
}
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
@ -839,7 +841,8 @@ impl core::fmt::Debug for OnGetShutdownScriptpubkey {
pub struct TestChainSource {
pub genesis_hash: BlockHash,
pub utxo_ret: Mutex<Result<TxOut, chain::AccessError>>,
pub utxo_ret: Mutex<UtxoResult>,
pub get_utxo_call_count: AtomicUsize,
pub watched_txn: Mutex<HashSet<(Txid, Script)>>,
pub watched_outputs: Mutex<HashSet<(OutPoint, Script)>>,
}
@ -849,17 +852,19 @@ impl TestChainSource {
let script_pubkey = Builder::new().push_opcode(opcodes::OP_TRUE).into_script();
Self {
genesis_hash: genesis_block(network).block_hash(),
utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })),
utxo_ret: Mutex::new(UtxoResult::Sync(Ok(TxOut { value: u64::max_value(), script_pubkey }))),
get_utxo_call_count: AtomicUsize::new(0),
watched_txn: Mutex::new(HashSet::new()),
watched_outputs: Mutex::new(HashSet::new()),
}
}
}
impl chain::Access for TestChainSource {
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, chain::AccessError> {
impl UtxoLookup for TestChainSource {
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
self.get_utxo_call_count.fetch_add(1, Ordering::Relaxed);
if self.genesis_hash != *genesis_hash {
return Err(chain::AccessError::UnknownChain);
return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain));
}
self.utxo_ret.lock().unwrap().clone()