Merge pull request #8091 from yyforyongyu/remove-rbf-sweeper

sweep: remove possible RBF when sweeping new inputs
This commit is contained in:
Olaoluwa Osuntokun 2023-10-25 15:14:32 -07:00 committed by GitHub
commit a1fa195493
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 472 additions and 476 deletions

View file

@ -28,6 +28,10 @@
* LND will now [enforce pong responses
](https://github.com/lightningnetwork/lnd/pull/7828) from its peers
* [Fixed a possible unintended RBF
attempt](https://github.com/lightningnetwork/lnd/pull/8091) when sweeping new
inputs with retried ones.
# New Features
## Functional Enhancements

View file

@ -1524,18 +1524,12 @@ func assertDLPExecuted(ht *lntest.HarnessTest,
if commitType == lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE {
// Dave should sweep his anchor only, since he still has the
// lease CLTV constraint on his commitment output.
ht.Miner.AssertNumTxsInMempool(1)
// lease CLTV constraint on his commitment output. We'd also
// see Carol's anchor sweep here.
ht.Miner.AssertNumTxsInMempool(2)
// Mine Dave's anchor sweep tx.
ht.MineBlocksAndAssertNumTxes(1, 1)
blocksMined++
// The above block will trigger Carol's sweeper to reconsider
// the anchor sweeping. Because we are now sweeping at the fee
// rate floor, the sweeper will consider this input has
// positive yield thus attempts the sweeping.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Mine anchor sweep txes for Carol and Dave.
ht.MineBlocksAndAssertNumTxes(1, 2)
blocksMined++
// After Carol's output matures, she should also reclaim her
@ -1564,10 +1558,10 @@ func assertDLPExecuted(ht *lntest.HarnessTest,
ht.AssertNumPendingForceClose(dave, 0)
} else {
// Dave should sweep his funds immediately, as they are not
// timelocked. We also expect Dave to sweep his anchor, if
// present.
// timelocked. We also expect Carol and Dave sweep their
// anchors.
if lntest.CommitTypeHasAnchors(commitType) {
ht.MineBlocksAndAssertNumTxes(1, 2)
ht.MineBlocksAndAssertNumTxes(1, 3)
} else {
ht.MineBlocksAndAssertNumTxes(1, 1)
}
@ -1577,15 +1571,6 @@ func assertDLPExecuted(ht *lntest.HarnessTest,
// Now Dave should consider the channel fully closed.
ht.AssertNumPendingForceClose(dave, 0)
// The above block will trigger Carol's sweeper to reconsider
// the anchor sweeping. Because we are now sweeping at the fee
// rate floor, the sweeper will consider this input has
// positive yield thus attempts the sweeping.
if lntest.CommitTypeHasAnchors(commitType) {
ht.MineBlocksAndAssertNumTxes(1, 1)
blocksMined++
}
// After Carol's output matures, she should also reclaim her
// funds.
//

View file

@ -94,7 +94,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) {
// calculateSweepFeeRate runs multiple steps to calculate the fee rate
// used in sweeping the transactions.
calculateSweepFeeRate := func(expectedSweepTxNum, deadline int) int64 {
calculateSweepFeeRate := func(expectAnchor bool, deadline int) int64 {
// Create two nodes, Alice and Bob.
alice := setupNode("Alice")
defer ht.Shutdown(alice)
@ -143,12 +143,32 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) {
// section.
ht.AssertChannelWaitingClose(alice, chanPoint)
// We should see Alice's force closing tx in the mempool.
expectedNumTxes := 1
// If anchor is expected, we should see the anchor sweep tx in
// the mempool too.
if expectAnchor {
expectedNumTxes = 2
}
// Check our sweep transactions can be found in mempool.
sweepTxns := ht.Miner.GetNumTxsFromMempool(expectedSweepTxNum)
sweepTxns := ht.Miner.GetNumTxsFromMempool(expectedNumTxes)
// Mine a block to confirm these transactions such that they
// don't remain in the mempool for any subsequent tests.
ht.MineBlocks(1)
ht.MineBlocksAndAssertNumTxes(1, expectedNumTxes)
// Bob should now sweep his to_local output and anchor output.
expectedNumTxes = 2
// If Alice's anchor is not swept above, we should see it here.
if !expectAnchor {
expectedNumTxes = 3
}
// Mine one more block to assert the sweep transactions.
ht.MineBlocksAndAssertNumTxes(1, expectedNumTxes)
// Calculate the fee rate used.
feeRate := ht.CalculateTxesFeeRate(sweepTxns)
@ -163,7 +183,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) {
// Calculate fee rate used and assert only the force close tx is
// broadcast.
feeRate := calculateSweepFeeRate(1, deadline)
feeRate := calculateSweepFeeRate(false, deadline)
// We expect the default max fee rate is used. Allow some deviation
// because weight estimates during tx generation are estimates.
@ -181,7 +201,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) {
// Calculate fee rate used and assert only the force close tx is
// broadcast.
feeRate = calculateSweepFeeRate(1, defaultDeadline)
feeRate = calculateSweepFeeRate(false, defaultDeadline)
// We expect the default max fee rate is used. Allow some deviation
// because weight estimates during tx generation are estimates.
@ -198,7 +218,7 @@ func testCommitmentTransactionDeadline(ht *lntest.HarnessTest) {
// Calculate fee rate used and assert both the force close tx and the
// anchor sweeping tx are broadcast.
feeRate = calculateSweepFeeRate(2, deadline)
feeRate = calculateSweepFeeRate(true, deadline)
// We expect the anchor to be swept with the deadline, which has the
// fee rate of feeRateLarge.

View file

@ -319,15 +319,17 @@ func runMultiHopHtlcLocalTimeout(ht *lntest.HarnessTest,
// was in fact mined.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Mine an additional block to prompt Bob to broadcast their
// second layer sweep due to the CSV on the HTLC timeout output.
ht.MineBlocksAndAssertNumTxes(1, 0)
// Mine one more block to trigger the timeout path.
ht.MineEmptyBlocks(1)
// Bob's sweeper should now broadcast his second layer sweep
// due to the CSV on the HTLC timeout output.
ht.Miner.AssertOutpointInMempool(htlcTimeoutOutpoint)
}
// Next, we'll mine a final block that should confirm the sweeping
// transactions left.
ht.MineBlocks(1)
ht.MineBlocksAndAssertNumTxes(1, 1)
// Once this transaction has been confirmed, Bob should detect that he
// no longer has any pending channels.
@ -482,7 +484,7 @@ func runMultiHopReceiverChainClaim(ht *lntest.HarnessTest,
// We'll now mine an additional block which should confirm both the
// second layer transactions.
ht.MineBlocks(1)
ht.MineBlocksAndAssertNumTxes(1, expectedTxes)
// Carol's pending channel report should now show two outputs under
// limbo: her commitment output, as well as the second-layer claim
@ -494,16 +496,16 @@ func runMultiHopReceiverChainClaim(ht *lntest.HarnessTest,
// clearing the HTLC off-chain.
ht.AssertNumActiveHtlcs(alice, 0)
// If we mine 4 additional blocks, then Carol can sweep the second level
// HTLC output.
ht.MineBlocks(defaultCSV)
// If we mine 4 additional blocks, then Carol can sweep the second
// level HTLC output once the CSV expires.
ht.MineEmptyBlocks(defaultCSV)
// We should have a new transaction in the mempool.
ht.Miner.AssertNumTxsInMempool(1)
// Finally, if we mine an additional block to confirm these two sweep
// transactions, Carol should not show a pending channel in her report
// afterwards.
// Finally, if we mine an additional block to confirm Carol's second
// level success transaction. Carol should not show a pending channel
// in her report afterwards.
ht.MineBlocks(1)
ht.AssertNumPendingForceClose(carol, 0)
@ -815,15 +817,16 @@ func runMultiHopRemoteForceCloseOnChainHtlcTimeout(ht *lntest.HarnessTest,
case lnrpc.CommitmentType_LEGACY:
expectedTxes = 1
// Bob can sweep his commit and anchor outputs immediately.
// Bob can sweep his commit and anchor outputs immediately. Carol will
// also sweep her anchor.
case lnrpc.CommitmentType_ANCHORS, lnrpc.CommitmentType_SIMPLE_TAPROOT:
expectedTxes = 2
expectedTxes = 3
// Bob can't sweep his commit output yet as he was the initiator of a
// script-enforced leased channel, so he'll always incur the additional
// CLTV. He can still sweep his anchor output however.
case lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE:
expectedTxes = 1
expectedTxes = 2
default:
ht.Fatalf("unhandled commitment type %v", c)
@ -833,15 +836,6 @@ func runMultiHopRemoteForceCloseOnChainHtlcTimeout(ht *lntest.HarnessTest,
ht.MineBlocksAndAssertNumTxes(1, expectedTxes)
blocksMined++
// The above block will trigger Carol's sweeper to reconsider the
// anchor sweeping. Because we are now sweeping at the fee rate floor,
// the sweeper will consider this input has positive yield thus
// attempts the sweeping.
if lntest.CommitTypeHasAnchors(c) {
ht.MineBlocksAndAssertNumTxes(1, 1)
blocksMined++
}
// Next, we'll mine enough blocks for the HTLC to expire. At this
// point, Bob should hand off the output to his internal utxo nursery,
// which will broadcast a sweep transaction.
@ -1000,15 +994,16 @@ func runMultiHopHtlcLocalChainClaim(ht *lntest.HarnessTest,
case lnrpc.CommitmentType_LEGACY:
expectedTxes = 1
// Alice will sweep her commitment and anchor output immediately.
// Alice will sweep her commitment and anchor output immediately. Bob
// will also sweep his anchor.
case lnrpc.CommitmentType_ANCHORS, lnrpc.CommitmentType_SIMPLE_TAPROOT:
expectedTxes = 2
expectedTxes = 3
// Alice will sweep her anchor output immediately. Her commitment
// output cannot be swept yet as it has incurred an additional CLTV due
// to being the initiator of a script-enforced leased channel.
case lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE:
expectedTxes = 1
expectedTxes = 2
default:
ht.Fatalf("unhandled commitment type %v", c)
@ -2162,8 +2157,9 @@ func runExtraPreimageFromRemoteCommit(ht *lntest.HarnessTest,
numBlocks = htlc.ExpirationHeight - uint32(height) -
lncfg.DefaultOutgoingBroadcastDelta
// We should now have Carol's htlc suucess tx in the mempool.
// We should now have Carol's htlc success tx in the mempool.
numTxesMempool := 1
ht.Miner.AssertNumTxsInMempool(numTxesMempool)
// For neutrino backend, the timeout resolver needs to extract the
// preimage from the blocks.
@ -2171,7 +2167,9 @@ func runExtraPreimageFromRemoteCommit(ht *lntest.HarnessTest,
// Mine a block to confirm Carol's 2nd level success tx.
ht.MineBlocksAndAssertNumTxes(1, 1)
numTxesMempool--
numBlocks--
}
// Mine empty blocks so Carol's htlc success tx stays in mempool. Once
// the height is reached, Bob's timeout resolver will resolve the htlc
// by extracing the preimage from the mempool.

View file

@ -528,7 +528,6 @@ func testPrivateChannels(ht *lntest.HarnessTest) {
Private: true,
},
)
defer ht.CloseChannel(carol, chanPointPrivate)
// The channel should be available for payments between Carol and Alice.
// We check this by sending payments from Carol to Bob, that
@ -602,6 +601,7 @@ func testPrivateChannels(ht *lntest.HarnessTest) {
ht.CloseChannel(alice, chanPointAlice)
ht.CloseChannel(dave, chanPointDave)
ht.CloseChannel(carol, chanPointCarol)
ht.CloseChannel(carol, chanPointPrivate)
}
// testInvoiceRoutingHints tests that the routing hints for an invoice are

View file

@ -1059,14 +1059,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
FeeEstimator: cc.FeeEstimator,
DetermineFeePerKw: sweep.DetermineFeePerKw,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Signer: cc.Wallet.Cfg.Signer,
Wallet: newSweeperWallet(cc.Wallet),
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(cfg.Sweeper.BatchWindowDuration).C
},
FeeEstimator: cc.FeeEstimator,
DetermineFeePerKw: sweep.DetermineFeePerKw,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Signer: cc.Wallet.Cfg.Signer,
Wallet: newSweeperWallet(cc.Wallet),
TickerDuration: cfg.Sweeper.BatchWindowDuration,
Notifier: cc.ChainNotifier,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,

View file

@ -228,9 +228,6 @@ type UtxoSweeper struct {
// requested to sweep.
pendingInputs pendingInputs
// timer is the channel that signals expiry of the sweep batch timer.
timer <-chan time.Time
testSpendChan chan wire.OutPoint
currentOutputScript []byte
@ -264,10 +261,11 @@ type UtxoSweeperConfig struct {
// Wallet contains the wallet functions that sweeper requires.
Wallet Wallet
// NewBatchTimer creates a channel that will be sent on when a certain
// time window has passed. During this time window, new inputs can still
// be added to the sweep tx that is about to be generated.
NewBatchTimer func() <-chan time.Time
// TickerDuration is used to create a channel that will be sent on when
// a certain time window has passed. During this time window, new
// inputs can still be added to the sweep tx that is about to be
// generated.
TickerDuration time.Duration
// Notifier is an instance of a chain notifier we'll use to watch for
// certain on-chain events.
@ -605,167 +603,24 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
return
}
// Create a ticker based on the config duration.
ticker := time.NewTicker(s.cfg.TickerDuration)
defer ticker.Stop()
log.Debugf("Sweep ticker started")
for {
select {
// A new inputs is offered to the sweeper. We check to see if we
// are already trying to sweep this input and if not, set up a
// listener to spend and schedule a sweep.
// A new inputs is offered to the sweeper. We check to see if
// we are already trying to sweep this input and if not, set up
// a listener to spend and schedule a sweep.
case input := <-s.newInputs:
outpoint := *input.input.OutPoint()
pendInput, pending := s.pendingInputs[outpoint]
if pending {
log.Debugf("Already pending input %v received",
outpoint)
// Before updating the input details, check if
// an exclusive group was set, and if so, assume
// this input as finalized and remove all other
// inputs belonging to the same exclusive group.
var prevExclGroup *uint64
if pendInput.params.ExclusiveGroup != nil &&
input.params.ExclusiveGroup == nil {
prevExclGroup = new(uint64)
*prevExclGroup = *pendInput.params.ExclusiveGroup
}
// Update input details and sweep parameters.
// The re-offered input details may contain a
// change to the unconfirmed parent tx info.
pendInput.params = input.params
pendInput.Input = input.input
// Add additional result channel to signal
// spend of this input.
pendInput.listeners = append(
pendInput.listeners, input.resultChan,
)
if prevExclGroup != nil {
s.removeExclusiveGroup(*prevExclGroup)
}
continue
}
// Create a new pendingInput and initialize the
// listeners slice with the passed in result channel. If
// this input is offered for sweep again, the result
// channel will be appended to this slice.
pendInput = &pendingInput{
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: bestHeight,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
log.Tracef("input %v added to pendingInputs", outpoint)
// Start watching for spend of this input, either by us
// or the remote party.
cancel, err := s.waitForSpend(
outpoint,
input.input.SignDesc().Output.PkScript,
input.input.HeightHint(),
)
if err != nil {
err := fmt.Errorf("wait for spend: %v", err)
s.signalAndRemove(&outpoint, Result{Err: err})
continue
}
pendInput.ntfnRegCancel = cancel
// Check to see if with this new input a sweep tx can be
// formed.
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("schedule sweep: %v", err)
}
log.Tracef("input %v scheduled", outpoint)
s.handleNewInput(input, bestHeight)
// A spend of one of our inputs is detected. Signal sweep
// results to the caller(s).
case spend := <-s.spendChan:
// For testing purposes.
if s.testSpendChan != nil {
s.testSpendChan <- *spend.SpentOutPoint
}
// Query store to find out if we ever published this
// tx.
spendHash := *spend.SpenderTxHash
isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
if err != nil {
log.Errorf("cannot determine if tx %v "+
"is ours: %v", spendHash, err,
)
continue
}
// If this isn't our transaction, it means someone else
// swept outputs that we were attempting to sweep. This
// can happen for anchor outputs as well as justice
// transactions. In this case, we'll notify the wallet
// to remove any spends that a descent from this
// output.
if !isOurTx {
err := s.removeLastSweepDescendants(
spend.SpendingTx,
)
if err != nil {
log.Warnf("unable to remove descendant "+
"transactions due to tx %v: ",
spendHash)
}
log.Debugf("Detected spend related to in flight inputs "+
"(is_ours=%v): %v",
newLogClosure(func() string {
return spew.Sdump(spend.SpendingTx)
}), isOurTx,
)
}
// Signal sweep results for inputs in this confirmed
// tx.
for _, txIn := range spend.SpendingTx.TxIn {
outpoint := txIn.PreviousOutPoint
// Check if this input is known to us. It could
// probably be unknown if we canceled the
// registration, deleted from pendingInputs but
// the ntfn was in-flight already. Or this could
// be not one of our inputs.
input, ok := s.pendingInputs[outpoint]
if !ok {
continue
}
// Return either a nil or a remote spend result.
var err error
if !isOurTx {
err = ErrRemoteSpend
}
// Signal result channels.
s.signalAndRemove(&outpoint, Result{
Tx: spend.SpendingTx,
Err: err,
})
// Remove all other inputs in this exclusive
// group.
if input.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(
*input.params.ExclusiveGroup,
)
}
}
// Now that an input of ours is spent, we can try to
// resweep the remaining inputs.
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("schedule sweep: %v", err)
}
s.handleInputSpent(spend)
// A new external request has been received to retrieve all of
// the inputs we're currently attempting to sweep.
@ -782,34 +637,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
}
// The timer expires and we are going to (re)sweep.
case <-s.timer:
log.Debugf("Sweep timer expired")
case <-ticker.C:
log.Debugf("Sweep ticker ticks, attempt sweeping...")
s.handleSweep(bestHeight)
// Set timer to nil so we know that a new timer needs to
// be started when new inputs arrive.
s.timer = nil
// We'll attempt to cluster all of our inputs with
// similar fee rates. Before attempting to sweep them,
// we'll sort them in descending fee rate order. We do
// this to ensure any inputs which have had their fee
// rate bumped are broadcast first in order enforce the
// RBF policy.
inputClusters := s.createInputClusters()
sort.Slice(inputClusters, func(i, j int) bool {
return inputClusters[i].sweepFeeRate >
inputClusters[j].sweepFeeRate
})
for _, cluster := range inputClusters {
err := s.sweepCluster(cluster, bestHeight)
if err != nil {
log.Errorf("input cluster sweep: %v",
err)
}
}
// A new block comes in. Things may have changed, so we retry a
// sweep.
// A new block comes in, update the bestHeight.
case epoch, ok := <-blockEpochs:
if !ok {
return
@ -820,10 +652,6 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
log.Debugf("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("schedule sweep: %v", err)
}
case <-s.quit:
return
}
@ -859,22 +687,64 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {
// Execute the sweep within a coin select lock. Otherwise the coins that
// we are going to spend may be selected for other transactions like
// funding of a channel.
// Execute the sweep within a coin select lock. Otherwise the coins
// that we are going to spend may be selected for other transactions
// like funding of a channel.
return s.cfg.Wallet.WithCoinSelectLock(func() error {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
// Examine pending inputs and try to construct lists of inputs.
allSets, newSets, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("unable to examine pending inputs: %v", err)
return fmt.Errorf("examine pending inputs: %w", err)
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight)
// errAllSets records the error from broadcasting the sweeping
// transactions for all input sets.
var errAllSets error
// allSets contains retried inputs and new inputs. To avoid
// creating an RBF for the new inputs, we'd sweep this set
// first.
for _, inputs := range allSets {
errAllSets = s.sweep(
inputs, cluster.sweepFeeRate, currentHeight,
)
// TODO(yy): we should also find out which set created
// this error. If there are new inputs in this set, we
// should give it a second chance by sweeping them
// below. To enable this, we need to provide richer
// state for each input other than just recording the
// publishAttempts. We'd also need to refactor how we
// create the input sets. Atm, the steps are,
// 1. create a list of input sets.
// 2. sweep each set by creating and publishing the tx.
// We should change the flow as,
// 1. create a list of input sets, and for each set,
// 2. when created, we create and publish the tx.
// 3. if the publish fails, find out which input is
// causing the failure and retry the rest of the
// inputs.
if errAllSets != nil {
log.Errorf("sweep all inputs: %w", err)
break
}
}
// If we have successfully swept all inputs, there's no need to
// sweep the new inputs as it'd create an RBF case.
if allSets != nil && errAllSets == nil {
return nil
}
// We'd end up there if there's no retried inputs. In this
// case, we'd sweep the new input sets. If there's an error
// when sweeping a given set, we'd log the error and sweep the
// next set.
for _, inputs := range newSets {
err := s.sweep(
inputs, cluster.sweepFeeRate, currentHeight,
)
if err != nil {
return fmt.Errorf("unable to sweep inputs: %v", err)
log.Errorf("sweep new inputs: %w", err)
}
}
@ -1152,51 +1022,6 @@ func mergeClusters(a, b inputCluster) []inputCluster {
return []inputCluster{newCluster}
}
// scheduleSweep starts the sweep timer to create an opportunity for more inputs
// to be added.
func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
// The timer is already ticking, no action needed for the sweep to
// happen.
if s.timer != nil {
log.Debugf("Timer still ticking at height=%v", currentHeight)
return nil
}
// We'll only start our timer once we have inputs we're able to sweep.
startTimer := false
for _, cluster := range s.createInputClusters() {
// Examine pending inputs and try to construct lists of inputs.
// We don't need to obtain the coin selection lock, because we
// just need an indication as to whether we can sweep. More
// inputs may be added until we publish the transaction and
// coins that we select now may be used in other transactions.
inputLists, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
}
log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+
"yield %v distinct txns", currentHeight,
cluster.sweepFeeRate, len(inputLists))
if len(inputLists) != 0 {
startTimer = true
break
}
}
if !startTimer {
return nil
}
// Start sweep timer to create opportunity for more inputs to be added
// before a tx is constructed.
s.timer = s.cfg.NewBatchTimer()
log.Debugf("Sweep timer started")
return nil
}
// signalAndRemove notifies the listeners of the final result of the input
// sweep. It cancels any pending spend notification and removes the input from
// the list of pending inputs. When this function returns, the sweeper has
@ -1234,23 +1059,25 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
}
// getInputLists goes through the given inputs and constructs multiple distinct
// sweep lists with the given fee rate, each up to the configured maximum number
// of inputs. Negative yield inputs are skipped. Transactions with an output
// below the dust limit are not published. Those inputs remain pending and will
// be bundled with future inputs if possible.
// sweep lists with the given fee rate, each up to the configured maximum
// number of inputs. Negative yield inputs are skipped. Transactions with an
// output below the dust limit are not published. Those inputs remain pending
// and will be bundled with future inputs if possible. It returns two list -
// one containing all inputs and the other containing only the new inputs. If
// there's no retried inputs, the first set returned will be empty.
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
currentHeight int32) ([]inputSet, error) {
currentHeight int32) ([]inputSet, []inputSet, error) {
// Filter for inputs that need to be swept. Create two lists: all
// sweepable inputs and a list containing only the new, never tried
// inputs.
//
// We want to create as large a tx as possible, so we return a final set
// list that starts with sets created from all inputs. However, there is
// a chance that those txes will not publish, because they already
// contain inputs that failed before. Therefore we also add sets
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
// We want to create as large a tx as possible, so we return a final
// set list that starts with sets created from all inputs. However,
// there is a chance that those txes will not publish, because they
// already contain inputs that failed before. Therefore we also add
// sets consisting of only new inputs to the list, to make sure that
// new inputs are given a good, isolated chance of being published.
//
// TODO(yy): this would lead to conflict transactions as the same input
// can be used in two sweeping transactions, and our rebroadcaster will
@ -1287,7 +1114,8 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w",
err)
}
}
@ -1297,15 +1125,13 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w", err)
}
log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+
"total_num_new=%v", currentHeight, len(allSets), len(newSets))
// Append the new sets at the end of the list, because those tx likely
// have a higher fee per input.
return append(allSets, newSets...), nil
return allSets, newSets, nil
}
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
@ -1341,38 +1167,39 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
return fmt.Errorf("notify publish tx: %v", err)
}
// Publish sweep tx.
// Reschedule the inputs that we just tried to sweep. This is done in
// case the following publish fails, we'd like to update the inputs'
// publish attempts and rescue them in the next sweep.
s.rescheduleInputs(tx.TxIn, currentHeight)
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
tx.TxHash(), len(tx.TxIn), currentHeight)
log.Tracef("Sweep tx at height=%v: %v", currentHeight,
newLogClosure(func() string {
return spew.Sdump(tx)
}),
)
// Publish the sweeping tx with customized label.
err = s.cfg.Wallet.PublishTransaction(
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
)
// In case of an unexpected error, don't try to recover.
if err != nil && err != lnwallet.ErrDoubleSpend {
return fmt.Errorf("publish tx: %v", err)
}
// Otherwise log the error.
if err != nil {
log.Errorf("Publish sweep tx %v got error: %v", tx.TxHash(),
err)
} else {
// If there's no error, remove the output script. Otherwise
// keep it so that it can be reused for the next transaction
// and causes no address inflation.
s.currentOutputScript = nil
return err
}
// If there's no error, remove the output script. Otherwise keep it so
// that it can be reused for the next transaction and causes no address
// inflation.
s.currentOutputScript = nil
return nil
}
// rescheduleInputs updates the pending inputs with the given tx inputs. It
// increments the `publishAttempts` and calculates the next broadcast height
// for each input. When the publishAttempts exceeds MaxSweepAttemps(10), this
// input will be removed.
func (s *UtxoSweeper) rescheduleInputs(inputs []*wire.TxIn,
currentHeight int32) {
// Reschedule sweep.
for _, input := range tx.TxIn {
for _, input := range inputs {
pi, ok := s.pendingInputs[input.PreviousOutPoint]
if !ok {
// It can be that the input has been removed because it
@ -1413,13 +1240,11 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
})
}
}
return nil
}
// waitForSpend registers a spend notification with the chain notifier. It
// monitorSpend registers a spend notification with the chain notifier. It
// returns a cancel function that can be used to cancel the registration.
func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint,
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
script []byte, heightHint uint32) (func(), error) {
log.Tracef("Wait for spend of %v at heightHint=%v",
@ -1593,10 +1418,6 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq, bestHeight int32) (
pendingInput.minPublishHeight = bestHeight
}
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("Unable to schedule sweep: %v", err)
}
resultChan := make(chan Result, 1)
pendingInput.listeners = append(pendingInput.listeners, resultChan)
@ -1651,6 +1472,176 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
return s.cfg.Store.ListSweeps()
}
// handleNewInput processes a new input by registering spend notification and
// scheduling sweeping for it.
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage,
bestHeight int32) {
outpoint := *input.input.OutPoint()
pendInput, pending := s.pendingInputs[outpoint]
if pending {
log.Debugf("Already pending input %v received", outpoint)
s.handleExistingInput(input, pendInput)
return
}
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pendInput = &pendingInput{
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: bestHeight,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
log.Tracef("input %v added to pendingInputs", outpoint)
// Start watching for spend of this input, either by us or the remote
// party.
cancel, err := s.monitorSpend(
outpoint, input.input.SignDesc().Output.PkScript,
input.input.HeightHint(),
)
if err != nil {
err := fmt.Errorf("wait for spend: %w", err)
s.signalAndRemove(&outpoint, Result{Err: err})
return
}
pendInput.ntfnRegCancel = cancel
}
// handleExistingInput processes an input that is already known to the sweeper.
// It will overwrite the params of the old input with the new ones.
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
oldInput *pendingInput) {
// Before updating the input details, check if an exclusive group was
// set. In case the same input is registered again without an exclusive
// group set, the previous input and its sweep parameters are outdated
// hence need to be replaced. This scenario currently only happens for
// anchor outputs. When a channel is force closed, in the worst case 3
// different sweeps with the same exclusive group are registered with
// the sweeper to bump the closing transaction (cpfp) when its time
// critical. Receiving an input which was already registered with the
// sweeper but now without an exclusive group means non of the previous
// inputs were used as CPFP, so we need to make sure we update the
// sweep parameters but also remove all inputs with the same exclusive
// group because the are outdated too.
var prevExclGroup *uint64
if oldInput.params.ExclusiveGroup != nil &&
input.params.ExclusiveGroup == nil {
prevExclGroup = new(uint64)
*prevExclGroup = *oldInput.params.ExclusiveGroup
}
// Update input details and sweep parameters. The re-offered input
// details may contain a change to the unconfirmed parent tx info.
oldInput.params = input.params
oldInput.Input = input.input
// Add additional result channel to signal spend of this input.
oldInput.listeners = append(oldInput.listeners, input.resultChan)
if prevExclGroup != nil {
s.removeExclusiveGroup(*prevExclGroup)
}
}
// handleInputSpent takes a spend event of our input and updates the sweeper's
// internal state to remove the input.
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
// For testing purposes.
if s.testSpendChan != nil {
s.testSpendChan <- *spend.SpentOutPoint
}
// Query store to find out if we ever published this tx.
spendHash := *spend.SpenderTxHash
isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
if err != nil {
log.Errorf("cannot determine if tx %v is ours: %v",
spendHash, err)
return
}
// If this isn't our transaction, it means someone else swept outputs
// that we were attempting to sweep. This can happen for anchor outputs
// as well as justice transactions. In this case, we'll notify the
// wallet to remove any spends that descent from this output.
if !isOurTx {
err := s.removeLastSweepDescendants(spend.SpendingTx)
if err != nil {
log.Warnf("unable to remove descendant transactions "+
"due to tx %v: ", spendHash)
}
log.Debugf("Detected third party spend related to in flight "+
"inputs (is_ours=%v): %v",
newLogClosure(func() string {
return spew.Sdump(spend.SpendingTx)
}), isOurTx,
)
}
// Signal sweep results for inputs in this confirmed tx.
for _, txIn := range spend.SpendingTx.TxIn {
outpoint := txIn.PreviousOutPoint
// Check if this input is known to us. It could probably be
// unknown if we canceled the registration, deleted from
// pendingInputs but the ntfn was in-flight already. Or this
// could be not one of our inputs.
input, ok := s.pendingInputs[outpoint]
if !ok {
continue
}
// Return either a nil or a remote spend result.
var err error
if !isOurTx {
err = ErrRemoteSpend
}
// Signal result channels.
s.signalAndRemove(&outpoint, Result{
Tx: spend.SpendingTx,
Err: err,
})
// Remove all other inputs in this exclusive group.
if input.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(*input.params.ExclusiveGroup)
}
}
}
// handleSweep is called when the ticker fires. It will create clusters and
// attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) handleSweep(bestHeight int32) {
// We'll attempt to cluster all of our inputs with similar fee rates.
// Before attempting to sweep them, we'll sort them in descending fee
// rate order. We do this to ensure any inputs which have had their fee
// rate bumped are broadcast first in order enforce the RBF policy.
inputClusters := s.createInputClusters()
sort.Slice(inputClusters, func(i, j int) bool {
return inputClusters[i].sweepFeeRate >
inputClusters[j].sweepFeeRate
})
for _, cluster := range inputClusters {
err := s.sweepCluster(cluster, bestHeight)
if err != nil {
log.Errorf("input cluster sweep: %v", err)
}
}
}
// init initializes the random generator for random input rescheduling.
func init() {
rand.Seed(time.Now().Unix())

View file

@ -4,7 +4,6 @@ import (
"errors"
"os"
"reflect"
"runtime/debug"
"runtime/pprof"
"sort"
"testing"
@ -44,7 +43,6 @@ type sweeperTestContext struct {
backend *mockBackend
store *MockSweeperStore
timeoutChan chan chan time.Time
publishChan chan wire.MsgTx
}
@ -123,19 +121,14 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
estimator: estimator,
backend: backend,
store: store,
timeoutChan: make(chan chan time.Time, 1),
}
ctx.sweeper = New(&UtxoSweeperConfig{
Notifier: notifier,
Wallet: backend,
NewBatchTimer: func() <-chan time.Time {
c := make(chan time.Time, 1)
ctx.timeoutChan <- c
return c
},
Store: store,
Signer: &mock.DummySigner{},
Notifier: notifier,
Wallet: backend,
TickerDuration: 100 * time.Millisecond,
Store: store,
Signer: &mock.DummySigner{},
GenSweepScript: func() ([]byte, error) {
script := make([]byte, input.P2WPKHSize)
script[0] = 0
@ -167,43 +160,6 @@ func (ctx *sweeperTestContext) restartSweeper() {
ctx.sweeper.Start()
}
func (ctx *sweeperTestContext) tick() {
testLog.Trace("Waiting for tick to be consumed")
select {
case c := <-ctx.timeoutChan:
select {
case c <- time.Time{}:
testLog.Trace("Tick")
case <-time.After(defaultTestTimeout):
debug.PrintStack()
ctx.t.Fatal("tick timeout - tick not consumed")
}
case <-time.After(defaultTestTimeout):
debug.PrintStack()
ctx.t.Fatal("tick timeout - no new timer created")
}
}
// assertNoTick asserts that the sweeper does not wait for a tick.
func (ctx *sweeperTestContext) assertNoTick() {
ctx.t.Helper()
select {
case <-ctx.timeoutChan:
ctx.t.Fatal("unexpected tick")
case <-time.After(processingDelay):
}
}
func (ctx *sweeperTestContext) assertNoNewTimer() {
select {
case <-ctx.timeoutChan:
ctx.t.Fatal("no new timer expected")
default:
}
}
func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) {
// We assume that when finish is called, sweeper has finished all its
// goroutines. This implies that the waitgroup is empty.
@ -237,7 +193,6 @@ func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) {
// We should have consumed and asserted all published transactions in
// our unit tests.
ctx.assertNoTx()
ctx.assertNoNewTimer()
if !ctx.backend.isDone() {
ctx.t.Fatal("unconfirmed txes remaining")
}
@ -387,8 +342,6 @@ func TestSuccess(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
sweepTx := ctx.receiveTx()
ctx.backend.mine()
@ -441,8 +394,6 @@ func TestDust(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
// The second input brings the sweep output above the dust limit. We
// expect a sweep tx now.
@ -482,8 +433,6 @@ func TestWalletUtxo(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
sweepTx := ctx.receiveTx()
if len(sweepTx.TxIn) != 2 {
t.Fatalf("Expected tx to sweep 2 inputs, but contains %v "+
@ -536,8 +485,6 @@ func TestNegativeInput(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
// We expect that a sweep tx is published now, but it should only
// contain the large input. The negative input should stay out of sweeps
// until fees come down to get a positive net yield.
@ -561,8 +508,6 @@ func TestNegativeInput(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
sweepTx2 := ctx.receiveTx()
assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput)
@ -586,8 +531,6 @@ func TestChunks(t *testing.T) {
}
}
ctx.tick()
// We expect two txes to be published because of the max input count of
// three.
sweepTx1 := ctx.receiveTx()
@ -649,7 +592,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
}
if postSweep {
ctx.tick()
// Tx publication by sweeper returns ErrDoubleSpend. Sweeper
// will retry the inputs without reporting a result. It could be
@ -673,7 +615,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
if !postSweep {
// Assert that the sweeper sweeps the remaining input.
ctx.tick()
sweepTx := ctx.receiveTx()
if len(sweepTx.TxIn) != 1 {
@ -714,8 +655,6 @@ func TestIdempotency(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
ctx.receiveTx()
resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref)
@ -743,7 +682,6 @@ func TestIdempotency(t *testing.T) {
// Timer is still running, but spend notification was delivered before
// it expired.
ctx.tick()
ctx.finish(1)
}
@ -766,7 +704,6 @@ func TestRestart(t *testing.T) {
if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
t.Fatal(err)
}
ctx.tick()
ctx.receiveTx()
@ -802,8 +739,6 @@ func TestRestart(t *testing.T) {
// Timer tick should trigger republishing a sweep for the remaining
// input.
ctx.tick()
ctx.receiveTx()
ctx.backend.mine()
@ -841,8 +776,6 @@ func TestRestartRemoteSpend(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
sweepTx := ctx.receiveTx()
// Restart sweeper.
@ -873,8 +806,6 @@ func TestRestartRemoteSpend(t *testing.T) {
// Expect sweeper to construct a new tx, because input 1 was spend
// remotely.
ctx.tick()
ctx.receiveTx()
ctx.backend.mine()
@ -895,8 +826,6 @@ func TestRestartConfirmed(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
ctx.receiveTx()
// Restart sweeper.
@ -914,9 +843,6 @@ func TestRestartConfirmed(t *testing.T) {
// Here we expect again a successful sweep.
ctx.expectResult(spendChan, nil)
// Timer started but not needed because spend ntfn was sent.
ctx.tick()
ctx.finish(1)
}
@ -931,15 +857,9 @@ func TestRetry(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
// We expect a sweep to be published.
ctx.receiveTx()
// New block arrives. This should trigger a new sweep attempt timer
// start.
ctx.notifier.NotifyEpoch(1000)
// Offer a fresh input.
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
@ -948,11 +868,7 @@ func TestRetry(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
// Two txes are expected to be published, because new and retry inputs
// are separated.
ctx.receiveTx()
// A single tx is expected to be published.
ctx.receiveTx()
ctx.backend.mine()
@ -975,8 +891,6 @@ func TestGiveUp(t *testing.T) {
t.Fatal(err)
}
ctx.tick()
// We expect a sweep to be published at height 100 (mockChainIOHeight).
ctx.receiveTx()
@ -987,12 +901,10 @@ func TestGiveUp(t *testing.T) {
// Second attempt
ctx.notifier.NotifyEpoch(101)
ctx.tick()
ctx.receiveTx()
// Third attempt
ctx.notifier.NotifyEpoch(103)
ctx.tick()
ctx.receiveTx()
ctx.expectResult(resultChan0, ErrTooManyAttempts)
@ -1042,10 +954,6 @@ func TestDifferentFeePreferences(t *testing.T) {
t.Fatal(err)
}
// Start the sweeper's batch ticker, which should cause the sweep
// transactions to be broadcast in order of high to low fee preference.
ctx.tick()
// Generate the same type of sweep script that was used for weight
// estimation.
changePk, err := ctx.sweeper.cfg.GenSweepScript()
@ -1125,7 +1033,6 @@ func TestPendingInputs(t *testing.T) {
// rate sweep to ensure we can detect pending inputs after a sweep.
// Once the higher fee rate sweep confirms, we should no longer see
// those inputs pending.
ctx.tick()
ctx.receiveTx()
lowFeeRateTx := ctx.receiveTx()
ctx.backend.deleteUnconfirmed(lowFeeRateTx.TxHash())
@ -1137,7 +1044,6 @@ func TestPendingInputs(t *testing.T) {
// sweep. Once again we'll ensure those inputs are no longer pending
// once the sweep transaction confirms.
ctx.backend.notifier.NotifyEpoch(101)
ctx.tick()
ctx.receiveTx()
ctx.backend.mine()
ctx.expectResult(resultChan3, nil)
@ -1183,7 +1089,6 @@ func TestBumpFeeRBF(t *testing.T) {
require.NoError(t, err)
// Ensure that a transaction is broadcast with the lower fee preference.
ctx.tick()
lowFeeTx := ctx.receiveTx()
assertTxFeeRate(t, &lowFeeTx, lowFeeRate, changePk, &input)
@ -1204,7 +1109,6 @@ func TestBumpFeeRBF(t *testing.T) {
require.NoError(t, err, "unable to bump input's fee")
// A higher fee rate transaction should be immediately broadcast.
ctx.tick()
highFeeTx := ctx.receiveTx()
assertTxFeeRate(t, &highFeeTx, highFeeRate, changePk, &input)
@ -1238,7 +1142,6 @@ func TestExclusiveGroup(t *testing.T) {
// We expect all inputs to be published in separate transactions, even
// though they share the same fee preference.
ctx.tick()
for i := 0; i < 3; i++ {
sweepTx := ctx.receiveTx()
if len(sweepTx.TxOut) != 1 {
@ -1310,10 +1213,6 @@ func TestCpfp(t *testing.T) {
)
require.NoError(t, err)
// Because we sweep at 1000 sat/kw, the parent cannot be paid for. We
// expect the sweeper to remain idle.
ctx.assertNoTick()
// Increase the fee estimate to above the parent tx fee rate.
ctx.estimator.updateFees(5000, chainfee.FeePerKwFloor)
@ -1323,7 +1222,6 @@ func TestCpfp(t *testing.T) {
// Now we do expect a sweep transaction to be published with our input
// and an attached wallet utxo.
ctx.tick()
tx := ctx.receiveTx()
require.Len(t, tx.TxIn, 2)
require.Len(t, tx.TxOut, 1)
@ -1695,10 +1593,6 @@ func TestLockTimes(t *testing.T) {
inputs[*op] = inp
}
// We expect all inputs to be published in separate transactions, even
// though they share the same fee preference.
ctx.tick()
// Check the sweeps transactions, ensuring all inputs are there, and
// all the locktimes are satisfied.
for i := 0; i < numSweeps; i++ {
@ -2143,9 +2037,6 @@ func TestRequiredTxOuts(t *testing.T) {
inputs[*op] = inp
}
// Tick, which should trigger a sweep of all inputs.
ctx.tick()
// Check the sweeps transactions, ensuring all inputs
// are there, and all the locktimes are satisfied.
var sweeps []*wire.MsgTx
@ -2259,7 +2150,7 @@ func TestFeeRateForPreference(t *testing.T) {
return 0, dummyErr
}
// Set the relay fee rate to be 1.
// Set the relay fee rate to be 1 sat/kw.
s.relayFeeRate = 1
// smallFeeFunc is a mock over DetermineFeePerKw that always return a
@ -2307,7 +2198,7 @@ func TestFeeRateForPreference(t *testing.T) {
expectedErr: ErrNoFeePreference,
},
{
// When an error is returned from the fee determinor,
// When an error is returned from the fee determiner,
// we should return it.
name: "error from DetermineFeePerKw",
feePref: FeePreference{FeeRate: 1},
@ -2539,3 +2430,112 @@ func TestClusterByLockTime(t *testing.T) {
})
}
}
// TestGetInputLists checks that the expected input sets are returned based on
// whether there are retried inputs or not.
func TestGetInputLists(t *testing.T) {
t.Parallel()
// Create a test param with a dummy fee preference. This is needed so
// `feeRateForPreference` won't throw an error.
param := Params{Fee: FeePreference{ConfTarget: 1}}
// Create a mock input and mock all the methods used in this test.
testInput := &input.MockInput{}
testInput.On("RequiredLockTime").Return(0, false)
testInput.On("WitnessType").Return(input.CommitmentAnchor)
testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1})
testInput.On("RequiredTxOut").Return(nil)
testInput.On("UnconfParent").Return(nil)
testInput.On("SignDesc").Return(&input.SignDescriptor{
Output: &wire.TxOut{Value: 100_000},
})
// Create a new and a retried input.
//
// NOTE: we use the same input.Input for both pending inputs as we only
// test the logic of returning the correct non-nil input sets, and not
// the content the of sets. To validate the content of the sets, we
// should test `generateInputPartitionings` instead.
newInput := &pendingInput{
Input: testInput,
params: param,
}
oldInput := &pendingInput{
Input: testInput,
params: param,
publishAttempts: 1,
}
// clusterNew contains only new inputs.
clusterNew := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
}
// clusterMixed contains a mixed of new and retried inputs.
clusterMixed := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
wire.OutPoint{Index: 2}: oldInput,
}
// clusterOld contains only retried inputs.
clusterOld := pendingInputs{
wire.OutPoint{Index: 2}: oldInput,
}
// Create a test sweeper.
s := New(&UtxoSweeperConfig{
MaxInputsPerTx: DefaultMaxInputsPerTx,
})
testCases := []struct {
name string
cluster inputCluster
expectedNilAllSet bool
expectNilNewSet bool
}{
{
// When there are only new inputs, we'd expect the
// first returned set(allSets) to be empty.
name: "new inputs only",
cluster: inputCluster{inputs: clusterNew},
expectedNilAllSet: true,
expectNilNewSet: false,
},
{
// When there are only retried inputs, we'd expect the
// second returned set(newSet) to be empty.
name: "retried inputs only",
cluster: inputCluster{inputs: clusterOld},
expectedNilAllSet: false,
expectNilNewSet: true,
},
{
// When there are mixed inputs, we'd expect two sets
// are returned.
name: "mixed inputs",
cluster: inputCluster{inputs: clusterMixed},
expectedNilAllSet: false,
expectNilNewSet: false,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
allSets, newSets, err := s.getInputLists(tc.cluster, 0)
require.NoError(t, err)
if tc.expectNilNewSet {
require.Nil(t, newSets)
}
if tc.expectedNilAllSet {
require.Nil(t, allSets)
}
})
}
}