mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-19 05:45:21 +01:00
2bc6b22a43
For anchor channels and neutrino backends we need to make sure that sweeps of the same exclusive group are removed when one of them is confirmed. Otherwise for neutrino backends those sweep transaction are always rebroadcasted and are blocking funds in the worst case scenario.
1676 lines
54 KiB
Go
1676 lines
54 KiB
Go
package sweep
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
|
"github.com/lightningnetwork/lnd/input"
|
|
"github.com/lightningnetwork/lnd/labels"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
|
)
|
|
|
|
const (
|
|
// DefaultFeeRateBucketSize is the default size of fee rate buckets
|
|
// we'll use when clustering inputs into buckets with similar fee rates
|
|
// within the UtxoSweeper.
|
|
//
|
|
// Given a minimum relay fee rate of 1 sat/vbyte, a multiplier of 10
|
|
// would result in the following fee rate buckets up to the maximum fee
|
|
// rate:
|
|
//
|
|
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
|
|
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
|
|
DefaultFeeRateBucketSize = 10
|
|
)
|
|
|
|
var (
|
|
// ErrRemoteSpend is returned in case an output that we try to sweep is
|
|
// confirmed in a tx of the remote party.
|
|
ErrRemoteSpend = errors.New("remote party swept utxo")
|
|
|
|
// ErrTooManyAttempts is returned in case sweeping an output has failed
|
|
// for the configured max number of attempts.
|
|
ErrTooManyAttempts = errors.New("sweep failed after max attempts")
|
|
|
|
// ErrNoFeePreference is returned when we attempt to satisfy a sweep
|
|
// request from a client whom did not specify a fee preference.
|
|
ErrNoFeePreference = errors.New("no fee preference specified")
|
|
|
|
// ErrFeePreferenceTooLow is returned when the fee preference gives a
|
|
// fee rate that's below the relay fee rate.
|
|
ErrFeePreferenceTooLow = errors.New("fee preference too low")
|
|
|
|
// ErrExclusiveGroupSpend is returned in case a different input of the
|
|
// same exclusive group was spent.
|
|
ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
|
|
"was spent")
|
|
|
|
// ErrSweeperShuttingDown is an error returned when a client attempts to
|
|
// make a request to the UtxoSweeper, but it is unable to handle it as
|
|
// it is/has already been stopped.
|
|
ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
|
|
|
|
// DefaultMaxSweepAttempts specifies the default maximum number of times
|
|
// an input is included in a publish attempt before giving up and
|
|
// returning an error to the caller.
|
|
DefaultMaxSweepAttempts = 10
|
|
)
|
|
|
|
// Params contains the parameters that control the sweeping process.
|
|
type Params struct {
|
|
// Fee is the fee preference of the client who requested the input to be
|
|
// swept. If a confirmation target is specified, then we'll map it into
|
|
// a fee rate whenever we attempt to cluster inputs for a sweep.
|
|
Fee FeePreference
|
|
|
|
// Force indicates whether the input should be swept regardless of
|
|
// whether it is economical to do so.
|
|
Force bool
|
|
|
|
// ExclusiveGroup is an identifier that, if set, prevents other inputs
|
|
// with the same identifier from being batched together.
|
|
ExclusiveGroup *uint64
|
|
}
|
|
|
|
// ParamsUpdate contains a new set of parameters to update a pending sweep with.
|
|
type ParamsUpdate struct {
|
|
// Fee is the fee preference of the client who requested the input to be
|
|
// swept. If a confirmation target is specified, then we'll map it into
|
|
// a fee rate whenever we attempt to cluster inputs for a sweep.
|
|
Fee FeePreference
|
|
|
|
// Force indicates whether the input should be swept regardless of
|
|
// whether it is economical to do so.
|
|
Force bool
|
|
}
|
|
|
|
// String returns a human readable interpretation of the sweep parameters.
|
|
func (p Params) String() string {
|
|
if p.ExclusiveGroup != nil {
|
|
return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v",
|
|
p.Fee, p.Force, *p.ExclusiveGroup)
|
|
}
|
|
|
|
return fmt.Sprintf("fee=%v, force=%v, exclusive_group=nil",
|
|
p.Fee, p.Force)
|
|
}
|
|
|
|
// pendingInput is created when an input reaches the main loop for the first
|
|
// time. It wraps the input and tracks all relevant state that is needed for
|
|
// sweeping.
|
|
type pendingInput struct {
|
|
input.Input
|
|
|
|
// listeners is a list of channels over which the final outcome of the
|
|
// sweep needs to be broadcasted.
|
|
listeners []chan Result
|
|
|
|
// ntfnRegCancel is populated with a function that cancels the chain
|
|
// notifier spend registration.
|
|
ntfnRegCancel func()
|
|
|
|
// minPublishHeight indicates the minimum block height at which this
|
|
// input may be (re)published.
|
|
minPublishHeight int32
|
|
|
|
// publishAttempts records the number of attempts that have already been
|
|
// made to sweep this tx.
|
|
publishAttempts int
|
|
|
|
// params contains the parameters that control the sweeping process.
|
|
params Params
|
|
|
|
// lastFeeRate is the most recent fee rate used for this input within a
|
|
// transaction broadcast to the network.
|
|
lastFeeRate chainfee.SatPerKWeight
|
|
}
|
|
|
|
// parameters returns the sweep parameters for this input.
|
|
//
|
|
// NOTE: Part of the txInput interface.
|
|
func (p *pendingInput) parameters() Params {
|
|
return p.params
|
|
}
|
|
|
|
// pendingInputs is a type alias for a set of pending inputs.
|
|
type pendingInputs = map[wire.OutPoint]*pendingInput
|
|
|
|
// inputCluster is a helper struct to gather a set of pending inputs that should
|
|
// be swept with the specified fee rate.
|
|
type inputCluster struct {
|
|
lockTime *uint32
|
|
sweepFeeRate chainfee.SatPerKWeight
|
|
inputs pendingInputs
|
|
}
|
|
|
|
// pendingSweepsReq is an internal message we'll use to represent an external
|
|
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
|
|
// attempting to sweep.
|
|
type pendingSweepsReq struct {
|
|
respChan chan map[wire.OutPoint]*PendingInput
|
|
errChan chan error
|
|
}
|
|
|
|
// PendingInput contains information about an input that is currently being
|
|
// swept by the UtxoSweeper.
|
|
type PendingInput struct {
|
|
// OutPoint is the identify outpoint of the input being swept.
|
|
OutPoint wire.OutPoint
|
|
|
|
// WitnessType is the witness type of the input being swept.
|
|
WitnessType input.WitnessType
|
|
|
|
// Amount is the amount of the input being swept.
|
|
Amount btcutil.Amount
|
|
|
|
// LastFeeRate is the most recent fee rate used for the input being
|
|
// swept within a transaction broadcast to the network.
|
|
LastFeeRate chainfee.SatPerKWeight
|
|
|
|
// BroadcastAttempts is the number of attempts we've made to sweept the
|
|
// input.
|
|
BroadcastAttempts int
|
|
|
|
// NextBroadcastHeight is the next height of the chain at which we'll
|
|
// attempt to broadcast a transaction sweeping the input.
|
|
NextBroadcastHeight uint32
|
|
|
|
// Params contains the sweep parameters for this pending request.
|
|
Params Params
|
|
}
|
|
|
|
// updateReq is an internal message we'll use to represent an external caller's
|
|
// intent to update the sweep parameters of a given input.
|
|
type updateReq struct {
|
|
input wire.OutPoint
|
|
params ParamsUpdate
|
|
responseChan chan *updateResp
|
|
}
|
|
|
|
// updateResp is an internal message we'll use to hand off the response of a
|
|
// updateReq from the UtxoSweeper's main event loop back to the caller.
|
|
type updateResp struct {
|
|
resultChan chan Result
|
|
err error
|
|
}
|
|
|
|
// UtxoSweeper is responsible for sweeping outputs back into the wallet
|
|
type UtxoSweeper struct {
|
|
started uint32 // To be used atomically.
|
|
stopped uint32 // To be used atomically.
|
|
|
|
cfg *UtxoSweeperConfig
|
|
|
|
newInputs chan *sweepInputMessage
|
|
spendChan chan *chainntnfs.SpendDetail
|
|
|
|
// pendingSweepsReq is a channel that will be sent requests by external
|
|
// callers in order to retrieve the set of pending inputs the
|
|
// UtxoSweeper is attempting to sweep.
|
|
pendingSweepsReqs chan *pendingSweepsReq
|
|
|
|
// updateReqs is a channel that will be sent requests by external
|
|
// callers who wish to bump the fee rate of a given input.
|
|
updateReqs chan *updateReq
|
|
|
|
// pendingInputs is the total set of inputs the UtxoSweeper has been
|
|
// requested to sweep.
|
|
pendingInputs pendingInputs
|
|
|
|
testSpendChan chan wire.OutPoint
|
|
|
|
currentOutputScript []byte
|
|
|
|
relayFeeRate chainfee.SatPerKWeight
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// feeDeterminer defines an alias to the function signature of
|
|
// `DetermineFeePerKw`.
|
|
type feeDeterminer func(chainfee.Estimator,
|
|
FeePreference) (chainfee.SatPerKWeight, error)
|
|
|
|
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
|
|
type UtxoSweeperConfig struct {
|
|
// GenSweepScript generates a P2WKH script belonging to the wallet where
|
|
// funds can be swept.
|
|
GenSweepScript func() ([]byte, error)
|
|
|
|
// DetermineFeePerKw determines the fee in sat/kw based on the given
|
|
// estimator and fee preference.
|
|
DetermineFeePerKw feeDeterminer
|
|
|
|
// FeeEstimator is used when crafting sweep transactions to estimate
|
|
// the necessary fee relative to the expected size of the sweep
|
|
// transaction.
|
|
FeeEstimator chainfee.Estimator
|
|
|
|
// Wallet contains the wallet functions that sweeper requires.
|
|
Wallet Wallet
|
|
|
|
// TickerDuration is used to create a channel that will be sent on when
|
|
// a certain time window has passed. During this time window, new
|
|
// inputs can still be added to the sweep tx that is about to be
|
|
// generated.
|
|
TickerDuration time.Duration
|
|
|
|
// Notifier is an instance of a chain notifier we'll use to watch for
|
|
// certain on-chain events.
|
|
Notifier chainntnfs.ChainNotifier
|
|
|
|
// Store stores the published sweeper txes.
|
|
Store SweeperStore
|
|
|
|
// Signer is used by the sweeper to generate valid witnesses at the
|
|
// time the incubated outputs need to be spent.
|
|
Signer input.Signer
|
|
|
|
// MaxInputsPerTx specifies the default maximum number of inputs allowed
|
|
// in a single sweep tx. If more need to be swept, multiple txes are
|
|
// created and published.
|
|
MaxInputsPerTx int
|
|
|
|
// MaxSweepAttempts specifies the maximum number of times an input is
|
|
// included in a publish attempt before giving up and returning an error
|
|
// to the caller.
|
|
MaxSweepAttempts int
|
|
|
|
// NextAttemptDeltaFunc returns given the number of already attempted
|
|
// sweeps, how many blocks to wait before retrying to sweep.
|
|
NextAttemptDeltaFunc func(int) int32
|
|
|
|
// MaxFeeRate is the the maximum fee rate allowed within the
|
|
// UtxoSweeper.
|
|
MaxFeeRate chainfee.SatPerVByte
|
|
|
|
// FeeRateBucketSize is the default size of fee rate buckets we'll use
|
|
// when clustering inputs into buckets with similar fee rates within the
|
|
// UtxoSweeper.
|
|
//
|
|
// Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size
|
|
// of 10 would result in the following fee rate buckets up to the
|
|
// maximum fee rate:
|
|
//
|
|
// #1: min = 1 sat/vbyte, max (exclusive) = 11 sat/vbyte
|
|
// #2: min = 11 sat/vbyte, max (exclusive) = 21 sat/vbyte...
|
|
FeeRateBucketSize int
|
|
}
|
|
|
|
// Result is the struct that is pushed through the result channel. Callers can
|
|
// use this to be informed of the final sweep result. In case of a remote
|
|
// spend, Err will be ErrRemoteSpend.
|
|
type Result struct {
|
|
// Err is the final result of the sweep. It is nil when the input is
|
|
// swept successfully by us. ErrRemoteSpend is returned when another
|
|
// party took the input.
|
|
Err error
|
|
|
|
// Tx is the transaction that spent the input.
|
|
Tx *wire.MsgTx
|
|
}
|
|
|
|
// sweepInputMessage structs are used in the internal channel between the
|
|
// SweepInput call and the sweeper main loop.
|
|
type sweepInputMessage struct {
|
|
input input.Input
|
|
params Params
|
|
resultChan chan Result
|
|
}
|
|
|
|
// New returns a new Sweeper instance.
|
|
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
|
|
return &UtxoSweeper{
|
|
cfg: cfg,
|
|
newInputs: make(chan *sweepInputMessage),
|
|
spendChan: make(chan *chainntnfs.SpendDetail),
|
|
updateReqs: make(chan *updateReq),
|
|
pendingSweepsReqs: make(chan *pendingSweepsReq),
|
|
quit: make(chan struct{}),
|
|
pendingInputs: make(pendingInputs),
|
|
}
|
|
}
|
|
|
|
// Start starts the process of constructing and publish sweep txes.
|
|
func (s *UtxoSweeper) Start() error {
|
|
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
log.Info("Sweeper starting")
|
|
|
|
// Retrieve relay fee for dust limit calculation. Assume that this will
|
|
// not change from here on.
|
|
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
|
|
|
|
// We need to register for block epochs and retry sweeping every block.
|
|
// We should get a notification with the current best block immediately
|
|
// if we don't provide any epoch. We'll wait for that in the collector.
|
|
blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
|
|
if err != nil {
|
|
return fmt.Errorf("register block epoch ntfn: %v", err)
|
|
}
|
|
|
|
// Start sweeper main loop.
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer blockEpochs.Cancel()
|
|
defer s.wg.Done()
|
|
|
|
s.collector(blockEpochs.Epochs)
|
|
|
|
// The collector exited and won't longer handle incoming
|
|
// requests. This can happen on shutdown, when the block
|
|
// notifier shuts down before the sweeper and its clients. In
|
|
// order to not deadlock the clients waiting for their requests
|
|
// being handled, we handle them here and immediately return an
|
|
// error. When the sweeper finally is shut down we can exit as
|
|
// the clients will be notified.
|
|
for {
|
|
select {
|
|
case inp := <-s.newInputs:
|
|
inp.resultChan <- Result{
|
|
Err: ErrSweeperShuttingDown,
|
|
}
|
|
|
|
case req := <-s.pendingSweepsReqs:
|
|
req.errChan <- ErrSweeperShuttingDown
|
|
|
|
case req := <-s.updateReqs:
|
|
req.responseChan <- &updateResp{
|
|
err: ErrSweeperShuttingDown,
|
|
}
|
|
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// RelayFeePerKW returns the minimum fee rate required for transactions to be
|
|
// relayed.
|
|
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
|
|
return s.relayFeeRate
|
|
}
|
|
|
|
// Stop stops sweeper from listening to block epochs and constructing sweep
|
|
// txes.
|
|
func (s *UtxoSweeper) Stop() error {
|
|
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
log.Info("Sweeper shutting down...")
|
|
defer log.Debug("Sweeper shutdown complete")
|
|
|
|
close(s.quit)
|
|
s.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
|
|
// swept after the batch time window ends. A custom fee preference can be
|
|
// provided to determine what fee rate should be used for the input. Note that
|
|
// the input may not always be swept with this exact value, as its possible for
|
|
// it to be batched under the same transaction with other similar fee rate
|
|
// inputs.
|
|
//
|
|
// NOTE: Extreme care needs to be taken that input isn't changed externally.
|
|
// Because it is an interface and we don't know what is exactly behind it, we
|
|
// cannot make a local copy in sweeper.
|
|
func (s *UtxoSweeper) SweepInput(input input.Input,
|
|
params Params) (chan Result, error) {
|
|
|
|
if input == nil || input.OutPoint() == nil || input.SignDesc() == nil {
|
|
return nil, errors.New("nil input received")
|
|
}
|
|
|
|
// Ensure the client provided a sane fee preference.
|
|
if _, err := s.feeRateForPreference(params.Fee); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
absoluteTimeLock, _ := input.RequiredLockTime()
|
|
log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
|
|
"relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
|
|
"parent=(%v), params=(%v)", input.OutPoint(),
|
|
input.WitnessType(), input.BlocksToMaturity(), absoluteTimeLock,
|
|
btcutil.Amount(input.SignDesc().Output.Value),
|
|
input.UnconfParent(), params)
|
|
|
|
sweeperInput := &sweepInputMessage{
|
|
input: input,
|
|
params: params,
|
|
resultChan: make(chan Result, 1),
|
|
}
|
|
|
|
// Deliver input to the main event loop.
|
|
select {
|
|
case s.newInputs <- sweeperInput:
|
|
case <-s.quit:
|
|
return nil, ErrSweeperShuttingDown
|
|
}
|
|
|
|
return sweeperInput.resultChan, nil
|
|
}
|
|
|
|
// feeRateForPreference returns a fee rate for the given fee preference. It
|
|
// ensures that the fee rate respects the bounds of the UtxoSweeper.
|
|
func (s *UtxoSweeper) feeRateForPreference(
|
|
feePreference FeePreference) (chainfee.SatPerKWeight, error) {
|
|
|
|
// Ensure a type of fee preference is specified to prevent using a
|
|
// default below.
|
|
if feePreference.FeeRate == 0 && feePreference.ConfTarget == 0 {
|
|
return 0, ErrNoFeePreference
|
|
}
|
|
|
|
feeRate, err := s.cfg.DetermineFeePerKw(
|
|
s.cfg.FeeEstimator, feePreference,
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if feeRate < s.relayFeeRate {
|
|
return 0, fmt.Errorf("%w: got %v, minimum is %v",
|
|
ErrFeePreferenceTooLow, feeRate, s.relayFeeRate)
|
|
}
|
|
|
|
// If the estimated fee rate is above the maximum allowed fee rate,
|
|
// default to the max fee rate.
|
|
if feeRate > s.cfg.MaxFeeRate.FeePerKWeight() {
|
|
log.Warnf("Estimated fee rate %v exceeds max allowed fee "+
|
|
"rate %v, using max fee rate instead", feeRate,
|
|
s.cfg.MaxFeeRate.FeePerKWeight())
|
|
|
|
return s.cfg.MaxFeeRate.FeePerKWeight(), nil
|
|
}
|
|
|
|
return feeRate, nil
|
|
}
|
|
|
|
// removeConflictSweepDescendants removes any transactions from the wallet that
|
|
// spend outputs included in the passed outpoint set. This needs to be done in
|
|
// cases where we're not the only ones that can sweep an output, but there may
|
|
// exist unconfirmed spends that spend outputs created by a sweep transaction.
|
|
// The most common case for this is when someone sweeps our anchor outputs
|
|
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
|
|
// as a backend when a channel is force closed and anchor cpfp txns are
|
|
// created to bump the initial commitment transaction. In this case an anchor
|
|
// cpfp is broadcasted for up to 3 commitment transactions (local,
|
|
// remote-dangling, remote). Using neutrino all of those transactions will be
|
|
// accepted (the commitment tx will be different in all of those cases) and have
|
|
// to be removed as soon as one of them confirmes (they do have the same
|
|
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
|
|
// nodes do not signal invalid transactions anymore.
|
|
func (s *UtxoSweeper) removeConflictSweepDescendants(
|
|
outpoints map[wire.OutPoint]struct{}) error {
|
|
|
|
// Obtain all the past sweeps that we've done so far. We'll need these
|
|
// to ensure that if the spendingTx spends any of the same inputs, then
|
|
// we remove any transaction that may be spending those inputs from the
|
|
// wallet.
|
|
//
|
|
// TODO(roasbeef): can be last sweep here if we remove anything confirmed
|
|
// from the store?
|
|
pastSweepHashes, err := s.cfg.Store.ListSweeps()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We'll now go through each past transaction we published during this
|
|
// epoch and cross reference the spent inputs. If there're any inputs
|
|
// in common with the inputs the spendingTx spent, then we'll remove
|
|
// those.
|
|
//
|
|
// TODO(roasbeef): need to start to remove all transaction hashes after
|
|
// every N blocks (assumed point of no return)
|
|
for _, sweepHash := range pastSweepHashes {
|
|
sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Transaction wasn't found in the wallet, may have already
|
|
// been replaced/removed.
|
|
if sweepTx == nil {
|
|
// If it was removed, then we'll play it safe and mark
|
|
// it as no longer need to be rebroadcasted.
|
|
s.cfg.Wallet.CancelRebroadcast(sweepHash)
|
|
continue
|
|
}
|
|
|
|
// Check to see if this past sweep transaction spent any of the
|
|
// same inputs as spendingTx.
|
|
var isConflicting bool
|
|
for _, txIn := range sweepTx.TxIn {
|
|
if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
|
|
isConflicting = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !isConflicting {
|
|
continue
|
|
}
|
|
|
|
// If it is conflicting, then we'll signal the wallet to remove
|
|
// all the transactions that are descendants of outputs created
|
|
// by the sweepTx and the sweepTx itself.
|
|
log.Debugf("Removing sweep txid=%v from wallet: %v",
|
|
sweepTx.TxHash(), spew.Sdump(sweepTx))
|
|
|
|
err = s.cfg.Wallet.RemoveDescendants(sweepTx)
|
|
if err != nil {
|
|
log.Warnf("Unable to remove descendants: %v", err)
|
|
}
|
|
|
|
// If this transaction was conflicting, then we'll stop
|
|
// rebroadcasting it in the background.
|
|
s.cfg.Wallet.CancelRebroadcast(sweepHash)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// collector is the sweeper main loop. It processes new inputs, spend
|
|
// notifications and counts down to publication of the sweep tx.
|
|
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
|
|
// We registered for the block epochs with a nil request. The notifier
|
|
// should send us the current best block immediately. So we need to wait
|
|
// for it here because we need to know the current best height.
|
|
var bestHeight int32
|
|
select {
|
|
case bestBlock := <-blockEpochs:
|
|
bestHeight = bestBlock.Height
|
|
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
|
|
// Create a ticker based on the config duration.
|
|
ticker := time.NewTicker(s.cfg.TickerDuration)
|
|
defer ticker.Stop()
|
|
|
|
log.Debugf("Sweep ticker started")
|
|
|
|
for {
|
|
select {
|
|
// A new inputs is offered to the sweeper. We check to see if
|
|
// we are already trying to sweep this input and if not, set up
|
|
// a listener to spend and schedule a sweep.
|
|
case input := <-s.newInputs:
|
|
s.handleNewInput(input, bestHeight)
|
|
|
|
// A spend of one of our inputs is detected. Signal sweep
|
|
// results to the caller(s).
|
|
case spend := <-s.spendChan:
|
|
s.handleInputSpent(spend)
|
|
|
|
// A new external request has been received to retrieve all of
|
|
// the inputs we're currently attempting to sweep.
|
|
case req := <-s.pendingSweepsReqs:
|
|
req.respChan <- s.handlePendingSweepsReq(req)
|
|
|
|
// A new external request has been received to bump the fee rate
|
|
// of a given input.
|
|
case req := <-s.updateReqs:
|
|
resultChan, err := s.handleUpdateReq(req, bestHeight)
|
|
req.responseChan <- &updateResp{
|
|
resultChan: resultChan,
|
|
err: err,
|
|
}
|
|
|
|
// The timer expires and we are going to (re)sweep.
|
|
case <-ticker.C:
|
|
log.Debugf("Sweep ticker ticks, attempt sweeping...")
|
|
s.handleSweep(bestHeight)
|
|
|
|
// A new block comes in, update the bestHeight.
|
|
case epoch, ok := <-blockEpochs:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
bestHeight = epoch.Height
|
|
|
|
log.Debugf("New block: height=%v, sha=%v",
|
|
epoch.Height, epoch.Hash)
|
|
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// removeExclusiveGroup removes all inputs in the given exclusive group. This
|
|
// function is called when one of the exclusive group inputs has been spent. The
|
|
// other inputs won't ever be spendable and can be removed. This also prevents
|
|
// them from being part of future sweep transactions that would fail. In
|
|
// addition sweep transactions of those inputs will be removed from the wallet.
|
|
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
|
|
for outpoint, input := range s.pendingInputs {
|
|
outpoint := outpoint
|
|
|
|
// Skip inputs that aren't exclusive.
|
|
if input.params.ExclusiveGroup == nil {
|
|
continue
|
|
}
|
|
|
|
// Skip inputs from other exclusive groups.
|
|
if *input.params.ExclusiveGroup != group {
|
|
continue
|
|
}
|
|
|
|
// Signal result channels.
|
|
s.signalAndRemove(&outpoint, Result{
|
|
Err: ErrExclusiveGroupSpend,
|
|
})
|
|
|
|
// Remove all unconfirmed transactions from the wallet which
|
|
// spend the passed outpoint of the same exclusive group.
|
|
outpoints := map[wire.OutPoint]struct{}{
|
|
outpoint: {},
|
|
}
|
|
err := s.removeConflictSweepDescendants(outpoints)
|
|
if err != nil {
|
|
log.Warnf("Unable to remove conflicting sweep tx from "+
|
|
"wallet for outpoint %v : %v", outpoint, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// sweepCluster tries to sweep the given input cluster.
|
|
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
|
|
currentHeight int32) error {
|
|
|
|
// Execute the sweep within a coin select lock. Otherwise the coins
|
|
// that we are going to spend may be selected for other transactions
|
|
// like funding of a channel.
|
|
return s.cfg.Wallet.WithCoinSelectLock(func() error {
|
|
// Examine pending inputs and try to construct lists of inputs.
|
|
allSets, newSets, err := s.getInputLists(cluster, currentHeight)
|
|
if err != nil {
|
|
return fmt.Errorf("examine pending inputs: %w", err)
|
|
}
|
|
|
|
// errAllSets records the error from broadcasting the sweeping
|
|
// transactions for all input sets.
|
|
var errAllSets error
|
|
|
|
// allSets contains retried inputs and new inputs. To avoid
|
|
// creating an RBF for the new inputs, we'd sweep this set
|
|
// first.
|
|
for _, inputs := range allSets {
|
|
errAllSets = s.sweep(
|
|
inputs, cluster.sweepFeeRate, currentHeight,
|
|
)
|
|
// TODO(yy): we should also find out which set created
|
|
// this error. If there are new inputs in this set, we
|
|
// should give it a second chance by sweeping them
|
|
// below. To enable this, we need to provide richer
|
|
// state for each input other than just recording the
|
|
// publishAttempts. We'd also need to refactor how we
|
|
// create the input sets. Atm, the steps are,
|
|
// 1. create a list of input sets.
|
|
// 2. sweep each set by creating and publishing the tx.
|
|
// We should change the flow as,
|
|
// 1. create a list of input sets, and for each set,
|
|
// 2. when created, we create and publish the tx.
|
|
// 3. if the publish fails, find out which input is
|
|
// causing the failure and retry the rest of the
|
|
// inputs.
|
|
if errAllSets != nil {
|
|
log.Errorf("sweep all inputs: %w", err)
|
|
break
|
|
}
|
|
}
|
|
|
|
// If we have successfully swept all inputs, there's no need to
|
|
// sweep the new inputs as it'd create an RBF case.
|
|
if allSets != nil && errAllSets == nil {
|
|
return nil
|
|
}
|
|
|
|
// We'd end up there if there's no retried inputs. In this
|
|
// case, we'd sweep the new input sets. If there's an error
|
|
// when sweeping a given set, we'd log the error and sweep the
|
|
// next set.
|
|
for _, inputs := range newSets {
|
|
err := s.sweep(
|
|
inputs, cluster.sweepFeeRate, currentHeight,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("sweep new inputs: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// bucketForFeeReate determines the proper bucket for a fee rate. This is done
|
|
// in order to batch inputs with similar fee rates together.
|
|
func (s *UtxoSweeper) bucketForFeeRate(
|
|
feeRate chainfee.SatPerKWeight) int {
|
|
|
|
// Create an isolated bucket for sweeps at the minimum fee rate. This is
|
|
// to prevent very small outputs (anchors) from becoming uneconomical if
|
|
// their fee rate would be averaged with higher fee rate inputs in a
|
|
// regular bucket.
|
|
if feeRate == s.relayFeeRate {
|
|
return 0
|
|
}
|
|
|
|
return 1 + int(feeRate-s.relayFeeRate)/s.cfg.FeeRateBucketSize
|
|
}
|
|
|
|
// createInputClusters creates a list of input clusters from the set of pending
|
|
// inputs known by the UtxoSweeper. It clusters inputs by
|
|
// 1) Required tx locktime
|
|
// 2) Similar fee rates.
|
|
func (s *UtxoSweeper) createInputClusters() []inputCluster {
|
|
inputs := s.pendingInputs
|
|
|
|
// We start by getting the inputs clusters by locktime. Since the
|
|
// inputs commit to the locktime, they can only be clustered together
|
|
// if the locktime is equal.
|
|
lockTimeClusters, nonLockTimeInputs := s.clusterByLockTime(inputs)
|
|
|
|
// Cluster the the remaining inputs by sweep fee rate.
|
|
feeClusters := s.clusterBySweepFeeRate(nonLockTimeInputs)
|
|
|
|
// Since the inputs that we clustered by fee rate don't commit to a
|
|
// specific locktime, we can try to merge a locktime cluster with a fee
|
|
// cluster.
|
|
return zipClusters(lockTimeClusters, feeClusters)
|
|
}
|
|
|
|
// clusterByLockTime takes the given set of pending inputs and clusters those
|
|
// with equal locktime together. Each cluster contains a sweep fee rate, which
|
|
// is determined by calculating the average fee rate of all inputs within that
|
|
// cluster. In addition to the created clusters, inputs that did not specify a
|
|
// required lock time are returned.
|
|
func (s *UtxoSweeper) clusterByLockTime(inputs pendingInputs) ([]inputCluster,
|
|
pendingInputs) {
|
|
|
|
locktimes := make(map[uint32]pendingInputs)
|
|
rem := make(pendingInputs)
|
|
|
|
// Go through all inputs and check if they require a certain locktime.
|
|
for op, input := range inputs {
|
|
lt, ok := input.RequiredLockTime()
|
|
if !ok {
|
|
rem[op] = input
|
|
continue
|
|
}
|
|
|
|
// Check if we already have inputs with this locktime.
|
|
cluster, ok := locktimes[lt]
|
|
if !ok {
|
|
cluster = make(pendingInputs)
|
|
}
|
|
|
|
// Get the fee rate based on the fee preference. If an error is
|
|
// returned, we'll skip sweeping this input for this round of
|
|
// cluster creation and retry it when we create the clusters
|
|
// from the pending inputs again.
|
|
feeRate, err := s.feeRateForPreference(input.params.Fee)
|
|
if err != nil {
|
|
log.Warnf("Skipping input %v: %v", op, err)
|
|
continue
|
|
}
|
|
|
|
log.Debugf("Adding input %v to cluster with locktime=%v, "+
|
|
"feeRate=%v", op, lt, feeRate)
|
|
|
|
// Attach the fee rate to the input.
|
|
input.lastFeeRate = feeRate
|
|
|
|
// Update the cluster about the updated input.
|
|
cluster[op] = input
|
|
locktimes[lt] = cluster
|
|
}
|
|
|
|
// We'll then determine the sweep fee rate for each set of inputs by
|
|
// calculating the average fee rate of the inputs within each set.
|
|
inputClusters := make([]inputCluster, 0, len(locktimes))
|
|
for lt, cluster := range locktimes {
|
|
lt := lt
|
|
|
|
var sweepFeeRate chainfee.SatPerKWeight
|
|
for _, input := range cluster {
|
|
sweepFeeRate += input.lastFeeRate
|
|
}
|
|
|
|
sweepFeeRate /= chainfee.SatPerKWeight(len(cluster))
|
|
inputClusters = append(inputClusters, inputCluster{
|
|
lockTime: <,
|
|
sweepFeeRate: sweepFeeRate,
|
|
inputs: cluster,
|
|
})
|
|
}
|
|
|
|
return inputClusters, rem
|
|
}
|
|
|
|
// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper
|
|
// and clusters those together with similar fee rates. Each cluster contains a
|
|
// sweep fee rate, which is determined by calculating the average fee rate of
|
|
// all inputs within that cluster.
|
|
func (s *UtxoSweeper) clusterBySweepFeeRate(inputs pendingInputs) []inputCluster {
|
|
bucketInputs := make(map[int]*bucketList)
|
|
inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight)
|
|
|
|
// First, we'll group together all inputs with similar fee rates. This
|
|
// is done by determining the fee rate bucket they should belong in.
|
|
for op, input := range inputs {
|
|
feeRate, err := s.feeRateForPreference(input.params.Fee)
|
|
if err != nil {
|
|
log.Warnf("Skipping input %v: %v", op, err)
|
|
continue
|
|
}
|
|
|
|
// Only try to sweep inputs with an unconfirmed parent if the
|
|
// current sweep fee rate exceeds the parent tx fee rate. This
|
|
// assumes that such inputs are offered to the sweeper solely
|
|
// for the purpose of anchoring down the parent tx using cpfp.
|
|
parentTx := input.UnconfParent()
|
|
if parentTx != nil {
|
|
parentFeeRate :=
|
|
chainfee.SatPerKWeight(parentTx.Fee*1000) /
|
|
chainfee.SatPerKWeight(parentTx.Weight)
|
|
|
|
if parentFeeRate >= feeRate {
|
|
log.Debugf("Skipping cpfp input %v: fee_rate=%v, "+
|
|
"parent_fee_rate=%v", op, feeRate,
|
|
parentFeeRate)
|
|
|
|
continue
|
|
}
|
|
}
|
|
|
|
feeGroup := s.bucketForFeeRate(feeRate)
|
|
|
|
// Create a bucket list for this fee rate if there isn't one
|
|
// yet.
|
|
buckets, ok := bucketInputs[feeGroup]
|
|
if !ok {
|
|
buckets = &bucketList{}
|
|
bucketInputs[feeGroup] = buckets
|
|
}
|
|
|
|
// Request the bucket list to add this input. The bucket list
|
|
// will take into account exclusive group constraints.
|
|
buckets.add(input)
|
|
|
|
input.lastFeeRate = feeRate
|
|
inputFeeRates[op] = feeRate
|
|
}
|
|
|
|
// We'll then determine the sweep fee rate for each set of inputs by
|
|
// calculating the average fee rate of the inputs within each set.
|
|
inputClusters := make([]inputCluster, 0, len(bucketInputs))
|
|
for _, buckets := range bucketInputs {
|
|
for _, inputs := range buckets.buckets {
|
|
var sweepFeeRate chainfee.SatPerKWeight
|
|
for op := range inputs {
|
|
sweepFeeRate += inputFeeRates[op]
|
|
}
|
|
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
|
|
inputClusters = append(inputClusters, inputCluster{
|
|
sweepFeeRate: sweepFeeRate,
|
|
inputs: inputs,
|
|
})
|
|
}
|
|
}
|
|
|
|
return inputClusters
|
|
}
|
|
|
|
// zipClusters merges pairwise clusters from as and bs such that cluster a from
|
|
// as is merged with a cluster from bs that has at least the fee rate of a.
|
|
// This to ensure we don't delay confirmation by decreasing the fee rate (the
|
|
// lock time inputs are typically second level HTLC transactions, that are time
|
|
// sensitive).
|
|
func zipClusters(as, bs []inputCluster) []inputCluster {
|
|
// Sort the clusters by decreasing fee rates.
|
|
sort.Slice(as, func(i, j int) bool {
|
|
return as[i].sweepFeeRate >
|
|
as[j].sweepFeeRate
|
|
})
|
|
sort.Slice(bs, func(i, j int) bool {
|
|
return bs[i].sweepFeeRate >
|
|
bs[j].sweepFeeRate
|
|
})
|
|
|
|
var (
|
|
finalClusters []inputCluster
|
|
j int
|
|
)
|
|
|
|
// Go through each cluster in as, and merge with the next one from bs
|
|
// if it has at least the fee rate needed.
|
|
for i := range as {
|
|
a := as[i]
|
|
|
|
switch {
|
|
// If the fee rate for the next one from bs is at least a's, we
|
|
// merge.
|
|
case j < len(bs) && bs[j].sweepFeeRate >= a.sweepFeeRate:
|
|
merged := mergeClusters(a, bs[j])
|
|
finalClusters = append(finalClusters, merged...)
|
|
|
|
// Increment j for the next round.
|
|
j++
|
|
|
|
// We did not merge, meaning all the remaining clusters from bs
|
|
// have lower fee rate. Instead we add a directly to the final
|
|
// clusters.
|
|
default:
|
|
finalClusters = append(finalClusters, a)
|
|
}
|
|
}
|
|
|
|
// Add any remaining clusters from bs.
|
|
for ; j < len(bs); j++ {
|
|
b := bs[j]
|
|
finalClusters = append(finalClusters, b)
|
|
}
|
|
|
|
return finalClusters
|
|
}
|
|
|
|
// mergeClusters attempts to merge cluster a and b if they are compatible. The
|
|
// new cluster will have the locktime set if a or b had a locktime set, and a
|
|
// sweep fee rate that is the maximum of a and b's. If the two clusters are not
|
|
// compatible, they will be returned unchanged.
|
|
func mergeClusters(a, b inputCluster) []inputCluster {
|
|
newCluster := inputCluster{}
|
|
|
|
switch {
|
|
// Incompatible locktimes, return the sets without merging them.
|
|
case a.lockTime != nil && b.lockTime != nil && *a.lockTime != *b.lockTime:
|
|
return []inputCluster{a, b}
|
|
|
|
case a.lockTime != nil:
|
|
newCluster.lockTime = a.lockTime
|
|
|
|
case b.lockTime != nil:
|
|
newCluster.lockTime = b.lockTime
|
|
}
|
|
|
|
if a.sweepFeeRate > b.sweepFeeRate {
|
|
newCluster.sweepFeeRate = a.sweepFeeRate
|
|
} else {
|
|
newCluster.sweepFeeRate = b.sweepFeeRate
|
|
}
|
|
|
|
newCluster.inputs = make(pendingInputs)
|
|
|
|
for op, in := range a.inputs {
|
|
newCluster.inputs[op] = in
|
|
}
|
|
|
|
for op, in := range b.inputs {
|
|
newCluster.inputs[op] = in
|
|
}
|
|
|
|
return []inputCluster{newCluster}
|
|
}
|
|
|
|
// signalAndRemove notifies the listeners of the final result of the input
|
|
// sweep. It cancels any pending spend notification and removes the input from
|
|
// the list of pending inputs. When this function returns, the sweeper has
|
|
// completely forgotten about the input.
|
|
func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
|
|
pendInput := s.pendingInputs[*outpoint]
|
|
listeners := pendInput.listeners
|
|
|
|
if result.Err == nil {
|
|
log.Debugf("Dispatching sweep success for %v to %v listeners",
|
|
outpoint, len(listeners),
|
|
)
|
|
} else {
|
|
log.Debugf("Dispatching sweep error for %v to %v listeners: %v",
|
|
outpoint, len(listeners), result.Err,
|
|
)
|
|
}
|
|
|
|
// Signal all listeners. Channel is buffered. Because we only send once
|
|
// on every channel, it should never block.
|
|
for _, resultChan := range listeners {
|
|
resultChan <- result
|
|
}
|
|
|
|
// Cancel spend notification with chain notifier. This is not necessary
|
|
// in case of a success, except for that a reorg could still happen.
|
|
if pendInput.ntfnRegCancel != nil {
|
|
log.Debugf("Canceling spend ntfn for %v", outpoint)
|
|
|
|
pendInput.ntfnRegCancel()
|
|
}
|
|
|
|
// Inputs are no longer pending after result has been sent.
|
|
delete(s.pendingInputs, *outpoint)
|
|
}
|
|
|
|
// getInputLists goes through the given inputs and constructs multiple distinct
|
|
// sweep lists with the given fee rate, each up to the configured maximum
|
|
// number of inputs. Negative yield inputs are skipped. Transactions with an
|
|
// output below the dust limit are not published. Those inputs remain pending
|
|
// and will be bundled with future inputs if possible. It returns two list -
|
|
// one containing all inputs and the other containing only the new inputs. If
|
|
// there's no retried inputs, the first set returned will be empty.
|
|
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
|
|
currentHeight int32) ([]inputSet, []inputSet, error) {
|
|
|
|
// Filter for inputs that need to be swept. Create two lists: all
|
|
// sweepable inputs and a list containing only the new, never tried
|
|
// inputs.
|
|
//
|
|
// We want to create as large a tx as possible, so we return a final
|
|
// set list that starts with sets created from all inputs. However,
|
|
// there is a chance that those txes will not publish, because they
|
|
// already contain inputs that failed before. Therefore we also add
|
|
// sets consisting of only new inputs to the list, to make sure that
|
|
// new inputs are given a good, isolated chance of being published.
|
|
//
|
|
// TODO(yy): this would lead to conflict transactions as the same input
|
|
// can be used in two sweeping transactions, and our rebroadcaster will
|
|
// retry the failed one. We should instead understand why the input is
|
|
// failed in the first place, and start tracking input states in
|
|
// sweeper to avoid this.
|
|
var newInputs, retryInputs []txInput
|
|
for _, input := range cluster.inputs {
|
|
// Skip inputs that have a minimum publish height that is not
|
|
// yet reached.
|
|
if input.minPublishHeight > currentHeight {
|
|
continue
|
|
}
|
|
|
|
// Add input to the either one of the lists.
|
|
if input.publishAttempts == 0 {
|
|
newInputs = append(newInputs, input)
|
|
} else {
|
|
retryInputs = append(retryInputs, input)
|
|
}
|
|
}
|
|
|
|
// Convert the max fee rate's unit from sat/vb to sat/kw.
|
|
maxFeeRate := s.cfg.MaxFeeRate.FeePerKWeight()
|
|
|
|
// If there is anything to retry, combine it with the new inputs and
|
|
// form input sets.
|
|
var allSets []inputSet
|
|
if len(retryInputs) > 0 {
|
|
var err error
|
|
allSets, err = generateInputPartitionings(
|
|
append(retryInputs, newInputs...),
|
|
cluster.sweepFeeRate, maxFeeRate,
|
|
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("input partitionings: %w",
|
|
err)
|
|
}
|
|
}
|
|
|
|
// Create sets for just the new inputs.
|
|
newSets, err := generateInputPartitionings(
|
|
newInputs, cluster.sweepFeeRate, maxFeeRate,
|
|
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("input partitionings: %w", err)
|
|
}
|
|
|
|
log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+
|
|
"total_num_new=%v", currentHeight, len(allSets), len(newSets))
|
|
|
|
return allSets, newSets, nil
|
|
}
|
|
|
|
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
|
|
// tx. The output address is only marked as used if the publish succeeds.
|
|
func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
|
|
currentHeight int32) error {
|
|
|
|
// Generate an output script if there isn't an unused script available.
|
|
if s.currentOutputScript == nil {
|
|
pkScript, err := s.cfg.GenSweepScript()
|
|
if err != nil {
|
|
return fmt.Errorf("gen sweep script: %v", err)
|
|
}
|
|
s.currentOutputScript = pkScript
|
|
}
|
|
|
|
// Create sweep tx.
|
|
tx, err := createSweepTx(
|
|
inputs, nil, s.currentOutputScript, uint32(currentHeight),
|
|
feeRate, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("create sweep tx: %v", err)
|
|
}
|
|
|
|
// Add tx before publication, so that we will always know that a spend
|
|
// by this tx is ours. Otherwise if the publish doesn't return, but did
|
|
// publish, we loose track of this tx. Even republication on startup
|
|
// doesn't prevent this, because that call returns a double spend error
|
|
// then and would also not add the hash to the store.
|
|
err = s.cfg.Store.NotifyPublishTx(tx)
|
|
if err != nil {
|
|
return fmt.Errorf("notify publish tx: %v", err)
|
|
}
|
|
|
|
// Reschedule the inputs that we just tried to sweep. This is done in
|
|
// case the following publish fails, we'd like to update the inputs'
|
|
// publish attempts and rescue them in the next sweep.
|
|
s.rescheduleInputs(tx.TxIn, currentHeight)
|
|
|
|
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
|
|
tx.TxHash(), len(tx.TxIn), currentHeight)
|
|
|
|
// Publish the sweeping tx with customized label.
|
|
err = s.cfg.Wallet.PublishTransaction(
|
|
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If there's no error, remove the output script. Otherwise keep it so
|
|
// that it can be reused for the next transaction and causes no address
|
|
// inflation.
|
|
s.currentOutputScript = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// rescheduleInputs updates the pending inputs with the given tx inputs. It
|
|
// increments the `publishAttempts` and calculates the next broadcast height
|
|
// for each input. When the publishAttempts exceeds MaxSweepAttemps(10), this
|
|
// input will be removed.
|
|
func (s *UtxoSweeper) rescheduleInputs(inputs []*wire.TxIn,
|
|
currentHeight int32) {
|
|
|
|
// Reschedule sweep.
|
|
for _, input := range inputs {
|
|
pi, ok := s.pendingInputs[input.PreviousOutPoint]
|
|
if !ok {
|
|
// It can be that the input has been removed because it
|
|
// exceed the maximum number of attempts in a previous
|
|
// input set. It could also be that this input is an
|
|
// additional wallet input that was attached. In that
|
|
// case there also isn't a pending input to update.
|
|
continue
|
|
}
|
|
|
|
// Record another publish attempt.
|
|
pi.publishAttempts++
|
|
|
|
// We don't care what the result of the publish call was. Even
|
|
// if it is published successfully, it can still be that it
|
|
// needs to be retried. Call NextAttemptDeltaFunc to calculate
|
|
// when to resweep this input.
|
|
nextAttemptDelta := s.cfg.NextAttemptDeltaFunc(
|
|
pi.publishAttempts,
|
|
)
|
|
|
|
pi.minPublishHeight = currentHeight + nextAttemptDelta
|
|
|
|
log.Debugf("Rescheduling input %v after %v attempts at "+
|
|
"height %v (delta %v)", input.PreviousOutPoint,
|
|
pi.publishAttempts, pi.minPublishHeight,
|
|
nextAttemptDelta)
|
|
|
|
if pi.publishAttempts >= s.cfg.MaxSweepAttempts {
|
|
log.Warnf("input %v: publishAttempts(%v) exceeds "+
|
|
"MaxSweepAttempts(%v), removed",
|
|
input.PreviousOutPoint, pi.publishAttempts,
|
|
s.cfg.MaxSweepAttempts)
|
|
|
|
// Signal result channels sweep result.
|
|
s.signalAndRemove(&input.PreviousOutPoint, Result{
|
|
Err: ErrTooManyAttempts,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// monitorSpend registers a spend notification with the chain notifier. It
|
|
// returns a cancel function that can be used to cancel the registration.
|
|
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
|
|
script []byte, heightHint uint32) (func(), error) {
|
|
|
|
log.Tracef("Wait for spend of %v at heightHint=%v",
|
|
outpoint, heightHint)
|
|
|
|
spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
|
|
&outpoint, script, heightHint,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("register spend ntfn: %v", err)
|
|
}
|
|
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
|
|
select {
|
|
case spend, ok := <-spendEvent.Spend:
|
|
if !ok {
|
|
log.Debugf("Spend ntfn for %v canceled",
|
|
outpoint)
|
|
return
|
|
}
|
|
|
|
log.Debugf("Delivering spend ntfn for %v",
|
|
outpoint)
|
|
select {
|
|
case s.spendChan <- spend:
|
|
log.Debugf("Delivered spend ntfn for %v",
|
|
outpoint)
|
|
|
|
case <-s.quit:
|
|
}
|
|
case <-s.quit:
|
|
}
|
|
}()
|
|
|
|
return spendEvent.Cancel, nil
|
|
}
|
|
|
|
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
|
|
// attempting to sweep.
|
|
func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
|
|
respChan := make(chan map[wire.OutPoint]*PendingInput, 1)
|
|
errChan := make(chan error, 1)
|
|
select {
|
|
case s.pendingSweepsReqs <- &pendingSweepsReq{
|
|
respChan: respChan,
|
|
errChan: errChan,
|
|
}:
|
|
case <-s.quit:
|
|
return nil, ErrSweeperShuttingDown
|
|
}
|
|
|
|
select {
|
|
case pendingSweeps := <-respChan:
|
|
return pendingSweeps, nil
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case <-s.quit:
|
|
return nil, ErrSweeperShuttingDown
|
|
}
|
|
}
|
|
|
|
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
|
|
// UtxoSweeper is attempting to sweep.
|
|
func (s *UtxoSweeper) handlePendingSweepsReq(
|
|
req *pendingSweepsReq) map[wire.OutPoint]*PendingInput {
|
|
|
|
pendingInputs := make(map[wire.OutPoint]*PendingInput, len(s.pendingInputs))
|
|
for _, pendingInput := range s.pendingInputs {
|
|
// Only the exported fields are set, as we expect the response
|
|
// to only be consumed externally.
|
|
op := *pendingInput.OutPoint()
|
|
pendingInputs[op] = &PendingInput{
|
|
OutPoint: op,
|
|
WitnessType: pendingInput.WitnessType(),
|
|
Amount: btcutil.Amount(
|
|
pendingInput.SignDesc().Output.Value,
|
|
),
|
|
LastFeeRate: pendingInput.lastFeeRate,
|
|
BroadcastAttempts: pendingInput.publishAttempts,
|
|
NextBroadcastHeight: uint32(pendingInput.minPublishHeight),
|
|
Params: pendingInput.params,
|
|
}
|
|
}
|
|
|
|
return pendingInputs
|
|
}
|
|
|
|
// UpdateParams allows updating the sweep parameters of a pending input in the
|
|
// UtxoSweeper. This function can be used to provide an updated fee preference
|
|
// and force flag that will be used for a new sweep transaction of the input
|
|
// that will act as a replacement transaction (RBF) of the original sweeping
|
|
// transaction, if any. The exclusive group is left unchanged.
|
|
//
|
|
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
|
|
// is actually successful. The responsibility of doing so should be handled by
|
|
// the caller.
|
|
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
|
|
params ParamsUpdate) (chan Result, error) {
|
|
|
|
// Ensure the client provided a sane fee preference.
|
|
if _, err := s.feeRateForPreference(params.Fee); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
responseChan := make(chan *updateResp, 1)
|
|
select {
|
|
case s.updateReqs <- &updateReq{
|
|
input: input,
|
|
params: params,
|
|
responseChan: responseChan,
|
|
}:
|
|
case <-s.quit:
|
|
return nil, ErrSweeperShuttingDown
|
|
}
|
|
|
|
select {
|
|
case response := <-responseChan:
|
|
return response.resultChan, response.err
|
|
case <-s.quit:
|
|
return nil, ErrSweeperShuttingDown
|
|
}
|
|
}
|
|
|
|
// handleUpdateReq handles an update request by simply updating the sweep
|
|
// parameters of the pending input. Currently, no validation is done on the new
|
|
// fee preference to ensure it will properly create a replacement transaction.
|
|
//
|
|
// TODO(wilmer):
|
|
// - Validate fee preference to ensure we'll create a valid replacement
|
|
// transaction to allow the new fee rate to propagate throughout the
|
|
// network.
|
|
// - Ensure we don't combine this input with any other unconfirmed inputs that
|
|
// did not exist in the original sweep transaction, resulting in an invalid
|
|
// replacement transaction.
|
|
func (s *UtxoSweeper) handleUpdateReq(req *updateReq, bestHeight int32) (
|
|
chan Result, error) {
|
|
|
|
// If the UtxoSweeper is already trying to sweep this input, then we can
|
|
// simply just increase its fee rate. This will allow the input to be
|
|
// batched with others which also have a similar fee rate, creating a
|
|
// higher fee rate transaction that replaces the original input's
|
|
// sweeping transaction.
|
|
pendingInput, ok := s.pendingInputs[req.input]
|
|
if !ok {
|
|
return nil, lnwallet.ErrNotMine
|
|
}
|
|
|
|
// Create the updated parameters struct. Leave the exclusive group
|
|
// unchanged.
|
|
newParams := pendingInput.params
|
|
newParams.Fee = req.params.Fee
|
|
newParams.Force = req.params.Force
|
|
|
|
log.Debugf("Updating sweep parameters for %v from %v to %v", req.input,
|
|
pendingInput.params, newParams)
|
|
|
|
pendingInput.params = newParams
|
|
|
|
// We'll reset the input's publish height to the current so that a new
|
|
// transaction can be created that replaces the transaction currently
|
|
// spending the input. We only do this for inputs that have been
|
|
// broadcast at least once to ensure we don't spend an input before its
|
|
// maturity height.
|
|
//
|
|
// NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the
|
|
// check for broadcast attempts is redundant at the moment.
|
|
if pendingInput.publishAttempts > 0 {
|
|
pendingInput.minPublishHeight = bestHeight
|
|
}
|
|
|
|
resultChan := make(chan Result, 1)
|
|
pendingInput.listeners = append(pendingInput.listeners, resultChan)
|
|
|
|
return resultChan, nil
|
|
}
|
|
|
|
// CreateSweepTx accepts a list of inputs and signs and generates a txn that
|
|
// spends from them. This method also makes an accurate fee estimate before
|
|
// generating the required witnesses.
|
|
//
|
|
// The created transaction has a single output sending all the funds back to
|
|
// the source wallet, after accounting for the fee estimate.
|
|
//
|
|
// The value of currentBlockHeight argument will be set as the tx locktime.
|
|
// This function assumes that all CLTV inputs will be unlocked after
|
|
// currentBlockHeight. Reasons not to use the maximum of all actual CLTV expiry
|
|
// values of the inputs:
|
|
//
|
|
// - Make handling re-orgs easier.
|
|
// - Thwart future possible fee sniping attempts.
|
|
// - Make us blend in with the bitcoind wallet.
|
|
func (s *UtxoSweeper) CreateSweepTx(inputs []input.Input, feePref FeePreference,
|
|
currentBlockHeight uint32) (*wire.MsgTx, error) {
|
|
|
|
feePerKw, err := s.cfg.DetermineFeePerKw(s.cfg.FeeEstimator, feePref)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Generate the receiving script to which the funds will be swept.
|
|
pkScript, err := s.cfg.GenSweepScript()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return createSweepTx(
|
|
inputs, nil, pkScript, currentBlockHeight, feePerKw,
|
|
s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer,
|
|
)
|
|
}
|
|
|
|
// DefaultNextAttemptDeltaFunc is the default calculation for next sweep attempt
|
|
// scheduling. It implements exponential back-off with some randomness. This is
|
|
// to prevent a stuck tx (for example because fee is too low and can't be bumped
|
|
// in btcd) from blocking all other retried inputs in the same tx.
|
|
func DefaultNextAttemptDeltaFunc(attempts int) int32 {
|
|
return 1 + rand.Int31n(1<<uint(attempts-1))
|
|
}
|
|
|
|
// ListSweeps returns a list of the the sweeps recorded by the sweep store.
|
|
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
|
|
return s.cfg.Store.ListSweeps()
|
|
}
|
|
|
|
// handleNewInput processes a new input by registering spend notification and
|
|
// scheduling sweeping for it.
|
|
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage,
|
|
bestHeight int32) {
|
|
|
|
outpoint := *input.input.OutPoint()
|
|
pendInput, pending := s.pendingInputs[outpoint]
|
|
if pending {
|
|
log.Debugf("Already pending input %v received", outpoint)
|
|
|
|
s.handleExistingInput(input, pendInput)
|
|
|
|
return
|
|
}
|
|
|
|
// Create a new pendingInput and initialize the listeners slice with
|
|
// the passed in result channel. If this input is offered for sweep
|
|
// again, the result channel will be appended to this slice.
|
|
pendInput = &pendingInput{
|
|
listeners: []chan Result{input.resultChan},
|
|
Input: input.input,
|
|
minPublishHeight: bestHeight,
|
|
params: input.params,
|
|
}
|
|
s.pendingInputs[outpoint] = pendInput
|
|
log.Tracef("input %v added to pendingInputs", outpoint)
|
|
|
|
// Start watching for spend of this input, either by us or the remote
|
|
// party.
|
|
cancel, err := s.monitorSpend(
|
|
outpoint, input.input.SignDesc().Output.PkScript,
|
|
input.input.HeightHint(),
|
|
)
|
|
if err != nil {
|
|
err := fmt.Errorf("wait for spend: %w", err)
|
|
s.signalAndRemove(&outpoint, Result{Err: err})
|
|
|
|
return
|
|
}
|
|
|
|
pendInput.ntfnRegCancel = cancel
|
|
}
|
|
|
|
// handleExistingInput processes an input that is already known to the sweeper.
|
|
// It will overwrite the params of the old input with the new ones.
|
|
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
|
|
oldInput *pendingInput) {
|
|
|
|
// Before updating the input details, check if an exclusive group was
|
|
// set. In case the same input is registered again without an exclusive
|
|
// group set, the previous input and its sweep parameters are outdated
|
|
// hence need to be replaced. This scenario currently only happens for
|
|
// anchor outputs. When a channel is force closed, in the worst case 3
|
|
// different sweeps with the same exclusive group are registered with
|
|
// the sweeper to bump the closing transaction (cpfp) when its time
|
|
// critical. Receiving an input which was already registered with the
|
|
// sweeper but now without an exclusive group means non of the previous
|
|
// inputs were used as CPFP, so we need to make sure we update the
|
|
// sweep parameters but also remove all inputs with the same exclusive
|
|
// group because the are outdated too.
|
|
var prevExclGroup *uint64
|
|
if oldInput.params.ExclusiveGroup != nil &&
|
|
input.params.ExclusiveGroup == nil {
|
|
|
|
prevExclGroup = new(uint64)
|
|
*prevExclGroup = *oldInput.params.ExclusiveGroup
|
|
}
|
|
|
|
// Update input details and sweep parameters. The re-offered input
|
|
// details may contain a change to the unconfirmed parent tx info.
|
|
oldInput.params = input.params
|
|
oldInput.Input = input.input
|
|
|
|
// Add additional result channel to signal spend of this input.
|
|
oldInput.listeners = append(oldInput.listeners, input.resultChan)
|
|
|
|
if prevExclGroup != nil {
|
|
s.removeExclusiveGroup(*prevExclGroup)
|
|
}
|
|
}
|
|
|
|
// handleInputSpent takes a spend event of our input and updates the sweeper's
|
|
// internal state to remove the input.
|
|
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
|
|
// For testing purposes.
|
|
if s.testSpendChan != nil {
|
|
s.testSpendChan <- *spend.SpentOutPoint
|
|
}
|
|
|
|
// Query store to find out if we ever published this tx.
|
|
spendHash := *spend.SpenderTxHash
|
|
isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
|
|
if err != nil {
|
|
log.Errorf("cannot determine if tx %v is ours: %v",
|
|
spendHash, err)
|
|
return
|
|
}
|
|
|
|
// If this isn't our transaction, it means someone else swept outputs
|
|
// that we were attempting to sweep. This can happen for anchor outputs
|
|
// as well as justice transactions. In this case, we'll notify the
|
|
// wallet to remove any spends that descent from this output.
|
|
if !isOurTx {
|
|
// Construct a map of the inputs this transaction spends.
|
|
spendingTx := spend.SpendingTx
|
|
inputsSpent := make(
|
|
map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
|
|
)
|
|
for _, txIn := range spendingTx.TxIn {
|
|
inputsSpent[txIn.PreviousOutPoint] = struct{}{}
|
|
}
|
|
|
|
log.Debugf("Attempting to remove descendant txns invalidated "+
|
|
"by (txid=%v): %v", spendingTx.TxHash(),
|
|
spew.Sdump(spendingTx))
|
|
|
|
err := s.removeConflictSweepDescendants(inputsSpent)
|
|
if err != nil {
|
|
log.Warnf("unable to remove descendant transactions "+
|
|
"due to tx %v: ", spendHash)
|
|
}
|
|
|
|
log.Debugf("Detected third party spend related to in flight "+
|
|
"inputs (is_ours=%v): %v", isOurTx,
|
|
newLogClosure(func() string {
|
|
return spew.Sdump(spend.SpendingTx)
|
|
}),
|
|
)
|
|
}
|
|
|
|
// Signal sweep results for inputs in this confirmed tx.
|
|
for _, txIn := range spend.SpendingTx.TxIn {
|
|
outpoint := txIn.PreviousOutPoint
|
|
|
|
// Check if this input is known to us. It could probably be
|
|
// unknown if we canceled the registration, deleted from
|
|
// pendingInputs but the ntfn was in-flight already. Or this
|
|
// could be not one of our inputs.
|
|
input, ok := s.pendingInputs[outpoint]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Return either a nil or a remote spend result.
|
|
var err error
|
|
if !isOurTx {
|
|
err = ErrRemoteSpend
|
|
}
|
|
|
|
// Signal result channels.
|
|
s.signalAndRemove(&outpoint, Result{
|
|
Tx: spend.SpendingTx,
|
|
Err: err,
|
|
})
|
|
|
|
// Remove all other inputs in this exclusive group.
|
|
if input.params.ExclusiveGroup != nil {
|
|
s.removeExclusiveGroup(*input.params.ExclusiveGroup)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleSweep is called when the ticker fires. It will create clusters and
|
|
// attempt to create and publish the sweeping transactions.
|
|
func (s *UtxoSweeper) handleSweep(bestHeight int32) {
|
|
// We'll attempt to cluster all of our inputs with similar fee rates.
|
|
// Before attempting to sweep them, we'll sort them in descending fee
|
|
// rate order. We do this to ensure any inputs which have had their fee
|
|
// rate bumped are broadcast first in order enforce the RBF policy.
|
|
inputClusters := s.createInputClusters()
|
|
sort.Slice(inputClusters, func(i, j int) bool {
|
|
return inputClusters[i].sweepFeeRate >
|
|
inputClusters[j].sweepFeeRate
|
|
})
|
|
|
|
for _, cluster := range inputClusters {
|
|
err := s.sweepCluster(cluster, bestHeight)
|
|
if err != nil {
|
|
log.Errorf("input cluster sweep: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// init initializes the random generator for random input rescheduling.
|
|
func init() {
|
|
rand.Seed(time.Now().Unix())
|
|
}
|