peer: update readHandler to dispatch to msgRouter if set

Over time with this, we should be able to significantly reduce the size
of the peer.Brontide struct as we only need all those deps as the peer
needs to recognize and handle each incoming wire message itself.
This commit is contained in:
Olaoluwa Osuntokun 2024-01-30 18:00:11 -08:00 committed by Oliver Gugger
parent e5e5da381d
commit 371e0148dc
No known key found for this signature in database
GPG key ID: 8E4256593F177720

View file

@ -42,6 +42,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/protofsm"
"github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
@ -493,6 +494,10 @@ type Brontide struct {
// potentially holding lots of un-consumed events. // potentially holding lots of un-consumed events.
channelEventClient *subscribe.Client channelEventClient *subscribe.Client
// msgRouter is an instance of the MsgRouter which is used to send off
// new wire messages for handing.
msgRouter fn.Option[protofsm.MsgRouter]
startReady chan struct{} startReady chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -530,6 +535,9 @@ func NewBrontide(cfg Config) *Brontide {
startReady: make(chan struct{}), startReady: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
log: build.NewPrefixLog(logPrefix, peerLog), log: build.NewPrefixLog(logPrefix, peerLog),
msgRouter: fn.Some[protofsm.MsgRouter](
protofsm.NewMultiMsgRouter(),
),
} }
var ( var (
@ -704,6 +712,12 @@ func (p *Brontide) Start() error {
return err return err
} }
// Register the message router now as we may need to register some
// endpoints while loading the channels below.
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
router.Start()
})
msgs, err := p.loadActiveChannels(activeChans) msgs, err := p.loadActiveChannels(activeChans)
if err != nil { if err != nil {
return fmt.Errorf("unable to load channels: %w", err) return fmt.Errorf("unable to load channels: %w", err)
@ -882,7 +896,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts..., p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts...,
) )
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("unable to create channel "+
"state machine: %w", err)
} }
chanPoint := dbChan.FundingOutpoint chanPoint := dbChan.FundingOutpoint
@ -1270,6 +1285,10 @@ func (p *Brontide) Disconnect(reason error) {
p.log.Errorf("couldn't stop pingManager during disconnect: %v", p.log.Errorf("couldn't stop pingManager during disconnect: %v",
err) err)
} }
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
router.Stop()
})
} }
// String returns the string representation of this peer. // String returns the string representation of this peer.
@ -1709,6 +1728,24 @@ out:
} }
} }
// If a message router is active, then we'll try to have it
// handle this message. If it can, then we're able to skip the
// rest of the message handling logic.
err = fn.MapOptionZ(
p.msgRouter, func(r protofsm.MsgRouter) error {
return r.RouteMsg(protofsm.PeerMsg{
PeerPub: *p.IdentityKey(),
Message: nextMsg,
})
},
)
// No error occurred, and the message was handled by the
// router.
if err == nil {
continue
}
var ( var (
targetChan lnwire.ChannelID targetChan lnwire.ChannelID
isLinkUpdate bool isLinkUpdate bool