sweep: remove possible RBF when creating sweeping tx for new inputs

This commit changes how we create the input sets which are used to
construct the sweeping transactions. Assume the sweeper has two inputs,
one is new and one is retried, we'd end up having two transactions,
- tx1: which spends both the new and old inputs.
- tx2: which spends the new inputs only.
When publishing these txes, depending on which one gets into the mempool
first, the other one will be viewed as an RBF for the first one since
they both spending the same input(the new input).

This is now fixed by only attempt to publish the second tx when there
isn't a first tx - when there is a tx1, it means the new inputs are
already used in this tx along with retried inputs, hence there's no need
to publish tx2 which spends the new inputs only.
This commit is contained in:
yyforyongyu 2023-10-12 20:35:46 +08:00
parent acc15d8113
commit 92837621ec
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 180 additions and 34 deletions

View file

@ -687,22 +687,64 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {
// Execute the sweep within a coin select lock. Otherwise the coins that
// we are going to spend may be selected for other transactions like
// funding of a channel.
// Execute the sweep within a coin select lock. Otherwise the coins
// that we are going to spend may be selected for other transactions
// like funding of a channel.
return s.cfg.Wallet.WithCoinSelectLock(func() error {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
// Examine pending inputs and try to construct lists of inputs.
allSets, newSets, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("unable to examine pending inputs: %v", err)
return fmt.Errorf("examine pending inputs: %w", err)
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight)
// errAllSets records the error from broadcasting the sweeping
// transactions for all input sets.
var errAllSets error
// allSets contains retried inputs and new inputs. To avoid
// creating an RBF for the new inputs, we'd sweep this set
// first.
for _, inputs := range allSets {
errAllSets = s.sweep(
inputs, cluster.sweepFeeRate, currentHeight,
)
// TODO(yy): we should also find out which set created
// this error. If there are new inputs in this set, we
// should give it a second chance by sweeping them
// below. To enable this, we need to provide richer
// state for each input other than just recording the
// publishAttempts. We'd also need to refactor how we
// create the input sets. Atm, the steps are,
// 1. create a list of input sets.
// 2. sweep each set by creating and publishing the tx.
// We should change the flow as,
// 1. create a list of input sets, and for each set,
// 2. when created, we create and publish the tx.
// 3. if the publish fails, find out which input is
// causing the failure and retry the rest of the
// inputs.
if errAllSets != nil {
log.Errorf("sweep all inputs: %w", err)
break
}
}
// If we have successfully swept all inputs, there's no need to
// sweep the new inputs as it'd create an RBF case.
if allSets != nil && errAllSets == nil {
return nil
}
// We'd end up there if there's no retried inputs. In this
// case, we'd sweep the new input sets. If there's an error
// when sweeping a given set, we'd log the error and sweep the
// next set.
for _, inputs := range newSets {
err := s.sweep(
inputs, cluster.sweepFeeRate, currentHeight,
)
if err != nil {
return fmt.Errorf("unable to sweep inputs: %v", err)
log.Errorf("sweep new inputs: %w", err)
}
}
@ -1017,23 +1059,25 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
}
// getInputLists goes through the given inputs and constructs multiple distinct
// sweep lists with the given fee rate, each up to the configured maximum number
// of inputs. Negative yield inputs are skipped. Transactions with an output
// below the dust limit are not published. Those inputs remain pending and will
// be bundled with future inputs if possible.
// sweep lists with the given fee rate, each up to the configured maximum
// number of inputs. Negative yield inputs are skipped. Transactions with an
// output below the dust limit are not published. Those inputs remain pending
// and will be bundled with future inputs if possible. It returns two list -
// one containing all inputs and the other containing only the new inputs. If
// there's no retried inputs, the first set returned will be empty.
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
currentHeight int32) ([]inputSet, error) {
currentHeight int32) ([]inputSet, []inputSet, error) {
// Filter for inputs that need to be swept. Create two lists: all
// sweepable inputs and a list containing only the new, never tried
// inputs.
//
// We want to create as large a tx as possible, so we return a final set
// list that starts with sets created from all inputs. However, there is
// a chance that those txes will not publish, because they already
// contain inputs that failed before. Therefore we also add sets
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
// We want to create as large a tx as possible, so we return a final
// set list that starts with sets created from all inputs. However,
// there is a chance that those txes will not publish, because they
// already contain inputs that failed before. Therefore we also add
// sets consisting of only new inputs to the list, to make sure that
// new inputs are given a good, isolated chance of being published.
//
// TODO(yy): this would lead to conflict transactions as the same input
// can be used in two sweeping transactions, and our rebroadcaster will
@ -1070,7 +1114,8 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w",
err)
}
}
@ -1080,15 +1125,13 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w", err)
}
log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+
"total_num_new=%v", currentHeight, len(allSets), len(newSets))
// Append the new sets at the end of the list, because those tx likely
// have a higher fee per input.
return append(allSets, newSets...), nil
return allSets, newSets, nil
}
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the

