lnrpc: receive custom message

This commit is contained in:
Joost Jager 2021-05-31 12:06:48 +02:00
parent ae959b16ae
commit ade50d0b2c
No known key found for this signature in database
GPG key ID: A61B9D4C393C59C7
16 changed files with 4406 additions and 3836 deletions

View file

@ -2,6 +2,7 @@ package main
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/urfave/cli" "github.com/urfave/cli"
@ -51,3 +52,32 @@ func sendCustom(ctx *cli.Context) error {
return err return err
} }
var subscribeCustomCommand = cli.Command{
Name: "subscribecustom",
Action: actionDecorator(subscribeCustom),
}
func subscribeCustom(ctx *cli.Context) error {
ctxc := getContext()
client, cleanUp := getClient(ctx)
defer cleanUp()
stream, err := client.SubscribeCustomMessages(
ctxc,
&lnrpc.SubscribeCustomMessagesRequest{},
)
if err != nil {
return err
}
for {
msg, err := stream.Recv()
if err != nil {
return err
}
fmt.Printf("Received from peer %x: type=%d, data=%x\n",
msg.Peer, msg.Type, msg.Data)
}
}

View file

@ -385,6 +385,7 @@ func main() {
getStateCommand, getStateCommand,
deletePaymentsCommand, deletePaymentsCommand,
sendCustomCommand, sendCustomCommand,
subscribeCustomCommand,
} }
// Add any extra commands determined by build flags. // Add any extra commands determined by build flags.

View file

@ -205,7 +205,23 @@ If you use a strange system or changed group membership of the group running LND
you may want to check your system to see if it introduces additional risk for you may want to check your system to see if it introduces additional risk for
you. you.
## Safety ## Custom peer messages
Lightning nodes have a connection to each of their peers for exchanging
messages. In regular operation, these messages coordinate processes such as
channel opening and payment forwarding.
The lightning spec however also defines a custom range (>= 32768) for
experimental and application-specific peer messages.
With this release, [custom peer message
exchange](https://github.com/lightningnetwork/lnd/pull/5346) is added to open up
a range of new possibilities. Custom peer messages allow the lightning protocol
with its transport mechanisms (including tor) and public key authentication to
be leveraged for application-level communication. Note that peers exchange these
messages directly. There is no routing/path finding involved.
# Safety
* Locally force closed channels are now [kept in the channel.backup file until * Locally force closed channels are now [kept in the channel.backup file until
their time lock has fully matured](https://github.com/lightningnetwork/lnd/pull/5528). their time lock has fully matured](https://github.com/lightningnetwork/lnd/pull/5528).
@ -504,6 +520,7 @@ change](https://github.com/lightningnetwork/lnd/pull/5613).
* Hampus Sjöberg * Hampus Sjöberg
* Harsha Goli * Harsha Goli
* Jesse de Wit * Jesse de Wit
* Joost Jager
* Martin Habovstiak * Martin Habovstiak
* Naveen Srinivasan * Naveen Srinivasan
* Oliver Gugger * Oliver Gugger

File diff suppressed because it is too large Load diff

View file

@ -2337,6 +2337,23 @@ func local_request_Lightning_SendCustomMessage_0(ctx context.Context, marshaler
} }
func request_Lightning_SubscribeCustomMessages_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (Lightning_SubscribeCustomMessagesClient, runtime.ServerMetadata, error) {
var protoReq SubscribeCustomMessagesRequest
var metadata runtime.ServerMetadata
stream, err := client.SubscribeCustomMessages(ctx, &protoReq)
if err != nil {
return nil, metadata, err
}
header, err := stream.Header()
if err != nil {
return nil, metadata, err
}
metadata.HeaderMD = header
return stream, metadata, nil
}
// RegisterLightningHandlerServer registers the http handlers for service Lightning to "mux". // RegisterLightningHandlerServer registers the http handlers for service Lightning to "mux".
// UnaryRPC :call LightningServer directly. // UnaryRPC :call LightningServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@ -3616,6 +3633,13 @@ func RegisterLightningHandlerServer(ctx context.Context, mux *runtime.ServeMux,
}) })
mux.Handle("GET", pattern_Lightning_SubscribeCustomMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
})
return nil return nil
} }
@ -4917,6 +4941,26 @@ func RegisterLightningHandlerClient(ctx context.Context, mux *runtime.ServeMux,
}) })
mux.Handle("GET", pattern_Lightning_SubscribeCustomMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/lnrpc.Lightning/SubscribeCustomMessages", runtime.WithHTTPPathPattern("/v1/custommessage/subscribe"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Lightning_SubscribeCustomMessages_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SubscribeCustomMessages_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)
})
return nil return nil
} }
@ -5046,6 +5090,8 @@ var (
pattern_Lightning_RegisterRPCMiddleware_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "middleware"}, "")) pattern_Lightning_RegisterRPCMiddleware_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "middleware"}, ""))
pattern_Lightning_SendCustomMessage_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "custommessage"}, "")) pattern_Lightning_SendCustomMessage_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "custommessage"}, ""))
pattern_Lightning_SubscribeCustomMessages_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "custommessage", "subscribe"}, ""))
) )
var ( var (
@ -5174,4 +5220,6 @@ var (
forward_Lightning_RegisterRPCMiddleware_0 = runtime.ForwardResponseStream forward_Lightning_RegisterRPCMiddleware_0 = runtime.ForwardResponseStream
forward_Lightning_SendCustomMessage_0 = runtime.ForwardResponseMessage forward_Lightning_SendCustomMessage_0 = runtime.ForwardResponseMessage
forward_Lightning_SubscribeCustomMessages_0 = runtime.ForwardResponseStream
) )

View file

@ -1658,4 +1658,46 @@ func RegisterLightningJSONCallbacks(registry map[string]func(ctx context.Context
} }
callback(string(respBytes), nil) callback(string(respBytes), nil)
} }
registry["lnrpc.Lightning.SubscribeCustomMessages"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {
req := &SubscribeCustomMessagesRequest{}
err := marshaler.Unmarshal([]byte(reqJSON), req)
if err != nil {
callback("", err)
return
}
client := NewLightningClient(conn)
stream, err := client.SubscribeCustomMessages(ctx, req)
if err != nil {
callback("", err)
return
}
go func() {
for {
select {
case <-stream.Context().Done():
callback("", stream.Context().Err())
return
default:
}
resp, err := stream.Recv()
if err != nil {
callback("", err)
return
}
respBytes, err := marshaler.Marshal(resp)
if err != nil {
callback("", err)
return
}
callback(string(respBytes), nil)
}
}()
}
} }

