Support filtered blocks in lightning-block-sync

Expand the BlockSource trait to allow filtered blocks now that
chain::Listen supports them (d629a7edb7).
This makes it possible to use BIP 157/158 compact block filters with
lightning-block-sync.
This commit is contained in:
Jeffrey Czyz 2022-09-08 15:17:05 -05:00
parent 15a5966fa2
commit c1938e8c9f
No known key found for this signature in database
GPG key ID: 912EF12EA67705F5
6 changed files with 119 additions and 30 deletions

View file

@ -216,6 +216,16 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
// Needed to differentiate test expectations.
#[cfg(test)]
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}
fn filtered_block_connected(&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {

View file

@ -68,7 +68,7 @@ pub trait BlockSource : Sync + Send {
/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData>;
/// Returns the hash of the best block and, optionally, its height.
///
@ -152,6 +152,18 @@ pub struct BlockHeaderData {
pub chainwork: Uint256,
}
/// A block including either all its transactions or only the block header.
///
/// [`BlockSource`] may be implemented to either always return full blocks or, in the case of
/// compact block filters (BIP 157/158), return header-only blocks when no pertinent transactions
/// match. See [`chain::Filter`] for details on how to notify a source of such transactions.
pub enum BlockData {
/// A block containing all its transactions.
FullBlock(Block),
/// A block header for when the block does not contain any pertinent transactions.
HeaderOnly(BlockHeader),
}
/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified
/// Payment Verification (SPV).
///
@ -396,13 +408,22 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis
chain_poller: &mut P,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
for header in connected_blocks.drain(..).rev() {
let block = chain_poller
let height = header.height;
let block_data = chain_poller
.fetch_block(&header).await
.or_else(|e| Err((e, Some(new_tip))))?;
debug_assert_eq!(block.block_hash, header.block_hash);
debug_assert_eq!(block_data.block_hash, header.block_hash);
match block_data.deref() {
BlockData::FullBlock(block) => {
self.chain_listener.block_connected(&block, height);
},
BlockData::HeaderOnly(header) => {
self.chain_listener.filtered_block_connected(&header, &[], height);
},
}
self.header_cache.block_connected(header.block_hash, header);
self.chain_listener.block_connected(&block, header.height);
new_tip = header;
}
@ -707,4 +728,25 @@ mod chain_notifier_tests {
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn sync_from_chain_with_filtered_blocks() {
let mut chain = Blockchain::default().with_height(3).filtered_blocks();
let new_tip = chain.tip();
let old_tip = chain.at_height(1);
let chain_listener = &MockChainListener::new()
.expect_filtered_block_connected(*chain.at_height(2))
.expect_filtered_block_connected(*new_tip);
let mut notifier = ChainNotifier {
header_cache: &mut chain.header_cache(0..=1),
chain_listener,
};
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
Err((e, _)) => panic!("Unexpected error: {:?}", e),
Ok(_) => {},
}
}
}

View file

@ -1,8 +1,7 @@
//! Adapters that make one or more [`BlockSource`]s simpler to poll for new chain tip transitions.
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult};
use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;
@ -71,24 +70,31 @@ impl Validate for BlockHeaderData {
}
}
impl Validate for Block {
impl Validate for BlockData {
type T = ValidatedBlock;
fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
let pow_valid_block_hash = self.header
.validate_pow(&self.header.target())
let header = match &self {
BlockData::FullBlock(block) => &block.header,
BlockData::HeaderOnly(header) => header,
};
let pow_valid_block_hash = header
.validate_pow(&header.target())
.or_else(|e| Err(BlockSourceError::persistent(e)))?;
if pow_valid_block_hash != block_hash {
return Err(BlockSourceError::persistent("invalid block hash"));
}
if !self.check_merkle_root() {
return Err(BlockSourceError::persistent("invalid merkle root"));
}
if let BlockData::FullBlock(block) = &self {
if !block.check_merkle_root() {
return Err(BlockSourceError::persistent("invalid merkle root"));
}
if !self.check_witness_commitment() {
return Err(BlockSourceError::persistent("invalid witness commitment"));
if !block.check_witness_commitment() {
return Err(BlockSourceError::persistent("invalid witness commitment"));
}
}
Ok(ValidatedBlock { block_hash, inner: self })
@ -145,11 +151,11 @@ impl ValidatedBlockHeader {
/// A block with validated data against its transaction list and corresponding block hash.
pub struct ValidatedBlock {
pub(crate) block_hash: BlockHash,
inner: Block,
inner: BlockData,
}
impl std::ops::Deref for ValidatedBlock {
type Target = Block;
type Target = BlockData;
fn deref(&self) -> &Self::Target {
&self.inner
@ -161,7 +167,7 @@ mod sealed {
pub trait Validate {}
impl Validate for crate::BlockHeaderData {}
impl Validate for bitcoin::blockdata::block::Block {}
impl Validate for crate::BlockData {}
}
/// The canonical `Poll` implementation used for a single `BlockSource`.

View file

@ -1,10 +1,9 @@
//! Simple REST client implementation which implements [`BlockSource`] against a Bitcoin Core REST
//! endpoint.
use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
@ -45,10 +44,10 @@ impl BlockSource for RestClient {
})
}
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
Ok(BlockData::FullBlock(self.request_resource::<BinaryResponse, _>(&resource_path).await?))
})
}

View file

@ -1,10 +1,9 @@
//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC
//! endpoint.
use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
@ -91,11 +90,11 @@ impl BlockSource for RpcClient {
})
}
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
})
}

