diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 70482f1af..4d920846d 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -687,22 +687,64 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { func (s *UtxoSweeper) sweepCluster(cluster inputCluster, currentHeight int32) error { - // Execute the sweep within a coin select lock. Otherwise the coins that - // we are going to spend may be selected for other transactions like - // funding of a channel. + // Execute the sweep within a coin select lock. Otherwise the coins + // that we are going to spend may be selected for other transactions + // like funding of a channel. return s.cfg.Wallet.WithCoinSelectLock(func() error { - // Examine pending inputs and try to construct - // lists of inputs. - inputLists, err := s.getInputLists(cluster, currentHeight) + // Examine pending inputs and try to construct lists of inputs. + allSets, newSets, err := s.getInputLists(cluster, currentHeight) if err != nil { - return fmt.Errorf("unable to examine pending inputs: %v", err) + return fmt.Errorf("examine pending inputs: %w", err) } - // Sweep selected inputs. - for _, inputs := range inputLists { - err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight) + // errAllSets records the error from broadcasting the sweeping + // transactions for all input sets. + var errAllSets error + + // allSets contains retried inputs and new inputs. To avoid + // creating an RBF for the new inputs, we'd sweep this set + // first. + for _, inputs := range allSets { + errAllSets = s.sweep( + inputs, cluster.sweepFeeRate, currentHeight, + ) + // TODO(yy): we should also find out which set created + // this error. If there are new inputs in this set, we + // should give it a second chance by sweeping them + // below. To enable this, we need to provide richer + // state for each input other than just recording the + // publishAttempts. We'd also need to refactor how we + // create the input sets. Atm, the steps are, + // 1. create a list of input sets. + // 2. sweep each set by creating and publishing the tx. + // We should change the flow as, + // 1. create a list of input sets, and for each set, + // 2. when created, we create and publish the tx. + // 3. if the publish fails, find out which input is + // causing the failure and retry the rest of the + // inputs. + if errAllSets != nil { + log.Errorf("sweep all inputs: %w", err) + break + } + } + + // If we have successfully swept all inputs, there's no need to + // sweep the new inputs as it'd create an RBF case. + if allSets != nil && errAllSets == nil { + return nil + } + + // We'd end up there if there's no retried inputs. In this + // case, we'd sweep the new input sets. If there's an error + // when sweeping a given set, we'd log the error and sweep the + // next set. + for _, inputs := range newSets { + err := s.sweep( + inputs, cluster.sweepFeeRate, currentHeight, + ) if err != nil { - return fmt.Errorf("unable to sweep inputs: %v", err) + log.Errorf("sweep new inputs: %w", err) } } @@ -1017,23 +1059,25 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) { } // getInputLists goes through the given inputs and constructs multiple distinct -// sweep lists with the given fee rate, each up to the configured maximum number -// of inputs. Negative yield inputs are skipped. Transactions with an output -// below the dust limit are not published. Those inputs remain pending and will -// be bundled with future inputs if possible. +// sweep lists with the given fee rate, each up to the configured maximum +// number of inputs. Negative yield inputs are skipped. Transactions with an +// output below the dust limit are not published. Those inputs remain pending +// and will be bundled with future inputs if possible. It returns two list - +// one containing all inputs and the other containing only the new inputs. If +// there's no retried inputs, the first set returned will be empty. func (s *UtxoSweeper) getInputLists(cluster inputCluster, - currentHeight int32) ([]inputSet, error) { + currentHeight int32) ([]inputSet, []inputSet, error) { // Filter for inputs that need to be swept. Create two lists: all // sweepable inputs and a list containing only the new, never tried // inputs. // - // We want to create as large a tx as possible, so we return a final set - // list that starts with sets created from all inputs. However, there is - // a chance that those txes will not publish, because they already - // contain inputs that failed before. Therefore we also add sets - // consisting of only new inputs to the list, to make sure that new - // inputs are given a good, isolated chance of being published. + // We want to create as large a tx as possible, so we return a final + // set list that starts with sets created from all inputs. However, + // there is a chance that those txes will not publish, because they + // already contain inputs that failed before. Therefore we also add + // sets consisting of only new inputs to the list, to make sure that + // new inputs are given a good, isolated chance of being published. // // TODO(yy): this would lead to conflict transactions as the same input // can be used in two sweeping transactions, and our rebroadcaster will @@ -1070,7 +1114,8 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { - return nil, fmt.Errorf("input partitionings: %v", err) + return nil, nil, fmt.Errorf("input partitionings: %w", + err) } } @@ -1080,15 +1125,13 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { - return nil, fmt.Errorf("input partitionings: %v", err) + return nil, nil, fmt.Errorf("input partitionings: %w", err) } log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+ "total_num_new=%v", currentHeight, len(allSets), len(newSets)) - // Append the new sets at the end of the list, because those tx likely - // have a higher fee per input. - return append(allSets, newSets...), nil + return allSets, newSets, nil } // sweep takes a set of preselected inputs, creates a sweep tx and publishes the diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index b10b22b8c..14150032c 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -860,10 +860,6 @@ func TestRetry(t *testing.T) { // We expect a sweep to be published. ctx.receiveTx() - // New block arrives. This should trigger a new sweep attempt timer - // start. - ctx.notifier.NotifyEpoch(1000) - // Offer a fresh input. resultChan1, err := ctx.sweeper.SweepInput( spendableInputs[1], defaultFeePref, @@ -872,9 +868,7 @@ func TestRetry(t *testing.T) { t.Fatal(err) } - // Two txes are expected to be published, because new and retry inputs - // are separated. - ctx.receiveTx() + // A single tx is expected to be published. ctx.receiveTx() ctx.backend.mine() @@ -2436,3 +2430,112 @@ func TestClusterByLockTime(t *testing.T) { }) } } + +// TestGetInputLists checks that the expected input sets are returned based on +// whether there are retried inputs or not. +func TestGetInputLists(t *testing.T) { + t.Parallel() + + // Create a test param with a dummy fee preference. This is needed so + // `feeRateForPreference` won't throw an error. + param := Params{Fee: FeePreference{ConfTarget: 1}} + + // Create a mock input and mock all the methods used in this test. + testInput := &input.MockInput{} + testInput.On("RequiredLockTime").Return(0, false) + testInput.On("WitnessType").Return(input.CommitmentAnchor) + testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1}) + testInput.On("RequiredTxOut").Return(nil) + testInput.On("UnconfParent").Return(nil) + testInput.On("SignDesc").Return(&input.SignDescriptor{ + Output: &wire.TxOut{Value: 100_000}, + }) + + // Create a new and a retried input. + // + // NOTE: we use the same input.Input for both pending inputs as we only + // test the logic of returning the correct non-nil input sets, and not + // the content the of sets. To validate the content of the sets, we + // should test `generateInputPartitionings` instead. + newInput := &pendingInput{ + Input: testInput, + params: param, + } + oldInput := &pendingInput{ + Input: testInput, + params: param, + publishAttempts: 1, + } + + // clusterNew contains only new inputs. + clusterNew := pendingInputs{ + wire.OutPoint{Index: 1}: newInput, + } + + // clusterMixed contains a mixed of new and retried inputs. + clusterMixed := pendingInputs{ + wire.OutPoint{Index: 1}: newInput, + wire.OutPoint{Index: 2}: oldInput, + } + + // clusterOld contains only retried inputs. + clusterOld := pendingInputs{ + wire.OutPoint{Index: 2}: oldInput, + } + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + MaxInputsPerTx: DefaultMaxInputsPerTx, + }) + + testCases := []struct { + name string + cluster inputCluster + expectedNilAllSet bool + expectNilNewSet bool + }{ + { + // When there are only new inputs, we'd expect the + // first returned set(allSets) to be empty. + name: "new inputs only", + cluster: inputCluster{inputs: clusterNew}, + expectedNilAllSet: true, + expectNilNewSet: false, + }, + { + // When there are only retried inputs, we'd expect the + // second returned set(newSet) to be empty. + name: "retried inputs only", + cluster: inputCluster{inputs: clusterOld}, + expectedNilAllSet: false, + expectNilNewSet: true, + }, + { + // When there are mixed inputs, we'd expect two sets + // are returned. + name: "mixed inputs", + cluster: inputCluster{inputs: clusterMixed}, + expectedNilAllSet: false, + expectNilNewSet: false, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + allSets, newSets, err := s.getInputLists(tc.cluster, 0) + require.NoError(t, err) + + if tc.expectNilNewSet { + require.Nil(t, newSets) + } + + if tc.expectedNilAllSet { + require.Nil(t, allSets) + } + }) + } +}