Add transaction sync crate

This crate provides utilities for syncing LDK via the transaction-based
`Confirm` interface. The initial implementation facilitates
synchronization with an Esplora backend server.
This commit is contained in:
Elias Rohrer 2022-11-23 09:33:37 +01:00
parent 760ab65dbd
commit fa77f34327
No known key found for this signature in database
GPG key ID: 36153082BDF676FD
6 changed files with 631 additions and 0 deletions

View file

@ -3,6 +3,7 @@
members = [
"lightning",
"lightning-block-sync",
"lightning-transaction-sync",
"lightning-invoice",
"lightning-net-tokio",
"lightning-persister",

View file

@ -0,0 +1,27 @@
[package]
name = "lightning-transaction-sync"
version = "0.0.113"
authors = ["Elias Rohrer"]
license = "MIT OR Apache-2.0"
repository = "http://github.com/lightningdevkit/rust-lightning"
description = """
Utilities for syncing LDK via the transaction-based `Confirm` interface.
"""
edition = "2018"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
esplora-async = ["async-interface", "esplora-client/async", "futures"]
esplora-blocking = ["esplora-client/blocking"]
async-interface = []
[dependencies]
lightning = { version = "0.0.113", path = "../lightning" }
bitcoin = "0.29.0"
bdk-macros = "0.6"
futures = { version = "0.3", optional = true }
esplora-client = { version = "0.3.0", default-features = false, optional = true }

View file

@ -0,0 +1,75 @@
use lightning::chain::WatchedOutput;
use bitcoin::{Txid, BlockHash, Transaction, BlockHeader, OutPoint};
use std::collections::{HashSet, HashMap};
// Represents the current state.
pub(crate) struct SyncState {
// Transactions that were previously processed, but must not be forgotten
// yet since they still need to be monitored for confirmation on-chain.
pub watched_transactions: HashSet<Txid>,
// Outputs that were previously processed, but must not be forgotten yet as
// as we still need to monitor any spends on-chain.
pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
// The tip hash observed during our last sync.
pub last_sync_hash: Option<BlockHash>,
// Indicates whether we need to resync, e.g., after encountering an error.
pub pending_sync: bool,
}
impl SyncState {
pub fn new() -> Self {
Self {
watched_transactions: HashSet::new(),
watched_outputs: HashMap::new(),
last_sync_hash: None,
pending_sync: false,
}
}
}
// A queue that is to be filled by `Filter` and drained during the next syncing round.
pub(crate) struct FilterQueue {
// Transactions that were registered via the `Filter` interface and have to be processed.
pub transactions: HashSet<Txid>,
// Outputs that were registered via the `Filter` interface and have to be processed.
pub outputs: HashMap<OutPoint, WatchedOutput>,
}
impl FilterQueue {
pub fn new() -> Self {
Self {
transactions: HashSet::new(),
outputs: HashMap::new(),
}
}
// Processes the transaction and output queues and adds them to the given [`SyncState`].
//
// Returns `true` if new items had been registered.
pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool {
let mut pending_registrations = false;
if !self.transactions.is_empty() {
pending_registrations = true;
sync_state.watched_transactions.extend(self.transactions.drain());
}
if !self.outputs.is_empty() {
pending_registrations = true;
sync_state.watched_outputs.extend(self.outputs.drain());
}
pending_registrations
}
}
pub(crate) struct ConfirmedTx {
pub tx: Transaction,
pub block_header: BlockHeader,
pub block_height: u32,
pub pos: usize,
}

View file

@ -0,0 +1,63 @@
use std::fmt;
#[derive(Debug)]
/// An error that possibly needs to be handled by the user.
pub enum TxSyncError {
/// A transaction sync failed and needs to be retried eventually.
Failed,
}
impl std::error::Error for TxSyncError {}
impl fmt::Display for TxSyncError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Failed => write!(f, "Failed to conduct transaction sync."),
}
}
}
#[derive(Debug)]
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
pub(crate) enum InternalError {
/// A transaction sync failed and needs to be retried eventually.
Failed,
/// An inconsisteny was encounterd during transaction sync.
Inconsistency,
}
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
impl fmt::Display for InternalError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Failed => write!(f, "Failed to conduct transaction sync."),
Self::Inconsistency => {
write!(f, "Encountered an inconsisteny during transaction sync.")
}
}
}
}
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
impl std::error::Error for InternalError {}
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
impl From<esplora_client::Error> for TxSyncError {
fn from(_e: esplora_client::Error) -> Self {
Self::Failed
}
}
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
impl From<esplora_client::Error> for InternalError {
fn from(_e: esplora_client::Error) -> Self {
Self::Failed
}
}
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
impl From<InternalError> for TxSyncError {
fn from(_e: InternalError) -> Self {
Self::Failed
}
}

