Merge pull request #8622 from lightningnetwork/aux-funding

[3/5]: multi: add new AuxFundingController for custom external funding flows
This commit is contained in:
Oliver Gugger 2024-05-27 15:26:13 +02:00 committed by GitHub
commit 652ff81461
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1064 additions and 83 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
)
// ChannelEdgeInfo represents a fully authenticated channel along with all its
@ -62,6 +63,11 @@ type ChannelEdgeInfo struct {
// the value output in the outpoint that created this channel.
Capacity btcutil.Amount
// TapscriptRoot is the optional Merkle root of the tapscript tree if
// this channel is a taproot channel that also commits to a tapscript
// tree (custom channel).
TapscriptRoot fn.Option[chainhash.Hash]
// ExtraOpaqueData is the set of data that was appended to this
// message, some of which we may not actually know how to iterate or
// parse. By holding onto this data, we ensure that we're able to

View file

@ -35,6 +35,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
@ -44,6 +45,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/protofsm"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal"
@ -162,6 +164,16 @@ type AuxComponents struct {
// TrafficShaper is an optional traffic shaper that can be used to
// control the outgoing channel of a payment.
TrafficShaper fn.Option[routing.TlvTrafficShaper]
// MsgRouter is an optional message router that if set will be used in
// place of a new blank default message router.
MsgRouter fn.Option[protofsm.MsgRouter]
// AuxFundingController is an optional controller that can be used to
// modify the way we handle certain custom chanenl types. It's also
// able to automatically handle new custom protocol messages related to
// the funding process.
AuxFundingController fn.Option[funding.AuxFundingController]
}
// DefaultWalletImpl is the default implementation of our normal, btcwallet

View file

@ -20,6 +20,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
@ -82,9 +83,10 @@ var (
// can provide that serve useful when processing a specific network
// announcement.
type optionalMsgFields struct {
capacity *btcutil.Amount
channelPoint *wire.OutPoint
remoteAlias *lnwire.ShortChannelID
capacity *btcutil.Amount
channelPoint *wire.OutPoint
remoteAlias *lnwire.ShortChannelID
tapscriptRoot fn.Option[chainhash.Hash]
}
// apply applies the optional fields within the functional options.
@ -115,6 +117,14 @@ func ChannelPoint(op wire.OutPoint) OptionalMsgField {
}
}
// TapscriptRoot is an optional field that lets the gossiper know of the root of
// the tapscript tree for a custom channel.
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
return func(f *optionalMsgFields) {
f.tapscriptRoot = root
}
}
// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
@ -2513,6 +2523,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
cp := *nMsg.optionalMsgFields.channelPoint
edge.ChannelPoint = cp
}
// Optional tapscript root for custom channels.
edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
}
log.Debugf("Adding edge for short_chan_id: %v",

82
funding/aux_funding.go Normal file
View file

@ -0,0 +1,82 @@
package funding
import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/protofsm"
)
// AuxFundingController permits the implementation of the funding of custom
// channels types. The controller serves as a MsgEndpoint which allows it to
// intercept custom messages, or even the regular funding messages. The
// controller might also pass along an aux funding desc based on an existing
// pending channel ID.
type AuxFundingController interface {
// MsgEndpoint is the embedded interface that signals that the funding
// controller is also a message endpoint. This'll allow it to handle
// custom messages specific to the funding type.
protofsm.MsgEndpoint
// DescFromPendingChanID takes a pending channel ID, that may already be
// known due to prior custom channel messages, and maybe returns an aux
// funding desc which can be used to modify how a channel is funded.
DescFromPendingChanID(pid PendingChanID,
openChan *channeldb.OpenChannel,
localKeyRing, remoteKeyRing lnwallet.CommitmentKeyRing,
initiator bool) (fn.Option[lnwallet.AuxFundingDesc], error)
// DeriveTapscriptRoot takes a pending channel ID and maybe returns a
// tapscript root that should be used when creating any MuSig2 sessions
// for a channel.
DeriveTapscriptRoot(PendingChanID) (fn.Option[chainhash.Hash], error)
// ChannelReady is called when a channel has been fully opened (multiple
// confirmations) and is ready to be used. This can be used to perform
// any final setup or cleanup.
ChannelReady(openChan *channeldb.OpenChannel) error
// ChannelFinalized is called when a channel has been fully finalized.
// In this state, we've received the commitment sig from the remote
// party, so we are safe to broadcast the funding transaction.
ChannelFinalized(PendingChanID) error
}
// descFromPendingChanID takes a pending channel ID, that may already be known
// due to prior custom channel messages, and maybe returns an aux funding desc
// which can be used to modify how a channel is funded.
func descFromPendingChanID(controller fn.Option[AuxFundingController],
chanID PendingChanID, openChan *channeldb.OpenChannel,
localKeyRing, remoteKeyRing lnwallet.CommitmentKeyRing,
initiator bool) (fn.Option[lnwallet.AuxFundingDesc], error) {
var result fn.Option[lnwallet.AuxFundingDesc]
mapErr := fn.MapOptionZ(controller, func(c AuxFundingController) error {
var err error
result, err = c.DescFromPendingChanID(
chanID, openChan, localKeyRing, remoteKeyRing,
initiator,
)
return err
})
return result, mapErr
}
// deriveTapscriptRoot takes a pending channel ID and maybe returns a
// tapscript root that should be used when creating any MuSig2 sessions for a
// channel.
func deriveTapscriptRoot(controller fn.Option[AuxFundingController],
chanID PendingChanID) (fn.Option[chainhash.Hash], error) {
var result fn.Option[chainhash.Hash]
mapErr := fn.MapOptionZ(controller, func(c AuxFundingController) error {
var err error
result, err = c.DeriveTapscriptRoot(chanID)
return err
})
return result, mapErr
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
@ -98,7 +99,6 @@ const (
// you and limitless channel size (apart from 21 million cap).
MaxBtcFundingAmountWumbo = btcutil.Amount(1000000000)
// TODO(roasbeef): tune.
msgBufferSize = 50
// MaxWaitNumBlocksFundingConf is the maximum number of blocks to wait
@ -288,7 +288,7 @@ type InitFundingMsg struct {
// PendingChanID is not all zeroes (the default value), then this will
// be the pending channel ID used for the funding flow within the wire
// protocol.
PendingChanID [32]byte
PendingChanID PendingChanID
// ChannelType allows the caller to use an explicit channel type for the
// funding negotiation. This type will only be observed if BOTH sides
@ -318,7 +318,7 @@ type fundingMsg struct {
// pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier,
// which is a set of 32-bytes generated via a CSPRNG.
type pendingChannels map[[32]byte]*reservationWithCtx
type pendingChannels map[PendingChanID]*reservationWithCtx
// serializedPubKey is used within the FundingManager's activeReservations list
// to identify the nodes with which the FundingManager is actively working to
@ -542,6 +542,12 @@ type Config struct {
// AuxLeafStore is an optional store that can be used to store auxiliary
// leaves for certain custom channel types.
AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
// AuxFundingController is an optional controller that can be used to
// modify the way we handle certain custom channel types. It's also
// able to automatically handle new custom protocol messages related to
// the funding process.
AuxFundingController fn.Option[AuxFundingController]
}
// Manager acts as an orchestrator/bridge between the wallet's
@ -567,8 +573,10 @@ type Manager struct {
// chanIDNonce is a nonce that's incremented for each new funding
// reservation created.
nonceMtx sync.RWMutex
chanIDNonce uint64
chanIDNonce atomic.Uint64
// nonceMtx is a mutex that guards the pendingMusigNonces.
nonceMtx sync.RWMutex
// pendingMusigNonces is used to store the musig2 nonce we generate to
// send funding locked until we receive a funding locked message from
@ -590,7 +598,7 @@ type Manager struct {
// required as mid funding flow, we switch to referencing the channel
// by its full channel ID once the commitment transactions have been
// signed by both parties.
signedReservations map[lnwire.ChannelID][32]byte
signedReservations map[lnwire.ChannelID]PendingChanID
// resMtx guards both of the maps above to ensure that all access is
// goroutine safe.
@ -797,24 +805,28 @@ func (f *Manager) rebroadcastFundingTx(c *channeldb.OpenChannel) {
}
}
// PendingChanID is a type that represents a pending channel ID. This might be
// selected by the caller, but if not, will be automatically selected.
type PendingChanID = [32]byte
// nextPendingChanID returns the next free pending channel ID to be used to
// identify a particular future channel funding workflow.
func (f *Manager) nextPendingChanID() [32]byte {
// Obtain a fresh nonce. We do this by encoding the current nonce
// counter, then incrementing it by one.
f.nonceMtx.Lock()
var nonce [8]byte
binary.LittleEndian.PutUint64(nonce[:], f.chanIDNonce)
f.chanIDNonce++
f.nonceMtx.Unlock()
func (f *Manager) nextPendingChanID() PendingChanID {
// Obtain a fresh nonce. We do this by encoding the incremented nonce.
nextNonce := f.chanIDNonce.Add(1)
var nonceBytes [8]byte
binary.LittleEndian.PutUint64(nonceBytes[:], nextNonce)
// We'll generate the next pending channelID by "encrypting" 32-bytes
// of zeroes which'll extract 32 random bytes from our stream cipher.
var (
nextChanID [32]byte
nextChanID PendingChanID
zeroes [32]byte
)
salsa20.XORKeyStream(nextChanID[:], zeroes[:], nonce[:], &f.chanIDKey)
salsa20.XORKeyStream(
nextChanID[:], zeroes[:], nonceBytes[:], &f.chanIDKey,
)
return nextChanID
}
@ -1044,7 +1056,8 @@ func (f *Manager) reservationCoordinator() {
//
// NOTE: This MUST be run as a goroutine.
func (f *Manager) advanceFundingState(channel *channeldb.OpenChannel,
pendingChanID [32]byte, updateChan chan<- *lnrpc.OpenStatusUpdate) {
pendingChanID PendingChanID,
updateChan chan<- *lnrpc.OpenStatusUpdate) {
defer f.wg.Done()
@ -1119,7 +1132,7 @@ func (f *Manager) advanceFundingState(channel *channeldb.OpenChannel,
// updateChan can be set non-nil to get OpenStatusUpdates.
func (f *Manager) stateStep(channel *channeldb.OpenChannel,
lnChannel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID, pendingChanID [32]byte,
shortChanID *lnwire.ShortChannelID, pendingChanID PendingChanID,
channelState channelOpeningState,
updateChan chan<- *lnrpc.OpenStatusUpdate) error {
@ -1242,8 +1255,8 @@ func (f *Manager) stateStep(channel *channeldb.OpenChannel,
// advancePendingChannelState waits for a pending channel's funding tx to
// confirm, and marks it open in the database when that happens.
func (f *Manager) advancePendingChannelState(
channel *channeldb.OpenChannel, pendingChanID [32]byte) error {
func (f *Manager) advancePendingChannelState(channel *channeldb.OpenChannel,
pendingChanID PendingChanID) error {
if channel.IsZeroConf() {
// Persist the alias to the alias database.
@ -1605,6 +1618,18 @@ func (f *Manager) fundeeProcessOpenChannel(peer lnpeer.Peer,
return
}
// At this point, if we have an AuxFundingController active, we'll
// check to see if we have a special tapscript root to use in our
// MuSig funding output.
tapscriptRoot, err := deriveTapscriptRoot(
f.cfg.AuxFundingController, msg.PendingChannelID,
)
if err != nil {
err = fmt.Errorf("error deriving tapscript root: %w", err)
log.Error(err)
f.failFundingFlow(peer, cid, err)
}
req := &lnwallet.InitFundingReserveMsg{
ChainHash: &msg.ChainHash,
PendingChanID: msg.PendingChannelID,
@ -1621,6 +1646,7 @@ func (f *Manager) fundeeProcessOpenChannel(peer lnpeer.Peer,
ZeroConf: zeroConf,
OptionScidAlias: scid,
ScidAliasFeature: scidFeatureVal,
TapscriptRoot: tapscriptRoot,
}
reservation, err := f.cfg.Wallet.InitChannelReservation(req)
@ -2217,10 +2243,27 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
return
}
// At this point, we'll see if there's an AuxFundingDesc we
// need to deliver so the funding process can continue
// properly.
chanState := resCtx.reservation.ChanState()
localKeys, remoteKeys := resCtx.reservation.CommitmentKeyRings()
auxFundingDesc, err := descFromPendingChanID(
f.cfg.AuxFundingController, cid.tempChanID, chanState,
*localKeys, *remoteKeys, true,
)
if err != nil {
failFlow("error continuing PSBT flow", err)
return
}
// A non-nil error means we can continue the funding flow.
// Notify the wallet so it can prepare everything we need to
// continue.
err = resCtx.reservation.ProcessPsbt()
//
// We'll also pass along the aux funding controller as well,
// which may be used to help process the finalized PSBT.
err = resCtx.reservation.ProcessPsbt(auxFundingDesc)
if err != nil {
failFlow("error continuing PSBT flow", err)
return
@ -2346,7 +2389,6 @@ func (f *Manager) fundeeProcessFundingCreated(peer lnpeer.Peer,
// final funding transaction, as well as a signature for our version of
// the commitment transaction. So at this point, we can validate the
// initiator's commitment transaction, then send our own if it's valid.
// TODO(roasbeef): make case (p vs P) consistent throughout
fundingOut := msg.FundingPoint
log.Infof("completing pending_id(%x) with ChannelPoint(%v)",
pendingChanID[:], fundingOut)
@ -2378,16 +2420,33 @@ func (f *Manager) fundeeProcessFundingCreated(peer lnpeer.Peer,
}
}
// At this point, we'll see if there's an AuxFundingDesc we need to
// deliver so the funding process can continue properly.
chanState := resCtx.reservation.ChanState()
localKeys, remoteKeys := resCtx.reservation.CommitmentKeyRings()
auxFundingDesc, err := descFromPendingChanID(
f.cfg.AuxFundingController, cid.tempChanID, chanState,
*localKeys, *remoteKeys, true,
)
if err != nil {
log.Errorf("error continuing PSBT flow: %v", err)
f.failFundingFlow(peer, cid, err)
return
}
// With all the necessary data available, attempt to advance the
// funding workflow to the next stage. If this succeeds then the
// funding transaction will broadcast after our next message.
// CompleteReservationSingle will also mark the channel as 'IsPending'
// in the database.
//
// We'll also directly pass in the AuxFunding controller as well,
// which may be used by the reservation system to finalize funding our
// side.
completeChan, err := resCtx.reservation.CompleteReservationSingle(
&fundingOut, commitSig,
&fundingOut, commitSig, auxFundingDesc,
)
if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc.
log.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(peer, cid, err)
return
@ -2683,6 +2742,22 @@ func (f *Manager) funderProcessFundingSigned(peer lnpeer.Peer,
}
}
// Before we proceed, if we have a funding hook that wants a
// notification that it's safe to broadcast the funding transaction,
// then we'll send that now.
err = fn.MapOptionZ(
f.cfg.AuxFundingController,
func(controller AuxFundingController) error {
return controller.ChannelFinalized(cid.tempChanID)
},
)
if err != nil {
cid := newChanIdentifier(msg.ChanID)
f.sendWarning(peer, cid, err)
return
}
// Now that we have a finalized reservation for this funding flow,
// we'll send the to be active channel to the ChainArbitrator so it can
// watch for any on-chain actions before the channel has fully
@ -2698,9 +2773,6 @@ func (f *Manager) funderProcessFundingSigned(peer lnpeer.Peer,
// Send an update to the upstream client that the negotiation process
// is over.
//
// TODO(roasbeef): add abstraction over updates to accommodate
// long-polling, or SSE, etc.
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanPending{
ChanPending: &lnrpc.PendingUpdate{
@ -2744,7 +2816,7 @@ type confirmedChannel struct {
// channel as closed. The error is only returned for the responder of the
// channel flow.
func (f *Manager) fundingTimeout(c *channeldb.OpenChannel,
pendingID [32]byte) error {
pendingID PendingChanID) error {
// We'll get a timeout if the number of blocks mined since the channel
// was initiated reaches MaxWaitNumBlocksFundingConf and we are not the
@ -3409,6 +3481,7 @@ func (f *Manager) addToRouterGraph(completeChan *channeldb.OpenChannel,
errChan := f.cfg.SendAnnouncement(
ann.chanAnn, discovery.ChannelCapacity(completeChan.Capacity),
discovery.ChannelPoint(completeChan.FundingOutpoint),
discovery.TapscriptRoot(completeChan.TapscriptRoot),
)
select {
case err := <-errChan:
@ -3607,7 +3680,7 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
// a zero-conf channel. This will wait for the real confirmation, add the
// confirmed SCID to the router graph, and then announce after six confs.
func (f *Manager) waitForZeroConfChannel(c *channeldb.OpenChannel,
pendingID [32]byte) error {
_ PendingChanID) error {
// First we'll check whether the channel is confirmed on-chain. If it
// is already confirmed, the chainntnfs subsystem will return with the
@ -3938,6 +4011,19 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer, //nolint:funlen
PubNonce: remoteNonce,
}),
)
err = fn.MapOptionZ(
f.cfg.AuxFundingController,
func(controller AuxFundingController) error {
return controller.ChannelReady(channel)
},
)
if err != nil {
cid := newChanIdentifier(msg.ChanID)
f.sendWarning(peer, cid, err)
return
}
}
// The channel_ready message contains the next commitment point we'll
@ -3975,7 +4061,7 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer, //nolint:funlen
// channel is now active, thus we change its state to `addedToRouterGraph` to
// let the channel start handling routing.
func (f *Manager) handleChannelReadyReceived(channel *channeldb.OpenChannel,
scid *lnwire.ShortChannelID, pendingChanID [32]byte,
scid *lnwire.ShortChannelID, pendingChanID PendingChanID,
updateChan chan<- *lnrpc.OpenStatusUpdate) error {
chanID := lnwire.NewChanIDFromOutPoint(channel.FundingOutpoint)
@ -4024,6 +4110,17 @@ func (f *Manager) handleChannelReadyReceived(channel *channeldb.OpenChannel,
log.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"added to router graph", chanID, scid)
err = fn.MapOptionZ(
f.cfg.AuxFundingController,
func(controller AuxFundingController) error {
return controller.ChannelReady(channel)
},
)
if err != nil {
return fmt.Errorf("failed notifying aux funding controller "+
"about channel ready: %w", err)
}
// Give the caller a final update notifying them that the channel is
fundingPoint := channel.FundingOutpoint
cp := &lnrpc.ChannelPoint{
@ -4337,9 +4434,9 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
//
// We can pass in zeroes for the min and max htlc policy, because we
// only use the channel announcement message from the returned struct.
ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey,
localFundingKey, remoteFundingKey, shortChanID, chanID,
0, 0, nil, chanType,
ann, err := f.newChanAnnouncement(
localIDKey, remoteIDKey, localFundingKey, remoteFundingKey,
shortChanID, chanID, 0, 0, nil, chanType,
)
if err != nil {
log.Errorf("can't generate channel announcement: %v", err)
@ -4405,7 +4502,6 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
// InitFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *Manager) InitFundingWorkflow(msg *InitFundingMsg) {
f.fundingRequests <- msg
}
@ -4499,7 +4595,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
// If the caller specified their own channel ID, then we'll use that.
// Otherwise we'll generate a fresh one as normal. This will be used
// to track this reservation throughout its lifetime.
var chanID [32]byte
var chanID PendingChanID
if msg.PendingChanID == zeroID {
chanID = f.nextPendingChanID()
} else {
@ -4595,6 +4691,20 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
scidFeatureVal = true
}
// At this point, if we have an AuxFundingController active, we'll check
// to see if we have a special tapscript root to use in our MuSig2
// funding output.
tapscriptRoot, err := deriveTapscriptRoot(
f.cfg.AuxFundingController, chanID,
)
if err != nil {
err = fmt.Errorf("error deriving tapscript root: %w", err)
log.Error(err)
msg.Err <- err
return
}
req := &lnwallet.InitFundingReserveMsg{
ChainHash: &msg.ChainHash,
PendingChanID: chanID,
@ -4618,6 +4728,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
OptionScidAlias: scid,
ScidAliasFeature: scidFeatureVal,
Memo: msg.Memo,
TapscriptRoot: tapscriptRoot,
}
reservation, err := f.cfg.Wallet.InitChannelReservation(req)
@ -4904,7 +5015,8 @@ func (f *Manager) pruneZombieReservations() {
// cancelReservationCtx does all needed work in order to securely cancel the
// reservation.
func (f *Manager) cancelReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte, byRemote bool) (*reservationWithCtx, error) {
pendingChanID PendingChanID,
byRemote bool) (*reservationWithCtx, error) {
log.Infof("Cancelling funding reservation for node_key=%x, "+
"chan_id=%x", peerKey.SerializeCompressed(), pendingChanID[:])
@ -4952,7 +5064,7 @@ func (f *Manager) cancelReservationCtx(peerKey *btcec.PublicKey,
// deleteReservationCtx deletes the reservation uniquely identified by the
// target public key of the peer, and the specified pending channel ID.
func (f *Manager) deleteReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte) {
pendingChanID PendingChanID) {
peerIDKey := newSerializedKey(peerKey)
f.resMtx.Lock()
@ -4975,7 +5087,7 @@ func (f *Manager) deleteReservationCtx(peerKey *btcec.PublicKey,
// getReservationCtx returns the reservation context for a particular pending
// channel ID for a target peer.
func (f *Manager) getReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte) (*reservationWithCtx, error) {
pendingChanID PendingChanID) (*reservationWithCtx, error) {
peerIDKey := newSerializedKey(peerKey)
f.resMtx.RLock()
@ -4995,7 +5107,7 @@ func (f *Manager) getReservationCtx(peerKey *btcec.PublicKey,
// of being funded. After the funding transaction has been confirmed, the
// channel will receive a new, permanent channel ID, and will no longer be
// considered pending.
func (f *Manager) IsPendingChannel(pendingChanID [32]byte,
func (f *Manager) IsPendingChannel(pendingChanID PendingChanID,
peer lnpeer.Peer) bool {
peerIDKey := newSerializedKey(peer.IdentityKey())

View file

@ -177,6 +177,17 @@ func runPsbtChanFunding(ht *lntest.HarnessTest, carol, dave *node.HarnessNode,
},
)
// If this is a taproot channel, then we'll decode the PSBT to assert
// that an internal key is included.
if commitType == lnrpc.CommitmentType_SIMPLE_TAPROOT {
decodedPSBT, err := psbt.NewFromRawBytes(
bytes.NewReader(tempPsbt), false,
)
require.NoError(ht, err)
require.Len(ht, decodedPSBT.Outputs[0].TaprootInternalKey, 32)
}
// Let's add a second channel to the batch. This time between Carol and
// Alice. We will publish the batch TX once this channel funding is
// complete.

View file

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr/musig2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -105,6 +106,26 @@ func (s *ShimIntent) FundingOutput() ([]byte, *wire.TxOut, error) {
)
}
// TaprootInternalKey may return the internal key for a MuSig2 funding output,
// but only if this is actually a MuSig2 channel.
func (s *ShimIntent) TaprootInternalKey() fn.Option[*btcec.PublicKey] {
if !s.musig2 {
return fn.None[*btcec.PublicKey]()
}
// Similar to the existing p2wsh script, we'll always ensure the keys
// are sorted before use. Since we're only interested in the internal
// key, we don't need to take into account any tapscript root.
//
// We ignore the error here as this is only called after FundingOutput
// is called.
combinedKey, _, _, _ := musig2.AggregateKeys(
[]*btcec.PublicKey{s.localKey.PubKey, s.remoteKey}, true,
)
return fn.Some(combinedKey.PreTweakedKey)
}
// Cancel allows the caller to cancel a funding Intent at any time. This will
// return any resources such as coins back to the eligible pool to be used in
// order channel fundings.

View file

@ -6,11 +6,14 @@ import (
"sync"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
)
@ -162,6 +165,13 @@ func (i *PsbtIntent) BindKeys(localKey *keychain.KeyDescriptor,
i.State = PsbtOutputKnown
}
// BindTapscriptRoot takes an optional tapscript root and binds it to the
// underlying funding intent. This only applies to musig2 channels, and will be
// used to make the musig2 funding output.
func (i *PsbtIntent) BindTapscriptRoot(root fn.Option[chainhash.Hash]) {
i.tapscriptRoot = root
}
// FundingParams returns the parameters that are necessary to start funding the
// channel output this intent was created for. It returns the P2WSH funding
// address, the exact funding amount and a PSBT packet that contains exactly one
@ -208,7 +218,18 @@ func (i *PsbtIntent) FundingParams() (btcutil.Address, int64, *psbt.Packet,
}
}
packet.UnsignedTx.TxOut = append(packet.UnsignedTx.TxOut, out)
packet.Outputs = append(packet.Outputs, psbt.POutput{})
var pOut psbt.POutput
// If this is a MuSig2 channel, we also need to communicate the internal
// key to the caller. Otherwise, they cannot verify the construction of
// the P2TR output script.
pOut.TaprootInternalKey = fn.MapOptionZ(
i.TaprootInternalKey(), schnorr.SerializePubKey,
)
packet.Outputs = append(packet.Outputs, pOut)
return addr, out.Value, packet, nil
}

View file

@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
@ -602,12 +603,15 @@ func (r *ChannelReservation) IsCannedShim() bool {
}
// ProcessPsbt continues a previously paused funding flow that involves PSBT to
// construct the funding transaction. This method can be called once the PSBT is
// finalized and the signed transaction is available.
func (r *ChannelReservation) ProcessPsbt() error {
// construct the funding transaction. This method can be called once the PSBT
// is finalized and the signed transaction is available.
func (r *ChannelReservation) ProcessPsbt(
auxFundingDesc fn.Option[AuxFundingDesc]) error {
errChan := make(chan error, 1)
r.wallet.msgChan <- &continueContributionMsg{
auxFundingDesc: auxFundingDesc,
pendingFundingID: r.reservationID,
err: errChan,
}
@ -709,8 +713,10 @@ func (r *ChannelReservation) CompleteReservation(fundingInputScripts []*input.Sc
// available via the .OurSignatures() method. As this method should only be
// called as a response to a single funder channel, only a commitment signature
// will be populated.
func (r *ChannelReservation) CompleteReservationSingle(fundingPoint *wire.OutPoint,
commitSig input.Signature) (*channeldb.OpenChannel, error) {
func (r *ChannelReservation) CompleteReservationSingle(
fundingPoint *wire.OutPoint, commitSig input.Signature,
auxFundingDesc fn.Option[AuxFundingDesc]) (*channeldb.OpenChannel,
error) {
errChan := make(chan error, 1)
completeChan := make(chan *channeldb.OpenChannel, 1)
@ -720,6 +726,7 @@ func (r *ChannelReservation) CompleteReservationSingle(fundingPoint *wire.OutPoi
fundingOutpoint: fundingPoint,
theirCommitmentSig: commitSig,
completeChan: completeChan,
auxFundingDesc: auxFundingDesc,
err: errChan,
}
@ -805,6 +812,38 @@ func (r *ChannelReservation) Cancel() error {
return <-errChan
}
// ChanState the current open channel state.
func (r *ChannelReservation) ChanState() *channeldb.OpenChannel {
r.RLock()
defer r.RUnlock()
return r.partialState
}
// CommitmentKeyRings returns the local+remote key ring used for the very first
// commitment transaction both parties.
func (r *ChannelReservation) CommitmentKeyRings() (*CommitmentKeyRing,
*CommitmentKeyRing) {
r.RLock()
defer r.RUnlock()
chanType := r.partialState.ChanType
ourChanCfg := r.ourContribution.ChannelConfig
theirChanCfg := r.theirContribution.ChannelConfig
localKeys := DeriveCommitmentKeys(
r.ourContribution.FirstCommitmentPoint, true, chanType,
ourChanCfg, theirChanCfg,
)
remoteKeys := DeriveCommitmentKeys(
r.theirContribution.FirstCommitmentPoint, false, chanType,
ourChanCfg, theirChanCfg,
)
return localKeys, remoteKeys
}
// VerifyConstraints is a helper function that can be used to check the sanity
// of various channel constraints.
func VerifyConstraints(c *channeldb.ChannelConstraints,

View file

@ -34,6 +34,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
@ -936,6 +937,7 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness,
fundingPoint := aliceChanReservation.FundingOutpoint()
_, err = bobChanReservation.CompleteReservationSingle(
fundingPoint, aliceCommitSig,
fn.None[lnwallet.AuxFundingDesc](),
)
require.NoError(t, err, "bob unable to consume single reservation")

View file

@ -31,6 +31,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/tlv"
)
const (
@ -89,6 +90,33 @@ func (p *PsbtFundingRequired) Error() string {
return ErrPsbtFundingRequired.Error()
}
// AuxFundingDesc stores a series of attributes that may be used to modify the
// way the channel funding occurs. This struct contains information that can
// only be derived once both sides have received and sent their contributions
// to the channel (keys, etc.).
type AuxFundingDesc struct {
// CustomFundingBlob is a custom blob that'll be stored in the database
// within the OpenChannel struct. This should represent information
// static to the channel lifetime.
CustomFundingBlob tlv.Blob
// CustomLocalCommitBlob is a custom blob that'll be stored in the
// first commitment entry for the local party.
CustomLocalCommitBlob tlv.Blob
// CustomRemoteCommitBlob is a custom blob that'll be stored in the
// first commitment entry for the remote party.
CustomRemoteCommitBlob tlv.Blob
// LocalInitAuxLeaves is the set of aux leaves that'll be used for our
// very first commitment state.
LocalInitAuxLeaves CommitAuxLeaves
// RemoteInitAuxLeaves is the set of aux leaves that'll be used for the
// very first commitment state for the remote party.
RemoteInitAuxLeaves CommitAuxLeaves
}
// InitFundingReserveMsg is the first message sent to initiate the workflow
// required to open a payment channel with a remote peer. The initial required
// parameters are configurable across channels. These parameters are to be
@ -201,9 +229,8 @@ type InitFundingReserveMsg struct {
// channel that will be useful to our future selves.
Memo []byte
// TapscriptRoot is the root of the tapscript tree that will be used to
// create the funding output. This is an optional field that should
// only be set for taproot channels.
// TapscriptRoot is an optional tapscript root that if provided, will
// be used to create the combined key for musig2 based channels.
TapscriptRoot fn.Option[chainhash.Hash]
// err is a channel in which all errors will be sent across. Will be
@ -241,7 +268,6 @@ type fundingReserveCancelMsg struct {
type addContributionMsg struct {
pendingFundingID uint64
// TODO(roasbeef): Should also carry SPV proofs in we're in SPV mode
contribution *ChannelContribution
// NOTE: In order to avoid deadlocks, this channel MUST be buffered.
@ -254,6 +280,10 @@ type addContributionMsg struct {
type continueContributionMsg struct {
pendingFundingID uint64
// auxFundingDesc is an optional descriptor that contains information
// about the custom channel funding flow.
auxFundingDesc fn.Option[AuxFundingDesc]
// NOTE: In order to avoid deadlocks, this channel MUST be buffered.
err chan error
}
@ -309,6 +339,10 @@ type addCounterPartySigsMsg struct {
type addSingleFunderSigsMsg struct {
pendingFundingID uint64
// auxFundingDesc is an optional descriptor that contains information
// about the custom channel funding flow.
auxFundingDesc fn.Option[AuxFundingDesc]
// fundingOutpoint is the outpoint of the completed funding
// transaction as assembled by the workflow initiator.
fundingOutpoint *wire.OutPoint
@ -412,8 +446,6 @@ type LightningWallet struct {
quit chan struct{}
wg sync.WaitGroup
// TODO(roasbeef): handle wallet lock/unlock
}
// NewLightningWallet creates/opens and initializes a LightningWallet instance.
@ -458,7 +490,6 @@ func (l *LightningWallet) Startup() error {
}
l.wg.Add(1)
// TODO(roasbeef): multiple request handlers?
go l.requestHandler()
return nil
@ -1412,7 +1443,6 @@ func (l *LightningWallet) initOurContribution(reservation *ChannelReservation,
// transaction via coin selection are freed allowing future reservations to
// include them.
func (l *LightningWallet) handleFundingCancelRequest(req *fundingReserveCancelMsg) {
// TODO(roasbeef): holding lock too long
l.limboMtx.Lock()
defer l.limboMtx.Unlock()
@ -1437,11 +1467,6 @@ func (l *LightningWallet) handleFundingCancelRequest(req *fundingReserveCancelMs
)
}
// TODO(roasbeef): is it even worth it to keep track of unused keys?
// TODO(roasbeef): Is it possible to mark the unused change also as
// available?
delete(l.fundingLimbo, req.pendingFundingID)
pid := pendingReservation.pendingChanID
@ -1461,7 +1486,8 @@ func (l *LightningWallet) handleFundingCancelRequest(req *fundingReserveCancelMs
// createCommitOpts is a struct that holds the options for creating a new
// commitment transaction.
type createCommitOpts struct {
auxLeaves fn.Option[CommitAuxLeaves]
localAuxLeaves fn.Option[CommitAuxLeaves]
remoteAuxLeaves fn.Option[CommitAuxLeaves]
}
// defaultCommitOpts returns a new createCommitOpts with default values.
@ -1469,6 +1495,17 @@ func defaultCommitOpts() createCommitOpts {
return createCommitOpts{}
}
// WithAuxLeaves is a functional option that can be used to set the aux leaves
// for a new commitment transaction.
func WithAuxLeaves(localLeaves,
remoteLeaves fn.Option[CommitAuxLeaves]) CreateCommitOpt {
return func(o *createCommitOpts) {
o.localAuxLeaves = localLeaves
o.remoteAuxLeaves = remoteLeaves
}
}
// CreateCommitOpt is a functional option that can be used to modify the way a
// new commitment transaction is created.
type CreateCommitOpt func(*createCommitOpts)
@ -1500,7 +1537,7 @@ func CreateCommitmentTxns(localBalance, remoteBalance btcutil.Amount,
ourCommitTx, err := CreateCommitTx(
chanType, fundingTxIn, localCommitmentKeys, ourChanCfg,
theirChanCfg, localBalance, remoteBalance, 0, initiator,
leaseExpiry, options.auxLeaves,
leaseExpiry, options.localAuxLeaves,
)
if err != nil {
return nil, nil, err
@ -1514,7 +1551,7 @@ func CreateCommitmentTxns(localBalance, remoteBalance btcutil.Amount,
theirCommitTx, err := CreateCommitTx(
chanType, fundingTxIn, remoteCommitmentKeys, theirChanCfg,
ourChanCfg, remoteBalance, localBalance, 0, !initiator,
leaseExpiry, options.auxLeaves,
leaseExpiry, options.remoteAuxLeaves,
)
if err != nil {
return nil, nil, err
@ -1607,16 +1644,24 @@ func (l *LightningWallet) handleContributionMsg(req *addContributionMsg) {
// and remote key which will be needed to calculate the multisig
// funding output in a next step.
pendingChanID := pendingReservation.pendingChanID
walletLog.Debugf("Advancing PSBT funding flow for "+
"pending_id(%x), binding keys local_key=%v, "+
"remote_key=%x", pendingChanID,
&ourContribution.MultiSigKey,
theirContribution.MultiSigKey.PubKey.SerializeCompressed())
fundingIntent.BindKeys(
&ourContribution.MultiSigKey,
theirContribution.MultiSigKey.PubKey,
)
// We might have a tapscript root, so we'll bind that now to
// ensure we make the proper funding output.
fundingIntent.BindTapscriptRoot(
pendingReservation.partialState.TapscriptRoot,
)
// Exit early because we can't continue the funding flow yet.
req.err <- &PsbtFundingRequired{
Intent: fundingIntent,
@ -1689,16 +1734,17 @@ func (l *LightningWallet) handleContributionMsg(req *addContributionMsg) {
// the commitment transaction for the remote party, and verify their incoming
// partial signature.
func genMusigSession(ourContribution, theirContribution *ChannelContribution,
signer input.MuSig2Signer,
fundingOutput *wire.TxOut) *MusigPairSession {
signer input.MuSig2Signer, fundingOutput *wire.TxOut,
tapscriptRoot fn.Option[chainhash.Hash]) *MusigPairSession {
return NewMusigPairSession(&MusigSessionCfg{
LocalKey: ourContribution.MultiSigKey,
RemoteKey: theirContribution.MultiSigKey,
LocalNonce: *ourContribution.LocalNonce,
RemoteNonce: *theirContribution.LocalNonce,
Signer: signer,
InputTxOut: fundingOutput,
LocalKey: ourContribution.MultiSigKey,
RemoteKey: theirContribution.MultiSigKey,
LocalNonce: *ourContribution.LocalNonce,
RemoteNonce: *theirContribution.LocalNonce,
Signer: signer,
InputTxOut: fundingOutput,
TapscriptTweak: tapscriptRoot,
})
}
@ -1748,6 +1794,7 @@ func (l *LightningWallet) signCommitTx(pendingReservation *ChannelReservation,
musigSessions := genMusigSession(
ourContribution, theirContribution,
l.Cfg.Signer, fundingOutput,
pendingReservation.partialState.TapscriptRoot,
)
pendingReservation.musigSessions = musigSessions
}
@ -1783,6 +1830,26 @@ func (l *LightningWallet) handleChanPointReady(req *continueContributionMsg) {
return
}
chanState := pendingReservation.partialState
// If we have an aux funding desc, then we can use it to populate some
// of the optional, but opaque TLV blobs we'll carry for the channel.
chanState.CustomBlob = fn.MapOption(func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomFundingBlob
})(req.auxFundingDesc)
chanState.LocalCommitment.CustomBlob = fn.MapOption(
func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomLocalCommitBlob
},
)(req.auxFundingDesc)
chanState.RemoteCommitment.CustomBlob = fn.MapOption(
func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomRemoteCommitBlob
},
)(req.auxFundingDesc)
ourContribution := pendingReservation.ourContribution
theirContribution := pendingReservation.theirContribution
chanPoint := pendingReservation.partialState.FundingOutpoint
@ -1841,7 +1908,6 @@ func (l *LightningWallet) handleChanPointReady(req *continueContributionMsg) {
// Store their current commitment point. We'll need this after the
// first state transition in order to verify the authenticity of the
// revocation.
chanState := pendingReservation.partialState
chanState.RemoteCurrentRevocation = theirContribution.FirstCommitmentPoint
// Create the txin to our commitment transaction; required to construct
@ -1857,6 +1923,18 @@ func (l *LightningWallet) handleChanPointReady(req *continueContributionMsg) {
if pendingReservation.partialState.ChanType.HasLeaseExpiration() {
leaseExpiry = pendingReservation.partialState.ThawHeight
}
localAuxLeaves := fn.MapOption(
func(desc AuxFundingDesc) CommitAuxLeaves {
return desc.LocalInitAuxLeaves
},
)(req.auxFundingDesc)
remoteAuxLeaves := fn.MapOption(
func(desc AuxFundingDesc) CommitAuxLeaves {
return desc.RemoteInitAuxLeaves
},
)(req.auxFundingDesc)
ourCommitTx, theirCommitTx, err := CreateCommitmentTxns(
localBalance, remoteBalance, ourContribution.ChannelConfig,
theirContribution.ChannelConfig,
@ -1864,6 +1942,7 @@ func (l *LightningWallet) handleChanPointReady(req *continueContributionMsg) {
theirContribution.FirstCommitmentPoint, fundingTxIn,
pendingReservation.partialState.ChanType,
pendingReservation.partialState.IsInitiator, leaseExpiry,
WithAuxLeaves(localAuxLeaves, remoteAuxLeaves),
)
if err != nil {
req.err <- err
@ -2129,6 +2208,7 @@ func (l *LightningWallet) verifyCommitSig(res *ChannelReservation,
res.musigSessions = genMusigSession(
res.ourContribution, res.theirContribution,
l.Cfg.Signer, fundingOutput,
res.partialState.TapscriptRoot,
)
}
@ -2219,9 +2299,6 @@ func (l *LightningWallet) handleFundingCounterPartySigs(msg *addCounterPartySigs
// As we're about to broadcast the funding transaction, we'll take note
// of the current height for record keeping purposes.
//
// TODO(roasbeef): this info can also be piped into light client's
// basic fee estimation?
_, bestHeight, err := l.Cfg.ChainIO.GetBestBlock()
if err != nil {
msg.err <- err
@ -2282,6 +2359,23 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
defer pendingReservation.Unlock()
chanState := pendingReservation.partialState
// If we have an aux funding desc, then we can use it to populate some
// of the optional, but opaque TLV blobs we'll carry for the channel.
chanState.CustomBlob = fn.MapOption(func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomFundingBlob
})(req.auxFundingDesc)
chanState.LocalCommitment.CustomBlob = fn.MapOption(
func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomLocalCommitBlob
},
)(req.auxFundingDesc)
chanState.RemoteCommitment.CustomBlob = fn.MapOption(
func(desc AuxFundingDesc) tlv.Blob {
return desc.CustomRemoteCommitBlob
},
)(req.auxFundingDesc)
chanType := pendingReservation.partialState.ChanType
chanState.FundingOutpoint = *req.fundingOutpoint
fundingTxIn := wire.NewTxIn(req.fundingOutpoint, nil, nil)
@ -2295,6 +2389,18 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
if pendingReservation.partialState.ChanType.HasLeaseExpiration() {
leaseExpiry = pendingReservation.partialState.ThawHeight
}
localAuxLeaves := fn.MapOption(
func(desc AuxFundingDesc) CommitAuxLeaves {
return desc.LocalInitAuxLeaves
},
)(req.auxFundingDesc)
remoteAuxLeaves := fn.MapOption(
func(desc AuxFundingDesc) CommitAuxLeaves {
return desc.RemoteInitAuxLeaves
},
)(req.auxFundingDesc)
ourCommitTx, theirCommitTx, err := CreateCommitmentTxns(
localBalance, remoteBalance,
pendingReservation.ourContribution.ChannelConfig,
@ -2303,6 +2409,7 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
pendingReservation.theirContribution.FirstCommitmentPoint,
*fundingTxIn, chanType,
pendingReservation.partialState.IsInitiator, leaseExpiry,
WithAuxLeaves(localAuxLeaves, remoteAuxLeaves),
)
if err != nil {
req.err <- err

2
log.go
View file

@ -40,6 +40,7 @@ import (
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/protofsm"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal"
@ -179,6 +180,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, btcwallet.Subsystem, interceptor, btcwallet.UseLogger)
AddSubLogger(root, rpcwallet.Subsystem, interceptor, rpcwallet.UseLogger)
AddSubLogger(root, peersrpc.Subsystem, interceptor, peersrpc.UseLogger)
AddSubLogger(root, protofsm.Subsystem, interceptor, protofsm.UseLogger)
}
// AddSubLogger is a helper method to conveniently create and register the

View file

@ -42,6 +42,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/protofsm"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/ticker"
@ -374,6 +375,11 @@ type Config struct {
// invalid.
DisallowRouteBlinding bool
// MsgRouter is an optional instance of the main message router that
// the peer will use. If None, then a new default version will be used
// in place.
MsgRouter fn.Option[protofsm.MsgRouter]
// Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{}
}
@ -493,6 +499,10 @@ type Brontide struct {
// potentially holding lots of un-consumed events.
channelEventClient *subscribe.Client
// msgRouter is an instance of the MsgRouter which is used to send off
// new wire messages for handing.
msgRouter fn.Option[protofsm.MsgRouter]
startReady chan struct{}
quit chan struct{}
wg sync.WaitGroup
@ -508,6 +518,12 @@ var _ lnpeer.Peer = (*Brontide)(nil)
func NewBrontide(cfg Config) *Brontide {
logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes)
// We'll either use the msg router instance passed in, or create a new
// blank instance.
msgRouter := cfg.MsgRouter.Alt(fn.Some[protofsm.MsgRouter](
protofsm.NewMultiMsgRouter(),
))
p := &Brontide{
cfg: cfg,
activeSignal: make(chan struct{}),
@ -530,6 +546,7 @@ func NewBrontide(cfg Config) *Brontide {
startReady: make(chan struct{}),
quit: make(chan struct{}),
log: build.NewPrefixLog(logPrefix, peerLog),
msgRouter: msgRouter,
}
var (
@ -704,6 +721,12 @@ func (p *Brontide) Start() error {
return err
}
// Register the message router now as we may need to register some
// endpoints while loading the channels below.
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
router.Start()
})
msgs, err := p.loadActiveChannels(activeChans)
if err != nil {
return fmt.Errorf("unable to load channels: %w", err)
@ -882,7 +905,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts...,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to create channel "+
"state machine: %w", err)
}
chanPoint := dbChan.FundingOutpoint
@ -1270,6 +1294,10 @@ func (p *Brontide) Disconnect(reason error) {
p.log.Errorf("couldn't stop pingManager during disconnect: %v",
err)
}
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
router.Stop()
})
}
// String returns the string representation of this peer.
@ -1709,6 +1737,24 @@ out:
}
}
// If a message router is active, then we'll try to have it
// handle this message. If it can, then we're able to skip the
// rest of the message handling logic.
err = fn.MapOptionZ(
p.msgRouter, func(r protofsm.MsgRouter) error {
return r.RouteMsg(protofsm.PeerMsg{
PeerPub: *p.IdentityKey(),
Message: nextMsg,
})
},
)
// No error occurred, and the message was handled by the
// router.
if err == nil {
continue
}
var (
targetChan lnwire.ChannelID
isLinkUpdate bool

32
protofsm/log.go Normal file
View file

@ -0,0 +1,32 @@
package protofsm
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// Subsystem defines the logging code for this subsystem.
const Subsystem = "PFSM"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

313
protofsm/msg_router.go Normal file
View file

@ -0,0 +1,313 @@
package protofsm
// For some reason golangci-lint has a false positive on the sort order of the
// imports for the new "maps" package... We need the nolint directive here to
// ignore that.
//
//nolint:gci
import (
"fmt"
"maps"
"sync"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrDuplicateEndpoint is returned when an endpoint is registered with
// a name that already exists.
ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
// ErrUnableToRouteMsg is returned when a message is unable to be
// routed to any endpoints.
ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
)
// EndpointName is the name of a given endpoint. This MUST be unique across all
// registered endpoints.
type EndpointName = string
// PeerMsg is a wire message that includes the public key of the peer that sent
// it.
type PeerMsg struct {
lnwire.Message
// PeerPub is the public key of the peer that sent this message.
PeerPub btcec.PublicKey
}
// MsgEndpoint is an interface that represents a message endpoint, or the
// sub-system that will handle processing an incoming wire message.
type MsgEndpoint interface {
// Name returns the name of this endpoint. This MUST be unique across
// all registered endpoints.
Name() EndpointName
// CanHandle returns true if the target message can be routed to this
// endpoint.
CanHandle(msg PeerMsg) bool
// SendMessage handles the target message, and returns true if the
// message was able being processed.
SendMessage(msg PeerMsg) bool
}
// MsgRouter is an interface that represents a message router, which is generic
// sub-system capable of routing any incoming wire message to a set of
// registered endpoints.
type MsgRouter interface {
// RegisterEndpoint registers a new endpoint with the router. If a
// duplicate endpoint exists, an error is returned.
RegisterEndpoint(MsgEndpoint) error
// UnregisterEndpoint unregisters the target endpoint from the router.
UnregisterEndpoint(EndpointName) error
// RouteMsg attempts to route the target message to a registered
// endpoint. If ANY endpoint could handle the message, then nil is
// returned. Otherwise, ErrUnableToRouteMsg is returned.
RouteMsg(PeerMsg) error
// Start starts the peer message router.
Start()
// Stop stops the peer message router.
Stop()
}
// queryMsg is a message sent into the main event loop to query or modify the
// internal state.
type queryMsg[Q any, R any] struct {
query Q
respChan chan fn.Either[R, error]
}
// SendError sends a response to the query with the given error value.
func (q *queryMsg[Q, R]) SendError(err error) {
q.respChan <- fn.NewRight[R, error](err)
}
// SendResponse sends a response to the query with the given response value.
func (q *queryMsg[Q, R]) SendResponse(resp R) {
q.respChan <- fn.NewLeft[R, error](resp)
}
// sendQuery sends a query to the main event loop, and returns the response.
func sendQuery[Q any, R any](sendChan chan queryMsg[Q, R], queryArg Q,
quit chan struct{}) fn.Either[R, error] {
query := queryMsg[Q, R]{
query: queryArg,
respChan: make(chan fn.Either[R, error], 1),
}
if !fn.SendOrQuit(sendChan, query, quit) {
return fn.NewRight[R](fmt.Errorf("router shutting down"))
}
resp, err := fn.RecvResp(query.respChan, nil, quit)
if err != nil {
return fn.NewRight[R](err)
}
return resp
}
// sendQueryErr is a helper function based on sendQuery that can be used when
// the query only needs an error response.
func sendQueryErr[Q any](sendChan chan queryMsg[Q, error], queryArg Q,
quitChan chan struct{}) error {
var err error
resp := sendQuery(sendChan, queryArg, quitChan)
resp.WhenRight(func(e error) {
err = e
})
resp.WhenLeft(func(e error) {
err = e
})
return err
}
// EndpointsMap is a map of all registered endpoints.
type EndpointsMap map[EndpointName]MsgEndpoint
// MultiMsgRouter is a type of message router that is capable of routing new
// incoming messages, permitting a message to be routed to multiple registered
// endpoints.
type MultiMsgRouter struct {
startOnce sync.Once
stopOnce sync.Once
// registerChan is the channel that all new endpoints will be sent to.
registerChan chan queryMsg[MsgEndpoint, error]
// unregisterChan is the channel that all endpoints that are to be
// removed are sent to.
unregisterChan chan queryMsg[EndpointName, error]
// msgChan is the channel that all messages will be sent to for
// processing.
msgChan chan queryMsg[PeerMsg, error]
// endpointsQueries is a channel that all queries to the endpoints map
// will be sent to.
endpointQueries chan queryMsg[MsgEndpoint, EndpointsMap]
wg sync.WaitGroup
quit chan struct{}
}
// NewMultiMsgRouter creates a new instance of a peer message router.
func NewMultiMsgRouter() *MultiMsgRouter {
return &MultiMsgRouter{
registerChan: make(chan queryMsg[MsgEndpoint, error]),
unregisterChan: make(chan queryMsg[EndpointName, error]),
msgChan: make(chan queryMsg[PeerMsg, error]),
endpointQueries: make(chan queryMsg[MsgEndpoint, EndpointsMap]),
quit: make(chan struct{}),
}
}
// Start starts the peer message router.
func (p *MultiMsgRouter) Start() {
log.Infof("Starting MsgRouter")
p.startOnce.Do(func() {
p.wg.Add(1)
go p.msgRouter()
})
}
// Stop stops the peer message router.
func (p *MultiMsgRouter) Stop() {
log.Infof("Stopping MsgRouter")
p.stopOnce.Do(func() {
close(p.quit)
p.wg.Wait()
})
}
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
// endpoint exists, an error is returned.
func (p *MultiMsgRouter) RegisterEndpoint(endpoint MsgEndpoint) error {
return sendQueryErr(p.registerChan, endpoint, p.quit)
}
// UnregisterEndpoint unregisters the target endpoint from the router.
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
return sendQueryErr(p.unregisterChan, name, p.quit)
}
// RouteMsg attempts to route the target message to a registered endpoint. If
// ANY endpoint could handle the message, then true is
// returned.
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
return sendQueryErr(p.msgChan, msg, p.quit)
}
// Endpoints returns a list of all registered endpoints.
func (p *MultiMsgRouter) Endpoints() EndpointsMap {
resp := sendQuery(p.endpointQueries, nil, p.quit)
var endpoints EndpointsMap
resp.WhenLeft(func(e EndpointsMap) {
endpoints = e
})
return endpoints
}
// msgRouter is the main goroutine that handles all incoming messages.
func (p *MultiMsgRouter) msgRouter() {
defer p.wg.Done()
// endpoints is a map of all registered endpoints.
endpoints := make(map[EndpointName]MsgEndpoint)
for {
select {
// A new endpoint was just sent in, so we'll add it to our set
// of registered endpoints.
case newEndpointMsg := <-p.registerChan:
endpoint := newEndpointMsg.query
log.Infof("MsgRouter: registering new MsgEndpoint(%s)",
endpoint.Name())
// If this endpoint already exists, then we'll return
// an error as we require unique names.
if _, ok := endpoints[endpoint.Name()]; ok {
log.Errorf("MsgRouter: rejecting duplicate "+
"endpoint: %v", endpoint.Name())
newEndpointMsg.SendError(ErrDuplicateEndpoint)
continue
}
endpoints[endpoint.Name()] = endpoint
newEndpointMsg.SendError(nil)
// A request to unregister an endpoint was just sent in, so
// we'll attempt to remove it.
case endpointName := <-p.unregisterChan:
delete(endpoints, endpointName.query)
log.Infof("MsgRouter: unregistering MsgEndpoint(%s)",
endpointName.query)
endpointName.SendError(nil)
// A new message was just sent in. We'll attempt to route it to
// all the endpoints that can handle it.
case msgQuery := <-p.msgChan:
msg := msgQuery.query
// Loop through all the endpoints and send the message
// to those that can handle it the message.
var couldSend bool
for _, endpoint := range endpoints {
if endpoint.CanHandle(msg) {
log.Debugf("MsgRouter: sending msg %T "+
"to endpoint %s", msg.Message,
endpoint.Name())
sent := endpoint.SendMessage(msg)
couldSend = couldSend || sent
}
}
var err error
if !couldSend {
log.Debugf("MsgRouter: unable to route msg %T",
msg)
err = ErrUnableToRouteMsg
}
msgQuery.SendError(err)
// A query for the endpoint state just came in, we'll send back
// a copy of our current state.
case endpointQuery := <-p.endpointQueries:
endpointsCopy := make(EndpointsMap, len(endpoints))
maps.Copy(endpointsCopy, endpoints)
endpointQuery.SendResponse(endpointsCopy)
case <-p.quit:
return
}
}
}
// A compile time check to ensure MultiMsgRouter implements the MsgRouter
// interface.
var _ MsgRouter = (*MultiMsgRouter)(nil)

152
protofsm/msg_router_test.go Normal file
View file

@ -0,0 +1,152 @@
package protofsm
import (
"testing"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type mockEndpoint struct {
mock.Mock
}
func (m *mockEndpoint) Name() string {
args := m.Called()
return args.String(0)
}
func (m *mockEndpoint) CanHandle(msg PeerMsg) bool {
args := m.Called(msg)
return args.Bool(0)
}
func (m *mockEndpoint) SendMessage(msg PeerMsg) bool {
args := m.Called(msg)
return args.Bool(0)
}
// TestMessageRouterOperation tests the basic operation of the message router:
// add new endpoints, route to them, remove, them, etc.
func TestMessageRouterOperation(t *testing.T) {
msgRouter := NewMultiMsgRouter()
msgRouter.Start()
defer msgRouter.Stop()
openChanMsg := PeerMsg{
Message: &lnwire.OpenChannel{},
}
commitSigMsg := PeerMsg{
Message: &lnwire.CommitSig{},
}
errorMsg := PeerMsg{
Message: &lnwire.Error{},
}
// For this test, we'll have two endpoints, each with distinct names.
// One endpoint will only handle OpenChannel, while the other will
// handle the CommitSig message.
fundingEndpoint := &mockEndpoint{}
fundingEndpointName := "funding"
fundingEndpoint.On("Name").Return(fundingEndpointName)
fundingEndpoint.On("CanHandle", openChanMsg).Return(true)
fundingEndpoint.On("CanHandle", errorMsg).Return(false)
fundingEndpoint.On("CanHandle", commitSigMsg).Return(false)
fundingEndpoint.On("SendMessage", openChanMsg).Return(true)
commitEndpoint := &mockEndpoint{}
commitEndpointName := "commit"
commitEndpoint.On("Name").Return(commitEndpointName)
commitEndpoint.On("CanHandle", commitSigMsg).Return(true)
commitEndpoint.On("CanHandle", openChanMsg).Return(false)
commitEndpoint.On("CanHandle", errorMsg).Return(false)
commitEndpoint.On("SendMessage", commitSigMsg).Return(true)
t.Run("add endpoints", func(t *testing.T) {
// First, we'll add the funding endpoint to the router.
require.NoError(t, msgRouter.RegisterEndpoint(fundingEndpoint))
// There should be a single endpoint registered.
require.Len(t, msgRouter.Endpoints(), 1)
// The name of the registered endpoint should be "funding".
require.Equal(
t, "funding",
msgRouter.Endpoints()[fundingEndpointName].Name(),
)
})
t.Run("duplicate endpoint reject", func(t *testing.T) {
// Next, we'll attempt to add the funding endpoint again. This
// should return an ErrDuplicateEndpoint error.
require.ErrorIs(
t, msgRouter.RegisterEndpoint(fundingEndpoint),
ErrDuplicateEndpoint,
)
})
t.Run("route to endpoint", func(t *testing.T) {
// Next, we'll add our other endpoint, then attempt to route a
// message.
require.NoError(t, msgRouter.RegisterEndpoint(commitEndpoint))
// If we try to route a message none of the endpoints know of,
// we should get an error.
require.ErrorIs(
t, msgRouter.RouteMsg(errorMsg), ErrUnableToRouteMsg,
)
fundingEndpoint.AssertCalled(t, "CanHandle", errorMsg)
commitEndpoint.AssertCalled(t, "CanHandle", errorMsg)
// Next, we'll route the open channel message. Only the
// fundingEndpoint should be used.
require.NoError(t, msgRouter.RouteMsg(openChanMsg))
fundingEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
commitEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
fundingEndpoint.AssertCalled(t, "SendMessage", openChanMsg)
commitEndpoint.AssertNotCalled(t, "SendMessage", openChanMsg)
// We'll do the same for the commit sig message.
require.NoError(t, msgRouter.RouteMsg(commitSigMsg))
fundingEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
commitEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
commitEndpoint.AssertCalled(t, "SendMessage", commitSigMsg)
fundingEndpoint.AssertNotCalled(t, "SendMessage", commitSigMsg)
})
t.Run("remove endpoints", func(t *testing.T) {
// Finally, we'll remove both endpoints.
require.NoError(
t, msgRouter.UnregisterEndpoint(fundingEndpointName),
)
require.NoError(
t, msgRouter.UnregisterEndpoint(commitEndpointName),
)
// There should be no endpoints registered.
require.Len(t, msgRouter.Endpoints(), 0)
// Trying to route a message should fail.
require.ErrorIs(
t, msgRouter.RouteMsg(openChanMsg),
ErrUnableToRouteMsg,
)
require.ErrorIs(
t, msgRouter.RouteMsg(commitSigMsg),
ErrUnableToRouteMsg,
)
})
commitEndpoint.AssertExpectations(t)
fundingEndpoint.AssertExpectations(t)
}

View file

@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
@ -1510,8 +1511,8 @@ func (r *ChannelRouter) addZombieEdge(chanID uint64) error {
// segwit v1 (taproot) channels.
//
// TODO(roasbeef: export and use elsewhere?
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
chanFeatures []byte) ([]byte, error) {
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte, chanFeatures []byte,
tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
legacyFundingScript := func() ([]byte, error) {
witnessScript, err := input.GenMultiSigScript(
@ -1557,8 +1558,15 @@ func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
return nil, err
}
var fundingOpts []input.FundingScriptOpt
tapscriptRoot.WhenSome(func(root chainhash.Hash) {
fundingOpts = append(
fundingOpts, input.WithTapscriptRoot(root),
)
})
fundingScript, _, err := input.GenTaprootFundingScript(
pubKey1, pubKey2, 0,
pubKey1, pubKey2, 0, fundingOpts...,
)
if err != nil {
return nil, err
@ -1683,7 +1691,7 @@ func (r *ChannelRouter) processUpdate(msg interface{},
// reality.
fundingPkScript, err := makeFundingScript(
msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
msg.Features,
msg.Features, msg.TapscriptRoot,
)
if err != nil {
return err

View file

@ -1505,8 +1505,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
EnableUpfrontShutdown: cfg.EnableUpfrontShutdown,
MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
DeleteAliasEdge: deleteAliasEdge,
AliasManager: s.aliasMgr,
DeleteAliasEdge: deleteAliasEdge,
AliasManager: s.aliasMgr,
AuxFundingController: implCfg.AuxFundingController,
})
if err != nil {
return nil, err
@ -3924,6 +3925,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
DisallowRouteBlinding: s.cfg.ProtocolOptions.NoRouteBlinding(),
Quit: s.quit,
AuxLeafStore: s.implCfg.AuxLeafStore,
MsgRouter: s.implCfg.MsgRouter,
}
copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())