Merge pull request #3101 from tnull/2024-06-fix-tx-sync

tx-sync: Make confirmables `Deref` and run `rustfmt`
This commit is contained in:
Elias Rohrer 2024-06-10 10:04:49 +02:00 committed by GitHub
commit 1d0c6c60c6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 287 additions and 209 deletions

View file

@ -15,5 +15,5 @@ TMP_FILE=$(mktemp)
find . -name '*.rs' -type f |sort >$TMP_FILE
for file in $(comm -23 $TMP_FILE rustfmt_excluded_files); do
echo "Checking formatting of $file"
rustfmt $VERS --check $file
rustfmt $VERS --edition 2021 --check $file
done

View file

@ -1,10 +1,10 @@
use lightning::chain::{Confirm, WatchedOutput};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
use bitcoin::block::Header;
use bitcoin::{BlockHash, OutPoint, Transaction, Txid};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::{Confirm, WatchedOutput};
use std::collections::{HashSet, HashMap};
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
// Represents the current state.
pub(crate) struct SyncState {
@ -33,10 +33,11 @@ impl SyncState {
pending_sync: false,
}
}
pub fn sync_unconfirmed_transactions(
&mut self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
unconfirmed_txs: Vec<Txid>,
) {
pub fn sync_unconfirmed_transactions<C: Deref>(
&mut self, confirmables: &Vec<C>, unconfirmed_txs: Vec<Txid>,
) where
C::Target: Confirm,
{
for txid in unconfirmed_txs {
for c in confirmables {
c.transaction_unconfirmed(&txid);
@ -46,21 +47,24 @@ impl SyncState {
// If a previously-confirmed output spend is unconfirmed, re-add the watched output to
// the tracking map.
self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
self.outputs_spends_pending_threshold_conf.retain(
|(conf_txid, _, prev_outpoint, output)| {
if txid == *conf_txid {
self.watched_outputs.insert(*prev_outpoint, output.clone());
false
} else {
true
}
})
},
)
}
}
pub fn sync_confirmed_transactions(
&mut self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
confirmed_txs: Vec<ConfirmedTx>
) {
pub fn sync_confirmed_transactions<C: Deref>(
&mut self, confirmables: &Vec<C>, confirmed_txs: Vec<ConfirmedTx>,
) where
C::Target: Confirm,
{
for ctx in confirmed_txs {
for c in confirmables {
c.transactions_confirmed(
@ -74,20 +78,19 @@ impl SyncState {
for input in &ctx.tx.input {
if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
let spent = (ctx.tx.txid(), ctx.block_height, input.previous_output, output);
self.outputs_spends_pending_threshold_conf.push(spent);
}
}
}
}
pub fn prune_output_spends(&mut self, cur_height: u32) {
self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
cur_height < conf_height + ANTI_REORG_DELAY - 1
});
self.outputs_spends_pending_threshold_conf
.retain(|(_, conf_height, _, _)| cur_height < conf_height + ANTI_REORG_DELAY - 1);
}
}
// A queue that is to be filled by `Filter` and drained during the next syncing round.
pub(crate) struct FilterQueue {
// Transactions that were registered via the `Filter` interface and have to be processed.
@ -98,10 +101,7 @@ pub(crate) struct FilterQueue {
impl FilterQueue {
pub fn new() -> Self {
Self {
transactions: HashSet::new(),
outputs: HashMap::new(),
}
Self { transactions: HashSet::new(), outputs: HashMap::new() }
}
// Processes the transaction and output queues and adds them to the given [`SyncState`].

View file

@ -1,24 +1,24 @@
use crate::common::{ConfirmedTx, SyncState, FilterQueue};
use crate::error::{TxSyncError, InternalError};
use crate::common::{ConfirmedTx, FilterQueue, SyncState};
use crate::error::{InternalError, TxSyncError};
use electrum_client::Client as ElectrumClient;
use electrum_client::ElectrumApi;
use electrum_client::GetMerkleRes;
use lightning::util::logger::Logger;
use lightning::{log_error, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use lightning::util::logger::Logger;
use lightning::{log_debug, log_error, log_trace};
use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::block::Header;
use bitcoin::hash_types::TxMerkleNode;
use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256d::Hash as Sha256d;
use bitcoin::hashes::Hash;
use bitcoin::{BlockHash, Script, Transaction, Txid};
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Mutex;
use std::collections::HashSet;
use std::time::Instant;
/// Synchronizes LDK with a given Electrum server.
@ -64,12 +64,7 @@ where
let sync_state = Mutex::new(SyncState::new());
let queue = Mutex::new(FilterQueue::new());
Ok(Self {
sync_state,
queue,
client,
logger,
})
Ok(Self { sync_state, queue, client, logger })
}
/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
@ -83,7 +78,10 @@ where
/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`Filter`]: lightning::chain::Filter
pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
where
C::Target: Confirm,
{
// This lock makes sure we're syncing once at a time.
let mut sync_state = self.sync_state.lock().unwrap();
@ -122,15 +120,15 @@ where
num_unconfirmed += unconfirmed_txs.len();
sync_state.sync_unconfirmed_transactions(
&confirmables,
unconfirmed_txs
unconfirmed_txs,
);
}
},
Ok(true) => {
log_debug!(self.logger,
"Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -140,7 +138,7 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
},
Err(err) => {
@ -152,7 +150,7 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
// Update the best block.
@ -171,17 +169,15 @@ where
match self.check_update_tip(&mut tip_header, &mut tip_height) {
Ok(false) => {
num_confirmed += confirmed_txs.len();
sync_state.sync_confirmed_transactions(
&confirmables,
confirmed_txs
);
}
sync_state
.sync_confirmed_transactions(&confirmables, confirmed_txs);
},
Ok(true) => {
log_debug!(self.logger,
"Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -191,16 +187,18 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
},
}
}
}
},
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
log_debug!(self.logger,
"Encountered inconsistency during transaction sync, restarting.");
log_debug!(
self.logger,
"Encountered inconsistency during transaction sync, restarting."
);
sync_state.pending_sync = true;
continue;
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -210,27 +208,35 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
sync_state.last_sync_hash = Some(tip_header.block_hash());
sync_state.pending_sync = false;
}
}
#[cfg(feature = "time")]
log_debug!(self.logger,
log_debug!(
self.logger,
"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
tip_header.block_hash(), start_time.elapsed().as_millis(), num_confirmed,
num_unconfirmed);
tip_header.block_hash(),
start_time.elapsed().as_millis(),
num_confirmed,
num_unconfirmed
);
#[cfg(not(feature = "time"))]
log_debug!(self.logger,
log_debug!(
self.logger,
"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
tip_header.block_hash(), num_confirmed, num_unconfirmed);
tip_header.block_hash(),
num_confirmed,
num_unconfirmed
);
Ok(())
}
fn check_update_tip(&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32)
-> Result<bool, InternalError>
{
fn check_update_tip(
&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
) -> Result<bool, InternalError> {
let check_notification = self.client.block_headers_subscribe()?;
let check_tip_hash = check_notification.header.block_hash();
@ -256,12 +262,12 @@ where
fn get_confirmed_transactions(
&self, sync_state: &SyncState,
) -> Result<Vec<ConfirmedTx>, InternalError> {
// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
let mut watched_script_pubkeys = Vec::with_capacity(
sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
);
let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
for txid in &sync_state.watched_transactions {
@ -278,14 +284,14 @@ where
log_error!(self.logger, "Failed due to retrieving invalid tx data.");
return Err(InternalError::Failed);
}
}
},
Err(electrum_client::Error::Protocol(_)) => {
// We couldn't find the tx, do nothing.
}
},
Err(e) => {
log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
return Err(InternalError::Failed);
}
},
}
}
@ -310,9 +316,9 @@ where
if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
continue;
}
let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
if let Some(history) = filtered_history.next()
{
let mut filtered_history =
script_history.iter().filter(|h| h.tx_hash == **txid);
if let Some(history) = filtered_history.next() {
let prob_conf_height = history.height as u32;
let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
confirmed_txs.push(confirmed_tx);
@ -320,8 +326,8 @@ where
debug_assert!(filtered_history.next().is_none());
}
for (watched_output, script_history) in sync_state.watched_outputs.values()
.zip(output_results)
for (watched_output, script_history) in
sync_state.watched_outputs.values().zip(output_results)
{
for possible_output_spend in script_history {
if possible_output_spend.height <= 0 {
@ -337,8 +343,8 @@ where
Ok(tx) => {
let mut is_spend = false;
for txin in &tx.input {
let watched_outpoint = watched_output.outpoint
.into_bitcoin_outpoint();
let watched_outpoint =
watched_output.outpoint.into_bitcoin_outpoint();
if txin.previous_output == watched_outpoint {
is_spend = true;
break;
@ -352,21 +358,24 @@ where
let prob_conf_height = possible_output_spend.height as u32;
let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
confirmed_txs.push(confirmed_tx);
}
},
Err(e) => {
log_trace!(self.logger,
log_trace!(
self.logger,
"Inconsistency: Tx {} was unconfirmed during syncing: {}",
txid, e);
txid,
e
);
return Err(InternalError::Inconsistency);
},
}
}
}
}
}
},
Err(e) => {
log_error!(self.logger, "Failed to look up script histories: {}.", e);
return Err(InternalError::Failed);
}
},
}
// Sort all confirmed transactions first by block height, then by in-block
@ -378,9 +387,12 @@ where
Ok(confirmed_txs)
}
fn get_unconfirmed_transactions(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
) -> Result<Vec<Txid>, InternalError> {
fn get_unconfirmed_transactions<C: Deref>(
&self, confirmables: &Vec<C>,
) -> Result<Vec<Txid>, InternalError>
where
C::Target: Confirm,
{
// Query the interface for relevant txids and check whether the relevant blocks are still
// in the best chain, mark them unconfirmed otherwise
let relevant_txids = confirmables
@ -408,9 +420,9 @@ where
Ok(unconfirmed_txs)
}
fn get_confirmed_tx(&self, tx: &Transaction, prob_conf_height: u32)
-> Result<ConfirmedTx, InternalError>
{
fn get_confirmed_tx(
&self, tx: &Transaction, prob_conf_height: u32,
) -> Result<ConfirmedTx, InternalError> {
let txid = tx.txid();
match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
Ok(merkle_res) => {
@ -418,36 +430,47 @@ where
match self.client.block_header(prob_conf_height as usize) {
Ok(block_header) => {
let pos = merkle_res.pos;
if !self.validate_merkle_proof(&txid,
&block_header.merkle_root, merkle_res)?
{
log_trace!(self.logger,
if !self.validate_merkle_proof(
&txid,
&block_header.merkle_root,
merkle_res,
)? {
log_trace!(
self.logger,
"Inconsistency: Block {} was unconfirmed during syncing.",
block_header.block_hash());
block_header.block_hash()
);
return Err(InternalError::Inconsistency);
}
let confirmed_tx = ConfirmedTx {
tx: tx.clone(),
txid,
block_header, block_height: prob_conf_height,
block_header,
block_height: prob_conf_height,
pos,
};
Ok(confirmed_tx)
}
},
Err(e) => {
log_error!(self.logger,
log_error!(
self.logger,
"Failed to retrieve block header for height {}: {}.",
prob_conf_height, e);
prob_conf_height,
e
);
Err(InternalError::Failed)
},
}
}
}
},
Err(e) => {
log_trace!(self.logger,
log_trace!(
self.logger,
"Inconsistency: Tx {} was unconfirmed during syncing: {}",
txid, e);
txid,
e
);
Err(InternalError::Inconsistency)
}
},
}
}
@ -458,20 +481,16 @@ where
&self.client
}
fn validate_merkle_proof(&self, txid: &Txid, merkle_root: &TxMerkleNode,
merkle_res: GetMerkleRes) -> Result<bool, InternalError>
{
fn validate_merkle_proof(
&self, txid: &Txid, merkle_root: &TxMerkleNode, merkle_res: GetMerkleRes,
) -> Result<bool, InternalError> {
let mut index = merkle_res.pos;
let mut cur = txid.to_raw_hash();
for mut bytes in merkle_res.merkle {
bytes.reverse();
// unwrap() safety: `bytes` has len 32 so `from_slice` can never fail.
let next_hash = Sha256d::from_slice(&bytes).unwrap();
let (left, right) = if index % 2 == 0 {
(cur, next_hash)
} else {
(next_hash, cur)
};
let (left, right) = if index % 2 == 0 { (cur, next_hash) } else { (next_hash, cur) };
let data = [&left[..], &right[..]].concat();
cur = Sha256d::hash(&data);

View file

@ -31,7 +31,7 @@ impl fmt::Display for InternalError {
Self::Failed => write!(f, "Failed to conduct transaction sync."),
Self::Inconsistency => {
write!(f, "Encountered an inconsistency during transaction sync.")
}
},
}
}
}

