lnrpc: send custom message

This commit is contained in:
Joost Jager 2021-05-31 10:03:47 +02:00
parent 5d7e814ea8
commit ae959b16ae
No known key found for this signature in database
GPG key ID: A61B9D4C393C59C7
14 changed files with 4451 additions and 3825 deletions

53
cmd/lncli/cmd_custom.go Normal file
View file

@ -0,0 +1,53 @@
package main
import (
"encoding/hex"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/urfave/cli"
)
var sendCustomCommand = cli.Command{
Name: "sendcustom",
Flags: []cli.Flag{
cli.StringFlag{
Name: "peer",
},
cli.Uint64Flag{
Name: "type",
},
cli.StringFlag{
Name: "data",
},
},
Action: actionDecorator(sendCustom),
}
func sendCustom(ctx *cli.Context) error {
ctxc := getContext()
client, cleanUp := getClient(ctx)
defer cleanUp()
peer, err := hex.DecodeString(ctx.String("peer"))
if err != nil {
return err
}
msgType := ctx.Uint64("type")
data, err := hex.DecodeString(ctx.String("data"))
if err != nil {
return err
}
_, err = client.SendCustomMessage(
ctxc,
&lnrpc.SendCustomMessageRequest{
Peer: peer,
Type: uint32(msgType),
Data: data,
},
)
return err
}

View file

@ -384,6 +384,7 @@ func main() {
profileSubCommand,
getStateCommand,
deletePaymentsCommand,
sendCustomCommand,
}
// Add any extra commands determined by build flags.

File diff suppressed because it is too large Load diff

View file

