Destination in OnionMessenger::send_onion_message

OnionMessenger::send_onion_message takes an OnionMessagePath. This isn't
very useful as it requires finding a path manually. Instead, have the
method take a Destination and use OnionMessenger's MessageRouter to
construct the path. Later, this will allow for buffering messages where
the first node in the path isn't a direct connection.
This commit is contained in:
Jeffrey Czyz 2023-11-15 17:26:45 -06:00
parent 79f212b70a
commit 8412e8368c
No known key found for this signature in database
GPG key ID: 3A4E08275D5E96D2
3 changed files with 108 additions and 65 deletions

View file

@ -269,7 +269,9 @@ mod tests {
"Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)" "Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)"
.to_string())), Some(&1)); .to_string())), Some(&1));
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
"Sending onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1)); "Constructing onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1));
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
"Buffered onion message when responding to Custom onion message with path_id None".to_string())), Some(&1));
} }
let two_unblinded_hops_om = "\ let two_unblinded_hops_om = "\

View file

@ -206,7 +206,7 @@ fn one_unblinded_hop() {
intermediate_nodes: vec![], intermediate_nodes: vec![],
destination: Destination::Node(nodes[1].get_node_pk()), destination: Destination::Node(nodes[1].get_node_pk()),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }
@ -220,7 +220,7 @@ fn two_unblinded_hops() {
intermediate_nodes: vec![nodes[1].get_node_pk()], intermediate_nodes: vec![nodes[1].get_node_pk()],
destination: Destination::Node(nodes[2].get_node_pk()), destination: Destination::Node(nodes[2].get_node_pk()),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }
@ -236,7 +236,7 @@ fn one_blinded_hop() {
intermediate_nodes: vec![], intermediate_nodes: vec![],
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }
@ -253,7 +253,7 @@ fn two_unblinded_two_blinded() {
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[4].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[4].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }
@ -270,7 +270,7 @@ fn three_blinded_hops() {
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[3].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }
@ -287,7 +287,7 @@ fn too_big_packet_error() {
intermediate_nodes: hops, intermediate_nodes: hops,
destination: Destination::Node(hop_node_id), destination: Destination::Node(hop_node_id),
}; };
let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err();
assert_eq!(err, SendError::TooBigPacket); assert_eq!(err, SendError::TooBigPacket);
} }
@ -305,7 +305,7 @@ fn we_are_intro_node() {
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap();
nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
@ -315,7 +315,7 @@ fn we_are_intro_node() {
intermediate_nodes: vec![], intermediate_nodes: vec![],
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
nodes.remove(2); nodes.remove(2);
pass_along_path(&nodes); pass_along_path(&nodes);
@ -335,7 +335,7 @@ fn invalid_blinded_path_error() {
intermediate_nodes: vec![], intermediate_nodes: vec![],
destination: Destination::BlindedPath(blinded_path), destination: Destination::BlindedPath(blinded_path),
}; };
let err = nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap_err(); let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap_err();
assert_eq!(err, SendError::TooFewBlindedHops); assert_eq!(err, SendError::TooFewBlindedHops);
} }
@ -351,7 +351,7 @@ fn reply_path() {
destination: Destination::Node(nodes[3].get_node_pk()), destination: Destination::Node(nodes[3].get_node_pk()),
}; };
let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
nodes[0].messenger.send_onion_message(path, test_msg.clone(), Some(reply_path)).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap();
nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request);
pass_along_path(&nodes); pass_along_path(&nodes);
// Make sure the last node successfully decoded the reply path. // Make sure the last node successfully decoded the reply path.
@ -367,7 +367,7 @@ fn reply_path() {
}; };
let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
nodes[0].messenger.send_onion_message(path, test_msg, Some(reply_path)).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, Some(reply_path)).unwrap();
nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request);
pass_along_path(&nodes); pass_along_path(&nodes);
@ -399,7 +399,7 @@ fn invalid_custom_message_type() {
intermediate_nodes: vec![], intermediate_nodes: vec![],
destination: Destination::Node(nodes[1].get_node_pk()), destination: Destination::Node(nodes[1].get_node_pk()),
}; };
let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err();
assert_eq!(err, SendError::InvalidMessage); assert_eq!(err, SendError::InvalidMessage);
} }
@ -412,9 +412,9 @@ fn peer_buffer_full() {
destination: Destination::Node(nodes[1].get_node_pk()), destination: Destination::Node(nodes[1].get_node_pk()),
}; };
for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
nodes[0].messenger.send_onion_message(path.clone(), test_msg.clone(), None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path.clone(), test_msg.clone(), None).unwrap();
} }
let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err();
assert_eq!(err, SendError::BufferFull); assert_eq!(err, SendError::BufferFull);
} }
@ -435,7 +435,7 @@ fn many_hops() {
intermediate_nodes, intermediate_nodes,
destination: Destination::Node(nodes[num_nodes-1].get_node_pk()), destination: Destination::Node(nodes[num_nodes-1].get_node_pk()),
}; };
nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response);
pass_along_path(&nodes); pass_along_path(&nodes);
} }

View file

