Merge pull request #9447 from yyforyongyu/yy-sweeper-fix

sweep: start tracking input spending status in the fee bumper
This commit is contained in:
Olaoluwa Osuntokun 2025-02-20 16:56:45 -08:00 committed by GitHub
commit 553899bffb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 2210 additions and 507 deletions

View file

@ -211,7 +211,7 @@ func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx,
// If found, save it to watchedInputs to notify the
// subscriber later.
Log.Infof("Found input %s, spent in %s", op, txid)
Log.Debugf("Found input %s, spent in %s", op, txid)
// Construct the spend details.
details := &SpendDetail{

View file

@ -98,9 +98,11 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
select {
case sweepRes := <-c.sweepResultChan:
switch sweepRes.Err {
err := sweepRes.Err
switch {
// Anchor was swept successfully.
case nil:
case err == nil:
sweepTxID := sweepRes.Tx.TxHash()
spendTx = &sweepTxID
@ -108,7 +110,9 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
// Anchor was swept by someone else. This is possible after the
// 16 block csv lock.
case sweep.ErrRemoteSpend:
case errors.Is(err, sweep.ErrRemoteSpend),
errors.Is(err, sweep.ErrInputMissing):
c.log.Warnf("our anchor spent by someone else")
outcome = channeldb.ResolverOutcomeUnclaimed

View file

@ -344,6 +344,11 @@ The underlying functionality between those two options remain the same.
* A code refactor that [replaces min/max helpers with built-in min/max
functions](https://github.com/lightningnetwork/lnd/pull/9451).
* [Unified](https://github.com/lightningnetwork/lnd/pull/9447) the monitoring
inputs spending logic in the sweeper so it can properly handle missing inputs
and recover from restart.
## Tooling and Documentation
* [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077)

View file

@ -662,6 +662,10 @@ var allTestCases = []*lntest.TestCase{
Name: "invoice migration",
TestFunc: testInvoiceMigration,
},
{
Name: "fee replacement",
TestFunc: testFeeReplacement,
},
}
// appendPrefixed is used to add a prefix to each test name in the subtests

File diff suppressed because it is too large Load diff

View file

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/node"
"github.com/lightningnetwork/lnd/lntest/rpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
@ -2081,3 +2082,255 @@ func testBumpForceCloseFee(ht *lntest.HarnessTest) {
// This is needed to clean up the mempool.
ht.MineBlocksAndAssertNumTxes(1, 2)
}
// testFeeReplacement tests that when a sweeping txns aggregates multiple
// outgoing HTLCs, and one of the outgoing HTLCs has been spent via the direct
// preimage path by the remote peer, the remaining HTLCs will be grouped again
// and swept immediately.
//
// Setup:
// 1. Fund Alice with 1 UTXOs - she only needs one for the funding process,
// 2. Fund Bob with 3 UTXOs - he needs one for the funding process, one for
// his CPFP anchor sweeping, and one for sweeping his outgoing HTLCs.
// 3. Create a linear network from Alice -> Bob -> Carol.
// 4. Alice pays two invoices to Carol, with Carol holding the settlement.
// 5. Bob goes offline.
// 6. Carol settles one of the invoices, so she can later spend Bob's outgoing
// HTLC via the direct preimage path.
// 7. Carol goes offline and Bob comes online.
// 8. Mine enough blocks so Bob will force close Bob=>Carol to claim his
// outgoing HTLCs.
// 9. Carol comes online, sweeps one of Bob's outgoing HTLCs and it confirms.
// 10. Bob creates a new sweeping tx to sweep his remaining HTLC with a
// previous fee rate.
//
// Test:
// 1. Bob will immediately sweeps his remaining outgoing HTLC given that the
// other one has been spent by Carol.
// 2. Bob's new sweeping tx will use the previous fee rate instead of
// initializing a new starting fee rate.
func testFeeReplacement(ht *lntest.HarnessTest) {
// Set the min relay feerate to be 10 sat/vbyte so the non-CPFP anchor
// is never swept.
//
// TODO(yy): delete this line once the normal anchor sweeping is
// removed.
ht.SetMinRelayFeerate(10_000)
// Setup testing params.
//
// Invoice is 100k sats.
invoiceAmt := btcutil.Amount(100_000)
// Alice will send two payments.
numPayments := 2
// Use the smallest CLTV so we can mine fewer blocks.
cltvDelta := routing.MinCLTVDelta
// Prepare params.
cfg := []string{
"--protocol.anchors",
// Use a small CLTV to mine less blocks.
fmt.Sprintf("--bitcoin.timelockdelta=%d", cltvDelta),
// Use a very large CSV, this way to_local outputs are never
// swept so we can focus on testing HTLCs.
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", cltvDelta*10),
}
cfgs := [][]string{cfg, cfg, cfg}
openChannelParams := lntest.OpenChannelParams{
Amt: invoiceAmt * 100,
}
// Create a three hop network: Alice -> Bob -> Carol.
_, nodes := ht.CreateSimpleNetwork(cfgs, openChannelParams)
// Unwrap the results.
alice, bob, carol := nodes[0], nodes[1], nodes[2]
// Bob needs two more wallet utxos:
// - when sweeping anchors, he needs one utxo for each sweep.
// - when sweeping HTLCs, he needs one utxo for each sweep.
numUTXOs := 2
// Bob should have enough wallet UTXOs here to sweep the HTLC in the
// end of this test. However, due to a known issue, Bob's wallet may
// report there's no UTXO available. For details,
// - https://github.com/lightningnetwork/lnd/issues/8786
//
// TODO(yy): remove this extra UTXO once the issue is resolved.
numUTXOs++
// For neutrino backend, we need two more UTXOs for Bob to create his
// sweeping txns.
if ht.IsNeutrinoBackend() {
numUTXOs += 2
}
ht.FundNumCoins(bob, numUTXOs)
// We also give Carol 2 coins to create her sweeping txns.
ht.FundNumCoins(carol, 2)
// Create numPayments HTLCs on Bob's incoming and outgoing channels.
preimages := make([][]byte, 0, numPayments)
streams := make([]rpc.SingleInvoiceClient, 0, numPayments)
for i := 0; i < numPayments; i++ {
// Create the preimage.
var preimage lntypes.Preimage
copy(preimage[:], ht.Random32Bytes())
payHashHold := preimage.Hash()
preimages = append(preimages, preimage[:])
// Subscribe the invoices.
stream := carol.RPC.SubscribeSingleInvoice(payHashHold[:])
streams = append(streams, stream)
// Carol create the hold invoice.
invoiceReqHold := &invoicesrpc.AddHoldInvoiceRequest{
Value: int64(invoiceAmt),
CltvExpiry: finalCltvDelta,
Hash: payHashHold[:],
}
invoiceHold := carol.RPC.AddHoldInvoice(invoiceReqHold)
// Let Alice pay the invoices.
req := &routerrpc.SendPaymentRequest{
PaymentRequest: invoiceHold.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
// Assert the payments are inflight.
ht.SendPaymentAndAssertStatus(
alice, req, lnrpc.Payment_IN_FLIGHT,
)
// Wait for Carol to mark invoice as accepted. There is a small
// gap to bridge between adding the htlc to the channel and
// executing the exit hop logic.
ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED)
}
// At this point, all 3 nodes should now have an active channel with
// the created HTLCs pending on all of them.
//
// Alice should have numPayments outgoing HTLCs on channel Alice -> Bob.
ht.AssertNumActiveHtlcs(alice, numPayments)
// Bob should have 2 * numPayments HTLCs,
// - numPayments incoming HTLCs on channel Alice -> Bob.
// - numPayments outgoing HTLCs on channel Bob -> Carol.
ht.AssertNumActiveHtlcs(bob, numPayments*2)
// Carol should have numPayments incoming HTLCs on channel Bob -> Carol.
ht.AssertNumActiveHtlcs(carol, numPayments)
// Suspend Bob so he won't get the preimage from Carol.
restartBob := ht.SuspendNode(bob)
// Carol settles the first invoice.
carol.RPC.SettleInvoice(preimages[0])
ht.AssertInvoiceState(streams[0], lnrpc.Invoice_SETTLED)
// Carol goes offline so the preimage won't be sent to Bob.
restartCarol := ht.SuspendNode(carol)
// Bob comes online.
require.NoError(ht, restartBob())
// We'll now mine enough blocks to trigger Bob to force close channel
// Bob->Carol due to his outgoing HTLC is about to timeout. With the
// default outgoing broadcast delta of zero, this will be the same
// height as the outgoing htlc's expiry height.
numBlocks := padCLTV(uint32(
finalCltvDelta - lncfg.DefaultOutgoingBroadcastDelta,
))
ht.MineEmptyBlocks(int(numBlocks))
// Assert Bob's force closing tx has been broadcast. We should see two
// txns in the mempool:
// 1. Bob's force closing tx.
// 2. Bob's anchor sweeping tx CPFPing the force close tx.
ht.AssertForceCloseAndAnchorTxnsInMempool()
// Mine a block to confirm Bob's force close tx and anchor sweeping tx
// so we can focus on testing his outgoing HTLCs.
ht.MineBlocksAndAssertNumTxes(1, 2)
// Bob should have numPayments pending sweep for the outgoing HTLCs.
ht.AssertNumPendingSweeps(bob, numPayments)
// Bob should have one sweeping tx in the mempool, which sweeps all his
// outgoing HTLCs.
outgoingSweep0 := ht.GetNumTxsFromMempool(1)[0]
// We now mine one empty block so Bob will perform one fee bump, after
// which his sweeping tx should be updated with a new fee rate. We do
// this so we can test later when Bob sweeps his remaining HTLC, the new
// sweeping tx will start with the current fee rate.
//
// Calculate Bob's initial sweeping fee rate.
initialFeeRate := ht.CalculateTxFeeRate(outgoingSweep0)
// Mine one block to trigger Bob's RBF.
ht.MineEmptyBlocks(1)
// Make sure Bob's old sweeping tx has been removed from the mempool.
ht.AssertTxNotInMempool(outgoingSweep0.TxHash())
// Get the feerate of Bob's current sweeping tx.
outgoingSweep1 := ht.GetNumTxsFromMempool(1)[0]
currentFeeRate := ht.CalculateTxFeeRate(outgoingSweep1)
// Assert the Bob has updated the fee rate.
require.Greater(ht, currentFeeRate, initialFeeRate)
delta := currentFeeRate - initialFeeRate
// Check the shape of the sweeping tx - we expect it to be
// 3-input-3-output as a wallet utxo is used and a required output is
// made.
require.Len(ht, outgoingSweep1.TxIn, numPayments+1)
require.Len(ht, outgoingSweep1.TxOut, numPayments+1)
// Restart Carol, once she is online, she will try to settle the HTLCs
// via the direct preimage spend.
require.NoError(ht, restartCarol())
// Carol should have 1 incoming HTLC and 1 anchor output to sweep.
ht.AssertNumPendingSweeps(carol, 2)
// Assert Bob's sweeping tx has been replaced by Carol's.
ht.AssertTxNotInMempool(outgoingSweep1.TxHash())
carolSweepTx := ht.GetNumTxsFromMempool(1)[0]
// Assume the miner is now happy with Carol's fee, and it gets included
// in the next block.
ht.MineBlockWithTx(carolSweepTx)
// Upon receiving the above block, Bob should immediately create a
// sweeping tx and broadcast it using the remaining outgoing HTLC.
//
// Bob should have numPayments-1 pending sweep for the outgoing HTLCs.
ht.AssertNumPendingSweeps(bob, numPayments-1)
// Assert Bob immediately sweeps his remaining HTLC with the previous
// fee rate.
outgoingSweep2 := ht.GetNumTxsFromMempool(1)[0]
// Calculate the fee rate.
feeRate := ht.CalculateTxFeeRate(outgoingSweep2)
// We expect the current fee rate to be equal to the last fee rate he
// used plus the delta, as we expect the fee rate to stay on the initial
// line given by his fee function.
expectedFeeRate := currentFeeRate + delta
require.InEpsilonf(ht, uint64(expectedFeeRate),
uint64(feeRate), 0.02, "want %d, got %d in tx=%v",
currentFeeRate, feeRate, outgoingSweep2.TxHash())
// Finally, clean the mempol.
ht.MineBlocksAndAssertNumTxes(1, 1)
}

View file

@ -40,9 +40,13 @@ var (
// preparation, usually due to the output being dust.
ErrTxNoOutput = errors.New("tx has no output")
// ErrThirdPartySpent is returned when a third party has spent the
// input in the sweeping tx.
ErrThirdPartySpent = errors.New("third party spent the output")
// ErrUnknownSpent is returned when an unknown tx has spent an input in
// the sweeping tx.
ErrUnknownSpent = errors.New("unknown spend of input")
// ErrInputMissing is returned when a given input no longer exists,
// e.g., spending from an orphan tx.
ErrInputMissing = errors.New("input no longer exists")
)
var (
@ -81,10 +85,6 @@ const (
// bumper. In either case the inputs in this tx should be retried with
// either a different grouping strategy or an increased budget.
//
// NOTE: We also send this event when there's a third party spend
// event, and the sweeper will handle cleaning this up once it's
// confirmed.
//
// TODO(yy): Remove the above usage once we remove sweeping non-CPFP
// anchors.
TxFailed
@ -95,6 +95,17 @@ const (
// TxConfirmed is sent when the tx is confirmed.
TxConfirmed
// TxUnknownSpend is sent when at least one of the inputs is spent but
// not by the current sweeping tx, this can happen when,
// - a remote party has replaced our sweeping tx by spending the
// input(s), e.g., via the direct preimage spend on our outgoing HTLC.
// - a third party has replaced our sweeping tx, e.g., the anchor output
// after 16 blocks.
// - A previous sweeping tx has confirmed but the fee bumper is not
// aware of it, e.g., a restart happens right after the sweeping tx is
// broadcast and confirmed.
TxUnknownSpend
// TxFatal is sent when the inputs in this tx cannot be retried. Txns
// will end up in this state if they have encountered a non-fee related
// error, which means they cannot be retried with increased budget.
@ -115,6 +126,8 @@ func (e BumpEvent) String() string {
return "Replaced"
case TxConfirmed:
return "Confirmed"
case TxUnknownSpend:
return "UnknownSpend"
case TxFatal:
return "Fatal"
default:
@ -264,6 +277,10 @@ type BumpResult struct {
// Err is the error that occurred during the broadcast.
Err error
// SpentInputs are the inputs spent by another tx which caused the
// current tx to be failed.
SpentInputs map[wire.OutPoint]*wire.MsgTx
// requestID is the ID of the request that created this record.
requestID uint64
}
@ -280,7 +297,8 @@ func (b *BumpResult) String() string {
// Validate validates the BumpResult so it's safe to use.
func (b *BumpResult) Validate() error {
isFailureEvent := b.Event == TxFailed || b.Event == TxFatal
isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
b.Event == TxUnknownSpend
// Every result must have a tx except the fatal or failed case.
if b.Tx == nil && !isFailureEvent {
@ -530,7 +548,7 @@ func (t *TxPublisher) createRBFCompliantTx(
for {
// Create a new tx with the given fee rate and check its
// mempool acceptance.
sweepCtx, err := t.createAndCheckTx(r.req, f)
sweepCtx, err := t.createAndCheckTx(r)
switch {
case err == nil:
@ -593,8 +611,9 @@ func (t *TxPublisher) createRBFCompliantTx(
// script, and the fee rate. In addition, it validates the tx's mempool
// acceptance before returning a tx that can be published directly, along with
// its fee.
func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
f FeeFunction) (*sweepTxCtx, error) {
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
req := r.req
f := r.feeFunction
// Create the sweep tx with max fee rate of 0 as the fee function
// guarantees the fee rate used here won't exceed the max fee rate.
@ -638,10 +657,68 @@ func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
return sweepCtx, nil
}
// If the inputs are spent by another tx, we will exit with the latest
// sweepCtx and an error.
if errors.Is(err, chain.ErrMissingInputs) {
log.Debugf("Tx %v missing inputs, it's likely the input has "+
"been spent by others", sweepCtx.tx.TxHash())
// Make sure to update the record with the latest attempt.
t.updateRecord(r, sweepCtx)
return sweepCtx, ErrInputMissing
}
return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
sweepCtx.tx.TxHash(), err)
}
// handleMissingInputs handles the case when the chain backend reports back a
// missing inputs error, which could happen when one of the input has been spent
// in another tx, or the input is referencing an orphan. When the input is
// spent, it will be handled via the TxUnknownSpend flow by creating a
// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned.
func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult {
// Get the spending txns.
spends := t.getSpentInputs(r)
// Attach the spending txns.
r.spentInputs = spends
// If there are no spending txns found and the input is missing, the
// input is referencing an orphan tx that's no longer valid, e.g., the
// spending the anchor output from the remote commitment after the local
// commitment has confirmed. In this case we will mark it as fatal and
// exit.
if len(spends) == 0 {
log.Warnf("Failing record=%v: found orphan inputs: %v\n",
r.requestID, inputTypeSummary(r.req.Inputs))
// Create a result that will be sent to the resultChan which is
// listened by the caller.
result := &BumpResult{
Event: TxFatal,
Tx: r.tx,
requestID: r.requestID,
Err: ErrInputMissing,
}
return result
}
// Check that the spending tx matches the sweeping tx - given that the
// current sweeping tx has been failed due to missing inputs, the
// spending tx must be a different tx, thus it should NOT be matched. We
// perform a sanity check here to catch the unexpected state.
if !t.isUnknownSpent(r, spends) {
log.Errorf("Sweeping tx %v has missing inputs, yet the "+
"spending tx is the sweeping tx itself: %v",
r.tx.TxHash(), r.spentInputs)
}
return t.createUnknownSpentBumpResult(r)
}
// broadcast takes a monitored tx and publishes it to the network. Prior to the
// broadcast, it will subscribe the tx's confirmation notification and attach
// the event channel to the record. Any broadcast-related errors will not be
@ -754,6 +831,11 @@ func (t *TxPublisher) removeResult(result *BumpResult) {
log.Debugf("Removing monitor record=%v due to fatal err: %v",
id, result.Err)
case TxUnknownSpend:
// Remove the record if there's an unknown spend.
log.Debugf("Removing monitor record=%v due unknown spent: "+
"%v", id, result.Err)
// Do nothing if it's neither failed or confirmed.
default:
log.Tracef("Skipping record removal for id=%v, event=%v", id,
@ -797,6 +879,10 @@ type monitorRecord struct {
// outpointToTxIndex is a map of outpoint to tx index.
outpointToTxIndex map[wire.OutPoint]int
// spentInputs are the inputs spent by another tx which caused the
// current tx failed.
spentInputs map[wire.OutPoint]*wire.MsgTx
}
// Start starts the publisher by subscribing to block epoch updates and kicking
@ -878,8 +964,6 @@ func (t *TxPublisher) processRecords() {
// failedRecords stores a map of records which has inputs being spent
// by a third party.
//
// NOTE: this is only used for neutrino backend.
failedRecords := make(map[uint64]*monitorRecord)
// initialRecords stores a map of records which are being created and
@ -889,32 +973,58 @@ func (t *TxPublisher) processRecords() {
// visitor is a helper closure that visits each record and divides them
// into two groups.
visitor := func(requestID uint64, r *monitorRecord) error {
if r.tx == nil {
initialRecords[requestID] = r
return nil
}
log.Tracef("Checking monitor recordID=%v", requestID)
log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
r.tx.TxHash())
// Check whether the inputs have already been spent.
spends := t.getSpentInputs(r)
// If the tx is already confirmed, we can stop monitoring it.
if t.isConfirmed(r.tx.TxHash()) {
// If the any of the inputs has been spent, the record will be
// marked as failed or confirmed.
if len(spends) != 0 {
// Attach the spending txns.
r.spentInputs = spends
// When tx is nil, it means we haven't tried the initial
// broadcast yet the input is already spent. This could
// happen when the node shuts down, a previous sweeping
// tx confirmed, then the node comes back online and
// reoffers the inputs. Another case is the remote node
// spends the input quickly before we even attempt the
// sweep. In either case we will fail the record and let
// the sweeper handles it.
if r.tx == nil {
failedRecords[requestID] = r
return nil
}
// Check whether the inputs has been spent by a unknown
// tx.
if t.isUnknownSpent(r, spends) {
failedRecords[requestID] = r
// Move to the next record.
return nil
}
// The tx is ours, we can move it to the confirmed queue
// and stop monitoring it.
confirmedRecords[requestID] = r
// Move to the next record.
return nil
}
// Check whether the inputs has been spent by a third party.
//
// NOTE: this check is only done for neutrino backend.
if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
failedRecords[requestID] = r
// This is the first time we see this record, so we put it in
// the initial queue.
if r.tx == nil {
initialRecords[requestID] = r
// Move to the next record.
return nil
}
// We can only get here when the inputs are not spent and a
// previous sweeping tx has been attempted. In this case we will
// perform an RBF on it in the current block.
feeBumpRecords[requestID] = r
// Return nil to move to the next record.
@ -932,7 +1042,6 @@ func (t *TxPublisher) processRecords() {
// For records that are confirmed, we'll notify the caller about this
// result.
for _, r := range confirmedRecords {
log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
t.wg.Add(1)
go t.handleTxConfirmed(r)
}
@ -942,7 +1051,6 @@ func (t *TxPublisher) processRecords() {
// For records that are not confirmed, we perform a fee bump if needed.
for _, r := range feeBumpRecords {
log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
t.wg.Add(1)
go t.handleFeeBumpTx(r, currentHeight)
}
@ -950,10 +1058,8 @@ func (t *TxPublisher) processRecords() {
// For records that are failed, we'll notify the caller about this
// result.
for _, r := range failedRecords {
log.Debugf("Tx=%v has inputs been spent by a third party, "+
"failing it now", r.tx.TxHash())
t.wg.Add(1)
go t.handleThirdPartySpent(r)
go t.handleUnknownSpent(r)
}
}
@ -964,6 +1070,8 @@ func (t *TxPublisher) processRecords() {
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
defer t.wg.Done()
log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
// Create a result that will be sent to the resultChan which is
// listened by the caller.
result := &BumpResult{
@ -980,27 +1088,36 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
// handleInitialTxError takes the error from `initializeTx` and decides the
// bump event. It will construct a BumpResult and handles it.
func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
// We now decide what type of event to send.
var event BumpEvent
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
// Create a bump result to be sent to the sweeper.
result := &BumpResult{
Err: err,
requestID: r.requestID,
}
// We now decide what type of event to send.
switch {
// When the error is due to a dust output, we'll send a TxFailed so
// these inputs can be retried with a different group in the next
// block.
case errors.Is(err, ErrTxNoOutput):
event = TxFailed
result.Event = TxFailed
// When the error is due to budget being used up, we'll send a TxFailed
// so these inputs can be retried with a different group in the next
// block.
case errors.Is(err, ErrMaxPosition):
event = TxFailed
result.Event = TxFailed
// When the error is due to zero fee rate delta, we'll send a TxFailed
// so these inputs can be retried in the next block.
case errors.Is(err, ErrZeroFeeRateDelta):
event = TxFailed
result.Event = TxFailed
// When there are missing inputs, we'll create a TxUnknownSpend bump
// result here so the rest of the inputs can be retried.
case errors.Is(err, ErrInputMissing):
result = t.handleMissingInputs(r)
// Otherwise this is not a fee-related error and the tx cannot be
// retried. In that case we will fail ALL the inputs in this tx, which
@ -1010,13 +1127,7 @@ func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
// TODO(yy): Find out which input is causing the failure and fail that
// one only.
default:
event = TxFatal
}
result := &BumpResult{
Event: event,
Err: err,
requestID: requestID,
result.Event = TxFatal
}
t.handleResult(result)
@ -1044,7 +1155,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
log.Errorf("Initial broadcast failed: %v", err)
// We now handle the initialization error and exit.
t.handleInitialTxError(r.requestID, err)
t.handleInitialTxError(r, err)
return
}
@ -1073,6 +1184,9 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
defer t.wg.Done()
log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
r.requestID)
oldTxid := r.tx.TxHash()
// Get the current conf target for this record.
@ -1110,29 +1224,93 @@ func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
})
}
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
// spent. It will notify the subscriber then remove the record from the maps
// and send a TxFailed event to the subscriber.
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
// will notify the subscriber then remove the record from the maps and send a
// TxUnknownSpend event to the subscriber.
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) {
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
defer t.wg.Done()
// Create a result that will be sent to the resultChan which is
// listened by the caller.
//
// TODO(yy): create a new state `TxThirdPartySpent` to notify the
// sweeper to remove the input, hence moving the monitoring of inputs
// spent inside the fee bumper.
log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
"bumper, failing it now:\n%v", r.requestID,
inputTypeSummary(r.req.Inputs))
// Create a result that will be sent to the resultChan which is listened
// by the caller.
result := t.createUnknownSpentBumpResult(r)
// Notify the sweeper about this result in the end.
t.handleResult(result)
}
// createUnknownSpentBumpResult creates and returns a BumpResult given the
// monitored record has unknown spends.
func (t *TxPublisher) createUnknownSpentBumpResult(
r *monitorRecord) *BumpResult {
// Create a result that will be sent to the resultChan which is listened
// by the caller.
result := &BumpResult{
Event: TxFailed,
Tx: r.tx,
requestID: r.requestID,
Err: ErrThirdPartySpent,
Event: TxUnknownSpend,
Tx: r.tx,
requestID: r.requestID,
Err: ErrUnknownSpent,
SpentInputs: r.spentInputs,
}
// Notify that this tx is confirmed and remove the record from the map.
t.handleResult(result)
// Get the fee function, which will be used to decided the next fee rate
// to use if the sweeper decides to retry sweeping this input.
feeFunc := r.feeFunction
// When the record is failed before the initial broadcast is attempted,
// it will have a nil fee func. In this case, we'll create the fee func
// here.
//
// NOTE: Since the current record is failed and will be deleted, we
// don't need to update the record on this fee function. We only need
// the fee rate data so the sweeper can pick up where we left off.
if feeFunc == nil {
f, err := t.initializeFeeFunction(r.req)
// TODO(yy): The only error we would receive here is when the
// pkScript is not recognized by the weightEstimator. What we
// should do instead is to check the pkScript immediately after
// receiving a sweep request so we don't need to check it again,
// which will also save us from error checking from several
// callsites.
if err != nil {
log.Errorf("Failed to create fee func for record %v: "+
"%v", r.requestID, err)
// Overwrite the event and error so the sweeper will
// remove this input.
result.Event = TxFatal
result.Err = err
return result
}
feeFunc = f
}
// Since the sweeping tx has been replaced by another party's tx, we
// missed this block window to increase its fee rate. To make sure the
// fee rate stays in the initial line, we now ask the fee function to
// give us the next fee rate as if the sweeping tx were RBFed. This new
// fee rate will be used as the starting fee rate if the upper system
// decides to continue sweeping the rest of the inputs.
_, err := feeFunc.Increment()
if err != nil {
// The fee function has reached its max position - nothing we
// can do here other than letting the user increase the budget.
log.Errorf("Failed to calculate the next fee rate for "+
"Record(%v): %v", r.requestID, err)
}
// Attach the new fee rate to be used for the next sweeping attempt.
result.FeeRate = feeFunc.FeeRate()
return result
}
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
@ -1149,48 +1327,12 @@ func (t *TxPublisher) createAndPublishTx(
// NOTE: The fee function is expected to have increased its returned
// fee rate after calling the SkipFeeBump method. So we can use it
// directly here.
sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction)
sweepCtx, err := t.createAndCheckTx(r)
// If the error is fee related, we will return no error and let the fee
// bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if errors.Is(err, chain.ErrInsufficientFee) ||
errors.Is(err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// If the error is not fee related, we will return a `TxFailed` event
// so this input can be retried.
// If there's an error creating the replacement tx, we need to abort the
// flow and handle it.
if err != nil {
// If the tx doesn't not have enought budget, we will return a
// result so the sweeper can handle it by re-clustering the
// utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
err)
} else {
// Otherwise, an unexpected error occurred, we will
// fail the tx and let the sweeper retry the whole
// process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
err)
}
return fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: r.requestID,
})
return t.handleReplacementTxError(r, oldTx, err)
}
// The tx has been created without any errors, we now register a new
@ -1214,7 +1356,9 @@ func (t *TxPublisher) createAndPublishTx(
if errors.Is(result.Err, chain.ErrInsufficientFee) ||
errors.Is(result.Err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(),
result.Err)
return fn.None[BumpResult]()
}
@ -1236,43 +1380,59 @@ func (t *TxPublisher) createAndPublishTx(
return fn.Some(*result)
}
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
if err != nil {
log.Warnf("Failed to get tx details for %v: %v", txid, err)
return false
// isUnknownSpent checks whether the inputs of the tx has already been spent by
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
// spent, then it must be spent by a different tx other than the sweeping tx
// here.
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
spends map[wire.OutPoint]*wire.MsgTx) bool {
txid := r.tx.TxHash()
// Iterate all the spending txns and check if they match the sweeping
// tx.
for op, spendingTx := range spends {
spendingTxID := spendingTx.TxHash()
// If the spending tx is the same as the sweeping tx then we are
// good.
if spendingTxID == txid {
continue
}
log.Warnf("Detected unknown spend of input=%v in tx=%v", op,
spendingTx.TxHash())
return true
}
return details.NumConfirmations > 0
return false
}
// isThirdPartySpent checks whether the inputs of the tx has already been spent
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
// then it must be spent by a different tx other than the sweeping tx here.
//
// NOTE: this check is only performed for neutrino backend as it has no
// reliable way to tell a tx has been replaced.
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
inputs []input.Input) bool {
// getSpentInputs performs a non-blocking read on the spending subscriptions to
// see whether any of the monitored inputs has been spent. A map of inputs with
// their spending txns are returned if found.
func (t *TxPublisher) getSpentInputs(
r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
// Skip this check for if this is not neutrino backend.
if !t.isNeutrinoBackend() {
return false
}
// Create a slice to record the inputs spent.
spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
// Iterate all the inputs and check if they have been spent already.
for _, inp := range inputs {
for _, inp := range r.req.Inputs {
op := inp.OutPoint()
// For wallet utxos, the height hint is not set - we don't need
// to monitor them for third party spend.
//
// TODO(yy): We need to properly lock wallet utxos before
// skipping this check as the same wallet utxo can be used by
// different sweeping txns.
heightHint := inp.HeightHint()
if heightHint == 0 {
log.Debugf("Skipped third party check for wallet "+
"input %v", op)
continue
heightHint = uint32(t.currentHeight.Load())
log.Debugf("Checking wallet input %v using heightHint "+
"%v", op, heightHint)
}
// If the input has already been spent after the height hint, a
@ -1283,7 +1443,8 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
if err != nil {
log.Criticalf("Failed to register spend ntfn for "+
"input=%v: %v", op, err)
return false
return nil
}
// Remove the subscription when exit.
@ -1294,28 +1455,24 @@ func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
case spend, ok := <-spendEvent.Spend:
if !ok {
log.Debugf("Spend ntfn for %v canceled", op)
return false
}
spendingTxID := spend.SpendingTx.TxHash()
// If the spending tx is the same as the sweeping tx
// then we are good.
if spendingTxID == txid {
continue
}
log.Warnf("Detected third party spent of output=%v "+
"in tx=%v", op, spend.SpendingTx.TxHash())
spendingTx := spend.SpendingTx
return true
log.Debugf("Detected spent of input=%v in tx=%v", op,
spendingTx.TxHash())
spentInputs[op] = spendingTx
// Move to the next input.
default:
log.Tracef("Input %v not spent yet", op)
}
}
return false
return spentInputs
}
// calcCurrentConfTarget calculates the current confirmation target based on
@ -1670,3 +1827,59 @@ func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
return txFee, changeOutsOpt, locktimeOpt, nil
}
// handleReplacementTxError handles the error returned from creating the
// replacement tx. It returns a BumpResult that should be notified to the
// sweeper.
func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
oldTx *wire.MsgTx, err error) fn.Option[BumpResult] {
// If the error is fee related, we will return no error and let the fee
// bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if errors.Is(err, chain.ErrInsufficientFee) ||
errors.Is(err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// At least one of the inputs is missing, which means it has already
// been spent by another tx and confirmed. In this case we will handle
// it by returning a TxUnknownSpend bump result.
if errors.Is(err, ErrInputMissing) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
bumpResult := t.handleMissingInputs(r)
return fn.Some(*bumpResult)
}
// If the error is not fee related, we will return a `TxFailed` event
// so this input can be retried.
result := fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: r.requestID,
})
// If the tx doesn't not have enought budget, we will return a result so
// the sweeper can handle it by re-clustering the utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return result
}
// Otherwise, an unexpected error occurred, we will log an error and let
// the sweeper retry the whole process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return result
}

View file

@ -55,7 +55,7 @@ func createTestInput(value int64,
PubKey: testPubKey,
},
},
0,
1,
nil,
)
@ -504,9 +504,14 @@ func TestCreateAndCheckTx(t *testing.T) {
for _, tc := range testCases {
tc := tc
r := &monitorRecord{
req: tc.req,
feeFunction: m.feeFunc,
}
t.Run(tc.name, func(t *testing.T) {
// Call the method under test.
_, err := tp.createAndCheckTx(tc.req, m.feeFunc)
_, err := tp.createAndCheckTx(r)
// Check the result is as expected.
require.ErrorIs(t, err, tc.expectedErr)
@ -1481,60 +1486,163 @@ func TestHandleFeeBumpTx(t *testing.T) {
require.True(t, found)
}
// TestProcessRecords validates processRecords behaves as expected.
func TestProcessRecords(t *testing.T) {
// TestProcessRecordsInitial validates processRecords behaves as expected when
// processing the initial broadcast.
func TestProcessRecordsInitial(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID1 := uint64(1)
req1 := createTestBumpRequest()
tx1 := &wire.MsgTx{LockTime: 1}
txid1 := tx1.TxHash()
requestID := uint64(1)
req := createTestBumpRequest()
op := req.Inputs[0].OutPoint()
requestID2 := uint64(2)
req2 := createTestBumpRequest()
tx2 := &wire.MsgTx{LockTime: 2}
txid2 := tx2.TxHash()
// Create a monitor record that's confirmed.
recordConfirmed := &monitorRecord{
requestID: requestID1,
req: req1,
feeFunction: m.feeFunc,
tx: tx1,
// Mock RegisterSpendNtfn.
//
// Create the spending event that doesn't send an event.
se := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.wallet.On("GetTransactionDetails", &txid1).Return(
&lnwallet.TransactionDetail{
NumConfirmations: 1,
}, nil,
).Once()
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's not confirmed. We know it's not
// confirmed because the num of confirms is zero.
recordFeeBump := &monitorRecord{
requestID: requestID2,
req: req2,
feeFunction: m.feeFunc,
tx: tx2,
// Create a monitor record that's broadcast the first time.
record := &monitorRecord{
requestID: requestID,
req: req,
}
m.wallet.On("GetTransactionDetails", &txid2).Return(
&lnwallet.TransactionDetail{
NumConfirmations: 0,
}, nil,
).Once()
m.wallet.On("BackEnd").Return("test-backend").Once()
// Setup the initial publisher state by adding the records to the maps.
subscriberConfirmed := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID1, subscriberConfirmed)
tp.records.Store(requestID1, recordConfirmed)
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
subscriberReplaced := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID2, subscriberReplaced)
tp.records.Store(requestID2, recordFeeBump)
// The following methods should only be called once when creating the
// initial broadcast tx.
//
// Mock the signer to always return a valid script.
m.signer.On("ComputeInputScript", mock.Anything,
mock.Anything).Return(&input.Script{}, nil).Once()
// Mock the testmempoolaccept to return nil.
m.wallet.On("CheckMempoolAcceptance", mock.Anything).Return(nil).Once()
// Mock the wallet to publish successfully.
m.wallet.On("PublishTransaction",
mock.Anything, mock.Anything).Return(nil).Once()
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect the published tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxPublished.
require.Equal(t, TxPublished, result.Event)
// Expect the tx to be set but not the replaced tx.
require.NotNil(t, result.Tx)
require.Nil(t, result.ReplacedTx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsInitialSpent validates processRecords behaves as expected
// when processing the initial broadcast when the input is spent.
func TestProcessRecordsInitialSpent(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(tx)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's broadcast the first time.
record := &monitorRecord{
requestID: requestID,
req: req,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect the published tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxUnknownSpend.
require.Equal(t, TxUnknownSpend, result.Event)
// Expect the tx and the replaced tx to be nil.
require.Nil(t, result.Tx)
require.Nil(t, result.ReplacedTx)
// The error should be set.
require.ErrorIs(t, result.Err, ErrUnknownSpent)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsFeeBump validates processRecords behaves as expected when
// processing fee bump records.
func TestProcessRecordsFeeBump(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
//
// Create the spending event that doesn't send an event.
se := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's not confirmed. We know it's not
// confirmed because the `SpendEvent` is empty.
record := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, record)
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
@ -1560,40 +1668,141 @@ func TestProcessRecords(t *testing.T) {
// Call processRecords and expect the results are notified back.
tp.processRecords()
// We expect two results to be received. One for the confirmed tx and
// one for the replaced tx.
//
// Check the confirmed tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriberConfirmed")
case result := <-subscriberConfirmed:
// We expect the result to be TxConfirmed.
require.Equal(t, TxConfirmed, result.Event)
require.Equal(t, tx1, result.Tx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID1, result.requestID)
}
// Now check the replaced tx result.
// We expect the replaced tx to be notified back.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriberReplaced")
case result := <-subscriberReplaced:
case result := <-subscriber:
// We expect the result to be TxReplaced.
require.Equal(t, TxReplaced, result.Event)
// The new tx and old tx should be properly set.
require.NotEqual(t, tx2, result.Tx)
require.Equal(t, tx2, result.ReplacedTx)
require.NotEqual(t, tx, result.Tx)
require.Equal(t, tx, result.ReplacedTx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID2, result.requestID)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsConfirmed validates processRecords behaves as expected when
// processing confirmed records.
func TestProcessRecordsConfirmed(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(tx)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's confirmed.
recordConfirmed := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// Check the confirmed tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxConfirmed.
require.Equal(t, TxConfirmed, result.Event)
require.Equal(t, tx, result.Tx)
// No error should be set.
require.Nil(t, result.Err)
require.Equal(t, requestID, result.requestID)
}
}
// TestProcessRecordsSpent validates processRecords behaves as expected when
// processing unknown spent records.
func TestProcessRecordsSpent(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create testing objects.
requestID := uint64(1)
req := createTestBumpRequest()
tx := &wire.MsgTx{LockTime: 1}
op := req.Inputs[0].OutPoint()
// Create a unknown tx.
txUnknown := &wire.MsgTx{LockTime: 2}
// Mock RegisterSpendNtfn.
se := createTestSpendEvent(txUnknown)
m.notifier.On("RegisterSpendNtfn",
&op, mock.Anything, mock.Anything).Return(se, nil).Once()
// Create a monitor record that's spent by txUnknown.
recordConfirmed := &monitorRecord{
requestID: requestID,
req: req,
feeFunction: m.feeFunc,
tx: tx,
}
// Setup the initial publisher state by adding the records to the maps.
subscriber := make(chan *BumpResult, 1)
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Mock the fee function to increase feerate.
m.feeFunc.On("Increment").Return(true, nil).Once()
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
// Call processRecords and expect the results are notified back.
tp.processRecords()
// Check the unknown tx result.
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for subscriber")
case result := <-subscriber:
// We expect the result to be TxUnknownSpend.
require.Equal(t, TxUnknownSpend, result.Event)
require.Equal(t, tx, result.Tx)
// We expect the fee rate to be updated.
require.Equal(t, feerate, result.FeeRate)
// No error should be set.
require.ErrorIs(t, result.Err, ErrUnknownSpent)
require.Equal(t, requestID, result.requestID)
}
}
@ -1776,3 +1985,126 @@ func TestHandleInitialBroadcastFail(t *testing.T) {
require.Equal(t, 0, tp.records.Len())
require.Equal(t, 0, tp.subscriberChans.Len())
}
// TestHasInputsSpent checks the expected outpoint:tx map is returned.
func TestHasInputsSpent(t *testing.T) {
t.Parallel()
// Create a publisher using the mocks.
tp, m := createTestPublisher(t)
// Create mock inputs.
op1 := wire.OutPoint{
Hash: chainhash.Hash{1},
Index: 1,
}
inp1 := &input.MockInput{}
heightHint1 := uint32(1)
defer inp1.AssertExpectations(t)
op2 := wire.OutPoint{
Hash: chainhash.Hash{1},
Index: 2,
}
inp2 := &input.MockInput{}
heightHint2 := uint32(2)
defer inp2.AssertExpectations(t)
op3 := wire.OutPoint{
Hash: chainhash.Hash{1},
Index: 3,
}
walletInp := &input.MockInput{}
heightHint3 := uint32(0)
defer walletInp.AssertExpectations(t)
// We expect all the inputs to call OutPoint and HeightHint.
inp1.On("OutPoint").Return(op1).Once()
inp2.On("OutPoint").Return(op2).Once()
walletInp.On("OutPoint").Return(op3).Once()
inp1.On("HeightHint").Return(heightHint1).Once()
inp2.On("HeightHint").Return(heightHint2).Once()
walletInp.On("HeightHint").Return(heightHint3).Once()
// We expect the normal inputs to call SignDesc.
pkScript1 := []byte{1}
sd1 := &input.SignDescriptor{
Output: &wire.TxOut{
PkScript: pkScript1,
},
}
inp1.On("SignDesc").Return(sd1).Once()
pkScript2 := []byte{1}
sd2 := &input.SignDescriptor{
Output: &wire.TxOut{
PkScript: pkScript2,
},
}
inp2.On("SignDesc").Return(sd2).Once()
pkScript3 := []byte{3}
sd3 := &input.SignDescriptor{
Output: &wire.TxOut{
PkScript: pkScript3,
},
}
walletInp.On("SignDesc").Return(sd3).Once()
// Mock RegisterSpendNtfn.
//
// spendingTx1 is the tx spending op1.
spendingTx1 := &wire.MsgTx{}
se1 := createTestSpendEvent(spendingTx1)
m.notifier.On("RegisterSpendNtfn",
&op1, pkScript1, heightHint1).Return(se1, nil).Once()
// Create the spending event that doesn't send an event.
se2 := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.notifier.On("RegisterSpendNtfn",
&op2, pkScript2, heightHint2).Return(se2, nil).Once()
se3 := &chainntnfs.SpendEvent{
Cancel: func() {},
}
m.notifier.On("RegisterSpendNtfn",
&op3, pkScript3, heightHint3).Return(se3, nil).Once()
// Prepare the test inputs.
inputs := []input.Input{inp1, inp2, walletInp}
// Prepare the test record.
record := &monitorRecord{
req: &BumpRequest{
Inputs: inputs,
},
}
// Call the method under test.
result := tp.getSpentInputs(record)
// Assert the expected map is created.
expected := map[wire.OutPoint]*wire.MsgTx{
op1: spendingTx1,
}
require.Equal(t, expected, result)
}
// createTestSpendEvent creates a SpendEvent which places the specified tx in
// the channel, which can be read by a spending subscriber.
func createTestSpendEvent(tx *wire.MsgTx) *chainntnfs.SpendEvent {
// Create a monitor record that's confirmed.
spendDetails := chainntnfs.SpendDetail{
SpendingTx: tx,
}
spendChan1 := make(chan *chainntnfs.SpendDetail, 1)
spendChan1 <- &spendDetails
// Create the spend events.
return &chainntnfs.SpendEvent{
Spend: spendChan1,
Cancel: func() {},
}
}

View file

@ -24,10 +24,10 @@ func NewMockSweeperStore() *MockSweeperStore {
}
// IsOurTx determines whether a tx is published by us, based on its hash.
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) bool {
args := s.Called(hash)
return args.Bool(0), args.Error(1)
return args.Bool(0)
}
// StoreTx stores a tx we are about to publish.

View file

@ -121,7 +121,7 @@ func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
type SweeperStore interface {
// IsOurTx determines whether a tx is published by us, based on its
// hash.
IsOurTx(hash chainhash.Hash) (bool, error)
IsOurTx(hash chainhash.Hash) bool
// StoreTx stores a tx hash we are about to publish.
StoreTx(*TxRecord) error
@ -276,15 +276,17 @@ func (s *sweeperStore) StoreTx(tr *TxRecord) error {
}, func() {})
}
// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
// IsOurTx determines whether a tx is published by us, based on its hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool {
var ours bool
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
txHashesBucket := tx.ReadBucket(txHashesBucketKey)
// If the root bucket cannot be found, we consider the tx to be
// not found in our db.
if txHashesBucket == nil {
return errNoTxHashesBucket
log.Error("Tx hashes bucket not found in sweeper store")
return nil
}
ours = txHashesBucket.Get(hash[:]) != nil
@ -294,10 +296,10 @@ func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
ours = false
})
if err != nil {
return false, err
return false
}
return ours, nil
return ours
}
// ListSweeps lists all the sweep transactions we have in the sweeper store.

View file

@ -57,18 +57,15 @@ func TestStore(t *testing.T) {
require.NoError(t, err)
// Assert that both txes are recognized as our own.
ours, err := store.IsOurTx(tx1.TxHash())
require.NoError(t, err)
ours := store.IsOurTx(tx1.TxHash())
require.True(t, ours, "expected tx to be ours")
ours, err = store.IsOurTx(tx2.TxHash())
require.NoError(t, err)
ours = store.IsOurTx(tx2.TxHash())
require.True(t, ours, "expected tx to be ours")
// An different hash should be reported as not being ours.
var unknownHash chainhash.Hash
ours, err = store.IsOurTx(unknownHash)
require.NoError(t, err)
ours = store.IsOurTx(unknownHash)
require.False(t, ours, "expected tx to not be ours")
txns, err := store.ListSweeps()

View file

@ -1230,21 +1230,26 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
}
// This is a new input, and we want to query the mempool to see if this
// input has already been spent. If so, we'll start the input with
// state Published and attach the RBFInfo.
state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
// input has already been spent. If so, we'll start the input with the
// RBFInfo.
rbfInfo := s.decideRBFInfo(input.input.OutPoint())
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pi = &SweeperInput{
state: state,
state: Init,
listeners: []chan Result{input.resultChan},
Input: input.input,
params: input.params,
rbf: rbfInfo,
}
// Set the starting fee rate if a previous sweeping tx is found.
rbfInfo.WhenSome(func(info RBFInfo) {
pi.params.StartingFeeRate = fn.Some(info.FeeRate)
})
// Set the acutal deadline height.
pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
s.calculateDefaultDeadline(pi),
@ -1267,7 +1272,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
)
if err != nil {
err := fmt.Errorf("wait for spend: %w", err)
s.markInputFatal(pi, err)
s.markInputFatal(pi, nil, err)
return err
}
@ -1277,13 +1282,12 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
return nil
}
// decideStateAndRBFInfo queries the mempool to see whether the given input has
// already been spent. If so, the state Published will be returned, otherwise
// state Init. When spent, it will query the sweeper store to fetch the fee
// info of the spending transction, and construct an RBFInfo based on it.
// Suppose an error occurs, fn.None is returned.
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
SweepState, fn.Option[RBFInfo]) {
// decideRBFInfo queries the mempool to see whether the given input has already
// been spent. When spent, it will query the sweeper store to fetch the fee info
// of the spending transction, and construct an RBFInfo based on it. Suppose an
// error occurs, fn.None is returned.
func (s *UtxoSweeper) decideRBFInfo(
op wire.OutPoint) fn.Option[RBFInfo] {
// Check if we can find the spending tx of this input in mempool.
txOption := s.mempoolLookup(op)
@ -1301,7 +1305,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
// - for neutrino we don't have a mempool.
// - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
if tx == nil {
return Init, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Otherwise the input is already spent in the mempool, so eventually
@ -1313,12 +1317,15 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
txid := tx.TxHash()
tr, err := s.cfg.Store.GetTx(txid)
log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
op)
// If the tx is not found in the store, it means it's not broadcast by
// us, hence we can't find the fee info. This is fine as, later on when
// this tx is confirmed, we will remove the input from our inputs.
if errors.Is(err, ErrTxNotFound) {
log.Warnf("Spending tx %v not found in sweeper store", txid)
return Published, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Exit if we get an db error.
@ -1326,7 +1333,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
log.Errorf("Unable to get tx %v from sweeper store: %v",
txid, err)
return Published, fn.None[RBFInfo]()
return fn.None[RBFInfo]()
}
// Prepare the fee info and return it.
@ -1336,7 +1343,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
})
return Published, rbf
return rbf
}
// handleExistingInput processes an input that is already known to the sweeper.
@ -1387,12 +1394,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
// Query store to find out if we ever published this tx.
spendHash := *spend.SpenderTxHash
isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
if err != nil {
log.Errorf("cannot determine if tx %v is ours: %v",
spendHash, err)
return
}
isOurTx := s.cfg.Store.IsOurTx(spendHash)
// If this isn't our transaction, it means someone else swept outputs
// that we were attempting to sweep. This can happen for anchor outputs
@ -1482,12 +1484,17 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
// markInputFatal marks the given input as fatal and won't be retried. It
// will also notify all the subscribers of this input.
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) {
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx,
err error) {
log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
pi.state = Fatal
s.signalResult(pi, Result{Err: err})
s.signalResult(pi, Result{
Tx: tx,
Err: err,
})
}
// updateSweeperInputs updates the sweeper's internal state and returns a map
@ -1819,7 +1826,7 @@ func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) {
continue
}
s.markInputFatal(input, err)
s.markInputFatal(input, nil, err)
}
}
@ -1847,6 +1854,12 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
case TxReplaced:
return s.handleBumpEventTxReplaced(r)
// There are inputs being spent in a tx which the fee bumper doesn't
// understand. We will remove the tx from the sweeper db and mark the
// inputs as swept.
case TxUnknownSpend:
s.handleBumpEventTxUnknownSpend(r)
// There's a fatal error in creating the tx, we will remove the tx from
// the sweeper db and mark the inputs as failed.
case TxFatal:
@ -1861,20 +1874,139 @@ func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
// NOTE: It is enough to check the txid because the sweeper will create
// outpoints which solely belong to the internal LND wallet.
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
found, err := s.cfg.Store.IsOurTx(op.Hash)
// In case there is an error fetching the transaction details from the
// sweeper store we assume the outpoint is still used by the sweeper
// (worst case scenario).
//
// TODO(ziggie): Ensure that confirmed outpoints are deleted from the
// bucket.
if err != nil && !errors.Is(err, errNoTxHashesBucket) {
log.Errorf("failed to fetch info for outpoint(%v:%d) "+
"with: %v, we assume it is still in use by the sweeper",
op.Hash, op.Index, err)
return s.cfg.Store.IsOurTx(op.Hash)
}
return true
// markInputSwept marks the given input as swept by the tx. It will also notify
// all the subscribers of this input.
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
inp.state)
inp.state = Swept
// Signal result channels.
s.signalResult(inp, Result{
Tx: tx,
})
// Remove all other inputs in this exclusive group.
if inp.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(*inp.params.ExclusiveGroup)
}
}
// handleUnknownSpendTx takes an input and its spending tx. If the spending tx
// cannot be found in the sweeper store, the input will be marked as fatal,
// otherwise it will be marked as swept.
func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
op := inp.OutPoint()
txid := tx.TxHash()
isOurTx := s.cfg.Store.IsOurTx(txid)
// If this is our tx, it means it's a previous sweeping tx that got
// confirmed, which could happen when a restart happens during the
// sweeping process.
if isOurTx {
log.Debugf("Found our sweeping tx %v, marking input %v as "+
"swept", txid, op)
// We now use the spending tx to update the state of the inputs.
s.markInputSwept(inp, tx)
return
}
return found
// Since the input is spent by others, we now mark it as fatal and won't
// be retried.
s.markInputFatal(inp, tx, ErrRemoteSpend)
log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
txid, lnutils.SpewLogClosure(tx))
// Construct a map of the inputs this transaction spends.
spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
for _, txIn := range tx.TxIn {
spentInputs[txIn.PreviousOutPoint] = struct{}{}
}
err := s.removeConflictSweepDescendants(spentInputs)
if err != nil {
log.Warnf("unable to remove descendant transactions "+
"due to tx %v: ", txid)
}
}
// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
// by another party with their tx being confirmed. It will retry sweeping the
// "good" inputs once the "bad" ones are kicked out.
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
// Mark the inputs as publish failed, which means they will be retried
// later.
s.markInputsPublishFailed(r.set)
// Get all the inputs that are not spent in the current sweeping tx.
spentInputs := r.result.SpentInputs
// Create a slice to track inputs to be retried.
inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
// Iterate all the inputs found in this bump and mark the ones spent by
// the third party as failed. The rest of inputs will then be updated
// with a new fee rate and be retried immediately.
for _, inp := range r.set.Inputs() {
op := inp.OutPoint()
input, ok := s.inputs[op]
// Wallet inputs are not tracked so we will not find them from
// the inputs map.
if !ok {
log.Debugf("Skipped marking input: %v not found in "+
"pending inputs", op)
continue
}
// Check whether this input has been spent, if so we mark it as
// fatal or swept based on whether this is one of our previous
// sweeping txns, then move to the next.
tx, spent := spentInputs[op]
if spent {
s.handleUnknownSpendTx(input, tx)
continue
}
log.Debugf("Input(%v): updating params: starting fee rate "+
"[%v -> %v], immediate [%v -> true]", op,
input.params.StartingFeeRate, r.result.FeeRate,
input.params.Immediate)
// Update the input using the fee rate specified from the
// BumpResult, which should be the starting fee rate to use for
// the next sweeping attempt.
input.params.StartingFeeRate = fn.Some(r.result.FeeRate)
input.params.Immediate = true
inputsToRetry = append(inputsToRetry, input)
}
// Exit early if there are no inputs to be retried.
if len(inputsToRetry) == 0 {
return
}
log.Debugf("Retry sweeping inputs with updated params: %v",
inputTypeSummary(inputsToRetry))
// Get the latest inputs, which should put the PublishFailed inputs back
// to the sweeping queue.
inputs := s.updateSweeperInputs()
// Immediately sweep the remaining inputs - the previous inputs should
// now be swept with the updated StartingFeeRate immediately. We may
// also include more inputs in the new sweeping tx if new ones with the
// same deadline are offered.
s.sweepPendingInputs(inputs)
}

