From 2f581104b236eace500d3f8153ee5c5dafa42390 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 19 Mar 2024 15:07:27 +0100 Subject: [PATCH 1/2] Track spent `WatchedOutput`s and re-add if unconfirmed Previously, we would track a spending transaction but wouldn't account for it being reorged out of the chain, in which case we wouldn't monitor the `WatchedOutput`s until they'd be reloaded on restart. Here, we keep any `WatchedOutput`s around until their spends are sufficiently confirmed and only prune them after `ANTI_REORG_DELAY`. --- lightning-transaction-sync/src/common.rs | 26 +++++++++++++++++++++- lightning-transaction-sync/src/electrum.rs | 3 +++ lightning-transaction-sync/src/esplora.rs | 7 ++++-- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/lightning-transaction-sync/src/common.rs b/lightning-transaction-sync/src/common.rs index be49fbe96..442f3bd31 100644 --- a/lightning-transaction-sync/src/common.rs +++ b/lightning-transaction-sync/src/common.rs @@ -1,4 +1,5 @@ use lightning::chain::{Confirm, WatchedOutput}; +use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use bitcoin::{Txid, BlockHash, Transaction, OutPoint}; use bitcoin::block::Header; @@ -13,6 +14,9 @@ pub(crate) struct SyncState { // Outputs that were previously processed, but must not be forgotten yet as // as we still need to monitor any spends on-chain. pub watched_outputs: HashMap, + // Outputs for which we previously saw a spend on-chain but kept around until the spends reach + // sufficient depth. + pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>, // The tip hash observed during our last sync. pub last_sync_hash: Option, // Indicates whether we need to resync, e.g., after encountering an error. @@ -24,6 +28,7 @@ impl SyncState { Self { watched_transactions: HashSet::new(), watched_outputs: HashMap::new(), + outputs_spends_pending_threshold_conf: Vec::new(), last_sync_hash: None, pending_sync: false, } @@ -38,6 +43,17 @@ impl SyncState { } self.watched_transactions.insert(txid); + + // If a previously-confirmed output spend is unconfirmed, re-add the watched output to + // the tracking map. + self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| { + if txid == *conf_txid { + self.watched_outputs.insert(*prev_outpoint, output.clone()); + false + } else { + true + } + }) } } @@ -57,10 +73,18 @@ impl SyncState { self.watched_transactions.remove(&ctx.tx.txid()); for input in &ctx.tx.input { - self.watched_outputs.remove(&input.previous_output); + if let Some(output) = self.watched_outputs.remove(&input.previous_output) { + self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output)); + } } } } + + pub fn prune_output_spends(&mut self, cur_height: u32) { + self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| { + cur_height < conf_height + ANTI_REORG_DELAY - 1 + }); + } } diff --git a/lightning-transaction-sync/src/electrum.rs b/lightning-transaction-sync/src/electrum.rs index d0c8afef7..efffafb14 100644 --- a/lightning-transaction-sync/src/electrum.rs +++ b/lightning-transaction-sync/src/electrum.rs @@ -157,6 +157,9 @@ where for c in &confirmables { c.best_block_updated(&tip_header, tip_height); } + + // Prune any sufficiently confirmed output spends + sync_state.prune_output_spends(tip_height); } match self.get_confirmed_transactions(&sync_state) { diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs index eb52faf33..ffe687792 100644 --- a/lightning-transaction-sync/src/esplora.rs +++ b/lightning-transaction-sync/src/esplora.rs @@ -153,7 +153,7 @@ where } } - match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) { + match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) { Ok(()) => {} Err(InternalError::Inconsistency) => { // Immediately restart syncing when we encounter any inconsistencies. @@ -238,7 +238,7 @@ where #[maybe_async] fn sync_best_block_updated( - &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash, + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash, ) -> Result<(), InternalError> { // Inform the interface of the new block. @@ -249,6 +249,9 @@ where for c in confirmables { c.best_block_updated(&tip_header, tip_height); } + + // Prune any sufficiently confirmed output spends + sync_state.prune_output_spends(tip_height); } } else { return Err(InternalError::Inconsistency); From b71c6e2f6701f0e0f1b4c92db00772edbc4d262d Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 19 Mar 2024 15:30:16 +0100 Subject: [PATCH 2/2] Dedup `confirmed_txs` `Vec` Previously, we would just push to the `confirmed_txs` `Vec`, leading to redundant `Confirm::transactions_confirmed` calls, especially now that we re-confirm previously disconnected spends. Here, we ensure that we don't push additional `ConfirmedTx` entries if already one with matching `Txid` is present. This not only gets rid of the spurious `transactions_confirmed` calls (which are harmless), but more importantly saves us from issuing unnecessary network calls, which improves latency. --- lightning-transaction-sync/src/common.rs | 1 + lightning-transaction-sync/src/electrum.rs | 10 ++++++- lightning-transaction-sync/src/esplora.rs | 32 +++++++++++++++++----- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/lightning-transaction-sync/src/common.rs b/lightning-transaction-sync/src/common.rs index 442f3bd31..c635f7385 100644 --- a/lightning-transaction-sync/src/common.rs +++ b/lightning-transaction-sync/src/common.rs @@ -128,6 +128,7 @@ impl FilterQueue { #[derive(Debug)] pub(crate) struct ConfirmedTx { pub tx: Transaction, + pub txid: Txid, pub block_header: Header, pub block_height: u32, pub pos: usize, diff --git a/lightning-transaction-sync/src/electrum.rs b/lightning-transaction-sync/src/electrum.rs index efffafb14..046f698d4 100644 --- a/lightning-transaction-sync/src/electrum.rs +++ b/lightning-transaction-sync/src/electrum.rs @@ -257,7 +257,7 @@ where // First, check the confirmation status of registered transactions as well as the // status of dependent transactions of registered outputs. - let mut confirmed_txs = Vec::new(); + let mut confirmed_txs: Vec = Vec::new(); let mut watched_script_pubkeys = Vec::with_capacity( sync_state.watched_transactions.len() + sync_state.watched_outputs.len()); let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len()); @@ -305,6 +305,9 @@ where for (i, script_history) in tx_results.iter().enumerate() { let (txid, tx) = &watched_txs[i]; + if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) { + continue; + } let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid); if let Some(history) = filtered_history.next() { @@ -324,6 +327,10 @@ where } let txid = possible_output_spend.tx_hash; + if confirmed_txs.iter().any(|ctx| ctx.txid == txid) { + continue; + } + match self.client.transaction_get(&txid) { Ok(tx) => { let mut is_spend = false; @@ -419,6 +426,7 @@ where } let confirmed_tx = ConfirmedTx { tx: tx.clone(), + txid, block_header, block_height: prob_conf_height, pos, }; diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs index ffe687792..538918ada 100644 --- a/lightning-transaction-sync/src/esplora.rs +++ b/lightning-transaction-sync/src/esplora.rs @@ -267,10 +267,13 @@ where // First, check the confirmation status of registered transactions as well as the // status of dependent transactions of registered outputs. - let mut confirmed_txs = Vec::new(); + let mut confirmed_txs: Vec = Vec::new(); for txid in &sync_state.watched_transactions { - if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? { + if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) { + continue; + } + if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? { confirmed_txs.push(confirmed_tx); } } @@ -281,9 +284,19 @@ where { if let Some(spending_txid) = output_status.txid { if let Some(spending_tx_status) = output_status.status { + if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) { + if spending_tx_status.confirmed { + // Skip inserting duplicate ConfirmedTx entry + continue; + } else { + log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid); + return Err(InternalError::Inconsistency); + } + } + if let Some(confirmed_tx) = maybe_await!(self .get_confirmed_tx( - &spending_txid, + spending_txid, spending_tx_status.block_hash, spending_tx_status.block_height, ))? @@ -306,7 +319,7 @@ where #[maybe_async] fn get_confirmed_tx( - &self, txid: &Txid, expected_block_hash: Option, known_block_height: Option, + &self, txid: Txid, expected_block_hash: Option, known_block_height: Option, ) -> Result, InternalError> { if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? { let block_header = merkle_block.header; @@ -321,7 +334,7 @@ where let mut matches = Vec::new(); let mut indexes = Vec::new(); let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes); - if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid { + if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid { log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); return Err(InternalError::Failed); } @@ -329,14 +342,19 @@ where // unwrap() safety: len() > 0 is checked above let pos = *indexes.first().unwrap() as usize; if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? { + if tx.txid() != txid { + log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); + return Err(InternalError::Failed); + } + if let Some(block_height) = known_block_height { // We can take a shortcut here if a previous call already gave us the height. - return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height })); } let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; if let Some(block_height) = block_status.height { - return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height })); } else { // If any previously-confirmed block suddenly is no longer confirmed, we found // an inconsistency and should start over.