sweep: handle unknown spent in processRecords

This commit refactors the `processRecords` to always handle the inputs
spent when processing the records. We now make sure to handle unknown
spends for all backends (previously only neutrino), and rely solely on
the spending notification to give us the onchain status of inputs.
This commit is contained in:
yyforyongyu 2025-01-24 04:45:10 +08:00
parent 61cec43951
commit 50bc191feb
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 297 additions and 89 deletions

View file

@ -897,8 +897,6 @@ func (t *TxPublisher) processRecords() {
// failedRecords stores a map of records which has inputs being spent
// by a third party.
//
// NOTE: this is only used for neutrino backend.
failedRecords := make(map[uint64]*monitorRecord)
// initialRecords stores a map of records which are being created and
@ -908,32 +906,55 @@ func (t *TxPublisher) processRecords() {
// visitor is a helper closure that visits each record and divides them
// into two groups.
visitor := func(requestID uint64, r *monitorRecord) error {
if r.tx == nil {
initialRecords[requestID] = r
return nil
}
log.Tracef("Checking monitor recordID=%v", requestID)
log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
r.tx.TxHash())
// Check whether the inputs have already been spent.
spends := t.getSpentInputs(r)
// If the tx is already confirmed, we can stop monitoring it.
if t.isConfirmed(r.tx.TxHash()) {
// If the any of the inputs has been spent, the record will be
// marked as failed or confirmed.
if len(spends) != 0 {
// When tx is nil, it means we haven't tried the initial
// broadcast yet the input is already spent. This could
// happen when the node shuts down, a previous sweeping
// tx confirmed, then the node comes back online and
// reoffers the inputs. Another case is the remote node
// spends the input quickly before we even attempt the
// sweep. In either case we will fail the record and let
// the sweeper handles it.
if r.tx == nil {
failedRecords[requestID] = r
return nil
}
// Check whether the inputs has been spent by a unknown
// tx.
if t.isThirdPartySpent(r, spends) {
failedRecords[requestID] = r
// Move to the next record.
return nil
}
// The tx is ours, we can move it to the confirmed queue
// and stop monitoring it.
confirmedRecords[requestID] = r
// Move to the next record.
return nil
}
// Check whether the inputs has been spent by a third party.
//
// NOTE: this check is only done for neutrino backend.
if t.isThirdPartySpent(r) {
failedRecords[requestID] = r
// This is the first time we see this record, so we put it in
// the initial queue.
if r.tx == nil {
initialRecords[requestID] = r
// Move to the next record.
return nil
}
// We can only get here when the inputs are not spent and a
// previous sweeping tx has been attempted. In this case we will
// perform an RBF on it in the current block.
feeBumpRecords[requestID] = r
// Return nil to move to the next record.
@ -1265,17 +1286,10 @@ func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
// isThirdPartySpent checks whether the inputs of the tx has already been spent
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
// then it must be spent by a different tx other than the sweeping tx here.
//
// NOTE: this check is only performed for neutrino backend as it has no
// reliable way to tell a tx has been replaced.
func (t *TxPublisher) isThirdPartySpent(r *monitorRecord) bool {
// Skip this check for if this is not neutrino backend.
if !t.isNeutrinoBackend() {
return false
}
func (t *TxPublisher) isThirdPartySpent(r *monitorRecord,
spends map[wire.OutPoint]*wire.MsgTx) bool {
txid := r.tx.TxHash()
spends := t.getSpentInputs(r)
// Iterate all the spending txns and check if they match the sweeping
// tx.

View file

@ -1481,60 +1481,163 @@ func TestHandleFeeBumpTx(t *testing.T) {
require.True(t, found)
}
// TestProcessRecords validates processRecords behaves as expected.
func TestProcessRecords(t *testing.T) {
// TestProcessRecordsInitial validates processRecords behaves as expected when
// processing the initial broadcast.
func TestProcessRecordsInitial(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID1 := uint64(1)
req1 := createTestBumpRequest()
tx1 := &wire.MsgTx{LockTime: 1}
txid1 := tx1.TxHash()
requestID := uint64(1)
req := createTestBumpRequest()
op := req.Inputs[0].OutPoint()
requestID2 := uint64(2)
req2 := createTestBumpRequest()
tx2 := &wire.MsgTx{LockTime: 2}
txid2 := tx2.TxHash()
// Create a monitor record that's confirmed.
recordConfirmed := &monitorRecord{
requestID: requestID1,
req: req1,
feeFunction: m.feeFunc,
tx: tx1,
// Mock RegisterSpendNtfn.
//
// Create the spending event that doesn't send an event.
se := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.wallet.On("GetTransactionDetails", &txid1).Return(
&lnwallet.TransactionDetail{
NumConfirmations: 1,
}, nil,
).Once()
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's not confirmed. We know it's not
// confirmed because the num of confirms is zero.
recordFeeBump := &monitorRecord{
requestID: requestID2,
req: req2,
feeFunction: m.feeFunc,
tx: tx2,
// Create a monitor record that's broadcast the first time.
record := &monitorRecord{
requestID: requestID,
req: req,
}
m.wallet.On("GetTransactionDetails", &txid2).Return(
&lnwallet.TransactionDetail{
NumConfirmations: 0,
}, nil,
).Once()
m.wallet.On("BackEnd").Return("test-backend").Once()
// Setup the initial publisher state by adding the records to the maps.
subscriberConfirmed := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID1, subscriberConfirmed)
tp.records.Store(requestID1, recordConfirmed)
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
subscriberReplaced := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID2, subscriberReplaced)
tp.records.Store(requestID2, recordFeeBump)
// The following methods should only be called once when creating the
// initial broadcast tx.
//
// Mock the signer to always return a valid script.
m.signer.On("ComputeInputScript", mock.Anything,
mock.Anything).Return(&input.Script{}, nil).Once()
// Mock the testmempoolaccept to return nil.
m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once()
// Mock the wallet to publish successfully.
m.wallet.On("PublishTransaction",
mock.Anything, mock.Anything).Return(nil).Once()
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect the published tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxPublished.
require.Equal(t, TxPublished, result.Event)
// Expect the tx to be set but not the replaced tx.
require.NotNil(t, result.Tx)
require.Nil(t, result.ReplacedTx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsInitialSpent validates processRecords behaves as expected
// when processing the initial broadcast when the input is spent.
func TestProcessRecordsInitialSpent(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(tx)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's broadcast the first time.
record := &monitorRecord{
requestID: requestID,
req: req,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect the published tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxUnknownSpend.
require.Equal(t, TxUnknownSpend, result.Event)
// Expect the tx and the replaced tx to be nil.
require.Nil(t, result.Tx)
require.Nil(t, result.ReplacedTx)
// The error should be set.
require.ErrorIs(t, result.Err, ErrThirdPartySpent)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsFeeBump validates processRecords behaves as expected when
// processing fee bump records.
func TestProcessRecordsFeeBump(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
//
// Create the spending event that doesn't send an event.
se := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's not confirmed. We know it's not
// confirmed because the `SpendEvent` is empty.
record := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
@ -1560,40 +1663,131 @@ func TestProcessRecords(t *testing.T) {
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect two results to be received. One for the confirmed tx and
// one for the replaced tx.
//
// Check the confirmed tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriberConfirmed")
case result := <-subscriberConfirmed:
// We expect the result to be TxConfirmed.
require.Equal(t, TxConfirmed, result.Event)
require.Equal(t, tx1, result.Tx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID1, result.requestID)
}
// Now check the replaced tx result.
// We expect the replaced tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriberReplaced")
case result := <-subscriberReplaced:
case result := <-subscriber:
// We expect the result to be TxReplaced.
require.Equal(t, TxReplaced, result.Event)
// The new tx and old tx should be properly set.
require.NotEqual(t, tx2, result.Tx)
require.Equal(t, tx2, result.ReplacedTx)
require.NotEqual(t, tx, result.Tx)
require.Equal(t, tx, result.ReplacedTx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID2, result.requestID)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsConfirmed validates processRecords behaves as expected when
// processing confirmed records.
func TestProcessRecordsConfirmed(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(tx)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's confirmed.
recordConfirmed := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// Check the confirmed tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxConfirmed.
require.Equal(t, TxConfirmed, result.Event)
require.Equal(t, tx, result.Tx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsSpent validates processRecords behaves as expected when
// processing unknown spent records.
func TestProcessRecordsSpent(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Create a unknown tx.
txUnknown := &wire.MsgTx{LockTime: 2}
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(txUnknown)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's spent by txUnknown.
recordConfirmed := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// Check the unknown tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxUnknownSpend.
require.Equal(t, TxUnknownSpend, result.Event)
require.Equal(t, tx, result.Tx)
// No error should be set.
require.ErrorIs(t, result.Err, ErrThirdPartySpent)
require.Equal(t, requestID, result.requestID)
}
}