Forward gossip messages which were verified asynchronously

Gossip messages which were verified against the chain
asynchronously should still be forwarded to peers, but must now go
out via a new `P2PGossipSync` parameter in the
`AccessResolver::resolve` method, allowing us to wire them up to
the `P2PGossipSync`'s `MessageSendEventsProvider` implementation.
This commit is contained in:
Matt Corallo 2023-01-22 04:14:58 +00:00
parent 41e6eba201
commit 0da7bbd5ec
3 changed files with 83 additions and 8 deletions

View file

@ -99,12 +99,12 @@ impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
&[2, _] => {
let future = UtxoFuture::new();
future.resolve(self.net_graph, Ok(txo_res));
future.resolve_without_forwarding(self.net_graph, Ok(txo_res));
UtxoResult::Async(future.clone())
},
&[3, _] => {
let future = UtxoFuture::new();
future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx));
future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx));
UtxoResult::Async(future.clone())
},
&[4, _] => {

View file

@ -272,6 +272,36 @@ where U::Target: UtxoLookup, L::Target: Logger
false
}
}
/// Used to broadcast forward gossip messages which were validated async.
///
/// Note that this will ignore events other than `Broadcast*` or messages with too much excess
/// data.
pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) {
match &mut ev {
MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
if update_msg.as_ref()
.map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY
{
*update_msg = None;
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY
{
return;
}
},
_ => return,
}
self.pending_events.lock().unwrap().push(ev);
}
}
impl<L: Deref> NetworkGraph<L> where L::Target: Logger {

View file

@ -18,7 +18,8 @@ use bitcoin::hashes::hex::ToHex;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::msgs::{self, LightningError, ErrorAction};
use crate::routing::gossip::{NetworkGraph, NodeId};
use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use crate::util::events::MessageSendEvent;
use crate::util::logger::{Level, Logger};
use crate::util::ser::Writeable;
@ -144,8 +145,32 @@ impl UtxoFuture {
}
/// Resolves this future against the given `graph` and with the given `result`.
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
///
/// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
/// forwarding the validated gossip message onwards to peers.
pub fn resolve_without_forwarding<L: Deref>(&self,
graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
where L::Target: Logger {
self.do_resolve(graph, result);
}
/// Resolves this future against the given `graph` and with the given `result`.
///
/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
/// have available buffer space.
pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
) where L::Target: Logger, U::Target: UtxoLookup {
let mut res = self.do_resolve(graph, result);
for msg_opt in res.iter_mut() {
if let Some(msg) = msg_opt.take() {
gossip.forward_gossip_msg(msg);
}
}
}
fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
let (announcement, node_a, node_b, update_a, update_b) = {
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
let mut async_messages = self.state.lock().unwrap();
@ -155,7 +180,7 @@ impl UtxoFuture {
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
// check once it gets control again.
async_messages.complete = Some(result);
return;
return [None, None, None, None, None];
}
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
@ -172,6 +197,9 @@ impl UtxoFuture {
async_messages.latest_channel_update_b.take())
};
let mut res = [None, None, None, None, None];
let mut res_idx = 0;
// Now that we've updated our internal state, pass the pending messages back through the
// network graph with a different `UtxoLookup` which will resolve immediately.
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
@ -179,7 +207,12 @@ impl UtxoFuture {
let resolver = UtxoResolver(result);
match announcement {
ChannelAnnouncement::Full(signed_msg) => {
let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
msg: signed_msg, update_msg: None,
});
res_idx += 1;
}
},
ChannelAnnouncement::Unsigned(msg) => {
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
@ -189,7 +222,12 @@ impl UtxoFuture {
for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
match announce {
Some(NodeAnnouncement::Full(signed_msg)) => {
let _ = graph.update_node_from_announcement(&signed_msg);
if graph.update_node_from_announcement(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(NodeAnnouncement::Unsigned(msg)) => {
let _ = graph.update_node_from_unsigned_announcement(&msg);
@ -201,7 +239,12 @@ impl UtxoFuture {
for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
match update {
Some(ChannelUpdate::Full(signed_msg)) => {
let _ = graph.update_channel(&signed_msg);
if graph.update_channel(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(ChannelUpdate::Unsigned(msg)) => {
let _ = graph.update_channel_unsigned(&msg);
@ -209,6 +252,8 @@ impl UtxoFuture {
None => {},
}
}
res
}
}