View file

@ -1,21 +1,21 @@
use crate::error::{TxSyncError, InternalError};
use crate::common::{SyncState, FilterQueue, ConfirmedTx};
use crate::common::{ConfirmedTx, FilterQueue, SyncState};
use crate::error::{InternalError, TxSyncError};
use lightning::util::logger::Logger;
use lightning::{log_error, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use lightning::util::logger::Logger;
use lightning::{log_debug, log_error, log_trace};
use bitcoin::{BlockHash, Script, Txid};
use esplora_client::Builder;
#[cfg(feature = "async-interface")]
use esplora_client::r#async::AsyncClient;
#[cfg(not(feature = "async-interface"))]
use esplora_client::blocking::BlockingClient;
#[cfg(feature = "async-interface")]
use esplora_client::r#async::AsyncClient;
use esplora_client::Builder;
use std::collections::HashSet;
use core::ops::Deref;
use std::collections::HashSet;
/// Synchronizes LDK with a given [`Esplora`] server.
///
@ -64,12 +64,7 @@ where
pub fn from_client(client: EsploraClientType, logger: L) -> Self {
let sync_state = MutexType::new(SyncState::new());
let queue = std::sync::Mutex::new(FilterQueue::new());
Self {
sync_state,
queue,
client,
logger,
}
Self { sync_state, queue, client, logger }
}
/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
@ -84,7 +79,10 @@ where
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`Filter`]: lightning::chain::Filter
#[maybe_async]
pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
where
C::Target: Confirm,
{
// This lock makes sure we're syncing once at a time.
#[cfg(not(feature = "async-interface"))]
let mut sync_state = self.sync_state.lock().unwrap();
@ -128,9 +126,9 @@ where
num_unconfirmed += unconfirmed_txs.len();
sync_state.sync_unconfirmed_transactions(
&confirmables,
unconfirmed_txs
unconfirmed_txs,
);
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -140,7 +138,7 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
},
Err(err) => {
@ -152,17 +150,24 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) {
Ok(()) => {}
match maybe_await!(self.sync_best_block_updated(
&confirmables,
&mut sync_state,
&tip_hash
)) {
Ok(()) => {},
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
log_debug!(
self.logger,
"Encountered inconsistency during transaction sync, restarting."
);
sync_state.pending_sync = true;
continue;
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -172,7 +177,7 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
}
@ -191,11 +196,9 @@ where
continue;
}
num_confirmed += confirmed_txs.len();
sync_state.sync_confirmed_transactions(
&confirmables,
confirmed_txs
);
}
sync_state
.sync_confirmed_transactions(&confirmables, confirmed_txs);
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -205,15 +208,18 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
},
}
}
}
},
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
log_debug!(
self.logger,
"Encountered inconsistency during transaction sync, restarting."
);
sync_state.pending_sync = true;
continue;
}
},
Err(err) => {
// (Semi-)permanent failure, retry later.
log_error!(self.logger,
@ -223,26 +229,39 @@ where
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
},
}
sync_state.last_sync_hash = Some(tip_hash);
sync_state.pending_sync = false;
}
}
#[cfg(feature = "time")]
log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
tip_hash, start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed);
log_debug!(
self.logger,
"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
tip_hash,
start_time.elapsed().as_millis(),
num_confirmed,
num_unconfirmed
);
#[cfg(not(feature = "time"))]
log_debug!(self.logger, "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
tip_hash, num_confirmed, num_unconfirmed);
log_debug!(
self.logger,
"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
tip_hash,
num_confirmed,
num_unconfirmed
);
Ok(())
}
#[maybe_async]
fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash,
) -> Result<(), InternalError> {
fn sync_best_block_updated<C: Deref>(
&self, confirmables: &Vec<C>, sync_state: &mut SyncState, tip_hash: &BlockHash,
) -> Result<(), InternalError>
where
C::Target: Confirm,
{
// Inform the interface of the new block.
let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
@ -265,7 +284,6 @@ where
fn get_confirmed_transactions(
&self, sync_state: &SyncState,
) -> Result<Vec<ConfirmedTx>, InternalError> {
// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
@ -281,7 +299,8 @@ where
}
for (_, output) in &sync_state.watched_outputs {
if let Some(output_status) = maybe_await!(self.client
if let Some(output_status) = maybe_await!(self
.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
{
if let Some(spending_txid) = output_status.txid {
@ -296,13 +315,11 @@ where
}
}
if let Some(confirmed_tx) = maybe_await!(self
.get_confirmed_tx(
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))?
{
))? {
confirmed_txs.push(confirmed_tx);
}
}
@ -328,7 +345,13 @@ where
let block_hash = block_header.block_hash();
if let Some(expected_block_hash) = expected_block_hash {
if expected_block_hash != block_hash {
log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
log_trace!(
self.logger,
"Inconsistency: Tx {} expected in block {}, but is confirmed in {}",
txid,
expected_block_hash,
block_hash
);
return Err(InternalError::Inconsistency);
}
}
@ -360,7 +383,11 @@ where
} else {
// If any previously-confirmed block suddenly is no longer confirmed, we found
// an inconsistency and should start over.
log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
log_trace!(
self.logger,
"Inconsistency: Tx {} was unconfirmed during syncing.",
txid
);
return Err(InternalError::Inconsistency);
}
}
@ -369,9 +396,12 @@ where
}
#[maybe_async]
fn get_unconfirmed_transactions(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
) -> Result<Vec<Txid>, InternalError> {
fn get_unconfirmed_transactions<C: Deref>(
&self, confirmables: &Vec<C>,
) -> Result<Vec<Txid>, InternalError>
where
C::Target: Confirm,
{
// Query the interface for relevant txids and check whether the relevant blocks are still
// in the best chain, mark them unconfirmed otherwise
let relevant_txids = confirmables
@ -417,7 +447,6 @@ type EsploraClientType = AsyncClient;
#[cfg(not(feature = "async-interface"))]
type EsploraClientType = BlockingClient;
impl<L: Deref> Filter for EsploraSyncClient<L>
where
L::Target: Logger,

View file

@ -60,10 +60,8 @@
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
@ -83,7 +81,7 @@ mod error;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum"))]
pub use error::TxSyncError;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
pub use esplora::EsploraSyncClient;
#[cfg(feature = "electrum")]
pub use electrum::ElectrumSyncClient;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
pub use esplora::EsploraSyncClient;

View file

@ -1,26 +1,29 @@
#![cfg(all(not(target_os = "windows"), any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum")))]
#![cfg(all(
not(target_os = "windows"),
any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum")
))]
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
use lightning_transaction_sync::EsploraSyncClient;
use lightning::chain::transaction::{OutPoint, TransactionData};
use lightning::chain::{Confirm, Filter, WatchedOutput};
use lightning::util::test_utils::TestLogger;
#[cfg(feature = "electrum")]
use lightning_transaction_sync::ElectrumSyncClient;
use lightning::chain::{Confirm, Filter, WatchedOutput};
use lightning::chain::transaction::{OutPoint, TransactionData};
use lightning::util::test_utils::TestLogger;
#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
use lightning_transaction_sync::EsploraSyncClient;
use electrsd::{bitcoind, bitcoind::BitcoinD, ElectrsD};
use bitcoin::{Amount, Txid, BlockHash};
use bdk_macros::maybe_await;
use bitcoin::blockdata::block::Header;
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::network::Network;
use electrsd::bitcoind::bitcoincore_rpc::bitcoincore_rpc_json::AddressType;
use bitcoin::{Amount, BlockHash, Txid};
use bitcoind::bitcoincore_rpc::RpcApi;
use bdk_macros::maybe_await;
use electrsd::bitcoind::bitcoincore_rpc::bitcoincore_rpc_json::AddressType;
use electrsd::{bitcoind, bitcoind::BitcoinD, ElectrsD};
use std::collections::{HashMap, HashSet};
use std::env;
use std::sync::Mutex;
use std::time::Duration;
use std::collections::{HashMap, HashSet};
pub fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) {
let bitcoind_exe =
@ -63,8 +66,11 @@ pub fn wait_for_block(electrsd: &ElectrsD, min_height: usize) {
// it didn't. Since we can't proceed without subscribing, we try again after a delay
// and panic if it still fails.
std::thread::sleep(Duration::from_secs(1));
electrsd.client.block_headers_subscribe_raw().expect("failed to subscribe to block headers")
}
electrsd
.client
.block_headers_subscribe_raw()
.expect("failed to subscribe to block headers")
},
};
loop {
if header.height >= min_height {
@ -90,7 +96,7 @@ where
None if delay.as_millis() < 512 => {
delay = delay.mul_f32(2.0);
tries += 1;
}
},
None if tries == 10 => panic!("Exceeded our maximum wait time."),
None => tries += 1,
}
@ -132,7 +138,8 @@ impl Confirm for TestConfirmable {
let block_hash = header.block_hash();
self.confirmed_txs.lock().unwrap().insert(txid, (block_hash, height));
self.unconfirmed_txs.lock().unwrap().remove(&txid);
self.events.lock().unwrap().push(TestConfirmableEvent::Confirmed(txid, block_hash, height));
let event = TestConfirmableEvent::Confirmed(txid, block_hash, height);
self.events.lock().unwrap().push(event);
}
}
@ -145,11 +152,13 @@ impl Confirm for TestConfirmable {
fn best_block_updated(&self, header: &Header, height: u32) {
let block_hash = header.block_hash();
*self.best_block.lock().unwrap() = (block_hash, height);
self.events.lock().unwrap().push(TestConfirmableEvent::BestBlockUpdated(block_hash, height));
let event = TestConfirmableEvent::BestBlockUpdated(block_hash, height);
self.events.lock().unwrap().push(event);
}
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
self.confirmed_txs.lock().unwrap().iter().map(|(&txid, (hash, height))| (txid, *height, Some(*hash))).collect::<Vec<_>>()
let lock = self.confirmed_txs.lock().unwrap();
lock.iter().map(|(&txid, (hash, height))| (txid, *height, Some(*hash))).collect()
}
}
@ -165,12 +174,37 @@ macro_rules! test_syncing {
assert_eq!(events.len(), 1);
// Check registered confirmed transactions are marked confirmed
let new_address = $bitcoind.client.get_new_address(Some("test"),
Some(AddressType::Legacy)).unwrap().assume_checked();
let txid = $bitcoind.client.send_to_address(&new_address, Amount::from_sat(5000), None, None,
None, None, None, None).unwrap();
let second_txid = $bitcoind.client.send_to_address(&new_address, Amount::from_sat(5000), None,
None, None, None, None, None).unwrap();
let new_address = $bitcoind
.client
.get_new_address(Some("test"), Some(AddressType::Legacy))
.unwrap()
.assume_checked();
let txid = $bitcoind
.client
.send_to_address(
&new_address,
Amount::from_sat(5000),
None,
None,
None,
None,
None,
None,
)
.unwrap();
let second_txid = $bitcoind
.client
.send_to_address(
&new_address,
Amount::from_sat(5000),
None,
None,
None,
None,
None,
None,
)
.unwrap();
$tx_sync.register_tx(&txid, &new_address.payload().script_pubkey());
maybe_await!($tx_sync.sync(vec![&$confirmable])).unwrap();
@ -193,13 +227,17 @@ macro_rules! test_syncing {
let block_hash = tx_res.info.blockhash.unwrap();
let tx = tx_res.transaction().unwrap();
let prev_outpoint = tx.input.first().unwrap().previous_output;
let prev_tx = $bitcoind.client.get_transaction(&prev_outpoint.txid, None).unwrap().transaction()
let prev_tx = $bitcoind
.client
.get_transaction(&prev_outpoint.txid, None)
.unwrap()
.transaction()
.unwrap();
let prev_script_pubkey = prev_tx.output[prev_outpoint.vout as usize].script_pubkey.clone();
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid: prev_outpoint.txid, index: prev_outpoint.vout as u16 },
script_pubkey: prev_script_pubkey
script_pubkey: prev_script_pubkey,
};
$tx_sync.register_output(output);

View file

@ -30,12 +30,6 @@
./lightning-rapid-gossip-sync/src/error.rs
./lightning-rapid-gossip-sync/src/lib.rs
./lightning-rapid-gossip-sync/src/processing.rs
./lightning-transaction-sync/src/common.rs
./lightning-transaction-sync/src/electrum.rs
./lightning-transaction-sync/src/error.rs
./lightning-transaction-sync/src/esplora.rs
./lightning-transaction-sync/src/lib.rs
./lightning-transaction-sync/tests/integration_tests.rs
./lightning/src/blinded_path/message.rs
./lightning/src/blinded_path/mod.rs
./lightning/src/blinded_path/payment.rs