itest: add method waitTillServerState in harness node

This commit is contained in:
yyforyongyu 2021-09-18 14:19:16 +08:00
parent f5f1289dab
commit 75da5cabc8
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 50 additions and 78 deletions

View File

@ -724,13 +724,13 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
return err
}
if err := hn.WaitUntilStarted(conn, DefaultTimeout); err != nil {
return err
}
// Init all the RPC clients.
hn.initRPCClients(conn)
if err := hn.WaitUntilStarted(); err != nil {
return err
}
// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
@ -745,68 +745,27 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
}
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
func (hn *HarnessNode) WaitUntilStarted(conn grpc.ClientConnInterface,
timeout time.Duration) error {
return hn.waitForState(conn, timeout, func(s lnrpc.WalletState) bool {
func (hn *HarnessNode) WaitUntilStarted() error {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s != lnrpc.WalletState_WAITING_TO_START
})
}
// WaitUntilStateReached waits until the given wallet state (or one of the
// states following it) has been reached.
func (hn *HarnessNode) WaitUntilStateReached(conn grpc.ClientConnInterface,
timeout time.Duration, desiredState lnrpc.WalletState) error {
func (hn *HarnessNode) WaitUntilStateReached(
desiredState lnrpc.WalletState) error {
return hn.waitForState(conn, timeout, func(s lnrpc.WalletState) bool {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s >= desiredState
})
}
// waitForState waits until the current node state fulfills the given
// predicate.
func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface,
timeout time.Duration,
predicate func(state lnrpc.WalletState) bool) error {
stateClient := lnrpc.NewStateClient(conn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stateStream, err := stateClient.SubscribeState(
ctx, &lnrpc.SubscribeStateRequest{},
)
if err != nil {
return err
}
errChan := make(chan error, 1)
started := make(chan struct{})
go func() {
for {
resp, err := stateStream.Recv()
if err != nil {
errChan <- err
return
}
if predicate(resp.State) {
close(started)
return
}
}
}()
select {
case <-started:
case err = <-errChan:
case <-time.After(timeout):
return fmt.Errorf("WaitUntilLeader timed out")
}
return err
// WaitUntilServerActive waits until the lnd daemon is fully started.
func (hn *HarnessNode) WaitUntilServerActive() error {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s == lnrpc.WalletState_SERVER_ACTIVE
})
}
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
@ -820,18 +779,12 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
connErr error
)
startTs := time.Now()
if err := wait.NoError(func() error {
conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed)
return connErr
}, timeout); err != nil {
return err
}
timeout -= time.Since(startTs)
if err := hn.WaitUntilStarted(conn, timeout); err != nil {
return err
}
// Init all the RPC clients.
hn.initRPCClients(conn)
@ -982,9 +935,11 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
return hn.initClientWhenReady(DefaultTimeout)
}
// waitTillServerStarted makes a subscription to the server's state change and
// blocks until the server is in state ServerActive.
func (hn *HarnessNode) waitTillServerStarted() error {
// waitTillServerState makes a subscription to the server's state change and
// blocks until the server is in the targeted state.
func (hn *HarnessNode) waitTillServerState(
predicate func(state lnrpc.WalletState) bool) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
defer cancel()
@ -995,15 +950,35 @@ func (hn *HarnessNode) waitTillServerStarted() error {
return fmt.Errorf("failed to subscribe to state: %w", err)
}
for {
resp, err := client.Recv()
if err != nil {
return fmt.Errorf("failed to receive state "+
"client stream: %w", err)
}
errChan := make(chan error, 1)
done := make(chan struct{})
go func() {
for {
resp, err := client.Recv()
if err != nil {
errChan <- err
return
}
if resp.State == lnrpc.WalletState_SERVER_ACTIVE {
if predicate(resp.State) {
close(done)
return
}
}
}()
var lastErr error
for {
select {
case err := <-errChan:
lastErr = err
case <-done:
return nil
case <-time.After(NodeStartTimeout):
return fmt.Errorf("timeout waiting for state, "+
"got err from stream: %v", lastErr)
}
}
}
@ -1043,14 +1018,13 @@ func (hn *HarnessNode) initLightningClient() error {
hn.StateClient = lnrpc.NewStateClient(conn)
// Wait until the server is fully started.
if err := hn.waitTillServerStarted(); err != nil {
if err := hn.WaitUntilServerActive(); err != nil {
return err
}
// Set the harness node's pubkey to what the node claims in GetInfo.
// Since the RPC might not be immediately active, we wrap the call in a
// wait.NoError.
if err := wait.NoError(hn.FetchNodeInfo, DefaultTimeout); err != nil {
// The RPC must have been started at this point.
if err := hn.FetchNodeInfo(); err != nil {
return err
}

View File

@ -403,9 +403,7 @@ func middlewareMandatoryTest(t *testing.T, node *lntest.HarnessNode,
// test case. So we need to do the wait and client setup manually here.
conn, err := node.ConnectRPC(true)
require.NoError(t, err)
err = node.WaitUntilStateReached(
conn, defaultTimeout, lnrpc.WalletState_RPC_ACTIVE,
)
err = node.WaitUntilStateReached(lnrpc.WalletState_RPC_ACTIVE)
require.NoError(t, err)
node.LightningClient = lnrpc.NewLightningClient(conn)