package sweep import ( "errors" "fmt" "math/rand" "sort" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/labels" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" ) const ( // DefaultFeeRateBucketSize is the default size of fee rate buckets // we'll use when clustering inputs into buckets with similar fee rates // within the UtxoSweeper. // // Given a minimum relay fee rate of 1 sat/vbyte, a multiplier of 10 // would result in the following fee rate buckets up to the maximum fee // rate: // // #1: min = 1 sat/vbyte, max = 10 sat/vbyte // #2: min = 11 sat/vbyte, max = 20 sat/vbyte... DefaultFeeRateBucketSize = 10 ) var ( // ErrRemoteSpend is returned in case an output that we try to sweep is // confirmed in a tx of the remote party. ErrRemoteSpend = errors.New("remote party swept utxo") // ErrTooManyAttempts is returned in case sweeping an output has failed // for the configured max number of attempts. ErrTooManyAttempts = errors.New("sweep failed after max attempts") // ErrNoFeePreference is returned when we attempt to satisfy a sweep // request from a client whom did not specify a fee preference. ErrNoFeePreference = errors.New("no fee preference specified") // ErrFeePreferenceTooLow is returned when the fee preference gives a // fee rate that's below the relay fee rate. ErrFeePreferenceTooLow = errors.New("fee preference too low") // ErrExclusiveGroupSpend is returned in case a different input of the // same exclusive group was spent. ErrExclusiveGroupSpend = errors.New("other member of exclusive group " + "was spent") // ErrSweeperShuttingDown is an error returned when a client attempts to // make a request to the UtxoSweeper, but it is unable to handle it as // it is/has already been stopped. ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down") // DefaultMaxSweepAttempts specifies the default maximum number of times // an input is included in a publish attempt before giving up and // returning an error to the caller. DefaultMaxSweepAttempts = 10 ) // Params contains the parameters that control the sweeping process. type Params struct { // Fee is the fee preference of the client who requested the input to be // swept. If a confirmation target is specified, then we'll map it into // a fee rate whenever we attempt to cluster inputs for a sweep. Fee FeePreference // Force indicates whether the input should be swept regardless of // whether it is economical to do so. Force bool // ExclusiveGroup is an identifier that, if set, prevents other inputs // with the same identifier from being batched together. ExclusiveGroup *uint64 } // ParamsUpdate contains a new set of parameters to update a pending sweep with. type ParamsUpdate struct { // Fee is the fee preference of the client who requested the input to be // swept. If a confirmation target is specified, then we'll map it into // a fee rate whenever we attempt to cluster inputs for a sweep. Fee FeePreference // Force indicates whether the input should be swept regardless of // whether it is economical to do so. Force bool } // String returns a human readable interpretation of the sweep parameters. func (p Params) String() string { if p.ExclusiveGroup != nil { return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v", p.Fee, p.Force, *p.ExclusiveGroup) } return fmt.Sprintf("fee=%v, force=%v, exclusive_group=nil", p.Fee, p.Force) } // pendingInput is created when an input reaches the main loop for the first // time. It wraps the input and tracks all relevant state that is needed for // sweeping. type pendingInput struct { input.Input // listeners is a list of channels over which the final outcome of the // sweep needs to be broadcasted. listeners []chan Result // ntfnRegCancel is populated with a function that cancels the chain // notifier spend registration. ntfnRegCancel func() // minPublishHeight indicates the minimum block height at which this // input may be (re)published. minPublishHeight int32 // publishAttempts records the number of attempts that have already been // made to sweep this tx. publishAttempts int // params contains the parameters that control the sweeping process. params Params // lastFeeRate is the most recent fee rate used for this input within a // transaction broadcast to the network. lastFeeRate chainfee.SatPerKWeight } // parameters returns the sweep parameters for this input. // // NOTE: Part of the txInput interface. func (p *pendingInput) parameters() Params { return p.params } // pendingInputs is a type alias for a set of pending inputs. type pendingInputs = map[wire.OutPoint]*pendingInput // inputCluster is a helper struct to gather a set of pending inputs that should // be swept with the specified fee rate. type inputCluster struct { lockTime *uint32 sweepFeeRate chainfee.SatPerKWeight inputs pendingInputs } // pendingSweepsReq is an internal message we'll use to represent an external // caller's intent to retrieve all of the pending inputs the UtxoSweeper is // attempting to sweep. type pendingSweepsReq struct { respChan chan map[wire.OutPoint]*PendingInput errChan chan error } // PendingInput contains information about an input that is currently being // swept by the UtxoSweeper. type PendingInput struct { // OutPoint is the identify outpoint of the input being swept. OutPoint wire.OutPoint // WitnessType is the witness type of the input being swept. WitnessType input.WitnessType // Amount is the amount of the input being swept. Amount btcutil.Amount // LastFeeRate is the most recent fee rate used for the input being // swept within a transaction broadcast to the network. LastFeeRate chainfee.SatPerKWeight // BroadcastAttempts is the number of attempts we've made to sweept the // input. BroadcastAttempts int // NextBroadcastHeight is the next height of the chain at which we'll // attempt to broadcast a transaction sweeping the input. NextBroadcastHeight uint32 // Params contains the sweep parameters for this pending request. Params Params } // updateReq is an internal message we'll use to represent an external caller's // intent to update the sweep parameters of a given input. type updateReq struct { input wire.OutPoint params ParamsUpdate responseChan chan *updateResp } // updateResp is an internal message we'll use to hand off the response of a // updateReq from the UtxoSweeper's main event loop back to the caller. type updateResp struct { resultChan chan Result err error } // UtxoSweeper is responsible for sweeping outputs back into the wallet type UtxoSweeper struct { started uint32 // To be used atomically. stopped uint32 // To be used atomically. cfg *UtxoSweeperConfig newInputs chan *sweepInputMessage spendChan chan *chainntnfs.SpendDetail // pendingSweepsReq is a channel that will be sent requests by external // callers in order to retrieve the set of pending inputs the // UtxoSweeper is attempting to sweep. pendingSweepsReqs chan *pendingSweepsReq // updateReqs is a channel that will be sent requests by external // callers who wish to bump the fee rate of a given input. updateReqs chan *updateReq // pendingInputs is the total set of inputs the UtxoSweeper has been // requested to sweep. pendingInputs pendingInputs testSpendChan chan wire.OutPoint currentOutputScript []byte relayFeeRate chainfee.SatPerKWeight quit chan struct{} wg sync.WaitGroup } // feeDeterminer defines an alias to the function signature of // `DetermineFeePerKw`. type feeDeterminer func(chainfee.Estimator, FeePreference) (chainfee.SatPerKWeight, error) // UtxoSweeperConfig contains dependencies of UtxoSweeper. type UtxoSweeperConfig struct { // GenSweepScript generates a P2WKH script belonging to the wallet where // funds can be swept. GenSweepScript func() ([]byte, error) // DetermineFeePerKw determines the fee in sat/kw based on the given // estimator and fee preference. DetermineFeePerKw feeDeterminer // FeeEstimator is used when crafting sweep transactions to estimate // the necessary fee relative to the expected size of the sweep // transaction. FeeEstimator chainfee.Estimator // Wallet contains the wallet functions that sweeper requires. Wallet Wallet // TickerDuration is used to create a channel that will be sent on when // a certain time window has passed. During this time window, new // inputs can still be added to the sweep tx that is about to be // generated. TickerDuration time.Duration // Notifier is an instance of a chain notifier we'll use to watch for // certain on-chain events. Notifier chainntnfs.ChainNotifier // Store stores the published sweeper txes. Store SweeperStore // Signer is used by the sweeper to generate valid witnesses at the // time the incubated outputs need to be spent. Signer input.Signer // MaxInputsPerTx specifies the default maximum number of inputs allowed // in a single sweep tx. If more need to be swept, multiple txes are // created and published. MaxInputsPerTx int // MaxSweepAttempts specifies the maximum number of times an input is // included in a publish attempt before giving up and returning an error // to the caller. MaxSweepAttempts int // NextAttemptDeltaFunc returns given the number of already attempted // sweeps, how many blocks to wait before retrying to sweep. NextAttemptDeltaFunc func(int) int32 // MaxFeeRate is the maximum fee rate allowed within the // UtxoSweeper. MaxFeeRate chainfee.SatPerVByte // FeeRateBucketSize is the default size of fee rate buckets we'll use // when clustering inputs into buckets with similar fee rates within the // UtxoSweeper. // // Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size // of 10 would result in the following fee rate buckets up to the // maximum fee rate: // // #1: min = 1 sat/vbyte, max (exclusive) = 11 sat/vbyte // #2: min = 11 sat/vbyte, max (exclusive) = 21 sat/vbyte... FeeRateBucketSize int } // Result is the struct that is pushed through the result channel. Callers can // use this to be informed of the final sweep result. In case of a remote // spend, Err will be ErrRemoteSpend. type Result struct { // Err is the final result of the sweep. It is nil when the input is // swept successfully by us. ErrRemoteSpend is returned when another // party took the input. Err error // Tx is the transaction that spent the input. Tx *wire.MsgTx } // sweepInputMessage structs are used in the internal channel between the // SweepInput call and the sweeper main loop. type sweepInputMessage struct { input input.Input params Params resultChan chan Result } // New returns a new Sweeper instance. func New(cfg *UtxoSweeperConfig) *UtxoSweeper { return &UtxoSweeper{ cfg: cfg, newInputs: make(chan *sweepInputMessage), spendChan: make(chan *chainntnfs.SpendDetail), updateReqs: make(chan *updateReq), pendingSweepsReqs: make(chan *pendingSweepsReq), quit: make(chan struct{}), pendingInputs: make(pendingInputs), } } // Start starts the process of constructing and publish sweep txes. func (s *UtxoSweeper) Start() error { if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { return nil } log.Info("Sweeper starting") // Retrieve relay fee for dust limit calculation. Assume that this will // not change from here on. s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW() // We need to register for block epochs and retry sweeping every block. // We should get a notification with the current best block immediately // if we don't provide any epoch. We'll wait for that in the collector. blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return fmt.Errorf("register block epoch ntfn: %w", err) } // Start sweeper main loop. s.wg.Add(1) go func() { defer blockEpochs.Cancel() defer s.wg.Done() s.collector(blockEpochs.Epochs) // The collector exited and won't longer handle incoming // requests. This can happen on shutdown, when the block // notifier shuts down before the sweeper and its clients. In // order to not deadlock the clients waiting for their requests // being handled, we handle them here and immediately return an // error. When the sweeper finally is shut down we can exit as // the clients will be notified. for { select { case inp := <-s.newInputs: inp.resultChan <- Result{ Err: ErrSweeperShuttingDown, } case req := <-s.pendingSweepsReqs: req.errChan <- ErrSweeperShuttingDown case req := <-s.updateReqs: req.responseChan <- &updateResp{ err: ErrSweeperShuttingDown, } case <-s.quit: return } } }() return nil } // RelayFeePerKW returns the minimum fee rate required for transactions to be // relayed. func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight { return s.relayFeeRate } // Stop stops sweeper from listening to block epochs and constructing sweep // txes. func (s *UtxoSweeper) Stop() error { if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { return nil } log.Info("Sweeper shutting down...") defer log.Debug("Sweeper shutdown complete") close(s.quit) s.wg.Wait() return nil } // SweepInput sweeps inputs back into the wallet. The inputs will be batched and // swept after the batch time window ends. A custom fee preference can be // provided to determine what fee rate should be used for the input. Note that // the input may not always be swept with this exact value, as its possible for // it to be batched under the same transaction with other similar fee rate // inputs. // // NOTE: Extreme care needs to be taken that input isn't changed externally. // Because it is an interface and we don't know what is exactly behind it, we // cannot make a local copy in sweeper. func (s *UtxoSweeper) SweepInput(input input.Input, params Params) (chan Result, error) { if input == nil || input.OutPoint() == nil || input.SignDesc() == nil { return nil, errors.New("nil input received") } // Ensure the client provided a sane fee preference. if _, err := s.feeRateForPreference(params.Fee); err != nil { return nil, err } absoluteTimeLock, _ := input.RequiredLockTime() log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+ "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+ "parent=(%v), params=(%v)", input.OutPoint(), input.WitnessType(), input.BlocksToMaturity(), absoluteTimeLock, btcutil.Amount(input.SignDesc().Output.Value), input.UnconfParent(), params) sweeperInput := &sweepInputMessage{ input: input, params: params, resultChan: make(chan Result, 1), } // Deliver input to the main event loop. select { case s.newInputs <- sweeperInput: case <-s.quit: return nil, ErrSweeperShuttingDown } return sweeperInput.resultChan, nil } // feeRateForPreference returns a fee rate for the given fee preference. It // ensures that the fee rate respects the bounds of the UtxoSweeper. func (s *UtxoSweeper) feeRateForPreference( feePreference FeePreference) (chainfee.SatPerKWeight, error) { // Ensure a type of fee preference is specified to prevent using a // default below. if feePreference.FeeRate == 0 && feePreference.ConfTarget == 0 { return 0, ErrNoFeePreference } feeRate, err := s.cfg.DetermineFeePerKw( s.cfg.FeeEstimator, feePreference, ) if err != nil { return 0, err } if feeRate < s.relayFeeRate { return 0, fmt.Errorf("%w: got %v, minimum is %v", ErrFeePreferenceTooLow, feeRate, s.relayFeeRate) } // If the estimated fee rate is above the maximum allowed fee rate, // default to the max fee rate. if feeRate > s.cfg.MaxFeeRate.FeePerKWeight() { log.Warnf("Estimated fee rate %v exceeds max allowed fee "+ "rate %v, using max fee rate instead", feeRate, s.cfg.MaxFeeRate.FeePerKWeight()) return s.cfg.MaxFeeRate.FeePerKWeight(), nil } return feeRate, nil } // removeConflictSweepDescendants removes any transactions from the wallet that // spend outputs included in the passed outpoint set. This needs to be done in // cases where we're not the only ones that can sweep an output, but there may // exist unconfirmed spends that spend outputs created by a sweep transaction. // The most common case for this is when someone sweeps our anchor outputs // after 16 blocks. Moreover this is also needed for wallets which use neutrino // as a backend when a channel is force closed and anchor cpfp txns are // created to bump the initial commitment transaction. In this case an anchor // cpfp is broadcasted for up to 3 commitment transactions (local, // remote-dangling, remote). Using neutrino all of those transactions will be // accepted (the commitment tx will be different in all of those cases) and have // to be removed as soon as one of them confirmes (they do have the same // ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full // nodes do not signal invalid transactions anymore. func (s *UtxoSweeper) removeConflictSweepDescendants( outpoints map[wire.OutPoint]struct{}) error { // Obtain all the past sweeps that we've done so far. We'll need these // to ensure that if the spendingTx spends any of the same inputs, then // we remove any transaction that may be spending those inputs from the // wallet. // // TODO(roasbeef): can be last sweep here if we remove anything confirmed // from the store? pastSweepHashes, err := s.cfg.Store.ListSweeps() if err != nil { return err } // We'll now go through each past transaction we published during this // epoch and cross reference the spent inputs. If there're any inputs // in common with the inputs the spendingTx spent, then we'll remove // those. // // TODO(roasbeef): need to start to remove all transaction hashes after // every N blocks (assumed point of no return) for _, sweepHash := range pastSweepHashes { sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash) if err != nil { return err } // Transaction wasn't found in the wallet, may have already // been replaced/removed. if sweepTx == nil { // If it was removed, then we'll play it safe and mark // it as no longer need to be rebroadcasted. s.cfg.Wallet.CancelRebroadcast(sweepHash) continue } // Check to see if this past sweep transaction spent any of the // same inputs as spendingTx. var isConflicting bool for _, txIn := range sweepTx.TxIn { if _, ok := outpoints[txIn.PreviousOutPoint]; ok { isConflicting = true break } } if !isConflicting { continue } // If it is conflicting, then we'll signal the wallet to remove // all the transactions that are descendants of outputs created // by the sweepTx and the sweepTx itself. log.Debugf("Removing sweep txid=%v from wallet: %v", sweepTx.TxHash(), spew.Sdump(sweepTx)) err = s.cfg.Wallet.RemoveDescendants(sweepTx) if err != nil { log.Warnf("Unable to remove descendants: %v", err) } // If this transaction was conflicting, then we'll stop // rebroadcasting it in the background. s.cfg.Wallet.CancelRebroadcast(sweepHash) } return nil } // collector is the sweeper main loop. It processes new inputs, spend // notifications and counts down to publication of the sweep tx. func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // We registered for the block epochs with a nil request. The notifier // should send us the current best block immediately. So we need to wait // for it here because we need to know the current best height. var bestHeight int32 select { case bestBlock := <-blockEpochs: bestHeight = bestBlock.Height case <-s.quit: return } // Create a ticker based on the config duration. ticker := time.NewTicker(s.cfg.TickerDuration) defer ticker.Stop() log.Debugf("Sweep ticker started") for { select { // A new inputs is offered to the sweeper. We check to see if // we are already trying to sweep this input and if not, set up // a listener to spend and schedule a sweep. case input := <-s.newInputs: s.handleNewInput(input, bestHeight) // A spend of one of our inputs is detected. Signal sweep // results to the caller(s). case spend := <-s.spendChan: s.handleInputSpent(spend) // A new external request has been received to retrieve all of // the inputs we're currently attempting to sweep. case req := <-s.pendingSweepsReqs: req.respChan <- s.handlePendingSweepsReq(req) // A new external request has been received to bump the fee rate // of a given input. case req := <-s.updateReqs: resultChan, err := s.handleUpdateReq(req, bestHeight) req.responseChan <- &updateResp{ resultChan: resultChan, err: err, } // The timer expires and we are going to (re)sweep. case <-ticker.C: log.Debugf("Sweep ticker ticks, attempt sweeping...") s.handleSweep(bestHeight) // A new block comes in, update the bestHeight. case epoch, ok := <-blockEpochs: if !ok { return } bestHeight = epoch.Height log.Debugf("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) case <-s.quit: return } } } // removeExclusiveGroup removes all inputs in the given exclusive group. This // function is called when one of the exclusive group inputs has been spent. The // other inputs won't ever be spendable and can be removed. This also prevents // them from being part of future sweep transactions that would fail. In // addition sweep transactions of those inputs will be removed from the wallet. func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { for outpoint, input := range s.pendingInputs { outpoint := outpoint // Skip inputs that aren't exclusive. if input.params.ExclusiveGroup == nil { continue } // Skip inputs from other exclusive groups. if *input.params.ExclusiveGroup != group { continue } // Signal result channels. s.signalAndRemove(&outpoint, Result{ Err: ErrExclusiveGroupSpend, }) // Remove all unconfirmed transactions from the wallet which // spend the passed outpoint of the same exclusive group. outpoints := map[wire.OutPoint]struct{}{ outpoint: {}, } err := s.removeConflictSweepDescendants(outpoints) if err != nil { log.Warnf("Unable to remove conflicting sweep tx from "+ "wallet for outpoint %v : %v", outpoint, err) } } } // sweepCluster tries to sweep the given input cluster. func (s *UtxoSweeper) sweepCluster(cluster inputCluster, currentHeight int32) error { // Execute the sweep within a coin select lock. Otherwise the coins // that we are going to spend may be selected for other transactions // like funding of a channel. return s.cfg.Wallet.WithCoinSelectLock(func() error { // Examine pending inputs and try to construct lists of inputs. allSets, newSets, err := s.getInputLists(cluster, currentHeight) if err != nil { return fmt.Errorf("examine pending inputs: %w", err) } // errAllSets records the error from broadcasting the sweeping // transactions for all input sets. var errAllSets error // allSets contains retried inputs and new inputs. To avoid // creating an RBF for the new inputs, we'd sweep this set // first. for _, inputs := range allSets { errAllSets = s.sweep( inputs, cluster.sweepFeeRate, currentHeight, ) // TODO(yy): we should also find out which set created // this error. If there are new inputs in this set, we // should give it a second chance by sweeping them // below. To enable this, we need to provide richer // state for each input other than just recording the // publishAttempts. We'd also need to refactor how we // create the input sets. Atm, the steps are, // 1. create a list of input sets. // 2. sweep each set by creating and publishing the tx. // We should change the flow as, // 1. create a list of input sets, and for each set, // 2. when created, we create and publish the tx. // 3. if the publish fails, find out which input is // causing the failure and retry the rest of the // inputs. if errAllSets != nil { log.Errorf("sweep all inputs: %w", err) break } } // If we have successfully swept all inputs, there's no need to // sweep the new inputs as it'd create an RBF case. if allSets != nil && errAllSets == nil { return nil } // We'd end up there if there's no retried inputs. In this // case, we'd sweep the new input sets. If there's an error // when sweeping a given set, we'd log the error and sweep the // next set. for _, inputs := range newSets { err := s.sweep( inputs, cluster.sweepFeeRate, currentHeight, ) if err != nil { log.Errorf("sweep new inputs: %w", err) } } return nil }) } // bucketForFeeReate determines the proper bucket for a fee rate. This is done // in order to batch inputs with similar fee rates together. func (s *UtxoSweeper) bucketForFeeRate( feeRate chainfee.SatPerKWeight) int { // Create an isolated bucket for sweeps at the minimum fee rate. This is // to prevent very small outputs (anchors) from becoming uneconomical if // their fee rate would be averaged with higher fee rate inputs in a // regular bucket. if feeRate == s.relayFeeRate { return 0 } return 1 + int(feeRate-s.relayFeeRate)/s.cfg.FeeRateBucketSize } // createInputClusters creates a list of input clusters from the set of pending // inputs known by the UtxoSweeper. It clusters inputs by // 1) Required tx locktime // 2) Similar fee rates. func (s *UtxoSweeper) createInputClusters() []inputCluster { inputs := s.pendingInputs // We start by getting the inputs clusters by locktime. Since the // inputs commit to the locktime, they can only be clustered together // if the locktime is equal. lockTimeClusters, nonLockTimeInputs := s.clusterByLockTime(inputs) // Cluster the remaining inputs by sweep fee rate. feeClusters := s.clusterBySweepFeeRate(nonLockTimeInputs) // Since the inputs that we clustered by fee rate don't commit to a // specific locktime, we can try to merge a locktime cluster with a fee // cluster. return zipClusters(lockTimeClusters, feeClusters) } // clusterByLockTime takes the given set of pending inputs and clusters those // with equal locktime together. Each cluster contains a sweep fee rate, which // is determined by calculating the average fee rate of all inputs within that // cluster. In addition to the created clusters, inputs that did not specify a // required lock time are returned. func (s *UtxoSweeper) clusterByLockTime(inputs pendingInputs) ([]inputCluster, pendingInputs) { locktimes := make(map[uint32]pendingInputs) rem := make(pendingInputs) // Go through all inputs and check if they require a certain locktime. for op, input := range inputs { lt, ok := input.RequiredLockTime() if !ok { rem[op] = input continue } // Check if we already have inputs with this locktime. cluster, ok := locktimes[lt] if !ok { cluster = make(pendingInputs) } // Get the fee rate based on the fee preference. If an error is // returned, we'll skip sweeping this input for this round of // cluster creation and retry it when we create the clusters // from the pending inputs again. feeRate, err := s.feeRateForPreference(input.params.Fee) if err != nil { log.Warnf("Skipping input %v: %v", op, err) continue } log.Debugf("Adding input %v to cluster with locktime=%v, "+ "feeRate=%v", op, lt, feeRate) // Attach the fee rate to the input. input.lastFeeRate = feeRate // Update the cluster about the updated input. cluster[op] = input locktimes[lt] = cluster } // We'll then determine the sweep fee rate for each set of inputs by // calculating the average fee rate of the inputs within each set. inputClusters := make([]inputCluster, 0, len(locktimes)) for lt, cluster := range locktimes { lt := lt var sweepFeeRate chainfee.SatPerKWeight for _, input := range cluster { sweepFeeRate += input.lastFeeRate } sweepFeeRate /= chainfee.SatPerKWeight(len(cluster)) inputClusters = append(inputClusters, inputCluster{ lockTime: <, sweepFeeRate: sweepFeeRate, inputs: cluster, }) } return inputClusters, rem } // clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper // and clusters those together with similar fee rates. Each cluster contains a // sweep fee rate, which is determined by calculating the average fee rate of // all inputs within that cluster. func (s *UtxoSweeper) clusterBySweepFeeRate(inputs pendingInputs) []inputCluster { bucketInputs := make(map[int]*bucketList) inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight) // First, we'll group together all inputs with similar fee rates. This // is done by determining the fee rate bucket they should belong in. for op, input := range inputs { feeRate, err := s.feeRateForPreference(input.params.Fee) if err != nil { log.Warnf("Skipping input %v: %v", op, err) continue } // Only try to sweep inputs with an unconfirmed parent if the // current sweep fee rate exceeds the parent tx fee rate. This // assumes that such inputs are offered to the sweeper solely // for the purpose of anchoring down the parent tx using cpfp. parentTx := input.UnconfParent() if parentTx != nil { parentFeeRate := chainfee.SatPerKWeight(parentTx.Fee*1000) / chainfee.SatPerKWeight(parentTx.Weight) if parentFeeRate >= feeRate { log.Debugf("Skipping cpfp input %v: fee_rate=%v, "+ "parent_fee_rate=%v", op, feeRate, parentFeeRate) continue } } feeGroup := s.bucketForFeeRate(feeRate) // Create a bucket list for this fee rate if there isn't one // yet. buckets, ok := bucketInputs[feeGroup] if !ok { buckets = &bucketList{} bucketInputs[feeGroup] = buckets } // Request the bucket list to add this input. The bucket list // will take into account exclusive group constraints. buckets.add(input) input.lastFeeRate = feeRate inputFeeRates[op] = feeRate } // We'll then determine the sweep fee rate for each set of inputs by // calculating the average fee rate of the inputs within each set. inputClusters := make([]inputCluster, 0, len(bucketInputs)) for _, buckets := range bucketInputs { for _, inputs := range buckets.buckets { var sweepFeeRate chainfee.SatPerKWeight for op := range inputs { sweepFeeRate += inputFeeRates[op] } sweepFeeRate /= chainfee.SatPerKWeight(len(inputs)) inputClusters = append(inputClusters, inputCluster{ sweepFeeRate: sweepFeeRate, inputs: inputs, }) } } return inputClusters } // zipClusters merges pairwise clusters from as and bs such that cluster a from // as is merged with a cluster from bs that has at least the fee rate of a. // This to ensure we don't delay confirmation by decreasing the fee rate (the // lock time inputs are typically second level HTLC transactions, that are time // sensitive). func zipClusters(as, bs []inputCluster) []inputCluster { // Sort the clusters by decreasing fee rates. sort.Slice(as, func(i, j int) bool { return as[i].sweepFeeRate > as[j].sweepFeeRate }) sort.Slice(bs, func(i, j int) bool { return bs[i].sweepFeeRate > bs[j].sweepFeeRate }) var ( finalClusters []inputCluster j int ) // Go through each cluster in as, and merge with the next one from bs // if it has at least the fee rate needed. for i := range as { a := as[i] switch { // If the fee rate for the next one from bs is at least a's, we // merge. case j < len(bs) && bs[j].sweepFeeRate >= a.sweepFeeRate: merged := mergeClusters(a, bs[j]) finalClusters = append(finalClusters, merged...) // Increment j for the next round. j++ // We did not merge, meaning all the remaining clusters from bs // have lower fee rate. Instead we add a directly to the final // clusters. default: finalClusters = append(finalClusters, a) } } // Add any remaining clusters from bs. for ; j < len(bs); j++ { b := bs[j] finalClusters = append(finalClusters, b) } return finalClusters } // mergeClusters attempts to merge cluster a and b if they are compatible. The // new cluster will have the locktime set if a or b had a locktime set, and a // sweep fee rate that is the maximum of a and b's. If the two clusters are not // compatible, they will be returned unchanged. func mergeClusters(a, b inputCluster) []inputCluster { newCluster := inputCluster{} switch { // Incompatible locktimes, return the sets without merging them. case a.lockTime != nil && b.lockTime != nil && *a.lockTime != *b.lockTime: return []inputCluster{a, b} case a.lockTime != nil: newCluster.lockTime = a.lockTime case b.lockTime != nil: newCluster.lockTime = b.lockTime } if a.sweepFeeRate > b.sweepFeeRate { newCluster.sweepFeeRate = a.sweepFeeRate } else { newCluster.sweepFeeRate = b.sweepFeeRate } newCluster.inputs = make(pendingInputs) for op, in := range a.inputs { newCluster.inputs[op] = in } for op, in := range b.inputs { newCluster.inputs[op] = in } return []inputCluster{newCluster} } // signalAndRemove notifies the listeners of the final result of the input // sweep. It cancels any pending spend notification and removes the input from // the list of pending inputs. When this function returns, the sweeper has // completely forgotten about the input. func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) { pendInput := s.pendingInputs[*outpoint] listeners := pendInput.listeners if result.Err == nil { log.Debugf("Dispatching sweep success for %v to %v listeners", outpoint, len(listeners), ) } else { log.Debugf("Dispatching sweep error for %v to %v listeners: %v", outpoint, len(listeners), result.Err, ) } // Signal all listeners. Channel is buffered. Because we only send once // on every channel, it should never block. for _, resultChan := range listeners { resultChan <- result } // Cancel spend notification with chain notifier. This is not necessary // in case of a success, except for that a reorg could still happen. if pendInput.ntfnRegCancel != nil { log.Debugf("Canceling spend ntfn for %v", outpoint) pendInput.ntfnRegCancel() } // Inputs are no longer pending after result has been sent. delete(s.pendingInputs, *outpoint) } // getInputLists goes through the given inputs and constructs multiple distinct // sweep lists with the given fee rate, each up to the configured maximum // number of inputs. Negative yield inputs are skipped. Transactions with an // output below the dust limit are not published. Those inputs remain pending // and will be bundled with future inputs if possible. It returns two list - // one containing all inputs and the other containing only the new inputs. If // there's no retried inputs, the first set returned will be empty. func (s *UtxoSweeper) getInputLists(cluster inputCluster, currentHeight int32) ([]inputSet, []inputSet, error) { // Filter for inputs that need to be swept. Create two lists: all // sweepable inputs and a list containing only the new, never tried // inputs. // // We want to create as large a tx as possible, so we return a final // set list that starts with sets created from all inputs. However, // there is a chance that those txes will not publish, because they // already contain inputs that failed before. Therefore we also add // sets consisting of only new inputs to the list, to make sure that // new inputs are given a good, isolated chance of being published. // // TODO(yy): this would lead to conflict transactions as the same input // can be used in two sweeping transactions, and our rebroadcaster will // retry the failed one. We should instead understand why the input is // failed in the first place, and start tracking input states in // sweeper to avoid this. var newInputs, retryInputs []txInput for _, input := range cluster.inputs { // Skip inputs that have a minimum publish height that is not // yet reached. if input.minPublishHeight > currentHeight { continue } // Add input to the either one of the lists. if input.publishAttempts == 0 { newInputs = append(newInputs, input) } else { retryInputs = append(retryInputs, input) } } // Convert the max fee rate's unit from sat/vb to sat/kw. maxFeeRate := s.cfg.MaxFeeRate.FeePerKWeight() // If there is anything to retry, combine it with the new inputs and // form input sets. var allSets []inputSet if len(retryInputs) > 0 { var err error allSets, err = generateInputPartitionings( append(retryInputs, newInputs...), cluster.sweepFeeRate, maxFeeRate, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { return nil, nil, fmt.Errorf("input partitionings: %w", err) } } // Create sets for just the new inputs. newSets, err := generateInputPartitionings( newInputs, cluster.sweepFeeRate, maxFeeRate, s.cfg.MaxInputsPerTx, s.cfg.Wallet, ) if err != nil { return nil, nil, fmt.Errorf("input partitionings: %w", err) } log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+ "total_num_new=%v", currentHeight, len(allSets), len(newSets)) return allSets, newSets, nil } // sweep takes a set of preselected inputs, creates a sweep tx and publishes the // tx. The output address is only marked as used if the publish succeeds. func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight, currentHeight int32) error { // Generate an output script if there isn't an unused script available. if s.currentOutputScript == nil { pkScript, err := s.cfg.GenSweepScript() if err != nil { return fmt.Errorf("gen sweep script: %w", err) } s.currentOutputScript = pkScript } // Create sweep tx. tx, err := createSweepTx( inputs, nil, s.currentOutputScript, uint32(currentHeight), feeRate, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) if err != nil { return fmt.Errorf("create sweep tx: %w", err) } // Add tx before publication, so that we will always know that a spend // by this tx is ours. Otherwise if the publish doesn't return, but did // publish, we loose track of this tx. Even republication on startup // doesn't prevent this, because that call returns a double spend error // then and would also not add the hash to the store. err = s.cfg.Store.NotifyPublishTx(tx) if err != nil { return fmt.Errorf("notify publish tx: %w", err) } // Reschedule the inputs that we just tried to sweep. This is done in // case the following publish fails, we'd like to update the inputs' // publish attempts and rescue them in the next sweep. s.rescheduleInputs(tx.TxIn, currentHeight) log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v", tx.TxHash(), len(tx.TxIn), currentHeight) // Publish the sweeping tx with customized label. err = s.cfg.Wallet.PublishTransaction( tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil), ) if err != nil { return err } // If there's no error, remove the output script. Otherwise keep it so // that it can be reused for the next transaction and causes no address // inflation. s.currentOutputScript = nil return nil } // rescheduleInputs updates the pending inputs with the given tx inputs. It // increments the `publishAttempts` and calculates the next broadcast height // for each input. When the publishAttempts exceeds MaxSweepAttemps(10), this // input will be removed. func (s *UtxoSweeper) rescheduleInputs(inputs []*wire.TxIn, currentHeight int32) { // Reschedule sweep. for _, input := range inputs { pi, ok := s.pendingInputs[input.PreviousOutPoint] if !ok { // It can be that the input has been removed because it // exceed the maximum number of attempts in a previous // input set. It could also be that this input is an // additional wallet input that was attached. In that // case there also isn't a pending input to update. continue } // Record another publish attempt. pi.publishAttempts++ // We don't care what the result of the publish call was. Even // if it is published successfully, it can still be that it // needs to be retried. Call NextAttemptDeltaFunc to calculate // when to resweep this input. nextAttemptDelta := s.cfg.NextAttemptDeltaFunc( pi.publishAttempts, ) pi.minPublishHeight = currentHeight + nextAttemptDelta log.Debugf("Rescheduling input %v after %v attempts at "+ "height %v (delta %v)", input.PreviousOutPoint, pi.publishAttempts, pi.minPublishHeight, nextAttemptDelta) if pi.publishAttempts >= s.cfg.MaxSweepAttempts { log.Warnf("input %v: publishAttempts(%v) exceeds "+ "MaxSweepAttempts(%v), removed", input.PreviousOutPoint, pi.publishAttempts, s.cfg.MaxSweepAttempts) // Signal result channels sweep result. s.signalAndRemove(&input.PreviousOutPoint, Result{ Err: ErrTooManyAttempts, }) } } } // monitorSpend registers a spend notification with the chain notifier. It // returns a cancel function that can be used to cancel the registration. func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint, script []byte, heightHint uint32) (func(), error) { log.Tracef("Wait for spend of %v at heightHint=%v", outpoint, heightHint) spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn( &outpoint, script, heightHint, ) if err != nil { return nil, fmt.Errorf("register spend ntfn: %w", err) } s.wg.Add(1) go func() { defer s.wg.Done() select { case spend, ok := <-spendEvent.Spend: if !ok { log.Debugf("Spend ntfn for %v canceled", outpoint) return } log.Debugf("Delivering spend ntfn for %v", outpoint) select { case s.spendChan <- spend: log.Debugf("Delivered spend ntfn for %v", outpoint) case <-s.quit: } case <-s.quit: } }() return spendEvent.Cancel, nil } // PendingInputs returns the set of inputs that the UtxoSweeper is currently // attempting to sweep. func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { respChan := make(chan map[wire.OutPoint]*PendingInput, 1) errChan := make(chan error, 1) select { case s.pendingSweepsReqs <- &pendingSweepsReq{ respChan: respChan, errChan: errChan, }: case <-s.quit: return nil, ErrSweeperShuttingDown } select { case pendingSweeps := <-respChan: return pendingSweeps, nil case err := <-errChan: return nil, err case <-s.quit: return nil, ErrSweeperShuttingDown } } // handlePendingSweepsReq handles a request to retrieve all pending inputs the // UtxoSweeper is attempting to sweep. func (s *UtxoSweeper) handlePendingSweepsReq( req *pendingSweepsReq) map[wire.OutPoint]*PendingInput { pendingInputs := make(map[wire.OutPoint]*PendingInput, len(s.pendingInputs)) for _, pendingInput := range s.pendingInputs { // Only the exported fields are set, as we expect the response // to only be consumed externally. op := *pendingInput.OutPoint() pendingInputs[op] = &PendingInput{ OutPoint: op, WitnessType: pendingInput.WitnessType(), Amount: btcutil.Amount( pendingInput.SignDesc().Output.Value, ), LastFeeRate: pendingInput.lastFeeRate, BroadcastAttempts: pendingInput.publishAttempts, NextBroadcastHeight: uint32(pendingInput.minPublishHeight), Params: pendingInput.params, } } return pendingInputs } // UpdateParams allows updating the sweep parameters of a pending input in the // UtxoSweeper. This function can be used to provide an updated fee preference // and force flag that will be used for a new sweep transaction of the input // that will act as a replacement transaction (RBF) of the original sweeping // transaction, if any. The exclusive group is left unchanged. // // NOTE: This currently doesn't do any fee rate validation to ensure that a bump // is actually successful. The responsibility of doing so should be handled by // the caller. func (s *UtxoSweeper) UpdateParams(input wire.OutPoint, params ParamsUpdate) (chan Result, error) { // Ensure the client provided a sane fee preference. if _, err := s.feeRateForPreference(params.Fee); err != nil { return nil, err } responseChan := make(chan *updateResp, 1) select { case s.updateReqs <- &updateReq{ input: input, params: params, responseChan: responseChan, }: case <-s.quit: return nil, ErrSweeperShuttingDown } select { case response := <-responseChan: return response.resultChan, response.err case <-s.quit: return nil, ErrSweeperShuttingDown } } // handleUpdateReq handles an update request by simply updating the sweep // parameters of the pending input. Currently, no validation is done on the new // fee preference to ensure it will properly create a replacement transaction. // // TODO(wilmer): // - Validate fee preference to ensure we'll create a valid replacement // transaction to allow the new fee rate to propagate throughout the // network. // - Ensure we don't combine this input with any other unconfirmed inputs that // did not exist in the original sweep transaction, resulting in an invalid // replacement transaction. func (s *UtxoSweeper) handleUpdateReq(req *updateReq, bestHeight int32) ( chan Result, error) { // If the UtxoSweeper is already trying to sweep this input, then we can // simply just increase its fee rate. This will allow the input to be // batched with others which also have a similar fee rate, creating a // higher fee rate transaction that replaces the original input's // sweeping transaction. pendingInput, ok := s.pendingInputs[req.input] if !ok { return nil, lnwallet.ErrNotMine } // Create the updated parameters struct. Leave the exclusive group // unchanged. newParams := pendingInput.params newParams.Fee = req.params.Fee newParams.Force = req.params.Force log.Debugf("Updating sweep parameters for %v from %v to %v", req.input, pendingInput.params, newParams) pendingInput.params = newParams // We'll reset the input's publish height to the current so that a new // transaction can be created that replaces the transaction currently // spending the input. We only do this for inputs that have been // broadcast at least once to ensure we don't spend an input before its // maturity height. // // NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the // check for broadcast attempts is redundant at the moment. if pendingInput.publishAttempts > 0 { pendingInput.minPublishHeight = bestHeight } resultChan := make(chan Result, 1) pendingInput.listeners = append(pendingInput.listeners, resultChan) return resultChan, nil } // CreateSweepTx accepts a list of inputs and signs and generates a txn that // spends from them. This method also makes an accurate fee estimate before // generating the required witnesses. // // The created transaction has a single output sending all the funds back to // the source wallet, after accounting for the fee estimate. // // The value of currentBlockHeight argument will be set as the tx locktime. // This function assumes that all CLTV inputs will be unlocked after // currentBlockHeight. Reasons not to use the maximum of all actual CLTV expiry // values of the inputs: // // - Make handling re-orgs easier. // - Thwart future possible fee sniping attempts. // - Make us blend in with the bitcoind wallet. func (s *UtxoSweeper) CreateSweepTx(inputs []input.Input, feePref FeePreference, currentBlockHeight uint32) (*wire.MsgTx, error) { feePerKw, err := s.cfg.DetermineFeePerKw(s.cfg.FeeEstimator, feePref) if err != nil { return nil, err } // Generate the receiving script to which the funds will be swept. pkScript, err := s.cfg.GenSweepScript() if err != nil { return nil, err } return createSweepTx( inputs, nil, pkScript, currentBlockHeight, feePerKw, s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer, ) } // DefaultNextAttemptDeltaFunc is the default calculation for next sweep attempt // scheduling. It implements exponential back-off with some randomness. This is // to prevent a stuck tx (for example because fee is too low and can't be bumped // in btcd) from blocking all other retried inputs in the same tx. func DefaultNextAttemptDeltaFunc(attempts int) int32 { return 1 + rand.Int31n(1< inputClusters[j].sweepFeeRate }) for _, cluster := range inputClusters { err := s.sweepCluster(cluster, bestHeight) if err != nil { log.Errorf("input cluster sweep: %v", err) } } } // init initializes the random generator for random input rescheduling. func init() { rand.Seed(time.Now().Unix()) }