lnd/lntemp/node/harness_node.go
yyforyongyu d32539ab86
lntemp: add HarnessNode to manage lnd's process
This commit adds a new struct, `HarnessNode`, to manage the lnd process
used in our itest. This struct is built upon `HarnessRPC`, `State`, and
`NodeWatcher`.
2022-10-14 15:45:23 +08:00

862 lines
22 KiB
Go

package node
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntemp/rpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/macaroon.v2"
)
const (
	// logPubKeyBytes is how many leading bytes of the node's identity
	// PubKey are hex-encoded into its log file name. A short prefix is
	// enough to tell nodes apart without making the names unwieldy.
	logPubKeyBytes = 4

	// trickleDelay is the delay, in milliseconds, between consecutive
	// releases of announcements by the AuthenticatedGossiper.
	trickleDelay = 50

	// postgresDsn is the connection string template for the itest
	// postgres server; its single %s verb is the database name.
	postgresDsn = "postgres://postgres:postgres@localhost:6432/" +
		"%s?sslmode=disable"

	// commitInterval is the longest the graph database waits between
	// attempts to flush a batch of modifications to disk
	// (db.batch-commit-interval).
	commitInterval = 10 * time.Millisecond
)
// HarnessNode represents an instance of lnd running within our test network
// harness. It's responsible for managing the lnd process, grpc connection, and
// wallet auth. A HarnessNode is built upon its rpc clients, represented in
// `HarnessRPC`. It also has a `State` which holds its internal state, and a
// `Watcher` that keeps track of its topology updates.
type HarnessNode struct {
	// T is the test this node was created for. It is embedded and also
	// handed to the RPC clients when they are created.
	*testing.T

	// Cfg holds the config values for the node.
	Cfg *BaseNodeConfig

	// RPC holds a list of RPC clients.
	RPC *rpc.HarnessRPC

	// State records the current state of the node.
	State *State

	// Watcher watches the node's topology updates. It is only set once an
	// authenticated connection has been established (see InitRPCClients).
	Watcher *nodeWatcher

	// PubKey is the serialized compressed identity public key of the node.
	// This field is only populated once the node's server is active and
	// its identity has been fetched via GetInfo (see attachPubKey); until
	// then it is all zeros.
	PubKey [33]byte

	// PubKeyStr is the hex-encoded identity public key as reported by
	// GetInfo.
	PubKeyStr string

	// conn is the underlying connection to the grpc endpoint of the node.
	conn *grpc.ClientConn

	// runCtx is a context with cancel method. It's used to signal when the
	// node needs to quit, and used as the parent context when spawning
	// children contexts for RPC requests.
	runCtx context.Context

	// cancel cancels runCtx. It is created alongside runCtx in
	// StartLndCmd and called by Stop.
	cancel context.CancelFunc

	// filename is the path of the log file currently used by the node.
	filename string

	// cmd is the lnd process started by StartLndCmd.
	cmd *exec.Cmd

	// logFile is the file the lnd process output is written to. It is nil
	// when the logoutput flag is not set.
	logFile *os.File
}
// NewHarnessNode creates a new test lightning node instance from the passed
// config.
//
// The config is filled in place. If BaseDir is empty, a fresh temporary
// directory is created for it. The data, log, TLS and macaroon paths are all
// derived from BaseDir, and listening ports are generated. When the postgres
// backend is selected, a temporary database is created and its DSN is stored
// in the config. If creating that database fails, a temporary BaseDir created
// by this call is removed again so it is not leaked.
func NewHarnessNode(t *testing.T, cfg *BaseNodeConfig) (*HarnessNode, error) {
	// Remember whether we created the base dir ourselves so that, on a
	// later failure, we only remove what we own.
	createdBaseDir := false
	if cfg.BaseDir == "" {
		var err error
		cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node")
		if err != nil {
			return nil, err
		}
		createdBaseDir = true
	}

	cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
	cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
	cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
	cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")

	networkDir := filepath.Join(
		cfg.DataDir, "chain", "bitcoin", cfg.NetParams.Name,
	)
	cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
	cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
	cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")

	cfg.GenerateListeningPorts()

	// Create temporary database.
	var dbName string
	if cfg.DbBackend == lntest.BackendPostgres {
		var err error
		dbName, err = createTempPgDb()
		if err != nil {
			// Don't leave behind the temp dir created above. This
			// is best effort: the database error is the one worth
			// reporting, so a removal failure is ignored.
			if createdBaseDir {
				_ = os.RemoveAll(cfg.BaseDir)
				cfg.BaseDir = ""
			}

			return nil, err
		}
		cfg.PostgresDsn = postgresDatabaseDsn(dbName)
	}

	cfg.OriginalExtraArgs = cfg.ExtraArgs
	cfg.postgresDbName = dbName

	return &HarnessNode{
		T:   t,
		Cfg: cfg,
	}, nil
}
// InitRPCClients wires the node up to the given gRPC connection. It stores
// the connection, builds the RPC clients on top of it, and then creates the
// node's State and topology Watcher from those clients.
func (hn *HarnessNode) InitRPCClients(c *grpc.ClientConn) {
	hn.conn = c

	clients := rpc.NewHarnessRPC(hn.runCtx, hn.T, c, hn.Name())
	hn.RPC = clients

	state := newState(clients)
	hn.State = state

	hn.Watcher = newNodeWatcher(clients, state)
}
// Name returns the node's name as set in its config.
func (hn *HarnessNode) Name() string {
	cfg := hn.Cfg
	return cfg.Name
}
// UpdateState refreshes the node's internal State.
func (hn *HarnessNode) UpdateState() {
	s := hn.State
	s.updateState()
}
// String renders a JSON snapshot of the node for debugging. The snapshot
// contains its ID, name, pubkey, State and a subset of its config. If
// encoding fails, a message describing the error is returned instead.
func (hn *HarnessNode) String() string {
	// cfgView lists the config fields included in the dump. Field names
	// and their order define the JSON output.
	//
	// NOTE(review): HasSeed, ProfilePort, AcceptKeySend and FeeURL are
	// never filled in below, so they always serialize as zero values.
	type cfgView struct {
		LogFilenamePrefix string
		ExtraArgs         []string
		HasSeed           bool
		P2PPort           int
		RPCPort           int
		RESTPort          int
		ProfilePort       int
		AcceptKeySend     bool
		FeeURL            string
	}

	c := hn.Cfg
	view := cfgView{
		LogFilenamePrefix: c.LogFilenamePrefix,
		ExtraArgs:         c.ExtraArgs,
		P2PPort:           c.P2PPort,
		RPCPort:           c.RPCPort,
		RESTPort:          c.RESTPort,
	}

	snapshot := struct {
		NodeID  uint32
		Name    string
		PubKey  string
		State   *State
		NodeCfg cfgView
	}{
		NodeID:  c.NodeID,
		Name:    c.Name,
		PubKey:  hn.PubKeyStr,
		State:   hn.State,
		NodeCfg: view,
	}

	encoded, err := json.MarshalIndent(snapshot, "", "\t")
	if err != nil {
		return fmt.Sprintf("\n encode node state with err: %v", err)
	}

	return fmt.Sprintf("\nnode state: %s", encoded)
}
// WaitUntilStarted blocks until the wallet reports any state other than
// WAITING_TO_START.
func (hn *HarnessNode) WaitUntilStarted() error {
	leftWaiting := func(s lnrpc.WalletState) bool {
		return s != lnrpc.WalletState_WAITING_TO_START
	}

	return hn.waitTillServerState(leftWaiting)
}
// WaitUntilServerActive blocks until the wallet reports SERVER_ACTIVE, which
// means the lnd daemon is fully started.
func (hn *HarnessNode) WaitUntilServerActive() error {
	isActive := func(s lnrpc.WalletState) bool {
		return s == lnrpc.WalletState_SERVER_ACTIVE
	}

	return hn.waitTillServerState(isActive)
}
// Unlock unlocks the wallet of a HarnessNode that was created with a
// seed+password, typically right after the node has been restarted. If it
// returns nil, the authenticated gRPC connection is in place and the node
// is ready for normal gRPC requests and harness commands.
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
	// lnd cannot finish starting up until its wallet is unlocked. The
	// result is not inspected here; presumably the RPC helper fails the
	// test on error — TODO confirm.
	hn.RPC.UnlockWallet(unlockReq)

	// With the wallet unlocked, wait for the RPC server and connect using
	// the admin macaroon from disk (no stateless macaroon bytes).
	var statelessMac []byte
	return hn.InitNode(statelessMac)
}
// AddToLogf appends a formatted, "itest: "-prefixed line to the node's log
// file. This lets test output be interleaved with the node's own output. It
// does nothing when the node has no log file, and a failed write is only
// reported on the console.
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
	f := hn.logFile
	if f == nil {
		return
	}

	line := "itest: " + fmt.Sprintf(format, a...) + "\n"
	_, err := f.WriteString(line)
	if err != nil {
		hn.printErrf("write to log err: %v", err)
	}
}
// ReadMacaroon waits up to timeout for the file at macPath to exist and hold
// a valid serialized macaroon, and returns the decoded macaroon. Read and
// decode failures are retried until the timeout expires, after which the
// error from wait.NoError is returned. The underlying read or unmarshal
// error is wrapped with %w so callers can inspect it.
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
	*macaroon.Macaroon, error) {

	// Wait until macaroon file is created and has valid content before
	// using it.
	var mac *macaroon.Macaroon
	err := wait.NoError(func() error {
		macBytes, err := ioutil.ReadFile(macPath)
		if err != nil {
			return fmt.Errorf("error reading macaroon file: %w",
				err)
		}

		newMac := &macaroon.Macaroon{}
		if err := newMac.UnmarshalBinary(macBytes); err != nil {
			return fmt.Errorf("error unmarshalling macaroon "+
				"file: %w", err)
		}
		mac = newMac

		return nil
	}, timeout)

	return mac, err
}
// ConnectRPCWithMacaroon dials the node's RPC address over TLS, using the
// node's TLS certificate, and returns the gRPC client connection.
//
// When mac is nil, the connection carries no per-RPC credentials, as needed
// before the wallet is unlocked. Otherwise mac is attached to every call.
//
// Loading the TLS certificate, which lnd may not have written yet, is retried
// for up to DefaultTimeout. The blocking dial is bounded by DefaultTimeout
// and is also aborted if the node's runCtx is cancelled. Errors are wrapped
// with %w.
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
	*grpc.ClientConn, error) {

	// Wait until TLS certificate is created and has valid content before
	// using it.
	var tlsCreds credentials.TransportCredentials
	err := wait.NoError(func() error {
		var err error
		tlsCreds, err = credentials.NewClientTLSFromFile(
			hn.Cfg.TLSCertPath, "",
		)
		return err
	}, DefaultTimeout)
	if err != nil {
		return nil, fmt.Errorf("error reading TLS cert: %w", err)
	}

	opts := []grpc.DialOption{
		grpc.WithBlock(),
		grpc.WithTransportCredentials(tlsCreds),
	}

	// Attach the macaroon, if any, to every RPC made on this connection.
	if mac != nil {
		macCred, err := macaroons.NewMacaroonCredential(mac)
		if err != nil {
			return nil, fmt.Errorf("error cloning mac: %w", err)
		}
		opts = append(opts, grpc.WithPerRPCCredentials(macCred))
	}

	ctx, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
	defer cancel()

	return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
}
// ConnectRPC opens an authenticated gRPC connection to the node. It reads the
// admin macaroon lnd writes to AdminMacPath, waiting up to DefaultTimeout for
// it to appear, and uses it together with the node's TLS certificate.
func (hn *HarnessNode) ConnectRPC() (*grpc.ClientConn, error) {
	adminMac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, DefaultTimeout)
	if err != nil {
		return nil, err
	}

	return hn.ConnectRPCWithMacaroon(adminMac)
}
// SetExtraArgs replaces the extra arguments in the node's config. The new
// arguments take effect when the node is next (re)started.
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
	cfg := hn.Cfg
	cfg.ExtraArgs = extraArgs
}
// StartLndCmd launches the lnd binary with the arguments generated from the
// node's config.
//
// It first creates a fresh runCtx and its cancel func. The process's stderr
// is captured in an in-memory buffer. When the logoutput flag is set, stdout
// and stderr are also written to the node's log file.
func (hn *HarnessNode) StartLndCmd() error {
	hn.runCtx, hn.cancel = context.WithCancel(context.Background())

	hn.cmd = exec.Command(hn.Cfg.LndBinary, hn.Cfg.GenArgs()...) //nolint:gosec

	// Redirect stderr output to buffer.
	var stderrBuf bytes.Buffer
	hn.cmd.Stderr = &stderrBuf

	if *lntest.LogOutput {
		if err := addLogFile(hn); err != nil {
			return err
		}
	}

	return hn.cmd.Start()
}
// StartWithSeed launches the lnd process, opens a gRPC connection without
// macaroon auth, and waits until the server has left the WAITING_TO_START
// state.
//
// If the unauthenticated connection cannot be established, the process is
// killed, as Start does, so a failed start does not leave lnd running.
//
// NOTE: caller needs to take extra step to create and unlock the wallet.
func (hn *HarnessNode) StartWithSeed() error {
	// Start lnd process and prepare logs.
	if err := hn.StartLndCmd(); err != nil {
		return fmt.Errorf("start lnd error: %w", err)
	}

	// Create an unauthed connection. Kill the process if that fails.
	conn, err := hn.ConnectRPCWithMacaroon(nil)
	if err != nil {
		err = fmt.Errorf("ConnectRPCWithMacaroon err: %w", err)
		if killErr := hn.kill(); killErr != nil {
			err = fmt.Errorf("kill process got err: %w: %v",
				killErr, err)
		}

		return err
	}

	// Since the conn is not authed, only the `WalletUnlocker` and `State`
	// clients can be inited from this conn.
	hn.conn = conn
	hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())

	// Wait till the server is starting.
	return hn.WaitUntilStarted()
}
// Start launches the lnd process and opens an authenticated gRPC connection
// using the admin macaroon. It then blocks until the server is fully started
// and the topology watcher is running.
//
// If the connection cannot be established, the process is killed, because
// Stop relies on the RPC clients to shut the node down gracefully.
func (hn *HarnessNode) Start() error {
	// Start lnd process and prepare logs.
	if err := hn.StartLndCmd(); err != nil {
		return fmt.Errorf("start lnd error: %w", err)
	}

	// Since Stop uses the LightningClient to stop the node, if we fail to
	// get a connected client, we have to kill the process.
	conn, err := hn.ConnectRPC()
	if err != nil {
		err = fmt.Errorf("ConnectRPC err: %w", err)
		cmdErr := hn.kill()
		if cmdErr != nil {
			err = fmt.Errorf("kill process got err: %w: %v",
				cmdErr, err)
		}

		return err
	}

	// Init all the RPC clients.
	hn.InitRPCClients(conn)

	// Wait till the server is starting.
	if err := hn.WaitUntilStarted(); err != nil {
		return fmt.Errorf("waiting for start got: %w", err)
	}

	// Wait for the server to become active and subscribe for topology
	// updates.
	return hn.initLightningClient()
}
// InitNode connects to the node's main gRPC server and finishes setting up
// the HarnessNode, returning once the server is active.
//
// A non-nil macBytes means the node was initialized stateless. macBytes is
// then decoded as the admin macaroon and used for the connection. Otherwise
// the admin macaroon is read from the file system.
func (hn *HarnessNode) InitNode(macBytes []byte) error {
	var (
		conn *grpc.ClientConn
		err  error
	)

	switch {
	// Stateless init: the caller hands us the admin macaroon directly.
	case macBytes != nil:
		adminMac := &macaroon.Macaroon{}
		if err = adminMac.UnmarshalBinary(macBytes); err != nil {
			return fmt.Errorf("unmarshal failed: %w", err)
		}
		conn, err = hn.ConnectRPCWithMacaroon(adminMac)

	// Normal init: the macaroon is expected on disk.
	default:
		conn, err = hn.ConnectRPC()
	}
	if err != nil {
		return err
	}

	hn.InitRPCClients(conn)

	return hn.initLightningClient()
}
// waitTillServerState subscribes to the server's wallet state changes and
// blocks until predicate returns true for a received state. It gives up with
// an error after lntest.NodeStartTimeout, or as soon as the subscription
// stream returns an error (wrapped with %w).
//
// NOTE: on timeout, the receiving goroutine keeps running until the stream
// returns an error or the predicate matches. errChan is buffered and done is
// closed rather than sent on, so the goroutine never blocks on a missing
// reader.
func (hn *HarnessNode) waitTillServerState(
	predicate func(state lnrpc.WalletState) bool) error {

	client := hn.RPC.SubscribeState()

	errChan := make(chan error, 1)
	done := make(chan struct{})
	go func() {
		for {
			resp, err := client.Recv()
			if err != nil {
				errChan <- err
				return
			}

			if predicate(resp.State) {
				close(done)
				return
			}
		}
	}()

	// Every branch returns, so a single select is sufficient.
	select {
	case <-time.After(lntest.NodeStartTimeout):
		return fmt.Errorf("timeout waiting for server state")

	case err := <-errChan:
		return fmt.Errorf("receive server state err: %w", err)

	case <-done:
		return nil
	}
}
// initLightningClient blocks until the lnd server is fully started. It then
// records the node's pubkey and launches the topology watcher for this node.
//
// The watcher reports on the `started` channel. The first value signals
// whether the topology client was created (nil or an error, waited for up to
// DefaultTimeout). Any later value is a stream error, which is printed to the
// console by a goroutine that exits when runCtx is done.
func (hn *HarnessNode) initLightningClient() error {
	// Wait until the server is fully started.
	if err := hn.WaitUntilServerActive(); err != nil {
		return err
	}

	// Set the harness node's pubkey to what the node claims in GetInfo.
	// The RPC must have been started at this point.
	if err := hn.attachPubKey(); err != nil {
		return err
	}

	// Launch the watcher that will hook into graph related topology change
	// from the PoV of this node.
	started := make(chan error, 1)
	go hn.Watcher.topologyWatcher(hn.runCtx, started)

	select {
	// First time reading the channel indicates the topology client is
	// started.
	case err := <-started:
		if err != nil {
			return fmt.Errorf("create topology client stream "+
				"got err: %w", err)
		}

	case <-time.After(DefaultTimeout):
		return fmt.Errorf("timeout creating topology client stream")
	}

	// Catch topology client stream error inside a goroutine.
	go func() {
		select {
		case err := <-started:
			hn.printErrf("topology client: %v", err)
		case <-hn.runCtx.Done():
		}
	}()

	return nil
}
// attachPubKey queries the unlocked node via GetInfo and stores its identity
// public key in PubKeyStr (hex) and PubKey (raw bytes).
//
// It returns an error if the reported key is not valid hex, or does not
// decode to exactly len(hn.PubKey) bytes. Without that check, copy would
// silently truncate or zero-pad the key.
func (hn *HarnessNode) attachPubKey() error {
	// Obtain the lnid of this node for quick identification purposes.
	info := hn.RPC.GetInfo()
	hn.PubKeyStr = info.IdentityPubkey

	pubkey, err := hex.DecodeString(info.IdentityPubkey)
	if err != nil {
		return err
	}
	if len(pubkey) != len(hn.PubKey) {
		return fmt.Errorf("invalid identity pubkey length: got %d "+
			"bytes, want %d", len(pubkey), len(hn.PubKey))
	}

	copy(hn.PubKey[:], pubkey)

	return nil
}
// cleanup removes the temporary files created for the node. It first removes
// the backup db directory, if one is recorded, and then the node's whole
// BaseDir. A failure to remove the backup dir is returned wrapped with %w,
// and BaseDir is then left in place.
//
// NOTE(review): the temporary postgres database created by NewHarnessNode
// (Cfg.postgresDbName) is not dropped here.
func (hn *HarnessNode) cleanup() error {
	if hn.Cfg.backupDbDir != "" {
		if err := os.RemoveAll(hn.Cfg.backupDbDir); err != nil {
			return fmt.Errorf("unable to remove backup dir: %w",
				err)
		}
	}

	return os.RemoveAll(hn.Cfg.BaseDir)
}
// waitForProcessExit waits, up to twice DefaultTimeout, for the lnd process to
// exit, and prints any exit error to the console. Afterwards it finalizes the
// node's log file and, for the etcd backend, the etcd log. If the process
// had already been waited on ("Wait was already called"), it returns
// immediately without touching the logs, since they were already saved.
func (hn *HarnessNode) waitForProcessExit() {
	exitErr := make(chan error, 1)
	go func() {
		exitErr <- hn.cmd.Wait()
	}()

	select {
	case err := <-exitErr:
		switch {
		case err == nil:
			// Clean exit, nothing to report.

		case strings.Contains(err.Error(), "Wait was already called"):
			return

		default:
			hn.printErrf("wait process exit got err: %v", err)
		}

	case <-time.After(DefaultTimeout * 2):
		hn.printErrf("timeout waiting for process to exit")
	}

	// Make sure log file is closed and renamed if necessary.
	finalizeLogfile(hn)

	// Rename the etcd.log file if the node was running on embedded etcd.
	finalizeEtcdLog(hn)
}
// Stop attempts to stop the active lnd process.
//
// It cancels runCtx first. If the node got far enough to have a Watcher (an
// authenticated connection with all RPC clients set up), it asks lnd to shut
// down via the StopDaemon RPC. It then waits up to DefaultTimeout for the
// Watcher's goroutines to finish. Without a Watcher, the process is killed
// instead. Finally the gRPC connection, if any, is closed.
//
// waitForProcessExit is deferred, so it runs before Stop returns on every
// path after runCtx is cancelled. That also finalizes the log files. If
// StopDaemon keeps failing with "recovery in progress" past DefaultTimeout,
// or killing the process fails, that error is returned and the connection is
// not closed.
//
// If runCtx was never created (StartLndCmd never ran), Stop only prints an
// error and returns nil.
func (hn *HarnessNode) Stop() error {
	// Do nothing if the process is not running.
	if hn.runCtx == nil {
		hn.printErrf("found nil run context")
		return nil
	}

	// Stop the runCtx.
	hn.cancel()

	// Wait for lnd process to exit in the end.
	defer hn.waitForProcessExit()

	// If we ever reach the state where `Watcher` is initialized, it
	// means the node has an authed connection and all its RPC clients are
	// ready for use. Thus we will try to stop it via the RPC.
	if hn.Watcher != nil {
		// Most StopDaemon errors are ignored because sometimes the RPC
		// connection gets closed before a response is returned. Only
		// "recovery in progress" is retried, until DefaultTimeout.
		// A fresh context is used because runCtx was cancelled above.
		req := lnrpc.StopRequest{}
		ctxt, cancel := context.WithCancel(context.Background())
		defer cancel()

		err := wait.NoError(func() error {
			_, err := hn.RPC.LN.StopDaemon(ctxt, &req)

			switch {
			case err == nil:
				return nil

			// Try again if a recovery/rescan is in progress.
			case strings.Contains(
				err.Error(), "recovery in progress",
			):
				return err

			default:
				return nil
			}
		}, DefaultTimeout)
		if err != nil {
			return err
		}

		// Wait for goroutines to be finished. If the wait group never
		// completes, this helper goroutine stays blocked in Wait.
		done := make(chan struct{})
		go func() {
			hn.Watcher.wg.Wait()
			close(done)
		}()

		// If the goroutines fail to finish before timeout, we'll print
		// the error to console and continue.
		select {
		case <-time.After(DefaultTimeout):
			hn.printErrf("timeout on wait group")
		case <-done:
		}
	} else {
		// If the rpc clients are not initiated, we'd kill the process
		// manually.
		hn.printErrf("found nil RPC clients")
		if err := hn.kill(); err != nil {
			return fmt.Errorf("killing process got: %v", err)
		}
	}

	// Close any attempts at further grpc connections. Note that `err`
	// here is the gRPC status code of the Close error, not an error.
	if hn.conn != nil {
		err := status.Code(hn.conn.Close())
		switch err {
		case codes.OK:
			return nil

		// When the context is canceled above, we might get the
		// following error as the context is no longer active.
		case codes.Canceled:
			return nil

		case codes.Unknown:
			return fmt.Errorf("unknown error attempting to stop "+
				"grpc client: %v", err)

		default:
			return fmt.Errorf("error attempting to stop "+
				"grpc client: %v", err)
		}
	}

	return nil
}
// Shutdown stops the lnd process and then removes the node's temporary
// files. If stopping fails, that error is returned and nothing is cleaned
// up.
func (hn *HarnessNode) Shutdown() error {
	err := hn.Stop()
	if err != nil {
		return err
	}

	return hn.cleanup()
}
// kill forcibly terminates the lnd process via os.Process.Kill.
//
// If the process was never started (no cmd, or cmd.Start never succeeded),
// there is nothing to kill and nil is returned. Calling Kill on the nil
// *os.Process in that case would panic.
func (hn *HarnessNode) kill() error {
	if hn.cmd == nil || hn.cmd.Process == nil {
		return nil
	}

	return hn.cmd.Process.Kill()
}
// printErrf prints an error line to stdout, tagged with the node's log
// filename prefix and name.
func (hn *HarnessNode) printErrf(format string, a ...interface{}) {
	msg := fmt.Sprintf(format, a...)
	fmt.Printf("itest error from [%s:%s]: %s\n", // nolint:forbidigo
		hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, msg)
}
// postgresDatabaseDsn returns the connection string for database dbName on
// the itest postgres server.
func postgresDatabaseDsn(dbName string) string {
	dsn := fmt.Sprintf(postgresDsn, dbName)
	return dsn
}
// createTempPgDb creates a postgres database with a random name of the form
// "itest_<16 lowercase hex chars>" and returns that name.
func createTempPgDb() (string, error) {
	// 8 random bytes give the 16 hex characters of the name suffix.
	var suffix [8]byte
	if _, err := rand.Read(suffix[:]); err != nil {
		return "", err
	}

	name := "itest_" + hex.EncodeToString(suffix[:])

	if err := executePgQuery("CREATE DATABASE " + name); err != nil {
		return "", err
	}

	return name, nil
}
// executePgQuery connects to the "postgres" database on the itest postgres
// server, executes query, and closes the connection pool again. A
// connection failure is wrapped with %w; the result of Exec is returned as
// is.
//
// The query is executed verbatim, so it must not contain untrusted input.
func executePgQuery(query string) error {
	ctx := context.Background()

	pool, err := pgxpool.Connect(ctx, postgresDatabaseDsn("postgres"))
	if err != nil {
		return fmt.Errorf("unable to connect to database: %w", err)
	}
	defer pool.Close()

	_, err = pool.Exec(ctx, query)

	return err
}
// renameFile moves fromFileName to toFileName, typically a log file created
// during an integration test. A failure is printed to stdout and otherwise
// ignored.
func renameFile(fromFileName, toFileName string) {
	if err := os.Rename(fromFileName, toFileName); err != nil {
		fmt.Printf("could not rename %s to %s: %v\n", // nolint:forbidigo
			fromFileName, toFileName, err)
	}
}
// getFinalizedLogFilePrefix returns the node's final log file path without
// an extension: "<logdir>/<nodeID>-<prefix>-<name>-<pubkey>". Here <pubkey>
// is the hex encoding of the first logPubKeyBytes bytes of the node's
// PubKey.
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
	cfg := hn.Cfg
	shortKey := hex.EncodeToString(hn.PubKey[:logPubKeyBytes])

	return fmt.Sprintf("%s/%d-%s-%s-%s", lntest.GetLogDir(), cfg.NodeID,
		cfg.LogFilenamePrefix, cfg.Name, shortKey)
}
// finalizeLogfile closes the node's log file, if one is open. When the
// logoutput flag is set, it then renames the file to
// "<getFinalizedLogFilePrefix>.log", which includes the node's pubkey prefix.
//
// A Close error is reported on the console instead of being silently
// dropped. The logFile field is cleared afterwards, so later AddToLogf calls
// are skipped instead of writing to a closed file, and a repeated call is a
// no-op.
func finalizeLogfile(hn *HarnessNode) {
	// Exit early if there's no log file.
	if hn.logFile == nil {
		return
	}

	if err := hn.logFile.Close(); err != nil {
		hn.printErrf("close log file got err: %v", err)
	}
	hn.logFile = nil

	// If logoutput flag is not set, return early.
	if !*lntest.LogOutput {
		return
	}

	newFileName := fmt.Sprintf("%v.log", getFinalizedLogFilePrefix(hn))
	renameFile(hn.filename, newFileName)
}
// finalizeEtcdLog keeps the etcd log when a test ends. For nodes on the etcd
// backend, it renames "<LogDir>/etcd.log" to
// "<getFinalizedLogFilePrefix>-etcd.log". For any other backend it does
// nothing.
func finalizeEtcdLog(hn *HarnessNode) {
	if hn.Cfg.DbBackend == lntest.BackendEtcd {
		src := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
		dst := fmt.Sprintf("%v-etcd.log", getFinalizedLogFilePrefix(hn))
		renameFile(src, dst)
	}
}
// addLogFile opens the node's log file, creating it or appending to it, and
// redirects the lnd process's output there. Stdout goes only to the file.
// Stderr is tee'd to both the previously configured hn.cmd.Stderr and the
// file. hn.cmd must already exist with a non-nil Stderr, which StartLndCmd
// sets up before calling this.
//
// If the node's PubKey is not known yet (its first logPubKeyBytes bytes are
// all zero), a temporary "...-tmp__.log" name is used. finalizeLogfile later
// moves the file to its final name, which includes the PubKey. Otherwise the
// final name is used directly.
func addLogFile(hn *HarnessNode) error {
	var fileName string

	// An unset PubKey leaves its leading bytes zero.
	unsetKey := make([]byte, logPubKeyBytes)
	if bytes.Equal(hn.PubKey[:logPubKeyBytes], unsetKey) {
		fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log",
			lntest.GetLogDir(), hn.Cfg.NodeID,
			hn.Cfg.LogFilenamePrefix, hn.Cfg.Name)
	} else {
		fileName = getFinalizedLogFilePrefix(hn) + ".log"
	}

	// Create file if not exists, otherwise append.
	file, err := os.OpenFile(
		fileName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666,
	)
	if err != nil {
		return err
	}

	// Pass node's stderr to both the existing writer and the file.
	hn.cmd.Stderr = io.MultiWriter(hn.cmd.Stderr, file)

	// Pass the node's stdout only to the file.
	hn.cmd.Stdout = file

	// Let the node keep a reference to this file, such that we can add to
	// it if necessary.
	hn.logFile = file
	hn.filename = fileName

	return nil
}