View file

@ -0,0 +1,383 @@
use crate::error::{TxSyncError, InternalError};
use crate::common::{SyncState, FilterQueue, ConfirmedTx};
use lightning::util::logger::Logger;
use lightning::{log_error, log_given_level, log_info, log_internal, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use bitcoin::{BlockHash, Script, Txid};
use esplora_client::Builder;
#[cfg(feature = "async-interface")]
use esplora_client::r#async::AsyncClient;
#[cfg(not(feature = "async-interface"))]
use esplora_client::blocking::BlockingClient;
use std::collections::HashSet;
use core::ops::Deref;
/// Synchronizes LDK with a given [`Esplora`] server.
///
/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
/// reconfirmation.
///
/// Note that registration via [`Filter`] needs to happen before any calls to
/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
///
/// This uses and exposes either a blocking or async client variant dependent on whether the
/// `esplora-blocking` or the `esplora-async` feature is enabled.
///
/// [`Esplora`]: https://github.com/Blockstream/electrs
/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
/// [`Filter`]: lightning::chain::Filter
pub struct EsploraSyncClient<L: Deref>
where
L::Target: Logger,
{
sync_state: MutexType<SyncState>,
queue: std::sync::Mutex<FilterQueue>,
client: EsploraClientType,
logger: L,
}
impl<L: Deref> EsploraSyncClient<L>
where
L::Target: Logger,
{
/// Returns a new [`EsploraSyncClient`] object.
pub fn new(server_url: String, logger: L) -> Self {
let builder = Builder::new(&server_url);
#[cfg(not(feature = "async-interface"))]
let client = builder.build_blocking().unwrap();
#[cfg(feature = "async-interface")]
let client = builder.build_async().unwrap();
EsploraSyncClient::from_client(client, logger)
}
/// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
pub fn from_client(client: EsploraClientType, logger: L) -> Self {
let sync_state = MutexType::new(SyncState::new());
let queue = std::sync::Mutex::new(FilterQueue::new());
Self {
sync_state,
queue,
client,
logger,
}
}
/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
/// method should be called regularly to keep LDK up-to-date with current chain data.
///
/// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
/// newest on-chain activity related to the items previously registered via the [`Filter`]
/// interface.
///
/// [`Confirm`]: lightning::chain::Confirm
/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`Filter`]: lightning::chain::Filter
#[maybe_async]
pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
// This lock makes sure we're syncing once at a time.
#[cfg(not(feature = "async-interface"))]
let mut sync_state = self.sync_state.lock().unwrap();
#[cfg(feature = "async-interface")]
let mut sync_state = self.sync_state.lock().await;
log_info!(self.logger, "Starting transaction sync.");
let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
loop {
let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
// We loop until any registered transactions have been processed at least once, or the
// tip hasn't been updated during the last iteration.
if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
// Nothing to do.
break;
} else {
// Update the known tip to the newest one.
if tip_is_new {
// First check for any unconfirmed transactions and act on it immediately.
match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
Ok(unconfirmed_txs) => {
// Double-check the tip hash. If it changed, a reorg happened since
// we started syncing and we need to restart last-minute.
let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
if check_tip_hash != tip_hash {
tip_hash = check_tip_hash;
continue;
}
self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs);
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger, "Failed during transaction sync, aborting.");
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
Ok(()) => {}
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
Err(err) => {
// (Semi-)permanent failure, retry later.
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
}
match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
Ok(confirmed_txs) => {
// Double-check the tip hash. If it changed, a reorg happened since
// we started syncing and we need to restart last-minute.
let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
if check_tip_hash != tip_hash {
tip_hash = check_tip_hash;
continue;
}
self.sync_confirmed_transactions(
&mut sync_state,
&confirmables,
confirmed_txs,
);
}
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger, "Failed during transaction sync, aborting.");
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
sync_state.last_sync_hash = Some(tip_hash);
sync_state.pending_sync = false;
}
}
log_info!(self.logger, "Finished transaction sync.");
Ok(())
}
#[maybe_async]
fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
) -> Result<(), InternalError> {
// Inform the interface of the new block.
let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
if tip_status.in_best_chain {
if let Some(tip_height) = tip_status.height {
for c in confirmables {
c.best_block_updated(&tip_header, tip_height);
}
}
} else {
return Err(InternalError::Inconsistency);
}
Ok(())
}
fn sync_confirmed_transactions(
&self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec<ConfirmedTx>,
) {
for ctx in confirmed_txs {
for c in confirmables {
c.transactions_confirmed(
&ctx.block_header,
&[(ctx.pos, &ctx.tx)],
ctx.block_height,
);
}
sync_state.watched_transactions.remove(&ctx.tx.txid());
for input in &ctx.tx.input {
sync_state.watched_outputs.remove(&input.previous_output);
}
}
}
#[maybe_async]
fn get_confirmed_transactions(
&self, sync_state: &SyncState,
) -> Result<Vec<ConfirmedTx>, InternalError> {
// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
let mut confirmed_txs = Vec::new();
for txid in &sync_state.watched_transactions {
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
confirmed_txs.push(confirmed_tx);
}
}
for (_, output) in &sync_state.watched_outputs {
if let Some(output_status) = maybe_await!(self.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
{
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
if let Some(confirmed_tx) = maybe_await!(self
.get_confirmed_tx(
&spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))?
{
confirmed_txs.push(confirmed_tx);
}
}
}
}
}
// Sort all confirmed transactions first by block height, then by in-block
// position, and finally feed them to the interface in order.
confirmed_txs.sort_unstable_by(|tx1, tx2| {
tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
});
Ok(confirmed_txs)
}
#[maybe_async]
fn get_confirmed_tx(
&self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
) -> Result<Option<ConfirmedTx>, InternalError> {
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
let block_header = merkle_block.header;
let block_hash = block_header.block_hash();
if let Some(expected_block_hash) = expected_block_hash {
if expected_block_hash != block_hash {
log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
return Err(InternalError::Inconsistency);
}
}
let mut matches = Vec::new();
let mut indexes = Vec::new();
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
return Err(InternalError::Failed);
}
let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize;
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
if let Some(block_height) = known_block_height {
// We can take a shortcut here if a previous call already gave us the height.
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
}
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if let Some(block_height) = block_status.height {
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
} else {
// If any previously-confirmed block suddenly is no longer confirmed, we found
// an inconsistency and should start over.
log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
return Err(InternalError::Inconsistency);
}
}
}
Ok(None)
}
#[maybe_async]
fn get_unconfirmed_transactions(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
) -> Result<Vec<Txid>, InternalError> {
// Query the interface for relevant txids and check whether the relevant blocks are still
// in the best chain, mark them unconfirmed otherwise
let relevant_txids = confirmables
.iter()
.flat_map(|c| c.get_relevant_txids())
.collect::<HashSet<(Txid, Option<BlockHash>)>>();
let mut unconfirmed_txs = Vec::new();
for (txid, block_hash_opt) in relevant_txids {
if let Some(block_hash) = block_hash_opt {
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if block_status.in_best_chain {
// Skip if the block in question is still confirmed.
continue;
}
unconfirmed_txs.push(txid);
} else {
log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
}
}
Ok(unconfirmed_txs)
}
fn sync_unconfirmed_transactions(
&self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec<Txid>,
) {
for txid in unconfirmed_txs {
for c in confirmables {
c.transaction_unconfirmed(&txid);
}
sync_state.watched_transactions.insert(txid);
}
}
/// Returns a reference to the underlying esplora client.
pub fn client(&self) -> &EsploraClientType {
&self.client
}
}
#[cfg(feature = "async-interface")]
type MutexType<I> = futures::lock::Mutex<I>;
#[cfg(not(feature = "async-interface"))]
type MutexType<I> = std::sync::Mutex<I>;
// The underlying client type.
#[cfg(feature = "async-interface")]
type EsploraClientType = AsyncClient;
#[cfg(not(feature = "async-interface"))]
type EsploraClientType = BlockingClient;
impl<L: Deref> Filter for EsploraSyncClient<L>
where
L::Target: Logger,
{
fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.transactions.insert(*txid);
}
fn register_output(&self, output: WatchedOutput) {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
}
}

