mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-04 09:48:19 +01:00
sweep: make sure recovered inputs are retried
Previously, when a given input is found spent in the mempool, we'd mark it as Published and never offer it to the fee bumper. This is dangerous as the input will never be fee bumped. We now fix it by always initializing the input with state Init, and only use mempool to check for fee and fee rate. This changes the current restart behavior - as previously when a sweeping tx is broadcast, the node shuts down, when it starts again, the input will be offered to the sweeper again, but not to the fee bumper, which means the sweeping tx will stay in the mempool with the last-tried fee rate. After this change, after a restart, the input will be swept again, and the fee bumper will monitor its status. The restart will also behave like a fee bump if there's already an existing sweeping tx in the mempool.
This commit is contained in:
parent
4bd1a344b9
commit
74161f0d57
2 changed files with 32 additions and 33 deletions
|
@ -1230,21 +1230,26 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is a new input, and we want to query the mempool to see if this
|
// This is a new input, and we want to query the mempool to see if this
|
||||||
// input has already been spent. If so, we'll start the input with
|
// input has already been spent. If so, we'll start the input with the
|
||||||
// state Published and attach the RBFInfo.
|
// RBFInfo.
|
||||||
state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
|
rbfInfo := s.decideRBFInfo(input.input.OutPoint())
|
||||||
|
|
||||||
// Create a new pendingInput and initialize the listeners slice with
|
// Create a new pendingInput and initialize the listeners slice with
|
||||||
// the passed in result channel. If this input is offered for sweep
|
// the passed in result channel. If this input is offered for sweep
|
||||||
// again, the result channel will be appended to this slice.
|
// again, the result channel will be appended to this slice.
|
||||||
pi = &SweeperInput{
|
pi = &SweeperInput{
|
||||||
state: state,
|
state: Init,
|
||||||
listeners: []chan Result{input.resultChan},
|
listeners: []chan Result{input.resultChan},
|
||||||
Input: input.input,
|
Input: input.input,
|
||||||
params: input.params,
|
params: input.params,
|
||||||
rbf: rbfInfo,
|
rbf: rbfInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the starting fee rate if a previous sweeping tx is found.
|
||||||
|
rbfInfo.WhenSome(func(info RBFInfo) {
|
||||||
|
pi.params.StartingFeeRate = fn.Some(info.FeeRate)
|
||||||
|
})
|
||||||
|
|
||||||
// Set the acutal deadline height.
|
// Set the acutal deadline height.
|
||||||
pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
|
pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
|
||||||
s.calculateDefaultDeadline(pi),
|
s.calculateDefaultDeadline(pi),
|
||||||
|
@ -1277,13 +1282,12 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// decideStateAndRBFInfo queries the mempool to see whether the given input has
|
// decideRBFInfo queries the mempool to see whether the given input has already
|
||||||
// already been spent. If so, the state Published will be returned, otherwise
|
// been spent. When spent, it will query the sweeper store to fetch the fee info
|
||||||
// state Init. When spent, it will query the sweeper store to fetch the fee
|
// of the spending transction, and construct an RBFInfo based on it. Suppose an
|
||||||
// info of the spending transction, and construct an RBFInfo based on it.
|
// error occurs, fn.None is returned.
|
||||||
// Suppose an error occurs, fn.None is returned.
|
func (s *UtxoSweeper) decideRBFInfo(
|
||||||
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
|
op wire.OutPoint) fn.Option[RBFInfo] {
|
||||||
SweepState, fn.Option[RBFInfo]) {
|
|
||||||
|
|
||||||
// Check if we can find the spending tx of this input in mempool.
|
// Check if we can find the spending tx of this input in mempool.
|
||||||
txOption := s.mempoolLookup(op)
|
txOption := s.mempoolLookup(op)
|
||||||
|
@ -1301,7 +1305,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
|
||||||
// - for neutrino we don't have a mempool.
|
// - for neutrino we don't have a mempool.
|
||||||
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
|
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
|
||||||
if tx == nil {
|
if tx == nil {
|
||||||
return Init, fn.None[RBFInfo]()
|
return fn.None[RBFInfo]()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise the input is already spent in the mempool, so eventually
|
// Otherwise the input is already spent in the mempool, so eventually
|
||||||
|
@ -1313,12 +1317,15 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
|
||||||
txid := tx.TxHash()
|
txid := tx.TxHash()
|
||||||
tr, err := s.cfg.Store.GetTx(txid)
|
tr, err := s.cfg.Store.GetTx(txid)
|
||||||
|
|
||||||
|
log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
|
||||||
|
op)
|
||||||
|
|
||||||
// If the tx is not found in the store, it means it's not broadcast by
|
// If the tx is not found in the store, it means it's not broadcast by
|
||||||
// us, hence we can't find the fee info. This is fine as, later on when
|
// us, hence we can't find the fee info. This is fine as, later on when
|
||||||
// this tx is confirmed, we will remove the input from our inputs.
|
// this tx is confirmed, we will remove the input from our inputs.
|
||||||
if errors.Is(err, ErrTxNotFound) {
|
if errors.Is(err, ErrTxNotFound) {
|
||||||
log.Warnf("Spending tx %v not found in sweeper store", txid)
|
log.Warnf("Spending tx %v not found in sweeper store", txid)
|
||||||
return Published, fn.None[RBFInfo]()
|
return fn.None[RBFInfo]()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exit if we get an db error.
|
// Exit if we get an db error.
|
||||||
|
@ -1326,7 +1333,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
|
||||||
log.Errorf("Unable to get tx %v from sweeper store: %v",
|
log.Errorf("Unable to get tx %v from sweeper store: %v",
|
||||||
txid, err)
|
txid, err)
|
||||||
|
|
||||||
return Published, fn.None[RBFInfo]()
|
return fn.None[RBFInfo]()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the fee info and return it.
|
// Prepare the fee info and return it.
|
||||||
|
@ -1336,7 +1343,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
|
||||||
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
|
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
|
||||||
})
|
})
|
||||||
|
|
||||||
return Published, rbf
|
return rbf
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleExistingInput processes an input that is already known to the sweeper.
|
// handleExistingInput processes an input that is already known to the sweeper.
|
||||||
|
|
|
@ -497,10 +497,9 @@ func TestUpdateSweeperInputs(t *testing.T) {
|
||||||
require.Equal(expectedInputs, s.inputs)
|
require.Equal(expectedInputs, s.inputs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
|
// TestDecideRBFInfo checks that the expected RBFInfo is returned based on
|
||||||
// returned based on whether this input can be found both in mempool and the
|
// whether this input can be found both in mempool and the sweeper store.
|
||||||
// sweeper store.
|
func TestDecideRBFInfo(t *testing.T) {
|
||||||
func TestDecideStateAndRBFInfo(t *testing.T) {
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
@ -524,11 +523,9 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
|
||||||
mockMempool.On("LookupInputMempoolSpend", op).Return(
|
mockMempool.On("LookupInputMempoolSpend", op).Return(
|
||||||
fn.None[wire.MsgTx]()).Once()
|
fn.None[wire.MsgTx]()).Once()
|
||||||
|
|
||||||
// Since the mempool lookup failed, we exepect state Init and no
|
// Since the mempool lookup failed, we expect no RBFInfo.
|
||||||
// RBFInfo.
|
rbf := s.decideRBFInfo(op)
|
||||||
state, rbf := s.decideStateAndRBFInfo(op)
|
|
||||||
require.True(rbf.IsNone())
|
require.True(rbf.IsNone())
|
||||||
require.Equal(Init, state)
|
|
||||||
|
|
||||||
// Mock the mempool lookup to return a tx three times as we are calling
|
// Mock the mempool lookup to return a tx three times as we are calling
|
||||||
// attachAvailableRBFInfo three times.
|
// attachAvailableRBFInfo three times.
|
||||||
|
@ -539,19 +536,17 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
|
||||||
// Mock the store to return an error saying the tx cannot be found.
|
// Mock the store to return an error saying the tx cannot be found.
|
||||||
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
|
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
|
||||||
|
|
||||||
// Although the db lookup failed, we expect the state to be Published.
|
// The db lookup failed, we expect no RBFInfo.
|
||||||
state, rbf = s.decideStateAndRBFInfo(op)
|
rbf = s.decideRBFInfo(op)
|
||||||
require.True(rbf.IsNone())
|
require.True(rbf.IsNone())
|
||||||
require.Equal(Published, state)
|
|
||||||
|
|
||||||
// Mock the store to return a db error.
|
// Mock the store to return a db error.
|
||||||
dummyErr := errors.New("dummy error")
|
dummyErr := errors.New("dummy error")
|
||||||
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
|
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
|
||||||
|
|
||||||
// Although the db lookup failed, we expect the state to be Published.
|
// The db lookup failed, we expect no RBFInfo.
|
||||||
state, rbf = s.decideStateAndRBFInfo(op)
|
rbf = s.decideRBFInfo(op)
|
||||||
require.True(rbf.IsNone())
|
require.True(rbf.IsNone())
|
||||||
require.Equal(Published, state)
|
|
||||||
|
|
||||||
// Mock the store to return a record.
|
// Mock the store to return a record.
|
||||||
tr := &TxRecord{
|
tr := &TxRecord{
|
||||||
|
@ -561,7 +556,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
|
||||||
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
|
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
|
||||||
|
|
||||||
// Call the method again.
|
// Call the method again.
|
||||||
state, rbf = s.decideStateAndRBFInfo(op)
|
rbf = s.decideRBFInfo(op)
|
||||||
|
|
||||||
// Assert that the RBF info is returned.
|
// Assert that the RBF info is returned.
|
||||||
rbfInfo := fn.Some(RBFInfo{
|
rbfInfo := fn.Some(RBFInfo{
|
||||||
|
@ -570,9 +565,6 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
|
||||||
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
|
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
|
||||||
})
|
})
|
||||||
require.Equal(rbfInfo, rbf)
|
require.Equal(rbfInfo, rbf)
|
||||||
|
|
||||||
// Assert the state is updated.
|
|
||||||
require.Equal(Published, state)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestMarkInputFatal checks that the input is marked as expected.
|
// TestMarkInputFatal checks that the input is marked as expected.
|
||||||
|
|
Loading…
Add table
Reference in a new issue