From 8164cb930714129cde4d3f671f6f0b82621285c0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 29 Apr 2023 22:31:24 +0000 Subject: [PATCH 1/5] Accept RPC responses with a `null` `result` This is actually a valid response in some cases, at least for the `gettxout` command, where `null` is returned if no corresponding UTXO was found, but the command otherwise succeeded. --- lightning-block-sync/src/rpc.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 4c4706cb1..c778279ae 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -105,12 +105,13 @@ impl RpcClient { return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)); } - let result = &mut response["result"]; - if result.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")); - } + let result = match response.get_mut("result") { + Some(result) => result.take(), + None => + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")), + }; - JsonResponse(result.take()).try_into() + JsonResponse(result).try_into() } } @@ -205,7 +206,7 @@ mod tests { #[tokio::test] async fn call_method_returning_missing_result() { - let response = serde_json::json!({ "result": null }); + let response = serde_json::json!({ }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); From 01857b51a1e798ef344af845351b1c05c1c8a677 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 29 Apr 2023 22:32:57 +0000 Subject: [PATCH 2/5] Implement the `UtxoSource` interface for REST/RPC clients In LDK, we expect users operating nodes on the public network to implement the `UtxoSource` interface in order to validate the gossip they receive from the network. Sadly, because the DoS attack of flooding a node's gossip store isn't a common issue, and because we do not provide an implementation off-the-shelf to make doing so easily, many of our downstream users do not have a `UtxoSource` implementation. In order to change that, here we implement an async `UtxoSource` in the `lightning-block-sync` crate, providing one for users who sync the chain from Bitcoin Core's RPC or REST interfaces. --- lightning-block-sync/src/convert.rs | 57 ++++++++ lightning-block-sync/src/gossip.rs | 201 ++++++++++++++++++++++++++++ lightning-block-sync/src/lib.rs | 2 + lightning-block-sync/src/rest.rs | 50 +++++++ lightning-block-sync/src/rpc.rs | 44 ++++++ 5 files changed, 354 insertions(+) create mode 100644 lightning-block-sync/src/gossip.rs diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs index d6294e1d2..bf9e95776 100644 --- a/lightning-block-sync/src/convert.rs +++ b/lightning-block-sync/src/convert.rs @@ -13,8 +13,14 @@ use serde_json; use std::convert::From; use std::convert::TryFrom; use std::convert::TryInto; +use std::str::FromStr; use bitcoin::hashes::Hash; +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> Result { Ok(self.0) } +} + /// Conversion from `std::io::Error` into `BlockSourceError`. impl From for BlockSourceError { fn from(e: std::io::Error) -> BlockSourceError { @@ -38,6 +44,17 @@ impl TryInto for BinaryResponse { } } +/// Parses binary data as a block hash. +impl TryInto for BinaryResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + BlockHash::from_slice(&self.0).map_err(|_| + std::io::Error::new(std::io::ErrorKind::InvalidData, "bad block hash length") + ) + } +} + /// Converts a JSON value into block header data. The JSON value may be an object representing a /// block header or an array of such objects. In the latter case, the first object is converted. impl TryInto for JsonResponse { @@ -226,6 +243,46 @@ impl TryInto for JsonResponse { } } +impl TryInto for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + match self.0.as_str() { + None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), + Some(hex_data) if hex_data.len() != 64 => + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hash length")), + Some(hex_data) => BlockHash::from_str(hex_data) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + } + } +} + +/// The REST `getutxos` endpoint retuns a whole pile of data we don't care about and one bit we do +/// - whether the `hit bitmap` field had any entries. Thus we condense the result down into only +/// that. +pub(crate) struct GetUtxosResponse { + pub(crate) hit_bitmap_nonempty: bool +} + +impl TryInto for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + let bitmap_str = + self.0.as_object().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected an object"))? + .get("bitmap").ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "missing bitmap field"))? + .as_str().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitmap should be an str"))?; + let mut hit_bitmap_nonempty = false; + for c in bitmap_str.chars() { + if c < '0' || c > '9' { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid byte")); + } + if c > '0' { hit_bitmap_nonempty = true; } + } + Ok(GetUtxosResponse { hit_bitmap_nonempty }) + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs new file mode 100644 index 000000000..3eb7ad4ae --- /dev/null +++ b/lightning-block-sync/src/gossip.rs @@ -0,0 +1,201 @@ +//! When fetching gossip from peers, lightning nodes need to validate that gossip against the +//! current UTXO set. This module defines an implementation of the LDK API required to do so +//! against a [`BlockSource`] which implements a few additional methods for accessing the UTXO set. + +use crate::{AsyncBlockSourceResult, BlockData, BlockSource}; + +use bitcoin::blockdata::transaction::{TxOut, OutPoint}; +use bitcoin::hash_types::BlockHash; + +use lightning::sign::NodeSigner; + +use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler}; + +use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; + +use lightning::util::logger::Logger; + +use std::sync::Arc; +use std::future::Future; +use std::ops::Deref; + +/// A trait which extends [`BlockSource`] and can be queried to fetch the block at a given height +/// as well as whether a given output is unspent (i.e. a member of the current UTXO set). +/// +/// Note that while this is implementable for a [`BlockSource`] which returns filtered block data +/// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an +/// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced. +/// +/// For efficiency, an implementation may consider caching some set of blocks, as many redundant +/// calls may be made. +pub trait UtxoSource : BlockSource + 'static { + /// Fetches the block hash of the block at the given height. + /// + /// This will, in turn, be passed to to [`BlockSource::get_block`] to fetch the block needed + /// for gossip validation. + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash>; + + /// Returns true if the given output has *not* been spent, i.e. is a member of the current UTXO + /// set. + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>; +} + +/// A generic trait which is able to spawn futures in the background. +/// +/// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which +/// delegates to `tokio::spawn()`. +pub trait FutureSpawner : Send + Sync + 'static { + /// Spawns the given future as a background task. + /// + /// This method MUST NOT block on the given future immediately. + fn spawn + Send + 'static>(&self, future: T); +} + +#[cfg(feature = "tokio")] +/// A trivial [`FutureSpawner`] which delegates to `tokio::spawn`. +pub struct TokioSpawner; +#[cfg(feature = "tokio")] +impl FutureSpawner for TokioSpawner { + fn spawn + Send + 'static>(&self, future: T) { + tokio::spawn(future); + } +} + +/// A struct which wraps a [`UtxoSource`] and a few LDK objects and implements the LDK +/// [`UtxoLookup`] trait. +/// +/// Note that if you're using this against a Bitcoin Core REST or RPC server, you likely wish to +/// increase the `rpcworkqueue` setting in Bitcoin Core as LDK attempts to parallelize requests (a +/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors +/// available on both Bitcoin Core and your LDK application for each request to hold its own +/// connection. +pub struct GossipVerifier where + Blocks::Target: UtxoSource, + L::Target: Logger, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + source: Blocks, + peer_manager: Arc>, Self, L>>, OM, L, CMH, NS>>, + gossiper: Arc>, Self, L>>, + spawn: S, +} + +impl GossipVerifier where + Blocks::Target: UtxoSource, + L::Target: Logger, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + /// Constructs a new [`GossipVerifier`]. + /// + /// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for + /// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`]. + pub fn new(source: Blocks, spawn: S, gossiper: Arc>, Self, L>>, peer_manager: Arc>, Self, L>>, OM, L, CMH, NS>>) -> Self { + Self { source, spawn, gossiper, peer_manager } + } + + async fn retrieve_utxo(source: Blocks, short_channel_id: u64) -> Result { + let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes + let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; + let output_index = (short_channel_id & 0xffff) as u16; + + let block_hash = source.get_block_hash_by_height(block_height).await + .map_err(|_| UtxoLookupError::UnknownTx)?; + let block_data = source.get_block(&block_hash).await + .map_err(|_| UtxoLookupError::UnknownTx)?; + let mut block = match block_data { + BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx), + BlockData::FullBlock(block) => block, + }; + if transaction_index as usize >= block.txdata.len() { + return Err(UtxoLookupError::UnknownTx); + } + let mut transaction = block.txdata.swap_remove(transaction_index as usize); + if output_index as usize >= transaction.output.len() { + return Err(UtxoLookupError::UnknownTx); + } + let outpoint_unspent = + source.is_output_unspent(OutPoint::new(transaction.txid(), output_index.into())).await + .map_err(|_| UtxoLookupError::UnknownTx)?; + if outpoint_unspent { + Ok(transaction.output.swap_remove(output_index as usize)) + } else { + Err(UtxoLookupError::UnknownTx) + } + } +} + +impl Deref for GossipVerifier where + Blocks::Target: UtxoSource, + L::Target: Logger, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + type Target = Self; + fn deref(&self) -> &Self { self } +} + + +impl UtxoLookup for GossipVerifier where + Blocks::Target: UtxoSource, + L::Target: Logger, + CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { + let res = UtxoFuture::new(); + let fut = res.clone(); + let source = self.source.clone(); + let gossiper = Arc::clone(&self.gossiper); + let pm = Arc::clone(&self.peer_manager); + self.spawn.spawn(async move { + let res = Self::retrieve_utxo(source, short_channel_id).await; + fut.resolve(gossiper.network_graph(), &*gossiper, res); + pm.process_events(); + }); + UtxoResult::Async(res) + } +} diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 5c7c0dee8..3561a1b5d 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -28,6 +28,8 @@ pub mod http; pub mod init; pub mod poll; +pub mod gossip; + #[cfg(feature = "rest-client")] pub mod rest; diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 430089301..5690da12e 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -3,7 +3,10 @@ use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse}; +use crate::gossip::UtxoSource; +use crate::convert::GetUtxosResponse; +use bitcoin::OutPoint; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; @@ -60,11 +63,30 @@ impl BlockSource for RestClient { } } +impl UtxoSource for RestClient { + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + Box::pin(async move { + let resource_path = format!("blockhashbyheight/{}.bin", block_height); + Ok(self.request_resource::(&resource_path).await?) + }) + } + + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { + Box::pin(async move { + let resource_path = format!("getutxos/{}-{}.json", outpoint.txid.to_hex(), outpoint.vout); + let utxo_result = + self.request_resource::(&resource_path).await?; + Ok(utxo_result.hit_bitmap_nonempty) + }) + } +} + #[cfg(test)] mod tests { use super::*; use crate::http::BinaryResponse; use crate::http::client_tests::{HttpServer, MessageBody}; + use bitcoin::hashes::Hash; /// Parses binary data as a string-encoded `u32`. impl TryInto for BinaryResponse { @@ -113,4 +135,32 @@ mod tests { Ok(n) => assert_eq!(n, 42), } } + + #[tokio::test] + async fn parses_negative_getutxos() { + let server = HttpServer::responding_with_ok(MessageBody::Content( + // A real response contains a few more fields, but we actually only look at the + // "bitmap" field, so this should suffice for testing + "{\"chainHeight\": 1, \"bitmap\":\"0\",\"utxos\":[]}" + )); + let client = RestClient::new(server.endpoint()).unwrap(); + + let outpoint = OutPoint::new(bitcoin::Txid::from_inner([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, false); + } + + #[tokio::test] + async fn parses_positive_getutxos() { + let server = HttpServer::responding_with_ok(MessageBody::Content( + // A real response contains lots more data, but we actually only look at the "bitmap" + // field, so this should suffice for testing + "{\"chainHeight\": 1, \"bitmap\":\"1\",\"utxos\":[]}" + )); + let client = RestClient::new(server.endpoint()).unwrap(); + + let outpoint = OutPoint::new(bitcoin::Txid::from_inner([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, true); + } } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index c778279ae..0ad94040a 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -3,9 +3,11 @@ use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; +use crate::gossip::UtxoSource; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; +use bitcoin::OutPoint; use std::sync::Mutex; @@ -138,11 +140,33 @@ impl BlockSource for RpcClient { } } +impl UtxoSource for RpcClient { + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + Box::pin(async move { + let height_param = serde_json::json!(block_height); + Ok(self.call_method("getblockhash", &[height_param]).await?) + }) + } + + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { + Box::pin(async move { + let txid_param = serde_json::json!(outpoint.txid.to_hex()); + let vout_param = serde_json::json!(outpoint.vout); + let include_mempool = serde_json::json!(false); + let utxo_opt: serde_json::Value = self.call_method( + "gettxout", &[txid_param, vout_param, include_mempool]).await?; + Ok(!utxo_opt.is_null()) + }) + } +} + #[cfg(test)] mod tests { use super::*; use crate::http::client_tests::{HttpServer, MessageBody}; + use bitcoin::hashes::Hash; + /// Credentials encoded in base64. const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA=="; @@ -245,4 +269,24 @@ mod tests { Ok(count) => assert_eq!(count, 654470), } } + + #[tokio::test] + async fn fails_to_fetch_spent_utxo() { + let response = serde_json::json!({ "result": null }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let outpoint = OutPoint::new(bitcoin::Txid::from_inner([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, false); + } + + #[tokio::test] + async fn fetches_utxo() { + let response = serde_json::json!({ "result": {"bestblock": 1, "confirmations": 42}}); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let outpoint = OutPoint::new(bitcoin::Txid::from_inner([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, true); + } } From b315856e686dd9e72a7ff93616f314ca6a037b56 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 30 Apr 2023 00:48:57 +0000 Subject: [PATCH 3/5] Make the `P2PGossipSync` `UtxoLookup` exchangable without &mut self Because a `UtxoLookup` implementation is likely to need a reference to the `PeerManager` which contains a reference to the `P2PGossipSync`, it is likely to be impossible to get a mutable reference to the `P2PGossipSync` by the time we want to add a `UtxoLookup` without a ton of boilerplate and trait wrapping. Instead, we simply place the `UtxoLookup` in a `RwLock`, allowing us to modify it without a mutable self reference. The lifetime bounds updates in tests required in this commit are entirely unclear to me, but do allow tests to continue building, so somehow make rustc happier. --- lightning/src/ln/functional_test_utils.rs | 12 ++++++------ lightning/src/routing/gossip.rs | 10 +++++----- lightning/src/routing/router.rs | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 84bc1a1b3..5ce40f057 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -561,11 +561,11 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { } } -pub fn create_chan_between_nodes<'a, 'b, 'c, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { +pub fn create_chan_between_nodes<'a, 'b, 'c: 'd, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { create_chan_between_nodes_with_value(node_a, node_b, 100000, 10001) } -pub fn create_chan_between_nodes_with_value<'a, 'b, 'c, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, channel_value: u64, push_msat: u64) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { +pub fn create_chan_between_nodes_with_value<'a, 'b, 'c: 'd, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, channel_value: u64, push_msat: u64) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { let (channel_ready, channel_id, tx) = create_chan_between_nodes_with_value_a(node_a, node_b, channel_value, push_msat); let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(node_a, node_b, &channel_ready); (announcement, as_update, bs_update, channel_id, tx) @@ -1169,7 +1169,7 @@ pub fn create_chan_between_nodes_with_value_confirm_second<'a, 'b, 'c>(node_recv }), channel_id) } -pub fn create_chan_between_nodes_with_value_confirm<'a, 'b, 'c, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, tx: &Transaction) -> ((msgs::ChannelReady, msgs::AnnouncementSignatures), [u8; 32]) { +pub fn create_chan_between_nodes_with_value_confirm<'a, 'b, 'c: 'd, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, tx: &Transaction) -> ((msgs::ChannelReady, msgs::AnnouncementSignatures), [u8; 32]) { let conf_height = core::cmp::max(node_a.best_block_info().1 + 1, node_b.best_block_info().1 + 1); create_chan_between_nodes_with_value_confirm_first(node_a, node_b, tx, conf_height); confirm_transaction_at(node_a, tx, conf_height); @@ -1178,7 +1178,7 @@ pub fn create_chan_between_nodes_with_value_confirm<'a, 'b, 'c, 'd>(node_a: &'a create_chan_between_nodes_with_value_confirm_second(node_b, node_a) } -pub fn create_chan_between_nodes_with_value_a<'a, 'b, 'c, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, channel_value: u64, push_msat: u64) -> ((msgs::ChannelReady, msgs::AnnouncementSignatures), [u8; 32], Transaction) { +pub fn create_chan_between_nodes_with_value_a<'a, 'b, 'c: 'd, 'd>(node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, channel_value: u64, push_msat: u64) -> ((msgs::ChannelReady, msgs::AnnouncementSignatures), [u8; 32], Transaction) { let tx = create_chan_between_nodes_with_value_init(node_a, node_b, channel_value, push_msat); let (msgs, chan_id) = create_chan_between_nodes_with_value_confirm(node_a, node_b, &tx); (msgs, chan_id, tx) @@ -1218,11 +1218,11 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, ((*announcement).clone(), as_update, bs_update) } -pub fn create_announced_chan_between_nodes<'a, 'b, 'c, 'd>(nodes: &'a Vec>, a: usize, b: usize) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { +pub fn create_announced_chan_between_nodes<'a, 'b, 'c: 'd, 'd>(nodes: &'a Vec>, a: usize, b: usize) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { create_announced_chan_between_nodes_with_value(nodes, a, b, 100000, 10001) } -pub fn create_announced_chan_between_nodes_with_value<'a, 'b, 'c, 'd>(nodes: &'a Vec>, a: usize, b: usize, channel_value: u64, push_msat: u64) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { +pub fn create_announced_chan_between_nodes_with_value<'a, 'b, 'c: 'd, 'd>(nodes: &'a Vec>, a: usize, b: usize, channel_value: u64, push_msat: u64) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { let chan_announcement = create_chan_between_nodes_with_value(&nodes[a], &nodes[b], channel_value, push_msat); update_nodes_with_chan_announce(nodes, a, b, &chan_announcement.0, &chan_announcement.1, &chan_announcement.2); (chan_announcement.1, chan_announcement.2, chan_announcement.3, chan_announcement.4) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index b9b70e0a0..90b7bb0ed 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -254,7 +254,7 @@ pub struct P2PGossipSync>, U: Deref, L: Deref> where U::Target: UtxoLookup, L::Target: Logger { network_graph: G, - utxo_lookup: Option, + utxo_lookup: RwLock>, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize, pending_events: Mutex>, @@ -273,7 +273,7 @@ where U::Target: UtxoLookup, L::Target: Logger network_graph, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize::new(0), - utxo_lookup, + utxo_lookup: RwLock::new(utxo_lookup), pending_events: Mutex::new(vec![]), logger, } @@ -282,8 +282,8 @@ where U::Target: UtxoLookup, L::Target: Logger /// Adds a provider used to check new announcements. Does not affect /// existing announcements unless they are updated. /// Add, update or remove the provider would replace the current one. - pub fn add_utxo_lookup(&mut self, utxo_lookup: Option) { - self.utxo_lookup = utxo_lookup; + pub fn add_utxo_lookup(&self, utxo_lookup: Option) { + *self.utxo_lookup.write().unwrap() = utxo_lookup; } /// Gets a reference to the underlying [`NetworkGraph`] which was provided in @@ -443,7 +443,7 @@ where U::Target: UtxoLookup, L::Target: Logger } fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result { - self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; + self.network_graph.update_channel_from_announcement(msg, &*self.utxo_lookup.read().unwrap())?; Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 3419f122e..7abca061f 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -3897,7 +3897,7 @@ mod tests { fn available_amount_while_routing_test() { // Tests whether we choose the correct available channel amount while routing. - let (secp_ctx, network_graph, mut gossip_sync, chain_monitor, logger) = build_graph(); + let (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) = build_graph(); let (our_privkey, our_id, privkeys, nodes) = get_nodes(&secp_ctx); let scorer = ln_test_utils::TestScorer::new(); let keys_manager = ln_test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet); From 3482fceeab251b2534bf0b7d0ff882db37409d40 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 30 Apr 2023 02:06:19 +0000 Subject: [PATCH 4/5] Add a simple naive block cache in gossip sync lookups --- lightning-block-sync/src/gossip.rs | 95 ++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 25 deletions(-) diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 3eb7ad4ae..4e66c0ce9 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -4,6 +4,7 @@ use crate::{AsyncBlockSourceResult, BlockData, BlockSource}; +use bitcoin::blockdata::block::Block; use bitcoin::blockdata::transaction::{TxOut, OutPoint}; use bitcoin::hash_types::BlockHash; @@ -17,7 +18,8 @@ use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupErr use lightning::util::logger::Logger; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::collections::VecDeque; use std::future::Future; use std::ops::Deref; @@ -27,9 +29,6 @@ use std::ops::Deref; /// Note that while this is implementable for a [`BlockSource`] which returns filtered block data /// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an /// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced. -/// -/// For efficiency, an implementation may consider caching some set of blocks, as many redundant -/// calls may be made. pub trait UtxoSource : BlockSource + 'static { /// Fetches the block hash of the block at the given height. /// @@ -91,8 +90,11 @@ pub struct GossipVerifier>, Self, L>>, OM, L, CMH, NS>>, gossiper: Arc>, Self, L>>, spawn: S, + block_cache: Arc>>, } +const BLOCK_CACHE_SIZE: usize = 5; + impl>, Self, L>>, peer_manager: Arc>, Self, L>>, OM, L, CMH, NS>>) -> Self { - Self { source, spawn, gossiper, peer_manager } + Self { + source, spawn, gossiper, peer_manager, + block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))), + } } - async fn retrieve_utxo(source: Blocks, short_channel_id: u64) -> Result { + async fn retrieve_utxo( + source: Blocks, block_cache: Arc>>, short_channel_id: u64 + ) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; let output_index = (short_channel_id & 0xffff) as u16; - let block_hash = source.get_block_hash_by_height(block_height).await - .map_err(|_| UtxoLookupError::UnknownTx)?; - let block_data = source.get_block(&block_hash).await - .map_err(|_| UtxoLookupError::UnknownTx)?; - let mut block = match block_data { - BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx), - BlockData::FullBlock(block) => block, - }; - if transaction_index as usize >= block.txdata.len() { - return Err(UtxoLookupError::UnknownTx); - } - let mut transaction = block.txdata.swap_remove(transaction_index as usize); - if output_index as usize >= transaction.output.len() { - return Err(UtxoLookupError::UnknownTx); - } - let outpoint_unspent = - source.is_output_unspent(OutPoint::new(transaction.txid(), output_index.into())).await + let (outpoint, output); + + 'tx_found: loop { // Used as a simple goto + macro_rules! process_block { + ($block: expr) => { { + if transaction_index as usize >= $block.txdata.len() { + return Err(UtxoLookupError::UnknownTx); + } + let transaction = &$block.txdata[transaction_index as usize]; + if output_index as usize >= transaction.output.len() { + return Err(UtxoLookupError::UnknownTx); + } + + outpoint = OutPoint::new(transaction.txid(), output_index.into()); + output = transaction.output[output_index as usize].clone(); + } } + } + { + let recent_blocks = block_cache.lock().unwrap(); + for (height, block) in recent_blocks.iter() { + if *height == block_height { + process_block!(block); + break 'tx_found; + } + } + } + + let block_hash = source.get_block_hash_by_height(block_height).await .map_err(|_| UtxoLookupError::UnknownTx)?; + let block_data = source.get_block(&block_hash).await + .map_err(|_| UtxoLookupError::UnknownTx)?; + let block = match block_data { + BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx), + BlockData::FullBlock(block) => block, + }; + process_block!(block); + { + let mut recent_blocks = block_cache.lock().unwrap(); + let mut insert = true; + for (height, _) in recent_blocks.iter() { + if *height == block_height { + insert = false; + } + } + if insert { + if recent_blocks.len() >= BLOCK_CACHE_SIZE { + recent_blocks.pop_front(); + } + recent_blocks.push_back((block_height, block)); + } + } + break 'tx_found; + }; + let outpoint_unspent = + source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?; if outpoint_unspent { - Ok(transaction.output.swap_remove(output_index as usize)) + Ok(output) } else { Err(UtxoLookupError::UnknownTx) } @@ -190,9 +234,10 @@ impl Date: Mon, 5 Jun 2023 17:22:36 +0000 Subject: [PATCH 5/5] Fail UTXO lookups if the block doesn't have five confirmations The BOLT spec mandates that channels not be announced until they have at least six confirmations. This is important to enforce not because we particularly care about any specific DoS concerns, but because if we do not we may have to handle reorgs of channel funding transactions which change their SCID or have conflicting SCIDs. --- lightning-block-sync/src/gossip.rs | 77 +++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 4e66c0ce9..37f426851 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -2,7 +2,7 @@ //! current UTXO set. This module defines an implementation of the LDK API required to do so //! against a [`BlockSource`] which implements a few additional methods for accessing the UTXO set. -use crate::{AsyncBlockSourceResult, BlockData, BlockSource}; +use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError}; use bitcoin::blockdata::block::Block; use bitcoin::blockdata::transaction::{TxOut, OutPoint}; @@ -22,6 +22,8 @@ use std::sync::{Arc, Mutex}; use std::collections::VecDeque; use std::future::Future; use std::ops::Deref; +use std::pin::Pin; +use std::task::Poll; /// A trait which extends [`BlockSource`] and can be queried to fetch the block at a given height /// as well as whether a given output is unspent (i.e. a member of the current UTXO set). @@ -62,6 +64,65 @@ impl FutureSpawner for TokioSpawner { } } +/// A trivial future which joins two other futures and polls them at the same time, returning only +/// once both complete. +pub(crate) struct Joiner< + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, +> { + pub a: A, + pub b: B, + a_res: Option<(BlockHash, Option)>, + b_res: Option, +} + +impl< + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, +> Joiner { + fn new(a: A, b: B) -> Self { Self { a, b, a_res: None, b_res: None } } +} + +impl< + A: Future), BlockSourceError>> + Unpin, + B: Future> + Unpin, +> Future for Joiner { + type Output = Result<((BlockHash, Option), BlockHash), BlockSourceError>; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + if self.a_res.is_none() { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(res) => { + if let Ok(ok) = res { + self.a_res = Some(ok); + } else { + return Poll::Ready(Err(res.unwrap_err())); + } + }, + Poll::Pending => {}, + } + } + if self.b_res.is_none() { + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(res) => { + if let Ok(ok) = res { + self.b_res = Some(ok); + } else { + return Poll::Ready(Err(res.unwrap_err())); + } + + }, + Poll::Pending => {}, + } + } + if let Some(b_res) = self.b_res { + if let Some(a_res) = self.a_res { + return Poll::Ready(Ok((a_res, b_res))) + } + } + Poll::Pending + } +} + /// A struct which wraps a [`UtxoSource`] and a few LDK objects and implements the LDK /// [`UtxoLookup`] trait. /// @@ -156,8 +217,20 @@ impl tip_height { + return Err(UtxoLookupError::UnknownTx); + } + } let block_data = source.get_block(&block_hash).await .map_err(|_| UtxoLookupError::UnknownTx)?; let block = match block_data {