View file

@ -860,10 +860,6 @@ func TestRetry(t *testing.T) {
// We expect a sweep to be published.
ctx.receiveTx()
// New block arrives. This should trigger a new sweep attempt timer
// start.
ctx.notifier.NotifyEpoch(1000)
// Offer a fresh input.
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
@ -872,9 +868,7 @@ func TestRetry(t *testing.T) {
t.Fatal(err)
}
// Two txes are expected to be published, because new and retry inputs
// are separated.
ctx.receiveTx()
// A single tx is expected to be published.
ctx.receiveTx()
ctx.backend.mine()
@ -2436,3 +2430,112 @@ func TestClusterByLockTime(t *testing.T) {
})
}
}
// TestGetInputLists checks that the expected input sets are returned based on
// whether there are retried inputs or not.
func TestGetInputLists(t *testing.T) {
t.Parallel()
// Create a test param with a dummy fee preference. This is needed so
// `feeRateForPreference` won't throw an error.
param := Params{Fee: FeePreference{ConfTarget: 1}}
// Create a mock input and mock all the methods used in this test.
testInput := &input.MockInput{}
testInput.On("RequiredLockTime").Return(0, false)
testInput.On("WitnessType").Return(input.CommitmentAnchor)
testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1})
testInput.On("RequiredTxOut").Return(nil)
testInput.On("UnconfParent").Return(nil)
testInput.On("SignDesc").Return(&input.SignDescriptor{
Output: &wire.TxOut{Value: 100_000},
})
// Create a new and a retried input.
//
// NOTE: we use the same input.Input for both pending inputs as we only
// test the logic of returning the correct non-nil input sets, and not
// the content the of sets. To validate the content of the sets, we
// should test `generateInputPartitionings` instead.
newInput := &pendingInput{
Input: testInput,
params: param,
}
oldInput := &pendingInput{
Input: testInput,
params: param,
publishAttempts: 1,
}
// clusterNew contains only new inputs.
clusterNew := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
}
// clusterMixed contains a mixed of new and retried inputs.
clusterMixed := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
wire.OutPoint{Index: 2}: oldInput,
}
// clusterOld contains only retried inputs.
clusterOld := pendingInputs{
wire.OutPoint{Index: 2}: oldInput,
}
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
MaxInputsPerTx: DefaultMaxInputsPerTx,
})
testCases := []struct {
name string
cluster inputCluster
expectedNilAllSet bool
expectNilNewSet bool
}{
{
// When there are only new inputs, we'd expect the
// first returned set(allSets) to be empty.
name: "new inputs only",
cluster: inputCluster{inputs: clusterNew},
expectedNilAllSet: true,
expectNilNewSet: false,
},
{
// When there are only retried inputs, we'd expect the
// second returned set(newSet) to be empty.
name: "retried inputs only",
cluster: inputCluster{inputs: clusterOld},
expectedNilAllSet: false,
expectNilNewSet: true,
},
{
// When there are mixed inputs, we'd expect two sets
// are returned.
name: "mixed inputs",
cluster: inputCluster{inputs: clusterMixed},
expectedNilAllSet: false,
expectNilNewSet: false,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
allSets, newSets, err := s.getInputLists(tc.cluster, 0)
require.NoError(t, err)
if tc.expectNilNewSet {
require.Nil(t, newSets)
}
if tc.expectedNilAllSet {
require.Nil(t, allSets)
}
})
}
}