package node

import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/lightningnetwork/lnd"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lntest/rpc"
	"github.com/lightningnetwork/lnd/lntest/wait"
	"github.com/lightningnetwork/lnd/macaroons"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/status"
	"gopkg.in/macaroon.v2"
)

const (
	// logPubKeyBytes is the number of bytes of the node's PubKey that
	// will be appended to the log file name. The whole PubKey is too
	// long and not really necessary to quickly identify what node
	// produced which log file.
	logPubKeyBytes = 4

	// trickleDelay is the amount of time in milliseconds between each
	// release of announcements by AuthenticatedGossiper to the network.
	trickleDelay = 50

	postgresDsn = "postgres://postgres:postgres@localhost:" +
		"6432/%s?sslmode=disable"

	// commitInterval specifies the maximum interval the graph database
	// will wait between attempting to flush a batch of modifications to
	// disk (db.batch-commit-interval).
	commitInterval = 10 * time.Millisecond
)

// HarnessNode represents an instance of lnd running within our test network
// harness. It's responsible for managing the lnd process, grpc connection,
// and wallet auth. A HarnessNode is built upon its rpc clients, represented
// in `HarnessRPC`. It also has a `State` which holds its internal state, and
// a `Watcher` that keeps track of its topology updates.
type HarnessNode struct {
	*testing.T

	// Cfg holds the config values for the node.
	Cfg *BaseNodeConfig

	// RPC holds a list of RPC clients.
	RPC *rpc.HarnessRPC

	// State records the current state of the node.
	State *State

	// Watcher watches the node's topology updates.
	Watcher *nodeWatcher

	// PubKey is the serialized compressed identity public key of the
	// node. This field will only be populated once the node itself has
	// been started via the start() method.
	PubKey    [33]byte
	PubKeyStr string

	// conn is the underlying connection to the grpc endpoint of the
	// node.
	conn *grpc.ClientConn

	// runCtx is a context with cancel method. It's used to signal when
	// the node needs to quit, and used as the parent context when
	// spawning children contexts for RPC requests.
	runCtx context.Context //nolint:containedctx
	cancel context.CancelFunc

	// filename is the log file's name.
	filename string

	cmd     *exec.Cmd
	logFile *os.File
}
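// The snippet below is a minimal sketch of the typical HarnessNode
// lifecycle; the *testing.T, context.Context and BaseNodeConfig values are
// assumed to be supplied by the caller's test setup and are illustrative
// only:
//
//	node, err := NewHarnessNode(t, cfg)
//	if err != nil {
//		t.Fatalf("unable to create node: %v", err)
//	}
//
//	// Start spawns the lnd process, connects over gRPC and blocks
//	// until the wallet state leaves WAITING_TO_START.
//	if err := node.Start(ctx); err != nil {
//		t.Fatalf("unable to start node: %v", err)
//	}
//
//	// Shutdown stops the process and removes the node's base dir.
//	if err := node.Shutdown(); err != nil {
//		t.Fatalf("unable to shut down node: %v", err)
//	}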
// NewHarnessNode creates a new test lightning node instance from the passed
// config.
func NewHarnessNode(t *testing.T, cfg *BaseNodeConfig) (*HarnessNode, error) {
	if cfg.BaseDir == "" {
		var err error

		// Create a temporary directory for the node's data and
		// logs. Use dash suffix as a separator between base name and
		// random suffix.
		dirBaseName := fmt.Sprintf("lndtest-node-%s-", cfg.Name)
		cfg.BaseDir, err = os.MkdirTemp("", dirBaseName)
		if err != nil {
			return nil, err
		}
	}
	cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
	cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
	cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
	cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")

	networkDir := filepath.Join(
		cfg.DataDir, "chain", lnd.BitcoinChainName,
		cfg.NetParams.Name,
	)
	cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
	cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
	cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")

	cfg.GenerateListeningPorts()

	// Create temporary database.
	var dbName string
	if cfg.DBBackend == BackendPostgres {
		var err error
		dbName, err = createTempPgDB()
		if err != nil {
			return nil, err
		}
		cfg.PostgresDsn = postgresDatabaseDsn(dbName)
	}

	cfg.OriginalExtraArgs = cfg.ExtraArgs
	cfg.postgresDBName = dbName

	return &HarnessNode{
		T:   t,
		Cfg: cfg,
	}, nil
}

// Initialize creates a list of new RPC clients using the passed connection,
// initializes the node's internal state and creates a topology watcher.
func (hn *HarnessNode) Initialize(c *grpc.ClientConn) {
	hn.conn = c

	// Init all the rpc clients.
	hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, c, hn.Name())

	// Init the node's state.
	//
	// If we already have a state, it means we are restarting the node
	// and we will only reset its internal states. Otherwise we'll create
	// a new state.
	if hn.State != nil {
		hn.State.resetEphermalStates(hn.RPC)
	} else {
		hn.State = newState(hn.RPC)
	}

	// Init the topology watcher.
	hn.Watcher = newNodeWatcher(hn.RPC, hn.State)
}

// Name returns the name of this node set during initialization.
func (hn *HarnessNode) Name() string {
	return hn.Cfg.Name
}

// UpdateState updates the node's internal state.
func (hn *HarnessNode) UpdateState() {
	hn.State.updateState()
}

// String gives the internal state of the node which is useful for debugging.
func (hn *HarnessNode) String() string {
	type nodeCfg struct {
		LogFilenamePrefix string
		ExtraArgs         []string
		SkipUnlock        bool
		Password          []byte
		P2PPort           int
		RPCPort           int
		RESTPort          int
		AcceptKeySend     bool
		FeeURL            string
	}

	nodeState := struct {
		NodeID  uint32
		Name    string
		PubKey  string
		State   *State
		NodeCfg nodeCfg
	}{
		NodeID: hn.Cfg.NodeID,
		Name:   hn.Cfg.Name,
		PubKey: hn.PubKeyStr,
		State:  hn.State,
		NodeCfg: nodeCfg{
			SkipUnlock:        hn.Cfg.SkipUnlock,
			Password:          hn.Cfg.Password,
			LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
			ExtraArgs:         hn.Cfg.ExtraArgs,
			P2PPort:           hn.Cfg.P2PPort,
			RPCPort:           hn.Cfg.RPCPort,
			RESTPort:          hn.Cfg.RESTPort,
		},
	}

	stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
	if err != nil {
		return fmt.Sprintf("\n encode node state with err: %v", err)
	}

	return fmt.Sprintf("\nnode state: %s", stateBytes)
}

// WaitUntilStarted waits until the wallet state flips from
// "WAITING_TO_START".
func (hn *HarnessNode) WaitUntilStarted() error {
	return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
		return s != lnrpc.WalletState_WAITING_TO_START
	})
}

// WaitUntilServerActive waits until the lnd daemon is fully started.
func (hn *HarnessNode) WaitUntilServerActive() error {
	return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
		return s == lnrpc.WalletState_SERVER_ACTIVE
	})
}
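// Both helpers above are thin wrappers around waitTillServerState, so a
// test can wait for any other lnrpc.WalletState with a custom predicate. A
// minimal sketch, assuming hn is a node whose RPC clients have been set up:
//
//	// Block until the wallet reports it is locked.
//	err := hn.waitTillServerState(func(s lnrpc.WalletState) bool {
//		return s == lnrpc.WalletState_LOCKED
//	})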
// WaitUntilLeader attempts to finish the start procedure by initiating an
// RPC connection and setting up the wallet unlocker client. This is needed
// when a node that has recently been started was waiting to become the
// leader and we're at the point when we expect that it is the leader now
// (awaiting unlock).
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
	var (
		conn    *grpc.ClientConn
		connErr error
	)

	if err := wait.NoError(func() error {
		conn, connErr = hn.ConnectRPCWithMacaroon(nil)
		return connErr
	}, timeout); err != nil {
		return err
	}

	// Since the conn is not authed, only the `WalletUnlocker` and
	// `State` clients can be inited from this conn.
	hn.conn = conn
	hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())

	// Wait till the server is starting.
	return hn.WaitUntilStarted()
}

// Unlock attempts to unlock the wallet of the target HarnessNode. This
// method should be called after the restart of a HarnessNode that was
// created with a seed+password. Once this method returns, the HarnessNode
// will be ready to accept normal gRPC requests and harness commands.
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
	// We'll need to unlock the node before it's able to start up
	// properly.
	hn.RPC.UnlockWallet(unlockReq)

	// Now that the wallet has been unlocked, we'll wait for the RPC
	// client to be ready, then establish the normal gRPC connection.
	return hn.InitNode(nil)
}

// AddToLogf adds a line of choice to the node's logfile. This is useful to
// interleave test output with output from the node.
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
	// If this node was not set up with a log file, just return early.
	if hn.logFile == nil {
		return
	}

	desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
	if _, err := hn.logFile.WriteString(desc); err != nil {
		hn.printErrf("write to log err: %v", err)
	}
}

// ReadMacaroon waits a given duration for the macaroon file to be created.
// If the file is readable within the timeout, its content is de-serialized
// as a macaroon and returned.
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
	*macaroon.Macaroon, error) {

	// Wait until macaroon file is created and has valid content before
	// using it.
	var mac *macaroon.Macaroon
	err := wait.NoError(func() error {
		macBytes, err := os.ReadFile(macPath)
		if err != nil {
			return fmt.Errorf("error reading macaroon file: %w",
				err)
		}

		newMac := &macaroon.Macaroon{}
		if err = newMac.UnmarshalBinary(macBytes); err != nil {
			return fmt.Errorf("error unmarshalling macaroon "+
				"file: %w", err)
		}
		mac = newMac

		return nil
	}, timeout)

	return mac, err
}
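// A minimal sketch of pairing ReadMacaroon with an authed dial; using the
// readonly macaroon path here is just one option among the paths populated
// by NewHarnessNode:
//
//	mac, err := hn.ReadMacaroon(hn.Cfg.ReadMacPath, wait.DefaultTimeout)
//	if err != nil {
//		return err
//	}
//	conn, err := hn.ConnectRPCWithMacaroon(mac)
//	if err != nil {
//		return err
//	}
//	defer conn.Close()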
// ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to
// create a gRPC client connection.
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
	*grpc.ClientConn, error) {

	// Wait until TLS certificate is created and has valid content
	// before using it, up to 30 sec.
	var tlsCreds credentials.TransportCredentials
	err := wait.NoError(func() error {
		var err error
		tlsCreds, err = credentials.NewClientTLSFromFile(
			hn.Cfg.TLSCertPath, "",
		)
		return err
	}, wait.DefaultTimeout)
	if err != nil {
		return nil, fmt.Errorf("error reading TLS cert: %w", err)
	}

	opts := []grpc.DialOption{
		grpc.WithBlock(),
		grpc.WithTransportCredentials(tlsCreds),
	}

	ctx, cancel := context.WithTimeout(hn.runCtx, wait.DefaultTimeout)
	defer cancel()

	if mac == nil {
		return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
	}

	macCred, err := macaroons.NewMacaroonCredential(mac)
	if err != nil {
		return nil, fmt.Errorf("error cloning mac: %w", err)
	}
	opts = append(opts, grpc.WithPerRPCCredentials(macCred))

	return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
}

// ConnectRPC uses the TLS certificate and admin macaroon files written by
// the lnd node to create a gRPC client connection.
func (hn *HarnessNode) ConnectRPC() (*grpc.ClientConn, error) {
	// If we should use a macaroon, always take the admin macaroon as a
	// default.
	mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, wait.DefaultTimeout)
	if err != nil {
		return nil, err
	}

	return hn.ConnectRPCWithMacaroon(mac)
}

// SetExtraArgs assigns the ExtraArgs field for the node's configuration.
// The changes will take effect on restart.
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
	hn.Cfg.ExtraArgs = extraArgs
}

// StartLndCmd handles the startup of lnd, creating log files and starting
// the lnd process.
func (hn *HarnessNode) StartLndCmd(ctxb context.Context) error {
	// Init the run context.
	hn.runCtx, hn.cancel = context.WithCancel(ctxb)

	args := hn.Cfg.GenArgs()
	hn.cmd = exec.Command(hn.Cfg.LndBinary, args...)

	// Redirect stderr output to buffer.
	var errb bytes.Buffer
	hn.cmd.Stderr = &errb

	// If the logoutput flag is passed, redirect output from the nodes
	// to log files.
	if *logOutput {
		err := addLogFile(hn)
		if err != nil {
			return err
		}
	}

	// Start the process.
	if err := hn.cmd.Start(); err != nil {
		return err
	}

	return nil
}

// StartWithNoAuth will start the lnd process, create the grpc connection
// without macaroon auth, and wait until the server is reported as waiting
// to start.
//
// NOTE: caller needs to take extra step to create and unlock the wallet.
func (hn *HarnessNode) StartWithNoAuth(ctxt context.Context) error {
	// Start lnd process and prepare logs.
	if err := hn.StartLndCmd(ctxt); err != nil {
		return fmt.Errorf("start lnd error: %w", err)
	}

	// Create an unauthed connection.
	conn, err := hn.ConnectRPCWithMacaroon(nil)
	if err != nil {
		return fmt.Errorf("ConnectRPCWithMacaroon err: %w", err)
	}

	// Since the conn is not authed, only the `WalletUnlocker` and
	// `State` clients can be inited from this conn.
	hn.conn = conn
	hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())

	// Wait till the server is starting.
	return hn.WaitUntilStarted()
}

// Start will start the lnd process, create the grpc connection, and wait
// until the server is fully started.
func (hn *HarnessNode) Start(ctxt context.Context) error {
	// Start lnd process and prepare logs.
	if err := hn.StartLndCmd(ctxt); err != nil {
		return fmt.Errorf("start lnd error: %w", err)
	}

	// Since Stop uses the LightningClient to stop the node, if we fail
	// to get a connected client, we have to kill the process.
	conn, err := hn.ConnectRPC()
	if err != nil {
		err = fmt.Errorf("ConnectRPC err: %w", err)
		cmdErr := hn.Kill()
		if cmdErr != nil {
			err = fmt.Errorf("kill process got err: %w: %v",
				cmdErr, err)
		}
		return err
	}

	// Init the node by creating the RPC clients, initializing node's
	// internal state and watcher.
	hn.Initialize(conn)

	// Wait till the server is starting.
	if err := hn.WaitUntilStarted(); err != nil {
		return fmt.Errorf("waiting for start got: %w", err)
	}

	// Subscribe for topology updates.
	return hn.initLightningClient()
}
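// A minimal sketch of the no-auth start path that StartWithNoAuth enables,
// assuming the caller then creates the wallet through the WalletUnlocker
// client and that the HarnessRPC wrapper exposes InitWallet (the request
// fields are elided and illustrative only):
//
//	if err := hn.StartWithNoAuth(ctx); err != nil {
//		return err
//	}
//
//	// Create the wallet, then finish the connection with the admin
//	// macaroon returned in the response.
//	resp := hn.RPC.InitWallet(&lnrpc.InitWalletRequest{...})
//	if err := hn.InitNode(resp.AdminMacaroon); err != nil {
//		return err
//	}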
// InitNode waits until the main gRPC server is detected as active, then
// completes the normal HarnessNode gRPC connection creation. A non-nil
// `macBytes` indicates the node is initialized stateless, otherwise it will
// use the admin macaroon.
func (hn *HarnessNode) InitNode(macBytes []byte) error {
	var (
		conn *grpc.ClientConn
		err  error
	)

	// If the node has been initialized stateless, we need to pass the
	// macaroon to the client.
	if macBytes != nil {
		adminMac := &macaroon.Macaroon{}
		err := adminMac.UnmarshalBinary(macBytes)
		if err != nil {
			return fmt.Errorf("unmarshal failed: %w", err)
		}
		conn, err = hn.ConnectRPCWithMacaroon(adminMac)
		if err != nil {
			return err
		}
	} else {
		// Normal initialization, we expect a macaroon to be in the
		// file system.
		conn, err = hn.ConnectRPC()
		if err != nil {
			return err
		}
	}

	// Init the node by creating the RPC clients, initializing node's
	// internal state and watcher.
	hn.Initialize(conn)

	// Wait till the server is starting.
	if err := hn.WaitUntilStarted(); err != nil {
		return fmt.Errorf("waiting for start got: %w", err)
	}

	return hn.initLightningClient()
}

// ChangePasswordAndInit initializes a harness node by passing the change
// password request via RPC. After the request is submitted, this method
// will block until a macaroon-authenticated RPC connection can be
// established to the harness node. Once established, the new connection is
// used to initialize the RPC clients and subscribes the HarnessNode to
// topology changes.
func (hn *HarnessNode) ChangePasswordAndInit(
	req *lnrpc.ChangePasswordRequest) (
	*lnrpc.ChangePasswordResponse, error) {

	response := hn.RPC.ChangePassword(req)

	return response, hn.InitNode(response.AdminMacaroon)
}

// waitTillServerState makes a subscription to the server's state change and
// blocks until the server is in the targeted state.
func (hn *HarnessNode) waitTillServerState(
	predicate func(state lnrpc.WalletState) bool) error {

	client := hn.RPC.SubscribeState()

	errChan := make(chan error, 1)
	done := make(chan struct{})
	go func() {
		for {
			resp, err := client.Recv()
			if err != nil {
				errChan <- err
				return
			}

			if predicate(resp.State) {
				close(done)
				return
			}
		}
	}()

	for {
		select {
		case <-time.After(wait.NodeStartTimeout):
			return fmt.Errorf("timeout waiting for server state")
		case err := <-errChan:
			return fmt.Errorf("receive server state err: %w", err)
		case <-done:
			return nil
		}
	}
}
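// A minimal sketch of the stateless-init flow that ChangePasswordAndInit
// supports; the passwords are illustrative only:
//
//	resp, err := hn.ChangePasswordAndInit(&lnrpc.ChangePasswordRequest{
//		CurrentPassword:    []byte("old-password"),
//		NewPassword:        []byte("new-password"),
//		StatelessInit:      true,
//		NewMacaroonRootKey: true,
//	})
//	if err != nil {
//		return err
//	}
//	// resp.AdminMacaroon was already used by InitNode above.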
// initLightningClient blocks until the lnd server is fully started and
// subscribes the harness node to graph topology updates. This method also
// spawns a lightning network watcher for this node, which watches for
// topology changes.
func (hn *HarnessNode) initLightningClient() error {
	// Wait until the server is fully started.
	if err := hn.WaitUntilServerActive(); err != nil {
		return fmt.Errorf("waiting for server active: %w", err)
	}

	// Set the harness node's pubkey to what the node claims in GetInfo.
	// The RPC must have been started at this point.
	if err := hn.attachPubKey(); err != nil {
		return err
	}

	// Launch the watcher that will hook into graph related topology
	// change from the PoV of this node.
	started := make(chan error, 1)
	go hn.Watcher.topologyWatcher(hn.runCtx, started)

	select {
	// First time reading the channel indicates the topology client is
	// started.
	case err := <-started:
		if err != nil {
			return fmt.Errorf("create topology client stream "+
				"got err: %v", err)
		}

	case <-time.After(wait.DefaultTimeout):
		return fmt.Errorf("timeout creating topology client stream")
	}

	// Catch topology client stream error inside a goroutine.
	go func() {
		select {
		case err := <-started:
			hn.printErrf("topology client: %v", err)

		case <-hn.runCtx.Done():
		}
	}()

	return nil
}

// attachPubKey queries an unlocked node to retrieve its public key.
func (hn *HarnessNode) attachPubKey() error {
	// Obtain the lnid of this node for quick identification purposes.
	info := hn.RPC.GetInfo()
	hn.PubKeyStr = info.IdentityPubkey

	pubkey, err := hex.DecodeString(info.IdentityPubkey)
	if err != nil {
		return err
	}
	copy(hn.PubKey[:], pubkey)

	return nil
}

// cleanup cleans up all the temporary files created by the node's process.
func (hn *HarnessNode) cleanup() error {
	if hn.Cfg.backupDBDir != "" {
		err := os.RemoveAll(hn.Cfg.backupDBDir)
		if err != nil {
			return fmt.Errorf("unable to remove backup dir: %w",
				err)
		}
	}

	return os.RemoveAll(hn.Cfg.BaseDir)
}

// WaitForProcessExit launches a new goroutine that bubbles up any potential
// fatal process errors to the goroutine running the tests.
func (hn *HarnessNode) WaitForProcessExit() error {
	var err error

	errChan := make(chan error, 1)
	go func() {
		err = hn.cmd.Wait()
		errChan <- err
	}()

	select {
	case err := <-errChan:
		if err == nil {
			break
		}

		// If the process has already been canceled, we can exit
		// early as the logs have already been saved.
		if strings.Contains(err.Error(), "Wait was already called") {
			return nil
		}

		// Otherwise, we print the error, break the select and save
		// logs.
		hn.printErrf("wait process exit got err: %v", err)

		break

	case <-time.After(wait.DefaultTimeout):
		err = errors.New("timeout waiting for process to exit")
		hn.printErrf(err.Error())
	}

	// Make sure log file is closed and renamed if necessary.
	finalizeLogfile(hn)

	// Rename the etcd.log file if the node was running on embedded
	// etcd.
	finalizeEtcdLog(hn)

	return err
}

// Stop attempts to stop the active lnd process.
func (hn *HarnessNode) Stop() error {
	// Do nothing if the process is not running.
	if hn.runCtx == nil {
		hn.printErrf("found nil run context")
		return nil
	}

	// Stop the runCtx.
	hn.cancel()

	// If we ever reach the state where `Watcher` is initialized, it
	// means the node has an authed connection and all its RPC clients
	// are ready for use. Thus we will try to stop it via the RPC.
	if hn.Watcher != nil {
		// Don't watch for error because sometimes the RPC connection
		// gets closed before a response is returned.
		req := lnrpc.StopRequest{}

		ctxt, cancel := context.WithCancel(context.Background())
		defer cancel()

		err := wait.NoError(func() error {
			_, err := hn.RPC.LN.StopDaemon(ctxt, &req)
			switch {
			case err == nil:
				return nil

			// Try again if a recovery/rescan is in progress.
			case strings.Contains(
				err.Error(), "recovery in progress",
			):
				return err

			default:
				return nil
			}
		}, wait.DefaultTimeout)
		if err != nil {
			return err
		}

		// Wait for goroutines to be finished.
		done := make(chan struct{})
		go func() {
			hn.Watcher.wg.Wait()
			close(done)
		}()

		// If the goroutines fail to finish before timeout, we'll
		// print the error to console and continue.
		select {
		case <-time.After(wait.DefaultTimeout):
			hn.printErrf("timeout on wait group")
		case <-done:
		}
	} else {
		// If the rpc clients are not initiated, we'd kill the
		// process manually.
		hn.printErrf("found nil RPC clients")
		if err := hn.Kill(); err != nil {
			// Skip the error if the process is already dead.
			if !strings.Contains(
				err.Error(), "process already finished",
			) {
				return fmt.Errorf("killing process got: %w",
					err)
			}
		}
	}

	// Close any attempts at further grpc connections.
	if hn.conn != nil {
		if err := hn.CloseConn(); err != nil {
			return err
		}
	}

	// Wait for lnd process to exit in the end.
	return hn.WaitForProcessExit()
}
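// Stop and Start compose into a restart. A minimal sketch, assuming hn was
// previously started via Start:
//
//	if err := hn.Stop(); err != nil {
//		return err
//	}
//	// The base dir (and thus the wallet and channel state) is kept, so
//	// a subsequent Start resumes the same node identity.
//	if err := hn.Start(ctx); err != nil {
//		return err
//	}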
// CloseConn closes the grpc connection.
func (hn *HarnessNode) CloseConn() error {
	code := status.Code(hn.conn.Close())
	switch code {
	case codes.OK:
		return nil

	// When the context is canceled above, we might get the following
	// error as the context is no longer active.
	case codes.Canceled:
		return nil

	case codes.Unknown:
		return fmt.Errorf("unknown error attempting to stop "+
			"grpc client: %v", code)

	default:
		return fmt.Errorf("error attempting to stop "+
			"grpc client: %v", code)
	}
}

// Shutdown stops the active lnd process and cleans up any temporary
// directories created along the way.
func (hn *HarnessNode) Shutdown() error {
	if err := hn.Stop(); err != nil {
		return err
	}

	if err := hn.cleanup(); err != nil {
		return err
	}

	return nil
}

// Kill kills the lnd process.
func (hn *HarnessNode) Kill() error {
	return hn.cmd.Process.Kill()
}

// printErrf prints an error to the console.
func (hn *HarnessNode) printErrf(format string, a ...interface{}) {
	fmt.Printf("itest error from [%s:%s]: %s\n", //nolint:forbidigo
		hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
		fmt.Sprintf(format, a...))
}

// BackupDB creates a backup of the current database.
func (hn *HarnessNode) BackupDB() error {
	if hn.Cfg.backupDBDir != "" {
		return fmt.Errorf("backup already created")
	}

	if hn.Cfg.postgresDBName != "" {
		// Backup database.
		backupDBName := hn.Cfg.postgresDBName + "_backup"
		err := executePgQuery(
			"CREATE DATABASE " + backupDBName + " WITH TEMPLATE " +
				hn.Cfg.postgresDBName,
		)
		if err != nil {
			return err
		}
	} else {
		// Backup files.
		tempDir, err := os.MkdirTemp("", "past-state")
		if err != nil {
			return fmt.Errorf("unable to create temp db folder: "+
				"%w", err)
		}

		if err := copyAll(tempDir, hn.Cfg.DBDir()); err != nil {
			return fmt.Errorf("unable to copy database files: %w",
				err)
		}
		hn.Cfg.backupDBDir = tempDir
	}

	return nil
}

// RestoreDB restores a database backup.
func (hn *HarnessNode) RestoreDB() error {
	if hn.Cfg.postgresDBName != "" {
		// Restore database.
		backupDBName := hn.Cfg.postgresDBName + "_backup"
		err := executePgQuery(
			"DROP DATABASE " + hn.Cfg.postgresDBName,
		)
		if err != nil {
			return err
		}
		err = executePgQuery(
			"ALTER DATABASE " + backupDBName + " RENAME TO " +
				hn.Cfg.postgresDBName,
		)
		if err != nil {
			return err
		}
	} else {
		// Restore files.
		if hn.Cfg.backupDBDir == "" {
			return fmt.Errorf("no database backup created")
		}

		err := copyAll(hn.Cfg.DBDir(), hn.Cfg.backupDBDir)
		if err != nil {
			return fmt.Errorf("unable to copy database files: %w",
				err)
		}

		if err := os.RemoveAll(hn.Cfg.backupDBDir); err != nil {
			return fmt.Errorf("unable to remove backup dir: %w",
				err)
		}
		hn.Cfg.backupDBDir = ""
	}

	return nil
}

// UpdateGlobalPolicy updates a node's global channel policy.
func (hn *HarnessNode) UpdateGlobalPolicy(policy *lnrpc.RoutingPolicy) {
	updateFeeReq := &lnrpc.PolicyUpdateRequest{
		BaseFeeMsat: policy.FeeBaseMsat,
		FeeRate: float64(policy.FeeRateMilliMsat) /
			float64(1_000_000),
		TimeLockDelta: policy.TimeLockDelta,
		Scope: &lnrpc.PolicyUpdateRequest_Global{
			Global: true,
		},
		MaxHtlcMsat: policy.MaxHtlcMsat,
	}
	hn.RPC.UpdateChannelPolicy(updateFeeReq)
}

func postgresDatabaseDsn(dbName string) string {
	return fmt.Sprintf(postgresDsn, dbName)
}

// createTempPgDB creates a temp postgres database.
func createTempPgDB() (string, error) {
	// Create random database name.
	randBytes := make([]byte, 8)
	_, err := rand.Read(randBytes)
	if err != nil {
		return "", err
	}
	dbName := "itest_" + hex.EncodeToString(randBytes)

	// Create database.
	err = executePgQuery("CREATE DATABASE " + dbName)
	if err != nil {
		return "", err
	}

	return dbName, nil
}
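// A minimal sketch of the backup/restore round trip that BackupDB and
// RestoreDB enable; for the file-based backends the node is assumed to be
// stopped while its database files are copied:
//
//	if err := hn.BackupDB(); err != nil {
//		return err
//	}
//	// ... mutate the node's state under test ...
//	if err := hn.RestoreDB(); err != nil {
//		return err
//	}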
// executePgQuery executes a SQL statement in a postgres db.
func executePgQuery(query string) error {
	pool, err := pgxpool.Connect(
		context.Background(),
		postgresDatabaseDsn("postgres"),
	)
	if err != nil {
		return fmt.Errorf("unable to connect to database: %w", err)
	}
	defer pool.Close()

	_, err = pool.Exec(context.Background(), query)

	return err
}

// renameFile is a helper to rename (log) files created during integration
// tests.
func renameFile(fromFileName, toFileName string) {
	err := os.Rename(fromFileName, toFileName)
	if err != nil {
		fmt.Printf("could not rename %s to %s: %v\n", //nolint:forbidigo
			fromFileName, toFileName, err)
	}
}

// getFinalizedLogFilePrefix returns the finalized log filename.
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
	pubKeyHex := hex.EncodeToString(
		hn.PubKey[:logPubKeyBytes],
	)

	return fmt.Sprintf("%s/%d-%s-%s-%s",
		GetLogDir(), hn.Cfg.NodeID,
		hn.Cfg.LogFilenamePrefix,
		hn.Cfg.Name, pubKeyHex)
}

// finalizeLogfile closes the node's log file and, if log output is enabled,
// renames it to its finalized name.
func finalizeLogfile(hn *HarnessNode) {
	// Exit early if there's no log file.
	if hn.logFile == nil {
		return
	}

	hn.logFile.Close()

	// If logoutput flag is not set, return early.
	if !*logOutput {
		return
	}

	newFileName := fmt.Sprintf("%v.log",
		getFinalizedLogFilePrefix(hn),
	)
	renameFile(hn.filename, newFileName)
}

// finalizeEtcdLog renames the etcd log file when the test ends.
func finalizeEtcdLog(hn *HarnessNode) {
	// Exit early if this is not an etcd backend.
	if hn.Cfg.DBBackend != BackendEtcd {
		return
	}

	etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
	newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
		getFinalizedLogFilePrefix(hn),
	)
	renameFile(etcdLogFileName, newEtcdLogFileName)
}

// addLogFile creates log files used by this node.
func addLogFile(hn *HarnessNode) error {
	var fileName string

	dir := GetLogDir()
	fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.Cfg.NodeID,
		hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
		hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))

	// If the node's PubKey is not yet initialized, create a temporary
	// file name. Later, after the PubKey has been initialized, the file
	// can be moved to its final name with the PubKey included.
	if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
		fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
			hn.Cfg.NodeID, hn.Cfg.LogFilenamePrefix,
			hn.Cfg.Name)
	}

	// Create file if not exists, otherwise append.
	file, err := os.OpenFile(fileName,
		os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
	if err != nil {
		return err
	}

	// Pass node's stderr to both errb and the file.
	w := io.MultiWriter(hn.cmd.Stderr, file)
	hn.cmd.Stderr = w

	// Pass the node's stdout only to the file.
	hn.cmd.Stdout = file

	// Let the node keep a reference to this file, such that we can add
	// to it if necessary.
	hn.logFile = file
	hn.filename = fileName

	return nil
}
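// For illustration, with NodeID 0, log prefix "itest", node name "Alice"
// and an identity PubKey starting with the bytes 02a1b2c3 (values
// illustrative only), the temporary log file created above would later be
// renamed by finalizeLogfile from
//
//	<logdir>/0-itest-Alice-tmp__.log
//
// to
//
//	<logdir>/0-itest-Alice-02a1b2c3.log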
// copyAll copies all files and directories from srcDir to dstDir
// recursively. Note that this function does not support links.
func copyAll(dstDir, srcDir string) error {
	entries, err := os.ReadDir(srcDir)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		srcPath := filepath.Join(srcDir, entry.Name())
		dstPath := filepath.Join(dstDir, entry.Name())

		info, err := os.Stat(srcPath)
		if err != nil {
			return err
		}

		if info.IsDir() {
			err := os.Mkdir(dstPath, info.Mode())
			if err != nil && !os.IsExist(err) {
				return err
			}

			err = copyAll(dstPath, srcPath)
			if err != nil {
				return err
			}
		} else if err := CopyFile(dstPath, srcPath); err != nil {
			return err
		}
	}

	return nil
}
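// A minimal sketch of copyAll in isolation; note the destination directory
// itself must already exist, since the helper only creates nested
// directories:
//
//	dst, err := os.MkdirTemp("", "db-copy")
//	if err != nil {
//		return err
//	}
//	if err := copyAll(dst, hn.Cfg.DBDir()); err != nil {
//		return err
//	}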