sweep: simplify polling logic in sweeper
This commit attempts to make the polling logic in the sweeper more linear. Previously, the sweep timer was reset/restarted in multiple places - when a new input came in, when a new block came in, or when a previously registered input was spent - which made the flow difficult to follow. We now remove the old timer and replace it with simple polling logic: sweeps are scheduled every 5s (default), and if there are no inputs to be swept the tick is skipped, just as the previous `scheduleSweep` did. It's also worth noting that, although `scheduleSweep` triggered the timer to tick, conditions may have changed by the time the actual sweep ran in `sweepCluster`. This is now fixed as well, since there is only one place where clusters are created and swept.
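For illustration, the core of the new collector loop reduces to a single ticker-driven select. This is a minimal sketch based on the diff below, not verbatim lnd code; the other event channels and the `ok`/error handling are elided:

    // Poll on a fixed interval instead of resetting a timer from
    // many call sites.
    ticker := time.NewTicker(s.cfg.TickerDuration)
    defer ticker.Stop()

    for {
        select {
        // Every tick, cluster whatever inputs are pending and try to
        // sweep them; with nothing sweepable the tick is effectively a
        // no-op, mirroring the old scheduleSweep check.
        case <-ticker.C:
            s.handleSweep(bestHeight)

        // New blocks only update the best height; they no longer
        // restart a sweep timer.
        case epoch := <-blockEpochs:
            bestHeight = epoch.Height

        case <-s.quit:
            return
        }
    }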
This commit is contained in:
parent 4ba09098d1
commit 7de4186766
sweep/sweeper.go

@@ -228,9 +228,6 @@ type UtxoSweeper struct {
     // requested to sweep.
     pendingInputs pendingInputs
 
-    // timer is the channel that signals expiry of the sweep batch timer.
-    timer <-chan time.Time
-
     testSpendChan chan wire.OutPoint
 
     currentOutputScript []byte
@@ -606,6 +603,12 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
         return
     }
 
+    // Create a ticker based on the config duration.
+    ticker := time.NewTicker(s.cfg.TickerDuration)
+    defer ticker.Stop()
+
+    log.Debugf("Sweep ticker started")
+
     for {
         select {
         // A new inputs is offered to the sweeper. We check to see if
@@ -617,7 +620,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
         // A spend of one of our inputs is detected. Signal sweep
         // results to the caller(s).
         case spend := <-s.spendChan:
-            s.handleInputSpent(spend, bestHeight)
+            s.handleInputSpent(spend)
 
         // A new external request has been received to retrieve all of
         // the inputs we're currently attempting to sweep.
@@ -634,12 +637,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
             }
 
         // The timer expires and we are going to (re)sweep.
-        case <-s.timer:
-            log.Debugf("Sweep timer expired")
+        case <-ticker.C:
+            log.Debugf("Sweep ticker ticks, attempt sweeping...")
             s.handleSweep(bestHeight)
 
-        // A new block comes in. Things may have changed, so we retry a
-        // sweep.
+        // A new block comes in, update the bestHeight.
         case epoch, ok := <-blockEpochs:
             if !ok {
                 return
@@ -650,10 +652,6 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
             log.Debugf("New block: height=%v, sha=%v",
                 epoch.Height, epoch.Hash)
 
-            if err := s.scheduleSweep(bestHeight); err != nil {
-                log.Errorf("schedule sweep: %v", err)
-            }
-
         case <-s.quit:
             return
         }
@@ -982,51 +980,6 @@ func mergeClusters(a, b inputCluster) []inputCluster {
     return []inputCluster{newCluster}
 }
 
-// scheduleSweep starts the sweep timer to create an opportunity for more inputs
-// to be added.
-func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
-    // The timer is already ticking, no action needed for the sweep to
-    // happen.
-    if s.timer != nil {
-        log.Debugf("Timer still ticking at height=%v", currentHeight)
-        return nil
-    }
-
-    // We'll only start our timer once we have inputs we're able to sweep.
-    startTimer := false
-    for _, cluster := range s.createInputClusters() {
-        // Examine pending inputs and try to construct lists of inputs.
-        // We don't need to obtain the coin selection lock, because we
-        // just need an indication as to whether we can sweep. More
-        // inputs may be added until we publish the transaction and
-        // coins that we select now may be used in other transactions.
-        inputLists, err := s.getInputLists(cluster, currentHeight)
-        if err != nil {
-            return fmt.Errorf("get input lists: %v", err)
-        }
-
-        log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+
-            "yield %v distinct txns", currentHeight,
-            cluster.sweepFeeRate, len(inputLists))
-
-        if len(inputLists) != 0 {
-            startTimer = true
-            break
-        }
-    }
-    if !startTimer {
-        return nil
-    }
-
-    // Start sweep timer to create opportunity for more inputs to be added
-    // before a tx is constructed.
-    s.timer = time.NewTicker(s.cfg.TickerDuration).C
-
-    log.Debugf("Sweep timer started")
-
-    return nil
-}
-
 // signalAndRemove notifies the listeners of the final result of the input
 // sweep. It cancels any pending spend notification and removes the input from
 // the list of pending inputs. When this function returns, the sweeper has
@@ -1423,10 +1376,6 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq, bestHeight int32) (
         pendingInput.minPublishHeight = bestHeight
     }
 
-    if err := s.scheduleSweep(bestHeight); err != nil {
-        log.Errorf("Unable to schedule sweep: %v", err)
-    }
-
     resultChan := make(chan Result, 1)
     pendingInput.listeners = append(pendingInput.listeners, resultChan)
 
@@ -1522,13 +1471,6 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage,
     }
 
     pendInput.ntfnRegCancel = cancel
-
-    // Check to see if with this new input a sweep tx can be formed.
-    if err := s.scheduleSweep(bestHeight); err != nil {
-        log.Errorf("schedule sweep: %v", err)
-    }
-
-    log.Tracef("input %v scheduled", outpoint)
 }
 
 // handleExistingInput processes an input that is already known to the sweeper.
@@ -1571,9 +1513,7 @@ func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
 
 // handleInputSpent takes a spend event of our input and updates the sweeper's
 // internal state to remove the input.
-func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail,
-    bestHeight int32) {
-
+func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
     // For testing purposes.
     if s.testSpendChan != nil {
         s.testSpendChan <- *spend.SpentOutPoint
@@ -1637,21 +1577,11 @@ func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail,
             s.removeExclusiveGroup(*input.params.ExclusiveGroup)
         }
     }
-
-    // Now that an input of ours is spent, we can try to resweep the
-    // remaining inputs.
-    if err := s.scheduleSweep(bestHeight); err != nil {
-        log.Errorf("schedule sweep: %v", err)
-    }
 }
 
 // handleSweep is called when the ticker fires. It will create clusters and
 // attempt to create and publish the sweeping transactions.
 func (s *UtxoSweeper) handleSweep(bestHeight int32) {
-    // Set timer to nil so we know that a new timer needs to be started
-    // when new inputs arrive.
-    s.timer = nil
-
     // We'll attempt to cluster all of our inputs with similar fee rates.
     // Before attempting to sweep them, we'll sort them in descending fee
     // rate order. We do this to ensure any inputs which have had their fee
sweep/sweeper_test.go

@@ -4,7 +4,6 @@ import (
     "errors"
     "os"
     "reflect"
-    "runtime/debug"
     "runtime/pprof"
     "sort"
     "testing"
@@ -44,7 +43,6 @@ type sweeperTestContext struct {
     backend   *mockBackend
     store     *MockSweeperStore
 
-    timeoutChan chan chan time.Time
     publishChan chan wire.MsgTx
 }
 
@@ -123,7 +121,6 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
         estimator:   estimator,
         backend:     backend,
         store:       store,
-        timeoutChan: make(chan chan time.Time, 1),
     }
 
     ctx.sweeper = New(&UtxoSweeperConfig{
@@ -163,43 +160,6 @@ func (ctx *sweeperTestContext) restartSweeper() {
     ctx.sweeper.Start()
 }
 
-func (ctx *sweeperTestContext) tick() {
-    testLog.Trace("Waiting for tick to be consumed")
-    select {
-    case c := <-ctx.timeoutChan:
-        select {
-        case c <- time.Time{}:
-            testLog.Trace("Tick")
-        case <-time.After(defaultTestTimeout):
-            debug.PrintStack()
-            ctx.t.Fatal("tick timeout - tick not consumed")
-        }
-    case <-time.After(defaultTestTimeout):
-        debug.PrintStack()
-        ctx.t.Fatal("tick timeout - no new timer created")
-    }
-}
-
-// assertNoTick asserts that the sweeper does not wait for a tick.
-func (ctx *sweeperTestContext) assertNoTick() {
-    ctx.t.Helper()
-
-    select {
-    case <-ctx.timeoutChan:
-        ctx.t.Fatal("unexpected tick")
-
-    case <-time.After(processingDelay):
-    }
-}
-
-func (ctx *sweeperTestContext) assertNoNewTimer() {
-    select {
-    case <-ctx.timeoutChan:
-        ctx.t.Fatal("no new timer expected")
-    default:
-    }
-}
-
 func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) {
     // We assume that when finish is called, sweeper has finished all its
     // goroutines. This implies that the waitgroup is empty.
@@ -233,7 +193,6 @@ func (ctx *sweeperTestContext) finish(expectedGoroutineCount int) {
     // We should have consumed and asserted all published transactions in
     // our unit tests.
     ctx.assertNoTx()
-    ctx.assertNoNewTimer()
     if !ctx.backend.isDone() {
         ctx.t.Fatal("unconfirmed txes remaining")
     }
@@ -383,8 +342,6 @@ func TestSuccess(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     sweepTx := ctx.receiveTx()
 
     ctx.backend.mine()
@@ -437,8 +394,6 @@ func TestDust(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     // The second input brings the sweep output above the dust limit. We
     // expect a sweep tx now.
 
@@ -478,8 +433,6 @@ func TestWalletUtxo(t *testing.T) {
        t.Fatal(err)
     }
 
-    ctx.tick()
-
     sweepTx := ctx.receiveTx()
     if len(sweepTx.TxIn) != 2 {
         t.Fatalf("Expected tx to sweep 2 inputs, but contains %v "+
@@ -532,8 +485,6 @@ func TestNegativeInput(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     // We expect that a sweep tx is published now, but it should only
     // contain the large input. The negative input should stay out of sweeps
     // until fees come down to get a positive net yield.
@@ -557,8 +508,6 @@ func TestNegativeInput(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     sweepTx2 := ctx.receiveTx()
     assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput)
 
@@ -582,8 +531,6 @@ func TestChunks(t *testing.T) {
         }
     }
 
-    ctx.tick()
-
     // We expect two txes to be published because of the max input count of
     // three.
     sweepTx1 := ctx.receiveTx()
@@ -645,7 +592,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
     }
 
     if postSweep {
-        ctx.tick()
 
         // Tx publication by sweeper returns ErrDoubleSpend. Sweeper
         // will retry the inputs without reporting a result. It could be
@@ -669,7 +615,6 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
 
     if !postSweep {
         // Assert that the sweeper sweeps the remaining input.
-        ctx.tick()
         sweepTx := ctx.receiveTx()
 
         if len(sweepTx.TxIn) != 1 {
@@ -710,8 +655,6 @@ func TestIdempotency(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     ctx.receiveTx()
 
     resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref)
@@ -739,7 +682,6 @@ func TestIdempotency(t *testing.T) {
 
     // Timer is still running, but spend notification was delivered before
     // it expired.
-    ctx.tick()
 
     ctx.finish(1)
 }
@@ -762,7 +704,6 @@ func TestRestart(t *testing.T) {
     if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
         t.Fatal(err)
     }
-    ctx.tick()
 
     ctx.receiveTx()
 
@@ -798,8 +739,6 @@ func TestRestart(t *testing.T) {
 
     // Timer tick should trigger republishing a sweep for the remaining
     // input.
-    ctx.tick()
-
     ctx.receiveTx()
 
     ctx.backend.mine()
@@ -837,8 +776,6 @@ func TestRestartRemoteSpend(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     sweepTx := ctx.receiveTx()
 
     // Restart sweeper.
@@ -869,8 +806,6 @@ func TestRestartRemoteSpend(t *testing.T) {
 
     // Expect sweeper to construct a new tx, because input 1 was spend
     // remotely.
-    ctx.tick()
-
     ctx.receiveTx()
 
     ctx.backend.mine()
@@ -891,8 +826,6 @@ func TestRestartConfirmed(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     ctx.receiveTx()
 
     // Restart sweeper.
@@ -910,9 +843,6 @@ func TestRestartConfirmed(t *testing.T) {
     // Here we expect again a successful sweep.
     ctx.expectResult(spendChan, nil)
 
-    // Timer started but not needed because spend ntfn was sent.
-    ctx.tick()
-
     ctx.finish(1)
 }
 
@@ -927,8 +857,6 @@ func TestRetry(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     // We expect a sweep to be published.
     ctx.receiveTx()
 
@@ -944,8 +872,6 @@ func TestRetry(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     // Two txes are expected to be published, because new and retry inputs
     // are separated.
     ctx.receiveTx()
@@ -971,8 +897,6 @@ func TestGiveUp(t *testing.T) {
         t.Fatal(err)
     }
 
-    ctx.tick()
-
     // We expect a sweep to be published at height 100 (mockChainIOHeight).
     ctx.receiveTx()
 
@@ -983,12 +907,10 @@ func TestGiveUp(t *testing.T) {
 
     // Second attempt
     ctx.notifier.NotifyEpoch(101)
-    ctx.tick()
     ctx.receiveTx()
 
     // Third attempt
     ctx.notifier.NotifyEpoch(103)
-    ctx.tick()
     ctx.receiveTx()
 
     ctx.expectResult(resultChan0, ErrTooManyAttempts)
@@ -1038,10 +960,6 @@ func TestDifferentFeePreferences(t *testing.T) {
         t.Fatal(err)
     }
 
-    // Start the sweeper's batch ticker, which should cause the sweep
-    // transactions to be broadcast in order of high to low fee preference.
-    ctx.tick()
-
     // Generate the same type of sweep script that was used for weight
     // estimation.
     changePk, err := ctx.sweeper.cfg.GenSweepScript()
@@ -1121,7 +1039,6 @@ func TestPendingInputs(t *testing.T) {
     // rate sweep to ensure we can detect pending inputs after a sweep.
     // Once the higher fee rate sweep confirms, we should no longer see
     // those inputs pending.
-    ctx.tick()
     ctx.receiveTx()
     lowFeeRateTx := ctx.receiveTx()
     ctx.backend.deleteUnconfirmed(lowFeeRateTx.TxHash())
@@ -1133,7 +1050,6 @@ func TestPendingInputs(t *testing.T) {
     // sweep. Once again we'll ensure those inputs are no longer pending
     // once the sweep transaction confirms.
     ctx.backend.notifier.NotifyEpoch(101)
-    ctx.tick()
     ctx.receiveTx()
     ctx.backend.mine()
     ctx.expectResult(resultChan3, nil)
@@ -1179,7 +1095,6 @@ func TestBumpFeeRBF(t *testing.T) {
     require.NoError(t, err)
 
     // Ensure that a transaction is broadcast with the lower fee preference.
-    ctx.tick()
     lowFeeTx := ctx.receiveTx()
     assertTxFeeRate(t, &lowFeeTx, lowFeeRate, changePk, &input)
 
@@ -1200,7 +1115,6 @@ func TestBumpFeeRBF(t *testing.T) {
     require.NoError(t, err, "unable to bump input's fee")
 
     // A higher fee rate transaction should be immediately broadcast.
-    ctx.tick()
     highFeeTx := ctx.receiveTx()
     assertTxFeeRate(t, &highFeeTx, highFeeRate, changePk, &input)
 
@@ -1234,7 +1148,6 @@ func TestExclusiveGroup(t *testing.T) {
 
     // We expect all inputs to be published in separate transactions, even
    // though they share the same fee preference.
-    ctx.tick()
     for i := 0; i < 3; i++ {
         sweepTx := ctx.receiveTx()
         if len(sweepTx.TxOut) != 1 {
@@ -1306,10 +1219,6 @@ func TestCpfp(t *testing.T) {
     )
     require.NoError(t, err)
 
-    // Because we sweep at 1000 sat/kw, the parent cannot be paid for. We
-    // expect the sweeper to remain idle.
-    ctx.assertNoTick()
-
     // Increase the fee estimate to above the parent tx fee rate.
     ctx.estimator.updateFees(5000, chainfee.FeePerKwFloor)
 
@@ -1319,7 +1228,6 @@ func TestCpfp(t *testing.T) {
 
     // Now we do expect a sweep transaction to be published with our input
     // and an attached wallet utxo.
-    ctx.tick()
     tx := ctx.receiveTx()
     require.Len(t, tx.TxIn, 2)
     require.Len(t, tx.TxOut, 1)
@@ -1691,10 +1599,6 @@ func TestLockTimes(t *testing.T) {
         inputs[*op] = inp
     }
 
-    // We expect all inputs to be published in separate transactions, even
-    // though they share the same fee preference.
-    ctx.tick()
-
     // Check the sweeps transactions, ensuring all inputs are there, and
     // all the locktimes are satisfied.
     for i := 0; i < numSweeps; i++ {
@@ -2139,9 +2043,6 @@ func TestRequiredTxOuts(t *testing.T) {
         inputs[*op] = inp
     }
 
-    // Tick, which should trigger a sweep of all inputs.
-    ctx.tick()
-
     // Check the sweeps transactions, ensuring all inputs
     // are there, and all the locktimes are satisfied.
     var sweeps []*wire.MsgTx