View file

@ -1,4 +1,4 @@
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache};
use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache};
use crate::poll::{Validate, ValidatedBlockHeader};
use bitcoin::blockdata::block::{Block, BlockHeader};
@ -20,6 +20,7 @@ pub struct Blockchain {
without_blocks: Option<std::ops::RangeFrom<usize>>,
without_headers: bool,
malformed_headers: bool,
filtered_blocks: bool,
}
impl Blockchain {
@ -77,6 +78,10 @@ impl Blockchain {
Self { malformed_headers: true, ..self }
}
pub fn filtered_blocks(self) -> Self {
Self { filtered_blocks: true, ..self }
}
pub fn fork_at_height(&self, height: usize) -> Self {
assert!(height + 1 < self.blocks.len());
let mut blocks = self.blocks.clone();
@ -146,7 +151,7 @@ impl BlockSource for Blockchain {
})
}
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
for (height, block) in self.blocks.iter().enumerate() {
if block.header.block_hash() == *header_hash {
@ -156,7 +161,11 @@ impl BlockSource for Blockchain {
}
}
return Ok(block.clone());
if self.filtered_blocks {
return Ok(BlockData::HeaderOnly(block.header.clone()));
} else {
return Ok(BlockData::FullBlock(block.clone()));
}
}
}
Err(BlockSourceError::transient("block not found"))
@ -185,6 +194,7 @@ impl chain::Listen for NullChainListener {
pub struct MockChainListener {
expected_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_filtered_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_blocks_disconnected: RefCell<VecDeque<BlockHeaderData>>,
}
@ -192,6 +202,7 @@ impl MockChainListener {
pub fn new() -> Self {
Self {
expected_blocks_connected: RefCell::new(VecDeque::new()),
expected_filtered_blocks_connected: RefCell::new(VecDeque::new()),
expected_blocks_disconnected: RefCell::new(VecDeque::new()),
}
}
@ -201,6 +212,11 @@ impl MockChainListener {
self
}
pub fn expect_filtered_block_connected(self, block: BlockHeaderData) -> Self {
self.expected_filtered_blocks_connected.borrow_mut().push_back(block);
self
}
pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_disconnected.borrow_mut().push_back(block);
self
@ -208,10 +224,22 @@ impl MockChainListener {
}
impl chain::Listen for MockChainListener {
fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) {
fn block_connected(&self, block: &Block, height: u32) {
match self.expected_blocks_connected.borrow_mut().pop_front() {
None => {
panic!("Unexpected block connected: {:?}", header.block_hash());
panic!("Unexpected block connected: {:?}", block.block_hash());
},
Some(expected_block) => {
assert_eq!(block.block_hash(), expected_block.header.block_hash());
assert_eq!(height, expected_block.height);
},
}
}
fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) {
match self.expected_filtered_blocks_connected.borrow_mut().pop_front() {
None => {
panic!("Unexpected filtered block connected: {:?}", header.block_hash());
},
Some(expected_block) => {
assert_eq!(header.block_hash(), expected_block.header.block_hash());
@ -244,6 +272,11 @@ impl Drop for MockChainListener {
panic!("Expected blocks connected: {:?}", expected_blocks_connected);
}
let expected_filtered_blocks_connected = self.expected_filtered_blocks_connected.borrow();
if !expected_filtered_blocks_connected.is_empty() {
panic!("Expected filtered_blocks connected: {:?}", expected_filtered_blocks_connected);
}
let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow();
if !expected_blocks_disconnected.is_empty() {
panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected);