Merge pull request #9448 from yyforyongyu/sweeper-fix

sweep: properly handle failed sweeping txns
This commit is contained in:
Yong 2025-02-20 14:33:42 +08:00 committed by GitHub
commit 7adc900667
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1698 additions and 366 deletions

View file

@ -108,7 +108,7 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
// Anchor was swept by someone else. This is possible after the
// 16 block csv lock.
case sweep.ErrRemoteSpend:
case sweep.ErrRemoteSpend, sweep.ErrInputMissing:
c.log.Warnf("our anchor spent by someone else")
outcome = channeldb.ResolverOutcomeUnclaimed

View file

@ -352,6 +352,11 @@ The underlying functionality between those two options remain the same.
* A code refactor that [replaces min/max helpers with built-in min/max
functions](https://github.com/lightningnetwork/lnd/pull/9451).
* [Unified](https://github.com/lightningnetwork/lnd/pull/9448) the monitoring
inputs spending logic in the sweeper so it can properly handle missing inputs
and recover from restart.
## Tooling and Documentation
* [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077)

View file

@ -662,6 +662,10 @@ var allTestCases = []*lntest.TestCase{
Name: "invoice migration",
TestFunc: testInvoiceMigration,
},
{
Name: "fee replacement",
TestFunc: testFeeReplacement,
},
}
// appendPrefixed is used to add a prefix to each test name in the subtests

File diff suppressed because it is too large Load diff

View file

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/node"
"github.com/lightningnetwork/lnd/lntest/rpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
@ -2081,3 +2082,255 @@ func testBumpForceCloseFee(ht *lntest.HarnessTest) {
// This is needed to clean up the mempool.
ht.MineBlocksAndAssertNumTxes(1, 2)
}
// testFeeReplacement tests that when a sweeping txns aggregates multiple
// outgoing HTLCs, and one of the outgoing HTLCs has been spent via the direct
// preimage path by the remote peer, the remaining HTLCs will be grouped again
// and swept immediately.
//
// Setup:
// 1. Fund Alice with 1 UTXOs - she only needs one for the funding process,
// 2. Fund Bob with 3 UTXOs - he needs one for the funding process, one for
// his CPFP anchor sweeping, and one for sweeping his outgoing HTLCs.
// 3. Create a linear network from Alice -> Bob -> Carol.
// 4. Alice pays two invoices to Carol, with Carol holding the settlement.
// 5. Bob goes offline.
// 6. Carol settles one of the invoices, so she can later spend Bob's outgoing
// HTLC via the direct preimage path.
// 7. Carol goes offline and Bob comes online.
// 8. Mine enough blocks so Bob will force close Bob=>Carol to claim his
// outgoing HTLCs.
// 9. Carol comes online, sweeps one of Bob's outgoing HTLCs and it confirms.
// 10. Bob creates a new sweeping tx to sweep his remaining HTLC with a
// previous fee rate.
//
// Test:
// 1. Bob will immediately sweeps his remaining outgoing HTLC given that the
// other one has been spent by Carol.
// 2. Bob's new sweeping tx will use the previous fee rate instead of
// initializing a new starting fee rate.
func testFeeReplacement(ht *lntest.HarnessTest) {
// Set the min relay feerate to be 10 sat/vbyte so the non-CPFP anchor
// is never swept.
//
// TODO(yy): delete this line once the normal anchor sweeping is
// removed.
ht.SetMinRelayFeerate(10_000)
// Setup testing params.
//
// Invoice is 100k sats.
invoiceAmt := btcutil.Amount(100_000)
// Alice will send two payments.
numPayments := 2
// Use the smallest CLTV so we can mine fewer blocks.
cltvDelta := routing.MinCLTVDelta
// Prepare params.
cfg := []string{
"--protocol.anchors",
// Use a small CLTV to mine less blocks.
fmt.Sprintf("--bitcoin.timelockdelta=%d", cltvDelta),
// Use a very large CSV, this way to_local outputs are never
// swept so we can focus on testing HTLCs.
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", cltvDelta*10),
}
cfgs := [][]string{cfg, cfg, cfg}
openChannelParams := lntest.OpenChannelParams{
Amt: invoiceAmt * 100,
}
// Create a three hop network: Alice -> Bob -> Carol.
_, nodes := ht.CreateSimpleNetwork(cfgs, openChannelParams)
// Unwrap the results.
alice, bob, carol := nodes[0], nodes[1], nodes[2]
// Bob needs two more wallet utxos:
// - when sweeping anchors, he needs one utxo for each sweep.
// - when sweeping HTLCs, he needs one utxo for each sweep.
numUTXOs := 2
// Bob should have enough wallet UTXOs here to sweep the HTLC in the
// end of this test. However, due to a known issue, Bob's wallet may
// report there's no UTXO available. For details,
// - https://github.com/lightningnetwork/lnd/issues/8786
//
// TODO(yy): remove this extra UTXO once the issue is resolved.
numUTXOs++
// For neutrino backend, we need two more UTXOs for Bob to create his
// sweeping txns.
if ht.IsNeutrinoBackend() {
numUTXOs += 2
}
ht.FundNumCoins(bob, numUTXOs)
// We also give Carol 2 coins to create her sweeping txns.
ht.FundNumCoins(carol, 2)
// Create numPayments HTLCs on Bob's incoming and outgoing channels.
preimages := make([][]byte, 0, numPayments)
streams := make([]rpc.SingleInvoiceClient, 0, numPayments)
for i := 0; i < numPayments; i++ {
// Create the preimage.
var preimage lntypes.Preimage
copy(preimage[:], ht.Random32Bytes())
payHashHold := preimage.Hash()
preimages = append(preimages, preimage[:])
// Subscribe the invoices.
stream := carol.RPC.SubscribeSingleInvoice(payHashHold[:])
streams = append(streams, stream)
// Carol create the hold invoice.
invoiceReqHold := &invoicesrpc.AddHoldInvoiceRequest{
Value: int64(invoiceAmt),
CltvExpiry: finalCltvDelta,
Hash: payHashHold[:],
}
invoiceHold := carol.RPC.AddHoldInvoice(invoiceReqHold)
// Let Alice pay the invoices.
req := &routerrpc.SendPaymentRequest{
PaymentRequest: invoiceHold.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
// Assert the payments are inflight.
ht.SendPaymentAndAssertStatus(
alice, req, lnrpc.Payment_IN_FLIGHT,
)
// Wait for Carol to mark invoice as accepted. There is a small
// gap to bridge between adding the htlc to the channel and
// executing the exit hop logic.
ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED)
}
// At this point, all 3 nodes should now have an active channel with
// the created HTLCs pending on all of them.
//
// Alice should have numPayments outgoing HTLCs on channel Alice -> Bob.
ht.AssertNumActiveHtlcs(alice, numPayments)
// Bob should have 2 * numPayments HTLCs,
// - numPayments incoming HTLCs on channel Alice -> Bob.
// - numPayments outgoing HTLCs on channel Bob -> Carol.
ht.AssertNumActiveHtlcs(bob, numPayments*2)
// Carol should have numPayments incoming HTLCs on channel Bob -> Carol.
ht.AssertNumActiveHtlcs(carol, numPayments)
// Suspend Bob so he won't get the preimage from Carol.
restartBob := ht.SuspendNode(bob)
// Carol settles the first invoice.
carol.RPC.SettleInvoice(preimages[0])
ht.AssertInvoiceState(streams[0], lnrpc.Invoice_SETTLED)
// Carol goes offline so the preimage won't be sent to Bob.
restartCarol := ht.SuspendNode(carol)
// Bob comes online.
require.NoError(ht, restartBob())
// We'll now mine enough blocks to trigger Bob to force close channel
// Bob->Carol due to his outgoing HTLC is about to timeout. With the
// default outgoing broadcast delta of zero, this will be the same
// height as the outgoing htlc's expiry height.
numBlocks := padCLTV(uint32(
finalCltvDelta - lncfg.DefaultOutgoingBroadcastDelta,
))
ht.MineEmptyBlocks(int(numBlocks))
// Assert Bob's force closing tx has been broadcast. We should see two
// txns in the mempool:
// 1. Bob's force closing tx.
// 2. Bob's anchor sweeping tx CPFPing the force close tx.
ht.AssertForceCloseAndAnchorTxnsInMempool()
// Mine a block to confirm Bob's force close tx and anchor sweeping tx
// so we can focus on testing his outgoing HTLCs.
ht.MineBlocksAndAssertNumTxes(1, 2)
// Bob should have numPayments pending sweep for the outgoing HTLCs.
ht.AssertNumPendingSweeps(bob, numPayments)
// Bob should have one sweeping tx in the mempool, which sweeps all his
// outgoing HTLCs.
outgoingSweep0 := ht.GetNumTxsFromMempool(1)[0]
// We now mine one empty block so Bob will perform one fee bump, after
// which his sweeping tx should be updated with a new fee rate. We do
// this so we can test later when Bob sweeps his remaining HTLC, the new
// sweeping tx will start with the current fee rate.
//
// Calculate Bob's initial sweeping fee rate.
initialFeeRate := ht.CalculateTxFeeRate(outgoingSweep0)
// Mine one block to trigger Bob's RBF.
ht.MineEmptyBlocks(1)
// Make sure Bob's old sweeping tx has been removed from the mempool.
ht.AssertTxNotInMempool(outgoingSweep0.TxHash())
// Get the feerate of Bob's current sweeping tx.
outgoingSweep1 := ht.GetNumTxsFromMempool(1)[0]
currentFeeRate := ht.CalculateTxFeeRate(outgoingSweep1)
// Assert the Bob has updated the fee rate.
require.Greater(ht, currentFeeRate, initialFeeRate)
delta := currentFeeRate - initialFeeRate
// Check the shape of the sweeping tx - we expect it to be
// 3-input-3-output as a wallet utxo is used and a required output is
// made.
require.Len(ht, outgoingSweep1.TxIn, numPayments+1)
require.Len(ht, outgoingSweep1.TxOut, numPayments+1)
// Restart Carol, once she is online, she will try to settle the HTLCs
// via the direct preimage spend.
require.NoError(ht, restartCarol())
// Carol should have 1 incoming HTLC and 1 anchor output to sweep.
ht.AssertNumPendingSweeps(carol, 2)
// Assert Bob's sweeping tx has been replaced by Carol's.
ht.AssertTxNotInMempool(outgoingSweep1.TxHash())
carolSweepTx := ht.GetNumTxsFromMempool(1)[0]
// Assume the miner is now happy with Carol's fee, and it gets included
// in the next block.
ht.MineBlockWithTx(carolSweepTx)
// Upon receiving the above block, Bob should immediately create a
// sweeping tx and broadcast it using the remaining outgoing HTLC.
//
// Bob should have numPayments-1 pending sweep for the outgoing HTLCs.
ht.AssertNumPendingSweeps(bob, numPayments-1)
// Assert Bob immediately sweeps his remaining HTLC with the previous
// fee rate.
outgoingSweep2 := ht.GetNumTxsFromMempool(1)[0]
// Calculate the fee rate.
feeRate := ht.CalculateTxFeeRate(outgoingSweep2)
// We expect the current fee rate to be equal to the last fee rate he
// used plus the delta, as we expect the fee rate to stay on the initial
// line given by his fee function.
expectedFeeRate := currentFeeRate + delta
require.InEpsilonf(ht, uint64(expectedFeeRate),
uint64(feeRate), 0.02, "want %d, got %d in tx=%v",
currentFeeRate, feeRate, outgoingSweep2.TxHash())
// Finally, clean the mempol.
ht.MineBlocksAndAssertNumTxes(1, 1)
}

View file

@ -43,6 +43,10 @@ var (
// ErrUnknownSpent is returned when an unknown tx has spent an input in
// the sweeping tx.
ErrUnknownSpent = errors.New("unknown spend of input")
// ErrInputMissing is returned when a given input no longer exists,
// e.g., spending from an orphan tx.
ErrInputMissing = errors.New("input no longer exists")
)
var (
@ -273,6 +277,10 @@ type BumpResult struct {
// Err is the error that occurred during the broadcast.
Err error
// SpentInputs are the inputs spent by another tx which caused the
// current tx to be failed.
SpentInputs map[wire.OutPoint]*wire.MsgTx
// requestID is the ID of the request that created this record.
requestID uint64
}
@ -540,7 +548,7 @@ func (t *TxPublisher) createRBFCompliantTx(
for {
// Create a new tx with the given fee rate and check its
// mempool acceptance.
sweepCtx, err := t.createAndCheckTx(r.req, f)
sweepCtx, err := t.createAndCheckTx(r)
switch {
case err == nil:
@ -603,8 +611,9 @@ func (t *TxPublisher) createRBFCompliantTx(
// script, and the fee rate. In addition, it validates the tx's mempool
// acceptance before returning a tx that can be published directly, along with
// its fee.
func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
f FeeFunction) (*sweepTxCtx, error) {
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
req := r.req
f := r.feeFunction
// Create the sweep tx with max fee rate of 0 as the fee function
// guarantees the fee rate used here won't exceed the max fee rate.
@ -648,10 +657,68 @@ func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
return sweepCtx, nil
}
// If the inputs are spent by another tx, we will exit with the latest
// sweepCtx and an error.
if errors.Is(err, chain.ErrMissingInputs) {
log.Debugf("Tx %v missing inputs, it's likely the input has "+
"been spent by others", sweepCtx.tx.TxHash())
// Make sure to update the record with the latest attempt.
t.updateRecord(r, sweepCtx)
return sweepCtx, ErrInputMissing
}
return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
sweepCtx.tx.TxHash(), err)
}
// handleMissingInputs handles the case when the chain backend reports back a
// missing inputs error, which could happen when one of the input has been spent
// in another tx, or the input is referencing an orphan. When the input is
// spent, it will be handled via the TxUnknownSpend flow by creating a
// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned.
func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult {
// Get the spending txns.
spends := t.getSpentInputs(r)
// Attach the spending txns.
r.spentInputs = spends
// If there are no spending txns found and the input is missing, the
// input is referencing an orphan tx that's no longer valid, e.g., the
// spending the anchor output from the remote commitment after the local
// commitment has confirmed. In this case we will mark it as fatal and
// exit.
if len(spends) == 0 {
log.Warnf("Failing record=%v: found orphan inputs: %v\n",
r.requestID, inputTypeSummary(r.req.Inputs))
// Create a result that will be sent to the resultChan which is
// listened by the caller.
result := &BumpResult{
Event: TxFatal,
Tx: r.tx,
requestID: r.requestID,
Err: ErrInputMissing,
}
return result
}
// Check that the spending tx matches the sweeping tx - given that the
// current sweeping tx has been failed due to missing inputs, the
// spending tx must be a different tx, thus it should NOT be matched. We
// perform a sanity check here to catch the unexpected state.
if !t.isUnknownSpent(r, spends) {
log.Errorf("Sweeping tx %v has missing inputs, yet the "+
"spending tx is the sweeping tx itself: %v",
r.tx.TxHash(), r.spentInputs)
}
return t.createUnknownSpentBumpResult(r)
}
// broadcast takes a monitored tx and publishes it to the network. Prior to the
// broadcast, it will subscribe the tx's confirmation notification and attach
// the event channel to the record. Any broadcast-related errors will not be
@ -812,6 +879,10 @@ type monitorRecord struct {
// outpointToTxIndex is a map of outpoint to tx index.
outpointToTxIndex map[wire.OutPoint]int
// spentInputs are the inputs spent by another tx which caused the
// current tx failed.
spentInputs map[wire.OutPoint]*wire.MsgTx
}
// Start starts the publisher by subscribing to block epoch updates and kicking
@ -910,6 +981,9 @@ func (t *TxPublisher) processRecords() {
// If the any of the inputs has been spent, the record will be
// marked as failed or confirmed.
if len(spends) != 0 {
// Attach the spending txns.
r.spentInputs = spends
// When tx is nil, it means we haven't tried the initial
// broadcast yet the input is already spent. This could
// happen when the node shuts down, a previous sweeping
@ -1014,27 +1088,36 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
// handleInitialTxError takes the error from `initializeTx` and decides the
// bump event. It will construct a BumpResult and handles it.
func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
// We now decide what type of event to send.
var event BumpEvent
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
// Create a bump result to be sent to the sweeper.
result := &BumpResult{
Err: err,
requestID: r.requestID,
}
// We now decide what type of event to send.
switch {
// When the error is due to a dust output, we'll send a TxFailed so
// these inputs can be retried with a different group in the next
// block.
case errors.Is(err, ErrTxNoOutput):
event = TxFailed
result.Event = TxFailed
// When the error is due to budget being used up, we'll send a TxFailed
// so these inputs can be retried with a different group in the next
// block.
case errors.Is(err, ErrMaxPosition):
event = TxFailed
result.Event = TxFailed
// When the error is due to zero fee rate delta, we'll send a TxFailed
// so these inputs can be retried in the next block.
case errors.Is(err, ErrZeroFeeRateDelta):
event = TxFailed
result.Event = TxFailed
// When there are missing inputs, we'll create a TxUnknownSpend bump
// result here so the rest of the inputs can be retried.
case errors.Is(err, ErrInputMissing):
result = t.handleMissingInputs(r)
// Otherwise this is not a fee-related error and the tx cannot be
// retried. In that case we will fail ALL the inputs in this tx, which
@ -1044,13 +1127,7 @@ func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
// TODO(yy): Find out which input is causing the failure and fail that
// one only.
default:
event = TxFatal
}
result := &BumpResult{
Event: event,
Err: err,
requestID: requestID,
result.Event = TxFatal
}
t.handleResult(result)
@ -1078,7 +1155,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
log.Errorf("Initial broadcast failed: %v", err)
// We now handle the initialization error and exit.
t.handleInitialTxError(r.requestID, err)
t.handleInitialTxError(r, err)
return
}
@ -1159,17 +1236,81 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
"bumper, failing it now:\n%v", r.requestID,
inputTypeSummary(r.req.Inputs))
// Create a result that will be sent to the resultChan which is
// listened by the caller.
// Create a result that will be sent to the resultChan which is listened
// by the caller.
result := t.createUnknownSpentBumpResult(r)
// Notify the sweeper about this result in the end.
t.handleResult(result)
}
// createUnknownSpentBumpResult creates and returns a BumpResult given the
// monitored record has unknown spends.
func (t *TxPublisher) createUnknownSpentBumpResult(
r *monitorRecord) *BumpResult {
// Create a result that will be sent to the resultChan which is listened
// by the caller.
result := &BumpResult{
Event: TxUnknownSpend,
Tx: r.tx,
requestID: r.requestID,
Err: ErrUnknownSpent,
Event: TxUnknownSpend,
Tx: r.tx,
requestID: r.requestID,
Err: ErrUnknownSpent,
SpentInputs: r.spentInputs,
}
// Notify that this tx is confirmed and remove the record from the map.
t.handleResult(result)
// Get the fee function, which will be used to decided the next fee rate
// to use if the sweeper decides to retry sweeping this input.
feeFunc := r.feeFunction
// When the record is failed before the initial broadcast is attempted,
// it will have a nil fee func. In this case, we'll create the fee func
// here.
//
// NOTE: Since the current record is failed and will be deleted, we
// don't need to update the record on this fee function. We only need
// the fee rate data so the sweeper can pick up where we left off.
if feeFunc == nil {
f, err := t.initializeFeeFunction(r.req)
// TODO(yy): The only error we would receive here is when the
// pkScript is not recognized by the weightEstimator. What we
// should do instead is to check the pkScript immediately after
// receiving a sweep request so we don't need to check it again,
// which will also save us from error checking from several
// callsites.
if err != nil {
log.Errorf("Failed to create fee func for record %v: "+
"%v", r.requestID, err)
// Overwrite the event and error so the sweeper will
// remove this input.
result.Event = TxFatal
result.Err = err
return result
}
feeFunc = f
}
// Since the sweeping tx has been replaced by another party's tx, we
// missed this block window to increase its fee rate. To make sure the
// fee rate stays in the initial line, we now ask the fee function to
// give us the next fee rate as if the sweeping tx were RBFed. This new
// fee rate will be used as the starting fee rate if the upper system
// decides to continue sweeping the rest of the inputs.
_, err := feeFunc.Increment()
if err != nil {
// The fee function has reached its max position - nothing we
// can do here other than letting the user increase the budget.
log.Errorf("Failed to calculate the next fee rate for "+
"Record(%v): %v", r.requestID, err)
}
// Attach the new fee rate to be used for the next sweeping attempt.
result.FeeRate = feeFunc.FeeRate()
return result
}
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
@ -1186,48 +1327,12 @@ func (t *TxPublisher) createAndPublishTx(
// NOTE: The fee function is expected to have increased its returned
// fee rate after calling the SkipFeeBump method. So we can use it
// directly here.
sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction)
sweepCtx, err := t.createAndCheckTx(r)
// If the error is fee related, we will return no error and let the fee
// bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if errors.Is(err, chain.ErrInsufficientFee) ||
errors.Is(err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// If the error is not fee related, we will return a `TxFailed` event
// so this input can be retried.
// If there's an error creating the replacement tx, we need to abort the
// flow and handle it.
if err != nil {
// If the tx doesn't not have enought budget, we will return a
// result so the sweeper can handle it by re-clustering the
// utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
err)
} else {
// Otherwise, an unexpected error occurred, we will
// fail the tx and let the sweeper retry the whole
// process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
err)
}
return fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: r.requestID,
})
return t.handleReplacementTxError(r, oldTx, err)
}
// The tx has been created without any errors, we now register a new
@ -1251,7 +1356,9 @@ func (t *TxPublisher) createAndPublishTx(
if errors.Is(result.Err, chain.ErrInsufficientFee) ||
errors.Is(result.Err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(),
result.Err)
return fn.None[BumpResult]()
}
@ -1720,3 +1827,59 @@ func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
return txFee, changeOutsOpt, locktimeOpt, nil
}
// handleReplacementTxError handles the error returned from creating the
// replacement tx. It returns a BumpResult that should be notified to the
// sweeper.
func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
oldTx *wire.MsgTx, err error) fn.Option[BumpResult] {
// If the error is fee related, we will return no error and let the fee
// bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if errors.Is(err, chain.ErrInsufficientFee) ||
errors.Is(err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// At least one of the inputs is missing, which means it has already
// been spent by another tx and confirmed. In this case we will handle
// it by returning a TxUnknownSpend bump result.
if errors.Is(err, ErrInputMissing) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
bumpResult := t.handleMissingInputs(r)
return fn.Some(*bumpResult)
}
// If the error is not fee related, we will return a `TxFailed` event
// so this input can be retried.
result := fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: r.requestID,
})
// If the tx doesn't not have enought budget, we will return a result so
// the sweeper can handle it by re-clustering the utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return result
}
// Otherwise, an unexpected error occurred, we will log an error and let
// the sweeper retry the whole process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return result
}