View file

@ -0,0 +1,82 @@
//! Provides utilities for syncing LDK via the transaction-based [`Confirm`] interface.
//!
//! The provided synchronization clients need to be registered with a [`ChainMonitor`] via the
//! [`Filter`] interface. Then, the respective `fn sync` needs to be called with the [`Confirm`]
//! implementations to be synchronized, i.e., usually instances of [`ChannelManager`] and
//! [`ChainMonitor`].
//!
//! ## Features and Backend Support
//!
//!- `esplora_blocking` enables syncing against an Esplora backend based on a blocking client.
//!- `esplora_async` enables syncing against an Esplora backend based on an async client.
//!
//! ## Version Compatibility
//!
//! Currently this crate is compatible with nodes that were created with LDK version 0.0.113 and above.
//!
//! ## Usage Example:
//!
//! ```ignore
//! let tx_sync = Arc::new(EsploraSyncClient::new(
//! esplora_server_url,
//! Arc::clone(&some_logger),
//! ));
//!
//! let chain_monitor = Arc::new(ChainMonitor::new(
//! Some(Arc::clone(&tx_sync)),
//! Arc::clone(&some_broadcaster),
//! Arc::clone(&some_logger),
//! Arc::clone(&some_fee_estimator),
//! Arc::clone(&some_persister),
//! ));
//!
//! let channel_manager = Arc::new(ChannelManager::new(
//! Arc::clone(&some_fee_estimator),
//! Arc::clone(&chain_monitor),
//! Arc::clone(&some_broadcaster),
//! Arc::clone(&some_router),
//! Arc::clone(&some_logger),
//! Arc::clone(&some_entropy_source),
//! Arc::clone(&some_node_signer),
//! Arc::clone(&some_signer_provider),
//! user_config,
//! chain_params,
//! ));
//!
//! let confirmables = vec![
//! &*channel_manager as &(dyn Confirm + Sync + Send),
//! &*chain_monitor as &(dyn Confirm + Sync + Send),
//! ];
//!
//! tx_sync.sync(confirmables).unwrap();
//! ```
//!
//! [`Confirm`]: lightning::chain::Confirm
//! [`Filter`]: lightning::chain::Filter
//! [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
//! [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
#![deny(broken_intra_doc_links)]
#![deny(private_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
#[macro_use]
extern crate bdk_macros;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
mod esplora;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
mod common;
mod error;
pub use error::TxSyncError;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
pub use esplora::EsploraSyncClient;