multi: query mempool spend when a new input is received

This commit changes how a new input sweep request is handled - now we
will query the mempool and see if it's already been spent. If so, we'll
update its state as we may need to RBF this input.
This commit is contained in:
yyforyongyu 2023-10-24 13:14:21 +08:00
parent 6a2e3fb203
commit 47478718d4
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
4 changed files with 180 additions and 5 deletions

41
chainntnfs/mocks.go Normal file
View File

@ -0,0 +1,41 @@
package chainntnfs
import (
"github.com/btcsuite/btcd/wire"
"github.com/stretchr/testify/mock"
)
// MockMempoolWatcher is a mock implementation of the MempoolWatcher interface.
// This is used by other subsystems to mock the behavior of the mempool
// watcher.
type MockMempoolWatcher struct {
mock.Mock
}
// NewMockMempoolWatcher returns a new instance of a mock mempool watcher.
func NewMockMempoolWatcher() *MockMempoolWatcher {
return &MockMempoolWatcher{}
}
// Compile-time check to ensure MockMempoolWatcher implements MempoolWatcher.
var _ MempoolWatcher = (*MockMempoolWatcher)(nil)
// SubscribeMempoolSpent implements the MempoolWatcher interface.
func (m *MockMempoolWatcher) SubscribeMempoolSpent(
op wire.OutPoint) (*MempoolSpendEvent, error) {
args := m.Called(op)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*MempoolSpendEvent), args.Error(1)
}
// CancelMempoolSpendEvent implements the MempoolWatcher interface.
func (m *MockMempoolWatcher) CancelMempoolSpendEvent(
sub *MempoolSpendEvent) {
m.Called(sub)
}

View File

@ -1073,6 +1073,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
Signer: cc.Wallet.Cfg.Signer,
Wallet: newSweeperWallet(cc.Wallet),
TickerDuration: cfg.Sweeper.BatchWindowDuration,
Mempool: cc.MempoolNotifier,
Notifier: cc.ChainNotifier,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,

View File

@ -304,6 +304,10 @@ type UtxoSweeperConfig struct {
// certain on-chain events.
Notifier chainntnfs.ChainNotifier
// Mempool is the mempool watcher that will be used to query whether a
// given input is already being spent by a transaction in the mempool.
Mempool chainntnfs.MempoolWatcher
// Store stores the published sweeper txes.
Store SweeperStore
@ -1319,15 +1323,59 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
return s.cfg.Store.ListSweeps()
}
// mempoolLookup takes an input's outpoint and queries the mempool to see
// whether it's already been spent in a transaction found in the mempool.
// Returns the transaction if found.
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) (*wire.MsgTx, bool) {
// For neutrino backend, there's no mempool available, so we exit
// early.
if s.cfg.Mempool == nil {
log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
return nil, false
}
// Make a subscription to the mempool. If this outpoint is already
// spent in mempool, we should get a spending event back immediately.
mempoolSpent, err := s.cfg.Mempool.SubscribeMempoolSpent(op)
if err != nil {
log.Errorf("Unable to subscribe to mempool spend for input "+
"%v: %v", op, err)
return nil, false
}
// We want to cancel this subscription in the end as we are only
// interested in a one-time query and this subscription won't be
// listened once this method returns.
defer s.cfg.Mempool.CancelMempoolSpendEvent(mempoolSpent)
// Do a non-blocking read on the spent event channel.
select {
case details := <-mempoolSpent.Spend:
log.Debugf("Found mempool spend of input %s in tx=%s",
op, details.SpenderTxHash)
// Found the spending transaction in mempool. This means we
// need to consider RBF constraints if we want to include this
// input in a new sweeping transaction.
return details.SpendingTx, true
default:
}
return nil, false
}
// handleNewInput processes a new input by registering spend notification and
// scheduling sweeping for it.
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
outpoint := *input.input.OutPoint()
pendInput, pending := s.pendingInputs[outpoint]
pi, pending := s.pendingInputs[outpoint]
if pending {
log.Debugf("Already pending input %v received", outpoint)
s.handleExistingInput(input, pendInput)
s.handleExistingInput(input, pi)
return
}
@ -1335,14 +1383,22 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pendInput = &pendingInput{
pi = &pendingInput{
state: StateInit,
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: s.currentHeight,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
// If the input is already spent in the mempool, update its state to
// StatePublished.
_, spent := s.mempoolLookup(outpoint)
if spent {
pi.state = StatePublished
}
s.pendingInputs[outpoint] = pi
log.Tracef("input %v added to pendingInputs", outpoint)
// Start watching for spend of this input, either by us or the remote
@ -1358,7 +1414,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
return
}
pendInput.ntfnRegCancel = cancel
pi.ntfnRegCancel = cancel
}
// handleExistingInput processes an input that is already known to the sweeper.

View File

@ -13,6 +13,7 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
@ -2174,3 +2175,79 @@ func TestMarkInputsPublishFailed(t *testing.T) {
// Assert mocked statements are executed as expected.
mockStore.AssertExpectations(t)
}
// TestMempoolLookup checks that the method `mempoolLookup` works as expected.
func TestMempoolLookup(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a test outpoint.
op := wire.OutPoint{Index: 1}
// Create a mock mempool watcher.
mockMempool := chainntnfs.NewMockMempoolWatcher()
// Create a test sweeper without a mempool.
s := New(&UtxoSweeperConfig{})
// Since we don't have a mempool, we expect the call to return an empty
// transaction plus a false value indicating it's not found.
tx, found := s.mempoolLookup(op)
require.Nil(tx)
require.False(found)
// Re-create the sweeper with the mocked mempool watcher.
s = New(&UtxoSweeperConfig{
Mempool: mockMempool,
})
// Create a mempool spend event to be returned by the mempool watcher.
spendChan := make(chan *chainntnfs.SpendDetail, 1)
spendEvent := &chainntnfs.MempoolSpendEvent{
Spend: spendChan,
}
// Mock the cancel subscription calls.
mockMempool.On("CancelMempoolSpendEvent", spendEvent)
// Mock the mempool watcher to return an error.
dummyErr := errors.New("dummy err")
mockMempool.On("SubscribeMempoolSpent", op).Return(nil, dummyErr).Once()
// We expect a nil tx and a false value to be returned.
//
// TODO(yy): this means the behavior of not having a mempool is the
// same as an erroneous mempool. The question is should we
// differentiate the two from their returned values?
tx, found = s.mempoolLookup(op)
require.Nil(tx)
require.False(found)
// Mock the mempool to subscribe to the outpoint.
mockMempool.On("SubscribeMempoolSpent", op).Return(
spendEvent, nil).Once()
// Without sending a spending details to the `spendChan`, we still
// expect a nil tx and a false value to be returned.
tx, found = s.mempoolLookup(op)
require.Nil(tx)
require.False(found)
// Send a dummy spending details to the `spendChan`.
dummyTx := &wire.MsgTx{}
spendChan <- &chainntnfs.SpendDetail{
SpendingTx: dummyTx,
}
// Mock the mempool to subscribe to the outpoint.
mockMempool.On("SubscribeMempoolSpent", op).Return(
spendEvent, nil).Once()
// Calling the loopup again, we expect the dummyTx to be returned.
tx, found = s.mempoolLookup(op)
require.Equal(dummyTx, tx)
require.True(found)
mockMempool.AssertExpectations(t)
}