@ -2303,6 +2303,40 @@ func request_Lightning_RegisterRPCMiddleware_0(ctx context.Context, marshaler ru
return stream, metadata, nil
}
func request_Lightning_SendCustomMessage_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SendCustomMessageRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.SendCustomMessage(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Lightning_SendCustomMessage_0(ctx context.Context, marshaler runtime.Marshaler, server LightningServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SendCustomMessageRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.SendCustomMessage(ctx, &protoReq)
return msg, metadata, err
}
// RegisterLightningHandlerServer registers the http handlers for service Lightning to "mux".
// UnaryRPC :call LightningServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@ -3559,6 +3593,29 @@ func RegisterLightningHandlerServer(ctx context.Context, mux *runtime.ServeMux,
return
})
mux.Handle("POST", pattern_Lightning_SendCustomMessage_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/lnrpc.Lightning/SendCustomMessage", runtime.WithHTTPPathPattern("/v1/custommessage"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Lightning_SendCustomMessage_0(rctx, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SendCustomMessage_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -4840,6 +4897,26 @@ func RegisterLightningHandlerClient(ctx context.Context, mux *runtime.ServeMux,
})
mux.Handle("POST", pattern_Lightning_SendCustomMessage_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/lnrpc.Lightning/SendCustomMessage", runtime.WithHTTPPathPattern("/v1/custommessage"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Lightning_SendCustomMessage_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SendCustomMessage_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -4967,6 +5044,8 @@ var (
pattern_Lightning_CheckMacaroonPermissions_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "macaroon", "checkpermissions"}, ""))
pattern_Lightning_RegisterRPCMiddleware_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "middleware"}, ""))
pattern_Lightning_SendCustomMessage_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "custommessage"}, ""))
)
var (
@ -5093,4 +5172,6 @@ var (
forward_Lightning_CheckMacaroonPermissions_0 = runtime.ForwardResponseMessage
forward_Lightning_RegisterRPCMiddleware_0 = runtime.ForwardResponseStream
forward_Lightning_SendCustomMessage_0 = runtime.ForwardResponseMessage
)

View file

@ -1633,4 +1633,29 @@ func RegisterLightningJSONCallbacks(registry map[string]func(ctx context.Context
}
callback(string(respBytes), nil)
}
registry["lnrpc.Lightning.SendCustomMessage"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {
req := &SendCustomMessageRequest{}
err := marshaler.Unmarshal([]byte(reqJSON), req)
if err != nil {
callback("", err)
return
}
client := NewLightningClient(conn)
resp, err := client.SendCustomMessage(ctx, req)
if err != nil {
callback("", err)
return
}
respBytes, err := marshaler.Marshal(resp)
if err != nil {
callback("", err)
return
}
callback(string(respBytes), nil)
}
}

View file

@ -557,6 +557,26 @@ service Lightning {
*/
rpc RegisterRPCMiddleware (stream RPCMiddlewareResponse)
returns (stream RPCMiddlewareRequest);
/* lncli: `sendcustom`
SendCustomMessage sends a custom peer message.
*/
rpc SendCustomMessage (SendCustomMessageRequest)
returns (SendCustomMessageResponse);
}
message SendCustomMessageRequest {
// Peer to send the message to
bytes peer = 1;
// Message type. This value needs to be in the custom range (>= 32768).
uint32 type = 2;
// Raw message data.
bytes data = 3;
}
message SendCustomMessageResponse {
}
message Utxo {

View file

@ -851,6 +851,39 @@
]
}
},
"/v1/custommessage": {
"post": {
"summary": "lncli: `sendcustom`\nSendCustomMessage sends a custom peer message.",
"operationId": "Lightning_SendCustomMessage",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/lnrpcSendCustomMessageResponse"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/lnrpcSendCustomMessageRequest"
}
}
],
"tags": [
"Lightning"
]
}
},
"/v1/debuglevel": {
"post": {
"summary": "lncli: `debuglevel`\nDebugLevel allows a caller to programmatically set the logging verbosity of\nlnd. The logging can be targeted according to a coarse daemon-wide logging\nlevel, or in a granular fashion to specify the logging for a target\nsub-system.",
@ -1702,17 +1735,6 @@
}
}
},
"parameters": [
{
"name": "body",
"description": " (streaming inputs)",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/lnrpcRPCMiddlewareResponse"
}
}
],
"tags": [
"Lightning"
]
@ -5766,24 +5788,6 @@
}
}
},
"lnrpcRPCMiddlewareResponse": {
"type": "object",
"properties": {
"request_id": {
"type": "string",
"format": "uint64",
"description": "The unique ID of the intercepted request that this response refers to. Must\nalways be set when giving feedback to an intercept but is ignored for the\ninitial registration message."
},
"register": {
"$ref": "#/definitions/lnrpcMiddlewareRegistration",
"title": "The registration message identifies the middleware that's being\nregistered in lnd. The registration message must be sent immediately\nafter initiating the RegisterRpcMiddleware stream, otherwise lnd will\ntime out the attempt and terminate the request. NOTE: The middleware\nwill only receive interception messages for requests that contain a\nmacaroon with the custom caveat that the middleware declares it is\nresponsible for handling in the registration message! As a security\nmeasure, _no_ middleware can intercept requests made with _unencumbered_\nmacaroons!"
},
"feedback": {
"$ref": "#/definitions/lnrpcInterceptFeedback",
"description": "The middleware received an interception request and gives feedback to\nit. The request_id indicates what message the feedback refers to."
}
}
},
"lnrpcReadyForPsbtFunding": {
"type": "object",
"properties": {
@ -6008,6 +6012,29 @@
}
}
},
"lnrpcSendCustomMessageRequest": {
"type": "object",
"properties": {
"peer": {
"type": "string",
"format": "byte",
"title": "Peer to send the message to"
},
"type": {
"type": "integer",
"format": "int64",
"description": "Message type. This value needs to be in the custom range (\u003e= 32768)."
},
"data": {
"type": "string",
"format": "byte",
"description": "Raw message data."
}
}
},
"lnrpcSendCustomMessageResponse": {
"type": "object"
},
"lnrpcSendManyRequest": {
"type": "object",
"properties": {

View file

@ -155,4 +155,6 @@ http:
body: "*"
- selector: lnrpc.Lightning.RegisterRPCMiddleware
post: "/v1/middleware"
- selector: lnrpc.Lightning.SendCustomMessage
post: "/v1/custommessage"
body: "*"

View file

@ -403,6 +403,9 @@ type LightningClient interface {
//allowed to modify any responses. As a security measure, _no_ middleware can
//modify responses for requests made with _unencumbered_ macaroons!
RegisterRPCMiddleware(ctx context.Context, opts ...grpc.CallOption) (Lightning_RegisterRPCMiddlewareClient, error)
// lncli: `sendcustom`
//SendCustomMessage sends a custom peer message.
SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error)
}
type lightningClient struct {
@ -1254,6 +1257,15 @@ func (x *lightningRegisterRPCMiddlewareClient) Recv() (*RPCMiddlewareRequest, er
return m, nil
}
func (c *lightningClient) SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error) {
out := new(SendCustomMessageResponse)
err := c.cc.Invoke(ctx, "/lnrpc.Lightning/SendCustomMessage", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// LightningServer is the server API for Lightning service.
// All implementations must embed UnimplementedLightningServer
// for forward compatibility
@ -1643,6 +1655,9 @@ type LightningServer interface {
//allowed to modify any responses. As a security measure, _no_ middleware can
//modify responses for requests made with _unencumbered_ macaroons!
RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error
// lncli: `sendcustom`
//SendCustomMessage sends a custom peer message.
SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error)
mustEmbedUnimplementedLightningServer()
}
@ -1839,6 +1854,9 @@ func (UnimplementedLightningServer) CheckMacaroonPermissions(context.Context, *C
func (UnimplementedLightningServer) RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error {
return status.Errorf(codes.Unimplemented, "method RegisterRPCMiddleware not implemented")
}
func (UnimplementedLightningServer) SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendCustomMessage not implemented")
}
func (UnimplementedLightningServer) mustEmbedUnimplementedLightningServer() {}
// UnsafeLightningServer may be embedded to opt out of forward compatibility for this service.
@ -3042,6 +3060,24 @@ func (x *lightningRegisterRPCMiddlewareServer) Recv() (*RPCMiddlewareResponse, e
return m, nil
}
func _Lightning_SendCustomMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SendCustomMessageRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LightningServer).SendCustomMessage(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/lnrpc.Lightning/SendCustomMessage",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LightningServer).SendCustomMessage(ctx, req.(*SendCustomMessageRequest))
}
return interceptor(ctx, in, info, handler)
}
// Lightning_ServiceDesc is the grpc.ServiceDesc for Lightning service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -3253,6 +3289,10 @@ var Lightning_ServiceDesc = grpc.ServiceDesc{
MethodName: "CheckMacaroonPermissions",
Handler: _Lightning_CheckMacaroonPermissions_Handler,
},
{
MethodName: "SendCustomMessage",
Handler: _Lightning_SendCustomMessage_Handler,
},
},
Streams: []grpc.StreamDesc{
{

65
lnwire/custom.go Normal file
View file

@ -0,0 +1,65 @@
package lnwire
import (
"bytes"
"errors"
"io"
)
// CustomTypeStart is the start of the custom type range for peer messages as
// defined in BOLT 01.
var CustomTypeStart MessageType = 32768
// Custom represents an application-defined wire message.
type Custom struct {
Type MessageType
Data []byte
}
// A compile time check to ensure FundingCreated implements the lnwire.Message
// interface.
var _ Message = (*Custom)(nil)
// NewCustom instanties a new custom message.
func NewCustom(msgType MessageType, data []byte) (*Custom, error) {
if msgType < CustomTypeStart {
return nil, errors.New("msg type not in custom range")
}
return &Custom{
Type: msgType,
Data: data,
}, nil
}
// Encode serializes the target Custom message into the passed io.Writer
// implementation.
//
// This is part of the lnwire.Message interface.
func (c *Custom) Encode(b *bytes.Buffer, pver uint32) error {
_, err := b.Write(c.Data)
return err
}
// Decode deserializes the serialized Custom message stored in the passed
// io.Reader into the target Custom message.
//
// This is part of the lnwire.Message interface.
func (c *Custom) Decode(r io.Reader, pver uint32) error {
var b bytes.Buffer
if _, err := io.Copy(&b, r); err != nil {
return err
}
c.Data = b.Bytes()
return nil
}
// MsgType returns the uint32 code which uniquely identifies this message as a
// Custom message on the wire.
//
// This is part of the lnwire.Message interface.
func (c *Custom) MsgType() MessageType {
return c.Type
}

View file

@ -2,9 +2,11 @@ package peer
import (
"bytes"
"io/ioutil"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
@ -17,6 +19,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -1031,3 +1034,95 @@ func genScript(t *testing.T, address string) lnwire.DeliveryAddress {
return script
}
// TestPeerCustomMessage tests custom message exchange between peers.
func TestPeerCustomMessage(t *testing.T) {
t.Parallel()
// Set up node Alice.
alicePath, err := ioutil.TempDir("", "alicedb")
require.NoError(t, err)
dbAlice, err := channeldb.Open(alicePath)
require.NoError(t, err)
aliceKey, err := btcec.NewPrivateKey(btcec.S256())
require.NoError(t, err)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
remoteKey := [33]byte{8}
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
alicePeer := NewBrontide(Config{
PubKeyBytes: remoteKey,
ChannelDB: dbAlice.ChannelStateDB(),
Addr: &lnwire.NetAddress{
IdentityKey: aliceKey.PubKey(),
},
PrunePersistentPeerConnection: func([33]byte) {},
Features: lnwire.EmptyFeatureVector(),
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
ChainNotifier: notifier,
})
// Set up the init sequence.
go func() {
// Read init message.
<-mockConn.writtenMessages
// Write the init reply message.
initReplyMsg := lnwire.NewInitMessage(
lnwire.NewRawFeatureVector(
lnwire.DataLossProtectRequired,
),
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
assert.NoError(t, err)
mockConn.readMessages <- b.Bytes()
}()
// Start the peer.
require.NoError(t, alicePeer.Start())
// Send a custom message.
customMsg, err := lnwire.NewCustom(lnwire.MessageType(40000), []byte{1, 2, 3})
require.NoError(t, err)
require.NoError(t, alicePeer.SendMessageLazy(false, customMsg))
// Verify that it is passed down to the noise layer correctly.
writtenMsg := <-mockConn.writtenMessages
require.Equal(t, []byte{0x9c, 0x40, 0x1, 0x2, 0x3}, writtenMsg)
}

View file

@ -460,12 +460,16 @@ type mockMessageConn struct {
// writtenMessages is a channel that our mock pushes written messages into.
writtenMessages chan []byte
readMessages chan []byte
curReadMessage []byte
}
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
return &mockMessageConn{
t: t,
writtenMessages: make(chan []byte, expectedMessages),
readMessages: make(chan []byte, 1),
}
}
@ -502,3 +506,16 @@ func (m *mockMessageConn) assertWrite(expected []byte) {
m.t.Fatalf("timeout waiting for write: %v", expected)
}
}
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
return nil
}
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
m.curReadMessage = <-m.readMessages
return uint32(len(m.curReadMessage)), nil
}
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
return m.curReadMessage, nil
}

