multi: registration complete MW interceptor msg

In this commit, we change the flow of the rpc middleware registration
a bit. In order to allow a client to add rpc middleware interceptors in
a deterministic order, we now make the server send a "registration
complete" message to the client after compeleting the registration
process so that the client knows when it can go ahead and register the
next client.
This commit is contained in:
Elle Mouton 2022-07-19 11:03:14 -05:00
parent 0ac421907a
commit 25acb51ba3
No known key found for this signature in database
GPG Key ID: D7D916376026F177
6 changed files with 566 additions and 485 deletions

View File

@ -65,6 +65,10 @@
true so the payment won't be failed unless a terminal error has occurred,
which is useful for constructing MPP.
* [Add a message to the RPC MW registration
flow](https://github.com/lightningnetwork/lnd/pull/6754) so that the server
can indicate to the client that it has completed the RPC MW registration.
## Bug Fixes
* Fixed data race found in

File diff suppressed because it is too large Load Diff

View File

@ -4379,6 +4379,14 @@ message RPCMiddlewareRequest {
the same type, or replaced by an error message.
*/
RPCMessage response = 6;
/*
This is used to indicate to the client that the server has successfully
registered the interceptor. This is only used in the very first message
that the server sends to the client after the client sends the server
the middleware registration message.
*/
bool reg_complete = 8;
}
/*

View File

@ -5995,6 +5995,10 @@
"$ref": "#/definitions/lnrpcRPCMessage",
"description": "Intercept outgoing gRPC response message: all outgoing messages, both on\nstreaming and unary RPCs, are forwarded to the middleware for inspection\nand amendment. The response in this message is the original response as\nit was generated by the main RPC server. It can either be accepted\n(=forwarded to the client), replaced/overwritten with a new message of\nthe same type, or replaced by an error message."
},
"reg_complete": {
"type": "boolean",
"description": "This is used to indicate to the client that the server has successfully\nregistered the interceptor. This is only used in the very first message\nthat the server sends to the client after the client sends the server\nthe middleware registration message."
},
"msg_id": {
"type": "string",
"format": "uint64",

View File

@ -68,7 +68,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
tt, net.Alice, &lnrpc.MiddlewareRegistration{
MiddlewareName: "itest-interceptor",
ReadOnlyMode: true,
},
}, true,
)
defer registration.cancel()
@ -86,7 +86,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
tt, net.Alice, &lnrpc.MiddlewareRegistration{
MiddlewareName: "itest-interceptor",
CustomMacaroonCaveatName: "itest-caveat",
},
}, true,
)
defer registration.cancel()
@ -103,7 +103,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
tt, net.Alice, &lnrpc.MiddlewareRegistration{
MiddlewareName: "itest-interceptor",
ReadOnlyMode: true,
},
}, true,
)
defer registration.cancel()
@ -120,7 +120,7 @@ func testRPCMiddlewareInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
tt, net.Alice, &lnrpc.MiddlewareRegistration{
MiddlewareName: "itest-interceptor",
CustomMacaroonCaveatName: "itest-caveat",
},
}, true,
)
defer registration.cancel()
@ -174,7 +174,7 @@ func middlewareRegistrationRestrictionTests(t *testing.T,
t.Run(fmt.Sprintf("%d", idx), func(tt *testing.T) {
invalidName := registerMiddleware(
tt, node, tc.registration,
tt, node, tc.registration, false,
)
_, err := invalidName.stream.Recv()
require.Error(tt, err)
@ -578,7 +578,7 @@ func middlewareMandatoryTest(t *testing.T, node *lntest.HarnessNode,
t, node, &lnrpc.MiddlewareRegistration{
MiddlewareName: "itest-interceptor",
CustomMacaroonCaveatName: "itest-caveat",
},
}, true,
)
defer registration.cancel()
@ -649,7 +649,8 @@ type middlewareHarness struct {
// registerMiddleware creates a new middleware harness and sends the initial
// register message to the RPC server.
func registerMiddleware(t *testing.T, node *lntest.HarnessNode,
registration *lnrpc.MiddlewareRegistration) *middlewareHarness {
registration *lnrpc.MiddlewareRegistration,
waitForRegister bool) *middlewareHarness {
ctxc, cancel := context.WithCancel(context.Background())
@ -663,6 +664,13 @@ func registerMiddleware(t *testing.T, node *lntest.HarnessNode,
})
require.NoError(t, err)
if waitForRegister {
// Wait for the registration complete message.
regCompleteMsg, err := middlewareStream.Recv()
require.NoError(t, err)
require.True(t, regCompleteMsg.GetRegComplete())
}
return &middlewareHarness{
t: t,
cancel: cancel,

View File

@ -7504,8 +7504,9 @@ func (r *rpcServer) RegisterRPCMiddleware(
// middleware must be a registration message containing its name and the
// custom caveat it wants to register for.
var (
registerChan = make(chan *lnrpc.MiddlewareRegistration, 1)
errChan = make(chan error, 1)
registerChan = make(chan *lnrpc.MiddlewareRegistration, 1)
registerDoneChan = make(chan struct{})
errChan = make(chan error, 1)
)
ctxc, cancel := context.WithTimeout(
stream.Context(), r.cfg.RPCMiddleware.InterceptTimeout,
@ -7580,6 +7581,40 @@ func (r *rpcServer) RegisterRPCMiddleware(
}
defer r.interceptorChain.RemoveMiddleware(registerMsg.MiddlewareName)
// Send a message to the client to indicate that the registration has
// successfully completed.
regCompleteMsg := &lnrpc.RPCMiddlewareRequest{
InterceptType: &lnrpc.RPCMiddlewareRequest_RegComplete{
RegComplete: true,
},
}
// Send the message in a goroutine because the Send method blocks until
// the message is read by the client.
go func() {
err := stream.Send(regCompleteMsg)
if err != nil {
errChan <- err
return
}
close(registerDoneChan)
}()
select {
case err := <-errChan:
return fmt.Errorf("error sending middleware registration "+
"complete message: %v", err)
case <-ctxc.Done():
return ctxc.Err()
case <-r.quit:
return ErrServerShuttingDown
case <-registerDoneChan:
}
return middleware.Run()
}