mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 09:53:54 +01:00
Merge pull request #7448 from Roasbeef/tx-rebroadcaster
lnwallet: add new rebroadcaster interface, use for background tx publish
This commit is contained in:
commit
75ee5574a9
@ -11,18 +11,23 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/btcsuite/btcwallet/waddrmgr"
|
||||
"github.com/btcsuite/btcwallet/wallet"
|
||||
"github.com/btcsuite/btcwallet/walletdb"
|
||||
proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/lightninglabs/neutrino"
|
||||
"github.com/lightninglabs/neutrino/blockntfns"
|
||||
"github.com/lightninglabs/neutrino/headerfs"
|
||||
"github.com/lightninglabs/neutrino/pushtx"
|
||||
"github.com/lightningnetwork/lnd/blockcache"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/chainreg"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/invoices"
|
||||
@ -606,6 +611,65 @@ func (d *DefaultWalletImpl) BuildWalletConfig(ctx context.Context,
|
||||
return partialChainControl, walletConfig, cleanUp, nil
|
||||
}
|
||||
|
||||
// proxyBlockEpoch proxies a block epoch subsections to the underlying neutrino
|
||||
// rebroadcaster client.
|
||||
func proxyBlockEpoch(notifier chainntnfs.ChainNotifier,
|
||||
) func() (*blockntfns.Subscription, error) {
|
||||
|
||||
return func() (*blockntfns.Subscription, error) {
|
||||
blockEpoch, err := notifier.RegisterBlockEpochNtfn(
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := blockntfns.Subscription{
|
||||
Notifications: make(chan blockntfns.BlockNtfn, 6),
|
||||
Cancel: blockEpoch.Cancel,
|
||||
}
|
||||
go func() {
|
||||
for blk := range blockEpoch.Epochs {
|
||||
ntfn := blockntfns.NewBlockConnected(
|
||||
*blk.BlockHeader,
|
||||
uint32(blk.Height),
|
||||
)
|
||||
|
||||
sub.Notifications <- ntfn
|
||||
}
|
||||
}()
|
||||
|
||||
return &sub, nil
|
||||
}
|
||||
}
|
||||
|
||||
// walletReBroadcaster is a simple wrapper around the pushtx.Broadcaster
|
||||
// interface to adhere to the expanded lnwallet.Rebraodcaster interface.
|
||||
type walletReBroadcaster struct {
|
||||
started atomic.Bool
|
||||
|
||||
*pushtx.Broadcaster
|
||||
}
|
||||
|
||||
// newWalletReBroadcaster creates a new instance of the walletReBroadcaster.
|
||||
func newWalletReBroadcaster(broadcaster *pushtx.Broadcaster) *walletReBroadcaster {
|
||||
return &walletReBroadcaster{
|
||||
Broadcaster: broadcaster,
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches all goroutines the rebroadcaster needs to operate.
|
||||
func (w *walletReBroadcaster) Start() error {
|
||||
defer w.started.Store(true)
|
||||
|
||||
return w.Broadcaster.Start()
|
||||
}
|
||||
|
||||
// Started returns true if the broadcaster is already active.
|
||||
func (w *walletReBroadcaster) Started() bool {
|
||||
return w.started.Load()
|
||||
}
|
||||
|
||||
// BuildChainControl is responsible for creating a fully populated chain
|
||||
// control instance from a wallet.
|
||||
//
|
||||
@ -641,6 +705,29 @@ func (d *DefaultWalletImpl) BuildChainControl(
|
||||
NetParams: *walletConfig.NetParams,
|
||||
}
|
||||
|
||||
// The broadcast is already always active for neutrino nodes, so we
|
||||
// don't want to create a rebroadcast loop.
|
||||
if partialChainControl.Cfg.NeutrinoCS == nil {
|
||||
broadcastCfg := pushtx.Config{
|
||||
Broadcast: func(tx *wire.MsgTx) error {
|
||||
cs := partialChainControl.ChainSource
|
||||
_, err := cs.SendRawTransaction(
|
||||
tx, true,
|
||||
)
|
||||
|
||||
return err
|
||||
},
|
||||
SubscribeBlocks: proxyBlockEpoch(
|
||||
partialChainControl.ChainNotifier,
|
||||
),
|
||||
RebroadcastInterval: pushtx.DefaultRebroadcastInterval,
|
||||
}
|
||||
|
||||
lnWalletConfig.Rebroadcaster = newWalletReBroadcaster(
|
||||
pushtx.NewBroadcaster(&broadcastCfg),
|
||||
)
|
||||
}
|
||||
|
||||
// We've created the wallet configuration now, so we can finish
|
||||
// initializing the main chain control.
|
||||
activeChainControl, cleanUp, err := chainreg.NewChainControl(
|
||||
|
@ -12,6 +12,12 @@ that might lead to channel updates being missed, causing channel graph being
|
||||
incomplete. Aside from that, a potential announcement messages being sent out
|
||||
of order is also [fixed](https://github.com/lightningnetwork/lnd/pull/7264).
|
||||
|
||||
`lnd` will now attempt to [rebroadcast unconfirmed
|
||||
transactions](https://github.com/lightningnetwork/lnd/pull/7448) with each
|
||||
passing block the transaction hasn't been confirmed. This was already the
|
||||
default for the neutrino backend. This complements the existing behavior where
|
||||
all unconfirmed transactions are rebroadcast on start up.
|
||||
|
||||
## BOLT Specs
|
||||
|
||||
* Warning messages from peers are now recognized and
|
||||
|
18
lnutils/chan.go
Normal file
18
lnutils/chan.go
Normal file
@ -0,0 +1,18 @@
|
||||
package lnutils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RecvOrTimeout attempts to recv over chan c, returning the value. If the
|
||||
// timeout passes before the recv succeeds, an error is returned
|
||||
func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error) {
|
||||
select {
|
||||
case m := <-c:
|
||||
return &m, nil
|
||||
|
||||
case <-time.After(timeout):
|
||||
return nil, fmt.Errorf("timeout hit")
|
||||
}
|
||||
}
|
@ -56,4 +56,9 @@ type Config struct {
|
||||
// NetParams is the set of parameters that tells the wallet which chain
|
||||
// it will be operating on.
|
||||
NetParams chaincfg.Params
|
||||
|
||||
// Rebroadcaster is an optional config param that can be used to
|
||||
// passively rebroadcast transactions in the background until they're
|
||||
// detected as being confirmed.
|
||||
Rebroadcaster Rebroadcaster
|
||||
}
|
||||
|
351
lnwallet/mock.go
Normal file
351
lnwallet/mock.go
Normal file
@ -0,0 +1,351 @@
|
||||
package lnwallet
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
"github.com/btcsuite/btcd/btcutil/hdkeychain"
|
||||
"github.com/btcsuite/btcd/btcutil/psbt"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btcwallet/waddrmgr"
|
||||
base "github.com/btcsuite/btcwallet/wallet"
|
||||
"github.com/btcsuite/btcwallet/wallet/txauthor"
|
||||
"github.com/btcsuite/btcwallet/wtxmgr"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
)
|
||||
|
||||
var (
|
||||
CoinPkScript, _ = hex.DecodeString("001431df1bde03c074d0cf21ea2529427e1499b8f1de")
|
||||
)
|
||||
|
||||
// mockWalletController is a mock implementation of the WalletController
|
||||
// interface. It let's us mock the interaction with the bitcoin network.
|
||||
type mockWalletController struct {
|
||||
RootKey *btcec.PrivateKey
|
||||
PublishedTransactions chan *wire.MsgTx
|
||||
index uint32
|
||||
Utxos []*Utxo
|
||||
}
|
||||
|
||||
// BackEnd returns "mock" to signify a mock wallet controller.
|
||||
func (w *mockWalletController) BackEnd() string {
|
||||
return "mock"
|
||||
}
|
||||
|
||||
// FetchInputInfo will be called to get info about the inputs to the funding
|
||||
// transaction.
|
||||
func (w *mockWalletController) FetchInputInfo(
|
||||
prevOut *wire.OutPoint) (*Utxo, error) {
|
||||
|
||||
utxo := &Utxo{
|
||||
AddressType: WitnessPubKey,
|
||||
Value: 10 * btcutil.SatoshiPerBitcoin,
|
||||
PkScript: []byte("dummy"),
|
||||
Confirmations: 1,
|
||||
OutPoint: *prevOut,
|
||||
}
|
||||
return utxo, nil
|
||||
}
|
||||
|
||||
// ScriptForOutput returns the address, witness program and redeem script for a
|
||||
// given UTXO. An error is returned if the UTXO does not belong to our wallet or
|
||||
// it is not a managed pubKey address.
|
||||
func (w *mockWalletController) ScriptForOutput(*wire.TxOut) (
|
||||
waddrmgr.ManagedPubKeyAddress, []byte, []byte, error) {
|
||||
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
|
||||
// ConfirmedBalance currently returns dummy values.
|
||||
func (w *mockWalletController) ConfirmedBalance(int32, string) (btcutil.Amount,
|
||||
error) {
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// NewAddress is called to get new addresses for delivery, change etc.
|
||||
func (w *mockWalletController) NewAddress(AddressType, bool,
|
||||
string) (btcutil.Address, error) {
|
||||
|
||||
addr, _ := btcutil.NewAddressPubKey(
|
||||
w.RootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams,
|
||||
)
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
// LastUnusedAddress currently returns dummy values.
|
||||
func (w *mockWalletController) LastUnusedAddress(AddressType,
|
||||
string) (btcutil.Address, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// IsOurAddress currently returns a dummy value.
|
||||
func (w *mockWalletController) IsOurAddress(btcutil.Address) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// AddressInfo currently returns a dummy value.
|
||||
func (w *mockWalletController) AddressInfo(
|
||||
btcutil.Address) (waddrmgr.ManagedAddress, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ListAccounts currently returns a dummy value.
|
||||
func (w *mockWalletController) ListAccounts(string,
|
||||
*waddrmgr.KeyScope) ([]*waddrmgr.AccountProperties, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// RequiredReserve currently returns a dummy value.
|
||||
func (w *mockWalletController) RequiredReserve(uint32) btcutil.Amount {
|
||||
return 0
|
||||
}
|
||||
|
||||
// ListAddresses currently returns a dummy value.
|
||||
func (w *mockWalletController) ListAddresses(string,
|
||||
bool) (AccountAddressMap, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ImportAccount currently returns a dummy value.
|
||||
func (w *mockWalletController) ImportAccount(string, *hdkeychain.ExtendedKey,
|
||||
uint32, *waddrmgr.AddressType, bool) (*waddrmgr.AccountProperties,
|
||||
[]btcutil.Address, []btcutil.Address, error) {
|
||||
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
|
||||
// ImportPublicKey currently returns a dummy value.
|
||||
func (w *mockWalletController) ImportPublicKey(*btcec.PublicKey,
|
||||
waddrmgr.AddressType) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ImportTaprootScript currently returns a dummy value.
|
||||
func (w *mockWalletController) ImportTaprootScript(waddrmgr.KeyScope,
|
||||
*waddrmgr.Tapscript) (waddrmgr.ManagedAddress, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// SendOutputs currently returns dummy values.
|
||||
func (w *mockWalletController) SendOutputs([]*wire.TxOut,
|
||||
chainfee.SatPerKWeight, int32, string) (*wire.MsgTx, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// CreateSimpleTx currently returns dummy values.
|
||||
func (w *mockWalletController) CreateSimpleTx([]*wire.TxOut,
|
||||
chainfee.SatPerKWeight, int32, bool) (*txauthor.AuthoredTx, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ListUnspentWitness is called by the wallet when doing coin selection. We just
|
||||
// need one unspent for the funding transaction.
|
||||
func (w *mockWalletController) ListUnspentWitness(int32, int32,
|
||||
string) ([]*Utxo, error) {
|
||||
|
||||
// If the mock already has a list of utxos, return it.
|
||||
if w.Utxos != nil {
|
||||
return w.Utxos, nil
|
||||
}
|
||||
|
||||
// Otherwise create one to return.
|
||||
utxo := &Utxo{
|
||||
AddressType: WitnessPubKey,
|
||||
Value: btcutil.Amount(10 * btcutil.SatoshiPerBitcoin),
|
||||
PkScript: CoinPkScript,
|
||||
OutPoint: wire.OutPoint{
|
||||
Hash: chainhash.Hash{},
|
||||
Index: w.index,
|
||||
},
|
||||
}
|
||||
atomic.AddUint32(&w.index, 1)
|
||||
var ret []*Utxo
|
||||
ret = append(ret, utxo)
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// ListTransactionDetails currently returns dummy values.
|
||||
func (w *mockWalletController) ListTransactionDetails(int32, int32,
|
||||
string) ([]*TransactionDetail, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// LockOutpoint currently does nothing.
|
||||
func (w *mockWalletController) LockOutpoint(o wire.OutPoint) {}
|
||||
|
||||
// UnlockOutpoint currently does nothing.
|
||||
func (w *mockWalletController) UnlockOutpoint(o wire.OutPoint) {}
|
||||
|
||||
// LeaseOutput returns the current time and a nil error.
|
||||
func (w *mockWalletController) LeaseOutput(wtxmgr.LockID, wire.OutPoint,
|
||||
time.Duration) (time.Time, []byte, btcutil.Amount, error) {
|
||||
|
||||
return time.Now(), nil, 0, nil
|
||||
}
|
||||
|
||||
// ReleaseOutput currently does nothing.
|
||||
func (w *mockWalletController) ReleaseOutput(wtxmgr.LockID, wire.OutPoint) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *mockWalletController) ListLeasedOutputs() ([]*base.ListLeasedOutputResult,
|
||||
error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// FundPsbt currently does nothing.
|
||||
func (w *mockWalletController) FundPsbt(*psbt.Packet, int32, chainfee.SatPerKWeight,
|
||||
string, *waddrmgr.KeyScope) (int32, error) {
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// SignPsbt currently does nothing.
|
||||
func (w *mockWalletController) SignPsbt(*psbt.Packet) ([]uint32, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// FinalizePsbt currently does nothing.
|
||||
func (w *mockWalletController) FinalizePsbt(_ *psbt.Packet, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishTransaction sends a transaction to the PublishedTransactions chan.
|
||||
func (w *mockWalletController) PublishTransaction(tx *wire.MsgTx, _ string) error {
|
||||
w.PublishedTransactions <- tx
|
||||
return nil
|
||||
}
|
||||
|
||||
// LabelTransaction currently does nothing.
|
||||
func (w *mockWalletController) LabelTransaction(chainhash.Hash, string,
|
||||
bool) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SubscribeTransactions currently does nothing.
|
||||
func (w *mockWalletController) SubscribeTransactions() (TransactionSubscription,
|
||||
error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// IsSynced currently returns dummy values.
|
||||
func (w *mockWalletController) IsSynced() (bool, int64, error) {
|
||||
return true, int64(0), nil
|
||||
}
|
||||
|
||||
// GetRecoveryInfo currently returns dummy values.
|
||||
func (w *mockWalletController) GetRecoveryInfo() (bool, float64, error) {
|
||||
return true, float64(1), nil
|
||||
}
|
||||
|
||||
// Start currently does nothing.
|
||||
func (w *mockWalletController) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop currently does nothing.
|
||||
func (w *mockWalletController) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *mockWalletController) FetchTx(chainhash.Hash) (*wire.MsgTx, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (w *mockWalletController) RemoveDescendants(*wire.MsgTx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// mockChainNotifier is a mock implementation of the ChainNotifier interface.
|
||||
type mockChainNotifier struct {
|
||||
SpendChan chan *chainntnfs.SpendDetail
|
||||
EpochChan chan *chainntnfs.BlockEpoch
|
||||
ConfChan chan *chainntnfs.TxConfirmation
|
||||
}
|
||||
|
||||
// RegisterConfirmationsNtfn returns a ConfirmationEvent that contains a channel
|
||||
// that the tx confirmation will go over.
|
||||
func (c *mockChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
|
||||
pkScript []byte, numConfs, heightHint uint32,
|
||||
opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) {
|
||||
|
||||
return &chainntnfs.ConfirmationEvent{
|
||||
Confirmed: c.ConfChan,
|
||||
Cancel: func() {},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RegisterSpendNtfn returns a SpendEvent that contains a channel that the spend
|
||||
// details will go over.
|
||||
func (c *mockChainNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
|
||||
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
|
||||
|
||||
return &chainntnfs.SpendEvent{
|
||||
Spend: c.SpendChan,
|
||||
Cancel: func() {},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RegisterBlockEpochNtfn returns a BlockEpochEvent that contains a channel that
|
||||
// block epochs will go over.
|
||||
func (c *mockChainNotifier) RegisterBlockEpochNtfn(blockEpoch *chainntnfs.BlockEpoch) (
|
||||
*chainntnfs.BlockEpochEvent, error) {
|
||||
|
||||
return &chainntnfs.BlockEpochEvent{
|
||||
Epochs: c.EpochChan,
|
||||
Cancel: func() {},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start currently returns a dummy value.
|
||||
func (c *mockChainNotifier) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Started currently returns a dummy value.
|
||||
func (c *mockChainNotifier) Started() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Stop currently returns a dummy value.
|
||||
func (c *mockChainNotifier) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockChainIO struct{}
|
||||
|
||||
func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte,
|
||||
heightHint uint32, _ <-chan struct{}) (*wire.TxOut, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (*mockChainIO) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) {
|
||||
return nil, nil
|
||||
}
|
27
lnwallet/rebroadcaster.go
Normal file
27
lnwallet/rebroadcaster.go
Normal file
@ -0,0 +1,27 @@
|
||||
package lnwallet
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
)
|
||||
|
||||
// Rebroadcaster is an abstract rebroadcaster instance that'll continually
|
||||
// rebroadcast transactions in the background until they're confirmed.
|
||||
type Rebroadcaster interface {
|
||||
// Start launches all goroutines the rebroadcaster needs to operate.
|
||||
Start() error
|
||||
|
||||
// Started returns true if the broadcaster is already active.
|
||||
Started() bool
|
||||
|
||||
// Stop terminates the rebroadcaster and all goroutines it spawned.
|
||||
Stop()
|
||||
|
||||
// Broadcast enqueues a transaction to be rebroadcast until it's been
|
||||
// confirmed.
|
||||
Broadcast(tx *wire.MsgTx) error
|
||||
|
||||
// MarkAsConfirmed marks a transaction as confirmed, so it won't be
|
||||
// rebroadcast.
|
||||
MarkAsConfirmed(txid chainhash.Hash)
|
||||
}
|
197
lnwallet/rebroadcaster_test.go
Normal file
197
lnwallet/rebroadcaster_test.go
Normal file
@ -0,0 +1,197 @@
|
||||
package lnwallet
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/lnutils"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockRebroadcaster struct {
|
||||
started atomic.Bool
|
||||
|
||||
rebroadcastAttempt chan struct{}
|
||||
|
||||
falseStart bool
|
||||
|
||||
confSignal chan struct{}
|
||||
|
||||
startSignal chan struct{}
|
||||
}
|
||||
|
||||
func newMockRebroadcaster(falseStart bool) *mockRebroadcaster {
|
||||
return &mockRebroadcaster{
|
||||
rebroadcastAttempt: make(chan struct{}, 1),
|
||||
falseStart: falseStart,
|
||||
confSignal: make(chan struct{}, 1),
|
||||
startSignal: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockRebroadcaster) Start() error {
|
||||
if !m.falseStart {
|
||||
defer m.started.Store(true)
|
||||
defer close(m.startSignal)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRebroadcaster) Started() bool {
|
||||
return m.started.Load()
|
||||
}
|
||||
|
||||
func (m *mockRebroadcaster) Stop() {
|
||||
}
|
||||
|
||||
// Broadcast enqueues a transaction to be rebroadcast until it's been
|
||||
// confirmed.
|
||||
func (m *mockRebroadcaster) Broadcast(tx *wire.MsgTx) error {
|
||||
m.rebroadcastAttempt <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRebroadcaster) MarkAsConfirmed(txid chainhash.Hash) {
|
||||
m.confSignal <- struct{}{}
|
||||
}
|
||||
|
||||
func assertBroadcasterBypass(t *testing.T, wallet *LightningWallet,
|
||||
rebroadcaster *mockRebroadcaster,
|
||||
walletController *mockWalletController) {
|
||||
|
||||
testTx := wire.NewMsgTx(2)
|
||||
timeout := time.Second * 1
|
||||
|
||||
require.NoError(t, wallet.PublishTransaction(testTx, ""))
|
||||
|
||||
// The tx should go to the backend.
|
||||
_, err := lnutils.RecvOrTimeout(
|
||||
walletController.PublishedTransactions, timeout,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// It shouldn't go to the rebroadcaster.
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
case <-rebroadcaster.rebroadcastAttempt:
|
||||
t.Fatal("tx sent to rebroadcaster")
|
||||
}
|
||||
}
|
||||
|
||||
func assertBroadcasterSend(t *testing.T, wallet *LightningWallet,
|
||||
rebroadcaster *mockRebroadcaster,
|
||||
walletController *mockWalletController) {
|
||||
|
||||
testTx := wire.NewMsgTx(2)
|
||||
testTx.AddTxOut(&wire.TxOut{})
|
||||
|
||||
timeout := time.Second * 1
|
||||
|
||||
require.NoError(t, wallet.PublishTransaction(testTx, ""))
|
||||
|
||||
// The tx should go to the backend.
|
||||
_, err := lnutils.RecvOrTimeout(
|
||||
walletController.PublishedTransactions, timeout,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// It should also go to the rebroadcaster.
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
t.Fatal("tx not sent to rebroadcaster")
|
||||
case <-rebroadcaster.rebroadcastAttempt:
|
||||
}
|
||||
}
|
||||
|
||||
// TestWalletRebroadcaster tests that the wallet properly manages the existence
|
||||
// or lack of existence of the rebroadcaster, and also properly marks the
|
||||
// transaction as confirmed.
|
||||
func TestWalletRebroadcaster(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rebroadcaster := newMockRebroadcaster(false)
|
||||
walletController := &mockWalletController{
|
||||
PublishedTransactions: make(chan *wire.MsgTx, 1),
|
||||
}
|
||||
chainIO := &mockChainIO{}
|
||||
notifier := &mockChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail, 1),
|
||||
EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
|
||||
ConfChan: make(chan *chainntnfs.TxConfirmation, 1),
|
||||
}
|
||||
cfg := &Config{
|
||||
Rebroadcaster: rebroadcaster,
|
||||
WalletController: walletController,
|
||||
Notifier: notifier,
|
||||
ChainIO: chainIO,
|
||||
}
|
||||
|
||||
t.Run("rebroadcast bypass", func(t *testing.T) {
|
||||
// We'll make a copy of the config, but without the
|
||||
// broadcaster.
|
||||
testCfg := *cfg
|
||||
testCfg.Rebroadcaster = nil
|
||||
|
||||
wallet, err := NewLightningWallet(testCfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, wallet.Startup())
|
||||
|
||||
// If we try to broadcast, it should go straight to the wallet
|
||||
// backend and skip the broadcaster.
|
||||
assertBroadcasterBypass(
|
||||
t, wallet, rebroadcaster, walletController,
|
||||
)
|
||||
|
||||
wallet.Shutdown()
|
||||
|
||||
// If we make a new wallet, that has the broadcaster, but
|
||||
// hasn't started yet, we should see the same behavior.
|
||||
testCfg.Rebroadcaster = newMockRebroadcaster(true)
|
||||
|
||||
wallet, err = NewLightningWallet(testCfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, wallet.Startup())
|
||||
|
||||
assertBroadcasterBypass(
|
||||
t, wallet, rebroadcaster, walletController,
|
||||
)
|
||||
|
||||
wallet.Shutdown()
|
||||
})
|
||||
|
||||
t.Run("rebroadcast normal", func(t *testing.T) {
|
||||
wallet, err := NewLightningWallet(*cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, wallet.Startup())
|
||||
|
||||
defer wallet.Shutdown()
|
||||
|
||||
// Wait for the broadcaster to start.
|
||||
_, err = lnutils.RecvOrTimeout(
|
||||
rebroadcaster.startSignal, time.Second,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We'll now broadcast a new test transaction, asserting that
|
||||
// it goes to both the backend and the rebroadcaster.
|
||||
assertBroadcasterSend(
|
||||
t, wallet, rebroadcaster, walletController,
|
||||
)
|
||||
|
||||
// We'll now mark the transaction as confirmed, and assert that
|
||||
// the rebroadcaster was notified.
|
||||
notifier.ConfChan <- &chainntnfs.TxConfirmation{}
|
||||
|
||||
_, err = lnutils.RecvOrTimeout(
|
||||
rebroadcaster.confSignal, time.Second,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
})
|
||||
}
|
@ -407,6 +407,15 @@ func (l *LightningWallet) Startup() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if l.Cfg.Rebroadcaster != nil {
|
||||
go func() {
|
||||
if err := l.Cfg.Rebroadcaster.Start(); err != nil {
|
||||
walletLog.Errorf("unable to start "+
|
||||
"rebroadcaster: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
l.wg.Add(1)
|
||||
// TODO(roasbeef): multiple request handlers?
|
||||
go l.requestHandler()
|
||||
@ -426,11 +435,74 @@ func (l *LightningWallet) Shutdown() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if l.Cfg.Rebroadcaster != nil && l.Cfg.Rebroadcaster.Started() {
|
||||
l.Cfg.Rebroadcaster.Stop()
|
||||
}
|
||||
|
||||
close(l.quit)
|
||||
l.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishTransaction wraps the wallet controller tx publish method with an
|
||||
// extra rebroadcaster layer if the sub-system is configured.
|
||||
func (l *LightningWallet) PublishTransaction(tx *wire.MsgTx,
|
||||
label string) error {
|
||||
|
||||
sendTxToWallet := func() error {
|
||||
return l.WalletController.PublishTransaction(tx, label)
|
||||
}
|
||||
|
||||
// If we don't have rebroadcaster then we can exit early (and send only
|
||||
// to the wallet).
|
||||
if l.Cfg.Rebroadcaster == nil || !l.Cfg.Rebroadcaster.Started() {
|
||||
return sendTxToWallet()
|
||||
}
|
||||
|
||||
// We pass this into the rebroadcaster first, so the initial attempt
|
||||
// will succeed if the transaction isn't yet in the mempool. However we
|
||||
// ignore the error here as this might be resent on start up and the
|
||||
// transaction already exists.
|
||||
_ = l.Cfg.Rebroadcaster.Broadcast(tx)
|
||||
|
||||
// Then we pass things into the wallet as normal, which'll add the
|
||||
// transaction label on disk.
|
||||
if err := sendTxToWallet(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(roasbeef): want diff height actually? no context though
|
||||
_, bestHeight, err := l.Cfg.ChainIO.GetBestBlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
txHash := tx.TxHash()
|
||||
go func() {
|
||||
const numConfs = 6
|
||||
|
||||
txConf, err := l.Cfg.Notifier.RegisterConfirmationsNtfn(
|
||||
&txHash, tx.TxOut[0].PkScript, numConfs, uint32(bestHeight),
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-txConf.Confirmed:
|
||||
// TODO(roasbeef): also want to remove from
|
||||
// rebroadcaster if conflict happens...deeper wallet
|
||||
// integration?
|
||||
l.Cfg.Rebroadcaster.MarkAsConfirmed(tx.TxHash())
|
||||
|
||||
case <-l.quit:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConfirmedBalance returns the current confirmed balance of a wallet account.
|
||||
// This methods wraps the internal WalletController method so we're able to
|
||||
// properly hold the coin select mutex while we compute the balance.
|
||||
|
Loading…
Reference in New Issue
Block a user