mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-25 07:17:40 +01:00
Process channel_update
/node_announcement
async if needed
If we have a `channel_announcement` which is waiting on a UTXO lookup before we can process it, and we receive a `channel_update` or `node_announcement` for the same channel or a node which is a part of the channel, we have to wait until the lookup completes until we can decide if we want to accept the new message. Here, we store the new message in the pending lookup state and process it asynchronously like the original `channel_announcement`.
This commit is contained in:
parent
7388b6c1d7
commit
67c9c7f2ae
2 changed files with 188 additions and 6 deletions
|
@ -1298,8 +1298,13 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
|
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
|
||||||
match self.nodes.write().unwrap().get_mut(&msg.node_id) {
|
let mut nodes = self.nodes.write().unwrap();
|
||||||
None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
|
match nodes.get_mut(&msg.node_id) {
|
||||||
|
None => {
|
||||||
|
core::mem::drop(nodes);
|
||||||
|
self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
|
||||||
|
Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError})
|
||||||
|
},
|
||||||
Some(node) => {
|
Some(node) => {
|
||||||
if let Some(node_info) = node.announcement_info.as_ref() {
|
if let Some(node_info) = node.announcement_info.as_ref() {
|
||||||
// The timestamp field is somewhat of a misnomer - the BOLTs use it to order
|
// The timestamp field is somewhat of a misnomer - the BOLTs use it to order
|
||||||
|
@ -1724,7 +1729,11 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
|
||||||
|
|
||||||
let mut channels = self.channels.write().unwrap();
|
let mut channels = self.channels.write().unwrap();
|
||||||
match channels.get_mut(&msg.short_channel_id) {
|
match channels.get_mut(&msg.short_channel_id) {
|
||||||
None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),
|
None => {
|
||||||
|
core::mem::drop(channels);
|
||||||
|
self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
|
||||||
|
return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
|
||||||
|
},
|
||||||
Some(channel) => {
|
Some(channel) => {
|
||||||
if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
|
if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
|
||||||
return Err(LightningError{err:
|
return Err(LightningError{err:
|
||||||
|
|
|
@ -69,10 +69,48 @@ enum ChannelAnnouncement {
|
||||||
Full(msgs::ChannelAnnouncement),
|
Full(msgs::ChannelAnnouncement),
|
||||||
Unsigned(msgs::UnsignedChannelAnnouncement),
|
Unsigned(msgs::UnsignedChannelAnnouncement),
|
||||||
}
|
}
|
||||||
|
impl ChannelAnnouncement {
|
||||||
|
fn node_id_1(&self) -> &NodeId {
|
||||||
|
match self {
|
||||||
|
ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
|
||||||
|
ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum NodeAnnouncement {
|
||||||
|
Full(msgs::NodeAnnouncement),
|
||||||
|
Unsigned(msgs::UnsignedNodeAnnouncement),
|
||||||
|
}
|
||||||
|
impl NodeAnnouncement {
|
||||||
|
fn timestamp(&self) -> u32 {
|
||||||
|
match self {
|
||||||
|
NodeAnnouncement::Full(msg) => msg.contents.timestamp,
|
||||||
|
NodeAnnouncement::Unsigned(msg) => msg.timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ChannelUpdate {
|
||||||
|
Full(msgs::ChannelUpdate),
|
||||||
|
Unsigned(msgs::UnsignedChannelUpdate),
|
||||||
|
}
|
||||||
|
impl ChannelUpdate {
|
||||||
|
fn timestamp(&self) -> u32 {
|
||||||
|
match self {
|
||||||
|
ChannelUpdate::Full(msg) => msg.contents.timestamp,
|
||||||
|
ChannelUpdate::Unsigned(msg) => msg.timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct UtxoMessages {
|
struct UtxoMessages {
|
||||||
complete: Option<Result<TxOut, UtxoLookupError>>,
|
complete: Option<Result<TxOut, UtxoLookupError>>,
|
||||||
channel_announce: Option<ChannelAnnouncement>,
|
channel_announce: Option<ChannelAnnouncement>,
|
||||||
|
latest_node_announce_a: Option<NodeAnnouncement>,
|
||||||
|
latest_node_announce_b: Option<NodeAnnouncement>,
|
||||||
|
latest_channel_update_a: Option<ChannelUpdate>,
|
||||||
|
latest_channel_update_b: Option<ChannelUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
|
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
|
||||||
|
@ -98,13 +136,17 @@ impl UtxoFuture {
|
||||||
Self { state: Arc::new(Mutex::new(UtxoMessages {
|
Self { state: Arc::new(Mutex::new(UtxoMessages {
|
||||||
complete: None,
|
complete: None,
|
||||||
channel_announce: None,
|
channel_announce: None,
|
||||||
|
latest_node_announce_a: None,
|
||||||
|
latest_node_announce_b: None,
|
||||||
|
latest_channel_update_a: None,
|
||||||
|
latest_channel_update_b: None,
|
||||||
}))}
|
}))}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resolves this future against the given `graph` and with the given `result`.
|
/// Resolves this future against the given `graph` and with the given `result`.
|
||||||
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
|
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
|
||||||
where L::Target: Logger {
|
where L::Target: Logger {
|
||||||
let announcement = {
|
let (announcement, node_a, node_b, update_a, update_b) = {
|
||||||
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
|
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
|
||||||
let mut async_messages = self.state.lock().unwrap();
|
let mut async_messages = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
@ -115,6 +157,7 @@ impl UtxoFuture {
|
||||||
async_messages.complete = Some(result);
|
async_messages.complete = Some(result);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
|
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
|
||||||
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
|
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
|
||||||
ChannelAnnouncement::Unsigned(msg) => &msg,
|
ChannelAnnouncement::Unsigned(msg) => &msg,
|
||||||
|
@ -122,7 +165,11 @@ impl UtxoFuture {
|
||||||
|
|
||||||
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
|
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
|
||||||
|
|
||||||
async_messages.channel_announce.take().unwrap()
|
(async_messages.channel_announce.take().unwrap(),
|
||||||
|
async_messages.latest_node_announce_a.take(),
|
||||||
|
async_messages.latest_node_announce_b.take(),
|
||||||
|
async_messages.latest_channel_update_a.take(),
|
||||||
|
async_messages.latest_channel_update_b.take())
|
||||||
};
|
};
|
||||||
|
|
||||||
// Now that we've updated our internal state, pass the pending messages back through the
|
// Now that we've updated our internal state, pass the pending messages back through the
|
||||||
|
@ -138,11 +185,36 @@ impl UtxoFuture {
|
||||||
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
|
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
|
||||||
|
match announce {
|
||||||
|
Some(NodeAnnouncement::Full(signed_msg)) => {
|
||||||
|
let _ = graph.update_node_from_announcement(&signed_msg);
|
||||||
|
},
|
||||||
|
Some(NodeAnnouncement::Unsigned(msg)) => {
|
||||||
|
let _ = graph.update_node_from_unsigned_announcement(&msg);
|
||||||
|
},
|
||||||
|
None => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
|
||||||
|
match update {
|
||||||
|
Some(ChannelUpdate::Full(signed_msg)) => {
|
||||||
|
let _ = graph.update_channel(&signed_msg);
|
||||||
|
},
|
||||||
|
Some(ChannelUpdate::Unsigned(msg)) => {
|
||||||
|
let _ = graph.update_channel_unsigned(&msg);
|
||||||
|
},
|
||||||
|
None => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PendingChecksContext {
|
struct PendingChecksContext {
|
||||||
channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
|
channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
|
||||||
|
nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PendingChecksContext {
|
impl PendingChecksContext {
|
||||||
|
@ -154,6 +226,15 @@ impl PendingChecksContext {
|
||||||
e.remove();
|
e.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
|
||||||
|
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
|
||||||
|
if e.get().is_empty() { e.remove(); }
|
||||||
|
}
|
||||||
|
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
|
||||||
|
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
|
||||||
|
if e.get().is_empty() { e.remove(); }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,10 +246,98 @@ pub(super) struct PendingChecks {
|
||||||
impl PendingChecks {
|
impl PendingChecks {
|
||||||
pub(super) fn new() -> Self {
|
pub(super) fn new() -> Self {
|
||||||
PendingChecks { internal: Mutex::new(PendingChecksContext {
|
PendingChecks { internal: Mutex::new(PendingChecksContext {
|
||||||
channels: HashMap::new(),
|
channels: HashMap::new(), nodes: HashMap::new(),
|
||||||
}) }
|
}) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks if there is a pending `channel_update` UTXO validation for the given channel,
|
||||||
|
/// and, if so, stores the channel message for handling later and returns an `Err`.
|
||||||
|
pub(super) fn check_hold_pending_channel_update(
|
||||||
|
&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
|
||||||
|
) -> Result<(), LightningError> {
|
||||||
|
let mut pending_checks = self.internal.lock().unwrap();
|
||||||
|
if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
|
||||||
|
let is_from_a = (msg.flags & 1) == 1;
|
||||||
|
match Weak::upgrade(e.get()) {
|
||||||
|
Some(msgs_ref) => {
|
||||||
|
let mut messages = msgs_ref.lock().unwrap();
|
||||||
|
let latest_update = if is_from_a {
|
||||||
|
&mut messages.latest_channel_update_a
|
||||||
|
} else {
|
||||||
|
&mut messages.latest_channel_update_b
|
||||||
|
};
|
||||||
|
if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
|
||||||
|
// If the messages we got has a higher timestamp, just blindly assume the
|
||||||
|
// signatures on the new message are correct and drop the old message. This
|
||||||
|
// may cause us to end up dropping valid `channel_update`s if a peer is
|
||||||
|
// malicious, but we should get the correct ones when the node updates them.
|
||||||
|
*latest_update = Some(
|
||||||
|
if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
|
||||||
|
else { ChannelUpdate::Unsigned(msg.clone()) });
|
||||||
|
}
|
||||||
|
return Err(LightningError {
|
||||||
|
err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
|
||||||
|
action: ErrorAction::IgnoreAndLog(Level::Gossip),
|
||||||
|
});
|
||||||
|
},
|
||||||
|
None => { e.remove(); },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
|
||||||
|
/// given node and, if so, stores the channel message for handling later and returns an `Err`.
|
||||||
|
pub(super) fn check_hold_pending_node_announcement(
|
||||||
|
&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
|
||||||
|
) -> Result<(), LightningError> {
|
||||||
|
let mut pending_checks = self.internal.lock().unwrap();
|
||||||
|
if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
|
||||||
|
let mut found_at_least_one_chan = false;
|
||||||
|
e.get_mut().retain(|node_msgs| {
|
||||||
|
match Weak::upgrade(&node_msgs) {
|
||||||
|
Some(chan_mtx) => {
|
||||||
|
let mut chan_msgs = chan_mtx.lock().unwrap();
|
||||||
|
if let Some(chan_announce) = &chan_msgs.channel_announce {
|
||||||
|
let latest_announce =
|
||||||
|
if *chan_announce.node_id_1() == msg.node_id {
|
||||||
|
&mut chan_msgs.latest_node_announce_a
|
||||||
|
} else {
|
||||||
|
&mut chan_msgs.latest_node_announce_b
|
||||||
|
};
|
||||||
|
if latest_announce.is_none() ||
|
||||||
|
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
|
||||||
|
{
|
||||||
|
// If the messages we got has a higher timestamp, just blindly
|
||||||
|
// assume the signatures on the new message are correct and drop
|
||||||
|
// the old message. This may cause us to end up dropping valid
|
||||||
|
// `node_announcement`s if a peer is malicious, but we should get
|
||||||
|
// the correct ones when the node updates them.
|
||||||
|
*latest_announce = Some(
|
||||||
|
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
|
||||||
|
else { NodeAnnouncement::Unsigned(msg.clone()) });
|
||||||
|
}
|
||||||
|
found_at_least_one_chan = true;
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
debug_assert!(false, "channel_announce is set before struct is added to node map");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if e.get().is_empty() { e.remove(); }
|
||||||
|
if found_at_least_one_chan {
|
||||||
|
return Err(LightningError {
|
||||||
|
err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
|
||||||
|
action: ErrorAction::IgnoreAndLog(Level::Gossip),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
|
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
|
||||||
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
|
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
|
||||||
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
|
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
|
||||||
|
@ -282,6 +451,10 @@ impl PendingChecks {
|
||||||
async_messages.channel_announce = Some(
|
async_messages.channel_announce = Some(
|
||||||
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
|
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
|
||||||
else { ChannelAnnouncement::Unsigned(msg.clone()) });
|
else { ChannelAnnouncement::Unsigned(msg.clone()) });
|
||||||
|
pending_checks.nodes.entry(msg.node_id_1)
|
||||||
|
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
|
||||||
|
pending_checks.nodes.entry(msg.node_id_2)
|
||||||
|
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
|
||||||
Err(LightningError {
|
Err(LightningError {
|
||||||
err: "Channel being checked async".to_owned(),
|
err: "Channel being checked async".to_owned(),
|
||||||
action: ErrorAction::IgnoreAndLog(Level::Gossip),
|
action: ErrorAction::IgnoreAndLog(Level::Gossip),
|
||||||
|
|
Loading…
Add table
Reference in a new issue