mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 09:53:54 +01:00
multi: registration complete MW interceptor msg
In this commit, we change the flow of the rpc middleware registration a bit. In order to allow a client to add rpc middleware interceptors in a deterministic order, we now make the server send a "registration complete" message to the client after compeleting the registration process so that the client knows when it can go ahead and register the next client.
This commit is contained in:
parent
95a6425189
commit
8a17009afa
@ -64,6 +64,10 @@
|
||||
true so the payment won't be failed unless a terminal error has occurred,
|
||||
which is useful for constructing MPP.
|
||||
|
||||
* [Add a message to the RPC MW registration
|
||||
flow](https://github.com/lightningnetwork/lnd/pull/6754) so that the server
|
||||
can indicate to the client that it has completed the RPC MW registration.
|
||||
|
||||
## Bug Fixes
|
||||
|
||||
* Fixed data race found in
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -4379,6 +4379,14 @@ message RPCMiddlewareRequest {
|
||||
the same type, or replaced by an error message.
|
||||
*/
|
||||
RPCMessage response = 6;
|
||||
|
||||
/*
|
||||
This is used to indicate to the client that the server has successfully
|
||||
registered the interceptor. This is only used in the very first message
|
||||
that the server sends to the client after the client sends the server
|
||||
the middleware registration message.
|
||||
*/
|
||||
bool reg_complete = 8;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -5995,6 +5995,10 @@
|
||||
"$ref": "#/definitions/lnrpcRPCMessage",
|
||||
"description": "Intercept outgoing gRPC response message: all outgoing messages, both on\nstreaming and unary RPCs, are forwarded to the middleware for inspection\nand amendment. The response in this message is the original response as\nit was generated by the main RPC server. It can either be accepted\n(=forwarded to the client), replaced/overwritten with a new message of\nthe same type, or replaced by an error message."
|
||||
},
|
||||
"reg_complete": {
|
||||
"type": "boolean",
|
||||
"description": "This is used to indicate to the client that the server has successfully\nregistered the interceptor. This is only used in the very first message\nthat the server sends to the client after the client sends the server\nthe middleware registration message."
|
||||
},
|
||||
"msg_id": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
|
@ -68,7 +68,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
|
||||
tt, net.Alice, &lnrpc.MiddlewareRegistration{
|
||||
MiddlewareName: "itest-interceptor",
|
||||
ReadOnlyMode: true,
|
||||
},
|
||||
}, true,
|
||||
)
|
||||
defer registration.cancel()
|
||||
|
||||
@ -86,7 +86,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
|
||||
tt, net.Alice, &lnrpc.MiddlewareRegistration{
|
||||
MiddlewareName: "itest-interceptor",
|
||||
CustomMacaroonCaveatName: "itest-caveat",
|
||||
},
|
||||
}, true,
|
||||
)
|
||||
defer registration.cancel()
|
||||
|
||||
@ -103,7 +103,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
|
||||
tt, net.Alice, &lnrpc.MiddlewareRegistration{
|
||||
MiddlewareName: "itest-interceptor",
|
||||
ReadOnlyMode: true,
|
||||
},
|
||||
}, true,
|
||||
)
|
||||
defer registration.cancel()
|
||||
|
||||
@ -120,7 +120,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
|
||||
tt, net.Alice, &lnrpc.MiddlewareRegistration{
|
||||
MiddlewareName: "itest-interceptor",
|
||||
CustomMacaroonCaveatName: "itest-caveat",
|
||||
},
|
||||
}, true,
|
||||
)
|
||||
defer registration.cancel()
|
||||
|
||||
@ -174,7 +174,7 @@ func middlewareRegistrationRestrictionTests(t *testing.T,
|
||||
|
||||
t.Run(fmt.Sprintf("%d", idx), func(tt *testing.T) {
|
||||
invalidName := registerMiddleware(
|
||||
tt, node, tc.registration,
|
||||
tt, node, tc.registration, false,
|
||||
)
|
||||
_, err := invalidName.stream.Recv()
|
||||
require.Error(tt, err)
|
||||
@ -578,7 +578,7 @@ func middlewareMandatoryTest(t *testing.T, node *lntest.HarnessNode,
|
||||
t, node, &lnrpc.MiddlewareRegistration{
|
||||
MiddlewareName: "itest-interceptor",
|
||||
CustomMacaroonCaveatName: "itest-caveat",
|
||||
},
|
||||
}, true,
|
||||
)
|
||||
defer registration.cancel()
|
||||
|
||||
@ -649,7 +649,8 @@ type middlewareHarness struct {
|
||||
// registerMiddleware creates a new middleware harness and sends the initial
|
||||
// register message to the RPC server.
|
||||
func registerMiddleware(t *testing.T, node *lntest.HarnessNode,
|
||||
registration *lnrpc.MiddlewareRegistration) *middlewareHarness {
|
||||
registration *lnrpc.MiddlewareRegistration,
|
||||
waitForRegister bool) *middlewareHarness {
|
||||
|
||||
ctxc, cancel := context.WithCancel(context.Background())
|
||||
|
||||
@ -663,6 +664,13 @@ func registerMiddleware(t *testing.T, node *lntest.HarnessNode,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
if waitForRegister {
|
||||
// Wait for the registration complete message.
|
||||
regCompleteMsg, err := middlewareStream.Recv()
|
||||
require.NoError(t, err)
|
||||
require.True(t, regCompleteMsg.GetRegComplete())
|
||||
}
|
||||
|
||||
return &middlewareHarness{
|
||||
t: t,
|
||||
cancel: cancel,
|
||||
|
39
rpcserver.go
39
rpcserver.go
@ -7504,8 +7504,9 @@ func (r *rpcServer) RegisterRPCMiddleware(
|
||||
// middleware must be a registration message containing its name and the
|
||||
// custom caveat it wants to register for.
|
||||
var (
|
||||
registerChan = make(chan *lnrpc.MiddlewareRegistration, 1)
|
||||
errChan = make(chan error, 1)
|
||||
registerChan = make(chan *lnrpc.MiddlewareRegistration, 1)
|
||||
registerDoneChan = make(chan struct{})
|
||||
errChan = make(chan error, 1)
|
||||
)
|
||||
ctxc, cancel := context.WithTimeout(
|
||||
stream.Context(), r.cfg.RPCMiddleware.InterceptTimeout,
|
||||
@ -7580,6 +7581,40 @@ func (r *rpcServer) RegisterRPCMiddleware(
|
||||
}
|
||||
defer r.interceptorChain.RemoveMiddleware(registerMsg.MiddlewareName)
|
||||
|
||||
// Send a message to the client to indicate that the registration has
|
||||
// successfully completed.
|
||||
regCompleteMsg := &lnrpc.RPCMiddlewareRequest{
|
||||
InterceptType: &lnrpc.RPCMiddlewareRequest_RegComplete{
|
||||
RegComplete: true,
|
||||
},
|
||||
}
|
||||
|
||||
// Send the message in a goroutine because the Send method blocks until
|
||||
// the message is read by the client.
|
||||
go func() {
|
||||
err := stream.Send(regCompleteMsg)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
close(registerDoneChan)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return fmt.Errorf("error sending middleware registration "+
|
||||
"complete message: %v", err)
|
||||
|
||||
case <-ctxc.Done():
|
||||
return ctxc.Err()
|
||||
|
||||
case <-r.quit:
|
||||
return ErrServerShuttingDown
|
||||
|
||||
case <-registerDoneChan:
|
||||
}
|
||||
|
||||
return middleware.Run()
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user