sweep: refactor markInputsPendingPublish to take InputSet

This commit changes `markInputsPendingPublish` to take `InputSet` only.
This is needed for the following commits as we won't be able to know the
tx being created beforehand, yet we still want to make sure these inputs
won't be grouped to another input set as it complicates our RBF process.
This commit is contained in:
yyforyongyu 2024-02-21 16:00:05 +08:00
parent 6202c59cb3
commit bd5eec8e1f
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 40 additions and 78 deletions

View File

@ -794,10 +794,7 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
// Reschedule the inputs that we just tried to sweep. This is done in // Reschedule the inputs that we just tried to sweep. This is done in
// case the following publish fails, we'd like to update the inputs' // case the following publish fails, we'd like to update the inputs'
// publish attempts and rescue them in the next sweep. // publish attempts and rescue them in the next sweep.
err = s.markInputsPendingPublish(tr, tx.TxIn) s.markInputsPendingPublish(set)
if err != nil {
return err
}
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v", log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
tx.TxHash(), len(tx.TxIn), s.currentHeight) tx.TxHash(), len(tx.TxIn), s.currentHeight)
@ -832,31 +829,19 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
return nil return nil
} }
// markInputsPendingPublish saves the sweeping tx to db and updates the pending // markInputsPendingPublish updates the pending inputs with the given tx
// inputs with the given tx inputs. It also increments the `publishAttempts`. // inputs. It also increments the `publishAttempts`.
func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord, func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
inputs []*wire.TxIn) error {
// Add tx to db before publication, so that we will always know that a
// spend by this tx is ours. Otherwise if the publish doesn't return,
// but did publish, we'd lose track of this tx. Even republication on
// startup doesn't prevent this, because that call returns a double
// spend error then and would also not add the hash to the store.
err := s.cfg.Store.StoreTx(tr)
if err != nil {
return fmt.Errorf("store tx: %w", err)
}
// Reschedule sweep. // Reschedule sweep.
for _, input := range inputs { for _, input := range set.Inputs() {
pi, ok := s.pendingInputs[input.PreviousOutPoint] pi, ok := s.pendingInputs[*input.OutPoint()]
if !ok { if !ok {
// It could be that this input is an additional wallet // It could be that this input is an additional wallet
// input that was attached. In that case there also // input that was attached. In that case there also
// isn't a pending input to update. // isn't a pending input to update.
log.Debugf("Skipped marking input as pending "+ log.Debugf("Skipped marking input as pending "+
"published: %v not found in pending inputs", "published: %v not found in pending inputs",
input.PreviousOutPoint) input.OutPoint())
continue continue
} }
@ -868,7 +853,7 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord,
if pi.terminated() { if pi.terminated() {
log.Errorf("Expect input %v to not have terminated "+ log.Errorf("Expect input %v to not have terminated "+
"state, instead it has %v", "state, instead it has %v",
input.PreviousOutPoint, pi.state) input.OutPoint, pi.state)
continue continue
} }
@ -876,19 +861,9 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord,
// Update the input's state. // Update the input's state.
pi.state = StatePendingPublish pi.state = StatePendingPublish
// Record the fees and fee rate of this tx to prepare possible
// RBF.
pi.rbf = fn.Some(RBFInfo{
Txid: tr.Txid,
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
Fee: btcutil.Amount(tr.Fee),
})
// Record another publish attempt. // Record another publish attempt.
pi.publishAttempts++ pi.publishAttempts++
} }
return nil
} }
// markInputsPublished updates the sweeping tx in db and marks the list of // markInputsPublished updates the sweeping tx in db and marks the list of

View File