View file

@ -504,9 +504,14 @@ func TestCreateAndCheckTx(t *testing.T) {
for _, tc := range testCases {
tc := tc
r := &monitorRecord{
req: tc.req,
feeFunction: m.feeFunc,
}
t.Run(tc.name, func(t *testing.T) {
// Call the method under test.
_, err := tp.createAndCheckTx(tc.req, m.feeFunc)
_, err := tp.createAndCheckTx(r)
// Check the result is as expected.
require.ErrorIs(t, err, tc.expectedErr)
@ -1772,6 +1777,13 @@ func TestProcessRecordsSpent(t *testing.T) {
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Mock the fee function to increase feerate.
m.feeFunc.On("Increment").Return(true, nil).Once()
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
// Call processRecords and expect the results are notified back.
tp.processRecords()
@ -1785,6 +1797,9 @@ func TestProcessRecordsSpent(t *testing.T) {
require.Equal(t, TxUnknownSpend, result.Event)
require.Equal(t, tx, result.Tx)
// We expect the fee rate to be updated.
require.Equal(t, feerate, result.FeeRate)
// No error should be set.
require.ErrorIs(t, result.Err, ErrUnknownSpent)
require.Equal(t, requestID, result.requestID)

View file

@ -24,10 +24,10 @@ func NewMockSweeperStore() *MockSweeperStore {
}
// IsOurTx determines whether a tx is published by us, based on its hash.
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) bool {
args := s.Called(hash)
return args.Bool(0), args.Error(1)
return args.Bool(0)
}
// StoreTx stores a tx we are about to publish.

View file

@ -121,7 +121,7 @@ func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
type SweeperStore interface {
// IsOurTx determines whether a tx is published by us, based on its
// hash.
IsOurTx(hash chainhash.Hash) (bool, error)
IsOurTx(hash chainhash.Hash) bool
// StoreTx stores a tx hash we are about to publish.
StoreTx(*TxRecord) error
@ -276,15 +276,17 @@ func (s *sweeperStore) StoreTx(tr *TxRecord) error {
}, func() {})
}
// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
// IsOurTx determines whether a tx is published by us, based on its hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool {
var ours bool
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
txHashesBucket := tx.ReadBucket(txHashesBucketKey)
// If the root bucket cannot be found, we consider the tx to be
// not found in our db.
if txHashesBucket == nil {
return errNoTxHashesBucket
log.Error("Tx hashes bucket not found in sweeper store")
return nil
}
ours = txHashesBucket.Get(hash[:]) != nil
@ -294,10 +296,10 @@ func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
ours = false
})
if err != nil {
return false, err
return false
}
return ours, nil
return ours
}
// ListSweeps lists all the sweep transactions we have in the sweeper store.

View file

@ -57,18 +57,15 @@ func TestStore(t *testing.T) {
require.NoError(t, err)
// Assert that both txes are recognized as our own.
ours, err := store.IsOurTx(tx1.TxHash())
require.NoError(t, err)
ours := store.IsOurTx(tx1.TxHash())
require.True(t, ours, "expected tx to be ours")
ours, err = store.IsOurTx(tx2.TxHash())
require.NoError(t, err)
ours = store.IsOurTx(tx2.TxHash())
require.True(t, ours, "expected tx to be ours")
// An different hash should be reported as not being ours.
var unknownHash chainhash.Hash
ours, err = store.IsOurTx(unknownHash)
require.NoError(t, err)
ours = store.IsOurTx(unknownHash)
require.False(t, ours, "expected tx to not be ours")
txns, err := store.ListSweeps()

View file

@ -1230,21 +1230,26 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
}
// This is a new input, and we want to query the mempool to see if this
// input has already been spent. If so, we'll start the input with
// state Published and attach the RBFInfo.
state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
// input has already been spent. If so, we'll start the input with the
// RBFInfo.
rbfInfo := s.decideRBFInfo(input.input.OutPoint())
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pi = &SweeperInput{
state: state,
state: Init,
listeners: []chan Result{input.resultChan},
Input: input.input,
params: input.params,
rbf: rbfInfo,
}
// Set the starting fee rate if a previous sweeping tx is found.
rbfInfo.WhenSome(func(info RBFInfo) {
pi.params.StartingFeeRate = fn.Some(info.FeeRate)
})
// Set the acutal deadline height.
pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
s.calculateDefaultDeadline(pi),
@ -1267,7 +1272,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
)
if err != nil {
err := fmt.Errorf("wait for spend: %w", err)
s.markInputFatal(pi, err)
s.markInputFatal(pi, nil, err)
return err
}
@ -1277,13 +1282,12 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
return nil
}
// decideStateAndRBFInfo queries the mempool to see whether the given input has
// already been spent. If so, the state Published will be returned, otherwise
// state Init. When spent, it will query the sweeper store to fetch the fee
// info of the spending transction, and construct an RBFInfo based on it.
// Suppose an error occurs, fn.None is returned.
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
SweepState, fn.Option[RBFInfo]) {
// decideRBFInfo queries the mempool to see whether the given input has already
// been spent. When spent, it will query the sweeper store to fetch the fee info
// of the spending transction, and construct an RBFInfo based on it. Suppose an
// error occurs, fn.None is returned.
func (s *UtxoSweeper) decideRBFInfo(
op wire.OutPoint) fn.Option[RBFInfo] {
// Check if we can find the spending tx of this input in mempool.
txOption := s.mempoolLookup(op)
@ -1301,7 +1305,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
// - for neutrino we don't have a mempool.
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
if tx == nil {
return Init, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Otherwise the input is already spent in the mempool, so eventually
@ -1313,12 +1317,15 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
txid := tx.TxHash()
tr, err := s.cfg.Store.GetTx(txid)
log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
op)
// If the tx is not found in the store, it means it's not broadcast by
// us, hence we can't find the fee info. This is fine as, later on when
// this tx is confirmed, we will remove the input from our inputs.
if errors.Is(err, ErrTxNotFound) {
log.Warnf("Spending tx %v not found in sweeper store", txid)
return Published, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Exit if we get an db error.
@ -1326,7 +1333,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
log.Errorf("Unable to get tx %v from sweeper store: %v",
txid, err)
return Published, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Prepare the fee info and return it.
@ -1336,7 +1343,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
})
return Published, rbf
return rbf
}
// handleExistingInput processes an input that is already known to the sweeper.
@ -1387,12 +1394,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
// Query store to find out if we ever published this tx.
spendHash := *spend.SpenderTxHash
isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
if err != nil {
log.Errorf("cannot determine if tx %v is ours: %v",
spendHash, err)
return
}
isOurTx := s.cfg.Store.IsOurTx(spendHash)
// If this isn't our transaction, it means someone else swept outputs
// that we were attempting to sweep. This can happen for anchor outputs
@ -1482,12 +1484,17 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
// markInputFatal marks the given input as fatal and won't be retried. It
// will also notify all the subscribers of this input.
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) {
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx,
err error) {
log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
pi.state = Fatal
s.signalResult(pi, Result{Err: err})
s.signalResult(pi, Result{
Tx: tx,
Err: err,
})
}
// updateSweeperInputs updates the sweeper's internal state and returns a map
@ -1819,7 +1826,7 @@ func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) {
continue
}
s.markInputFatal(input, err)
s.markInputFatal(input, nil, err)
}
}
@ -1847,6 +1854,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
case TxReplaced:
return s.handleBumpEventTxReplaced(r)
// There are inputs being spent in a tx which the fee bumper doesn't
// understand. We will remove the tx from the sweeper db and mark the
// inputs as swept.
case TxUnknownSpend:
s.handleBumpEventTxUnknownSpend(r)
// There's a fatal error in creating the tx, we will remove the tx from
// the sweeper db and mark the inputs as failed.
case TxFatal:
@ -1861,20 +1874,139 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
// NOTE: It is enough to check the txid because the sweeper will create
// outpoints which solely belong to the internal LND wallet.
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
found, err := s.cfg.Store.IsOurTx(op.Hash)
// In case there is an error fetching the transaction details from the
// sweeper store we assume the outpoint is still used by the sweeper
// (worst case scenario).
//
// TODO(ziggie): Ensure that confirmed outpoints are deleted from the
// bucket.
if err != nil && !errors.Is(err, errNoTxHashesBucket) {
log.Errorf("failed to fetch info for outpoint(%v:%d) "+
"with: %v, we assume it is still in use by the sweeper",
op.Hash, op.Index, err)
return s.cfg.Store.IsOurTx(op.Hash)
}
return true
// markInputSwept marks the given input as swept by the tx. It will also notify
// all the subscribers of this input.
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
inp.state)
inp.state = Swept
// Signal result channels.
s.signalResult(inp, Result{
Tx: tx,
})
// Remove all other inputs in this exclusive group.
if inp.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(*inp.params.ExclusiveGroup)
}
}
// handleUnknownSpendTx takes an input and its spending tx. If the spending tx
// cannot be found in the sweeper store, the input will be marked as fatal,
// otherwise it will be marked as swept.
func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
op := inp.OutPoint()
txid := tx.TxHash()
isOurTx := s.cfg.Store.IsOurTx(txid)
// If this is our tx, it means it's a previous sweeping tx that got
// confirmed, which could happen when a restart happens during the
// sweeping process.
if isOurTx {
log.Debugf("Found our sweeping tx %v, marking input %v as "+
"swept", txid, op)
// We now use the spending tx to update the state of the inputs.
s.markInputSwept(inp, tx)
return
}
return found
// Since the input is spent by others, we now mark it as fatal and won't
// be retried.
s.markInputFatal(inp, tx, ErrRemoteSpend)
log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
txid, lnutils.SpewLogClosure(tx))
// Construct a map of the inputs this transaction spends.
spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
for _, txIn := range tx.TxIn {
spentInputs[txIn.PreviousOutPoint] = struct{}{}
}
err := s.removeConflictSweepDescendants(spentInputs)
if err != nil {
log.Warnf("unable to remove descendant transactions "+
"due to tx %v: ", txid)
}
}
// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
// by another party with their tx being confirmed. It will retry sweeping the
// "good" inputs once the "bad" ones are kicked out.
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
// Mark the inputs as publish failed, which means they will be retried
// later.
s.markInputsPublishFailed(r.set)
// Get all the inputs that are not spent in the current sweeping tx.
spentInputs := r.result.SpentInputs
// Create a slice to track inputs to be retried.
inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
// Iterate all the inputs found in this bump and mark the ones spent by
// the third party as failed. The rest of inputs will then be updated
// with a new fee rate and be retried immediately.
for _, inp := range r.set.Inputs() {
op := inp.OutPoint()
input, ok := s.inputs[op]
// Wallet inputs are not tracked so we will not find them from
// the inputs map.
if !ok {
log.Debugf("Skipped marking input: %v not found in "+
"pending inputs", op)
continue
}
// Check whether this input has been spent, if so we mark it as
// fatal or swept based on whether this is one of our previous
// sweeping txns, then move to the next.
tx, spent := spentInputs[op]
if spent {
s.handleUnknownSpendTx(input, tx)
continue
}
log.Debugf("Input(%v): updating params: starting fee rate "+
"[%v -> %v], immediate [%v -> true]", op,
input.params.StartingFeeRate, r.result.FeeRate,
input.params.Immediate)
// Update the input using the fee rate specified from the
// BumpResult, which should be the starting fee rate to use for
// the next sweeping attempt.
input.params.StartingFeeRate = fn.Some(r.result.FeeRate)
input.params.Immediate = true
inputsToRetry = append(inputsToRetry, input)
}
// Exit early if there are no inputs to be retried.
if len(inputsToRetry) == 0 {
return
}
log.Debugf("Retry sweeping inputs with updated params: %v",
inputTypeSummary(inputsToRetry))
// Get the latest inputs, which should put the PublishFailed inputs back
// to the sweeping queue.
inputs := s.updateSweeperInputs()
// Immediately sweep the remaining inputs - the previous inputs should
// now be swept with the updated StartingFeeRate immediately. We may
// also include more inputs in the new sweeping tx if new ones with the
// same deadline are offered.
s.sweepPendingInputs(inputs)
}