View file

@ -497,10 +497,9 @@ func TestUpdateSweeperInputs(t *testing.T) {
require.Equal(expectedInputs, s.inputs)
}
// TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
// returned based on whether this input can be found both in mempool and the
// sweeper store.
func TestDecideStateAndRBFInfo(t *testing.T) {
// TestDecideRBFInfo checks that the expected RBFInfo is returned based on
// whether this input can be found both in mempool and the sweeper store.
func TestDecideRBFInfo(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -524,11 +523,9 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
mockMempool.On("LookupInputMempoolSpend", op).Return(
fn.None[wire.MsgTx]()).Once()
// Since the mempool lookup failed, we exepect state Init and no
// RBFInfo.
state, rbf := s.decideStateAndRBFInfo(op)
// Since the mempool lookup failed, we expect no RBFInfo.
rbf := s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Init, state)
// Mock the mempool lookup to return a tx three times as we are calling
// attachAvailableRBFInfo three times.
@ -539,19 +536,17 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
// Mock the store to return an error saying the tx cannot be found.
mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
// Although the db lookup failed, we expect the state to be Published.
state, rbf = s.decideStateAndRBFInfo(op)
// The db lookup failed, we expect no RBFInfo.
rbf = s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Published, state)
// Mock the store to return a db error.
dummyErr := errors.New("dummy error")
mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
// Although the db lookup failed, we expect the state to be Published.
state, rbf = s.decideStateAndRBFInfo(op)
// The db lookup failed, we expect no RBFInfo.
rbf = s.decideRBFInfo(op)
require.True(rbf.IsNone())
require.Equal(Published, state)
// Mock the store to return a record.
tr := &TxRecord{
@ -561,7 +556,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
// Call the method again.
state, rbf = s.decideStateAndRBFInfo(op)
rbf = s.decideRBFInfo(op)
// Assert that the RBF info is returned.
rbfInfo := fn.Some(RBFInfo{
@ -570,9 +565,6 @@ func TestDecideStateAndRBFInfo(t *testing.T) {
FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
})
require.Equal(rbfInfo, rbf)
// Assert the state is updated.
require.Equal(Published, state)
}
// TestMarkInputFatal checks that the input is marked as expected.
@ -596,7 +588,7 @@ func TestMarkInputFailed(t *testing.T) {
}
// Call the method under test.
s.markInputFatal(pi, errors.New("dummy error"))
s.markInputFatal(pi, nil, errors.New("dummy error"))
// Assert the state is updated.
require.Equal(t, Fatal, pi.state)
@ -1199,3 +1191,246 @@ func TestHandleBumpEventTxFatal(t *testing.T) {
err = s.handleBumpEventTxFatal(resp)
rt.NoError(err)
}
// TestHandleUnknownSpendTxOurs checks that `handleUnknownSpendTx` correctly
// marks an input as swept given the tx is ours.
func TestHandleUnknownSpendTxOurs(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleUnknownSpendTxThirdParty checks that `handleUnknownSpendTx`
// correctly marks an input as fatal given the tx is not ours.
func TestHandleInputSpendTxThirdParty(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PublishFailed)
op := inp.OutPoint()
si, ok := s.inputs[op]
require.True(t, ok)
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Mock the store to return false when calling IsOurTx.
store.On("IsOurTx", txid).Return(false).Once()
// Mock `ListSweeps` to return an empty slice as we are testing the
// workflow here, not the method `removeConflictSweepDescendants`.
store.On("ListSweeps").Return([]chainhash.Hash{}, nil).Once()
// Call the method under test.
s.handleUnknownSpendTx(si, tx)
// Assert the state of the input is updated.
require.Equal(t, Fatal, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendNoRetry checks the case when all the inputs
// are failed due to them being spent by another party.
func TestHandleBumpEventTxUnknownSpendNoRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Store: store,
})
// Create a mock input.
inp := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp})
op := inp.OutPoint()
// Create a testing tx that spends the input.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the state of the input is updated.
require.Equal(t, Swept, s.inputs[op].state)
}
// TestHandleBumpEventTxUnknownSpendWithRetry checks the case when some the
// inputs are retried after the bad inputs are filtered out.
func TestHandleBumpEventTxUnknownSpendWithRetry(t *testing.T) {
t.Parallel()
// Create a mock store.
store := &MockSweeperStore{}
defer store.AssertExpectations(t)
// Create a mock wallet and aggregator.
wallet := &MockWallet{}
defer wallet.AssertExpectations(t)
aggregator := &mockUtxoAggregator{}
defer aggregator.AssertExpectations(t)
publisher := &MockBumper{}
defer publisher.AssertExpectations(t)
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
Wallet: wallet,
Aggregator: aggregator,
Publisher: publisher,
GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] {
//nolint:ll
return fn.Ok(lnwallet.AddrWithKey{
DeliveryAddress: testPubKey.SerializeCompressed(),
})
},
NoDeadlineConfTarget: uint32(DefaultDeadlineDelta),
Store: store,
})
// Create a mock input set.
set := &MockInputSet{}
defer set.AssertExpectations(t)
// Create mock inputs - inp1 will be the bad input, and inp2 will be
// retried.
inp1 := createMockInput(t, s, PendingPublish)
inp2 := createMockInput(t, s, PendingPublish)
set.On("Inputs").Return([]input.Input{inp1, inp2})
op1 := inp1.OutPoint()
op2 := inp2.OutPoint()
inp2.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp2.On("BlocksToMaturity").Return(uint32(0)).Once()
inp2.On("HeightHint").Return(uint32(s.currentHeight)).Once()
// Create a testing tx that spends inp1.
tx := &wire.MsgTx{
LockTime: 1,
TxIn: []*wire.TxIn{
{PreviousOutPoint: op1},
},
}
txid := tx.TxHash()
// Create a testing bump result.
br := &BumpResult{
Tx: tx,
Event: TxUnknownSpend,
SpentInputs: map[wire.OutPoint]*wire.MsgTx{
op1: tx,
},
}
// Create a testing bump response.
resp := &bumpResp{
result: br,
set: set,
}
// Mock the store to return true when calling IsOurTx.
store.On("IsOurTx", txid).Return(true).Once()
// Mock the aggregator to return an empty slice as we are not testing
// the actual sweeping behavior.
aggregator.On("ClusterInputs", mock.Anything).Return([]InputSet{})
// Call the method under test.
s.handleBumpEventTxUnknownSpend(resp)
// Assert the first input is removed.
require.NotContains(t, s.inputs, op1)
// Assert the state of the input is updated.
require.Equal(t, PublishFailed, s.inputs[op2].state)
}