View file

@ -568,6 +568,10 @@ func MainRPCServerPermissions() map[string][]bakery.Op {
Entity: "macaroon",
Action: "write",
}},
"/lnrpc.Lightning/SendCustomMessage": {{
Entity: "offchain",
Action: "write",
}},
}
}
@ -7319,3 +7323,25 @@ func (r *rpcServer) RegisterRPCMiddleware(
return middleware.Run()
}
// SendCustomMessage sends a custom peer message.
func (r *rpcServer) SendCustomMessage(ctx context.Context, req *lnrpc.SendCustomMessageRequest) (
*lnrpc.SendCustomMessageResponse, error) {
peer, err := route.NewVertexFromBytes(req.Peer)
if err != nil {
return nil, err
}
err = r.server.SendCustomMessage(
peer, lnwire.MessageType(req.Type), req.Data,
)
switch {
case err == ErrPeerNotConnected:
return nil, status.Error(codes.NotFound, err.Error())
case err != nil:
return nil, err
}
return &lnrpc.SendCustomMessageResponse{}, nil
}

View file

@ -4179,6 +4179,35 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
}
}
// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
data []byte) error {
peer, err := s.FindPeerByPubStr(string(peerPub[:]))
if err != nil {
return err
}
// We'll wait until the peer is active.
select {
case <-peer.ActiveSignal():
case <-peer.QuitSignal():
return fmt.Errorf("peer %x disconnected", peerPub)
case <-s.quit:
return ErrServerShuttingDown
}
msg, err := lnwire.NewCustom(msgType, data)
if err != nil {
return err
}
// Send the message as low-priority. For now we assume that all
// application-defined message are low priority.
return peer.SendMessageLazy(true, msg)
}
// newSweepPkScriptGen creates closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated is a version 0, pay-to-witness-pubkey-hash