mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-15 03:51:23 +01:00
sweep: add new interface Cluster
to manage grouping inputs
This commit adds a new interface `Cluster` to manage cluster-level inputs grouping. This new interface replaces the `inputCluster` and will be futher refactored so the sweeper can use a much smaller coin selection lock.
This commit is contained in:
parent
a7e9c08baf
commit
9d5ddf29f3
6 changed files with 171 additions and 353 deletions
|
@ -4,6 +4,8 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/lightningnetwork/lnd/input"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,12 +23,152 @@ const (
|
||||||
DefaultFeeRateBucketSize = 10
|
DefaultFeeRateBucketSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// inputSet is a set of inputs that can be used as the basis to generate a tx
|
||||||
|
// on.
|
||||||
|
type inputSet []input.Input
|
||||||
|
|
||||||
|
// Cluster defines an interface that prepares inputs of a cluster to be grouped
|
||||||
|
// into a list of sets that can be used to create sweep transactions.
|
||||||
|
type Cluster interface {
|
||||||
|
// CreateInputSets goes through the cluster's inputs and constructs
|
||||||
|
// sets of inputs that can be used to generate a sensible transaction.
|
||||||
|
CreateInputSets(wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
|
||||||
|
maxInputs int) ([]inputSet, error)
|
||||||
|
|
||||||
|
// FeeRate returns the fee rate of the cluster.
|
||||||
|
FeeRate() chainfee.SatPerKWeight
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time constraint to ensure inputCluster implements Cluster.
|
||||||
|
var _ Cluster = (*inputCluster)(nil)
|
||||||
|
|
||||||
|
// inputCluster is a helper struct to gather a set of pending inputs that
|
||||||
|
// should be swept with the specified fee rate.
|
||||||
|
type inputCluster struct {
|
||||||
|
lockTime *uint32
|
||||||
|
sweepFeeRate chainfee.SatPerKWeight
|
||||||
|
inputs pendingInputs
|
||||||
|
}
|
||||||
|
|
||||||
|
// FeeRate returns the fee rate of the cluster.
|
||||||
|
func (c *inputCluster) FeeRate() chainfee.SatPerKWeight {
|
||||||
|
return c.sweepFeeRate
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupInputs goes through the cluster's inputs and constructs sets of inputs
|
||||||
|
// that can be used to generate a sensible transaction. Each set contains up to
|
||||||
|
// the configured maximum number of inputs. Negative yield inputs are skipped.
|
||||||
|
// No input sets with a total value after fees below the dust limit are
|
||||||
|
// returned.
|
||||||
|
func (c *inputCluster) CreateInputSets(
|
||||||
|
wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
|
||||||
|
maxInputs int) ([]inputSet, error) {
|
||||||
|
|
||||||
|
// Turn the inputs into a slice so we can sort them.
|
||||||
|
inputList := make([]*pendingInput, 0, len(c.inputs))
|
||||||
|
for _, input := range c.inputs {
|
||||||
|
inputList = append(inputList, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Yield is calculated as the difference between value and added fee
|
||||||
|
// for this input. The fee calculation excludes fee components that are
|
||||||
|
// common to all inputs, as those wouldn't influence the order. The
|
||||||
|
// single component that is differentiating is witness size.
|
||||||
|
//
|
||||||
|
// For witness size, the upper limit is taken. The actual size depends
|
||||||
|
// on the signature length, which is not known yet at this point.
|
||||||
|
calcYield := func(input *pendingInput) int64 {
|
||||||
|
size, _, err := input.WitnessType().SizeUpperBound()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to get input weight: %v", err)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
yield := input.SignDesc().Output.Value -
|
||||||
|
int64(c.sweepFeeRate.FeeForWeight(int64(size)))
|
||||||
|
|
||||||
|
return yield
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort input by yield. We will start constructing input sets starting
|
||||||
|
// with the highest yield inputs. This is to prevent the construction
|
||||||
|
// of a set with an output below the dust limit, causing the sweep
|
||||||
|
// process to stop, while there are still higher value inputs
|
||||||
|
// available. It also allows us to stop evaluating more inputs when the
|
||||||
|
// first input in this ordering is encountered with a negative yield.
|
||||||
|
sort.Slice(inputList, func(i, j int) bool {
|
||||||
|
// Because of the specific ordering and termination condition
|
||||||
|
// that is described above, we place force sweeps at the start
|
||||||
|
// of the list. Otherwise we can't be sure that they will be
|
||||||
|
// included in an input set.
|
||||||
|
if inputList[i].parameters().Force {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return calcYield(inputList[i]) > calcYield(inputList[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
// Select blocks of inputs up to the configured maximum number.
|
||||||
|
var sets []inputSet
|
||||||
|
for len(inputList) > 0 {
|
||||||
|
// Start building a set of positive-yield tx inputs under the
|
||||||
|
// condition that the tx will be published with the specified
|
||||||
|
// fee rate.
|
||||||
|
txInputs := newTxInputSet(
|
||||||
|
wallet, c.sweepFeeRate, maxFeeRate, maxInputs,
|
||||||
|
)
|
||||||
|
|
||||||
|
// From the set of sweepable inputs, keep adding inputs to the
|
||||||
|
// input set until the tx output value no longer goes up or the
|
||||||
|
// maximum number of inputs is reached.
|
||||||
|
txInputs.addPositiveYieldInputs(inputList)
|
||||||
|
|
||||||
|
// If there are no positive yield inputs, we can stop here.
|
||||||
|
inputCount := len(txInputs.inputs)
|
||||||
|
if inputCount == 0 {
|
||||||
|
return sets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the current output value and add wallet utxos if
|
||||||
|
// needed to push the output value to the lower limit.
|
||||||
|
if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the output value of this block of inputs does not reach
|
||||||
|
// the dust limit, stop sweeping. Because of the sorting,
|
||||||
|
// continuing with the remaining inputs will only lead to sets
|
||||||
|
// with an even lower output value.
|
||||||
|
if !txInputs.enoughInput() {
|
||||||
|
// The change output is always a p2tr here.
|
||||||
|
dl := lnwallet.DustLimitForSize(input.P2TRSize)
|
||||||
|
log.Debugf("Input set value %v (required=%v, "+
|
||||||
|
"change=%v) below dust limit of %v",
|
||||||
|
txInputs.totalOutput(), txInputs.requiredOutput,
|
||||||
|
txInputs.changeOutput, dl)
|
||||||
|
return sets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Candidate sweep set of size=%v (+%v wallet inputs),"+
|
||||||
|
" has yield=%v, weight=%v",
|
||||||
|
inputCount, len(txInputs.inputs)-inputCount,
|
||||||
|
txInputs.totalOutput()-txInputs.walletInputTotal,
|
||||||
|
txInputs.weightEstimate(true).weight())
|
||||||
|
|
||||||
|
sets = append(sets, txInputs.inputs)
|
||||||
|
inputList = inputList[inputCount:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return sets, nil
|
||||||
|
}
|
||||||
|
|
||||||
// UtxoAggregator defines an interface that takes a list of inputs and
|
// UtxoAggregator defines an interface that takes a list of inputs and
|
||||||
// aggregate them into groups. Each group is used as the inputs to create a
|
// aggregate them into groups. Each group is used as the inputs to create a
|
||||||
// sweeping transaction.
|
// sweeping transaction.
|
||||||
type UtxoAggregator interface {
|
type UtxoAggregator interface {
|
||||||
// ClusterInputs takes a list of inputs and groups them into clusters.
|
// ClusterInputs takes a list of inputs and groups them into clusters.
|
||||||
ClusterInputs(pendingInputs) []inputCluster
|
ClusterInputs(pendingInputs) []Cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
// SimpleAggregator aggregates inputs known by the Sweeper based on each
|
// SimpleAggregator aggregates inputs known by the Sweeper based on each
|
||||||
|
@ -72,11 +214,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
|
||||||
// inputs known by the UtxoSweeper. It clusters inputs by
|
// inputs known by the UtxoSweeper. It clusters inputs by
|
||||||
// 1) Required tx locktime
|
// 1) Required tx locktime
|
||||||
// 2) Similar fee rates.
|
// 2) Similar fee rates.
|
||||||
//
|
func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster {
|
||||||
// TODO(yy): remove this nolint once done refactoring.
|
|
||||||
//
|
|
||||||
//nolint:revive
|
|
||||||
func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []inputCluster {
|
|
||||||
// We start by getting the inputs clusters by locktime. Since the
|
// We start by getting the inputs clusters by locktime. Since the
|
||||||
// inputs commit to the locktime, they can only be clustered together
|
// inputs commit to the locktime, they can only be clustered together
|
||||||
// if the locktime is equal.
|
// if the locktime is equal.
|
||||||
|
@ -88,7 +226,19 @@ func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []inputCluster {
|
||||||
// Since the inputs that we clustered by fee rate don't commit to a
|
// Since the inputs that we clustered by fee rate don't commit to a
|
||||||
// specific locktime, we can try to merge a locktime cluster with a fee
|
// specific locktime, we can try to merge a locktime cluster with a fee
|
||||||
// cluster.
|
// cluster.
|
||||||
return zipClusters(lockTimeClusters, feeClusters)
|
clusters := zipClusters(lockTimeClusters, feeClusters)
|
||||||
|
|
||||||
|
sort.Slice(clusters, func(i, j int) bool {
|
||||||
|
return clusters[i].sweepFeeRate >
|
||||||
|
clusters[j].sweepFeeRate
|
||||||
|
})
|
||||||
|
|
||||||
|
result := make([]Cluster, 0, len(clusters))
|
||||||
|
for _, c := range clusters {
|
||||||
|
result = append(result, &c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// clusterByLockTime takes the given set of pending inputs and clusters those
|
// clusterByLockTime takes the given set of pending inputs and clusters those
|
||||||
|
|
|
@ -37,8 +37,8 @@ type mockUtxoAggregator struct {
|
||||||
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
|
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
|
||||||
|
|
||||||
// ClusterInputs takes a list of inputs and groups them into clusters.
|
// ClusterInputs takes a list of inputs and groups them into clusters.
|
||||||
func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []inputCluster {
|
func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []Cluster {
|
||||||
args := m.Called(pendingInputs{})
|
args := m.Called(pendingInputs{})
|
||||||
|
|
||||||
return args.Get(0).([]inputCluster)
|
return args.Get(0).([]Cluster)
|
||||||
}
|
}
|
||||||
|
|
139
sweep/sweeper.go
139
sweep/sweeper.go
|
@ -4,7 +4,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -224,14 +223,6 @@ func (p *pendingInput) terminated() bool {
|
||||||
// pendingInputs is a type alias for a set of pending inputs.
|
// pendingInputs is a type alias for a set of pending inputs.
|
||||||
type pendingInputs = map[wire.OutPoint]*pendingInput
|
type pendingInputs = map[wire.OutPoint]*pendingInput
|
||||||
|
|
||||||
// inputCluster is a helper struct to gather a set of pending inputs that should
|
|
||||||
// be swept with the specified fee rate.
|
|
||||||
type inputCluster struct {
|
|
||||||
lockTime *uint32
|
|
||||||
sweepFeeRate chainfee.SatPerKWeight
|
|
||||||
inputs pendingInputs
|
|
||||||
}
|
|
||||||
|
|
||||||
// pendingSweepsReq is an internal message we'll use to represent an external
|
// pendingSweepsReq is an internal message we'll use to represent an external
|
||||||
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
|
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
|
||||||
// attempting to sweep.
|
// attempting to sweep.
|
||||||
|
@ -750,60 +741,26 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sweepCluster tries to sweep the given input cluster.
|
// sweepCluster tries to sweep the given input cluster.
|
||||||
func (s *UtxoSweeper) sweepCluster(cluster inputCluster) error {
|
func (s *UtxoSweeper) sweepCluster(cluster Cluster) error {
|
||||||
// Execute the sweep within a coin select lock. Otherwise the coins
|
// Execute the sweep within a coin select lock. Otherwise the coins
|
||||||
// that we are going to spend may be selected for other transactions
|
// that we are going to spend may be selected for other transactions
|
||||||
// like funding of a channel.
|
// like funding of a channel.
|
||||||
|
//
|
||||||
|
// TODO(yy): decrease the lock scope.
|
||||||
return s.cfg.Wallet.WithCoinSelectLock(func() error {
|
return s.cfg.Wallet.WithCoinSelectLock(func() error {
|
||||||
// Examine pending inputs and try to construct lists of inputs.
|
// Examine pending inputs and try to construct lists of inputs.
|
||||||
allSets, newSets, err := s.getInputLists(cluster)
|
sets, err := cluster.CreateInputSets(
|
||||||
|
s.cfg.Wallet,
|
||||||
|
s.cfg.MaxFeeRate.FeePerKWeight(),
|
||||||
|
s.cfg.MaxInputsPerTx,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("examine pending inputs: %w", err)
|
return fmt.Errorf("examine pending inputs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// errAllSets records the error from broadcasting the sweeping
|
// Create sweeping transaction for each set.
|
||||||
// transactions for all input sets.
|
for _, inputs := range sets {
|
||||||
var errAllSets error
|
err := s.sweep(inputs, cluster.FeeRate())
|
||||||
|
|
||||||
// allSets contains retried inputs and new inputs. To avoid
|
|
||||||
// creating an RBF for the new inputs, we'd sweep this set
|
|
||||||
// first.
|
|
||||||
for _, inputs := range allSets {
|
|
||||||
errAllSets = s.sweep(inputs, cluster.sweepFeeRate)
|
|
||||||
// TODO(yy): we should also find out which set created
|
|
||||||
// this error. If there are new inputs in this set, we
|
|
||||||
// should give it a second chance by sweeping them
|
|
||||||
// below. To enable this, we need to provide richer
|
|
||||||
// state for each input other than just recording the
|
|
||||||
// publishAttempts. We'd also need to refactor how we
|
|
||||||
// create the input sets. Atm, the steps are,
|
|
||||||
// 1. create a list of input sets.
|
|
||||||
// 2. sweep each set by creating and publishing the tx.
|
|
||||||
// We should change the flow as,
|
|
||||||
// 1. create a list of input sets, and for each set,
|
|
||||||
// 2. when created, we create and publish the tx.
|
|
||||||
// 3. if the publish fails, find out which input is
|
|
||||||
// causing the failure and retry the rest of the
|
|
||||||
// inputs.
|
|
||||||
if errAllSets != nil {
|
|
||||||
log.Errorf("Sweep all inputs got error: %v",
|
|
||||||
errAllSets)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have successfully swept all inputs, there's no need to
|
|
||||||
// sweep the new inputs as it'd create an RBF case.
|
|
||||||
if allSets != nil && errAllSets == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// We'd end up there if there's no retried inputs or the above
|
|
||||||
// sweeping tx failed. In this case, we'd sweep the new input
|
|
||||||
// sets. If there's an error when sweeping a given set, we'd
|
|
||||||
// log the error and sweep the next set.
|
|
||||||
for _, inputs := range newSets {
|
|
||||||
err := s.sweep(inputs, cluster.sweepFeeRate)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("sweep new inputs: %w", err)
|
log.Errorf("sweep new inputs: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -844,76 +801,6 @@ func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getInputLists goes through the given inputs and constructs multiple distinct
|
|
||||||
// sweep lists with the given fee rate, each up to the configured maximum
|
|
||||||
// number of inputs. Negative yield inputs are skipped. Transactions with an
|
|
||||||
// output below the dust limit are not published. Those inputs remain pending
|
|
||||||
// and will be bundled with future inputs if possible. It returns two list -
|
|
||||||
// one containing all inputs and the other containing only the new inputs. If
|
|
||||||
// there's no retried inputs, the first set returned will be empty.
|
|
||||||
func (s *UtxoSweeper) getInputLists(
|
|
||||||
cluster inputCluster) ([]inputSet, []inputSet, error) {
|
|
||||||
|
|
||||||
// Filter for inputs that need to be swept. Create two lists: all
|
|
||||||
// sweepable inputs and a list containing only the new, never tried
|
|
||||||
// inputs.
|
|
||||||
//
|
|
||||||
// We want to create as large a tx as possible, so we return a final
|
|
||||||
// set list that starts with sets created from all inputs. However,
|
|
||||||
// there is a chance that those txes will not publish, because they
|
|
||||||
// already contain inputs that failed before. Therefore we also add
|
|
||||||
// sets consisting of only new inputs to the list, to make sure that
|
|
||||||
// new inputs are given a good, isolated chance of being published.
|
|
||||||
//
|
|
||||||
// TODO(yy): this would lead to conflict transactions as the same input
|
|
||||||
// can be used in two sweeping transactions, and our rebroadcaster will
|
|
||||||
// retry the failed one. We should instead understand why the input is
|
|
||||||
// failed in the first place, and start tracking input states in
|
|
||||||
// sweeper to avoid this.
|
|
||||||
var newInputs, retryInputs []txInput
|
|
||||||
for _, input := range cluster.inputs {
|
|
||||||
// Add input to the either one of the lists.
|
|
||||||
if input.publishAttempts == 0 {
|
|
||||||
newInputs = append(newInputs, input)
|
|
||||||
} else {
|
|
||||||
retryInputs = append(retryInputs, input)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert the max fee rate's unit from sat/vb to sat/kw.
|
|
||||||
maxFeeRate := s.cfg.MaxFeeRate.FeePerKWeight()
|
|
||||||
|
|
||||||
// If there is anything to retry, combine it with the new inputs and
|
|
||||||
// form input sets.
|
|
||||||
var allSets []inputSet
|
|
||||||
if len(retryInputs) > 0 {
|
|
||||||
var err error
|
|
||||||
allSets, err = generateInputPartitionings(
|
|
||||||
append(retryInputs, newInputs...),
|
|
||||||
cluster.sweepFeeRate, maxFeeRate,
|
|
||||||
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("input partitionings: %w",
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create sets for just the new inputs.
|
|
||||||
newSets, err := generateInputPartitionings(
|
|
||||||
newInputs, cluster.sweepFeeRate, maxFeeRate,
|
|
||||||
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("input partitionings: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+
|
|
||||||
"total_num_new=%v", s.currentHeight, len(allSets), len(newSets))
|
|
||||||
|
|
||||||
return allSets, newSets, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
|
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
|
||||||
// tx. The output address is only marked as used if the publish succeeds.
|
// tx. The output address is only marked as used if the publish succeeds.
|
||||||
func (s *UtxoSweeper) sweep(inputs inputSet,
|
func (s *UtxoSweeper) sweep(inputs inputSet,
|
||||||
|
@ -1678,10 +1565,6 @@ func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
|
||||||
// rate order. We do this to ensure any inputs which have had their fee
|
// rate order. We do this to ensure any inputs which have had their fee
|
||||||
// rate bumped are broadcast first in order enforce the RBF policy.
|
// rate bumped are broadcast first in order enforce the RBF policy.
|
||||||
inputClusters := s.cfg.Aggregator.ClusterInputs(inputs)
|
inputClusters := s.cfg.Aggregator.ClusterInputs(inputs)
|
||||||
sort.Slice(inputClusters, func(i, j int) bool {
|
|
||||||
return inputClusters[i].sweepFeeRate >
|
|
||||||
inputClusters[j].sweepFeeRate
|
|
||||||
})
|
|
||||||
|
|
||||||
for _, cluster := range inputClusters {
|
for _, cluster := range inputClusters {
|
||||||
err := s.sweepCluster(cluster)
|
err := s.sweepCluster(cluster)
|
||||||
|
|
|
@ -1882,115 +1882,6 @@ func TestSweeperShutdownHandling(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGetInputLists checks that the expected input sets are returned based on
|
|
||||||
// whether there are retried inputs or not.
|
|
||||||
func TestGetInputLists(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
// Create a test param with a dummy fee preference. This is needed so
|
|
||||||
// `feeRateForPreference` won't throw an error.
|
|
||||||
param := Params{Fee: FeeEstimateInfo{ConfTarget: 1}}
|
|
||||||
|
|
||||||
// Create a mock input and mock all the methods used in this test.
|
|
||||||
testInput := &input.MockInput{}
|
|
||||||
testInput.On("RequiredLockTime").Return(0, false)
|
|
||||||
testInput.On("WitnessType").Return(input.CommitmentAnchor)
|
|
||||||
testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1})
|
|
||||||
testInput.On("RequiredTxOut").Return(nil)
|
|
||||||
testInput.On("UnconfParent").Return(nil)
|
|
||||||
testInput.On("SignDesc").Return(&input.SignDescriptor{
|
|
||||||
Output: &wire.TxOut{Value: 100_000},
|
|
||||||
})
|
|
||||||
|
|
||||||
// Create a new and a retried input.
|
|
||||||
//
|
|
||||||
// NOTE: we use the same input.Input for both pending inputs as we only
|
|
||||||
// test the logic of returning the correct non-nil input sets, and not
|
|
||||||
// the content the of sets. To validate the content of the sets, we
|
|
||||||
// should test `generateInputPartitionings` instead.
|
|
||||||
newInput := &pendingInput{
|
|
||||||
Input: testInput,
|
|
||||||
params: param,
|
|
||||||
}
|
|
||||||
oldInput := &pendingInput{
|
|
||||||
Input: testInput,
|
|
||||||
params: param,
|
|
||||||
publishAttempts: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
// clusterNew contains only new inputs.
|
|
||||||
clusterNew := pendingInputs{
|
|
||||||
wire.OutPoint{Index: 1}: newInput,
|
|
||||||
}
|
|
||||||
|
|
||||||
// clusterMixed contains a mixed of new and retried inputs.
|
|
||||||
clusterMixed := pendingInputs{
|
|
||||||
wire.OutPoint{Index: 1}: newInput,
|
|
||||||
wire.OutPoint{Index: 2}: oldInput,
|
|
||||||
}
|
|
||||||
|
|
||||||
// clusterOld contains only retried inputs.
|
|
||||||
clusterOld := pendingInputs{
|
|
||||||
wire.OutPoint{Index: 2}: oldInput,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a test sweeper.
|
|
||||||
s := New(&UtxoSweeperConfig{
|
|
||||||
MaxInputsPerTx: DefaultMaxInputsPerTx,
|
|
||||||
})
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
cluster inputCluster
|
|
||||||
expectedNilAllSet bool
|
|
||||||
expectNilNewSet bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
// When there are only new inputs, we'd expect the
|
|
||||||
// first returned set(allSets) to be empty.
|
|
||||||
name: "new inputs only",
|
|
||||||
cluster: inputCluster{inputs: clusterNew},
|
|
||||||
expectedNilAllSet: true,
|
|
||||||
expectNilNewSet: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// When there are only retried inputs, we'd expect the
|
|
||||||
// second returned set(newSet) to be empty.
|
|
||||||
name: "retried inputs only",
|
|
||||||
cluster: inputCluster{inputs: clusterOld},
|
|
||||||
expectedNilAllSet: false,
|
|
||||||
expectNilNewSet: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// When there are mixed inputs, we'd expect two sets
|
|
||||||
// are returned.
|
|
||||||
name: "mixed inputs",
|
|
||||||
cluster: inputCluster{inputs: clusterMixed},
|
|
||||||
expectedNilAllSet: false,
|
|
||||||
expectNilNewSet: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
tc := tc
|
|
||||||
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
allSets, newSets, err := s.getInputLists(tc.cluster)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
if tc.expectNilNewSet {
|
|
||||||
require.Nil(t, newSets)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.expectedNilAllSet {
|
|
||||||
require.Nil(t, allSets)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestMarkInputsPendingPublish checks that given a list of inputs with
|
// TestMarkInputsPendingPublish checks that given a list of inputs with
|
||||||
// different states, only the non-terminal state will be marked as `Published`.
|
// different states, only the non-terminal state will be marked as `Published`.
|
||||||
func TestMarkInputsPendingPublish(t *testing.T) {
|
func TestMarkInputsPendingPublish(t *testing.T) {
|
||||||
|
|
|
@ -331,7 +331,7 @@ func (t *txInputSet) add(input input.Input, constraints addConstraints) bool {
|
||||||
// up the utxo set even if it costs us some fees up front. In the spirit of
|
// up the utxo set even if it costs us some fees up front. In the spirit of
|
||||||
// minimizing any negative externalities we cause for the Bitcoin system as a
|
// minimizing any negative externalities we cause for the Bitcoin system as a
|
||||||
// whole.
|
// whole.
|
||||||
func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []txInput) {
|
func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []*pendingInput) {
|
||||||
for i, inp := range sweepableInputs {
|
for i, inp := range sweepableInputs {
|
||||||
// Apply relaxed constraints for force sweeps.
|
// Apply relaxed constraints for force sweeps.
|
||||||
constraints := constraintsRegular
|
constraints := constraintsRegular
|
||||||
|
|
|
@ -37,112 +37,6 @@ type txInput interface {
|
||||||
parameters() Params
|
parameters() Params
|
||||||
}
|
}
|
||||||
|
|
||||||
// inputSet is a set of inputs that can be used as the basis to generate a tx
|
|
||||||
// on.
|
|
||||||
type inputSet []input.Input
|
|
||||||
|
|
||||||
// generateInputPartitionings goes through all given inputs and constructs sets
|
|
||||||
// of inputs that can be used to generate a sensible transaction. Each set
|
|
||||||
// contains up to the configured maximum number of inputs. Negative yield
|
|
||||||
// inputs are skipped. No input sets with a total value after fees below the
|
|
||||||
// dust limit are returned.
|
|
||||||
func generateInputPartitionings(sweepableInputs []txInput,
|
|
||||||
feePerKW, maxFeeRate chainfee.SatPerKWeight, maxInputsPerTx int,
|
|
||||||
wallet Wallet) ([]inputSet, error) {
|
|
||||||
|
|
||||||
// Sort input by yield. We will start constructing input sets starting
|
|
||||||
// with the highest yield inputs. This is to prevent the construction
|
|
||||||
// of a set with an output below the dust limit, causing the sweep
|
|
||||||
// process to stop, while there are still higher value inputs
|
|
||||||
// available. It also allows us to stop evaluating more inputs when the
|
|
||||||
// first input in this ordering is encountered with a negative yield.
|
|
||||||
//
|
|
||||||
// Yield is calculated as the difference between value and added fee
|
|
||||||
// for this input. The fee calculation excludes fee components that are
|
|
||||||
// common to all inputs, as those wouldn't influence the order. The
|
|
||||||
// single component that is differentiating is witness size.
|
|
||||||
//
|
|
||||||
// For witness size, the upper limit is taken. The actual size depends
|
|
||||||
// on the signature length, which is not known yet at this point.
|
|
||||||
yields := make(map[wire.OutPoint]int64)
|
|
||||||
for _, input := range sweepableInputs {
|
|
||||||
size, _, err := input.WitnessType().SizeUpperBound()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"failed adding input weight: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
yields[*input.OutPoint()] = input.SignDesc().Output.Value -
|
|
||||||
int64(feePerKW.FeeForWeight(int64(size)))
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(sweepableInputs, func(i, j int) bool {
|
|
||||||
// Because of the specific ordering and termination condition
|
|
||||||
// that is described above, we place force sweeps at the start
|
|
||||||
// of the list. Otherwise we can't be sure that they will be
|
|
||||||
// included in an input set.
|
|
||||||
if sweepableInputs[i].parameters().Force {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return yields[*sweepableInputs[i].OutPoint()] >
|
|
||||||
yields[*sweepableInputs[j].OutPoint()]
|
|
||||||
})
|
|
||||||
|
|
||||||
// Select blocks of inputs up to the configured maximum number.
|
|
||||||
var sets []inputSet
|
|
||||||
for len(sweepableInputs) > 0 {
|
|
||||||
// Start building a set of positive-yield tx inputs under the
|
|
||||||
// condition that the tx will be published with the specified
|
|
||||||
// fee rate.
|
|
||||||
txInputs := newTxInputSet(
|
|
||||||
wallet, feePerKW, maxFeeRate, maxInputsPerTx,
|
|
||||||
)
|
|
||||||
|
|
||||||
// From the set of sweepable inputs, keep adding inputs to the
|
|
||||||
// input set until the tx output value no longer goes up or the
|
|
||||||
// maximum number of inputs is reached.
|
|
||||||
txInputs.addPositiveYieldInputs(sweepableInputs)
|
|
||||||
|
|
||||||
// If there are no positive yield inputs, we can stop here.
|
|
||||||
inputCount := len(txInputs.inputs)
|
|
||||||
if inputCount == 0 {
|
|
||||||
return sets, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the current output value and add wallet utxos if
|
|
||||||
// needed to push the output value to the lower limit.
|
|
||||||
if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the output value of this block of inputs does not reach
|
|
||||||
// the dust limit, stop sweeping. Because of the sorting,
|
|
||||||
// continuing with the remaining inputs will only lead to sets
|
|
||||||
// with an even lower output value.
|
|
||||||
if !txInputs.enoughInput() {
|
|
||||||
// The change output is always a p2tr here.
|
|
||||||
dl := lnwallet.DustLimitForSize(input.P2TRSize)
|
|
||||||
log.Debugf("Input set value %v (required=%v, "+
|
|
||||||
"change=%v) below dust limit of %v",
|
|
||||||
txInputs.totalOutput(), txInputs.requiredOutput,
|
|
||||||
txInputs.changeOutput, dl)
|
|
||||||
return sets, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Candidate sweep set of size=%v (+%v wallet inputs), "+
|
|
||||||
"has yield=%v, weight=%v",
|
|
||||||
inputCount, len(txInputs.inputs)-inputCount,
|
|
||||||
txInputs.totalOutput()-txInputs.walletInputTotal,
|
|
||||||
txInputs.weightEstimate(true).weight())
|
|
||||||
|
|
||||||
sets = append(sets, txInputs.inputs)
|
|
||||||
sweepableInputs = sweepableInputs[inputCount:]
|
|
||||||
}
|
|
||||||
|
|
||||||
return sets, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// createSweepTx builds a signed tx spending the inputs to the given outputs,
|
// createSweepTx builds a signed tx spending the inputs to the given outputs,
|
||||||
// sending any leftover change to the change script.
|
// sending any leftover change to the change script.
|
||||||
func createSweepTx(inputs []input.Input, outputs []*wire.TxOut,
|
func createSweepTx(inputs []input.Input, outputs []*wire.TxOut,
|
||||||
|
|
Loading…
Add table
Reference in a new issue