sweep: delete pending inputs based on their states

This commit uniforms and put the deletion of pending inputs in a single
point.
This commit is contained in:
yyforyongyu 2023-10-24 13:47:14 +08:00
parent 47478718d4
commit a263d68fb9
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
3 changed files with 191 additions and 29 deletions

View file

@ -116,6 +116,16 @@ const (
// StateSwept is the final state of a pending input. This is set when
// the input has been successfully swept.
StateSwept
// StateExcluded is the state of a pending input that has been excluded
// and can no longer be swept. For instance, when one of the three
// anchor sweeping transactions confirmed, the remaining two will be
// excluded.
StateExcluded
// StateFailed is the state when a pending input has too many failed
// publish atttempts or unknown broadcast error is returned.
StateFailed
)
// String gives a human readable text for the sweep states.
@ -136,6 +146,12 @@ func (s SweepState) String() string {
case StateSwept:
return "Swept"
case StateExcluded:
return "Excluded"
case StateFailed:
return "Failed"
default:
return "Unknown"
}
@ -181,6 +197,21 @@ func (p *pendingInput) parameters() Params {
return p.params
}
// terminated returns a boolean indicating whether the input has reached a
// final state.
func (p *pendingInput) terminated() bool {
switch p.state {
// If the input has reached a final state, that it's either
// been swept, or failed, or excluded, we will remove it from
// our sweeper.
case StateFailed, StateSwept, StateExcluded:
return true
default:
return false
}
}
// pendingInputs is a type alias for a set of pending inputs.
type pendingInputs = map[wire.OutPoint]*pendingInput
@ -609,6 +640,12 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
log.Debugf("Sweep ticker started")
for {
// Clean inputs, which will remove inputs that are swept,
// failed, or excluded from the sweeper and return inputs that
// are either new or has been published but failed back, which
// will be retried again here.
inputs := s.updateSweeperInputs()
select {
// A new inputs is offered to the sweeper. We check to see if
// we are already trying to sweep this input and if not, set up
@ -637,8 +674,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// The timer expires and we are going to (re)sweep.
case <-ticker.C:
log.Debugf("Sweep ticker ticks, attempt sweeping...")
s.handleSweep()
log.Debugf("Sweep ticker ticks, attempt sweeping %d "+
"inputs", len(inputs))
// Sweep the remaining pending inputs.
s.sweepPendingInputs(inputs)
// A new block comes in, update the bestHeight.
case epoch, ok := <-blockEpochs:
@ -676,11 +716,22 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
continue
}
// Skip inputs that are already terminated.
if input.terminated() {
log.Tracef("Skipped sending error result for "+
"input %v, state=%v", outpoint, input.state)
continue
}
// Signal result channels.
s.signalAndRemove(&outpoint, Result{
s.signalResult(input, Result{
Err: ErrExclusiveGroupSpend,
})
// Update the input's state as it can no longer be swept.
input.state = StateExcluded
// Remove all unconfirmed transactions from the wallet which
// spend the passed outpoint of the same exclusive group.
outpoints := map[wire.OutPoint]struct{}{
@ -757,21 +808,19 @@ func (s *UtxoSweeper) sweepCluster(cluster inputCluster) error {
})
}
// signalAndRemove notifies the listeners of the final result of the input
// sweep. It cancels any pending spend notification and removes the input from
// the list of pending inputs. When this function returns, the sweeper has
// completely forgotten about the input.
func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
pendInput := s.pendingInputs[*outpoint]
listeners := pendInput.listeners
// signalResult notifies the listeners of the final result of the input sweep.
// It also cancels any pending spend notification.
func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
op := pi.OutPoint()
listeners := pi.listeners
if result.Err == nil {
log.Debugf("Dispatching sweep success for %v to %v listeners",
outpoint, len(listeners),
op, len(listeners),
)
} else {
log.Debugf("Dispatching sweep error for %v to %v listeners: %v",
outpoint, len(listeners), result.Err,
op, len(listeners), result.Err,
)
}
@ -783,14 +832,11 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
// Cancel spend notification with chain notifier. This is not necessary
// in case of a success, except for that a reorg could still happen.
if pendInput.ntfnRegCancel != nil {
log.Debugf("Canceling spend ntfn for %v", outpoint)
if pi.ntfnRegCancel != nil {
log.Debugf("Canceling spend ntfn for %v", op)
pendInput.ntfnRegCancel()
pi.ntfnRegCancel()
}
// Inputs are no longer pending after result has been sent.
delete(s.pendingInputs, *outpoint)
}
// getInputLists goes through the given inputs and constructs multiple distinct
@ -996,9 +1042,12 @@ func (s *UtxoSweeper) markInputsPendingPublish(tr *TxRecord,
s.cfg.MaxSweepAttempts)
// Signal result channels sweep result.
s.signalAndRemove(&input.PreviousOutPoint, Result{
s.signalResult(pi, Result{
Err: ErrTooManyAttempts,
})
// Mark the input as failed.
pi.state = StateFailed
}
}
@ -1409,7 +1458,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
)
if err != nil {
err := fmt.Errorf("wait for spend: %w", err)
s.signalAndRemove(&outpoint, Result{Err: err})
s.signalResult(pi, Result{Err: err})
return
}
@ -1526,8 +1575,10 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error {
// This input may already been marked as swept by a previous
// spend notification, which is likely to happen as one sweep
// transaction usually sweeps multiple inputs.
if input.state == StateSwept {
log.Tracef("input %v already swept", outpoint)
if input.terminated() {
log.Tracef("Skipped sending swept result for input %v,"+
" state=%v", outpoint, input.state)
continue
}
@ -1536,13 +1587,13 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error {
// Return either a nil or a remote spend result.
var err error
if !isOurTx {
log.Warnf("Input=%v was spent by remote or third "+
"party in tx=%v", outpoint, tx.TxHash())
err = ErrRemoteSpend
}
// Signal result channels.
//
// TODO(yy): don't remove it here.
s.signalAndRemove(&outpoint, Result{
s.signalResult(input, Result{
Tx: tx,
Err: err,
})
@ -1556,14 +1607,64 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) error {
return nil
}
// handleSweep is called when the ticker fires. It will create clusters and
// attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) handleSweep() {
// updateSweeperInputs updates the sweeper's internal state and returns a map
// of inputs to be swept. It will remove the inputs that are in final states,
// and returns a map of inputs that have either StateInit or
// StatePublishFailed.
func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
// Create a map of inputs to be swept.
inputs := make(pendingInputs)
// Iterate the pending inputs and update the sweeper's state.
//
// TODO(yy): sweeper is made to communicate via go channels, so no
// locks are needed to access the map. However, it'd be safer if we
// turn this pendingInputs into a SyncMap in case we wanna add
// concurrent access to the map in the future.
for op, input := range s.pendingInputs {
// If the input has reached a final state, that it's either
// been swept, or failed, or excluded, we will remove it from
// our sweeper.
if input.terminated() {
log.Debugf("Removing input(State=%v) %v from sweeper",
input.state, op)
delete(s.pendingInputs, op)
continue
}
// If this input has been included in a sweep tx that's not
// published yet, we'd skip this input and wait for the sweep
// tx to be published.
if input.state == StatePendingPublish {
continue
}
// If this input has already been published, we will need to
// check the RBF condition before attempting another sweeping.
if input.state == StatePublished {
continue
}
// If this input is new or has been failed to be published,
// we'd retry it. The assumption here is that when an error is
// returned from `PublishTransaction`, it means the tx has
// failed to meet the policy, hence it's not in the mempool.
inputs[op] = input
}
return inputs
}
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
// We'll attempt to cluster all of our inputs with similar fee rates.
// Before attempting to sweep them, we'll sort them in descending fee
// rate order. We do this to ensure any inputs which have had their fee
// rate bumped are broadcast first in order enforce the RBF policy.
inputClusters := s.cfg.Aggregator.ClusterInputs(s.pendingInputs)
inputClusters := s.cfg.Aggregator.ClusterInputs(inputs)
sort.Slice(inputClusters, func(i, j int) bool {
return inputClusters[i].sweepFeeRate >
inputClusters[j].sweepFeeRate

View file

@ -2251,3 +2251,60 @@ func TestMempoolLookup(t *testing.T) {
mockMempool.AssertExpectations(t)
}
// TestUpdateSweeperInputs checks that the method `updateSweeperInputs` will
// properly update the inputs based on their states.
func TestUpdateSweeperInputs(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a test sweeper.
s := New(nil)
// Create a list of inputs using all the states.
input0 := &pendingInput{state: StateInit}
input1 := &pendingInput{state: StatePendingPublish}
input2 := &pendingInput{state: StatePublished}
input3 := &pendingInput{state: StatePublishFailed}
input4 := &pendingInput{state: StateSwept}
input5 := &pendingInput{state: StateExcluded}
input6 := &pendingInput{state: StateFailed}
// Add the inputs to the sweeper. After the update, we should see the
// terminated inputs being removed.
s.pendingInputs = map[wire.OutPoint]*pendingInput{
{Index: 0}: input0,
{Index: 1}: input1,
{Index: 2}: input2,
{Index: 3}: input3,
{Index: 4}: input4,
{Index: 5}: input5,
{Index: 6}: input6,
}
// We expect the inputs with `StateSwept`, `StateExcluded`, and
// `StateFailed` to be removed.
expectedInputs := map[wire.OutPoint]*pendingInput{
{Index: 0}: input0,
{Index: 1}: input1,
{Index: 2}: input2,
{Index: 3}: input3,
}
// We expect only the inputs with `StateInit` and `StatePublishFailed`
// to be returned.
expectedReturn := map[wire.OutPoint]*pendingInput{
{Index: 0}: input0,
{Index: 3}: input3,
}
// Update the sweeper inputs.
inputs := s.updateSweeperInputs()
// Assert the returned inputs are as expected.
require.Equal(expectedReturn, inputs)
// Assert the sweeper inputs are as expected.
require.Equal(expectedInputs, s.pendingInputs)
}

View file

@ -99,6 +99,8 @@ func (m *MockNotifier) sendSpend(channel chan *chainntnfs.SpendDetail,
outpoint *wire.OutPoint,
spendingTx *wire.MsgTx) {
log.Debugf("Notifying spend of outpoint %v", outpoint)
spenderTxHash := spendingTx.TxHash()
channel <- &chainntnfs.SpendDetail{
SpenderTxHash: &spenderTxHash,
@ -188,6 +190,8 @@ func (m *MockNotifier) Stop() error {
func (m *MockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
log.Debugf("RegisterSpendNtfn for outpoint %v", outpoint)
// Add channel to global spend ntfn map.
m.mutex.Lock()