mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-19 14:45:23 +01:00
19265ac8ed
So these inputs can be retried by the sweeper.
954 lines
29 KiB
Go
954 lines
29 KiB
Go
package sweep
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/rpcclient"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/btcsuite/btcwallet/chain"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
|
"github.com/lightningnetwork/lnd/fn"
|
|
"github.com/lightningnetwork/lnd/input"
|
|
"github.com/lightningnetwork/lnd/labels"
|
|
"github.com/lightningnetwork/lnd/lnutils"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
|
)
|
|
|
|
var (
|
|
// ErrInvalidBumpResult is returned when the bump result is invalid.
|
|
ErrInvalidBumpResult = errors.New("invalid bump result")
|
|
|
|
// ErrNotEnoughBudget is returned when the fee bumper decides the
|
|
// current budget cannot cover the fee.
|
|
ErrNotEnoughBudget = errors.New("not enough budget")
|
|
)
|
|
|
|
// Bumper defines an interface that can be used by other subsystems for fee
|
|
// bumping.
|
|
type Bumper interface {
|
|
// Broadcast is used to publish the tx created from the given inputs
|
|
// specified in the request. It handles the tx creation, broadcasts it,
|
|
// and monitors its confirmation status for potential fee bumping. It
|
|
// returns a chan that the caller can use to receive updates about the
|
|
// broadcast result and potential RBF attempts.
|
|
Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
|
|
}
|
|
|
|
// BumpEvent represents the event of a fee bumping attempt.
|
|
type BumpEvent uint8
|
|
|
|
const (
|
|
// TxPublished is sent when the broadcast attempt is finished.
|
|
TxPublished BumpEvent = iota
|
|
|
|
// TxFailed is sent when the broadcast attempt fails.
|
|
TxFailed
|
|
|
|
// TxReplaced is sent when the original tx is replaced by a new one.
|
|
TxReplaced
|
|
|
|
// TxConfirmed is sent when the tx is confirmed.
|
|
TxConfirmed
|
|
|
|
// sentinalEvent is used to check if an event is unknown.
|
|
sentinalEvent
|
|
)
|
|
|
|
// String returns a human-readable string for the event.
|
|
func (e BumpEvent) String() string {
|
|
switch e {
|
|
case TxPublished:
|
|
return "Published"
|
|
case TxFailed:
|
|
return "Failed"
|
|
case TxReplaced:
|
|
return "Replaced"
|
|
case TxConfirmed:
|
|
return "Confirmed"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
// Unknown returns true if the event is unknown.
|
|
func (e BumpEvent) Unknown() bool {
|
|
return e >= sentinalEvent
|
|
}
|
|
|
|
// BumpRequest is used by the caller to give the Bumper the necessary info to
|
|
// create and manage potential fee bumps for a set of inputs.
|
|
type BumpRequest struct {
|
|
// Budget givens the total amount that can be used as fees by these
|
|
// inputs.
|
|
Budget btcutil.Amount
|
|
|
|
// Inputs is the set of inputs to sweep.
|
|
Inputs []input.Input
|
|
|
|
// DeadlineHeight is the block height at which the tx should be
|
|
// confirmed.
|
|
DeadlineHeight int32
|
|
|
|
// DeliveryAddress is the script to send the change output to.
|
|
DeliveryAddress []byte
|
|
|
|
// MaxFeeRate is the maximum fee rate that can be used for fee bumping.
|
|
MaxFeeRate chainfee.SatPerKWeight
|
|
}
|
|
|
|
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
|
|
// request. It calculates the feerate using the supplied budget and the weight,
|
|
// compares it with the specified MaxFeeRate, and returns the smaller of the
|
|
// two.
|
|
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
|
|
// Get the size of the sweep tx, which will be used to calculate the
|
|
// budget fee rate.
|
|
size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Use the budget and MaxFeeRate to decide the max allowed fee rate.
|
|
// This is needed as, when the input has a large value and the user
|
|
// sets the budget to be proportional to the input value, the fee rate
|
|
// can be very high and we need to make sure it doesn't exceed the max
|
|
// fee rate.
|
|
maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
|
|
if maxFeeRateAllowed > r.MaxFeeRate {
|
|
log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
|
|
"MaxFeeRate instead", maxFeeRateAllowed, r.MaxFeeRate)
|
|
|
|
return r.MaxFeeRate, nil
|
|
}
|
|
|
|
log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
|
|
"instead", maxFeeRateAllowed, r.MaxFeeRate)
|
|
|
|
return maxFeeRateAllowed, nil
|
|
}
|
|
|
|
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
|
|
// sweeping tx always has a single output(change).
|
|
func calcSweepTxWeight(inputs []input.Input,
|
|
outputPkScript []byte) (uint64, error) {
|
|
|
|
// Use a const fee rate as we only use the weight estimator to
|
|
// calculate the size.
|
|
const feeRate = 1
|
|
|
|
// Initialize the tx weight estimator with,
|
|
// - nil outputs as we only have one single change output.
|
|
// - const fee rate as we don't care about the fees here.
|
|
// - 0 maxfeerate as we don't care about fees here.
|
|
//
|
|
// TODO(yy): we should refactor the weight estimator to not require a
|
|
// fee rate and max fee rate and make it a pure tx weight calculator.
|
|
_, estimator, err := getWeightEstimate(
|
|
inputs, nil, feeRate, 0, outputPkScript,
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return uint64(estimator.weight()), nil
|
|
}
|
|
|
|
// BumpResult is used by the Bumper to send updates about the tx being
|
|
// broadcast.
|
|
type BumpResult struct {
|
|
// Event is the type of event that the result is for.
|
|
Event BumpEvent
|
|
|
|
// Tx is the tx being broadcast.
|
|
Tx *wire.MsgTx
|
|
|
|
// ReplacedTx is the old, replaced tx if a fee bump is attempted.
|
|
ReplacedTx *wire.MsgTx
|
|
|
|
// FeeRate is the fee rate used for the new tx.
|
|
FeeRate chainfee.SatPerKWeight
|
|
|
|
// Fee is the fee paid by the new tx.
|
|
Fee btcutil.Amount
|
|
|
|
// Err is the error that occurred during the broadcast.
|
|
Err error
|
|
|
|
// requestID is the ID of the request that created this record.
|
|
requestID uint64
|
|
}
|
|
|
|
// Validate validates the BumpResult so it's safe to use.
|
|
func (b *BumpResult) Validate() error {
|
|
// Every result must have a tx.
|
|
if b.Tx == nil {
|
|
return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
|
|
}
|
|
|
|
// Every result must have a known event.
|
|
if b.Event.Unknown() {
|
|
return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
|
|
}
|
|
|
|
// If it's a replacing event, it must have a replaced tx.
|
|
if b.Event == TxReplaced && b.ReplacedTx == nil {
|
|
return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
|
|
}
|
|
|
|
// If it's a failed event, it must have an error.
|
|
if b.Event == TxFailed && b.Err == nil {
|
|
return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
|
|
}
|
|
|
|
// If it's a confirmed event, it must have a fee rate and fee.
|
|
if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
|
|
return fmt.Errorf("%w: missing fee rate or fee",
|
|
ErrInvalidBumpResult)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TxPublisherConfig is the config used to create a new TxPublisher.
|
|
type TxPublisherConfig struct {
|
|
// Signer is used to create the tx signature.
|
|
Signer input.Signer
|
|
|
|
// Wallet is used primarily to publish the tx.
|
|
Wallet Wallet
|
|
|
|
// Estimator is used to estimate the fee rate for the new tx based on
|
|
// its deadline conf target.
|
|
Estimator chainfee.Estimator
|
|
|
|
// Notifier is used to monitor the confirmation status of the tx.
|
|
Notifier chainntnfs.ChainNotifier
|
|
}
|
|
|
|
// TxPublisher is an implementation of the Bumper interface. It utilizes the
|
|
// `testmempoolaccept` RPC to bump the fee of txns it created based on
|
|
// different fee function selected or configed by the caller. Its purpose is to
|
|
// take a list of inputs specified, and create a tx that spends them to a
|
|
// specified output. It will then monitor the confirmation status of the tx,
|
|
// and if it's not confirmed within a certain time frame, it will attempt to
|
|
// bump the fee of the tx by creating a new tx that spends the same inputs to
|
|
// the same output, but with a higher fee rate. It will continue to do this
|
|
// until the tx is confirmed or the fee rate reaches the maximum fee rate
|
|
// specified by the caller.
|
|
type TxPublisher struct {
|
|
wg sync.WaitGroup
|
|
|
|
// cfg specifies the configuration of the TxPublisher.
|
|
cfg *TxPublisherConfig
|
|
|
|
// currentHeight is the current block height.
|
|
currentHeight int32
|
|
|
|
// records is a map keyed by the requestCounter and the value is the tx
|
|
// being monitored.
|
|
records lnutils.SyncMap[uint64, *monitorRecord]
|
|
|
|
// requestCounter is a monotonically increasing counter used to keep
|
|
// track of how many requests have been made.
|
|
requestCounter atomic.Uint64
|
|
|
|
// subscriberChans is a map keyed by the requestCounter, each item is
|
|
// the chan that the publisher sends the fee bump result to.
|
|
subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
|
|
|
|
// quit is used to signal the publisher to stop.
|
|
quit chan struct{}
|
|
}
|
|
|
|
// Compile-time constraint to ensure TxPublisher implements Bumper.
|
|
var _ Bumper = (*TxPublisher)(nil)
|
|
|
|
// NewTxPublisher creates a new TxPublisher.
|
|
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
|
|
return &TxPublisher{
|
|
cfg: &cfg,
|
|
records: lnutils.SyncMap[uint64, *monitorRecord]{},
|
|
subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Broadcast is used to publish the tx created from the given inputs. It will,
|
|
// 1. init a fee function based on the given strategy.
|
|
// 2. create an RBF-compliant tx and monitor it for confirmation.
|
|
// 3. notify the initial broadcast result back to the caller.
|
|
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
|
|
// specified cannot cover the fee.
|
|
//
|
|
// NOTE: part of the Bumper interface.
|
|
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
|
|
log.Tracef("Received broadcast request: %s", newLogClosure(
|
|
func() string {
|
|
return spew.Sdump(req)
|
|
})())
|
|
|
|
// Attempt an initial broadcast which is guaranteed to comply with the
|
|
// RBF rules.
|
|
result, err := t.initialBroadcast(req)
|
|
if err != nil {
|
|
log.Errorf("Initial broadcast failed: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Create a chan to send the result to the caller.
|
|
subscriber := make(chan *BumpResult, 1)
|
|
t.subscriberChans.Store(result.requestID, subscriber)
|
|
|
|
// Send the initial broadcast result to the caller.
|
|
t.handleResult(result)
|
|
|
|
return subscriber, nil
|
|
}
|
|
|
|
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
|
|
// broadcasts it.
|
|
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
|
|
// Create a fee bumping algorithm to be used for future RBF.
|
|
feeAlgo, err := t.initializeFeeFunction(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init fee function: %w", err)
|
|
}
|
|
|
|
// Create the initial tx to be broadcasted. This tx is guaranteed to
|
|
// comply with the RBF restrictions.
|
|
requestID, err := t.createRBFCompliantTx(req, feeAlgo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
|
|
}
|
|
|
|
// Broadcast the tx and return the monitored record.
|
|
result, err := t.broadcast(requestID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("broadcast sweep tx: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// initializeFeeFunction initializes a fee function to be used for this request
|
|
// for future fee bumping.
|
|
func (t *TxPublisher) initializeFeeFunction(
|
|
req *BumpRequest) (FeeFunction, error) {
|
|
|
|
// Get the max allowed feerate.
|
|
maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get the initial conf target.
|
|
confTarget := calcCurrentConfTarget(t.currentHeight, req.DeadlineHeight)
|
|
|
|
// Initialize the fee function and return it.
|
|
//
|
|
// TODO(yy): return based on differet req.Strategy?
|
|
return NewLinearFeeFunction(
|
|
maxFeeRateAllowed, confTarget, t.cfg.Estimator,
|
|
)
|
|
}
|
|
|
|
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
|
|
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
|
|
// and redo the process until the tx is valid, or return an error when non-RBF
|
|
// related errors occur or the budget has been used up.
|
|
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
|
|
f FeeFunction) (uint64, error) {
|
|
|
|
for {
|
|
// Create a new tx with the given fee rate and check its
|
|
// mempool acceptance.
|
|
tx, fee, err := t.createAndCheckTx(req, f)
|
|
|
|
switch {
|
|
case err == nil:
|
|
// The tx is valid, return the request ID.
|
|
requestID := t.storeRecord(tx, req, f, fee)
|
|
|
|
log.Infof("Created tx %v for %v inputs: feerate=%v, "+
|
|
"fee=%v, inputs=%v", tx.TxHash(),
|
|
len(req.Inputs), f.FeeRate(), fee,
|
|
inputTypeSummary(req.Inputs))
|
|
|
|
return requestID, nil
|
|
|
|
// If the error indicates the fees paid is not enough, we will
|
|
// ask the fee function to increase the fee rate and retry.
|
|
case errors.Is(err, lnwallet.ErrMempoolFee):
|
|
// We should at least start with a feerate above the
|
|
// mempool min feerate, so if we get this error, it
|
|
// means something is wrong earlier in the pipeline.
|
|
log.Errorf("Current fee=%v, feerate=%v, %v", fee,
|
|
f.FeeRate(), err)
|
|
|
|
fallthrough
|
|
|
|
// We are not paying enough fees so we increase it.
|
|
case errors.Is(err, rpcclient.ErrInsufficientFee):
|
|
increased := false
|
|
|
|
// Keep calling the fee function until the fee rate is
|
|
// increased or maxed out.
|
|
for !increased {
|
|
log.Debugf("Increasing fee for next round, "+
|
|
"current fee=%v, feerate=%v", fee,
|
|
f.FeeRate())
|
|
|
|
// If the fee function tells us that we have
|
|
// used up the budget, we will return an error
|
|
// indicating this tx cannot be made. The
|
|
// sweeper should handle this error and try to
|
|
// cluster these inputs differetly.
|
|
increased, err = f.Increment()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
// TODO(yy): suppose there's only one bad input, we can do a
|
|
// binary search to find out which input is causing this error
|
|
// by recreating a tx using half of the inputs and check its
|
|
// mempool acceptance.
|
|
default:
|
|
log.Debugf("Failed to create RBF-compliant tx: %v", err)
|
|
return 0, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// storeRecord stores the given record in the records map.
|
|
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
|
|
f FeeFunction, fee btcutil.Amount) uint64 {
|
|
|
|
// Increase the request counter.
|
|
//
|
|
// NOTE: this is the only place where we increase the
|
|
// counter.
|
|
requestID := t.requestCounter.Add(1)
|
|
|
|
// Register the record.
|
|
t.records.Store(requestID, &monitorRecord{
|
|
tx: tx,
|
|
req: req,
|
|
feeFunction: f,
|
|
fee: fee,
|
|
})
|
|
|
|
return requestID
|
|
}
|
|
|
|
// createAndCheckTx creates a tx based on the given inputs, change output
|
|
// script, and the fee rate. In addition, it validates the tx's mempool
|
|
// acceptance before returning a tx that can be published directly, along with
|
|
// its fee.
|
|
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
|
|
*wire.MsgTx, btcutil.Amount, error) {
|
|
|
|
// Create the sweep tx with max fee rate of 0 as the fee function
|
|
// guarantees the fee rate used here won't exceed the max fee rate.
|
|
//
|
|
// TODO(yy): refactor this function to not require a max fee rate.
|
|
tx, fee, err := createSweepTx(
|
|
req.Inputs, nil, req.DeliveryAddress, uint32(t.currentHeight),
|
|
f.FeeRate(), 0, t.cfg.Signer,
|
|
)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("create sweep tx: %w", err)
|
|
}
|
|
|
|
// Sanity check the budget still covers the fee.
|
|
if fee > req.Budget {
|
|
return nil, 0, fmt.Errorf("%w: budget=%v, fee=%v",
|
|
ErrNotEnoughBudget, req.Budget, fee)
|
|
}
|
|
|
|
// Validate the tx's mempool acceptance.
|
|
err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
|
|
|
|
// Exit early if the tx is valid.
|
|
if err == nil {
|
|
return tx, fee, nil
|
|
}
|
|
|
|
// Print an error log if the chain backend doesn't support the mempool
|
|
// acceptance test RPC.
|
|
if errors.Is(err, rpcclient.ErrBackendVersion) {
|
|
log.Errorf("TestMempoolAccept not supported by backend, " +
|
|
"consider upgrading it to a newer version")
|
|
return tx, fee, nil
|
|
}
|
|
|
|
// We are running on a backend that doesn't implement the RPC
|
|
// testmempoolaccept, eg, neutrino, so we'll skip the check.
|
|
if errors.Is(err, chain.ErrUnimplemented) {
|
|
log.Debug("Skipped testmempoolaccept due to not implemented")
|
|
return tx, fee, nil
|
|
}
|
|
|
|
return nil, 0, err
|
|
}
|
|
|
|
// broadcast takes a monitored tx and publishes it to the network. Prior to the
|
|
// broadcast, it will subscribe the tx's confirmation notification and attach
|
|
// the event channel to the record. Any broadcast-related errors will not be
|
|
// returned here, instead, they will be put inside the `BumpResult` and
|
|
// returned to the caller.
|
|
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
|
|
// Get the record being monitored.
|
|
record, ok := t.records.Load(requestID)
|
|
if !ok {
|
|
return nil, fmt.Errorf("tx record %v not found", requestID)
|
|
}
|
|
|
|
txid := record.tx.TxHash()
|
|
|
|
tx := record.tx
|
|
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
|
|
txid, len(tx.TxIn), t.currentHeight)
|
|
|
|
// Set the event, and change it to TxFailed if the wallet fails to
|
|
// publish it.
|
|
event := TxPublished
|
|
|
|
// Publish the sweeping tx with customized label. If the publish fails,
|
|
// this error will be saved in the `BumpResult` and it will be removed
|
|
// from being monitored.
|
|
err := t.cfg.Wallet.PublishTransaction(
|
|
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
|
|
)
|
|
if err != nil {
|
|
// NOTE: we decide to attach this error to the result instead
|
|
// of returning it here because by the time the tx reaches
|
|
// here, it should have passed the mempool acceptance check. If
|
|
// it still fails to be broadcast, it's likely a non-RBF
|
|
// related error happened. So we send this error back to the
|
|
// caller so that it can handle it properly.
|
|
//
|
|
// TODO(yy): find out which input is causing the failure.
|
|
log.Errorf("Failed to publish tx %v: %v", txid, err)
|
|
event = TxFailed
|
|
}
|
|
|
|
result := &BumpResult{
|
|
Event: event,
|
|
Tx: record.tx,
|
|
Fee: record.fee,
|
|
FeeRate: record.feeFunction.FeeRate(),
|
|
Err: err,
|
|
requestID: requestID,
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// notifyResult sends the result to the resultChan specified by the requestID.
|
|
// This channel is expected to be read by the caller.
|
|
func (t *TxPublisher) notifyResult(result *BumpResult) {
|
|
id := result.requestID
|
|
subscriber, ok := t.subscriberChans.Load(id)
|
|
if !ok {
|
|
log.Errorf("Result chan for id=%v not found", id)
|
|
return
|
|
}
|
|
|
|
log.Debugf("Sending result for requestID=%v, tx=%v", id,
|
|
result.Tx.TxHash())
|
|
|
|
select {
|
|
// Send the result to the subscriber.
|
|
//
|
|
// TODO(yy): Add timeout in case it's blocking?
|
|
case subscriber <- result:
|
|
case <-t.quit:
|
|
log.Debug("Fee bumper stopped")
|
|
}
|
|
}
|
|
|
|
// removeResult removes the tracking of the result if the result contains a
|
|
// non-nil error, or the tx is confirmed, the record will be removed from the
|
|
// maps.
|
|
func (t *TxPublisher) removeResult(result *BumpResult) {
|
|
id := result.requestID
|
|
|
|
// Remove the record from the maps if there's an error. This means this
|
|
// tx has failed its broadcast and cannot be retried. There are two
|
|
// cases,
|
|
// - when the budget cannot cover the fee.
|
|
// - when a non-RBF related error occurs.
|
|
switch result.Event {
|
|
case TxFailed:
|
|
log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
|
|
id, result.Tx.TxHash(), result.Err)
|
|
|
|
case TxConfirmed:
|
|
// Remove the record is the tx is confirmed.
|
|
log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
|
|
result.Tx.TxHash())
|
|
|
|
// Do nothing if it's neither failed or confirmed.
|
|
default:
|
|
log.Tracef("Skipping record removal for id=%v, event=%v", id,
|
|
result.Event)
|
|
|
|
return
|
|
}
|
|
|
|
t.records.Delete(id)
|
|
t.subscriberChans.Delete(id)
|
|
}
|
|
|
|
// handleResult handles the result of a tx broadcast. It will notify the
|
|
// subscriber and remove the record if the tx is confirmed or failed to be
|
|
// broadcast.
|
|
func (t *TxPublisher) handleResult(result *BumpResult) {
|
|
// Notify the subscriber.
|
|
t.notifyResult(result)
|
|
|
|
// Remove the record if it's failed or confirmed.
|
|
t.removeResult(result)
|
|
}
|
|
|
|
// monitorRecord is used to keep track of the tx being monitored by the
|
|
// publisher internally.
|
|
type monitorRecord struct {
|
|
// tx is the tx being monitored.
|
|
tx *wire.MsgTx
|
|
|
|
// req is the original request.
|
|
req *BumpRequest
|
|
|
|
// feeFunction is the fee bumping algorithm used by the publisher.
|
|
feeFunction FeeFunction
|
|
|
|
// fee is the fee paid by the tx.
|
|
fee btcutil.Amount
|
|
}
|
|
|
|
// Start starts the publisher by subscribing to block epoch updates and kicking
|
|
// off the monitor loop.
|
|
func (t *TxPublisher) Start() error {
|
|
log.Info("TxPublisher starting...")
|
|
defer log.Debugf("TxPublisher started")
|
|
|
|
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
|
|
if err != nil {
|
|
return fmt.Errorf("register block epoch ntfn: %w", err)
|
|
}
|
|
|
|
t.wg.Add(1)
|
|
go t.monitor(blockEvent)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the publisher and waits for the monitor loop to exit.
|
|
func (t *TxPublisher) Stop() {
|
|
log.Info("TxPublisher stopping...")
|
|
defer log.Debugf("TxPublisher stopped")
|
|
|
|
close(t.quit)
|
|
|
|
t.wg.Wait()
|
|
}
|
|
|
|
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
|
|
// it will examine all the txns being monitored, and check if any of them needs
|
|
// to be bumped. If so, it will attempt to bump the fee of the tx.
|
|
//
|
|
// NOTE: Must be run as a goroutine.
|
|
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
|
|
defer blockEvent.Cancel()
|
|
defer t.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case epoch, ok := <-blockEvent.Epochs:
|
|
if !ok {
|
|
// We should stop the publisher before stopping
|
|
// the chain service. Otherwise it indicates an
|
|
// error.
|
|
log.Error("Block epoch channel closed, exit " +
|
|
"monitor")
|
|
|
|
return
|
|
}
|
|
|
|
log.Debugf("TxPublisher received new block: %v",
|
|
epoch.Height)
|
|
|
|
// Update the best known height for the publisher.
|
|
t.currentHeight = epoch.Height
|
|
|
|
// Check all monitored txns to see if any of them needs
|
|
// to be bumped.
|
|
t.processRecords()
|
|
|
|
case <-t.quit:
|
|
log.Debug("Fee bumper stopped, exit monitor")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// processRecords checks all the txns being monitored, and checks if any of
|
|
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
|
|
func (t *TxPublisher) processRecords() {
|
|
// confirmedRecords stores a map of the records which have been
|
|
// confirmed.
|
|
confirmedRecords := make(map[uint64]*monitorRecord)
|
|
|
|
// feeBumpRecords stores a map of the records which need to be bumped.
|
|
feeBumpRecords := make(map[uint64]*monitorRecord)
|
|
|
|
// visitor is a helper closure that visits each record and divides them
|
|
// into two groups.
|
|
visitor := func(requestID uint64, r *monitorRecord) error {
|
|
log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
|
|
r.tx.TxHash())
|
|
|
|
// If the tx is already confirmed, we can stop monitoring it.
|
|
if t.isConfirmed(r.tx.TxHash()) {
|
|
confirmedRecords[requestID] = r
|
|
|
|
// Move to the next record.
|
|
return nil
|
|
}
|
|
|
|
feeBumpRecords[requestID] = r
|
|
|
|
// Return nil to move to the next record.
|
|
return nil
|
|
}
|
|
|
|
// Iterate through all the records and divide them into two groups.
|
|
t.records.ForEach(visitor)
|
|
|
|
// For records that are confirmed, we'll notify the caller about this
|
|
// result.
|
|
for requestID, r := range confirmedRecords {
|
|
rec := r
|
|
|
|
log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
|
|
t.wg.Add(1)
|
|
go t.handleTxConfirmed(rec, requestID)
|
|
}
|
|
|
|
// Get the current height to be used in the following goroutines.
|
|
currentHeight := t.currentHeight
|
|
|
|
// For records that are not confirmed, we perform a fee bump if needed.
|
|
for requestID, r := range feeBumpRecords {
|
|
rec := r
|
|
|
|
log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
|
|
t.wg.Add(1)
|
|
go t.handleFeeBumpTx(requestID, rec, currentHeight)
|
|
}
|
|
}
|
|
|
|
// handleTxConfirmed is called when a monitored tx is confirmed. It will
|
|
// notify the subscriber then remove the record from the maps .
|
|
//
|
|
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
|
|
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
|
|
defer t.wg.Done()
|
|
|
|
// Create a result that will be sent to the resultChan which is
|
|
// listened by the caller.
|
|
result := &BumpResult{
|
|
Event: TxConfirmed,
|
|
Tx: r.tx,
|
|
requestID: requestID,
|
|
Fee: r.fee,
|
|
FeeRate: r.feeFunction.FeeRate(),
|
|
}
|
|
|
|
// Notify that this tx is confirmed and remove the record from the map.
|
|
t.handleResult(result)
|
|
}
|
|
|
|
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
|
|
// attempt to bump the fee of the tx.
|
|
//
|
|
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
|
|
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
|
|
currentHeight int32) {
|
|
|
|
defer t.wg.Done()
|
|
|
|
oldTxid := r.tx.TxHash()
|
|
|
|
// Get the current conf target for this record.
|
|
confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
|
|
|
|
// Ask the fee function whether a bump is needed. We expect the fee
|
|
// function to increase its returned fee rate after calling this
|
|
// method.
|
|
increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
|
|
if err != nil {
|
|
// TODO(yy): send this error back to the sweeper so it can
|
|
// re-group the inputs?
|
|
log.Errorf("Failed to increase fee rate for tx %v at "+
|
|
"height=%v: %v", oldTxid, t.currentHeight, err)
|
|
|
|
return
|
|
}
|
|
|
|
// If the fee rate was not increased, there's no need to bump the fee.
|
|
if !increased {
|
|
log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
|
|
t.currentHeight)
|
|
|
|
return
|
|
}
|
|
|
|
// The fee function now has a new fee rate, we will use it to bump the
|
|
// fee of the tx.
|
|
resultOpt := t.createAndPublishTx(requestID, r)
|
|
|
|
// If there's a result, we will notify the caller about the result.
|
|
resultOpt.WhenSome(func(result BumpResult) {
|
|
// Notify the new result.
|
|
t.handleResult(&result)
|
|
})
|
|
}
|
|
|
|
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
|
|
// to the network. It will update the record with the new tx and fee rate if
|
|
// successfully created, and return the result when published successfully.
|
|
func (t *TxPublisher) createAndPublishTx(requestID uint64,
|
|
r *monitorRecord) fn.Option[BumpResult] {
|
|
|
|
// Fetch the old tx.
|
|
oldTx := r.tx
|
|
|
|
// Create a new tx with the new fee rate.
|
|
//
|
|
// NOTE: The fee function is expected to have increased its returned
|
|
// fee rate after calling the SkipFeeBump method. So we can use it
|
|
// directly here.
|
|
tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
|
|
|
|
// If the error is fee related, we will return an error and let the fee
|
|
// bumper retry it at next block.
|
|
//
|
|
// NOTE: we can check the RBF error here and ask the fee function to
|
|
// recalculate the fee rate. However, this would defeat the purpose of
|
|
// using a deadline based fee function:
|
|
// - if the deadline is far away, there's no rush to RBF the tx.
|
|
// - if the deadline is close, we expect the fee function to give us a
|
|
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
|
|
// means the budget is not enough.
|
|
if errors.Is(err, rpcclient.ErrInsufficientFee) ||
|
|
errors.Is(err, lnwallet.ErrMempoolFee) {
|
|
|
|
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
|
|
return fn.None[BumpResult]()
|
|
}
|
|
|
|
// If the error is not fee related, we will return a `TxFailed` event
|
|
// so this input can be retried.
|
|
if err != nil {
|
|
// If the tx doesn't not have enought budget, we will return a
|
|
// result so the sweeper can handle it by re-clustering the
|
|
// utxos.
|
|
if errors.Is(err, ErrNotEnoughBudget) {
|
|
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
|
|
err)
|
|
} else {
|
|
// Otherwise, an unexpected error occurred, we will
|
|
// fail the tx and let the sweeper retry the whole
|
|
// process.
|
|
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
|
|
err)
|
|
}
|
|
|
|
return fn.Some(BumpResult{
|
|
Event: TxFailed,
|
|
Tx: oldTx,
|
|
Err: err,
|
|
requestID: requestID,
|
|
})
|
|
}
|
|
|
|
// The tx has been created without any errors, we now register a new
|
|
// record by overwriting the same requestID.
|
|
t.records.Store(requestID, &monitorRecord{
|
|
tx: tx,
|
|
req: r.req,
|
|
feeFunction: r.feeFunction,
|
|
fee: fee,
|
|
})
|
|
|
|
// Attempt to broadcast this new tx.
|
|
result, err := t.broadcast(requestID)
|
|
if err != nil {
|
|
log.Infof("Failed to broadcast replacement tx %v: %v",
|
|
tx.TxHash(), err)
|
|
|
|
return fn.None[BumpResult]()
|
|
}
|
|
|
|
// A successful replacement tx is created, attach the old tx.
|
|
result.ReplacedTx = oldTx
|
|
|
|
// If the new tx failed to be published, we will return the result so
|
|
// the caller can handle it.
|
|
if result.Event == TxFailed {
|
|
return fn.Some(*result)
|
|
}
|
|
|
|
log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
|
|
|
|
// Otherwise, it's a successful RBF, set the event and return.
|
|
result.Event = TxReplaced
|
|
|
|
return fn.Some(*result)
|
|
}
|
|
|
|
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
|
|
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
|
|
details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
|
|
if err != nil {
|
|
log.Warnf("Failed to get tx details for %v: %v", txid, err)
|
|
return false
|
|
}
|
|
|
|
return details.NumConfirmations > 0
|
|
}
|
|
|
|
// calcCurrentConfTarget calculates the current confirmation target based on
|
|
// the deadline height. The conf target is capped at 0 if the deadline has
|
|
// already been past.
|
|
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
|
|
var confTarget uint32
|
|
|
|
// Calculate how many blocks left until the deadline.
|
|
deadlineDelta := deadline - currentHeight
|
|
|
|
// If we are already past the deadline, we will set the conf target to
|
|
// be 1.
|
|
if deadlineDelta <= 0 {
|
|
log.Warnf("Deadline is %d blocks behind current height %v",
|
|
-deadlineDelta, currentHeight)
|
|
|
|
confTarget = 1
|
|
} else {
|
|
confTarget = uint32(deadlineDelta)
|
|
}
|
|
|
|
return confTarget
|
|
}
|