View file

@ -563,6 +563,27 @@ service Lightning {
*/ */
rpc SendCustomMessage (SendCustomMessageRequest) rpc SendCustomMessage (SendCustomMessageRequest)
returns (SendCustomMessageResponse); returns (SendCustomMessageResponse);
/* lncli: `subscribecustom`
SubscribeCustomMessages subscribes to a stream of incoming custom peer
messages.
*/
rpc SubscribeCustomMessages (SubscribeCustomMessagesRequest)
returns (stream CustomMessage);
}
message SubscribeCustomMessagesRequest {
}
message CustomMessage {
// Peer from which the message originates
bytes peer = 1;
// Message type. This value will be in the custom range (>= 32768).
uint32 type = 2;
// Raw message data
bytes data = 3;
} }
message SendCustomMessageRequest { message SendCustomMessageRequest {

View file

@ -884,6 +884,38 @@
] ]
} }
}, },
"/v1/custommessage/subscribe": {
"get": {
"summary": "lncli: `subscribecustom`\nSubscribeCustomMessages subscribes to a stream of incoming custom peer\nmessages.",
"operationId": "Lightning_SubscribeCustomMessages",
"responses": {
"200": {
"description": "A successful response.(streaming responses)",
"schema": {
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/lnrpcCustomMessage"
},
"error": {
"$ref": "#/definitions/rpcStatus"
}
},
"title": "Stream result of lnrpcCustomMessage"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"tags": [
"Lightning"
]
}
},
"/v1/debuglevel": { "/v1/debuglevel": {
"post": { "post": {
"summary": "lncli: `debuglevel`\nDebugLevel allows a caller to programmatically set the logging verbosity of\nlnd. The logging can be targeted according to a coarse daemon-wide logging\nlevel, or in a granular fashion to specify the logging for a target\nsub-system.", "summary": "lncli: `debuglevel`\nDebugLevel allows a caller to programmatically set the logging verbosity of\nlnd. The logging can be targeted according to a coarse daemon-wide logging\nlevel, or in a granular fashion to specify the logging for a target\nsub-system.",
@ -3830,6 +3862,26 @@
"lnrpcConnectPeerResponse": { "lnrpcConnectPeerResponse": {
"type": "object" "type": "object"
}, },
"lnrpcCustomMessage": {
"type": "object",
"properties": {
"peer": {
"type": "string",
"format": "byte",
"title": "Peer from which the message originates"
},
"type": {
"type": "integer",
"format": "int64",
"description": "Message type. This value will be in the custom range (\u003e= 32768)."
},
"data": {
"type": "string",
"format": "byte",
"title": "Raw message data"
}
}
},
"lnrpcDebugLevelRequest": { "lnrpcDebugLevelRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View file

@ -158,3 +158,5 @@ http:
- selector: lnrpc.Lightning.SendCustomMessage - selector: lnrpc.Lightning.SendCustomMessage
post: "/v1/custommessage" post: "/v1/custommessage"
body: "*" body: "*"
- selector: lnrpc.Lightning.SubscribeCustomMessages
get: "/v1/custommessage/subscribe"

View file

@ -406,6 +406,10 @@ type LightningClient interface {
// lncli: `sendcustom` // lncli: `sendcustom`
//SendCustomMessage sends a custom peer message. //SendCustomMessage sends a custom peer message.
SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error) SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error)
// lncli: `subscribecustom`
//SubscribeCustomMessages subscribes to a stream of incoming custom peer
//messages.
SubscribeCustomMessages(ctx context.Context, in *SubscribeCustomMessagesRequest, opts ...grpc.CallOption) (Lightning_SubscribeCustomMessagesClient, error)
} }
type lightningClient struct { type lightningClient struct {
@ -1266,6 +1270,38 @@ func (c *lightningClient) SendCustomMessage(ctx context.Context, in *SendCustomM
return out, nil return out, nil
} }
func (c *lightningClient) SubscribeCustomMessages(ctx context.Context, in *SubscribeCustomMessagesRequest, opts ...grpc.CallOption) (Lightning_SubscribeCustomMessagesClient, error) {
stream, err := c.cc.NewStream(ctx, &Lightning_ServiceDesc.Streams[12], "/lnrpc.Lightning/SubscribeCustomMessages", opts...)
if err != nil {
return nil, err
}
x := &lightningSubscribeCustomMessagesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Lightning_SubscribeCustomMessagesClient interface {
Recv() (*CustomMessage, error)
grpc.ClientStream
}
type lightningSubscribeCustomMessagesClient struct {
grpc.ClientStream
}
func (x *lightningSubscribeCustomMessagesClient) Recv() (*CustomMessage, error) {
m := new(CustomMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// LightningServer is the server API for Lightning service. // LightningServer is the server API for Lightning service.
// All implementations must embed UnimplementedLightningServer // All implementations must embed UnimplementedLightningServer
// for forward compatibility // for forward compatibility
@ -1658,6 +1694,10 @@ type LightningServer interface {
// lncli: `sendcustom` // lncli: `sendcustom`
//SendCustomMessage sends a custom peer message. //SendCustomMessage sends a custom peer message.
SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error) SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error)
// lncli: `subscribecustom`
//SubscribeCustomMessages subscribes to a stream of incoming custom peer
//messages.
SubscribeCustomMessages(*SubscribeCustomMessagesRequest, Lightning_SubscribeCustomMessagesServer) error
mustEmbedUnimplementedLightningServer() mustEmbedUnimplementedLightningServer()
} }
@ -1857,6 +1897,9 @@ func (UnimplementedLightningServer) RegisterRPCMiddleware(Lightning_RegisterRPCM
func (UnimplementedLightningServer) SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error) { func (UnimplementedLightningServer) SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendCustomMessage not implemented") return nil, status.Errorf(codes.Unimplemented, "method SendCustomMessage not implemented")
} }
func (UnimplementedLightningServer) SubscribeCustomMessages(*SubscribeCustomMessagesRequest, Lightning_SubscribeCustomMessagesServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeCustomMessages not implemented")
}
func (UnimplementedLightningServer) mustEmbedUnimplementedLightningServer() {} func (UnimplementedLightningServer) mustEmbedUnimplementedLightningServer() {}
// UnsafeLightningServer may be embedded to opt out of forward compatibility for this service. // UnsafeLightningServer may be embedded to opt out of forward compatibility for this service.
@ -3078,6 +3121,27 @@ func _Lightning_SendCustomMessage_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Lightning_SubscribeCustomMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeCustomMessagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(LightningServer).SubscribeCustomMessages(m, &lightningSubscribeCustomMessagesServer{stream})
}
type Lightning_SubscribeCustomMessagesServer interface {
Send(*CustomMessage) error
grpc.ServerStream
}
type lightningSubscribeCustomMessagesServer struct {
grpc.ServerStream
}
func (x *lightningSubscribeCustomMessagesServer) Send(m *CustomMessage) error {
return x.ServerStream.SendMsg(m)
}
// Lightning_ServiceDesc is the grpc.ServiceDesc for Lightning service. // Lightning_ServiceDesc is the grpc.ServiceDesc for Lightning service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -3359,6 +3423,11 @@ var Lightning_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },
{
StreamName: "SubscribeCustomMessages",
Handler: _Lightning_SubscribeCustomMessages_Handler,
ServerStreams: true,
},
}, },
Metadata: "lightning.proto", Metadata: "lightning.proto",
} }