@ -76,7 +76,14 @@ use crate::prelude::*;
/// # struct FakeMessageRouter {} /// # struct FakeMessageRouter {}
/// # impl MessageRouter for FakeMessageRouter { /// # impl MessageRouter for FakeMessageRouter {
/// # fn find_path(&self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination) -> Result<OnionMessagePath, ()> { /// # fn find_path(&self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination) -> Result<OnionMessagePath, ()> {
/// # unimplemented!() /// # let secp_ctx = Secp256k1::new();
/// # let node_secret = SecretKey::from_slice(&<Vec<u8>>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap();
/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret);
/// # let hop_node_id2 = hop_node_id1;
/// # Ok(OnionMessagePath {
/// # intermediate_nodes: vec![hop_node_id1, hop_node_id2],
/// # destination,
/// # })
/// # } /// # }
/// # } /// # }
/// # let seed = [42u8; 32]; /// # let seed = [42u8; 32];
@ -86,7 +93,7 @@ use crate::prelude::*;
/// # let node_secret = SecretKey::from_slice(&<Vec<u8>>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); /// # let node_secret = SecretKey::from_slice(&<Vec<u8>>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap();
/// # let secp_ctx = Secp256k1::new(); /// # let secp_ctx = Secp256k1::new();
/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret);
/// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1); /// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1);
/// # let destination_node_id = hop_node_id1; /// # let destination_node_id = hop_node_id1;
/// # let message_router = Arc::new(FakeMessageRouter {}); /// # let message_router = Arc::new(FakeMessageRouter {});
/// # let custom_message_handler = IgnoringMessageHandler {}; /// # let custom_message_handler = IgnoringMessageHandler {};
@ -113,13 +120,10 @@ use crate::prelude::*;
/// } /// }
/// } /// }
/// // Send a custom onion message to a node id. /// // Send a custom onion message to a node id.
/// let path = OnionMessagePath { /// let destination = Destination::Node(destination_node_id);
/// intermediate_nodes: vec![hop_node_id1, hop_node_id2],
/// destination: Destination::Node(destination_node_id),
/// };
/// let reply_path = None; /// let reply_path = None;
/// # let message = YourCustomMessage {}; /// # let message = YourCustomMessage {};
/// onion_messenger.send_onion_message(path, message, reply_path); /// onion_messenger.send_onion_message(message, destination, reply_path);
/// ///
/// // Create a blinded path to yourself, for someone to send an onion message to. /// // Create a blinded path to yourself, for someone to send an onion message to.
/// # let your_node_id = hop_node_id1; /// # let your_node_id = hop_node_id1;
@ -127,13 +131,10 @@ use crate::prelude::*;
/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); /// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap();
/// ///
/// // Send a custom onion message to a blinded path. /// // Send a custom onion message to a blinded path.
/// let path = OnionMessagePath { /// let destination = Destination::BlindedPath(blinded_path);
/// intermediate_nodes: vec![hop_node_id1, hop_node_id2],
/// destination: Destination::BlindedPath(blinded_path),
/// };
/// let reply_path = None; /// let reply_path = None;
/// # let message = YourCustomMessage {}; /// # let message = YourCustomMessage {};
/// onion_messenger.send_onion_message(path, message, reply_path); /// onion_messenger.send_onion_message(message, destination, reply_path);
/// ``` /// ```
/// ///
/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
@ -304,6 +305,16 @@ impl Destination {
} }
} }
/// Result of successfully [sending an onion message].
///
/// [sending an onion message]: OnionMessenger::send_onion_message
#[derive(Debug, PartialEq, Eq)]
pub enum SendSuccess {
/// The message was buffered and will be sent once it is processed by
/// [`OnionMessageHandler::next_onion_message_for_peer`].
Buffered,
}
/// Errors that may occur when [sending an onion message]. /// Errors that may occur when [sending an onion message].
/// ///
/// [sending an onion message]: OnionMessenger::send_onion_message /// [sending an onion message]: OnionMessenger::send_onion_message
@ -319,6 +330,8 @@ pub enum SendError {
TooFewBlindedHops, TooFewBlindedHops,
/// Our next-hop peer was offline or does not support onion message forwarding. /// Our next-hop peer was offline or does not support onion message forwarding.
InvalidFirstHop, InvalidFirstHop,
/// A path from the sender to the destination could not be found by the [`MessageRouter`].
PathNotFound,
/// Onion message contents must have a TLV type >= 64. /// Onion message contents must have a TLV type >= 64.
InvalidMessage, InvalidMessage,
/// Our next-hop peer's buffer was full or our total outbound buffer was full. /// Our next-hop peer's buffer was full or our total outbound buffer was full.
@ -568,14 +581,63 @@ where
} }
} }
/// Sends an [`OnionMessage`] with the given `contents` for sending to the destination of /// Sends an [`OnionMessage`] with the given `contents` to `destination`.
/// `path`.
/// ///
/// See [`OnionMessenger`] for example usage. /// See [`OnionMessenger`] for example usage.
pub fn send_onion_message<T: OnionMessageContents>( pub fn send_onion_message<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath> &self, contents: T, destination: Destination, reply_path: Option<BlindedPath>
) -> Result<(), SendError> { ) -> Result<SendSuccess, SendError> {
log_trace!(self.logger, "Sending onion message: {:?}", contents); self.find_path_and_enqueue_onion_message(
contents, destination, reply_path, format_args!("")
)
}
fn find_path_and_enqueue_onion_message<T: OnionMessageContents>(
&self, contents: T, destination: Destination, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
) -> Result<SendSuccess, SendError> {
let result = self.find_path(destination)
.and_then(|path| self.enqueue_onion_message(path, contents, reply_path, log_suffix));
match result.as_ref() {
Err(SendError::GetNodeIdFailed) => {
log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix);
},
Err(SendError::PathNotFound) => {
log_trace!(self.logger, "Failed to find path {}", log_suffix);
},
Err(e) => {
log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e);
},
Ok(SendSuccess::Buffered) => {
log_trace!(self.logger, "Buffered onion message {}", log_suffix);
},
}
result
}
fn find_path(&self, destination: Destination) -> Result<OnionMessagePath, SendError> {
let sender = self.node_signer
.get_node_id(Recipient::Node)
.map_err(|_| SendError::GetNodeIdFailed)?;
let peers = self.message_buffers.lock().unwrap()
.iter()
.filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_)))
.map(|(node_id, _)| *node_id)
.collect();
self.message_router
.find_path(sender, peers, destination)
.map_err(|_| SendError::PathNotFound)
}
fn enqueue_onion_message<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
) -> Result<SendSuccess, SendError> {
log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents);
let (first_node_id, onion_message) = create_onion_message( let (first_node_id, onion_message) = create_onion_message(
&self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path
@ -590,18 +652,25 @@ where
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
hash_map::Entry::Occupied(mut e) => { hash_map::Entry::Occupied(mut e) => {
e.get_mut().enqueue_message(onion_message); e.get_mut().enqueue_message(onion_message);
Ok(()) Ok(SendSuccess::Buffered)
}, },
} }
} }
#[cfg(test)]
pub(super) fn send_onion_message_using_path<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
) -> Result<SendSuccess, SendError> {
self.enqueue_onion_message(path, contents, reply_path, format_args!(""))
}
fn handle_onion_message_response<T: OnionMessageContents>( fn handle_onion_message_response<T: OnionMessageContents>(
&self, response: Option<T>, reply_path: Option<BlindedPath>, log_suffix: fmt::Arguments &self, response: Option<T>, reply_path: Option<BlindedPath>, log_suffix: fmt::Arguments
) { ) {
if let Some(response) = response { if let Some(response) = response {
match reply_path { match reply_path {
Some(reply_path) => { Some(reply_path) => {
self.find_path_and_enqueue_onion_message( let _ = self.find_path_and_enqueue_onion_message(
response, Destination::BlindedPath(reply_path), None, log_suffix response, Destination::BlindedPath(reply_path), None, log_suffix
); );
}, },
@ -612,34 +681,6 @@ where
} }
} }
fn find_path_and_enqueue_onion_message<T: OnionMessageContents>(
&self, contents: T, destination: Destination, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
) {
let sender = match self.node_signer.get_node_id(Recipient::Node) {
Ok(node_id) => node_id,
Err(_) => {
log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix);
return;
}
};
let peers = self.message_buffers.lock().unwrap().keys().copied().collect();
let path = match self.message_router.find_path(sender, peers, destination) {
Ok(path) => path,
Err(()) => {
log_trace!(self.logger, "Failed to find path {}", log_suffix);
return;
},
};
log_trace!(self.logger, "Sending onion message {}: {:?}", log_suffix, contents);
if let Err(e) = self.send_onion_message(path, contents, reply_path) {
log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e);
}
}
#[cfg(test)] #[cfg(test)]
pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> { pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> {
let mut message_buffers = self.message_buffers.lock().unwrap(); let mut message_buffers = self.message_buffers.lock().unwrap();
@ -790,7 +831,7 @@ where
let PendingOnionMessage { contents, destination, reply_path } = message; let PendingOnionMessage { contents, destination, reply_path } = message;
#[cfg(c_bindings)] #[cfg(c_bindings)]
let (contents, destination, reply_path) = message; let (contents, destination, reply_path) = message;
self.find_path_and_enqueue_onion_message( let _ = self.find_path_and_enqueue_onion_message(
contents, destination, reply_path, format_args!("when sending OffersMessage") contents, destination, reply_path, format_args!("when sending OffersMessage")
); );
} }
@ -801,7 +842,7 @@ where
let PendingOnionMessage { contents, destination, reply_path } = message; let PendingOnionMessage { contents, destination, reply_path } = message;
#[cfg(c_bindings)] #[cfg(c_bindings)]
let (contents, destination, reply_path) = message; let (contents, destination, reply_path) = message;
self.find_path_and_enqueue_onion_message( let _ = self.find_path_and_enqueue_onion_message(
contents, destination, reply_path, format_args!("when sending CustomMessage") contents, destination, reply_path, format_args!("when sending CustomMessage")
); );
} }