sweep: assign deadline values to inputs in handleNewInput

This commit changes how we transform from a deadline option to a
concrete deadline value - previously this is done when we decide to
cluster inputs, and we now move it to a step earlier - once an input is
received via `SweeperInput`, we will immediately transform its optional
deadline into a real value. For inputs that come with a deadline option,
since the Some will be used, it makes no difference. For inputs with
None as their deadlines, we need this change to make sure the default
deadlines are assigned accurately.
This commit is contained in:
yyforyongyu 2024-04-16 03:15:34 +08:00
parent 96883f307c
commit a50cdd64c5
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
5 changed files with 44 additions and 46 deletions

View File

@ -122,7 +122,7 @@ func (c *inputCluster) createInputSets(maxFeeRate chainfee.SatPerKWeight,
type UtxoAggregator interface {
// ClusterInputs takes a list of inputs and groups them into input
// sets. Each input set will be used to create a sweeping transaction.
ClusterInputs(inputs InputsMap, defaultDeadline int32) []InputSet
ClusterInputs(inputs InputsMap) []InputSet
}
// SimpleAggregator aggregates inputs known by the Sweeper based on each
@ -174,7 +174,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
// inputs known by the UtxoSweeper. It clusters inputs by
// 1) Required tx locktime
// 2) Similar fee rates.
func (s *SimpleAggregator) ClusterInputs(inputs InputsMap, _ int32) []InputSet {
func (s *SimpleAggregator) ClusterInputs(inputs InputsMap) []InputSet {
// We start by getting the inputs clusters by locktime. Since the
// inputs commit to the locktime, they can only be clustered together
// if the locktime is equal.
@ -501,9 +501,7 @@ type clusterGroup map[int32][]SweeperInput
// 5. optionally split a cluster if it exceeds the max input limit.
// 6. create input sets from each of the clusters.
// 7. create input sets for each of the exclusive inputs.
func (b *BudgetAggregator) ClusterInputs(inputs InputsMap,
defaultDeadline int32) []InputSet {
func (b *BudgetAggregator) ClusterInputs(inputs InputsMap) []InputSet {
// Filter out inputs that have a budget below min relay fee.
filteredInputs := b.filterInputs(inputs)
@ -521,7 +519,7 @@ func (b *BudgetAggregator) ClusterInputs(inputs InputsMap,
for _, input := range filteredInputs {
// Get deadline height, and use the specified default deadline
// height if it's not set.
height := input.params.DeadlineHeight.UnwrapOr(defaultDeadline)
height := input.DeadlineHeight
// Put exclusive inputs in their own set.
if input.params.ExclusiveGroup != nil {

View File

@ -821,9 +821,9 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
// Create three deadline heights, which means there are three
// groups of inputs to be expected.
deadlineNone = fn.None[int32]()
deadline1 = fn.Some(int32(1))
deadline2 = fn.Some(int32(2))
defaultDeadline = testHeight + DefaultDeadlineDelta
deadline1 = int32(1)
deadline2 = int32(2)
)
// Create testing pending inputs.
@ -854,16 +854,16 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
Input: inpExclusive,
params: Params{
Budget: budgetHigh,
DeadlineHeight: deadline1,
ExclusiveGroup: &exclusiveGroup,
},
DeadlineHeight: deadline1,
}
// For each deadline height, create two inputs with different budgets,
// one below the min fee rate and one above it. We should see the lower
// one being filtered out.
for i, deadline := range []fn.Option[int32]{
deadlineNone, deadline1, deadline2,
for i, deadline := range []int32{
defaultDeadline, deadline1, deadline2,
} {
// Define three outpoints.
opLow := wire.OutPoint{
@ -918,8 +918,8 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
Input: inpLow,
params: Params{
Budget: budgetLow,
DeadlineHeight: deadline,
},
DeadlineHeight: deadline,
}
// Add the high inputs, which should be included.
@ -927,15 +927,15 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
Input: inpHigh1,
params: Params{
Budget: budgetHigh,
DeadlineHeight: deadline,
},
DeadlineHeight: deadline,
}
inputs[opHigh2] = &SweeperInput{
Input: inpHigh2,
params: Params{
Budget: budgetHigh,
DeadlineHeight: deadline,
},
DeadlineHeight: deadline,
}
}
@ -943,8 +943,7 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
b := NewBudgetAggregator(estimator, DefaultMaxInputsPerTx)
// Call the method under test.
defaultDeadline := testHeight + DefaultDeadlineDelta
result := b.ClusterInputs(inputs, defaultDeadline)
result := b.ClusterInputs(inputs)
// We expect four input sets to be returned, one for each deadline and
// extra one for the exclusive input.
@ -969,8 +968,8 @@ func TestBudgetInputSetClusterInputs(t *testing.T) {
// We expect to see all three deadlines.
require.Contains(t, deadlines, defaultDeadline)
require.Contains(t, deadlines, deadline1.UnwrapOrFail(t))
require.Contains(t, deadlines, deadline2.UnwrapOrFail(t))
require.Contains(t, deadlines, deadline1)
require.Contains(t, deadlines, deadline2)
}
// TestSplitOnLocktime asserts `splitOnLocktime` works as expected.

View File

@ -346,10 +346,8 @@ type mockUtxoAggregator struct {
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)
// ClusterInputs takes a list of inputs and groups them into clusters.
func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap,
defaultDeadline int32) []InputSet {
args := m.Called(inputs, defaultDeadline)
func (m *mockUtxoAggregator) ClusterInputs(inputs InputsMap) []InputSet {
args := m.Called(inputs)
return args.Get(0).([]InputSet)
}

View File

@ -200,10 +200,10 @@ type SweeperInput struct {
// rbf records the RBF constraints.
rbf fn.Option[RBFInfo]
// deadlineHeight is the deadline height for this input. This is
// DeadlineHeight is the deadline height for this input. This is
// different from the DeadlineHeight in its params as it's an actual
// value than an option.
deadlineHeight int32
DeadlineHeight int32
}
// String returns a human readable interpretation of the pending input.
@ -872,10 +872,6 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
// markInputsPendingPublish updates the pending inputs with the given tx
// inputs. It also increments the `publishAttempts`.
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
// Create a default deadline height, which will be used when there's no
// DeadlineHeight specified for a given input.
defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
// Reschedule sweep.
for _, input := range set.Inputs() {
pi, ok := s.inputs[input.OutPoint()]
@ -907,11 +903,6 @@ func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
// Record another publish attempt.
pi.publishAttempts++
// Set the acutal deadline height.
pi.deadlineHeight = pi.params.DeadlineHeight.UnwrapOr(
defaultDeadline,
)
}
}
@ -1082,7 +1073,7 @@ func (s *UtxoSweeper) handlePendingSweepsReq(
LastFeeRate: inp.lastFeeRate,
BroadcastAttempts: inp.publishAttempts,
Params: inp.params,
DeadlineHeight: uint32(inp.deadlineHeight),
DeadlineHeight: uint32(inp.DeadlineHeight),
}
}
@ -1173,6 +1164,11 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
// TODO(yy): a dedicated state?
sweeperInput.state = Init
// If the new input specifies a deadline, update the deadline height.
sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
sweeperInput.DeadlineHeight,
)
resultChan := make(chan Result, 1)
sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
@ -1204,6 +1200,10 @@ func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
// handleNewInput processes a new input by registering spend notification and
// scheduling sweeping for it.
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
// Create a default deadline height, which will be used when there's no
// DeadlineHeight specified for a given input.
defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
outpoint := input.input.OutPoint()
pi, pending := s.inputs[outpoint]
if pending {
@ -1228,6 +1228,10 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
Input: input.input,
params: input.params,
rbf: rbfInfo,
// Set the acutal deadline height.
DeadlineHeight: input.params.DeadlineHeight.UnwrapOr(
defaultDeadline,
),
}
s.inputs[outpoint] = pi
@ -1343,6 +1347,11 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
oldInput.params = input.params
oldInput.Input = input.input
// If the new input specifies a deadline, update the deadline height.
oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
oldInput.DeadlineHeight,
)
// Add additional result channel to signal spend of this input.
oldInput.listeners = append(oldInput.listeners, input.resultChan)
@ -1541,12 +1550,8 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
// Create a default deadline height, which will be used when there's no
// DeadlineHeight specified for a given input.
defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
// Cluster all of our inputs based on the specific Aggregator.
sets := s.cfg.Aggregator.ClusterInputs(inputs, defaultDeadline)
sets := s.cfg.Aggregator.ClusterInputs(inputs)
// sweepWithLock is a helper closure that executes the sweep within a
// coin select lock to prevent the coins being selected for other

View File

@ -2747,9 +2747,7 @@ func TestSweepPendingInputs(t *testing.T) {
pis := make(InputsMap)
// Mock the aggregator to return the mocked input sets.
expectedDeadlineUsed := testHeight + DefaultDeadlineDelta
aggregator.On("ClusterInputs", pis,
expectedDeadlineUsed).Return([]InputSet{
aggregator.On("ClusterInputs", pis).Return([]InputSet{
setNeedWallet, normalSet,
})