@ -1897,87 +1897,74 @@ func TestMarkInputsPendingPublish(t *testing.T) {
require := require.New(t) require := require.New(t)
// Create a mock sweeper store.
mockStore := NewMockSweeperStore()
// Create a test TxRecord and a dummy error.
dummyTR := &TxRecord{}
dummyErr := errors.New("dummy error")
// Create a test sweeper. // Create a test sweeper.
s := New(&UtxoSweeperConfig{ s := New(&UtxoSweeperConfig{})
Store: mockStore,
}) // Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create three testing inputs. // Create three testing inputs.
// //
// inputNotExist specifies an input that's not found in the sweeper's // inputNotExist specifies an input that's not found in the sweeper's
// `pendingInputs` map. // `pendingInputs` map.
inputNotExist := &wire.TxIn{ inputNotExist := &input.MockInput{}
PreviousOutPoint: wire.OutPoint{Index: 1}, defer inputNotExist.AssertExpectations(t)
}
inputNotExist.On("OutPoint").Return(&wire.OutPoint{Index: 0})
// inputInit specifies a newly created input. // inputInit specifies a newly created input.
inputInit := &wire.TxIn{ inputInit := &input.MockInput{}
PreviousOutPoint: wire.OutPoint{Index: 2}, defer inputInit.AssertExpectations(t)
}
s.pendingInputs[inputInit.PreviousOutPoint] = &pendingInput{ inputInit.On("OutPoint").Return(&wire.OutPoint{Index: 1})
s.pendingInputs[*inputInit.OutPoint()] = &pendingInput{
state: StateInit, state: StateInit,
} }
// inputPendingPublish specifies an input that's about to be published. // inputPendingPublish specifies an input that's about to be published.
inputPendingPublish := &wire.TxIn{ inputPendingPublish := &input.MockInput{}
PreviousOutPoint: wire.OutPoint{Index: 3}, defer inputPendingPublish.AssertExpectations(t)
}
s.pendingInputs[inputPendingPublish.PreviousOutPoint] = &pendingInput{ inputPendingPublish.On("OutPoint").Return(&wire.OutPoint{Index: 2})
s.pendingInputs[*inputPendingPublish.OutPoint()] = &pendingInput{
state: StatePendingPublish, state: StatePendingPublish,
} }
// inputTerminated specifies an input that's terminated. // inputTerminated specifies an input that's terminated.
inputTerminated := &wire.TxIn{ inputTerminated := &input.MockInput{}
PreviousOutPoint: wire.OutPoint{Index: 4}, defer inputTerminated.AssertExpectations(t)
}
s.pendingInputs[inputTerminated.PreviousOutPoint] = &pendingInput{ inputTerminated.On("OutPoint").Return(&wire.OutPoint{Index: 3})
s.pendingInputs[*inputTerminated.OutPoint()] = &pendingInput{
state: StateExcluded, state: StateExcluded,
} }
// First, check that when an error is returned from db, it's properly
// returned here.
mockStore.On("StoreTx", dummyTR).Return(dummyErr).Once()
err := s.markInputsPendingPublish(dummyTR, nil)
require.ErrorIs(err, dummyErr)
// Then, check that the target input has will be correctly marked as
// published.
//
// Mock the store to return nil
mockStore.On("StoreTx", dummyTR).Return(nil).Once()
// Mark the test inputs. We expect the non-exist input and the // Mark the test inputs. We expect the non-exist input and the
// inputTerminated to be skipped, and the rest to be marked as pending // inputTerminated to be skipped, and the rest to be marked as pending
// publish. // publish.
err = s.markInputsPendingPublish(dummyTR, []*wire.TxIn{ set.On("Inputs").Return([]input.Input{
inputNotExist, inputInit, inputPendingPublish, inputTerminated, inputNotExist, inputInit, inputPendingPublish, inputTerminated,
}) })
require.NoError(err) s.markInputsPendingPublish(set)
// We expect unchanged number of pending inputs. // We expect unchanged number of pending inputs.
require.Len(s.pendingInputs, 3) require.Len(s.pendingInputs, 3)
// We expect the init input's state to become pending publish. // We expect the init input's state to become pending publish.
require.Equal(StatePendingPublish, require.Equal(StatePendingPublish,
s.pendingInputs[inputInit.PreviousOutPoint].state) s.pendingInputs[*inputInit.OutPoint()].state)
// We expect the pending-publish to stay unchanged. // We expect the pending-publish to stay unchanged.
require.Equal(StatePendingPublish, require.Equal(StatePendingPublish,
s.pendingInputs[inputPendingPublish.PreviousOutPoint].state) s.pendingInputs[*inputPendingPublish.OutPoint()].state)
// We expect the terminated to stay unchanged. // We expect the terminated to stay unchanged.
require.Equal(StateExcluded, require.Equal(StateExcluded,
s.pendingInputs[inputTerminated.PreviousOutPoint].state) s.pendingInputs[*inputTerminated.OutPoint()].state)
// Assert mocked statements are executed as expected.
mockStore.AssertExpectations(t)
} }
// TestMarkInputsPublished checks that given a list of inputs with different // TestMarkInputsPublished checks that given a list of inputs with different