From f8cf7c8775a941bc4a7ca6797bc99f1446d81b49 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 13:00:39 +0800 Subject: [PATCH 1/7] itest: shorten functions inside harness node This commit refactors the long function start() into smaller pieces and moves commonly used functions into test_common.go. --- lntest/harness_net.go | 23 ++ lntest/{node.go => harness_node.go} | 394 ++++++++++++---------------- lntest/test_common.go | 48 ++++ 3 files changed, 236 insertions(+), 229 deletions(-) rename lntest/{node.go => harness_node.go} (88%) diff --git a/lntest/harness_net.go b/lntest/harness_net.go index a3633e0c0..76e571274 100644 --- a/lntest/harness_net.go +++ b/lntest/harness_net.go @@ -1746,3 +1746,26 @@ func (n *NetworkHarness) RestoreDb(hn *HarnessNode) error { return nil } + +// getChanPointFundingTxid returns the given channel point's funding txid in +// raw bytes. +func getChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) ([]byte, error) { + var txid []byte + + // A channel point's funding txid can be get/set as a byte slice or a + // string. In the case it is a string, decode it. + switch chanPoint.GetFundingTxid().(type) { + case *lnrpc.ChannelPoint_FundingTxidBytes: + txid = chanPoint.GetFundingTxidBytes() + case *lnrpc.ChannelPoint_FundingTxidStr: + s := chanPoint.GetFundingTxidStr() + h, err := chainhash.NewHashFromStr(s) + if err != nil { + return nil, err + } + + txid = h[:] + } + + return txid, nil +} diff --git a/lntest/node.go b/lntest/harness_node.go similarity index 88% rename from lntest/node.go rename to lntest/harness_node.go index 87095b971..f115c9c2f 100644 --- a/lntest/node.go +++ b/lntest/harness_node.go @@ -6,7 +6,6 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -19,7 +18,6 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" @@ -190,28 +188,32 @@ func (cfg NodeConfig) genArgs() []string { backendArgs := cfg.BackendCfg.GenArgs() args = append(args, backendArgs...) - args = append(args, "--bitcoin.active") - args = append(args, "--nobootstrap") - args = append(args, "--debuglevel=debug") - args = append(args, "--bitcoin.defaultchanconfs=1") - args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval)) - args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) - args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) - args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) - args = append(args, fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr())) - args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr())) - args = append(args, fmt.Sprintf("--externalip=%v", cfg.P2PAddr())) - args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir)) - args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir)) - args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath)) - args = append(args, fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath)) - args = append(args, fmt.Sprintf("--configfile=%v", cfg.DataDir)) - args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath)) - args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath)) - args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath)) - args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) - args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort)) - args = append(args, fmt.Sprintf("--caches.rpc-graph-cache-duration=0")) + + nodeArgs := []string{ + "--bitcoin.active", + "--nobootstrap", + "--debuglevel=debug", + "--bitcoin.defaultchanconfs=1", + fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval), + fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV), + fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()), + fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()), + fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()), + fmt.Sprintf("--listen=%v", cfg.P2PAddr()), + fmt.Sprintf("--externalip=%v", cfg.P2PAddr()), + fmt.Sprintf("--logdir=%v", cfg.LogDir), + fmt.Sprintf("--datadir=%v", cfg.DataDir), + fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath), + fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath), + fmt.Sprintf("--configfile=%v", cfg.DataDir), + fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath), + fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath), + fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath), + fmt.Sprintf("--trickledelay=%v", trickleDelay), + fmt.Sprintf("--profile=%d", cfg.ProfilePort), + fmt.Sprintf("--caches.rpc-graph-cache-duration=0"), + } + args = append(args, nodeArgs...) if !cfg.HasSeed { args = append(args, "--noseedbackup") @@ -295,7 +297,6 @@ type HarnessNode struct { PubKeyStr string cmd *exec.Cmd - pidFile string logFile *os.File // chanWatchRequests receives a request for watching a particular event @@ -358,6 +359,16 @@ var _ lnrpc.LightningClient = (*HarnessNode)(nil) var _ lnrpc.WalletUnlockerClient = (*HarnessNode)(nil) var _ invoicesrpc.InvoicesClient = (*HarnessNode)(nil) +// nextNodeID generates a unique sequence to be used as the node's ID. +func nextNodeID() int { + numActiveNodesMtx.Lock() + defer numActiveNodesMtx.Unlock() + nodeNum := numActiveNodes + numActiveNodes++ + + return nodeNum +} + // newNode creates a new test lightning node instance from the passed config. func newNode(cfg NodeConfig) (*HarnessNode, error) { if cfg.BaseDir == "" { @@ -397,14 +408,9 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) { cfg.PostgresDsn = postgresDatabaseDsn(dbName) } - numActiveNodesMtx.Lock() - nodeNum := numActiveNodes - numActiveNodes++ - numActiveNodesMtx.Unlock() - return &HarnessNode{ Cfg: &cfg, - NodeID: nodeNum, + NodeID: nextNodeID(), chanWatchRequests: make(chan *chanWatchRequest), openChans: make(map[wire.OutPoint]int), openChanWatchers: make(map[wire.OutPoint][]chan struct{}), @@ -621,29 +627,9 @@ func (hn *HarnessNode) InvoiceMacPath() string { return hn.Cfg.InvoiceMacPath } -// renameFile is a helper to rename (log) files created during integration tests. -func renameFile(fromFileName, toFileName string) { - err := os.Rename(fromFileName, toFileName) - if err != nil { - fmt.Printf("could not rename %s to %s: %v\n", - fromFileName, toFileName, err) - } -} - -// Start launches a new process running lnd. Additionally, the PID of the -// launched process is saved in order to possibly kill the process forcibly -// later. -// -// This may not clean up properly if an error is returned, so the caller should -// call shutdown() regardless of the return value. -func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, - wait bool) error { - - // Init the runCtx. - ctxt, cancel := context.WithCancel(context.Background()) - hn.runCtx = ctxt - hn.cancel = cancel - +// startLnd handles the startup of lnd, creating log files, and possibly kills +// the process when needed. +func (hn *HarnessNode) startLnd(lndBinary string, lndError chan<- error) error { args := hn.Cfg.genArgs() hn.cmd = exec.Command(lndBinary, args...) @@ -651,87 +637,17 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, var errb bytes.Buffer hn.cmd.Stderr = &errb - // Make sure the log file cleanup function is initialized, even - // if no log file is created. - var finalizeLogfile = func() { - if hn.logFile != nil { - hn.logFile.Close() - } - } - - getFinalizedLogFilePrefix := func() string { - pubKeyHex := hex.EncodeToString( - hn.PubKey[:logPubKeyBytes], - ) - - return fmt.Sprintf("%s/%d-%s-%s-%s", - GetLogDir(), hn.NodeID, - hn.Cfg.LogFilenamePrefix, - hn.Cfg.Name, pubKeyHex) - } - - finalizeEtcdLog := func() { - if hn.Cfg.DbBackend != BackendEtcd { - return - } - - etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir) - newEtcdLogFileName := fmt.Sprintf("%v-etcd.log", - getFinalizedLogFilePrefix(), - ) - - renameFile(etcdLogFileName, newEtcdLogFileName) - } - // If the logoutput flag is passed, redirect output from the nodes to // log files. + var ( + fileName string + err error + ) if *logOutput { - dir := GetLogDir() - fileName := fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID, - hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, - hex.EncodeToString(hn.PubKey[:logPubKeyBytes])) - - // If the node's PubKey is not yet initialized, create a - // temporary file name. Later, after the PubKey has been - // initialized, the file can be moved to its final name with - // the PubKey included. - if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) { - fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir, - hn.NodeID, hn.Cfg.LogFilenamePrefix, - hn.Cfg.Name) - } - - // Once the node has done its work, the log file can be - // renamed. - finalizeLogfile = func() { - if hn.logFile != nil { - hn.logFile.Close() - - newFileName := fmt.Sprintf("%v.log", - getFinalizedLogFilePrefix(), - ) - - renameFile(fileName, newFileName) - } - } - - // Create file if not exists, otherwise append. - file, err := os.OpenFile(fileName, - os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + fileName, err = addLogFile(hn) if err != nil { return err } - - // Pass node's stderr to both errb and the file. - w := io.MultiWriter(&errb, file) - hn.cmd.Stderr = w - - // Pass the node's stdout only to the file. - hn.cmd.Stdout = file - - // Let the node keep a reference to this file, such - // that we can add to it if necessary. - hn.logFile = file } if err := hn.cmd.Start(); err != nil { @@ -750,21 +666,32 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, } // Make sure log file is closed and renamed if necessary. - finalizeLogfile() + finalizeLogfile(hn, fileName) // Rename the etcd.log file if the node was running on embedded // etcd. - finalizeEtcdLog() + finalizeEtcdLog(hn) }() - // Write process ID to a file. - if err := hn.writePidFile(); err != nil { - err = fmt.Errorf("writePidFile err: %w", err) - cmdErr := hn.cmd.Process.Kill() - if cmdErr != nil { - err = fmt.Errorf("kill process got err: %w: %v", - cmdErr, err) - } + return nil +} + +// Start launches a new process running lnd. Additionally, the PID of the +// launched process is saved in order to possibly kill the process forcibly +// later. +// +// This may not clean up properly if an error is returned, so the caller should +// call shutdown() regardless of the return value. +func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, + wait bool) error { + + // Init the runCtx. + ctxt, cancel := context.WithCancel(context.Background()) + hn.runCtx = ctxt + hn.cancel = cancel + + // Start lnd and prepare logs. + if err := hn.startLnd(lndBinary, lndError); err != nil { return err } @@ -872,7 +799,8 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface, // WaitUntilLeader attempts to finish the start procedure by initiating an RPC // connection and setting up the wallet unlocker client. This is needed when // a node that has recently been started was waiting to become the leader and -// we're at the point when we expect that it is the leader now (awaiting unlock). +// we're at the point when we expect that it is the leader now (awaiting +// unlock). func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { var ( conn *grpc.ClientConn @@ -1124,25 +1052,6 @@ func (hn *HarnessNode) AddToLog(format string, a ...interface{}) { } } -// writePidFile writes the process ID of the running lnd process to a .pid file. -func (hn *HarnessNode) writePidFile() error { - filePath := filepath.Join(hn.Cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID)) - - pid, err := os.Create(filePath) - if err != nil { - return err - } - defer pid.Close() - - _, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid) - if err != nil { - return err - } - - hn.pidFile = filePath - return nil -} - // ReadMacaroon waits a given duration for the macaroon file to be created. If // the file is readable within the timeout, its content is de-serialized as a // macaroon and returned. @@ -1239,7 +1148,8 @@ func (hn *HarnessNode) cleanup() error { if hn.backupDbDir != "" { err := os.RemoveAll(hn.backupDbDir) if err != nil { - return fmt.Errorf("unable to remove backup dir: %v", err) + return fmt.Errorf("unable to remove backup dir: %v", + err) } } @@ -1332,8 +1242,8 @@ func (hn *HarnessNode) stop() error { return nil } -// shutdown stops the active lnd process and cleans up any temporary directories -// created along the way. +// shutdown stops the active lnd process and cleans up any temporary +// directories created along the way. func (hn *HarnessNode) shutdown() error { if err := hn.stop(); err != nil { return err @@ -1380,29 +1290,6 @@ type chanWatchRequest struct { includeUnannounced bool } -// getChanPointFundingTxid returns the given channel point's funding txid in -// raw bytes. -func getChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) ([]byte, error) { - var txid []byte - - // A channel point's funding txid can be get/set as a byte slice or a - // string. In the case it is a string, decode it. - switch chanPoint.GetFundingTxid().(type) { - case *lnrpc.ChannelPoint_FundingTxidBytes: - txid = chanPoint.GetFundingTxidBytes() - case *lnrpc.ChannelPoint_FundingTxidStr: - s := chanPoint.GetFundingTxidStr() - h, err := chainhash.NewHashFromStr(s) - if err != nil { - return nil, err - } - - txid = h[:] - } - - return txid, nil -} - func (hn *HarnessNode) checkChanPointInGraph(chanPoint wire.OutPoint) bool { ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) @@ -1667,19 +1554,6 @@ func (hn *HarnessNode) PrintErr(format string, a ...interface{}) { hn.Cfg.Name, fmt.Sprintf(format, a...)) } -// MakeOutpoint returns the outpoint of the channel's funding transaction. -func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) { - fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint) - if err != nil { - return wire.OutPoint{}, err - } - - return wire.OutPoint{ - Hash: *fundingTxID, - Index: chanPoint.OutputIndex, - }, nil -} - // handleChannelEdgeUpdates takes a series of channel edge updates, extracts // the outpoints, and saves them to harness node's internal state. func (hn *HarnessNode) handleChannelEdgeUpdates( @@ -1903,37 +1777,6 @@ func (hn *HarnessNode) receiveTopologyClientStream( } } -// CheckChannelPolicy checks that the policy matches the expected one. -func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error { - if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat { - return fmt.Errorf("expected base fee %v, got %v", - expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat) - } - if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat { - return fmt.Errorf("expected fee rate %v, got %v", - expectedPolicy.FeeRateMilliMsat, - policy.FeeRateMilliMsat) - } - if policy.TimeLockDelta != expectedPolicy.TimeLockDelta { - return fmt.Errorf("expected time lock delta %v, got %v", - expectedPolicy.TimeLockDelta, - policy.TimeLockDelta) - } - if policy.MinHtlc != expectedPolicy.MinHtlc { - return fmt.Errorf("expected min htlc %v, got %v", - expectedPolicy.MinHtlc, policy.MinHtlc) - } - if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat { - return fmt.Errorf("expected max htlc %v, got %v", - expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat) - } - if policy.Disabled != expectedPolicy.Disabled { - return errors.New("edge should be disabled but isn't") - } - - return nil -} - // handlePolicyUpdateWatchRequest checks that if the expected policy can be // found either in the node's interval state or describe graph response. If // found, it will signal the request by closing the event channel. Otherwise it @@ -2007,3 +1850,96 @@ func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap { return policyUpdates } + +// renameFile is a helper to rename (log) files created during integration +// tests. +func renameFile(fromFileName, toFileName string) { + err := os.Rename(fromFileName, toFileName) + if err != nil { + fmt.Printf("could not rename %s to %s: %v\n", + fromFileName, toFileName, err) + } +} + +// getFinalizedLogFilePrefix returns the finalize log filename. +func getFinalizedLogFilePrefix(hn *HarnessNode) string { + pubKeyHex := hex.EncodeToString( + hn.PubKey[:logPubKeyBytes], + ) + + return fmt.Sprintf("%s/%d-%s-%s-%s", + GetLogDir(), hn.NodeID, + hn.Cfg.LogFilenamePrefix, + hn.Cfg.Name, pubKeyHex) +} + +// finalizeLogfile makes sure the log file cleanup function is initialized, +// even if no log file is created. +func finalizeLogfile(hn *HarnessNode, fileName string) { + if hn.logFile != nil { + hn.logFile.Close() + + // If logoutput flag is not set, return early. + if !*logOutput { + return + } + + newFileName := fmt.Sprintf("%v.log", + getFinalizedLogFilePrefix(hn), + ) + + renameFile(fileName, newFileName) + } +} + +func finalizeEtcdLog(hn *HarnessNode) { + if hn.Cfg.DbBackend != BackendEtcd { + return + } + + etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir) + newEtcdLogFileName := fmt.Sprintf("%v-etcd.log", + getFinalizedLogFilePrefix(hn), + ) + + renameFile(etcdLogFileName, newEtcdLogFileName) +} + +func addLogFile(hn *HarnessNode) (string, error) { + var fileName string + + dir := GetLogDir() + fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID, + hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, + hex.EncodeToString(hn.PubKey[:logPubKeyBytes])) + + // If the node's PubKey is not yet initialized, create a + // temporary file name. Later, after the PubKey has been + // initialized, the file can be moved to its final name with + // the PubKey included. + if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) { + fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir, + hn.NodeID, hn.Cfg.LogFilenamePrefix, + hn.Cfg.Name) + } + + // Create file if not exists, otherwise append. + file, err := os.OpenFile(fileName, + os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + return fileName, err + } + + // Pass node's stderr to both errb and the file. + w := io.MultiWriter(hn.cmd.Stderr, file) + hn.cmd.Stderr = w + + // Pass the node's stdout only to the file. + hn.cmd.Stdout = file + + // Let the node keep a reference to this file, such + // that we can add to it if necessary. + hn.logFile = file + + return fileName, nil +} diff --git a/lntest/test_common.go b/lntest/test_common.go index 25d52e892..717694ceb 100644 --- a/lntest/test_common.go +++ b/lntest/test_common.go @@ -1,10 +1,14 @@ package lntest import ( + "errors" "flag" "fmt" "net" "sync/atomic" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/lnrpc" ) const ( @@ -114,3 +118,47 @@ func GenerateBtcdListenerAddresses() (string, string) { return fmt.Sprintf(listenerFormat, NextAvailablePort()), fmt.Sprintf(listenerFormat, NextAvailablePort()) } + +// MakeOutpoint returns the outpoint of the channel's funding transaction. +func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) { + fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint) + if err != nil { + return wire.OutPoint{}, err + } + + return wire.OutPoint{ + Hash: *fundingTxID, + Index: chanPoint.OutputIndex, + }, nil +} + +// CheckChannelPolicy checks that the policy matches the expected one. +func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error { + if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat { + return fmt.Errorf("expected base fee %v, got %v", + expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat) + } + if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat { + return fmt.Errorf("expected fee rate %v, got %v", + expectedPolicy.FeeRateMilliMsat, + policy.FeeRateMilliMsat) + } + if policy.TimeLockDelta != expectedPolicy.TimeLockDelta { + return fmt.Errorf("expected time lock delta %v, got %v", + expectedPolicy.TimeLockDelta, + policy.TimeLockDelta) + } + if policy.MinHtlc != expectedPolicy.MinHtlc { + return fmt.Errorf("expected min htlc %v, got %v", + expectedPolicy.MinHtlc, policy.MinHtlc) + } + if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat { + return fmt.Errorf("expected max htlc %v, got %v", + expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat) + } + if policy.Disabled != expectedPolicy.Disabled { + return errors.New("edge should be disabled but isn't") + } + + return nil +} From f5f1289dab76d80c1529f371fc488f6033fd3973 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 13:45:55 +0800 Subject: [PATCH 2/7] itest: use node.rpc namespace inside harness net This commit adds a new struct RPCClients to better handle rpc clients. A private field, rpc, is added to HarnessNode to prevent direct access to its clients. Inside RPCClients, all clients are exported in case a test case need access to a specific client. --- lntest/harness_node.go | 167 ++++++++++++++++++++++++++--------------- 1 file changed, 106 insertions(+), 61 deletions(-) diff --git a/lntest/harness_node.go b/lntest/harness_node.go index f115c9c2f..bc1d85c27 100644 --- a/lntest/harness_node.go +++ b/lntest/harness_node.go @@ -296,8 +296,8 @@ type HarnessNode struct { PubKey [33]byte PubKeyStr string - cmd *exec.Cmd - logFile *os.File + // rpc holds a list of RPC clients. + rpc *RPCClients // chanWatchRequests receives a request for watching a particular event // for a given channel. @@ -317,41 +317,50 @@ type HarnessNode struct { // node and the outpoint. policyUpdates policyUpdateMap - // runCtx is a context with cancel method. It's used to signal when the - // node needs to quit, and used as the parent context when spawning - // children contexts for RPC requests. - runCtx context.Context - cancel context.CancelFunc - - wg sync.WaitGroup - - lnrpc.LightningClient - - lnrpc.WalletUnlockerClient - - invoicesrpc.InvoicesClient - - // SignerClient cannot be embedded because the name collisions of the - // methods SignMessage and VerifyMessage. - SignerClient signrpc.SignerClient - - // conn is the underlying connection to the grpc endpoint of the node. - conn *grpc.ClientConn - - // RouterClient, WalletKitClient, WatchtowerClient cannot be embedded, - // because a name collision would occur with LightningClient. - RouterClient routerrpc.RouterClient - WalletKitClient walletrpc.WalletKitClient - Watchtower watchtowerrpc.WatchtowerClient - WatchtowerClient wtclientrpc.WatchtowerClientClient - StateClient lnrpc.StateClient - // backupDbDir is the path where a database backup is stored, if any. backupDbDir string // postgresDbName is the name of the postgres database where lnd data is // stored in. postgresDbName string + + // runCtx is a context with cancel method. It's used to signal when the + // node needs to quit, and used as the parent context when spawning + // children contexts for RPC requests. + runCtx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup + cmd *exec.Cmd + logFile *os.File + + // TODO(yy): remove + lnrpc.LightningClient + lnrpc.WalletUnlockerClient + invoicesrpc.InvoicesClient + SignerClient signrpc.SignerClient + RouterClient routerrpc.RouterClient + WalletKitClient walletrpc.WalletKitClient + Watchtower watchtowerrpc.WatchtowerClient + WatchtowerClient wtclientrpc.WatchtowerClientClient + StateClient lnrpc.StateClient +} + +// RPCClients wraps a list of RPC clients into a single struct for easier +// access. +type RPCClients struct { + // conn is the underlying connection to the grpc endpoint of the node. + conn *grpc.ClientConn + + LN lnrpc.LightningClient + WalletUnlocker lnrpc.WalletUnlockerClient + Invoice invoicesrpc.InvoicesClient + Signer signrpc.SignerClient + Router routerrpc.RouterClient + WalletKit walletrpc.WalletKitClient + Watchtower watchtowerrpc.WatchtowerClient + WatchtowerClient wtclientrpc.WatchtowerClientClient + State lnrpc.StateClient } // Assert *HarnessNode implements the lnrpc.LightningClient interface. @@ -719,16 +728,20 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, return err } + // Init all the RPC clients. + hn.initRPCClients(conn) + // If the node was created with a seed, we will need to perform an // additional step to unlock the wallet. The connection returned will // only use the TLS certs, and can only perform operations necessary to // unlock the daemon. if hn.Cfg.HasSeed { + // TODO(yy): remove hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn) return nil } - return hn.initLightningClient(conn) + return hn.initLightningClient() } // WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START". @@ -757,7 +770,7 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface, predicate func(state lnrpc.WalletState) bool) error { stateClient := lnrpc.NewStateClient(conn) - ctx, cancel := context.WithCancel(hn.runCtx) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() stateStream, err := stateClient.SubscribeState( @@ -820,16 +833,21 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { return err } + // Init all the RPC clients. + hn.initRPCClients(conn) + // If the node was created with a seed, we will need to perform an // additional step to unlock the wallet. The connection returned will // only use the TLS certs, and can only perform operations necessary to // unlock the daemon. if hn.Cfg.HasSeed { + // TODO(yy): remove hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn) + return nil } - return hn.initLightningClient(conn) + return hn.initLightningClient() } // initClientWhenReady waits until the main gRPC server is detected as active, @@ -847,7 +865,10 @@ func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error { return err } - return hn.initLightningClient(conn) + // Init all the RPC clients. + hn.initRPCClients(conn) + + return hn.initLightningClient() } // Init initializes a harness node by passing the init request via rpc. After @@ -889,7 +910,10 @@ func (hn *HarnessNode) Init( return nil, err } - return response, hn.initLightningClient(conn) + // Init all the RPC clients. + hn.initRPCClients(conn) + + return response, hn.initLightningClient() } // InitChangePassword initializes a harness node by passing the change password @@ -932,7 +956,10 @@ func (hn *HarnessNode) InitChangePassword( return nil, err } - return response, hn.initLightningClient(conn) + // Init all the RPC clients. + hn.initRPCClients(conn) + + return response, hn.initLightningClient() } // Unlock attempts to unlock the wallet of the target HarnessNode. This method @@ -945,7 +972,8 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error { // Otherwise, we'll need to unlock the node before it's able to start // up properly. - if _, err := hn.UnlockWallet(ctxt, unlockReq); err != nil { + _, err := hn.rpc.WalletUnlocker.UnlockWallet(ctxt, unlockReq) + if err != nil { return err } @@ -960,7 +988,7 @@ func (hn *HarnessNode) waitTillServerStarted() error { ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout) defer cancel() - client, err := hn.StateClient.SubscribeState( + client, err := hn.rpc.State.SubscribeState( ctxt, &lnrpc.SubscribeStateRequest{}, ) if err != nil { @@ -978,17 +1006,33 @@ func (hn *HarnessNode) waitTillServerStarted() error { return nil } } - } -// initLightningClient constructs the grpc LightningClient from the given client -// connection and subscribes the harness node to graph topology updates. -// This method also spawns a lightning network watcher for this node, -// which watches for topology changes. -func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error { +// initRPCClients initializes a list of RPC clients for the node. +func (hn *HarnessNode) initRPCClients(c *grpc.ClientConn) { + hn.rpc = &RPCClients{ + conn: c, + LN: lnrpc.NewLightningClient(c), + Invoice: invoicesrpc.NewInvoicesClient(c), + Router: routerrpc.NewRouterClient(c), + WalletKit: walletrpc.NewWalletKitClient(c), + WalletUnlocker: lnrpc.NewWalletUnlockerClient(c), + Watchtower: watchtowerrpc.NewWatchtowerClient(c), + WatchtowerClient: wtclientrpc.NewWatchtowerClientClient(c), + Signer: signrpc.NewSignerClient(c), + State: lnrpc.NewStateClient(c), + } +} + +// initLightningClient blocks until the lnd server is fully started and +// subscribes the harness node to graph topology updates. This method also +// spawns a lightning network watcher for this node, which watches for topology +// changes. +func (hn *HarnessNode) initLightningClient() error { + // TODO(yy): remove // Construct the LightningClient that will allow us to use the // HarnessNode directly for normal rpc operations. - hn.conn = conn + conn := hn.rpc.conn hn.LightningClient = lnrpc.NewLightningClient(conn) hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn) hn.RouterClient = routerrpc.NewRouterClient(conn) @@ -1021,8 +1065,7 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error { // FetchNodeInfo queries an unlocked node to retrieve its public key. func (hn *HarnessNode) FetchNodeInfo() error { // Obtain the lnid of this node for quick identification purposes. - ctxb := context.Background() - info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) + info, err := hn.rpc.LN.GetInfo(hn.runCtx, &lnrpc.GetInfoRequest{}) if err != nil { return err } @@ -1163,21 +1206,23 @@ func (hn *HarnessNode) stop() error { return nil } - // If start() failed before creating a client, we will just wait for the + // If start() failed before creating clients, we will just wait for the // child process to die. - if hn.LightningClient != nil { - // Don't watch for error because sometimes the RPC connection gets - // closed before a response is returned. + if hn.rpc != nil && hn.rpc.LN != nil { + // Don't watch for error because sometimes the RPC connection + // gets closed before a response is returned. req := lnrpc.StopRequest{} err := wait.NoError(func() error { - _, err := hn.LightningClient.StopDaemon(hn.runCtx, &req) + _, err := hn.rpc.LN.StopDaemon(hn.runCtx, &req) switch { case err == nil: return nil // Try again if a recovery/rescan is in progress. - case strings.Contains(err.Error(), "recovery in progress"): + case strings.Contains( + err.Error(), "recovery in progress", + ): return err default: @@ -1217,8 +1262,8 @@ func (hn *HarnessNode) stop() error { hn.WatchtowerClient = nil // Close any attempts at further grpc connections. - if hn.conn != nil { - err := status.Code(hn.conn.Close()) + if hn.rpc.conn != nil { + err := status.Code(hn.rpc.conn.Close()) switch err { case codes.OK: return nil @@ -1497,7 +1542,7 @@ func (hn *HarnessNode) WaitForBlockchainSync() error { defer ticker.Stop() for { - resp, err := hn.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) + resp, err := hn.rpc.LN.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) if err != nil { return err } @@ -1525,7 +1570,7 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, var lastBalance btcutil.Amount doesBalanceMatch := func() bool { - balance, err := hn.WalletBalance(hn.runCtx, req) + balance, err := hn.rpc.LN.WalletBalance(hn.runCtx, req) if err != nil { return false } @@ -1695,7 +1740,7 @@ func (hn *HarnessNode) newTopologyClient( ctx context.Context) (topologyClient, error) { req := &lnrpc.GraphTopologySubscription{} - client, err := hn.SubscribeChannelGraph(ctx, req) + client, err := hn.rpc.LN.SubscribeChannelGraph(ctx, req) if err != nil { return nil, fmt.Errorf("%s(%d): unable to create topology "+ "client: %v (%s)", hn.Name(), hn.NodeID, err, @@ -1765,7 +1810,7 @@ func (hn *HarnessNode) receiveTopologyClientStream( default: // An expected error is returned, return and leave it // to be handled by the caller. - return fmt.Errorf("graph subscription err: %v", err) + return fmt.Errorf("graph subscription err: %w", err) } // Send the update or quit. @@ -1814,7 +1859,7 @@ func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap { ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() - graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{ + graph, err := hn.rpc.LN.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{ IncludeUnannounced: include, }) if err != nil { From 75da5cabc81faaaeb4f01ea10e149e35fa9f7a4e Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 14:19:16 +0800 Subject: [PATCH 3/7] itest: add method waitTillServerState in harness node --- lntest/harness_node.go | 124 +++++++----------- .../lnd_rpc_middleware_interceptor_test.go | 4 +- 2 files changed, 50 insertions(+), 78 deletions(-) diff --git a/lntest/harness_node.go b/lntest/harness_node.go index bc1d85c27..dc36535ba 100644 --- a/lntest/harness_node.go +++ b/lntest/harness_node.go @@ -724,13 +724,13 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, return err } - if err := hn.WaitUntilStarted(conn, DefaultTimeout); err != nil { - return err - } - // Init all the RPC clients. hn.initRPCClients(conn) + if err := hn.WaitUntilStarted(); err != nil { + return err + } + // If the node was created with a seed, we will need to perform an // additional step to unlock the wallet. The connection returned will // only use the TLS certs, and can only perform operations necessary to @@ -745,68 +745,27 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, } // WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START". -func (hn *HarnessNode) WaitUntilStarted(conn grpc.ClientConnInterface, - timeout time.Duration) error { - - return hn.waitForState(conn, timeout, func(s lnrpc.WalletState) bool { +func (hn *HarnessNode) WaitUntilStarted() error { + return hn.waitTillServerState(func(s lnrpc.WalletState) bool { return s != lnrpc.WalletState_WAITING_TO_START }) } // WaitUntilStateReached waits until the given wallet state (or one of the // states following it) has been reached. -func (hn *HarnessNode) WaitUntilStateReached(conn grpc.ClientConnInterface, - timeout time.Duration, desiredState lnrpc.WalletState) error { +func (hn *HarnessNode) WaitUntilStateReached( + desiredState lnrpc.WalletState) error { - return hn.waitForState(conn, timeout, func(s lnrpc.WalletState) bool { + return hn.waitTillServerState(func(s lnrpc.WalletState) bool { return s >= desiredState }) } -// waitForState waits until the current node state fulfills the given -// predicate. -func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface, - timeout time.Duration, - predicate func(state lnrpc.WalletState) bool) error { - - stateClient := lnrpc.NewStateClient(conn) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stateStream, err := stateClient.SubscribeState( - ctx, &lnrpc.SubscribeStateRequest{}, - ) - if err != nil { - return err - } - - errChan := make(chan error, 1) - started := make(chan struct{}) - go func() { - for { - resp, err := stateStream.Recv() - if err != nil { - errChan <- err - return - } - - if predicate(resp.State) { - close(started) - return - } - } - }() - - select { - - case <-started: - case err = <-errChan: - - case <-time.After(timeout): - return fmt.Errorf("WaitUntilLeader timed out") - } - - return err +// WaitUntilServerActive waits until the lnd daemon is fully started. +func (hn *HarnessNode) WaitUntilServerActive() error { + return hn.waitTillServerState(func(s lnrpc.WalletState) bool { + return s == lnrpc.WalletState_SERVER_ACTIVE + }) } // WaitUntilLeader attempts to finish the start procedure by initiating an RPC @@ -820,18 +779,12 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { connErr error ) - startTs := time.Now() if err := wait.NoError(func() error { conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed) return connErr }, timeout); err != nil { return err } - timeout -= time.Since(startTs) - - if err := hn.WaitUntilStarted(conn, timeout); err != nil { - return err - } // Init all the RPC clients. hn.initRPCClients(conn) @@ -982,9 +935,11 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error { return hn.initClientWhenReady(DefaultTimeout) } -// waitTillServerStarted makes a subscription to the server's state change and -// blocks until the server is in state ServerActive. -func (hn *HarnessNode) waitTillServerStarted() error { +// waitTillServerState makes a subscription to the server's state change and +// blocks until the server is in the targeted state. +func (hn *HarnessNode) waitTillServerState( + predicate func(state lnrpc.WalletState) bool) error { + ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout) defer cancel() @@ -995,15 +950,35 @@ func (hn *HarnessNode) waitTillServerStarted() error { return fmt.Errorf("failed to subscribe to state: %w", err) } - for { - resp, err := client.Recv() - if err != nil { - return fmt.Errorf("failed to receive state "+ - "client stream: %w", err) - } + errChan := make(chan error, 1) + done := make(chan struct{}) + go func() { + for { + resp, err := client.Recv() + if err != nil { + errChan <- err + return + } - if resp.State == lnrpc.WalletState_SERVER_ACTIVE { + if predicate(resp.State) { + close(done) + return + } + } + }() + + var lastErr error + for { + select { + case err := <-errChan: + lastErr = err + + case <-done: return nil + + case <-time.After(NodeStartTimeout): + return fmt.Errorf("timeout waiting for state, "+ + "got err from stream: %v", lastErr) } } } @@ -1043,14 +1018,13 @@ func (hn *HarnessNode) initLightningClient() error { hn.StateClient = lnrpc.NewStateClient(conn) // Wait until the server is fully started. - if err := hn.waitTillServerStarted(); err != nil { + if err := hn.WaitUntilServerActive(); err != nil { return err } // Set the harness node's pubkey to what the node claims in GetInfo. - // Since the RPC might not be immediately active, we wrap the call in a - // wait.NoError. - if err := wait.NoError(hn.FetchNodeInfo, DefaultTimeout); err != nil { + // The RPC must have been started at this point. + if err := hn.FetchNodeInfo(); err != nil { return err } diff --git a/lntest/itest/lnd_rpc_middleware_interceptor_test.go b/lntest/itest/lnd_rpc_middleware_interceptor_test.go index fd9e5e95b..0ada62954 100644 --- a/lntest/itest/lnd_rpc_middleware_interceptor_test.go +++ b/lntest/itest/lnd_rpc_middleware_interceptor_test.go @@ -403,9 +403,7 @@ func middlewareMandatoryTest(t *testing.T, node *lntest.HarnessNode, // test case. So we need to do the wait and client setup manually here. conn, err := node.ConnectRPC(true) require.NoError(t, err) - err = node.WaitUntilStateReached( - conn, defaultTimeout, lnrpc.WalletState_RPC_ACTIVE, - ) + err = node.WaitUntilStateReached(lnrpc.WalletState_RPC_ACTIVE) require.NoError(t, err) node.LightningClient = lnrpc.NewLightningClient(conn) From 61d6f5da11f733d46a631641f80677734e1672f7 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 14:20:50 +0800 Subject: [PATCH 4/7] itest: refactor initClientWhenReady to clean up init node --- lntest/harness_node.go | 137 +++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 74 deletions(-) diff --git a/lntest/harness_node.go b/lntest/harness_node.go index dc36535ba..58fef454c 100644 --- a/lntest/harness_node.go +++ b/lntest/harness_node.go @@ -789,6 +789,10 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { // Init all the RPC clients. hn.initRPCClients(conn) + if err := hn.WaitUntilStarted(); err != nil { + return err + } + // If the node was created with a seed, we will need to perform an // additional step to unlock the wallet. The connection returned will // only use the TLS certs, and can only perform operations necessary to @@ -804,18 +808,37 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { } // initClientWhenReady waits until the main gRPC server is detected as active, -// then complete the normal HarnessNode gRPC connection creation. This can be -// used it a node has just been unlocked, or has its wallet state initialized. -func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error { +// then complete the normal HarnessNode gRPC connection creation. If the node +// is initialized stateless, the macaroon is returned so that the client can +// use it. +func (hn *HarnessNode) initClientWhenReady(stateless bool, + macBytes []byte) error { + + // Wait for the wallet to finish unlocking, such that we can connect to + // it via a macaroon-authenticated rpc connection. var ( - conn *grpc.ClientConn - connErr error + conn *grpc.ClientConn + err error ) - if err := wait.NoError(func() error { - conn, connErr = hn.ConnectRPC(true) - return connErr - }, timeout); err != nil { + if err = wait.NoError(func() error { + // If the node has been initialized stateless, we need to pass + // the macaroon to the client. + if stateless { + adminMac := &macaroon.Macaroon{} + err := adminMac.UnmarshalBinary(macBytes) + if err != nil { + return fmt.Errorf("unmarshal failed: %w", err) + } + conn, err = hn.ConnectRPCWithMacaroon(adminMac) + return err + } + + // Normal initialization, we expect a macaroon to be in the + // file system. + conn, err = hn.ConnectRPC(true) return err + }, DefaultTimeout); err != nil { + return fmt.Errorf("timeout while init client: %w", err) } // Init all the RPC clients. @@ -834,39 +857,20 @@ func (hn *HarnessNode) Init( ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() - response, err := hn.InitWallet(ctxt, initReq) + + response, err := hn.rpc.WalletUnlocker.InitWallet(ctxt, initReq) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to init wallet: %w", err) } - // Wait for the wallet to finish unlocking, such that we can connect to - // it via a macaroon-authenticated rpc connection. - var conn *grpc.ClientConn - if err = wait.Predicate(func() bool { - // If the node has been initialized stateless, we need to pass - // the macaroon to the client. - if initReq.StatelessInit { - adminMac := &macaroon.Macaroon{} - err := adminMac.UnmarshalBinary(response.AdminMacaroon) - if err != nil { - return false - } - conn, err = hn.ConnectRPCWithMacaroon(adminMac) - return err == nil - } - - // Normal initialization, we expect a macaroon to be in the - // file system. - conn, err = hn.ConnectRPC(true) - return err == nil - }, DefaultTimeout); err != nil { - return nil, err + err = hn.initClientWhenReady( + initReq.StatelessInit, response.AdminMacaroon, + ) + if err != nil { + return nil, fmt.Errorf("faied to init: %w", err) } - // Init all the RPC clients. - hn.initRPCClients(conn) - - return response, hn.initLightningClient() + return response, nil } // InitChangePassword initializes a harness node by passing the change password @@ -880,39 +884,19 @@ func (hn *HarnessNode) InitChangePassword( ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() - response, err := hn.ChangePassword(ctxt, chngPwReq) + + response, err := hn.rpc.WalletUnlocker.ChangePassword(ctxt, chngPwReq) + if err != nil { + return nil, err + } + err = hn.initClientWhenReady( + chngPwReq.StatelessInit, response.AdminMacaroon, + ) if err != nil { return nil, err } - // Wait for the wallet to finish unlocking, such that we can connect to - // it via a macaroon-authenticated rpc connection. - var conn *grpc.ClientConn - if err = wait.Predicate(func() bool { - // If the node has been initialized stateless, we need to pass - // the macaroon to the client. - if chngPwReq.StatelessInit { - adminMac := &macaroon.Macaroon{} - err := adminMac.UnmarshalBinary(response.AdminMacaroon) - if err != nil { - return false - } - conn, err = hn.ConnectRPCWithMacaroon(adminMac) - return err == nil - } - - // Normal initialization, we expect a macaroon to be in the - // file system. - conn, err = hn.ConnectRPC(true) - return err == nil - }, DefaultTimeout); err != nil { - return nil, err - } - - // Init all the RPC clients. - hn.initRPCClients(conn) - - return response, hn.initLightningClient() + return response, nil } // Unlock attempts to unlock the wallet of the target HarnessNode. This method @@ -932,7 +916,7 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error { // Now that the wallet has been unlocked, we'll wait for the RPC client // to be ready, then establish the normal gRPC connection. - return hn.initClientWhenReady(DefaultTimeout) + return hn.initClientWhenReady(false, nil) } // waitTillServerState makes a subscription to the server's state change and @@ -1081,7 +1065,8 @@ func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) ( err := wait.NoError(func() error { macBytes, err := ioutil.ReadFile(macPath) if err != nil { - return fmt.Errorf("error reading macaroon file: %v", err) + return fmt.Errorf("error reading macaroon file: %v", + err) } newMac := &macaroon.Macaroon{} @@ -1345,6 +1330,7 @@ func (hn *HarnessNode) lightningNetworkWatcher() { go func() { defer hn.wg.Done() err := hn.receiveTopologyClientStream(graphUpdates) + if err != nil { hn.PrintErr("receive topology client stream "+ "got err:%v", err) @@ -1504,8 +1490,8 @@ func (hn *HarnessNode) WaitForChannelPolicyUpdate( } } -// WaitForBlockchainSync waits for the target node to be fully synchronized with -// the blockchain. If the passed context object has a set timeout, it will +// WaitForBlockchainSync waits for the target node to be fully synchronized +// with the blockchain. If the passed context object has a set timeout, it will // continually poll until the timeout has elapsed. In the case that the chain // isn't synced before the timeout is up, this function will return an error. func (hn *HarnessNode) WaitForBlockchainSync() error { @@ -1551,17 +1537,20 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, if confirmed { lastBalance = btcutil.Amount(balance.ConfirmedBalance) - return btcutil.Amount(balance.ConfirmedBalance) == expectedBalance + return btcutil.Amount(balance.ConfirmedBalance) == + expectedBalance } lastBalance = btcutil.Amount(balance.UnconfirmedBalance) - return btcutil.Amount(balance.UnconfirmedBalance) == expectedBalance + return btcutil.Amount(balance.UnconfirmedBalance) == + expectedBalance } err := wait.Predicate(doesBalanceMatch, DefaultTimeout) if err != nil { return fmt.Errorf("balances not synced after deadline: "+ - "expected %v, only have %v", expectedBalance, lastBalance) + "expected %v, only have %v", expectedBalance, + lastBalance) } return nil @@ -1737,7 +1726,7 @@ func (hn *HarnessNode) receiveTopologyClientStream( // Create a topology client to receive graph updates. client, err := hn.newTopologyClient(hn.runCtx) if err != nil { - return fmt.Errorf("create topologyClient failed: %v", err) + return fmt.Errorf("create topologyClient failed: %w", err) } // We use the context to time out when retrying graph subscription. From 337aa6670b6c49acfc767bc7d67ad70c6df1944d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 14:35:23 +0800 Subject: [PATCH 5/7] itest: introduce harness miner This commit adds a new component, harness miner, to the itest. This newly added component is responsible for checking the mempool and blocks for the itest. --- lntest/harness_miner.go | 161 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 lntest/harness_miner.go diff --git a/lntest/harness_miner.go b/lntest/harness_miner.go new file mode 100644 index 000000000..238fd3365 --- /dev/null +++ b/lntest/harness_miner.go @@ -0,0 +1,161 @@ +package lntest + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/integration/rpctest" + "github.com/btcsuite/btcd/rpcclient" +) + +const ( + // minerLogFilename is the default log filename for the miner node. + minerLogFilename = "output_btcd_miner.log" + + // minerLogDir is the default log dir for the miner node. + minerLogDir = ".minerlogs" +) + +var harnessNetParams = &chaincfg.RegressionNetParams + +type HarnessMiner struct { + *rpctest.Harness + + // runCtx is a context with cancel method. It's used to signal when the + // node needs to quit, and used as the parent context when spawning + runCtx context.Context + cancel context.CancelFunc + + // logPath is the directory path of the miner's logs. + logPath string + + // logFilename is the saved log filename of the miner node. + logFilename string +} + +// NewMiner creates a new miner using btcd backend with the default log file +// dir and name. +func NewMinerTemp() (*HarnessMiner, error) { // TODO(yy): rename + return newMiner(minerLogDir, minerLogFilename) +} + +// NewTempMiner creates a new miner using btcd backend with the specified log +// file dir and name. +func NewTempMiner(tempDir, tempLogFilename string) (*HarnessMiner, error) { + return newMiner(tempDir, tempLogFilename) +} + +// newMiner creates a new miner using btcd's rpctest. +func newMiner(minerDirName, logFilename string) (*HarnessMiner, error) { + handler := &rpcclient.NotificationHandlers{} + btcdBinary := GetBtcdBinary() + baseLogPath := fmt.Sprintf("%s/%s", GetLogDir(), minerDirName) + + args := []string{ + "--rejectnonstd", + "--txindex", + "--nowinservice", + "--nobanning", + "--debuglevel=debug", + "--logdir=" + baseLogPath, + "--trickleinterval=100ms", + // Don't disconnect if a reply takes too long. + "--nostalldetect", + } + + miner, err := rpctest.New(harnessNetParams, handler, args, btcdBinary) + if err != nil { + return nil, fmt.Errorf("unable to create mining node: %v", err) + } + + ctxt, cancel := context.WithCancel(context.Background()) + m := &HarnessMiner{ + Harness: miner, + runCtx: ctxt, + cancel: cancel, + logPath: baseLogPath, + logFilename: logFilename, + } + return m, nil +} + +// Stop shuts down the miner and saves its logs. +func (h *HarnessMiner) Stop() error { + h.cancel() + + if err := h.TearDown(); err != nil { + return fmt.Errorf("tear down miner got error: %s", err) + } + + return h.saveLogs() +} + +// saveLogs copies the node logs and save it to the file specified by +// h.logFilename. +func (h *HarnessMiner) saveLogs() error { + // After shutting down the miner, we'll make a copy of the log files + // before deleting the temporary log dir. + path := fmt.Sprintf("%s/%s", h.logPath, harnessNetParams.Name) + files, err := ioutil.ReadDir(path) + if err != nil { + return fmt.Errorf("unable to read log directory: %v", err) + } + + for _, file := range files { + newFilename := strings.Replace( + file.Name(), "btcd.log", h.logFilename, 1, + ) + copyPath := fmt.Sprintf("%s/../%s", h.logPath, newFilename) + + logFile := fmt.Sprintf("%s/%s", path, file.Name()) + err := CopyFile(filepath.Clean(copyPath), logFile) + if err != nil { + return fmt.Errorf("unable to copy file: %v", err) + } + } + + if err = os.RemoveAll(h.logPath); err != nil { + return fmt.Errorf("cannot remove dir %s: %v", h.logPath, err) + } + + return nil +} + +// waitForTxInMempool blocks until the target txid is seen in the mempool. If +// the transaction isn't seen within the network before the passed timeout, +// then an error is returned. +func (h *HarnessMiner) waitForTxInMempool(txid chainhash.Hash) error { + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + + var mempool []*chainhash.Hash + for { + select { + case <-h.runCtx.Done(): + return fmt.Errorf("NetworkHarness has been torn down") + case <-time.After(DefaultTimeout): + return fmt.Errorf("wanted %v, found %v txs "+ + "in mempool: %v", txid, len(mempool), mempool) + + case <-ticker.C: + var err error + mempool, err = h.Client.GetRawMempool() + if err != nil { + return err + } + + for _, mempoolTx := range mempool { + if *mempoolTx == txid { + return nil + } + } + } + } +} From 587273174e49050efb79fbf49620570164b368e8 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 18 Sep 2021 14:41:20 +0800 Subject: [PATCH 6/7] itest: start using harness miner in harness net This commit replaces the old miner with the new HarnessMiner and cleans harness_node.go by moving methods into the test_common.go. --- lntest/harness_miner.go | 2 +- lntest/harness_net.go | 76 ++------------------ lntest/harness_node.go | 70 ------------------ lntest/itest/assertions.go | 3 +- lntest/itest/lnd_channel_force_close_test.go | 3 +- lntest/itest/lnd_open_channel_test.go | 14 ++-- lntest/itest/lnd_test.go | 9 +-- lntest/test_common.go | 23 ++++++ 8 files changed, 38 insertions(+), 162 deletions(-) diff --git a/lntest/harness_miner.go b/lntest/harness_miner.go index 238fd3365..b9f54619a 100644 --- a/lntest/harness_miner.go +++ b/lntest/harness_miner.go @@ -42,7 +42,7 @@ type HarnessMiner struct { // NewMiner creates a new miner using btcd backend with the default log file // dir and name. -func NewMinerTemp() (*HarnessMiner, error) { // TODO(yy): rename +func NewMiner() (*HarnessMiner, error) { return newMiner(minerLogDir, minerLogFilename) } diff --git a/lntest/harness_net.go b/lntest/harness_net.go index 76e571274..118cea1b5 100644 --- a/lntest/harness_net.go +++ b/lntest/harness_net.go @@ -17,7 +17,6 @@ import ( "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -55,7 +54,7 @@ type NetworkHarness struct { // Miner is a reference to a running full node that can be used to // create new blocks on the network. - Miner *rpctest.Harness + Miner *HarnessMiner // BackendCfg houses the information necessary to use a node as LND // chain backend, such as rpc configuration, P2P information etc. @@ -94,7 +93,7 @@ type NetworkHarness struct { // TODO(roasbeef): add option to use golang's build library to a binary of the // current repo. This will save developers from having to manually `go install` // within the repo each time before changes -func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string, +func NewNetworkHarness(m *HarnessMiner, b BackendConfig, lndBinary string, dbBackend DatabaseBackend) (*NetworkHarness, error) { feeService := startFeeService() @@ -105,8 +104,8 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string, activeNodes: make(map[int]*HarnessNode), nodesByPub: make(map[string]*HarnessNode), lndErrorChan: make(chan error), - netParams: r.ActiveNet, - Miner: r, + netParams: m.ActiveNet, + Miner: m, BackendCfg: b, feeService: feeService, runCtx: ctxt, @@ -949,41 +948,6 @@ func saveProfilesPage(node *HarnessNode) error { return nil } -// waitForTxInMempool blocks until the target txid is seen in the mempool. If -// the transaction isn't seen within the network before the passed timeout, -// then an error is returned. -func (n *NetworkHarness) waitForTxInMempool(txid chainhash.Hash) error { - ticker := time.NewTicker(50 * time.Millisecond) - defer ticker.Stop() - - ctxt, cancel := context.WithTimeout(n.runCtx, DefaultTimeout) - defer cancel() - - var mempool []*chainhash.Hash - for { - select { - case <-n.runCtx.Done(): - return fmt.Errorf("NetworkHarness has been torn down") - case <-ctxt.Done(): - return fmt.Errorf("wanted %v, found %v txs "+ - "in mempool: %v", txid, len(mempool), mempool) - - case <-ticker.C: - var err error - mempool, err = n.Miner.Client.GetRawMempool() - if err != nil { - return err - } - - for _, mempoolTx := range mempool { - if *mempoolTx == txid { - return nil - } - } - } - } -} - // OpenChannelParams houses the params to specify when opening a new channel. type OpenChannelParams struct { // Amt is the local amount being put into the channel. @@ -1332,7 +1296,7 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode, return fmt.Errorf("unable to decode closeTxid: "+ "%v", err) } - if err := n.waitForTxInMempool(*closeTxid); err != nil { + if err := n.Miner.waitForTxInMempool(*closeTxid); err != nil { return fmt.Errorf("error while waiting for "+ "broadcast tx: %v", err) } @@ -1601,36 +1565,6 @@ func (n *NetworkHarness) SetFeeEstimateWithConf( n.feeService.setFeeWithConf(fee, conf) } -// CopyFile copies the file src to dest. -func CopyFile(dest, src string) error { - s, err := os.Open(src) - if err != nil { - return err - } - defer s.Close() - - d, err := os.Create(dest) - if err != nil { - return err - } - - if _, err := io.Copy(d, s); err != nil { - d.Close() - return err - } - - return d.Close() -} - -// FileExists returns true if the file at path exists. -func FileExists(path string) bool { - if _, err := os.Stat(path); os.IsNotExist(err) { - return false - } - - return true -} - // copyAll copies all files and directories from srcDir to dstDir recursively. // Note that this function does not support links. func copyAll(dstDir, srcDir string) error { diff --git a/lntest/harness_node.go b/lntest/harness_node.go index 58fef454c..0ee4ef6f7 100644 --- a/lntest/harness_node.go +++ b/lntest/harness_node.go @@ -18,8 +18,6 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/integration/rpctest" - "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/jackc/pgx/v4/pgxpool" @@ -465,74 +463,6 @@ func executePgQuery(query string) error { return err } -// NewMiner creates a new miner using btcd backend. The baseLogDir specifies -// the miner node's log dir. When tests are finished, during clean up, its log -// files, including any compressed log files from logrotate, are copied to -// logDir as logFilename. -func NewMiner(baseLogDir, logFilename string, netParams *chaincfg.Params, - handler *rpcclient.NotificationHandlers, - btcdBinary string) (*rpctest.Harness, func() error, error) { - - args := []string{ - "--rejectnonstd", - "--txindex", - "--nowinservice", - "--nobanning", - "--debuglevel=debug", - "--logdir=" + baseLogDir, - "--trickleinterval=100ms", - // Don't disconnect if a reply takes too long. - "--nostalldetect", - } - - miner, err := rpctest.New(netParams, handler, args, btcdBinary) - if err != nil { - return nil, nil, fmt.Errorf( - "unable to create mining node: %v", err, - ) - } - - cleanUp := func() error { - if err := miner.TearDown(); err != nil { - return fmt.Errorf( - "failed to tear down miner, got error: %s", err, - ) - } - - // After shutting down the miner, we'll make a copy of - // the log files before deleting the temporary log dir. - logDir := fmt.Sprintf("%s/%s", baseLogDir, netParams.Name) - files, err := ioutil.ReadDir(logDir) - if err != nil { - return fmt.Errorf("unable to read log directory: %v", err) - } - - for _, file := range files { - logFile := fmt.Sprintf( - "%s/%s", logDir, file.Name(), - ) - newFilename := strings.Replace(file.Name(), "btcd.log", logFilename, 1) - copyPath := fmt.Sprintf( - "%s/../%s", baseLogDir, newFilename, - ) - - err := CopyFile(filepath.Clean(copyPath), logFile) - if err != nil { - return fmt.Errorf("unable to copy file: %v", err) - } - } - - if err = os.RemoveAll(baseLogDir); err != nil { - return fmt.Errorf( - "cannot remove dir %s: %v", baseLogDir, err, - ) - } - return nil - } - - return miner, cleanUp, nil -} - // String gives the internal state of the node which is useful for debugging. func (hn *HarnessNode) String() string { type nodeCfg struct { diff --git a/lntest/itest/assertions.go b/lntest/itest/assertions.go index 822508d5a..922e34de8 100644 --- a/lntest/itest/assertions.go +++ b/lntest/itest/assertions.go @@ -10,7 +10,6 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -691,7 +690,7 @@ func assertChannelPolicy(t *harnessTest, node *lntest.HarnessNode, // assertMinerBlockHeightDelta ensures that tempMiner is 'delta' blocks ahead // of miner. func assertMinerBlockHeightDelta(t *harnessTest, - miner, tempMiner *rpctest.Harness, delta int32) { + miner, tempMiner *lntest.HarnessMiner, delta int32) { // Ensure the chain lengths are what we expect. var predErr error diff --git a/lntest/itest/lnd_channel_force_close_test.go b/lntest/itest/lnd_channel_force_close_test.go index 9c0069e18..5a9c6e207 100644 --- a/lntest/itest/lnd_channel_force_close_test.go +++ b/lntest/itest/lnd_channel_force_close_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/btcsuite/btcd/blockchain" - "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/go-errors/errors" @@ -212,7 +211,7 @@ func testCommitmentTransactionDeadline(net *lntest.NetworkHarness, // calculateTxnsFeeRate takes a list of transactions and estimates the fee rate // used to sweep them. func calculateTxnsFeeRate(t *testing.T, - miner *rpctest.Harness, txns []*wire.MsgTx) int64 { + miner *lntest.HarnessMiner, txns []*wire.MsgTx) int64 { var totalWeight, totalFee int64 for _, tx := range txns { diff --git a/lntest/itest/lnd_open_channel_test.go b/lntest/itest/lnd_open_channel_test.go index 52ae84a00..392db06d8 100644 --- a/lntest/itest/lnd_open_channel_test.go +++ b/lntest/itest/lnd_open_channel_test.go @@ -8,7 +8,6 @@ import ( "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/integration/rpctest" - "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/lnrpc" @@ -34,16 +33,13 @@ func testOpenChannelAfterReorg(net *lntest.NetworkHarness, t *harnessTest) { ) // Set up a new miner that we can use to cause a reorg. - tempLogDir := fmt.Sprintf("%s/.tempminerlogs", lntest.GetLogDir()) + tempLogDir := ".tempminerlogs" logFilename := "output-open_channel_reorg-temp_miner.log" - tempMiner, tempMinerCleanUp, err := lntest.NewMiner( - tempLogDir, logFilename, harnessNetParams, - &rpcclient.NotificationHandlers{}, lntest.GetBtcdBinary(), - ) + tempMiner, err := lntest.NewTempMiner(tempLogDir, logFilename) require.NoError(t.t, err, "failed to create temp miner") defer func() { require.NoError( - t.t, tempMinerCleanUp(), + t.t, tempMiner.Stop(), "failed to clean up temp miner", ) }() @@ -61,7 +57,7 @@ func testOpenChannelAfterReorg(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to remove node: %v", err) } - nodeSlice := []*rpctest.Harness{net.Miner, tempMiner} + nodeSlice := []*rpctest.Harness{net.Miner.Harness, tempMiner.Harness} if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { t.Fatalf("unable to join node on blocks: %v", err) } @@ -186,7 +182,7 @@ func testOpenChannelAfterReorg(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to remove node: %v", err) } - nodes := []*rpctest.Harness{tempMiner, net.Miner} + nodes := []*rpctest.Harness{tempMiner.Harness, net.Miner.Harness} if err := rpctest.JoinNodes(nodes, rpctest.Blocks); err != nil { t.Fatalf("unable to join node on blocks: %v", err) } diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 0b342e382..a2e991184 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/btcsuite/btcd/integration/rpctest" - "github.com/btcsuite/btcd/rpcclient" "github.com/lightningnetwork/lnd/lntest" "github.com/stretchr/testify/require" ) @@ -113,14 +112,10 @@ func TestLightningNetworkDaemon(t *testing.T) { // guarantees of getting included in to blocks. // // We will also connect it to our chain backend. - minerLogDir := fmt.Sprintf("%s/.minerlogs", logDir) - miner, minerCleanUp, err := lntest.NewMiner( - minerLogDir, "output_btcd_miner.log", harnessNetParams, - &rpcclient.NotificationHandlers{}, lntest.GetBtcdBinary(), - ) + miner, err := lntest.NewMiner() require.NoError(t, err, "failed to create new miner") defer func() { - require.NoError(t, minerCleanUp(), "failed to clean up miner") + require.NoError(t, miner.Stop(), "failed to stop miner") }() // Start a chain backend. diff --git a/lntest/test_common.go b/lntest/test_common.go index 717694ceb..e858c168c 100644 --- a/lntest/test_common.go +++ b/lntest/test_common.go @@ -4,7 +4,9 @@ import ( "errors" "flag" "fmt" + "io" "net" + "os" "sync/atomic" "github.com/btcsuite/btcd/wire" @@ -162,3 +164,24 @@ func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error { return nil } + +// CopyFile copies the file src to dest. +func CopyFile(dest, src string) error { + s, err := os.Open(src) + if err != nil { + return err + } + defer s.Close() + + d, err := os.Create(dest) + if err != nil { + return err + } + + if _, err := io.Copy(d, s); err != nil { + d.Close() + return err + } + + return d.Close() +} From 10d876ae9853ede61fb34fcca37c303ca9c8dcda Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Wed, 5 Jan 2022 12:57:17 +0100 Subject: [PATCH 7/7] itest: fix RPC middleware itest --- lntest/harness_node.go | 10 +++++----- lntest/itest/lnd_rpc_middleware_interceptor_test.go | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lntest/harness_node.go b/lntest/harness_node.go index 0ee4ef6f7..add14a85c 100644 --- a/lntest/harness_node.go +++ b/lntest/harness_node.go @@ -655,7 +655,7 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, } // Init all the RPC clients. - hn.initRPCClients(conn) + hn.InitRPCClients(conn) if err := hn.WaitUntilStarted(); err != nil { return err @@ -717,7 +717,7 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error { } // Init all the RPC clients. - hn.initRPCClients(conn) + hn.InitRPCClients(conn) if err := hn.WaitUntilStarted(); err != nil { return err @@ -772,7 +772,7 @@ func (hn *HarnessNode) initClientWhenReady(stateless bool, } // Init all the RPC clients. - hn.initRPCClients(conn) + hn.InitRPCClients(conn) return hn.initLightningClient() } @@ -897,8 +897,8 @@ func (hn *HarnessNode) waitTillServerState( } } -// initRPCClients initializes a list of RPC clients for the node. -func (hn *HarnessNode) initRPCClients(c *grpc.ClientConn) { +// InitRPCClients initializes a list of RPC clients for the node. +func (hn *HarnessNode) InitRPCClients(c *grpc.ClientConn) { hn.rpc = &RPCClients{ conn: c, LN: lnrpc.NewLightningClient(c), diff --git a/lntest/itest/lnd_rpc_middleware_interceptor_test.go b/lntest/itest/lnd_rpc_middleware_interceptor_test.go index 0ada62954..48be71fca 100644 --- a/lntest/itest/lnd_rpc_middleware_interceptor_test.go +++ b/lntest/itest/lnd_rpc_middleware_interceptor_test.go @@ -403,6 +403,7 @@ func middlewareMandatoryTest(t *testing.T, node *lntest.HarnessNode, // test case. So we need to do the wait and client setup manually here. conn, err := node.ConnectRPC(true) require.NoError(t, err) + node.InitRPCClients(conn) err = node.WaitUntilStateReached(lnrpc.WalletState_RPC_ACTIVE) require.NoError(t, err) node.LightningClient = lnrpc.NewLightningClient(conn)