mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-24 23:08:36 +01:00
Merge pull request #2946 from tnull/2024-03-txsync-readd-reorged-output-spends
Tx-Sync: Track spent `WatchedOutput`s and re-add if unconfirmed
This commit is contained in:
commit
650caa099d
3 changed files with 68 additions and 11 deletions
|
@ -1,4 +1,5 @@
|
|||
use lightning::chain::{Confirm, WatchedOutput};
|
||||
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
|
||||
use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
|
||||
use bitcoin::block::Header;
|
||||
|
||||
|
@ -13,6 +14,9 @@ pub(crate) struct SyncState {
|
|||
// Outputs that were previously processed, but must not be forgotten yet as
|
||||
// as we still need to monitor any spends on-chain.
|
||||
pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
|
||||
// Outputs for which we previously saw a spend on-chain but kept around until the spends reach
|
||||
// sufficient depth.
|
||||
pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
|
||||
// The tip hash observed during our last sync.
|
||||
pub last_sync_hash: Option<BlockHash>,
|
||||
// Indicates whether we need to resync, e.g., after encountering an error.
|
||||
|
@ -24,6 +28,7 @@ impl SyncState {
|
|||
Self {
|
||||
watched_transactions: HashSet::new(),
|
||||
watched_outputs: HashMap::new(),
|
||||
outputs_spends_pending_threshold_conf: Vec::new(),
|
||||
last_sync_hash: None,
|
||||
pending_sync: false,
|
||||
}
|
||||
|
@ -38,6 +43,17 @@ impl SyncState {
|
|||
}
|
||||
|
||||
self.watched_transactions.insert(txid);
|
||||
|
||||
// If a previously-confirmed output spend is unconfirmed, re-add the watched output to
|
||||
// the tracking map.
|
||||
self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
|
||||
if txid == *conf_txid {
|
||||
self.watched_outputs.insert(*prev_outpoint, output.clone());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -57,10 +73,18 @@ impl SyncState {
|
|||
self.watched_transactions.remove(&ctx.tx.txid());
|
||||
|
||||
for input in &ctx.tx.input {
|
||||
self.watched_outputs.remove(&input.previous_output);
|
||||
if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
|
||||
self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prune_output_spends(&mut self, cur_height: u32) {
|
||||
self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
|
||||
cur_height < conf_height + ANTI_REORG_DELAY - 1
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -104,6 +128,7 @@ impl FilterQueue {
|
|||
#[derive(Debug)]
|
||||
pub(crate) struct ConfirmedTx {
|
||||
pub tx: Transaction,
|
||||
pub txid: Txid,
|
||||
pub block_header: Header,
|
||||
pub block_height: u32,
|
||||
pub pos: usize,
|
||||
|
|
|
@ -157,6 +157,9 @@ where
|
|||
for c in &confirmables {
|
||||
c.best_block_updated(&tip_header, tip_height);
|
||||
}
|
||||
|
||||
// Prune any sufficiently confirmed output spends
|
||||
sync_state.prune_output_spends(tip_height);
|
||||
}
|
||||
|
||||
match self.get_confirmed_transactions(&sync_state) {
|
||||
|
@ -254,7 +257,7 @@ where
|
|||
|
||||
// First, check the confirmation status of registered transactions as well as the
|
||||
// status of dependent transactions of registered outputs.
|
||||
let mut confirmed_txs = Vec::new();
|
||||
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
|
||||
let mut watched_script_pubkeys = Vec::with_capacity(
|
||||
sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
|
||||
let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
|
||||
|
@ -302,6 +305,9 @@ where
|
|||
|
||||
for (i, script_history) in tx_results.iter().enumerate() {
|
||||
let (txid, tx) = &watched_txs[i];
|
||||
if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
|
||||
continue;
|
||||
}
|
||||
let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
|
||||
if let Some(history) = filtered_history.next()
|
||||
{
|
||||
|
@ -321,6 +327,10 @@ where
|
|||
}
|
||||
|
||||
let txid = possible_output_spend.tx_hash;
|
||||
if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
match self.client.transaction_get(&txid) {
|
||||
Ok(tx) => {
|
||||
let mut is_spend = false;
|
||||
|
@ -416,6 +426,7 @@ where
|
|||
}
|
||||
let confirmed_tx = ConfirmedTx {
|
||||
tx: tx.clone(),
|
||||
txid,
|
||||
block_header, block_height: prob_conf_height,
|
||||
pos,
|
||||
};
|
||||
|
|
|
@ -153,7 +153,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
|
||||
match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) {
|
||||
Ok(()) => {}
|
||||
Err(InternalError::Inconsistency) => {
|
||||
// Immediately restart syncing when we encounter any inconsistencies.
|
||||
|
@ -238,7 +238,7 @@ where
|
|||
|
||||
#[maybe_async]
|
||||
fn sync_best_block_updated(
|
||||
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
|
||||
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash,
|
||||
) -> Result<(), InternalError> {
|
||||
|
||||
// Inform the interface of the new block.
|
||||
|
@ -249,6 +249,9 @@ where
|
|||
for c in confirmables {
|
||||
c.best_block_updated(&tip_header, tip_height);
|
||||
}
|
||||
|
||||
// Prune any sufficiently confirmed output spends
|
||||
sync_state.prune_output_spends(tip_height);
|
||||
}
|
||||
} else {
|
||||
return Err(InternalError::Inconsistency);
|
||||
|
@ -264,10 +267,13 @@ where
|
|||
// First, check the confirmation status of registered transactions as well as the
|
||||
// status of dependent transactions of registered outputs.
|
||||
|
||||
let mut confirmed_txs = Vec::new();
|
||||
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
|
||||
|
||||
for txid in &sync_state.watched_transactions {
|
||||
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
|
||||
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
|
||||
continue;
|
||||
}
|
||||
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
|
||||
confirmed_txs.push(confirmed_tx);
|
||||
}
|
||||
}
|
||||
|
@ -278,9 +284,19 @@ where
|
|||
{
|
||||
if let Some(spending_txid) = output_status.txid {
|
||||
if let Some(spending_tx_status) = output_status.status {
|
||||
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
|
||||
if spending_tx_status.confirmed {
|
||||
// Skip inserting duplicate ConfirmedTx entry
|
||||
continue;
|
||||
} else {
|
||||
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
|
||||
return Err(InternalError::Inconsistency);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(confirmed_tx) = maybe_await!(self
|
||||
.get_confirmed_tx(
|
||||
&spending_txid,
|
||||
spending_txid,
|
||||
spending_tx_status.block_hash,
|
||||
spending_tx_status.block_height,
|
||||
))?
|
||||
|
@ -303,7 +319,7 @@ where
|
|||
|
||||
#[maybe_async]
|
||||
fn get_confirmed_tx(
|
||||
&self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
|
||||
&self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
|
||||
) -> Result<Option<ConfirmedTx>, InternalError> {
|
||||
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
|
||||
let block_header = merkle_block.header;
|
||||
|
@ -318,7 +334,7 @@ where
|
|||
let mut matches = Vec::new();
|
||||
let mut indexes = Vec::new();
|
||||
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
|
||||
if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
|
||||
if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
|
||||
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
|
||||
return Err(InternalError::Failed);
|
||||
}
|
||||
|
@ -326,14 +342,19 @@ where
|
|||
// unwrap() safety: len() > 0 is checked above
|
||||
let pos = *indexes.first().unwrap() as usize;
|
||||
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
|
||||
if tx.txid() != txid {
|
||||
log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
|
||||
return Err(InternalError::Failed);
|
||||
}
|
||||
|
||||
if let Some(block_height) = known_block_height {
|
||||
// We can take a shortcut here if a previous call already gave us the height.
|
||||
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
|
||||
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
|
||||
}
|
||||
|
||||
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
|
||||
if let Some(block_height) = block_status.height {
|
||||
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
|
||||
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
|
||||
} else {
|
||||
// If any previously-confirmed block suddenly is no longer confirmed, we found
|
||||
// an inconsistency and should start over.
|
||||
|
|
Loading…
Add table
Reference in a new issue