sweep: retry sweeping inputs upon TxUnknownSpend

We now start handling `TxUnknownSpend` in our sweeper to make sure the
failed inputs are retried when possible.
This commit is contained in:
yyforyongyu 2025-01-24 06:05:19 +08:00
parent 2f1205a394
commit 42818949dc
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 387 additions and 0 deletions

View file

@ -1847,6 +1847,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
case TxReplaced:
return s.handleBumpEventTxReplaced(r)
// There are inputs being spent in a tx which the fee bumper doesn't
// understand. We will remove the tx from the sweeper db and mark the
// inputs as swept.
case TxUnknownSpend:
s.handleBumpEventTxUnknownSpend(r)
// There's a fatal error in creating the tx, we will remove the tx from
// the sweeper db and mark the inputs as failed.
case TxFatal:
@ -1878,3 +1884,141 @@ func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
return found
}
// markInputSwept marks the given input as swept by the tx. It will also notify
// all the subscribers of this input.
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
inp.state)
inp.state = Swept
// Signal result channels.
s.signalResult(inp, Result{
Tx: tx,
})
// Remove all other inputs in this exclusive group.
if inp.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(*inp.params.ExclusiveGroup)
}
}
// handleUnknownSpendTx takes an input and its spending tx. If the spending tx
// cannot be found in the sweeper store, the input will be marked as fatal,
// otherwise it will be marked as swept.
func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
op := inp.OutPoint()
txid := tx.TxHash()
isOurTx, err := s.cfg.Store.IsOurTx(txid)
if err != nil {
log.Errorf("Cannot determine if tx %v is ours: %v", txid, err)
return
}
// If this is our tx, it means it's a previous sweeping tx that got
// confirmed, which could happen when a restart happens during the
// sweeping process.
if isOurTx {
log.Debugf("Found our sweeping tx %v, marking input %v as "+
"swept", txid, op)
// We now use the spending tx to update the state of the inputs.
s.markInputSwept(inp, tx)
return
}
// Since the input is spent by others, we now mark it as fatal and won't
// be retried.
s.markInputFatal(inp, ErrRemoteSpend)
log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
txid, lnutils.SpewLogClosure(tx))
// Construct a map of the inputs this transaction spends.
spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
for _, txIn := range tx.TxIn {
spentInputs[txIn.PreviousOutPoint] = struct{}{}
}
err = s.removeConflictSweepDescendants(spentInputs)
if err != nil {
log.Warnf("unable to remove descendant transactions "+
"due to tx %v: ", txid)
}
}
// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
// by another party with their tx being confirmed. It will retry sweeping the
// "good" inputs once the "bad" ones are kicked out.
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
// Mark the inputs as publish failed, which means they will be retried
// later.
s.markInputsPublishFailed(r.set)
// Get all the inputs that are not spent in the current sweeping tx.
spentInputs := r.result.SpentInputs
// Create a slice to track inputs to be retried.
inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
// Iterate all the inputs found in this bump and mark the ones spent by
// the third party as failed. The rest of inputs will then be updated
// with a new fee rate and be retried immediately.
for _, inp := range r.set.Inputs() {
op := inp.OutPoint()
input, ok := s.inputs[op]
// Wallet inputs are not tracked so we will not find them from
// the inputs map.
if !ok {
log.Debugf("Skipped marking input: %v not found in "+
"pending inputs", op)
continue
}
// Check whether this input has been spent, if so we mark it as
// fatal or swept based on whether this is one of our previous
// sweeping txns, then move to the next.
tx, spent := spentInputs[op]
if spent {
s.handleUnknownSpendTx(input, tx)
continue
}
log.Debugf("Input(%v): updating params: starting fee rate "+
"[%v -> %v], immediate [%v -> true]", op,
input.params.StartingFeeRate, r.result.FeeRate,
input.params.Immediate)
// Update the input using the fee rate specified from the
// BumpResult, which should be the starting fee rate to use for
// the next sweeping attempt.
input.params.StartingFeeRate = fn.Some(r.result.FeeRate)
input.params.Immediate = true
inputsToRetry = append(inputsToRetry, input)
}
// Exit early if there are no inputs to be retried.
if len(inputsToRetry) == 0 {
return
}
log.Debugf("Retry sweeping inputs with updated params: %v",
inputTypeSummary(inputsToRetry))
// Get the latest inputs, which should put the PublishFailed inputs back
// to the sweeping queue.
inputs := s.updateSweeperInputs()
// Immediately sweep the remaining inputs - the previous inputs should
// now be swept with the updated StartingFeeRate immediately. We may
// also include more inputs in the new sweeping tx if new ones with the
// same deadline are offered.
s.sweepPendingInputs(inputs)
}

View file

@ -1199,3 +1199,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) {
err = s.handleBumpEventTxFatal(resp)
rt.NoError(err)
}
// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly
// marks an input as swept given the tx is ours.
func TestHandleUnknownSpendTxOurs(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true, nil).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx`
// correctly marks an input as fatal given the tx is not ours.
func TestHandleInputSpendTxThirdParty(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return false when calling IsOurTx.
store.On("IsOurTx", txid).Return(false, nil).Once()
// Mock `ListSweeps` to return an empty slice as we are testing the
// workflow here, not the method `removeConflictSweepDescendants`.
store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Fatal, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs
// are failed due to them being spent by another party.
func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp})
op := inp.OutPoint()
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true, nil).Once()
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the
// inputs are retried after the bad inputs are filtered out.
func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock wallet and aggregator.
wallet := &MockWallet{}
defer wallet.AssertExpectations(t)
aggregator := &mockUtxoAggregator{}
defer aggregator.AssertExpectations(t)
publisher := &MockBumper{}
defer publisher.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Wallet: wallet,
Aggregator: aggregator,
Publisher: publisher,
GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] {
//nolint:ll
return fn.Ok(lnwallet.AddrWithKey{
DeliveryAddress: testPubKey.SerializeCompressed(),
})
},
NoDeadlineConfTarget: uint32(DefaultDeadlineDelta),
Store: store,
})
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create mock inputs - inp1 will be the bad input, and inp2 will be
// retried.
inp1 := createMockInput(t, s, PendingPublish)
inp2 := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp1, inp2})
op1 := inp1.OutPoint()
op2 := inp2.OutPoint()
inp2.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp2.On("BlocksToMaturity").Return(uint32(0)).Once()
inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once()
// Create a testing tx that spends inp1.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op1: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true, nil).Once()
// Mock the aggregator to return an empty slice as we are not testing
// the actual sweeping behavior.
aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{})
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the first input is removed.
require.NotContains(t, s.inputs, op1)
// Assert the state of the input is updated.
require.Equal(t, PublishFailed, s.inputs[op2].state)
}