Immutable BlockSource interface

Querying a BlockSource is a logically immutable operation. Use non-mut
references in its interface to reflect this, which allows for users to
hold multiple references if desired.
This commit is contained in:
Jeffrey Czyz 2022-02-14 15:31:59 -06:00
parent 0a0f87c00f
commit 3cdbbf56e4
No known key found for this signature in database
GPG key ID: 3A4E08275D5E96D2
7 changed files with 59 additions and 54 deletions

View file

@ -20,6 +20,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.27"
lightning = { version = "0.0.106", path = "../lightning" }
futures = { version = "0.3" }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }

View file

@ -16,7 +16,7 @@ use lightning::chain;
/// start when there are no chain listeners to sync yet.
///
/// [`SpvClient`]: crate::SpvClient
pub async fn validate_best_block_header<B: BlockSource>(block_source: &mut B) ->
pub async fn validate_best_block_header<B: BlockSource>(block_source: &B) ->
BlockSourceResult<ValidatedBlockHeader> {
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
block_source
@ -67,7 +67,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// C: chain::Filter,
/// P: chainmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// block_source: &B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// keys_manager: &K,
@ -122,7 +122,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<'a, B: BlockSource, C: Cache, L: chain::Listen + ?Sized>(
block_source: &mut B,
block_source: &B,
network: Network,
header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &'a L)>,

View file

@ -61,11 +61,11 @@ pub trait BlockSource : Sync + Send {
///
/// Implementations that cannot find headers based on the hash should return a `Transient` error
/// when `height_hint` is `None`.
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
/// Returns the hash of the best block and, optionally, its height.
///
@ -73,7 +73,7 @@ pub trait BlockSource : Sync + Send {
/// to allow for a more efficient lookup.
///
/// [`get_header`]: Self::get_header
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
}
/// Result type for `BlockSource` requests.

View file

@ -6,7 +6,7 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;
use std::ops::DerefMut;
use std::ops::Deref;
/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving
/// related chain data. It serves as an adapter for `BlockSource`.
@ -17,15 +17,15 @@ use std::ops::DerefMut;
/// [`ChainPoller`]: ../struct.ChainPoller.html
pub trait Poll {
/// Returns a chain tip in terms of its relationship to the provided chain tip.
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>;
/// Returns the header that preceded the given header in the chain.
fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>;
/// Returns the block associated with the given header.
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>;
}
@ -170,12 +170,12 @@ mod sealed {
///
/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way
/// of validating chain data and checking consistency.
pub struct ChainPoller<B: DerefMut<Target=T> + Sized, T: BlockSource> {
pub struct ChainPoller<B: Deref<Target=T> + Sized, T: BlockSource> {
block_source: B,
network: Network,
}
impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
impl<B: Deref<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
/// Creates a new poller for the given block source.
///
/// If the `network` parameter is mainnet, then the difficulty between blocks is checked for
@ -185,8 +185,8 @@ impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
}
}
impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
impl<B: Deref<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>
{
Box::pin(async move {
@ -206,7 +206,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}
fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>
{
Box::pin(async move {
@ -225,7 +225,7 @@ impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for Chain
})
}
fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>
{
Box::pin(async move {
@ -249,7 +249,7 @@ mod tests {
let best_known_chain_tip = chain.tip();
chain.disconnect_tip();
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Transient);
@ -261,10 +261,10 @@ mod tests {
#[tokio::test]
async fn poll_chain_without_headers() {
let mut chain = Blockchain::default().with_height(1).without_headers();
let chain = Blockchain::default().with_height(1).without_headers();
let best_known_chain_tip = chain.at_height(0);
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
@ -283,7 +283,7 @@ mod tests {
chain.blocks.last_mut().unwrap().header.bits =
BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32]));
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
@ -295,10 +295,10 @@ mod tests {
#[tokio::test]
async fn poll_chain_with_malformed_headers() {
let mut chain = Blockchain::default().with_height(1).malformed_headers();
let chain = Blockchain::default().with_height(1).malformed_headers();
let best_known_chain_tip = chain.at_height(0);
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
@ -310,10 +310,10 @@ mod tests {
#[tokio::test]
async fn poll_chain_with_common_tip() {
let mut chain = Blockchain::default().with_height(0);
let chain = Blockchain::default().with_height(0);
let best_known_chain_tip = chain.tip();
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Common),
@ -330,7 +330,7 @@ mod tests {
let worse_chain_tip = chain.tip();
assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork);
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
@ -345,7 +345,7 @@ mod tests {
chain.disconnect_tip();
let worse_chain_tip = chain.tip();
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
@ -354,12 +354,12 @@ mod tests {
#[tokio::test]
async fn poll_chain_with_better_tip() {
let mut chain = Blockchain::default().with_height(1);
let chain = Blockchain::default().with_height(1);
let best_known_chain_tip = chain.at_height(0);
let better_chain_tip = chain.tip();
let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)),

View file

@ -8,13 +8,15 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
use futures::lock::Mutex;
use std::convert::TryFrom;
use std::convert::TryInto;
/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
client: HttpClient,
client: Mutex<HttpClient>,
}
impl RestClient {
@ -22,35 +24,35 @@ impl RestClient {
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self { endpoint, client })
}
/// Requests a resource encoded in `F` format and interpreted as type `T`.
pub async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
pub async fn request_resource<F, T>(&self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
self.client.get::<F>(&uri, &host).await?.try_into()
self.client.lock().await.get::<F>(&uri, &host).await?.try_into()
}
}
impl BlockSource for RestClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
})
}
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
})
}
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
})
@ -81,7 +83,7 @@ mod tests {
#[tokio::test]
async fn request_unknown_resource() {
let server = HttpServer::responding_with_not_found();
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
@ -92,7 +94,7 @@ mod tests {
#[tokio::test]
async fn request_malformed_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
@ -103,7 +105,7 @@ mod tests {
#[tokio::test]
async fn request_valid_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content(42));
let mut client = RestClient::new(server.endpoint()).unwrap();
let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => panic!("Unexpected error: {:?}", e),

View file

@ -8,6 +8,8 @@ use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
use futures::lock::Mutex;
use serde_json;
use std::convert::TryFrom;
@ -18,7 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
pub struct RpcClient {
basic_auth: String,
endpoint: HttpEndpoint,
client: HttpClient,
client: Mutex<HttpClient>,
id: AtomicUsize,
}
@ -27,7 +29,7 @@ impl RpcClient {
/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
/// required for HTTP basic access authentication.
pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self {
basic_auth: "Basic ".to_string() + credentials,
endpoint,
@ -37,7 +39,7 @@ impl RpcClient {
}
/// Calls a method with the response encoded in JSON format and interpreted as type `T`.
pub async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
pub async fn call_method<T>(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = self.endpoint.path();
@ -47,7 +49,7 @@ impl RpcClient {
"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
});
let mut response = match self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
let mut response = match self.client.lock().await.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
Ok(JsonResponse(response)) => response,
Err(e) if e.kind() == std::io::ErrorKind::Other => {
match e.get_ref().unwrap().downcast_ref::<HttpError>() {
@ -82,14 +84,14 @@ impl RpcClient {
}
impl BlockSource for RpcClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
Ok(self.call_method("getblockheader", &[header_hash]).await?)
})
}
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
@ -97,7 +99,7 @@ impl BlockSource for RpcClient {
})
}
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.call_method("getblockchaininfo", &[]).await?)
})
@ -127,7 +129,7 @@ mod tests {
#[tokio::test]
async fn call_method_returning_unknown_response() {
let server = HttpServer::responding_with_not_found();
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
@ -139,7 +141,7 @@ mod tests {
async fn call_method_returning_malfomred_response() {
let response = serde_json::json!("foo");
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
@ -156,7 +158,7 @@ mod tests {
"error": { "code": -8, "message": "invalid parameter" },
});
let server = HttpServer::responding_with_server_error(response);
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let invalid_block_hash = serde_json::json!("foo");
match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
@ -172,7 +174,7 @@ mod tests {
async fn call_method_returning_missing_result() {
let response = serde_json::json!({ "result": null });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
@ -187,7 +189,7 @@ mod tests {
async fn call_method_returning_malformed_result() {
let response = serde_json::json!({ "result": "foo" });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
@ -202,7 +204,7 @@ mod tests {
async fn call_method_returning_valid_result() {
let response = serde_json::json!({ "result": 654470 });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => panic!("Unexpected error: {:?}", e),

View file

@ -113,7 +113,7 @@ impl Blockchain {
}
impl BlockSource for Blockchain {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
if self.without_headers {
return Err(BlockSourceError::persistent("header not found"));
@ -133,7 +133,7 @@ impl BlockSource for Blockchain {
})
}
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
for (height, block) in self.blocks.iter().enumerate() {
if block.header.block_hash() == *header_hash {
@ -150,7 +150,7 @@ impl BlockSource for Blockchain {
})
}
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
match self.blocks.last() {
None => Err(BlockSourceError::transient("empty chain")),