From 9d5ddf29f3fff0d68ace9da195352dc9c3ac84a6 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Oct 2023 18:22:29 +0800 Subject: [PATCH] sweep: add new interface `Cluster` to manage grouping inputs This commit adds a new interface `Cluster` to manage cluster-level inputs grouping. This new interface replaces the `inputCluster` and will be futher refactored so the sweeper can use a much smaller coin selection lock. --- sweep/aggregator.go | 164 ++++++++++++++++++++++++++++++++++++++++-- sweep/mocks.go | 4 +- sweep/sweeper.go | 139 +++-------------------------------- sweep/sweeper_test.go | 109 ---------------------------- sweep/tx_input_set.go | 2 +- sweep/txgenerator.go | 106 --------------------------- 6 files changed, 171 insertions(+), 353 deletions(-) diff --git a/sweep/aggregator.go b/sweep/aggregator.go index 6797e3573..dc75500e3 100644 --- a/sweep/aggregator.go +++ b/sweep/aggregator.go @@ -4,6 +4,8 @@ import ( "sort" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" ) @@ -21,12 +23,152 @@ const ( DefaultFeeRateBucketSize = 10 ) +// inputSet is a set of inputs that can be used as the basis to generate a tx +// on. +type inputSet []input.Input + +// Cluster defines an interface that prepares inputs of a cluster to be grouped +// into a list of sets that can be used to create sweep transactions. +type Cluster interface { + // CreateInputSets goes through the cluster's inputs and constructs + // sets of inputs that can be used to generate a sensible transaction. + CreateInputSets(wallet Wallet, maxFeeRate chainfee.SatPerKWeight, + maxInputs int) ([]inputSet, error) + + // FeeRate returns the fee rate of the cluster. + FeeRate() chainfee.SatPerKWeight +} + +// Compile-time constraint to ensure inputCluster implements Cluster. +var _ Cluster = (*inputCluster)(nil) + +// inputCluster is a helper struct to gather a set of pending inputs that +// should be swept with the specified fee rate. +type inputCluster struct { + lockTime *uint32 + sweepFeeRate chainfee.SatPerKWeight + inputs pendingInputs +} + +// FeeRate returns the fee rate of the cluster. +func (c *inputCluster) FeeRate() chainfee.SatPerKWeight { + return c.sweepFeeRate +} + +// GroupInputs goes through the cluster's inputs and constructs sets of inputs +// that can be used to generate a sensible transaction. Each set contains up to +// the configured maximum number of inputs. Negative yield inputs are skipped. +// No input sets with a total value after fees below the dust limit are +// returned. +func (c *inputCluster) CreateInputSets( + wallet Wallet, maxFeeRate chainfee.SatPerKWeight, + maxInputs int) ([]inputSet, error) { + + // Turn the inputs into a slice so we can sort them. + inputList := make([]*pendingInput, 0, len(c.inputs)) + for _, input := range c.inputs { + inputList = append(inputList, input) + } + + // Yield is calculated as the difference between value and added fee + // for this input. The fee calculation excludes fee components that are + // common to all inputs, as those wouldn't influence the order. The + // single component that is differentiating is witness size. + // + // For witness size, the upper limit is taken. The actual size depends + // on the signature length, which is not known yet at this point. + calcYield := func(input *pendingInput) int64 { + size, _, err := input.WitnessType().SizeUpperBound() + if err != nil { + log.Errorf("Failed to get input weight: %v", err) + + return 0 + } + + yield := input.SignDesc().Output.Value - + int64(c.sweepFeeRate.FeeForWeight(int64(size))) + + return yield + } + + // Sort input by yield. We will start constructing input sets starting + // with the highest yield inputs. This is to prevent the construction + // of a set with an output below the dust limit, causing the sweep + // process to stop, while there are still higher value inputs + // available. It also allows us to stop evaluating more inputs when the + // first input in this ordering is encountered with a negative yield. + sort.Slice(inputList, func(i, j int) bool { + // Because of the specific ordering and termination condition + // that is described above, we place force sweeps at the start + // of the list. Otherwise we can't be sure that they will be + // included in an input set. + if inputList[i].parameters().Force { + return true + } + + return calcYield(inputList[i]) > calcYield(inputList[j]) + }) + + // Select blocks of inputs up to the configured maximum number. + var sets []inputSet + for len(inputList) > 0 { + // Start building a set of positive-yield tx inputs under the + // condition that the tx will be published with the specified + // fee rate. + txInputs := newTxInputSet( + wallet, c.sweepFeeRate, maxFeeRate, maxInputs, + ) + + // From the set of sweepable inputs, keep adding inputs to the + // input set until the tx output value no longer goes up or the + // maximum number of inputs is reached. + txInputs.addPositiveYieldInputs(inputList) + + // If there are no positive yield inputs, we can stop here. + inputCount := len(txInputs.inputs) + if inputCount == 0 { + return sets, nil + } + + // Check the current output value and add wallet utxos if + // needed to push the output value to the lower limit. + if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil { + return nil, err + } + + // If the output value of this block of inputs does not reach + // the dust limit, stop sweeping. Because of the sorting, + // continuing with the remaining inputs will only lead to sets + // with an even lower output value. + if !txInputs.enoughInput() { + // The change output is always a p2tr here. + dl := lnwallet.DustLimitForSize(input.P2TRSize) + log.Debugf("Input set value %v (required=%v, "+ + "change=%v) below dust limit of %v", + txInputs.totalOutput(), txInputs.requiredOutput, + txInputs.changeOutput, dl) + return sets, nil + } + + log.Infof("Candidate sweep set of size=%v (+%v wallet inputs),"+ + " has yield=%v, weight=%v", + inputCount, len(txInputs.inputs)-inputCount, + txInputs.totalOutput()-txInputs.walletInputTotal, + txInputs.weightEstimate(true).weight()) + + sets = append(sets, txInputs.inputs) + inputList = inputList[inputCount:] + } + + return sets, nil +} + // UtxoAggregator defines an interface that takes a list of inputs and // aggregate them into groups. Each group is used as the inputs to create a // sweeping transaction. type UtxoAggregator interface { // ClusterInputs takes a list of inputs and groups them into clusters. - ClusterInputs(pendingInputs) []inputCluster + ClusterInputs(pendingInputs) []Cluster } // SimpleAggregator aggregates inputs known by the Sweeper based on each @@ -72,11 +214,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator, // inputs known by the UtxoSweeper. It clusters inputs by // 1) Required tx locktime // 2) Similar fee rates. -// -// TODO(yy): remove this nolint once done refactoring. -// -//nolint:revive -func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []inputCluster { +func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []Cluster { // We start by getting the inputs clusters by locktime. Since the // inputs commit to the locktime, they can only be clustered together // if the locktime is equal. @@ -88,7 +226,19 @@ func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []inputCluster { // Since the inputs that we clustered by fee rate don't commit to a // specific locktime, we can try to merge a locktime cluster with a fee // cluster. - return zipClusters(lockTimeClusters, feeClusters) + clusters := zipClusters(lockTimeClusters, feeClusters) + + sort.Slice(clusters, func(i, j int) bool { + return clusters[i].sweepFeeRate > + clusters[j].sweepFeeRate + }) + + result := make([]Cluster, 0, len(clusters)) + for _, c := range clusters { + result = append(result, &c) + } + + return result } // clusterByLockTime takes the given set of pending inputs and clusters those diff --git a/sweep/mocks.go b/sweep/mocks.go index 3c8882308..0e144ab4f 100644 --- a/sweep/mocks.go +++ b/sweep/mocks.go @@ -37,8 +37,8 @@ type mockUtxoAggregator struct { var _ UtxoAggregator = (*mockUtxoAggregator)(nil) // ClusterInputs takes a list of inputs and groups them into clusters. -func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []inputCluster { +func (m *mockUtxoAggregator) ClusterInputs(pendingInputs) []Cluster { args := m.Called(pendingInputs{}) - return args.Get(0).([]inputCluster) + return args.Get(0).([]Cluster) } diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 549095868..c3ba0a7e6 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "math/rand" - "sort" "sync" "sync/atomic" "time" @@ -224,14 +223,6 @@ func (p *pendingInput) terminated() bool { // pendingInputs is a type alias for a set of pending inputs. type pendingInputs = map[wire.OutPoint]*pendingInput -// inputCluster is a helper struct to gather a set of pending inputs that should -// be swept with the specified fee rate. -type inputCluster struct { - lockTime *uint32 - sweepFeeRate chainfee.SatPerKWeight - inputs pendingInputs -} - // pendingSweepsReq is an internal message we'll use to represent an external // caller's intent to retrieve all of the pending inputs the UtxoSweeper is // attempting to sweep. @@ -750,60 +741,26 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { } // sweepCluster tries to sweep the given input cluster. -func (s *UtxoSweeper) sweepCluster(cluster inputCluster) error { +func (s *UtxoSweeper) sweepCluster(cluster Cluster) error { // Execute the sweep within a coin select lock. Otherwise the coins // that we are going to spend may be selected for other transactions // like funding of a channel. + // + // TODO(yy): decrease the lock scope. return s.cfg.Wallet.WithCoinSelectLock(func() error { // Examine pending inputs and try to construct lists of inputs. - allSets, newSets, err := s.getInputLists(cluster) + sets, err := cluster.CreateInputSets( + s.cfg.Wallet, + s.cfg.MaxFeeRate.FeePerKWeight(), + s.cfg.MaxInputsPerTx, + ) if err != nil { return fmt.Errorf("examine pending inputs: %w", err) } - // errAllSets records the error from broadcasting the sweeping - // transactions for all input sets. - var errAllSets error - - // allSets contains retried inputs and new inputs. To avoid - // creating an RBF for the new inputs, we'd sweep this set - // first. - for _, inputs := range allSets { - errAllSets = s.sweep(inputs, cluster.sweepFeeRate) - // TODO(yy): we should also find out which set created - // this error. If there are new inputs in this set, we - // should give it a second chance by sweeping them - // below. To enable this, we need to provide richer - // state for each input other than just recording the - // publishAttempts. We'd also need to refactor how we - // create the input sets. Atm, the steps are, - // 1. create a list of input sets. - // 2. sweep each set by creating and publishing the tx. - // We should change the flow as, - // 1. create a list of input sets, and for each set, - // 2. when created, we create and publish the tx. - // 3. if the publish fails, find out which input is - // causing the failure and retry the rest of the - // inputs. - if errAllSets != nil { - log.Errorf("Sweep all inputs got error: %v", - errAllSets) - break - } - } - - // If we have successfully swept all inputs, there's no need to - // sweep the new inputs as it'd create an RBF case. - if allSets != nil && errAllSets == nil { - return nil - } - - // We'd end up there if there's no retried inputs or the above - // sweeping tx failed. In this case, we'd sweep the new input - // sets. If there's an error when sweeping a given set, we'd - // log the error and sweep the next set. - for _, inputs := range newSets { - err := s.sweep(inputs, cluster.sweepFeeRate) + // Create sweeping transaction for each set. + for _, inputs := range sets { + err := s.sweep(inputs, cluster.FeeRate()) if err != nil { log.Errorf("sweep new inputs: %w", err) } @@ -844,76 +801,6 @@ func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) { } } -// getInputLists goes through the given inputs and constructs multiple distinct -// sweep lists with the given fee rate, each up to the configured maximum -// number of inputs. Negative yield inputs are skipped. Transactions with an -// output below the dust limit are not published. Those inputs remain pending -// and will be bundled with future inputs if possible. It returns two list - -// one containing all inputs and the other containing only the new inputs. If -// there's no retried inputs, the first set returned will be empty. -func (s *UtxoSweeper) getInputLists( - cluster inputCluster) ([]inputSet, []inputSet, error) { - - // Filter for inputs that need to be swept. Create two lists: all - // sweepable inputs and a list containing only the new, never tried - // inputs. - // - // We want to create as large a tx as possible, so we return a final - // set list that starts with sets created from all inputs. However, - // there is a chance that those txes will not publish, because they - // already contain inputs that failed before. Therefore we also add - // sets consisting of only new inputs to the list, to make sure that - // new inputs are given a good, isolated chance of being published. - // - // TODO(yy): this would lead to conflict transactions as the same input - // can be used in two sweeping transactions, and our rebroadcaster will - // retry the failed one. We should instead understand why the input is - // failed in the first place, and start tracking input states in - // sweeper to avoid this. - var newInputs, retryInputs []txInput - for _, input := range cluster.inputs { - // Add input to the either one of the lists. - if input.publishAttempts == 0 { - newInputs = append(newInputs, input) - } else { - retryInputs = append(retryInputs, input) - } - } - - // Convert the max fee rate's unit from sat/vb to sat/kw. - maxFeeRate := s.cfg.MaxFeeRate.FeePerKWeight() - - // If there is anything to retry, combine it with the new inputs and - // form input sets. - var allSets []inputSet - if len(retryInputs) > 0 { - var err error - allSets, err = generateInputPartitionings( - append(retryInputs, newInputs...), - cluster.sweepFeeRate, maxFeeRate, - s.cfg.MaxInputsPerTx, s.cfg.Wallet, - ) - if err != nil { - return nil, nil, fmt.Errorf("input partitionings: %w", - err) - } - } - - // Create sets for just the new inputs. - newSets, err := generateInputPartitionings( - newInputs, cluster.sweepFeeRate, maxFeeRate, - s.cfg.MaxInputsPerTx, s.cfg.Wallet, - ) - if err != nil { - return nil, nil, fmt.Errorf("input partitionings: %w", err) - } - - log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+ - "total_num_new=%v", s.currentHeight, len(allSets), len(newSets)) - - return allSets, newSets, nil -} - // sweep takes a set of preselected inputs, creates a sweep tx and publishes the // tx. The output address is only marked as used if the publish succeeds. func (s *UtxoSweeper) sweep(inputs inputSet, @@ -1678,10 +1565,6 @@ func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) { // rate order. We do this to ensure any inputs which have had their fee // rate bumped are broadcast first in order enforce the RBF policy. inputClusters := s.cfg.Aggregator.ClusterInputs(inputs) - sort.Slice(inputClusters, func(i, j int) bool { - return inputClusters[i].sweepFeeRate > - inputClusters[j].sweepFeeRate - }) for _, cluster := range inputClusters { err := s.sweepCluster(cluster) diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 0a914c4c6..a13b41fc8 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1882,115 +1882,6 @@ func TestSweeperShutdownHandling(t *testing.T) { require.Error(t, err) } -// TestGetInputLists checks that the expected input sets are returned based on -// whether there are retried inputs or not. -func TestGetInputLists(t *testing.T) { - t.Parallel() - - // Create a test param with a dummy fee preference. This is needed so - // `feeRateForPreference` won't throw an error. - param := Params{Fee: FeeEstimateInfo{ConfTarget: 1}} - - // Create a mock input and mock all the methods used in this test. - testInput := &input.MockInput{} - testInput.On("RequiredLockTime").Return(0, false) - testInput.On("WitnessType").Return(input.CommitmentAnchor) - testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1}) - testInput.On("RequiredTxOut").Return(nil) - testInput.On("UnconfParent").Return(nil) - testInput.On("SignDesc").Return(&input.SignDescriptor{ - Output: &wire.TxOut{Value: 100_000}, - }) - - // Create a new and a retried input. - // - // NOTE: we use the same input.Input for both pending inputs as we only - // test the logic of returning the correct non-nil input sets, and not - // the content the of sets. To validate the content of the sets, we - // should test `generateInputPartitionings` instead. - newInput := &pendingInput{ - Input: testInput, - params: param, - } - oldInput := &pendingInput{ - Input: testInput, - params: param, - publishAttempts: 1, - } - - // clusterNew contains only new inputs. - clusterNew := pendingInputs{ - wire.OutPoint{Index: 1}: newInput, - } - - // clusterMixed contains a mixed of new and retried inputs. - clusterMixed := pendingInputs{ - wire.OutPoint{Index: 1}: newInput, - wire.OutPoint{Index: 2}: oldInput, - } - - // clusterOld contains only retried inputs. - clusterOld := pendingInputs{ - wire.OutPoint{Index: 2}: oldInput, - } - - // Create a test sweeper. - s := New(&UtxoSweeperConfig{ - MaxInputsPerTx: DefaultMaxInputsPerTx, - }) - - testCases := []struct { - name string - cluster inputCluster - expectedNilAllSet bool - expectNilNewSet bool - }{ - { - // When there are only new inputs, we'd expect the - // first returned set(allSets) to be empty. - name: "new inputs only", - cluster: inputCluster{inputs: clusterNew}, - expectedNilAllSet: true, - expectNilNewSet: false, - }, - { - // When there are only retried inputs, we'd expect the - // second returned set(newSet) to be empty. - name: "retried inputs only", - cluster: inputCluster{inputs: clusterOld}, - expectedNilAllSet: false, - expectNilNewSet: true, - }, - { - // When there are mixed inputs, we'd expect two sets - // are returned. - name: "mixed inputs", - cluster: inputCluster{inputs: clusterMixed}, - expectedNilAllSet: false, - expectNilNewSet: false, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - allSets, newSets, err := s.getInputLists(tc.cluster) - require.NoError(t, err) - - if tc.expectNilNewSet { - require.Nil(t, newSets) - } - - if tc.expectedNilAllSet { - require.Nil(t, allSets) - } - }) - } -} - // TestMarkInputsPendingPublish checks that given a list of inputs with // different states, only the non-terminal state will be marked as `Published`. func TestMarkInputsPendingPublish(t *testing.T) { diff --git a/sweep/tx_input_set.go b/sweep/tx_input_set.go index ecec52eb9..8be408c89 100644 --- a/sweep/tx_input_set.go +++ b/sweep/tx_input_set.go @@ -331,7 +331,7 @@ func (t *txInputSet) add(input input.Input, constraints addConstraints) bool { // up the utxo set even if it costs us some fees up front. In the spirit of // minimizing any negative externalities we cause for the Bitcoin system as a // whole. -func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []txInput) { +func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []*pendingInput) { for i, inp := range sweepableInputs { // Apply relaxed constraints for force sweeps. constraints := constraintsRegular diff --git a/sweep/txgenerator.go b/sweep/txgenerator.go index 2fae5b886..3c6c6d378 100644 --- a/sweep/txgenerator.go +++ b/sweep/txgenerator.go @@ -37,112 +37,6 @@ type txInput interface { parameters() Params } -// inputSet is a set of inputs that can be used as the basis to generate a tx -// on. -type inputSet []input.Input - -// generateInputPartitionings goes through all given inputs and constructs sets -// of inputs that can be used to generate a sensible transaction. Each set -// contains up to the configured maximum number of inputs. Negative yield -// inputs are skipped. No input sets with a total value after fees below the -// dust limit are returned. -func generateInputPartitionings(sweepableInputs []txInput, - feePerKW, maxFeeRate chainfee.SatPerKWeight, maxInputsPerTx int, - wallet Wallet) ([]inputSet, error) { - - // Sort input by yield. We will start constructing input sets starting - // with the highest yield inputs. This is to prevent the construction - // of a set with an output below the dust limit, causing the sweep - // process to stop, while there are still higher value inputs - // available. It also allows us to stop evaluating more inputs when the - // first input in this ordering is encountered with a negative yield. - // - // Yield is calculated as the difference between value and added fee - // for this input. The fee calculation excludes fee components that are - // common to all inputs, as those wouldn't influence the order. The - // single component that is differentiating is witness size. - // - // For witness size, the upper limit is taken. The actual size depends - // on the signature length, which is not known yet at this point. - yields := make(map[wire.OutPoint]int64) - for _, input := range sweepableInputs { - size, _, err := input.WitnessType().SizeUpperBound() - if err != nil { - return nil, fmt.Errorf( - "failed adding input weight: %v", err) - } - - yields[*input.OutPoint()] = input.SignDesc().Output.Value - - int64(feePerKW.FeeForWeight(int64(size))) - } - - sort.Slice(sweepableInputs, func(i, j int) bool { - // Because of the specific ordering and termination condition - // that is described above, we place force sweeps at the start - // of the list. Otherwise we can't be sure that they will be - // included in an input set. - if sweepableInputs[i].parameters().Force { - return true - } - - return yields[*sweepableInputs[i].OutPoint()] > - yields[*sweepableInputs[j].OutPoint()] - }) - - // Select blocks of inputs up to the configured maximum number. - var sets []inputSet - for len(sweepableInputs) > 0 { - // Start building a set of positive-yield tx inputs under the - // condition that the tx will be published with the specified - // fee rate. - txInputs := newTxInputSet( - wallet, feePerKW, maxFeeRate, maxInputsPerTx, - ) - - // From the set of sweepable inputs, keep adding inputs to the - // input set until the tx output value no longer goes up or the - // maximum number of inputs is reached. - txInputs.addPositiveYieldInputs(sweepableInputs) - - // If there are no positive yield inputs, we can stop here. - inputCount := len(txInputs.inputs) - if inputCount == 0 { - return sets, nil - } - - // Check the current output value and add wallet utxos if - // needed to push the output value to the lower limit. - if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil { - return nil, err - } - - // If the output value of this block of inputs does not reach - // the dust limit, stop sweeping. Because of the sorting, - // continuing with the remaining inputs will only lead to sets - // with an even lower output value. - if !txInputs.enoughInput() { - // The change output is always a p2tr here. - dl := lnwallet.DustLimitForSize(input.P2TRSize) - log.Debugf("Input set value %v (required=%v, "+ - "change=%v) below dust limit of %v", - txInputs.totalOutput(), txInputs.requiredOutput, - txInputs.changeOutput, dl) - return sets, nil - } - - log.Infof("Candidate sweep set of size=%v (+%v wallet inputs), "+ - "has yield=%v, weight=%v", - inputCount, len(txInputs.inputs)-inputCount, - txInputs.totalOutput()-txInputs.walletInputTotal, - txInputs.weightEstimate(true).weight()) - - sets = append(sets, txInputs.inputs) - sweepableInputs = sweepableInputs[inputCount:] - } - - return sets, nil -} - // createSweepTx builds a signed tx spending the inputs to the given outputs, // sending any leftover change to the change script. func createSweepTx(inputs []input.Input, outputs []*wire.TxOut,