View file

@ -240,7 +240,7 @@ func TestMaxOutPointIndex(t *testing.T) {
func TestEmptyMessageUnknownType(t *testing.T) { func TestEmptyMessageUnknownType(t *testing.T) {
t.Parallel() t.Parallel()
fakeType := MessageType(math.MaxUint16) fakeType := CustomTypeStart - 1
if _, err := makeEmptyMessage(fakeType); err == nil { if _, err := makeEmptyMessage(fakeType); err == nil {
t.Fatalf("should not be able to make an empty message of an " + t.Fatalf("should not be able to make an empty message of an " +
"unknown type") "unknown type")

View file

@ -233,7 +233,12 @@ func makeEmptyMessage(msgType MessageType) (Message, error) {
case MsgGossipTimestampRange: case MsgGossipTimestampRange:
msg = &GossipTimestampRange{} msg = &GossipTimestampRange{}
default: default:
return nil, &UnknownMessage{msgType} if msgType < CustomTypeStart {
return nil, &UnknownMessage{msgType}
}
msg = &Custom{
Type: msgType,
}
} }
return msg, nil return msg, nil

View file

@ -94,6 +94,11 @@ type newChannelMsg struct {
err chan error err chan error
} }
type customMsg struct {
peer [33]byte
msg lnwire.Custom
}
// closeMsg is a wrapper struct around any wire messages that deal with the // closeMsg is a wrapper struct around any wire messages that deal with the
// cooperative channel closure negotiation process. This struct includes the // cooperative channel closure negotiation process. This struct includes the
// raw channel ID targeted along with the original message. // raw channel ID targeted along with the original message.
@ -318,6 +323,10 @@ type Config struct {
// that is accumulated before signing a new commitment. // that is accumulated before signing a new commitment.
ChannelCommitBatchSize uint32 ChannelCommitBatchSize uint32
// HandleCustomMessage is called whenever a custom message is received
// from the peer.
HandleCustomMessage func(peer [33]byte, msg *lnwire.Custom) error
// Quit is the server's quit channel. If this is closed, we halt operation. // Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{} Quit chan struct{}
} }
@ -1449,6 +1458,13 @@ out:
discStream.AddMsg(msg) discStream.AddMsg(msg)
case *lnwire.Custom:
err := p.handleCustomMessage(msg)
if err != nil {
p.storeError(err)
peerLog.Errorf("peer: %v, %v", p, err)
}
default: default:
// If the message we received is unknown to us, store // If the message we received is unknown to us, store
// the type to track the failure. // the type to track the failure.
@ -1486,6 +1502,17 @@ out:
peerLog.Tracef("readHandler for peer %v done", p) peerLog.Tracef("readHandler for peer %v done", p)
} }
// handleCustomMessage handles the given custom message if a handler is
// registered.
func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
if p.cfg.HandleCustomMessage == nil {
return fmt.Errorf("no custom message handler for "+
"message type %v", uint16(msg.MsgType()))
}
return p.cfg.HandleCustomMessage(p.PubKey(), msg)
}
// isActiveChannel returns true if the provided channel id is active, otherwise // isActiveChannel returns true if the provided channel id is active, otherwise
// returns false. // returns false.
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool { func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
@ -1686,6 +1713,8 @@ func messageSummary(msg lnwire.Message) string {
time.Unix(int64(msg.FirstTimestamp), 0), time.Unix(int64(msg.FirstTimestamp), 0),
msg.TimestampRange) msg.TimestampRange)
case *lnwire.Custom:
return fmt.Sprintf("type=%d", msg.Type)
} }
return "" return ""
@ -1714,8 +1743,15 @@ func (p *Brontide) logWireMessage(msg lnwire.Message, read bool) {
preposition = "from" preposition = "from"
} }
var msgType string
if msg.MsgType() < lnwire.CustomTypeStart {
msgType = msg.MsgType().String()
} else {
msgType = "custom"
}
return fmt.Sprintf("%v %v%s %v %s", summaryPrefix, return fmt.Sprintf("%v %v%s %v %s", summaryPrefix,
msg.MsgType(), summary, preposition, p) msgType, summary, preposition, p)
})) }))
switch m := msg.(type) { switch m := msg.(type) {

View file

@ -1071,6 +1071,8 @@ func TestPeerCustomMessage(t *testing.T) {
mockConn := newMockConn(t, 1) mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
remoteKey := [33]byte{8} remoteKey := [33]byte{8}
notifier := &mock.ChainNotifier{ notifier := &mock.ChainNotifier{
@ -1092,6 +1094,15 @@ func TestPeerCustomMessage(t *testing.T) {
ReadPool: readPool, ReadPool: readPool,
Conn: mockConn, Conn: mockConn,
ChainNotifier: notifier, ChainNotifier: notifier,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
}) })
// Set up the init sequence. // Set up the init sequence.
@ -1117,7 +1128,9 @@ func TestPeerCustomMessage(t *testing.T) {
require.NoError(t, alicePeer.Start()) require.NoError(t, alicePeer.Start())
// Send a custom message. // Send a custom message.
customMsg, err := lnwire.NewCustom(lnwire.MessageType(40000), []byte{1, 2, 3}) customMsg, err := lnwire.NewCustom(
lnwire.MessageType(40000), []byte{1, 2, 3},
)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, alicePeer.SendMessageLazy(false, customMsg)) require.NoError(t, alicePeer.SendMessageLazy(false, customMsg))
@ -1125,4 +1138,18 @@ func TestPeerCustomMessage(t *testing.T) {
// Verify that it is passed down to the noise layer correctly. // Verify that it is passed down to the noise layer correctly.
writtenMsg := <-mockConn.writtenMessages writtenMsg := <-mockConn.writtenMessages
require.Equal(t, []byte{0x9c, 0x40, 0x1, 0x2, 0x3}, writtenMsg) require.Equal(t, []byte{0x9c, 0x40, 0x1, 0x2, 0x3}, writtenMsg)
// Receive a custom message.
receivedCustomMsg, err := lnwire.NewCustom(
lnwire.MessageType(40001), []byte{4, 5, 6},
)
require.NoError(t, err)
receivedData := []byte{0x9c, 0x41, 0x4, 0x5, 0x6}
mockConn.readMessages <- receivedData
// Verify that it is propagated up to the custom message handler.
receivedCustom := <-receivedCustomChan
require.Equal(t, remoteKey, receivedCustom.peer)
require.Equal(t, receivedCustomMsg, &receivedCustom.msg)
} }

View file

@ -572,6 +572,10 @@ func MainRPCServerPermissions() map[string][]bakery.Op {
Entity: "offchain", Entity: "offchain",
Action: "write", Action: "write",
}}, }},
"/lnrpc.Lightning/SubscribeCustomMessages": {{
Entity: "offchain",
Action: "read",
}},
} }
} }
@ -7345,3 +7349,37 @@ func (r *rpcServer) SendCustomMessage(ctx context.Context, req *lnrpc.SendCustom
return &lnrpc.SendCustomMessageResponse{}, nil return &lnrpc.SendCustomMessageResponse{}, nil
} }
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
// messages.
func (r *rpcServer) SubscribeCustomMessages(req *lnrpc.SubscribeCustomMessagesRequest,
server lnrpc.Lightning_SubscribeCustomMessagesServer) error {
client, err := r.server.SubscribeCustomMessages()
if err != nil {
return err
}
defer client.Cancel()
for {
select {
case <-client.Quit():
return errors.New("shutdown")
case <-server.Context().Done():
return server.Context().Err()
case update := <-client.Updates():
customMsg := update.(*CustomMessage)
err := server.Send(&lnrpc.CustomMessage{
Peer: customMsg.Peer[:],
Data: customMsg.Msg.Data,
Type: uint32(customMsg.Msg.Type),
})
if err != nil {
return err
}
}
}
}

