mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 21:35:24 +01:00
msgmux: add new abstract message router
In this commit, we add a new abstract message router. Over time, the goal is that this message router replaces the logic we currently have in the readHandler (the giant switch for each message). With this new abstraction, can reduce the responsibilities of the readHandler to *just* reading messages off the wire and handing them off to the msg router. The readHandler no longer needs to know *where* the messages should go, or how they should be dispatched. This will be used in tandem with the new `protofsm` module in an upcoming PR implementing the new rbf-coop close.
This commit is contained in:
parent
77c7f776d5
commit
f124195ae9
2
go.mod
2
go.mod
@ -35,7 +35,7 @@ require (
|
||||
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
|
||||
github.com/lightningnetwork/lnd/cert v1.2.2
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1
|
||||
github.com/lightningnetwork/lnd/fn v1.2.0
|
||||
github.com/lightningnetwork/lnd/fn v1.2.1
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5
|
||||
github.com/lightningnetwork/lnd/kvdb v1.4.10
|
||||
github.com/lightningnetwork/lnd/queue v1.1.1
|
||||
|
4
go.sum
4
go.sum
@ -450,8 +450,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
|
||||
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.0 h1:YTb2m8NN5ZiJAskHeBZAmR1AiPY8SXziIYPAX1VI/ZM=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.0/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.1 h1:pPsVGrwi9QBwdLJzaEGK33wmiVKOxs/zc8H7+MamFf0=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.1/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5 h1:aTJy5xeBpcWgRtW/PGBDe+LMQEmNm/HQewlQx2jt7OA=
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5/go.mod h1:G7Tst2tVvWo7cx6mSBEToQC5L1XOGxzZTPB29g9Rv2I=
|
||||
github.com/lightningnetwork/lnd/kvdb v1.4.10 h1:vK89IVv1oVH9ubQWU+EmoCQFeVRaC8kfmOrqHbY5zoY=
|
||||
|
32
msgmux/log.go
Normal file
32
msgmux/log.go
Normal file
@ -0,0 +1,32 @@
|
||||
package msgmux
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/lightningnetwork/lnd/build"
|
||||
)
|
||||
|
||||
// Subsystem defines the logging code for this subsystem.
|
||||
const Subsystem = "MSGX"
|
||||
|
||||
// log is a logger that is initialized with no output filters. This
|
||||
// means the package will not perform any logging by default until the caller
|
||||
// requests it.
|
||||
var log btclog.Logger
|
||||
|
||||
// The default amount of logging is none.
|
||||
func init() {
|
||||
UseLogger(build.NewSubLogger(Subsystem, nil))
|
||||
}
|
||||
|
||||
// DisableLog disables all library log output. Logging output is disabled
|
||||
// by default until UseLogger is called.
|
||||
func DisableLog() {
|
||||
UseLogger(btclog.Disabled)
|
||||
}
|
||||
|
||||
// UseLogger uses a specified Logger to output package logging info.
|
||||
// This should be used in preference to SetLogWriter if the caller is also
|
||||
// using btclog.
|
||||
func UseLogger(logger btclog.Logger) {
|
||||
log = logger
|
||||
}
|
274
msgmux/msg_router.go
Normal file
274
msgmux/msg_router.go
Normal file
@ -0,0 +1,274 @@
|
||||
package msgmux
|
||||
|
||||
// For some reason golangci-lint has a false positive on the sort order of the
|
||||
// imports for the new "maps" package... We need the nolint directive here to
|
||||
// ignore that.
|
||||
//
|
||||
//nolint:gci
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
"sync"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrDuplicateEndpoint is returned when an endpoint is registered with
|
||||
// a name that already exists.
|
||||
ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
|
||||
|
||||
// ErrUnableToRouteMsg is returned when a message is unable to be
|
||||
// routed to any endpoints.
|
||||
ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
|
||||
)
|
||||
|
||||
// EndpointName is the name of a given endpoint. This MUST be unique across all
|
||||
// registered endpoints.
|
||||
type EndpointName = string
|
||||
|
||||
// PeerMsg is a wire message that includes the public key of the peer that sent
|
||||
// it.
|
||||
type PeerMsg struct {
|
||||
lnwire.Message
|
||||
|
||||
// PeerPub is the public key of the peer that sent this message.
|
||||
PeerPub btcec.PublicKey
|
||||
}
|
||||
|
||||
// Endpoint is an interface that represents a message endpoint, or the
|
||||
// sub-system that will handle processing an incoming wire message.
|
||||
type Endpoint interface {
|
||||
// Name returns the name of this endpoint. This MUST be unique across
|
||||
// all registered endpoints.
|
||||
Name() EndpointName
|
||||
|
||||
// CanHandle returns true if the target message can be routed to this
|
||||
// endpoint.
|
||||
CanHandle(msg PeerMsg) bool
|
||||
|
||||
// SendMessage handles the target message, and returns true if the
|
||||
// message was able being processed.
|
||||
SendMessage(msg PeerMsg) bool
|
||||
}
|
||||
|
||||
// MsgRouter is an interface that represents a message router, which is generic
|
||||
// sub-system capable of routing any incoming wire message to a set of
|
||||
// registered endpoints.
|
||||
type Router interface {
|
||||
// RegisterEndpoint registers a new endpoint with the router. If a
|
||||
// duplicate endpoint exists, an error is returned.
|
||||
RegisterEndpoint(Endpoint) error
|
||||
|
||||
// UnregisterEndpoint unregisters the target endpoint from the router.
|
||||
UnregisterEndpoint(EndpointName) error
|
||||
|
||||
// RouteMsg attempts to route the target message to a registered
|
||||
// endpoint. If ANY endpoint could handle the message, then nil is
|
||||
// returned. Otherwise, ErrUnableToRouteMsg is returned.
|
||||
RouteMsg(PeerMsg) error
|
||||
|
||||
// Start starts the peer message router.
|
||||
Start()
|
||||
|
||||
// Stop stops the peer message router.
|
||||
Stop()
|
||||
}
|
||||
|
||||
// sendQuery sends a query to the main event loop, and returns the response.
|
||||
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
|
||||
quit chan struct{}) fn.Result[R] {
|
||||
|
||||
query, respChan := fn.NewReq[Q, R](queryArg)
|
||||
|
||||
if !fn.SendOrQuit(sendChan, query, quit) {
|
||||
return fn.Errf[R]("router shutting down")
|
||||
}
|
||||
|
||||
return fn.NewResult(fn.RecvResp(respChan, nil, quit))
|
||||
}
|
||||
|
||||
// sendQueryErr is a helper function based on sendQuery that can be used when
|
||||
// the query only needs an error response.
|
||||
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
|
||||
quitChan chan struct{}) error {
|
||||
|
||||
return fn.ElimEither(
|
||||
fn.Iden, fn.Iden,
|
||||
sendQuery(sendChan, queryArg, quitChan).Either,
|
||||
)
|
||||
}
|
||||
|
||||
// EndpointsMap is a map of all registered endpoints.
|
||||
type EndpointsMap map[EndpointName]Endpoint
|
||||
|
||||
// MultiMsgRouter is a type of message router that is capable of routing new
|
||||
// incoming messages, permitting a message to be routed to multiple registered
|
||||
// endpoints.
|
||||
type MultiMsgRouter struct {
|
||||
startOnce sync.Once
|
||||
stopOnce sync.Once
|
||||
|
||||
// registerChan is the channel that all new endpoints will be sent to.
|
||||
registerChan chan fn.Req[Endpoint, error]
|
||||
|
||||
// unregisterChan is the channel that all endpoints that are to be
|
||||
// removed are sent to.
|
||||
unregisterChan chan fn.Req[EndpointName, error]
|
||||
|
||||
// msgChan is the channel that all messages will be sent to for
|
||||
// processing.
|
||||
msgChan chan fn.Req[PeerMsg, error]
|
||||
|
||||
// endpointsQueries is a channel that all queries to the endpoints map
|
||||
// will be sent to.
|
||||
endpointQueries chan fn.Req[Endpoint, EndpointsMap]
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewMultiMsgRouter creates a new instance of a peer message router.
|
||||
func NewMultiMsgRouter() *MultiMsgRouter {
|
||||
return &MultiMsgRouter{
|
||||
registerChan: make(chan fn.Req[Endpoint, error]),
|
||||
unregisterChan: make(chan fn.Req[EndpointName, error]),
|
||||
msgChan: make(chan fn.Req[PeerMsg, error]),
|
||||
endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the peer message router.
|
||||
func (p *MultiMsgRouter) Start() {
|
||||
log.Infof("Starting Router")
|
||||
|
||||
p.startOnce.Do(func() {
|
||||
p.wg.Add(1)
|
||||
go p.msgRouter()
|
||||
})
|
||||
}
|
||||
|
||||
// Stop stops the peer message router.
|
||||
func (p *MultiMsgRouter) Stop() {
|
||||
log.Infof("Stopping Router")
|
||||
|
||||
p.stopOnce.Do(func() {
|
||||
close(p.quit)
|
||||
p.wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
|
||||
// endpoint exists, an error is returned.
|
||||
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
|
||||
return sendQueryErr(p.registerChan, endpoint, p.quit)
|
||||
}
|
||||
|
||||
// UnregisterEndpoint unregisters the target endpoint from the router.
|
||||
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
|
||||
return sendQueryErr(p.unregisterChan, name, p.quit)
|
||||
}
|
||||
|
||||
// RouteMsg attempts to route the target message to a registered endpoint. If
|
||||
// ANY endpoint could handle the message, then nil is returned.
|
||||
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
|
||||
return sendQueryErr(p.msgChan, msg, p.quit)
|
||||
}
|
||||
|
||||
// Endpoints returns a list of all registered endpoints.
|
||||
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
|
||||
return sendQuery(p.endpointQueries, nil, p.quit)
|
||||
}
|
||||
|
||||
// msgRouter is the main goroutine that handles all incoming messages.
|
||||
func (p *MultiMsgRouter) msgRouter() {
|
||||
defer p.wg.Done()
|
||||
|
||||
// endpoints is a map of all registered endpoints.
|
||||
endpoints := make(map[EndpointName]Endpoint)
|
||||
|
||||
for {
|
||||
select {
|
||||
// A new endpoint was just sent in, so we'll add it to our set
|
||||
// of registered endpoints.
|
||||
case newEndpointMsg := <-p.registerChan:
|
||||
endpoint := newEndpointMsg.Request
|
||||
|
||||
log.Infof("MsgRouter: registering new "+
|
||||
"Endpoint(%s)", endpoint.Name())
|
||||
|
||||
// If this endpoint already exists, then we'll return
|
||||
// an error as we require unique names.
|
||||
if _, ok := endpoints[endpoint.Name()]; ok {
|
||||
log.Errorf("MsgRouter: rejecting "+
|
||||
"duplicate endpoint: %v",
|
||||
endpoint.Name())
|
||||
|
||||
newEndpointMsg.Resolve(ErrDuplicateEndpoint)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
endpoints[endpoint.Name()] = endpoint
|
||||
|
||||
newEndpointMsg.Resolve(nil)
|
||||
|
||||
// A request to unregister an endpoint was just sent in, so
|
||||
// we'll attempt to remove it.
|
||||
case endpointName := <-p.unregisterChan:
|
||||
delete(endpoints, endpointName.Request)
|
||||
|
||||
log.Infof("MsgRouter: unregistering "+
|
||||
"Endpoint(%s)", endpointName.Request)
|
||||
|
||||
endpointName.Resolve(nil)
|
||||
|
||||
// A new message was just sent in. We'll attempt to route it to
|
||||
// all the endpoints that can handle it.
|
||||
case msgQuery := <-p.msgChan:
|
||||
msg := msgQuery.Request
|
||||
|
||||
// Loop through all the endpoints and send the message
|
||||
// to those that can handle it the message.
|
||||
var couldSend bool
|
||||
for _, endpoint := range endpoints {
|
||||
if endpoint.CanHandle(msg) {
|
||||
log.Tracef("MsgRouter: sending "+
|
||||
"msg %T to endpoint %s", msg,
|
||||
endpoint.Name())
|
||||
|
||||
sent := endpoint.SendMessage(msg)
|
||||
couldSend = couldSend || sent
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if !couldSend {
|
||||
log.Tracef("MsgRouter: unable to route "+
|
||||
"msg %T", msg)
|
||||
|
||||
err = ErrUnableToRouteMsg
|
||||
}
|
||||
|
||||
msgQuery.Resolve(err)
|
||||
|
||||
// A query for the endpoint state just came in, we'll send back
|
||||
// a copy of our current state.
|
||||
case endpointQuery := <-p.endpointQueries:
|
||||
endpointsCopy := make(EndpointsMap, len(endpoints))
|
||||
maps.Copy(endpointsCopy, endpoints)
|
||||
|
||||
endpointQuery.Resolve(endpointsCopy)
|
||||
|
||||
case <-p.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A compile time check to ensure MultiMsgRouter implements the MsgRouter
|
||||
// interface.
|
||||
var _ Router = (*MultiMsgRouter)(nil)
|
157
msgmux/msg_router_test.go
Normal file
157
msgmux/msg_router_test.go
Normal file
@ -0,0 +1,157 @@
|
||||
package msgmux
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockEndpoint struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockEndpoint) Name() string {
|
||||
args := m.Called()
|
||||
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
func (m *mockEndpoint) CanHandle(msg PeerMsg) bool {
|
||||
args := m.Called(msg)
|
||||
|
||||
return args.Bool(0)
|
||||
}
|
||||
|
||||
func (m *mockEndpoint) SendMessage(msg PeerMsg) bool {
|
||||
args := m.Called(msg)
|
||||
|
||||
return args.Bool(0)
|
||||
}
|
||||
|
||||
// TestMessageRouterOperation tests the basic operation of the message router:
|
||||
// add new endpoints, route to them, remove, them, etc.
|
||||
func TestMessageRouterOperation(t *testing.T) {
|
||||
msgRouter := NewMultiMsgRouter()
|
||||
msgRouter.Start()
|
||||
defer msgRouter.Stop()
|
||||
|
||||
openChanMsg := PeerMsg{
|
||||
Message: &lnwire.OpenChannel{},
|
||||
}
|
||||
commitSigMsg := PeerMsg{
|
||||
Message: &lnwire.CommitSig{},
|
||||
}
|
||||
|
||||
errorMsg := PeerMsg{
|
||||
Message: &lnwire.Error{},
|
||||
}
|
||||
|
||||
// For this test, we'll have two endpoints, each with distinct names.
|
||||
// One endpoint will only handle OpenChannel, while the other will
|
||||
// handle the CommitSig message.
|
||||
fundingEndpoint := &mockEndpoint{}
|
||||
fundingEndpointName := "funding"
|
||||
fundingEndpoint.On("Name").Return(fundingEndpointName)
|
||||
fundingEndpoint.On("CanHandle", openChanMsg).Return(true)
|
||||
fundingEndpoint.On("CanHandle", errorMsg).Return(false)
|
||||
fundingEndpoint.On("CanHandle", commitSigMsg).Return(false)
|
||||
fundingEndpoint.On("SendMessage", openChanMsg).Return(true)
|
||||
|
||||
commitEndpoint := &mockEndpoint{}
|
||||
commitEndpointName := "commit"
|
||||
commitEndpoint.On("Name").Return(commitEndpointName)
|
||||
commitEndpoint.On("CanHandle", commitSigMsg).Return(true)
|
||||
commitEndpoint.On("CanHandle", openChanMsg).Return(false)
|
||||
commitEndpoint.On("CanHandle", errorMsg).Return(false)
|
||||
commitEndpoint.On("SendMessage", commitSigMsg).Return(true)
|
||||
|
||||
t.Run("add endpoints", func(t *testing.T) {
|
||||
// First, we'll add the funding endpoint to the router.
|
||||
require.NoError(t, msgRouter.RegisterEndpoint(fundingEndpoint))
|
||||
|
||||
endpoints, err := msgRouter.endpoints().Unpack()
|
||||
require.NoError(t, err)
|
||||
|
||||
// There should be a single endpoint registered.
|
||||
require.Len(t, endpoints, 1)
|
||||
|
||||
// The name of the registered endpoint should be "funding".
|
||||
require.Equal(
|
||||
t, "funding", endpoints[fundingEndpointName].Name(),
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("duplicate endpoint reject", func(t *testing.T) {
|
||||
// Next, we'll attempt to add the funding endpoint again. This
|
||||
// should return an ErrDuplicateEndpoint error.
|
||||
require.ErrorIs(
|
||||
t, msgRouter.RegisterEndpoint(fundingEndpoint),
|
||||
ErrDuplicateEndpoint,
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("route to endpoint", func(t *testing.T) {
|
||||
// Next, we'll add our other endpoint, then attempt to route a
|
||||
// message.
|
||||
require.NoError(t, msgRouter.RegisterEndpoint(commitEndpoint))
|
||||
|
||||
// If we try to route a message none of the endpoints know of,
|
||||
// we should get an error.
|
||||
require.ErrorIs(
|
||||
t, msgRouter.RouteMsg(errorMsg), ErrUnableToRouteMsg,
|
||||
)
|
||||
|
||||
fundingEndpoint.AssertCalled(t, "CanHandle", errorMsg)
|
||||
commitEndpoint.AssertCalled(t, "CanHandle", errorMsg)
|
||||
|
||||
// Next, we'll route the open channel message. Only the
|
||||
// fundingEndpoint should be used.
|
||||
require.NoError(t, msgRouter.RouteMsg(openChanMsg))
|
||||
|
||||
fundingEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
|
||||
commitEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
|
||||
|
||||
fundingEndpoint.AssertCalled(t, "SendMessage", openChanMsg)
|
||||
commitEndpoint.AssertNotCalled(t, "SendMessage", openChanMsg)
|
||||
|
||||
// We'll do the same for the commit sig message.
|
||||
require.NoError(t, msgRouter.RouteMsg(commitSigMsg))
|
||||
|
||||
fundingEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
|
||||
commitEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
|
||||
|
||||
commitEndpoint.AssertCalled(t, "SendMessage", commitSigMsg)
|
||||
fundingEndpoint.AssertNotCalled(t, "SendMessage", commitSigMsg)
|
||||
})
|
||||
|
||||
t.Run("remove endpoints", func(t *testing.T) {
|
||||
// Finally, we'll remove both endpoints.
|
||||
require.NoError(
|
||||
t, msgRouter.UnregisterEndpoint(fundingEndpointName),
|
||||
)
|
||||
require.NoError(
|
||||
t, msgRouter.UnregisterEndpoint(commitEndpointName),
|
||||
)
|
||||
|
||||
endpoints, err := msgRouter.endpoints().Unpack()
|
||||
require.NoError(t, err)
|
||||
|
||||
// There should be no endpoints registered.
|
||||
require.Len(t, endpoints, 0)
|
||||
|
||||
// Trying to route a message should fail.
|
||||
require.ErrorIs(
|
||||
t, msgRouter.RouteMsg(openChanMsg),
|
||||
ErrUnableToRouteMsg,
|
||||
)
|
||||
require.ErrorIs(
|
||||
t, msgRouter.RouteMsg(commitSigMsg),
|
||||
ErrUnableToRouteMsg,
|
||||
)
|
||||
})
|
||||
|
||||
commitEndpoint.AssertExpectations(t)
|
||||
fundingEndpoint.AssertExpectations(t)
|
||||
}
|
Loading…
Reference in New Issue
Block a user