From 42818949dc26b801a23b408889ebb00d976ec1ee Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Jan 2025 06:05:19 +0800 Subject: [PATCH] sweep: retry sweeping inputs upon `TxUnknownSpend` We now start handling `TxUnknownSpend` in our sweeper to make sure the failed inputs are retried when possible. --- sweep/sweeper.go | 144 +++++++++++++++++++++++++ sweep/sweeper_test.go | 243 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 387 insertions(+) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8f74a6da9..e1e870828 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1847,6 +1847,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error { case TxReplaced: return s.handleBumpEventTxReplaced(r) + // There are inputs being spent in a tx which the fee bumper doesn't + // understand. We will remove the tx from the sweeper db and mark the + // inputs as swept. + case TxUnknownSpend: + s.handleBumpEventTxUnknownSpend(r) + // There's a fatal error in creating the tx, we will remove the tx from // the sweeper db and mark the inputs as failed. case TxFatal: @@ -1878,3 +1884,141 @@ func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool { return found } + +// markInputSwept marks the given input as swept by the tx. It will also notify +// all the subscribers of this input. +func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) { + log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(), + inp.state) + + inp.state = Swept + + // Signal result channels. + s.signalResult(inp, Result{ + Tx: tx, + }) + + // Remove all other inputs in this exclusive group. + if inp.params.ExclusiveGroup != nil { + s.removeExclusiveGroup(*inp.params.ExclusiveGroup) + } +} + +// handleUnknownSpendTx takes an input and its spending tx. If the spending tx +// cannot be found in the sweeper store, the input will be marked as fatal, +// otherwise it will be marked as swept. +func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) { + op := inp.OutPoint() + txid := tx.TxHash() + + isOurTx, err := s.cfg.Store.IsOurTx(txid) + if err != nil { + log.Errorf("Cannot determine if tx %v is ours: %v", txid, err) + return + } + + // If this is our tx, it means it's a previous sweeping tx that got + // confirmed, which could happen when a restart happens during the + // sweeping process. + if isOurTx { + log.Debugf("Found our sweeping tx %v, marking input %v as "+ + "swept", txid, op) + + // We now use the spending tx to update the state of the inputs. + s.markInputSwept(inp, tx) + + return + } + + // Since the input is spent by others, we now mark it as fatal and won't + // be retried. + s.markInputFatal(inp, ErrRemoteSpend) + + log.Debugf("Removing descendant txns invalidated by (txid=%v): %v", + txid, lnutils.SpewLogClosure(tx)) + + // Construct a map of the inputs this transaction spends. + spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn)) + for _, txIn := range tx.TxIn { + spentInputs[txIn.PreviousOutPoint] = struct{}{} + } + + err = s.removeConflictSweepDescendants(spentInputs) + if err != nil { + log.Warnf("unable to remove descendant transactions "+ + "due to tx %v: ", txid) + } +} + +// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is +// unknown to the fee bumper. In the case when the sweeping tx has been replaced +// by another party with their tx being confirmed. It will retry sweeping the +// "good" inputs once the "bad" ones are kicked out. +func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) { + // Mark the inputs as publish failed, which means they will be retried + // later. + s.markInputsPublishFailed(r.set) + + // Get all the inputs that are not spent in the current sweeping tx. + spentInputs := r.result.SpentInputs + + // Create a slice to track inputs to be retried. + inputsToRetry := make([]input.Input, 0, len(r.set.Inputs())) + + // Iterate all the inputs found in this bump and mark the ones spent by + // the third party as failed. The rest of inputs will then be updated + // with a new fee rate and be retried immediately. + for _, inp := range r.set.Inputs() { + op := inp.OutPoint() + input, ok := s.inputs[op] + + // Wallet inputs are not tracked so we will not find them from + // the inputs map. + if !ok { + log.Debugf("Skipped marking input: %v not found in "+ + "pending inputs", op) + + continue + } + + // Check whether this input has been spent, if so we mark it as + // fatal or swept based on whether this is one of our previous + // sweeping txns, then move to the next. + tx, spent := spentInputs[op] + if spent { + s.handleUnknownSpendTx(input, tx) + + continue + } + + log.Debugf("Input(%v): updating params: starting fee rate "+ + "[%v -> %v], immediate [%v -> true]", op, + input.params.StartingFeeRate, r.result.FeeRate, + input.params.Immediate) + + // Update the input using the fee rate specified from the + // BumpResult, which should be the starting fee rate to use for + // the next sweeping attempt. + input.params.StartingFeeRate = fn.Some(r.result.FeeRate) + input.params.Immediate = true + inputsToRetry = append(inputsToRetry, input) + } + + // Exit early if there are no inputs to be retried. + if len(inputsToRetry) == 0 { + return + } + + log.Debugf("Retry sweeping inputs with updated params: %v", + inputTypeSummary(inputsToRetry)) + + // Get the latest inputs, which should put the PublishFailed inputs back + // to the sweeping queue. + inputs := s.updateSweeperInputs() + + // Immediately sweep the remaining inputs - the previous inputs should + // now be swept with the updated StartingFeeRate immediately. We may + // also include more inputs in the new sweeping tx if new ones with the + // same deadline are offered. + s.sweepPendingInputs(inputs) +} diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 40b25425d..da49c601e 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1199,3 +1199,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) { err = s.handleBumpEventTxFatal(resp) rt.NoError(err) } + +// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly +// marks an input as swept given the tx is ours. +func TestHandleUnknownSpendTxOurs(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx` +// correctly marks an input as fatal given the tx is not ours. +func TestHandleInputSpendTxThirdParty(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PublishFailed) + op := inp.OutPoint() + + si, ok := s.inputs[op] + require.True(t, ok) + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Mock the store to return false when calling IsOurTx. + store.On("IsOurTx", txid).Return(false, nil).Once() + + // Mock `ListSweeps` to return an empty slice as we are testing the + // workflow here, not the method `removeConflictSweepDescendants`. + store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once() + + // Call the method under test. + s.handleUnknownSpendTx(si, tx) + + // Assert the state of the input is updated. + require.Equal(t, Fatal, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs +// are failed due to them being spent by another party. +func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: store, + }) + + // Create a mock input. + inp := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp}) + + op := inp.OutPoint() + + // Create a testing tx that spends the input. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. + br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the state of the input is updated. + require.Equal(t, Swept, s.inputs[op].state) +} + +// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the +// inputs are retried after the bad inputs are filtered out. +func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) { + t.Parallel() + + // Create a mock store. + store := &MockSweeperStore{} + defer store.AssertExpectations(t) + + // Create a mock wallet and aggregator. + wallet := &MockWallet{} + defer wallet.AssertExpectations(t) + + aggregator := &mockUtxoAggregator{} + defer aggregator.AssertExpectations(t) + + publisher := &MockBumper{} + defer publisher.AssertExpectations(t) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Wallet: wallet, + Aggregator: aggregator, + Publisher: publisher, + GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] { + //nolint:ll + return fn.Ok(lnwallet.AddrWithKey{ + DeliveryAddress: testPubKey.SerializeCompressed(), + }) + }, + NoDeadlineConfTarget: uint32(DefaultDeadlineDelta), + Store: store, + }) + + // Create a mock input set. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + // Create mock inputs - inp1 will be the bad input, and inp2 will be + // retried. + inp1 := createMockInput(t, s, PendingPublish) + inp2 := createMockInput(t, s, PendingPublish) + set.On("Inputs").Return([]input.Input{inp1, inp2}) + + op1 := inp1.OutPoint() + op2 := inp2.OutPoint() + + inp2.On("RequiredLockTime").Return( + uint32(s.currentHeight), false).Once() + inp2.On("BlocksToMaturity").Return(uint32(0)).Once() + inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once() + + // Create a testing tx that spends inp1. + tx := &wire.MsgTx{ + LockTime: 1, + TxIn: []*wire.TxIn{ + {PreviousOutPoint: op1}, + }, + } + txid := tx.TxHash() + + // Create a testing bump result. + br := &BumpResult{ + Tx: tx, + Event: TxUnknownSpend, + SpentInputs: map[wire.OutPoint]*wire.MsgTx{ + op1: tx, + }, + } + + // Create a testing bump response. + resp := &bumpResp{ + result: br, + set: set, + } + + // Mock the store to return true when calling IsOurTx. + store.On("IsOurTx", txid).Return(true, nil).Once() + + // Mock the aggregator to return an empty slice as we are not testing + // the actual sweeping behavior. + aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{}) + + // Call the method under test. + s.handleBumpEventTxUnknownSpend(resp) + + // Assert the first input is removed. + require.NotContains(t, s.inputs, op1) + + // Assert the state of the input is updated. + require.Equal(t, PublishFailed, s.inputs[op2].state) +}