View file

@ -307,6 +307,8 @@ type server struct {
// livelinessMonitor monitors that lnd has access to critical resources. // livelinessMonitor monitors that lnd has access to critical resources.
livelinessMonitor *healthcheck.Monitor livelinessMonitor *healthcheck.Monitor
customMessageServer *subscribe.Server
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -395,6 +397,15 @@ func (s *server) updatePersistentPeerAddrs() error {
return nil return nil
} }
// CustomMessage is a custom message that is received from a peer.
type CustomMessage struct {
// Peer is the peer pubkey
Peer [33]byte
// Msg is the custom wire message.
Msg *lnwire.Custom
}
// parseAddr parses an address from its string format to a net.Addr. // parseAddr parses an address from its string format to a net.Addr.
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) { func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
var ( var (
@ -568,6 +579,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}), peerDisconnectedListeners: make(map[string][]chan<- struct{}),
customMessageServer: subscribe.NewServer(),
featureMgr: featureMgr, featureMgr: featureMgr,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -1637,6 +1650,11 @@ func (s *server) Start() error {
cleanup := cleaner{} cleanup := cleaner{}
s.start.Do(func() { s.start.Do(func() {
if err := s.customMessageServer.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.customMessageServer.Stop)
if s.hostAnn != nil { if s.hostAnn != nil {
if err := s.hostAnn.Start(); err != nil { if err := s.hostAnn.Start(); err != nil {
@ -3336,6 +3354,24 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
delete(s.persistentConnReqs, pubStr) delete(s.persistentConnReqs, pubStr)
} }
// handleCustomMessage dispatches an incoming custom peers message to
// subscribers.
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
srvrLog.Debugf("Custom message received: peer=%x, type=%d",
peer, msg.Type)
return s.customMessageServer.SendUpdate(&CustomMessage{
Peer: peer,
Msg: msg,
})
}
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
// messages.
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
return s.customMessageServer.Subscribe()
}
// peerConnected is a function that handles initialization a newly connected // peerConnected is a function that handles initialization a newly connected
// peer by adding it to the server's global list of all active peers, and // peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. The inbound // starting all the goroutines the peer needs to function properly. The inbound
@ -3431,6 +3467,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
ChannelCommitInterval: s.cfg.ChannelCommitInterval, ChannelCommitInterval: s.cfg.ChannelCommitInterval,
ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize, ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
HandleCustomMessage: s.handleCustomMessage,
Quit: s.quit, Quit: s.quit,
} }