View file

@ -497,10 +497,9 @@ func TestUpdateSweeperInputs(t *testing.T) {
require.Equal(expectedInputs, s.inputs)
}
// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
// returned based on whether this input can be found both in mempool and the
// sweeper store.
func TestDecideStateAndRBFInfo(t *testing.T) {
// TestDecideRBFInfo checks that the expected RBFInfo is returned based on
// whether this input can be found both in mempool and the sweeper store.
func TestDecideRBFInfo(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -524,11 +523,9 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
mockMempool.On("LookupInputMempoolSpend", op).Return(
fn.None[wire.MsgTx]()).Once()
// Since the mempool lookup failed, we exepect state Init and no
// RBFInfo.
state, rbf := s.decideStateAndRBFInfo(op)
// Since the mempool lookup failed, we expect no RBFInfo.
rbf := s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Init, state)
// Mock the mempool lookup to return a tx three times as we are calling
// attachAvailableRBFInfo three times.
@ -539,19 +536,17 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
// Mock the store to return an error saying the tx cannot be found.
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
// Although the db lookup failed, we expect the state to be Published.
state, rbf = s.decideStateAndRBFInfo(op)
// The db lookup failed, we expect no RBFInfo.
rbf = s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Published, state)
// Mock the store to return a db error.
dummyErr := errors.New("dummy error")
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
// Although the db lookup failed, we expect the state to be Published.
state, rbf = s.decideStateAndRBFInfo(op)
// The db lookup failed, we expect no RBFInfo.
rbf = s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Published, state)
// Mock the store to return a record.
tr := &TxRecord{
@ -561,7 +556,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
// Call the method again.
state, rbf = s.decideStateAndRBFInfo(op)
rbf = s.decideRBFInfo(op)
// Assert that the RBF info is returned.
rbfInfo := fn.Some(RBFInfo{
@ -570,9 +565,6 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
})
require.Equal(rbfInfo, rbf)
// Assert the state is updated.
require.Equal(Published, state)
}
// TestMarkInputFatal checks that the input is marked as expected.
@ -596,7 +588,7 @@ func TestMarkInputFailed(t *testing.T) {
}
// Call the method under test.
s.markInputFatal(pi, errors.New("dummy error"))
s.markInputFatal(pi, nil, errors.New("dummy error"))
// Assert the state is updated.
require.Equal(t, Fatal, pi.state)
@ -1199,3 +1191,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) {
err = s.handleBumpEventTxFatal(resp)
rt.NoError(err)
}
// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly
// marks an input as swept given the tx is ours.
func TestHandleUnknownSpendTxOurs(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx`
// correctly marks an input as fatal given the tx is not ours.
func TestHandleInputSpendTxThirdParty(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return false when calling IsOurTx.
store.On("IsOurTx", txid).Return(false).Once()
// Mock `ListSweeps` to return an empty slice as we are testing the
// workflow here, not the method `removeConflictSweepDescendants`.
store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Fatal, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs
// are failed due to them being spent by another party.
func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp})
op := inp.OutPoint()
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the
// inputs are retried after the bad inputs are filtered out.
func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock wallet and aggregator.
wallet := &MockWallet{}
defer wallet.AssertExpectations(t)
aggregator := &mockUtxoAggregator{}
defer aggregator.AssertExpectations(t)
publisher := &MockBumper{}
defer publisher.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Wallet: wallet,
Aggregator: aggregator,
Publisher: publisher,
GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] {
//nolint:ll
return fn.Ok(lnwallet.AddrWithKey{
DeliveryAddress: testPubKey.SerializeCompressed(),
})
},
NoDeadlineConfTarget: uint32(DefaultDeadlineDelta),
Store: store,
})
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create mock inputs - inp1 will be the bad input, and inp2 will be
// retried.
inp1 := createMockInput(t, s, PendingPublish)
inp2 := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp1, inp2})
op1 := inp1.OutPoint()
op2 := inp2.OutPoint()
inp2.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp2.On("BlocksToMaturity").Return(uint32(0)).Once()
inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once()
// Create a testing tx that spends inp1.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op1: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Mock the aggregator to return an empty slice as we are not testing
// the actual sweeping behavior.
aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{})
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the first input is removed.
require.NotContains(t, s.inputs, op1)
// Assert the state of the input is updated.
require.Equal(t, PublishFailed, s.inputs[op2].state)
}