From d8021c7891ca039751b250511fbcc06da32e9e7e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 16 Apr 2024 10:52:51 +0200 Subject: [PATCH 1/7] Introduce `OutputSpender` trait and implement for `(Phantom)KeysManager` .. we move `spend_spendable_outputs` to a new trait which we can easily reuse in `OutputSweeper` later. --- lightning/src/ln/functional_tests.rs | 6 +- lightning/src/ln/monitor_tests.rs | 2 +- lightning/src/ln/reorg_tests.rs | 1 + lightning/src/sign/mod.rs | 139 +++++++++++++++------------ 4 files changed, 84 insertions(+), 64 deletions(-) diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 8dd3f1fc9..17616c5d8 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -17,7 +17,7 @@ use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::channelmonitor; use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; use crate::chain::transaction::OutPoint; -use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; +use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, OutputSpender, SignerProvider}; use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, ClosureReason, HTLCDestination, PaymentFailureReason}; use crate::ln::{ChannelId, PaymentPreimage, PaymentSecret, PaymentHash}; use crate::ln::channel::{commitment_tx_base_weight, COMMITMENT_TX_WEIGHT_PER_HTLC, CONCURRENT_INBOUND_HTLC_FEE_BUFFER, FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE, MIN_AFFORDABLE_HTLC_COUNT, get_holder_selected_channel_reserve_satoshis, OutboundV1Channel, InboundV1Channel, COINBASE_MATURITY, ChannelPhase}; @@ -9951,9 +9951,9 @@ fn do_test_max_dust_htlc_exposure(dust_outbound_balance: bool, exposure_breach_e let dust_outbound_htlc_on_holder_tx: u64 = max_dust_htlc_exposure_msat / dust_outbound_htlc_on_holder_tx_msat; // Substract 3 sats for multiplier and 2 sats for fixed limit to make sure we are 50% below the dust limit. - // This is to make sure we fully use the dust limit. If we don't, we could end up with `dust_ibd_htlc_on_holder_tx` being 1 + // This is to make sure we fully use the dust limit. If we don't, we could end up with `dust_ibd_htlc_on_holder_tx` being 1 // while `max_dust_htlc_exposure_msat` is not equal to `dust_outbound_htlc_on_holder_tx_msat`. - let dust_inbound_htlc_on_holder_tx_msat: u64 = (dust_buffer_feerate * htlc_success_tx_weight(&channel_type_features) / 1000 + open_channel.common_fields.dust_limit_satoshis - if multiplier_dust_limit { 3 } else { 2 }) * 1000; + let dust_inbound_htlc_on_holder_tx_msat: u64 = (dust_buffer_feerate * htlc_success_tx_weight(&channel_type_features) / 1000 + open_channel.common_fields.dust_limit_satoshis - if multiplier_dust_limit { 3 } else { 2 }) * 1000; let dust_inbound_htlc_on_holder_tx: u64 = max_dust_htlc_exposure_msat / dust_inbound_htlc_on_holder_tx_msat; let dust_htlc_on_counterparty_tx: u64 = 4; diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index d5f0dc153..1d20fdb07 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -9,7 +9,7 @@ //! Further functional tests which test blockchain reorganizations. 
-use crate::sign::{ecdsa::EcdsaChannelSigner, SpendableOutputDescriptor}; +use crate::sign::{ecdsa::EcdsaChannelSigner, OutputSpender, SpendableOutputDescriptor}; use crate::chain::channelmonitor::{ANTI_REORG_DELAY, LATENCY_GRACE_PERIOD_BLOCKS, Balance}; use crate::chain::transaction::OutPoint; use crate::chain::chaininterface::{LowerBoundedFeeEstimator, compute_feerate_sat_per_1000_weight}; diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 62c82b01f..c15365629 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -15,6 +15,7 @@ use crate::chain::transaction::OutPoint; use crate::chain::Confirm; use crate::events::{Event, MessageSendEventsProvider, ClosureReason, HTLCDestination, MessageSendEvent}; use crate::ln::msgs::{ChannelMessageHandler, Init}; +use crate::sign::OutputSpender; use crate::util::test_utils; use crate::util::ser::Writeable; use crate::util::string::UntrustedString; diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 23266c13b..1459151de 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -805,6 +805,28 @@ pub trait NodeSigner { fn sign_gossip_message(&self, msg: UnsignedGossipMessage) -> Result; } +/// A trait that describes a wallet capable of creating a spending [`Transaction`] from a set of +/// [`SpendableOutputDescriptor`]s. +pub trait OutputSpender { + /// Creates a [`Transaction`] which spends the given descriptors to the given outputs, plus an + /// output to the given change destination (if sufficient change value remains). The + /// transaction will have a feerate, at least, of the given value. + /// + /// The `locktime` argument is used to set the transaction's locktime. If `None`, the + /// transaction will have a locktime of 0. It it recommended to set this to the current block + /// height to avoid fee sniping, unless you have some specific reason to use a different + /// locktime. + /// + /// Returns `Err(())` if the output value is greater than the input value minus required fee, + /// if a descriptor was duplicated, or if an output descriptor `script_pubkey` + /// does not match the one we can spend. + fn spend_spendable_outputs( + &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, + change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, + locktime: Option, secp_ctx: &Secp256k1, + ) -> Result; +} + // Primarily needed in doctests because of https://github.com/rust-lang/rust/issues/67295 /// A dynamic [`SignerProvider`] temporarily needed for doc tests. #[cfg(taproot)] @@ -1991,50 +2013,6 @@ impl KeysManager { Ok(psbt) } - - /// Creates a [`Transaction`] which spends the given descriptors to the given outputs, plus an - /// output to the given change destination (if sufficient change value remains). The - /// transaction will have a feerate, at least, of the given value. - /// - /// The `locktime` argument is used to set the transaction's locktime. If `None`, the - /// transaction will have a locktime of 0. It it recommended to set this to the current block - /// height to avoid fee sniping, unless you have some specific reason to use a different - /// locktime. - /// - /// Returns `Err(())` if the output value is greater than the input value minus required fee, - /// if a descriptor was duplicated, or if an output descriptor `script_pubkey` - /// does not match the one we can spend. - /// - /// We do not enforce that outputs meet the dust limit or that any output scripts are standard. 
- /// - /// May panic if the [`SpendableOutputDescriptor`]s were not generated by channels which used - /// this [`KeysManager`] or one of the [`InMemorySigner`] created by this [`KeysManager`]. - pub fn spend_spendable_outputs( - &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, - change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, - locktime: Option, secp_ctx: &Secp256k1, - ) -> Result { - let (mut psbt, expected_max_weight) = - SpendableOutputDescriptor::create_spendable_outputs_psbt( - descriptors, - outputs, - change_destination_script, - feerate_sat_per_1000_weight, - locktime, - )?; - psbt = self.sign_spendable_outputs_psbt(descriptors, psbt, secp_ctx)?; - - let spend_tx = psbt.extract_tx(); - - debug_assert!(expected_max_weight >= spend_tx.weight().to_wu()); - // Note that witnesses with a signature vary somewhat in size, so allow - // `expected_max_weight` to overshoot by up to 3 bytes per input. - debug_assert!( - expected_max_weight <= spend_tx.weight().to_wu() + descriptors.len() as u64 * 3 - ); - - Ok(spend_tx) - } } impl EntropySource for KeysManager { @@ -2106,6 +2084,44 @@ impl NodeSigner for KeysManager { } } +impl OutputSpender for KeysManager { + /// Creates a [`Transaction`] which spends the given descriptors to the given outputs, plus an + /// output to the given change destination (if sufficient change value remains). + /// + /// See [`OutputSpender::spend_spendable_outputs`] documentation for more information. + /// + /// We do not enforce that outputs meet the dust limit or that any output scripts are standard. + /// + /// May panic if the [`SpendableOutputDescriptor`]s were not generated by channels which used + /// this [`KeysManager`] or one of the [`InMemorySigner`] created by this [`KeysManager`]. + fn spend_spendable_outputs( + &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, + change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, + locktime: Option, secp_ctx: &Secp256k1, + ) -> Result { + let (mut psbt, expected_max_weight) = + SpendableOutputDescriptor::create_spendable_outputs_psbt( + descriptors, + outputs, + change_destination_script, + feerate_sat_per_1000_weight, + locktime, + )?; + psbt = self.sign_spendable_outputs_psbt(descriptors, psbt, secp_ctx)?; + + let spend_tx = psbt.extract_tx(); + + debug_assert!(expected_max_weight >= spend_tx.weight().to_wu()); + // Note that witnesses with a signature vary somewhat in size, so allow + // `expected_max_weight` to overshoot by up to 3 bytes per input. + debug_assert!( + expected_max_weight <= spend_tx.weight().to_wu() + descriptors.len() as u64 * 3 + ); + + Ok(spend_tx) + } +} + impl SignerProvider for KeysManager { type EcdsaSigner = InMemorySigner; #[cfg(taproot)] @@ -2238,6 +2254,25 @@ impl NodeSigner for PhantomKeysManager { } } +impl OutputSpender for PhantomKeysManager { + /// See [`OutputSpender::spend_spendable_outputs`] and [`KeysManager::spend_spendable_outputs`] + /// for documentation on this method. 
+ fn spend_spendable_outputs( + &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, + change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, + locktime: Option, secp_ctx: &Secp256k1, + ) -> Result { + self.inner.spend_spendable_outputs( + descriptors, + outputs, + change_destination_script, + feerate_sat_per_1000_weight, + locktime, + secp_ctx, + ) + } +} + impl SignerProvider for PhantomKeysManager { type EcdsaSigner = InMemorySigner; #[cfg(taproot)] @@ -2299,22 +2334,6 @@ impl PhantomKeysManager { } } - /// See [`KeysManager::spend_spendable_outputs`] for documentation on this method. - pub fn spend_spendable_outputs( - &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, - change_destination_script: ScriptBuf, feerate_sat_per_1000_weight: u32, - locktime: Option, secp_ctx: &Secp256k1, - ) -> Result { - self.inner.spend_spendable_outputs( - descriptors, - outputs, - change_destination_script, - feerate_sat_per_1000_weight, - locktime, - secp_ctx, - ) - } - /// See [`KeysManager::derive_channel_keys`] for documentation on this method. pub fn derive_channel_keys( &self, channel_value_satoshis: u64, params: &[u8; 32], From cd4cc203a2bbb716c8747bd45ac84e42b8f53b84 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 16 Apr 2024 11:12:43 +0200 Subject: [PATCH 2/7] Introduce `ChangeDestinationSource` trait .. which users should implement on their on-chain wallet to allow us to retrieve a new change destination script. --- lightning/src/sign/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 1459151de..9b5efee4b 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -904,6 +904,17 @@ pub trait SignerProvider { fn get_shutdown_scriptpubkey(&self) -> Result; } +/// A helper trait that describes an on-chain wallet capable of returning a (change) destination +/// script. +pub trait ChangeDestinationSource { + /// Returns a script pubkey which can be used as a change destination for + /// [`OutputSpender::spend_spendable_outputs`]. + /// + /// This method should return a different value each time it is called, to avoid linking + /// on-chain funds controlled to the same user. + fn get_change_destination_script(&self) -> Result; +} + /// A simple implementation of [`WriteableEcdsaChannelSigner`] that just keeps the private keys in memory. /// /// This implementation performs no policy checks and is insufficient by itself as From ec956516d1db59d5e667cf3ebec2dc2e08148cdd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 16 Apr 2024 13:41:57 +0200 Subject: [PATCH 3/7] Add `ConfirmationTarget::OutputSpendingFee` --- fuzz/src/chanmon_consistency.rs | 2 +- lightning/src/chain/chaininterface.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 36e7cea8a..b3cf867d6 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -84,7 +84,7 @@ impl FeeEstimator for FuzzEstimator { // Background feerate which is <= the minimum Normal feerate. 
match conf_target { ConfirmationTarget::OnChainSweep => MAX_FEE, - ConfirmationTarget::ChannelCloseMinimum|ConfirmationTarget::AnchorChannelFee|ConfirmationTarget::MinAllowedAnchorChannelRemoteFee|ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 253, + ConfirmationTarget::ChannelCloseMinimum|ConfirmationTarget::AnchorChannelFee|ConfirmationTarget::MinAllowedAnchorChannelRemoteFee|ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee|ConfirmationTarget::OutputSpendingFee => 253, ConfirmationTarget::NonAnchorChannelFee => cmp::min(self.ret_val.load(atomic::Ordering::Acquire), MAX_FEE), } } diff --git a/lightning/src/chain/chaininterface.rs b/lightning/src/chain/chaininterface.rs index 2bf6d6130..68dea58dc 100644 --- a/lightning/src/chain/chaininterface.rs +++ b/lightning/src/chain/chaininterface.rs @@ -124,6 +124,8 @@ pub enum ConfirmationTarget { /// /// [`ChannelManager::close_channel_with_feerate_and_script`]: crate::ln::channelmanager::ChannelManager::close_channel_with_feerate_and_script ChannelCloseMinimum, + /// XXX + OutputSpendingFee, } /// A trait which should be implemented to provide feerate information on a number of time From 071337a1f1ba1ce99e39da86ba6fc6dee1280ff1 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Jan 2024 11:36:08 +0100 Subject: [PATCH 4/7] Add `OutputSweeper` utility persisting and sweeping spendable outputs We add a `OutputSweeper` utility that allows to track the state of spendable output descriptors as emitted by `Event::SpendableOutputs`. To this end, the `OutputSweeper` persists the necessary information in our `KVStore` and regularly tries to sweep the spendable outputs, removing them after reaching threshold confirmations, i.e., `ANTI_REORG_DELAY`. --- lightning/src/chain/chaininterface.rs | 11 +- lightning/src/chain/mod.rs | 6 + lightning/src/util/mod.rs | 1 + lightning/src/util/persist.rs | 14 + lightning/src/util/sweep.rs | 826 ++++++++++++++++++++++++++ 5 files changed, 857 insertions(+), 1 deletion(-) create mode 100644 lightning/src/util/sweep.rs diff --git a/lightning/src/chain/chaininterface.rs b/lightning/src/chain/chaininterface.rs index 68dea58dc..2e37127e0 100644 --- a/lightning/src/chain/chaininterface.rs +++ b/lightning/src/chain/chaininterface.rs @@ -124,7 +124,16 @@ pub enum ConfirmationTarget { /// /// [`ChannelManager::close_channel_with_feerate_and_script`]: crate::ln::channelmanager::ChannelManager::close_channel_with_feerate_and_script ChannelCloseMinimum, - /// XXX + /// The feerate [`OutputSweeper`] will use on transactions spending + /// [`SpendableOutputDescriptor`]s after a channel closure. + /// + /// Generally spending these outputs is safe as long as they eventually confirm, so a value + /// (slightly above) the mempool minimum should suffice. However, as this value will influence + /// how long funds will be unavailable after channel closure, [`FeeEstimator`] implementors + /// might want to choose a higher feerate to regain control over funds faster. 
+ /// + /// [`OutputSweeper`]: crate::util::sweep::OutputSweeper + /// [`SpendableOutputDescriptor`]: crate::sign::SpendableOutputDescriptor OutputSpendingFee, } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index e22ccca98..1fb30a9ae 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -20,6 +20,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Monitor use crate::ln::ChannelId; use crate::sign::ecdsa::WriteableEcdsaChannelSigner; use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::impl_writeable_tlv_based; #[allow(unused_imports)] use crate::prelude::*; @@ -56,6 +57,11 @@ impl BestBlock { } } +impl_writeable_tlv_based!(BestBlock, { + (0, block_hash, required), + (2, height, required), +}); + /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the /// chain. diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 31bdf1ca5..c1ab8c75c 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -22,6 +22,7 @@ pub mod invoice; pub mod persist; pub mod scid_utils; pub mod string; +pub mod sweep; pub mod wakers; #[cfg(fuzzing)] pub mod base32; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 6fd0048da..23e41e4b5 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -70,6 +70,20 @@ pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`WriteableScore`] will be persisted. pub const SCORER_PERSISTENCE_KEY: &str = "scorer"; +/// The primary namespace under which [`OutputSweeper`] state will be persisted. +/// +/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper +pub const OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; +/// The secondary namespace under which [`OutputSweeper`] state will be persisted. +/// +/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper +pub const OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The secondary namespace under which [`OutputSweeper`] state will be persisted. +/// The key under which [`OutputSweeper`] state will be persisted. +/// +/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper +pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper"; + /// A sentinel value to be prepended to monitors persisted by the [`MonitorUpdatingPersister`]. /// /// This serves to prevent someone from accidentally loading such monitors (which may need diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs new file mode 100644 index 000000000..a8393f70e --- /dev/null +++ b/lightning/src/util/sweep.rs @@ -0,0 +1,826 @@ +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains an [`OutputSweeper`] utility that keeps track of +//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries +//! sweeping them. 
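The sweeper introduced in this file takes a `ChangeDestinationSource` (added in the previous patch) to fetch fresh change scripts. As a rough illustration of the wallet-side implementation users are expected to provide, a sketch might look as follows — `MyOnChainWallet` and `derive_new_script_pubkey` are hypothetical stand-ins, and the `Result<ScriptBuf, ()>` return type is assumed to match the `TestWallet` implementation added later in this series:

use bitcoin::ScriptBuf;
use lightning::sign::ChangeDestinationSource;

/// Hypothetical handle to the user's on-chain wallet.
struct MyOnChainWallet {
    // e.g., an RPC client or a descriptor keychain; omitted here.
}

impl MyOnChainWallet {
    /// Hypothetical helper deriving a fresh, unused script pubkey.
    fn derive_new_script_pubkey(&self) -> ScriptBuf {
        // A real wallet would derive a new address here; an empty script is
        // returned only to keep this sketch self-contained.
        ScriptBuf::new()
    }
}

impl ChangeDestinationSource for MyOnChainWallet {
    fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
        // Hand out a different script on each call so swept funds aren't
        // trivially linkable to each other on-chain.
        Ok(self.derive_new_script_pubkey())
    }
}

Such an implementation is then handed to the sweeper's constructor as its `change_destination_source` argument.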
+ +use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use crate::chain::channelmonitor::ANTI_REORG_DELAY; +use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput}; +use crate::io; +use crate::ln::msgs::DecodeError; +use crate::ln::ChannelId; +use crate::prelude::Vec; +use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor}; +use crate::sync::Mutex; +use crate::util::logger::Logger; +use crate::util::persist::{ + KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, +}; +use crate::util::ser::{Readable, ReadableArgs, Writeable}; +use crate::{impl_writeable_tlv_based, log_debug, log_error}; + +use bitcoin::blockdata::block::Header; +use bitcoin::blockdata::locktime::absolute::LockTime; +use bitcoin::secp256k1::Secp256k1; +use bitcoin::{BlockHash, Transaction, Txid}; + +use core::ops::Deref; + +/// The state of a spendable output currently tracked by an [`OutputSweeper`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TrackedSpendableOutput { + /// The tracked output descriptor. + pub descriptor: SpendableOutputDescriptor, + /// The channel this output belongs to. + /// + /// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`] + pub channel_id: Option, + /// The current status of the output spend. + pub status: OutputSpendStatus, +} + +impl TrackedSpendableOutput { + fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput { + let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash)); + match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => { + WatchedOutput { + block_hash, + outpoint: *outpoint, + script_pubkey: output.script_pubkey.clone(), + } + }, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput { + block_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput { + block_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + } + } + + fn is_spent_in(&self, tx: &Transaction) -> bool { + let prev_outpoint = match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, + SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint, + } + .into_bitcoin_outpoint(); + + tx.input.iter().any(|input| input.previous_output == prev_outpoint) + } +} + +impl_writeable_tlv_based!(TrackedSpendableOutput, { + (0, descriptor, required), + (2, channel_id, option), + (4, status, required), +}); + +/// The current status of the output spend. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OutputSpendStatus { + /// The output is tracked but an initial spending transaction hasn't been generated and + /// broadcasted yet. + PendingInitialBroadcast, + /// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain. + PendingFirstConfirmation { + /// The hash of the chain tip when we first broadcast a transaction spending this output. + first_broadcast_hash: BlockHash, + /// The best height when we last broadcast a transaction spending this output. + latest_broadcast_height: u32, + /// The transaction spending this output we last broadcasted. 
+ latest_spending_tx: Transaction, + }, + /// A transaction spending the output has been confirmed on-chain but will be tracked until it + /// reaches [`ANTI_REORG_DELAY`] confirmations. + PendingThresholdConfirmations { + /// The hash of the chain tip when we first broadcast a transaction spending this output. + first_broadcast_hash: BlockHash, + /// The best height when we last broadcast a transaction spending this output. + latest_broadcast_height: u32, + /// The transaction spending this output we saw confirmed on-chain. + latest_spending_tx: Transaction, + /// The height at which the spending transaction was confirmed. + confirmation_height: u32, + /// The hash of the block in which the spending transaction was confirmed. + confirmation_hash: BlockHash, + }, +} + +impl OutputSpendStatus { + fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) { + match self { + Self::PendingInitialBroadcast => { + *self = Self::PendingFirstConfirmation { + first_broadcast_hash: cur_hash, + latest_broadcast_height: cur_height, + latest_spending_tx, + }; + }, + Self::PendingFirstConfirmation { first_broadcast_hash, .. } => { + *self = Self::PendingFirstConfirmation { + first_broadcast_hash: *first_broadcast_hash, + latest_broadcast_height: cur_height, + latest_spending_tx, + }; + }, + Self::PendingThresholdConfirmations { .. } => { + debug_assert!(false, "We should never rebroadcast confirmed transactions."); + }, + } + } + + fn confirmed( + &mut self, confirmation_hash: BlockHash, confirmation_height: u32, + latest_spending_tx: Transaction, + ) { + match self { + Self::PendingInitialBroadcast => { + // Generally we can't see any of our transactions confirmed if they haven't been + // broadcasted yet, so this should never be reachable via `transactions_confirmed`. + debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report."); + *self = Self::PendingThresholdConfirmations { + first_broadcast_hash: confirmation_hash, + latest_broadcast_height: confirmation_height, + latest_spending_tx, + confirmation_height, + confirmation_hash, + }; + }, + Self::PendingFirstConfirmation { + first_broadcast_hash, + latest_broadcast_height, + .. + } => { + debug_assert!(confirmation_height >= *latest_broadcast_height); + *self = Self::PendingThresholdConfirmations { + first_broadcast_hash: *first_broadcast_hash, + latest_broadcast_height: *latest_broadcast_height, + latest_spending_tx, + confirmation_height, + confirmation_hash, + }; + }, + Self::PendingThresholdConfirmations { + first_broadcast_hash, + latest_broadcast_height, + .. + } => { + *self = Self::PendingThresholdConfirmations { + first_broadcast_hash: *first_broadcast_hash, + latest_broadcast_height: *latest_broadcast_height, + latest_spending_tx, + confirmation_height, + confirmation_hash, + }; + }, + } + } + + fn unconfirmed(&mut self) { + match self { + Self::PendingInitialBroadcast => { + debug_assert!( + false, + "We should only mark a spend as unconfirmed if it used to be confirmed." + ); + }, + Self::PendingFirstConfirmation { .. } => { + debug_assert!( + false, + "We should only mark a spend as unconfirmed if it used to be confirmed." + ); + }, + Self::PendingThresholdConfirmations { + first_broadcast_hash, + latest_broadcast_height, + latest_spending_tx, + .. 
+ } => { + *self = Self::PendingFirstConfirmation { + first_broadcast_hash: *first_broadcast_hash, + latest_broadcast_height: *latest_broadcast_height, + latest_spending_tx: latest_spending_tx.clone(), + }; + }, + } + } + + fn first_broadcast_hash(&self) -> Option { + match self { + Self::PendingInitialBroadcast => None, + Self::PendingFirstConfirmation { first_broadcast_hash, .. } => { + Some(*first_broadcast_hash) + }, + Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => { + Some(*first_broadcast_hash) + }, + } + } + + fn latest_broadcast_height(&self) -> Option { + match self { + Self::PendingInitialBroadcast => None, + Self::PendingFirstConfirmation { latest_broadcast_height, .. } => { + Some(*latest_broadcast_height) + }, + Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => { + Some(*latest_broadcast_height) + }, + } + } + + fn confirmation_height(&self) -> Option { + match self { + Self::PendingInitialBroadcast => None, + Self::PendingFirstConfirmation { .. } => None, + Self::PendingThresholdConfirmations { confirmation_height, .. } => { + Some(*confirmation_height) + }, + } + } + + fn confirmation_hash(&self) -> Option { + match self { + Self::PendingInitialBroadcast => None, + Self::PendingFirstConfirmation { .. } => None, + Self::PendingThresholdConfirmations { confirmation_hash, .. } => { + Some(*confirmation_hash) + }, + } + } + + fn latest_spending_tx(&self) -> Option<&Transaction> { + match self { + Self::PendingInitialBroadcast => None, + Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx), + Self::PendingThresholdConfirmations { latest_spending_tx, .. } => { + Some(latest_spending_tx) + }, + } + } + + fn is_confirmed(&self) -> bool { + match self { + Self::PendingInitialBroadcast => false, + Self::PendingFirstConfirmation { .. } => false, + Self::PendingThresholdConfirmations { .. } => true, + } + } +} + +impl_writeable_tlv_based_enum!(OutputSpendStatus, + (0, PendingInitialBroadcast) => {}, + (2, PendingFirstConfirmation) => { + (0, first_broadcast_hash, required), + (2, latest_broadcast_height, required), + (4, latest_spending_tx, required), + }, + (4, PendingThresholdConfirmations) => { + (0, first_broadcast_hash, required), + (2, latest_broadcast_height, required), + (4, latest_spending_tx, required), + (6, confirmation_height, required), + (8, confirmation_hash, required), + }; +); + +/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given +/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor +/// methods. +/// +/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`]. +/// +/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`] +/// implementation and hence has to be connected with the utilized chain data sources. +/// +/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are +/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective +/// constructor. 
+/// +/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs +pub struct OutputSweeper +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + sweeper_state: Mutex, + broadcaster: B, + fee_estimator: E, + chain_data_source: Option, + output_spender: O, + change_destination_source: D, + kv_store: K, + logger: L, +} + +impl + OutputSweeper +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + /// Constructs a new [`OutputSweeper`]. + /// + /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also + /// need to register their [`Filter`] implementation via the given `chain_data_source`. + pub fn new( + best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option, + output_spender: O, change_destination_source: D, kv_store: K, logger: L, + ) -> Self { + let outputs = Vec::new(); + let sweeper_state = Mutex::new(SweeperState { outputs, best_block }); + Self { + sweeper_state, + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + } + } + + /// Tells the sweeper to track the given outputs descriptors. + /// + /// Usually, this should be called based on the values emitted by the + /// [`Event::SpendableOutputs`]. + /// + /// The given `exclude_static_ouputs` flag controls whether the sweeper will filter out + /// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain + /// wallet implementation. + /// + /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs + pub fn track_spendable_outputs( + &self, output_descriptors: Vec, channel_id: Option, + exclude_static_ouputs: bool, + ) { + let mut relevant_descriptors = output_descriptors + .into_iter() + .filter(|desc| { + !(exclude_static_ouputs + && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. })) + }) + .peekable(); + + if relevant_descriptors.peek().is_none() { + return; + } + + let mut spending_tx_opt; + { + let mut state_lock = self.sweeper_state.lock().unwrap(); + for descriptor in relevant_descriptors { + let output_info = TrackedSpendableOutput { + descriptor, + channel_id, + status: OutputSpendStatus::PendingInitialBroadcast, + }; + + if state_lock + .outputs + .iter() + .find(|o| o.descriptor == output_info.descriptor) + .is_some() + { + continue; + } + + state_lock.outputs.push(output_info); + } + spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock); + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // Skip broadcasting if the persist failed. + spending_tx_opt = None; + }); + } + + if let Some(spending_tx) = spending_tx_opt { + self.broadcaster.broadcast_transactions(&[&spending_tx]); + } + } + + /// Returns a list of the currently tracked spendable outputs. + pub fn tracked_spendable_outputs(&self) -> Vec { + self.sweeper_state.lock().unwrap().outputs.clone() + } + + /// Gets the latest best block which was connected either via the [`Listen`] or + /// [`Confirm`] interfaces. 
+ pub fn current_best_block(&self) -> BestBlock { + self.sweeper_state.lock().unwrap().best_block + } + + fn regenerate_spend_if_necessary( + &self, sweeper_state: &mut SweeperState, + ) -> Option { + let cur_height = sweeper_state.best_block.height; + let cur_hash = sweeper_state.best_block.block_hash; + let filter_fn = |o: &TrackedSpendableOutput| { + if o.status.is_confirmed() { + // Don't rebroadcast confirmed txs. + return false; + } + + if o.status.latest_broadcast_height() >= Some(cur_height) { + // Only broadcast once per block height. + return false; + } + + true + }; + + let respend_descriptors: Vec<&SpendableOutputDescriptor> = + sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect(); + + if respend_descriptors.is_empty() { + // Nothing to do. + return None; + } + + let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) { + Ok(spending_tx) => { + log_debug!( + self.logger, + "Generating and broadcasting sweeping transaction {}", + spending_tx.txid() + ); + spending_tx + }, + Err(e) => { + log_error!(self.logger, "Error spending outputs: {:?}", e); + return None; + }, + }; + + // As we didn't modify the state so far, the same filter_fn yields the same elements as + // above. + let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o)); + for output_info in respend_outputs { + if let Some(filter) = self.chain_data_source.as_ref() { + let watched_output = output_info.to_watched_output(cur_hash); + filter.register_output(watched_output); + } + + output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); + } + + Some(spending_tx) + } + + fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) { + let cur_height = sweeper_state.best_block.height; + + // Prune all outputs that have sufficient depth by now. + sweeper_state.outputs.retain(|o| { + if let Some(confirmation_height) = o.status.confirmation_height() { + if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 { + log_debug!(self.logger, + "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. 
Pruned descriptor: {:?}", + o.status.latest_spending_tx().map(|t| t.txid()), o.descriptor + ); + return false; + } + } + true + }); + } + + fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { + self.kv_store + .write( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + &sweeper_state.encode(), + ) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + e + ); + e + }) + } + + fn spend_outputs( + &self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>, + ) -> Result { + let tx_feerate = + self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee); + let change_destination_script = + self.change_destination_source.get_change_destination_script()?; + let cur_height = sweeper_state.best_block.height; + let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO)); + self.output_spender.spend_spendable_outputs( + &descriptors, + Vec::new(), + change_destination_script, + tx_feerate, + locktime, + &Secp256k1::new(), + ) + } + + fn transactions_confirmed_internal( + &self, sweeper_state: &mut SweeperState, header: &Header, + txdata: &chain::transaction::TransactionData, height: u32, + ) { + let confirmation_hash = header.block_hash(); + for (_, tx) in txdata { + for output_info in sweeper_state.outputs.iter_mut() { + if output_info.is_spent_in(*tx) { + output_info.status.confirmed(confirmation_hash, height, (*tx).clone()) + } + } + } + } + + fn best_block_updated_internal( + &self, sweeper_state: &mut SweeperState, header: &Header, height: u32, + ) -> Option { + sweeper_state.best_block = BestBlock::new(header.block_hash(), height); + self.prune_confirmed_outputs(sweeper_state); + let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state); + spending_tx_opt + } +} + +impl Listen + for OutputSweeper +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + fn filtered_block_connected( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { + let mut spending_tx_opt; + { + let mut state_lock = self.sweeper_state.lock().unwrap(); + assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(state_lock.best_block.height, height - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + + self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); + spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height); + + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // Skip broadcasting if the persist failed. 
+ spending_tx_opt = None; + }); + } + + if let Some(spending_tx) = spending_tx_opt { + self.broadcaster.broadcast_transactions(&[&spending_tx]); + } + } + + fn block_disconnected(&self, header: &Header, height: u32) { + let mut state_lock = self.sweeper_state.lock().unwrap(); + + let new_height = height - 1; + let block_hash = header.block_hash(); + + assert_eq!(state_lock.best_block.block_hash, block_hash, + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + assert_eq!(state_lock.best_block.height, height, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height); + + for output_info in state_lock.outputs.iter_mut() { + if output_info.status.confirmation_hash() == Some(block_hash) { + debug_assert_eq!(output_info.status.confirmation_height(), Some(height)); + output_info.status.unconfirmed(); + } + } + + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + }); + } +} + +impl Confirm + for OutputSweeper +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + fn transactions_confirmed( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { + let mut state_lock = self.sweeper_state.lock().unwrap(); + self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + }); + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + let mut state_lock = self.sweeper_state.lock().unwrap(); + + // Get what height was unconfirmed. + let unconf_height = state_lock + .outputs + .iter() + .find(|o| o.status.latest_spending_tx().map(|tx| tx.txid()) == Some(*txid)) + .and_then(|o| o.status.confirmation_height()); + + if let Some(unconf_height) = unconf_height { + // Unconfirm all >= this height. + state_lock + .outputs + .iter_mut() + .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) + .for_each(|o| o.status.unconfirmed()); + + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + }); + } + } + + fn best_block_updated(&self, header: &Header, height: u32) { + let mut spending_tx_opt; + { + let mut state_lock = self.sweeper_state.lock().unwrap(); + spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height); + self.persist_state(&*state_lock).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // Skip broadcasting if the persist failed. + spending_tx_opt = None; + }); + } + + if let Some(spending_tx) = spending_tx_opt { + self.broadcaster.broadcast_transactions(&[&spending_tx]); + } + } + + fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { + let state_lock = self.sweeper_state.lock().unwrap(); + state_lock + .outputs + .iter() + .filter_map(|o| match o.status { + OutputSpendStatus::PendingThresholdConfirmations { + ref latest_spending_tx, + confirmation_height, + confirmation_hash, + .. 
+ } => Some((latest_spending_tx.txid(), confirmation_height, Some(confirmation_hash))), + _ => None, + }) + .collect::>() + } +} + +#[derive(Debug, Clone)] +struct SweeperState { + outputs: Vec, + best_block: BestBlock, +} + +impl_writeable_tlv_based!(SweeperState, { + (0, outputs, required_vec), + (2, best_block, required), +}); + +impl + ReadableArgs<(B, E, Option, O, D, K, L)> for OutputSweeper +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + #[inline] + fn read( + reader: &mut R, args: (B, E, Option, O, D, K, L), + ) -> Result { + let ( + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + ) = args; + let state = SweeperState::read(reader)?; + let best_block = state.best_block; + + if let Some(filter) = chain_data_source.as_ref() { + for output_info in &state.outputs { + let watched_output = output_info.to_watched_output(best_block.block_hash); + filter.register_output(watched_output); + } + } + + let sweeper_state = Mutex::new(state); + Ok(Self { + sweeper_state, + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + }) + } +} + +impl + ReadableArgs<(B, E, Option, O, D, K, L)> for (BestBlock, OutputSweeper) +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Sync + Send, + K::Target: KVStore, + L::Target: Logger, + O::Target: OutputSpender, +{ + #[inline] + fn read( + reader: &mut R, args: (B, E, Option, O, D, K, L), + ) -> Result { + let ( + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + ) = args; + let state = SweeperState::read(reader)?; + let best_block = state.best_block; + + if let Some(filter) = chain_data_source.as_ref() { + for output_info in &state.outputs { + let watched_output = output_info.to_watched_output(best_block.block_hash); + filter.register_output(watched_output); + } + } + + let sweeper_state = Mutex::new(state); + Ok(( + best_block, + OutputSweeper { + sweeper_state, + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + }, + )) + } +} From a574f8b4a7baeecbdf8573cb908d004906dbec39 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 16 Apr 2024 14:19:54 +0200 Subject: [PATCH 5/7] Allow delaying generation and broadcasting of spending txs .. which enables users to batch output spends. --- lightning/src/util/sweep.rs | 77 ++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index a8393f70e..59d3a088c 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -69,7 +69,8 @@ impl TrackedSpendableOutput { } } - fn is_spent_in(&self, tx: &Transaction) -> bool { + /// Returns whether the output is spent in the given transaction. + pub fn is_spent_in(&self, tx: &Transaction) -> bool { let prev_outpoint = match &self.descriptor { SpendableOutputDescriptor::StaticOutput { outpoint, .. 
} => *outpoint, SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, @@ -92,7 +93,10 @@ impl_writeable_tlv_based!(TrackedSpendableOutput, { pub enum OutputSpendStatus { /// The output is tracked but an initial spending transaction hasn't been generated and /// broadcasted yet. - PendingInitialBroadcast, + PendingInitialBroadcast { + /// The height at which we will first generate and broadcast a spending transaction. + delayed_until_height: Option, + }, /// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain. PendingFirstConfirmation { /// The hash of the chain tip when we first broadcast a transaction spending this output. @@ -121,7 +125,13 @@ pub enum OutputSpendStatus { impl OutputSpendStatus { fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) { match self { - Self::PendingInitialBroadcast => { + Self::PendingInitialBroadcast { delayed_until_height } => { + if let Some(delayed_until_height) = delayed_until_height { + debug_assert!( + cur_height >= *delayed_until_height, + "We should never broadcast before the required height is reached." + ); + } *self = Self::PendingFirstConfirmation { first_broadcast_hash: cur_hash, latest_broadcast_height: cur_height, @@ -146,7 +156,7 @@ impl OutputSpendStatus { latest_spending_tx: Transaction, ) { match self { - Self::PendingInitialBroadcast => { + Self::PendingInitialBroadcast { .. } => { // Generally we can't see any of our transactions confirmed if they haven't been // broadcasted yet, so this should never be reachable via `transactions_confirmed`. debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report."); @@ -190,7 +200,7 @@ impl OutputSpendStatus { fn unconfirmed(&mut self) { match self { - Self::PendingInitialBroadcast => { + Self::PendingInitialBroadcast { .. } => { debug_assert!( false, "We should only mark a spend as unconfirmed if it used to be confirmed." @@ -217,9 +227,19 @@ impl OutputSpendStatus { } } + fn is_delayed(&self, cur_height: u32) -> bool { + match self { + Self::PendingInitialBroadcast { delayed_until_height } => { + delayed_until_height.map_or(false, |req_height| cur_height < req_height) + }, + Self::PendingFirstConfirmation { .. } => false, + Self::PendingThresholdConfirmations { .. } => false, + } + } + fn first_broadcast_hash(&self) -> Option { match self { - Self::PendingInitialBroadcast => None, + Self::PendingInitialBroadcast { .. } => None, Self::PendingFirstConfirmation { first_broadcast_hash, .. } => { Some(*first_broadcast_hash) }, @@ -231,7 +251,7 @@ impl OutputSpendStatus { fn latest_broadcast_height(&self) -> Option { match self { - Self::PendingInitialBroadcast => None, + Self::PendingInitialBroadcast { .. } => None, Self::PendingFirstConfirmation { latest_broadcast_height, .. } => { Some(*latest_broadcast_height) }, @@ -243,7 +263,7 @@ impl OutputSpendStatus { fn confirmation_height(&self) -> Option { match self { - Self::PendingInitialBroadcast => None, + Self::PendingInitialBroadcast { .. } => None, Self::PendingFirstConfirmation { .. } => None, Self::PendingThresholdConfirmations { confirmation_height, .. } => { Some(*confirmation_height) @@ -253,7 +273,7 @@ impl OutputSpendStatus { fn confirmation_hash(&self) -> Option { match self { - Self::PendingInitialBroadcast => None, + Self::PendingInitialBroadcast { .. } => None, Self::PendingFirstConfirmation { .. 
} => None, Self::PendingThresholdConfirmations { confirmation_hash, .. } => { Some(*confirmation_hash) @@ -263,7 +283,7 @@ impl OutputSpendStatus { fn latest_spending_tx(&self) -> Option<&Transaction> { match self { - Self::PendingInitialBroadcast => None, + Self::PendingInitialBroadcast { .. } => None, Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx), Self::PendingThresholdConfirmations { latest_spending_tx, .. } => { Some(latest_spending_tx) @@ -273,7 +293,7 @@ impl OutputSpendStatus { fn is_confirmed(&self) -> bool { match self { - Self::PendingInitialBroadcast => false, + Self::PendingInitialBroadcast { .. } => false, Self::PendingFirstConfirmation { .. } => false, Self::PendingThresholdConfirmations { .. } => true, } @@ -281,7 +301,9 @@ impl OutputSpendStatus { } impl_writeable_tlv_based_enum!(OutputSpendStatus, - (0, PendingInitialBroadcast) => {}, + (0, PendingInitialBroadcast) => { + (0, delayed_until_height, option), + }, (2, PendingFirstConfirmation) => { (0, first_broadcast_hash, required), (2, latest_broadcast_height, required), @@ -372,10 +394,13 @@ where /// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain /// wallet implementation. /// + /// If `delay_until_height` is set, we will delay the spending until the respective block + /// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees. + /// /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs pub fn track_spendable_outputs( &self, output_descriptors: Vec, channel_id: Option, - exclude_static_ouputs: bool, + exclude_static_ouputs: bool, delay_until_height: Option, ) { let mut relevant_descriptors = output_descriptors .into_iter() @@ -396,7 +421,9 @@ where let output_info = TrackedSpendableOutput { descriptor, channel_id, - status: OutputSpendStatus::PendingInitialBroadcast, + status: OutputSpendStatus::PendingInitialBroadcast { + delayed_until_height: delay_until_height, + }, }; if state_lock @@ -445,6 +472,11 @@ where return false; } + if o.status.is_delayed(cur_height) { + // Don't generate and broadcast if still delayed + return false; + } + if o.status.latest_broadcast_height() >= Some(cur_height) { // Only broadcast once per block height. return false; @@ -726,6 +758,23 @@ impl_writeable_tlv_based!(SweeperState, { (2, best_block, required), }); +/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a +/// future block height is reached. +#[derive(Debug, Clone)] +pub enum SpendingDelay { + /// A relative delay indicating we shouldn't spend the output before `cur_height + num_blocks` + /// is reached. + Relative { + /// The number of blocks until we'll generate and broadcast the spending transaction. + num_blocks: u32, + }, + /// An absolute delay indicating we shouldn't spend the output before `height` is reached. + Absolute { + /// The height at which we'll generate and broadcast the spending transaction. + height: u32, + }, +} + impl ReadableArgs<(B, E, Option, O, D, K, L)> for OutputSweeper where From 681106d23f9d801bd3a41d598e115dd477461d96 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Jan 2024 17:46:37 +0100 Subject: [PATCH 6/7] Add basic `OutputSweeper` test in BP .. we simply check that the `OutputSweeper` generates a spending tx and that the `TrackedSpendableOutput` is pruned once it reaches `ANTI_REORG_DELAY`. 
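The test drives the sweeper purely through chain notifications, invoking the `Confirm` methods on it alongside the `ChannelManager` and `ChainMonitor`. A minimal sketch of that wiring on the user side, assuming the caller collects all `Confirm` implementations into a slice (the helper name is illustrative):

use bitcoin::blockdata::block::Header;
use lightning::chain::transaction::TransactionData;
use lightning::chain::Confirm;

/// Forward a newly connected block to every `Confirm` listener, including the
/// sweeper, mirroring what the test below does with its node's components.
fn notify_confirmables(
    confirmables: &[&dyn Confirm], header: &Header, txdata: &TransactionData, height: u32,
) {
    for confirmable in confirmables {
        confirmable.transactions_confirmed(header, txdata, height);
    }
    for confirmable in confirmables {
        confirmable.best_block_updated(header, height);
    }
}

In the test below the equivalent calls are simply made directly on `node.node`, `node.chain_monitor`, and `node.sweeper`.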
--- lightning-background-processor/src/lib.rs | 140 +++++++++++++++++++++- 1 file changed, 136 insertions(+), 4 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 3736bd603..3fe4da00d 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -919,14 +919,16 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { + use bitcoin::{ScriptBuf, Txid}; use bitcoin::blockdata::constants::{genesis_block, ChainHash}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; + use bitcoin::hashes::Hash; use bitcoin::network::constants::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; - use lightning::chain::{BestBlock, Confirm, chainmonitor}; + use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; - use lightning::sign::{InMemorySigner, KeysManager}; + use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource}; use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::{get_event_msg, get_event}; @@ -947,6 +949,7 @@ mod tests { CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY}; + use lightning::util::sweep::{OutputSweeper, OutputSpendStatus}; use lightning_persister::fs_store::FilesystemStore; use std::collections::VecDeque; use std::{fs, env}; @@ -1009,6 +1012,9 @@ mod tests { logger: Arc, best_block: BestBlock, scorer: Arc>, + sweeper: Arc, Arc, + Arc, Arc, Arc, + Arc, Arc>>, } impl Node { @@ -1247,6 +1253,14 @@ mod tests { } } + struct TestWallet {} + + impl ChangeDestinationSource for TestWallet { + fn get_change_destination_script(&self) -> Result { + Ok(ScriptBuf::new()) + } + } + fn get_full_filepath(filepath: String, filename: String) -> String { let mut path = PathBuf::from(filepath); path.push(filename); @@ -1271,10 +1285,15 @@ mod tests { let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); + let now = Duration::from_secs(genesis_block.header.time as u64); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone())); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time)); + let wallet = Arc::new(TestWallet {}); + let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), + 
None::>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger))); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone())); let msg_handler = MessageHandler { @@ -1283,7 +1302,7 @@ mod tests { onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{} }; let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone())); - let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer }; + let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper }; nodes.push(node); } @@ -1352,15 +1371,40 @@ mod tests { 1 => { node.node.transactions_confirmed(&header, &txdata, height); node.chain_monitor.transactions_confirmed(&header, &txdata, height); + node.sweeper.transactions_confirmed(&header, &txdata, height); }, x if x == depth => { + // We need the TestBroadcaster to know about the new height so that it doesn't think + // we're violating the time lock requirements of transactions broadcasted at that + // point. + node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height)); node.node.best_block_updated(&header, height); node.chain_monitor.best_block_updated(&header, height); + node.sweeper.best_block_updated(&header, height); }, _ => {}, } } } + + fn advance_chain(node: &mut Node, num_blocks: u32) { + for i in 1..=num_blocks { + let prev_blockhash = node.best_block.block_hash; + let height = node.best_block.height + 1; + let header = create_dummy_header(prev_blockhash, height); + node.best_block = BestBlock::new(header.block_hash(), height); + if i == num_blocks { + // We need the TestBroadcaster to know about the new height so that it doesn't think + // we're violating the time lock requirements of transactions broadcasted at that + // point. + node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height)); + node.node.best_block_updated(&header, height); + node.chain_monitor.best_block_updated(&header, height); + node.sweeper.best_block_updated(&header, height); + } + } + } + fn confirm_transaction(node: &mut Node, tx: &Transaction) { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } @@ -1592,6 +1636,9 @@ mod tests { let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id()); nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding); let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id()); + let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + assert_eq!(broadcast_funding.txid(), funding_tx.txid()); + assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); if !std::thread::panicking() { bg_processor.stop().unwrap(); @@ -1617,10 +1664,95 @@ mod tests { .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) .expect("Events not handled within deadline"); match event { - Event::SpendableOutputs { .. 
} => {}, + Event::SpendableOutputs { outputs, channel_id } => { + nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)); + }, _ => panic!("Unexpected event: {:?}", event), } + // Check we don't generate an initial sweeping tx until we reach the required height. + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() { + assert!(!tracked_output.is_spent_in(&sweep_tx_0)); + match tracked_output.status { + OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => { + assert_eq!(delayed_until_height, Some(153)); + } + _ => panic!("Unexpected status"), + } + } + + advance_chain(&mut nodes[0], 3); + + // Check we generate an initial sweeping tx. + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we regenerate and rebroadcast the sweeping tx each block. + advance_chain(&mut nodes[0], 1); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + assert_ne!(sweep_tx_0, sweep_tx_1); + + advance_chain(&mut nodes[0], 1); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + match tracked_output.status { + OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + assert_ne!(sweep_tx_0, sweep_tx_2); + assert_ne!(sweep_tx_1, sweep_tx_2); + + // Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations. + confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + match tracked_output.status { + OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we still see the transaction as confirmed if we unconfirm any untracked + // transaction. (We previously had a bug that would mark tracked transactions as + // unconfirmed if any transaction at an unknown block height would be unconfirmed.) 
+ let unconf_txid = Txid::from_slice(&[0; 32]).unwrap(); + nodes[0].sweeper.transaction_unconfirmed(&unconf_txid); + + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1); + let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone(); + match tracked_output.status { + OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => { + assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid()); + } + _ => panic!("Unexpected status"), + } + + // Check we stop tracking the spendable outputs when one of the txs reaches + // ANTI_REORG_DELAY confirmations. + confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY); + assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0); + if !std::thread::panicking() { bg_processor.stop().unwrap(); } From be8c0d099d9fe132727606cdf019fb21d1e5a6a5 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 16 Apr 2024 14:50:51 +0200 Subject: [PATCH 7/7] Edit `Event::SpendableOutputs` docs to mention `OutputSweeper` --- lightning/src/events/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index f6e7f7164..4d644fe43 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -792,9 +792,15 @@ pub enum Event { }, /// Used to indicate that an output which you should know how to spend was confirmed on chain /// and is now spendable. - /// Such an output will *not* ever be spent by rust-lightning, and are not at risk of your + /// + /// Such an output will *never* be spent directly by LDK, and are not at risk of your /// counterparty spending them due to some kind of timeout. Thus, you need to store them /// somewhere and spend them when you create on-chain transactions. + /// + /// You may hand them to the [`OutputSweeper`] utility which will store and (re-)generate spending + /// transactions for you. + /// + /// [`OutputSweeper`]: crate::util::sweep::OutputSweeper SpendableOutputs { /// The outputs which you should store as spendable by you. outputs: Vec,
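Tying the series together, a user's event handler can route `Event::SpendableOutputs` straight into the sweeper, as the updated docs above suggest. The sketch below takes a callback standing in for `OutputSweeper::track_spendable_outputs` so it does not need to spell out the sweeper's generic parameters; the three-block delay is an arbitrary example of the batching option added earlier in the series:

use lightning::events::Event;
use lightning::ln::ChannelId;
use lightning::sign::SpendableOutputDescriptor;

/// Route `Event::SpendableOutputs` to the sweeper. `track` stands in for
/// `OutputSweeper::track_spendable_outputs`.
fn handle_spendable_outputs<T>(event: Event, cur_height: u32, track: T)
where
    T: Fn(Vec<SpendableOutputDescriptor>, Option<ChannelId>, bool, Option<u32>),
{
    if let Event::SpendableOutputs { outputs, channel_id } = event {
        // Keep tracking `StaticOutput`s as well (`false`) and delay the first
        // spend by a few blocks to batch it with other pending outputs.
        track(outputs, channel_id, false, Some(cur_height + 3));
    }
}

In practice the callback would simply forward to `sweeper.track_spendable_outputs(..)`, much as the background-processor test above does with `Some(153)` as the delay height.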