From fd922942a7cb5599b31f87811d801c79fc1ff461 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 11 Jan 2024 07:54:32 +0800 Subject: [PATCH] sweep: patch unit tests for `markInputsSwept` and `markInputsPendingPublish` Now that the refactor is done, we start patching unit tests for these two methods. Minor changes are also made based on the feedback from the tests. --- sweep/sweeper.go | 61 +++++++++------ sweep/sweeper_test.go | 169 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+), 24 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index cb4e67784..1aca20012 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -950,7 +950,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, // Reschedule the inputs that we just tried to sweep. This is done in // case the following publish fails, we'd like to update the inputs' // publish attempts and rescue them in the next sweep. - err = s.markInputsPendingPublish(tr, tx) + err = s.markInputsPendingPublish(tr, tx.TxIn) if err != nil { return err } @@ -986,7 +986,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, // markInputsPendingPublish saves the sweeping tx to db and updates the pending // inputs with the given tx inputs. It also increments the `publishAttempts`. func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord, - tx *wire.MsgTx) error { + inputs []*wire.TxIn) error { // Add tx to db before publication, so that we will always know that a // spend by this tx is ours. Otherwise if the publish doesn't return, @@ -999,14 +999,28 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord, } // Reschedule sweep. - for _, input := range tx.TxIn { + for _, input := range inputs { pi, ok := s.pendingInputs[input.PreviousOutPoint] if !ok { - // It can be that the input has been removed because it - // exceed the maximum number of attempts in a previous - // input set. It could also be that this input is an - // additional wallet input that was attached. In that - // case there also isn't a pending input to update. + // It could be that this input is an additional wallet + // input that was attached. In that case there also + // isn't a pending input to update. + log.Debugf("Skipped marking input as pending "+ + "published: %v not found in pending inputs", + input.PreviousOutPoint) + + continue + } + + // If this input has already terminated, there's clearly + // something wrong as it would have been removed. In this case + // we log an error and skip marking this input as pending + // publish. + if pi.terminated() { + log.Errorf("Expect input %v to not have terminated "+ + "state, instead it has %v", + input.PreviousOutPoint, pi.state) + continue } @@ -1016,7 +1030,7 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord, // Record the fees and fee rate of this tx to prepare possible // RBF. pi.rbf = fn.Some(RBFInfo{ - Txid: tx.TxHash(), + Txid: tr.Txid, FeeRate: chainfee.SatPerKWeight(tr.FeeRate), Fee: btcutil.Amount(tr.Fee), }) @@ -1047,11 +1061,9 @@ func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, for _, input := range inputs { pi, ok := s.pendingInputs[input.PreviousOutPoint] if !ok { - // It can be that the input has been removed because it - // exceed the maximum number of attempts in a previous - // input set. It could also be that this input is an - // additional wallet input that was attached. In that - // case there also isn't a pending input to update. + // It could be that this input is an additional wallet + // input that was attached. In that case there also + // isn't a pending input to update. log.Debugf("Skipped marking input as published: %v "+ "not found in pending inputs", input.PreviousOutPoint) @@ -1081,11 +1093,9 @@ func (s *UtxoSweeper) markInputsPublishFailed(inputs []*wire.TxIn) { for _, input := range inputs { pi, ok := s.pendingInputs[input.PreviousOutPoint] if !ok { - // It can be that the input has been removed because it - // exceed the maximum number of attempts in a previous - // input set. It could also be that this input is an - // additional wallet input that was attached. In that - // case there also isn't a pending input to update. + // It could be that this input is an additional wallet + // input that was attached. In that case there also + // isn't a pending input to update. log.Debugf("Skipped marking input as publish failed: "+ "%v not found in pending inputs", input.PreviousOutPoint) @@ -1576,7 +1586,7 @@ func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) { // markInputsSwept marks all inputs swept by the spending transaction as swept. // It will also notify all the subscribers of this input. -func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error { +func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) { for _, txIn := range tx.TxIn { outpoint := txIn.PreviousOutPoint @@ -1586,6 +1596,11 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error { // could be not one of our inputs. input, ok := s.pendingInputs[outpoint] if !ok { + // It's very likely that a spending tx contains inputs + // that we don't know. + log.Debugf("Skipped marking input as swept: %v not "+ + "found in pending inputs", outpoint) + continue } @@ -1593,8 +1608,8 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error { // spend notification, which is likely to happen as one sweep // transaction usually sweeps multiple inputs. if input.terminated() { - log.Tracef("Skipped sending swept result for input %v,"+ - " state=%v", outpoint, input.state) + log.Debugf("Skipped marking input as swept: %v "+ + "state=%v", outpoint, input.state) continue } @@ -1620,8 +1635,6 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error { s.removeExclusiveGroup(*input.params.ExclusiveGroup) } } - - return nil } // markInputFailed marks the given input as failed and won't be retried. It diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index c77a21dbc..4fea4a419 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1985,6 +1985,96 @@ func TestGetInputLists(t *testing.T) { } } +// TestMarkInputsPendingPublish checks that given a list of inputs with +// different states, only the non-terminal state will be marked as `Published`. +func TestMarkInputsPendingPublish(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Create a mock sweeper store. + mockStore := NewMockSweeperStore() + + // Create a test TxRecord and a dummy error. + dummyTR := &TxRecord{} + dummyErr := errors.New("dummy error") + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{ + Store: mockStore, + }) + + // Create three testing inputs. + // + // inputNotExist specifies an input that's not found in the sweeper's + // `pendingInputs` map. + inputNotExist := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 1}, + } + + // inputInit specifies a newly created input. + inputInit := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 2}, + } + s.pendingInputs[inputInit.PreviousOutPoint] = &pendingInput{ + state: StateInit, + } + + // inputPendingPublish specifies an input that's about to be published. + inputPendingPublish := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 3}, + } + s.pendingInputs[inputPendingPublish.PreviousOutPoint] = &pendingInput{ + state: StatePendingPublish, + } + + // inputTerminated specifies an input that's terminated. + inputTerminated := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 4}, + } + s.pendingInputs[inputTerminated.PreviousOutPoint] = &pendingInput{ + state: StateExcluded, + } + + // First, check that when an error is returned from db, it's properly + // returned here. + mockStore.On("StoreTx", dummyTR).Return(dummyErr).Once() + err := s.markInputsPendingPublish(dummyTR, nil) + require.ErrorIs(err, dummyErr) + + // Then, check that the target input has will be correctly marked as + // published. + // + // Mock the store to return nil + mockStore.On("StoreTx", dummyTR).Return(nil).Once() + + // Mark the test inputs. We expect the non-exist input and the + // inputTerminated to be skipped, and the rest to be marked as pending + // publish. + err = s.markInputsPendingPublish(dummyTR, []*wire.TxIn{ + inputNotExist, inputInit, inputPendingPublish, inputTerminated, + }) + require.NoError(err) + + // We expect unchanged number of pending inputs. + require.Len(s.pendingInputs, 3) + + // We expect the init input's state to become pending publish. + require.Equal(StatePendingPublish, + s.pendingInputs[inputInit.PreviousOutPoint].state) + + // We expect the pending-publish to stay unchanged. + require.Equal(StatePendingPublish, + s.pendingInputs[inputPendingPublish.PreviousOutPoint].state) + + // We expect the terminated to stay unchanged. + require.Equal(StateExcluded, + s.pendingInputs[inputTerminated.PreviousOutPoint].state) + + // Assert mocked statements are executed as expected. + mockStore.AssertExpectations(t) +} + // TestMarkInputsPublished checks that given a list of inputs with different // states, only the state `StatePendingPublish` will be marked as `Published`. func TestMarkInputsPublished(t *testing.T) { @@ -2133,6 +2223,85 @@ func TestMarkInputsPublishFailed(t *testing.T) { mockStore.AssertExpectations(t) } +// TestMarkInputsSwept checks that given a list of inputs with different +// states, only the non-terminal state will be marked as `StateSwept`. +func TestMarkInputsSwept(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Create a mock input. + mockInput := &input.MockInput{} + defer mockInput.AssertExpectations(t) + + // Mock the `OutPoint` to return a dummy outpoint. + mockInput.On("OutPoint").Return(&wire.OutPoint{Hash: chainhash.Hash{1}}) + + // Create a test sweeper. + s := New(&UtxoSweeperConfig{}) + + // Create three testing inputs. + // + // inputNotExist specifies an input that's not found in the sweeper's + // `pendingInputs` map. + inputNotExist := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 1}, + } + + // inputInit specifies a newly created input. + inputInit := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 2}, + } + s.pendingInputs[inputInit.PreviousOutPoint] = &pendingInput{ + state: StateInit, + Input: mockInput, + } + + // inputPendingPublish specifies an input that's about to be published. + inputPendingPublish := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 3}, + } + s.pendingInputs[inputPendingPublish.PreviousOutPoint] = &pendingInput{ + state: StatePendingPublish, + Input: mockInput, + } + + // inputTerminated specifies an input that's terminated. + inputTerminated := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Index: 4}, + } + s.pendingInputs[inputTerminated.PreviousOutPoint] = &pendingInput{ + state: StateExcluded, + Input: mockInput, + } + + tx := &wire.MsgTx{ + TxIn: []*wire.TxIn{ + inputNotExist, inputInit, + inputPendingPublish, inputTerminated, + }, + } + + // Mark the test inputs. We expect the inputTerminated to be skipped, + // and the rest to be marked as swept. + s.markInputsSwept(tx, true) + + // We expect unchanged number of pending inputs. + require.Len(s.pendingInputs, 3) + + // We expect the init input's state to become swept. + require.Equal(StateSwept, + s.pendingInputs[inputInit.PreviousOutPoint].state) + + // We expect the pending-publish becomes swept. + require.Equal(StateSwept, + s.pendingInputs[inputPendingPublish.PreviousOutPoint].state) + + // We expect the terminated to stay unchanged. + require.Equal(StateExcluded, + s.pendingInputs[inputTerminated.PreviousOutPoint].state) +} + // TestMempoolLookup checks that the method `mempoolLookup` works as expected. func TestMempoolLookup(t *testing.T) { t.Parallel()