itest: shorten functions inside harness node

This commit refactors the long function start() into smaller pieces and
moves commonly used functions into test_common.go.
This commit is contained in:
yyforyongyu 2021-09-18 13:00:39 +08:00
parent ebc1547abc
commit f8cf7c8775
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
3 changed files with 236 additions and 229 deletions

View File

@ -1746,3 +1746,26 @@ func (n *NetworkHarness) RestoreDb(hn *HarnessNode) error {
return nil
}
// getChanPointFundingTxid returns the given channel point's funding txid in
// raw bytes.
func getChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) ([]byte, error) {
var txid []byte
// A channel point's funding txid can be get/set as a byte slice or a
// string. In the case it is a string, decode it.
switch chanPoint.GetFundingTxid().(type) {
case *lnrpc.ChannelPoint_FundingTxidBytes:
txid = chanPoint.GetFundingTxidBytes()
case *lnrpc.ChannelPoint_FundingTxidStr:
s := chanPoint.GetFundingTxidStr()
h, err := chainhash.NewHashFromStr(s)
if err != nil {
return nil, err
}
txid = h[:]
}
return txid, nil
}

View File

@ -6,7 +6,6 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@ -19,7 +18,6 @@ import (
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/integration/rpctest"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
@ -190,28 +188,32 @@ func (cfg NodeConfig) genArgs() []string {
backendArgs := cfg.BackendCfg.GenArgs()
args = append(args, backendArgs...)
args = append(args, "--bitcoin.active")
args = append(args, "--nobootstrap")
args = append(args, "--debuglevel=debug")
args = append(args, "--bitcoin.defaultchanconfs=1")
args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval))
args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV))
args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()))
args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()))
args = append(args, fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()))
args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr()))
args = append(args, fmt.Sprintf("--externalip=%v", cfg.P2PAddr()))
args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir))
args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir))
args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath))
args = append(args, fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath))
args = append(args, fmt.Sprintf("--configfile=%v", cfg.DataDir))
args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath))
args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath))
args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath))
args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay))
args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort))
args = append(args, fmt.Sprintf("--caches.rpc-graph-cache-duration=0"))
nodeArgs := []string{
"--bitcoin.active",
"--nobootstrap",
"--debuglevel=debug",
"--bitcoin.defaultchanconfs=1",
fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval),
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV),
fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()),
fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()),
fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()),
fmt.Sprintf("--listen=%v", cfg.P2PAddr()),
fmt.Sprintf("--externalip=%v", cfg.P2PAddr()),
fmt.Sprintf("--logdir=%v", cfg.LogDir),
fmt.Sprintf("--datadir=%v", cfg.DataDir),
fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath),
fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath),
fmt.Sprintf("--configfile=%v", cfg.DataDir),
fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath),
fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath),
fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath),
fmt.Sprintf("--trickledelay=%v", trickleDelay),
fmt.Sprintf("--profile=%d", cfg.ProfilePort),
fmt.Sprintf("--caches.rpc-graph-cache-duration=0"),
}
args = append(args, nodeArgs...)
if !cfg.HasSeed {
args = append(args, "--noseedbackup")
@ -295,7 +297,6 @@ type HarnessNode struct {
PubKeyStr string
cmd *exec.Cmd
pidFile string
logFile *os.File
// chanWatchRequests receives a request for watching a particular event
@ -358,6 +359,16 @@ var _ lnrpc.LightningClient = (*HarnessNode)(nil)
var _ lnrpc.WalletUnlockerClient = (*HarnessNode)(nil)
var _ invoicesrpc.InvoicesClient = (*HarnessNode)(nil)
// nextNodeID generates a unique sequence to be used as the node's ID.
func nextNodeID() int {
numActiveNodesMtx.Lock()
defer numActiveNodesMtx.Unlock()
nodeNum := numActiveNodes
numActiveNodes++
return nodeNum
}
// newNode creates a new test lightning node instance from the passed config.
func newNode(cfg NodeConfig) (*HarnessNode, error) {
if cfg.BaseDir == "" {
@ -397,14 +408,9 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
cfg.PostgresDsn = postgresDatabaseDsn(dbName)
}
numActiveNodesMtx.Lock()
nodeNum := numActiveNodes
numActiveNodes++
numActiveNodesMtx.Unlock()
return &HarnessNode{
Cfg: &cfg,
NodeID: nodeNum,
NodeID: nextNodeID(),
chanWatchRequests: make(chan *chanWatchRequest),
openChans: make(map[wire.OutPoint]int),
openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
@ -621,29 +627,9 @@ func (hn *HarnessNode) InvoiceMacPath() string {
return hn.Cfg.InvoiceMacPath
}
// renameFile is a helper to rename (log) files created during integration tests.
func renameFile(fromFileName, toFileName string) {
err := os.Rename(fromFileName, toFileName)
if err != nil {
fmt.Printf("could not rename %s to %s: %v\n",
fromFileName, toFileName, err)
}
}
// Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly
// later.
//
// This may not clean up properly if an error is returned, so the caller should
// call shutdown() regardless of the return value.
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
wait bool) error {
// Init the runCtx.
ctxt, cancel := context.WithCancel(context.Background())
hn.runCtx = ctxt
hn.cancel = cancel
// startLnd handles the startup of lnd, creating log files, and possibly kills
// the process when needed.
func (hn *HarnessNode) startLnd(lndBinary string, lndError chan<- error) error {
args := hn.Cfg.genArgs()
hn.cmd = exec.Command(lndBinary, args...)
@ -651,87 +637,17 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
var errb bytes.Buffer
hn.cmd.Stderr = &errb
// Make sure the log file cleanup function is initialized, even
// if no log file is created.
var finalizeLogfile = func() {
if hn.logFile != nil {
hn.logFile.Close()
}
}
getFinalizedLogFilePrefix := func() string {
pubKeyHex := hex.EncodeToString(
hn.PubKey[:logPubKeyBytes],
)
return fmt.Sprintf("%s/%d-%s-%s-%s",
GetLogDir(), hn.NodeID,
hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name, pubKeyHex)
}
finalizeEtcdLog := func() {
if hn.Cfg.DbBackend != BackendEtcd {
return
}
etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
getFinalizedLogFilePrefix(),
)
renameFile(etcdLogFileName, newEtcdLogFileName)
}
// If the logoutput flag is passed, redirect output from the nodes to
// log files.
var (
fileName string
err error
)
if *logOutput {
dir := GetLogDir()
fileName := fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID,
hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
// If the node's PubKey is not yet initialized, create a
// temporary file name. Later, after the PubKey has been
// initialized, the file can be moved to its final name with
// the PubKey included.
if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
hn.NodeID, hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name)
}
// Once the node has done its work, the log file can be
// renamed.
finalizeLogfile = func() {
if hn.logFile != nil {
hn.logFile.Close()
newFileName := fmt.Sprintf("%v.log",
getFinalizedLogFilePrefix(),
)
renameFile(fileName, newFileName)
}
}
// Create file if not exists, otherwise append.
file, err := os.OpenFile(fileName,
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
fileName, err = addLogFile(hn)
if err != nil {
return err
}
// Pass node's stderr to both errb and the file.
w := io.MultiWriter(&errb, file)
hn.cmd.Stderr = w
// Pass the node's stdout only to the file.
hn.cmd.Stdout = file
// Let the node keep a reference to this file, such
// that we can add to it if necessary.
hn.logFile = file
}
if err := hn.cmd.Start(); err != nil {
@ -750,21 +666,32 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
}
// Make sure log file is closed and renamed if necessary.
finalizeLogfile()
finalizeLogfile(hn, fileName)
// Rename the etcd.log file if the node was running on embedded
// etcd.
finalizeEtcdLog()
finalizeEtcdLog(hn)
}()
// Write process ID to a file.
if err := hn.writePidFile(); err != nil {
err = fmt.Errorf("writePidFile err: %w", err)
cmdErr := hn.cmd.Process.Kill()
if cmdErr != nil {
err = fmt.Errorf("kill process got err: %w: %v",
cmdErr, err)
}
return nil
}
// Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly
// later.
//
// This may not clean up properly if an error is returned, so the caller should
// call shutdown() regardless of the return value.
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
wait bool) error {
// Init the runCtx.
ctxt, cancel := context.WithCancel(context.Background())
hn.runCtx = ctxt
hn.cancel = cancel
// Start lnd and prepare logs.
if err := hn.startLnd(lndBinary, lndError); err != nil {
return err
}
@ -872,7 +799,8 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface,
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
// connection and setting up the wallet unlocker client. This is needed when
// a node that has recently been started was waiting to become the leader and
// we're at the point when we expect that it is the leader now (awaiting unlock).
// we're at the point when we expect that it is the leader now (awaiting
// unlock).
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
var (
conn *grpc.ClientConn
@ -1124,25 +1052,6 @@ func (hn *HarnessNode) AddToLog(format string, a ...interface{}) {
}
}
// writePidFile writes the process ID of the running lnd process to a .pid file.
func (hn *HarnessNode) writePidFile() error {
filePath := filepath.Join(hn.Cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID))
pid, err := os.Create(filePath)
if err != nil {
return err
}
defer pid.Close()
_, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid)
if err != nil {
return err
}
hn.pidFile = filePath
return nil
}
// ReadMacaroon waits a given duration for the macaroon file to be created. If
// the file is readable within the timeout, its content is de-serialized as a
// macaroon and returned.
@ -1239,7 +1148,8 @@ func (hn *HarnessNode) cleanup() error {
if hn.backupDbDir != "" {
err := os.RemoveAll(hn.backupDbDir)
if err != nil {
return fmt.Errorf("unable to remove backup dir: %v", err)
return fmt.Errorf("unable to remove backup dir: %v",
err)
}
}
@ -1332,8 +1242,8 @@ func (hn *HarnessNode) stop() error {
return nil
}
// shutdown stops the active lnd process and cleans up any temporary directories
// created along the way.
// shutdown stops the active lnd process and cleans up any temporary
// directories created along the way.
func (hn *HarnessNode) shutdown() error {
if err := hn.stop(); err != nil {
return err
@ -1380,29 +1290,6 @@ type chanWatchRequest struct {
includeUnannounced bool
}
// getChanPointFundingTxid returns the given channel point's funding txid in
// raw bytes.
func getChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) ([]byte, error) {
var txid []byte
// A channel point's funding txid can be get/set as a byte slice or a
// string. In the case it is a string, decode it.
switch chanPoint.GetFundingTxid().(type) {
case *lnrpc.ChannelPoint_FundingTxidBytes:
txid = chanPoint.GetFundingTxidBytes()
case *lnrpc.ChannelPoint_FundingTxidStr:
s := chanPoint.GetFundingTxidStr()
h, err := chainhash.NewHashFromStr(s)
if err != nil {
return nil, err
}
txid = h[:]
}
return txid, nil
}
func (hn *HarnessNode) checkChanPointInGraph(chanPoint wire.OutPoint) bool {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
@ -1667,19 +1554,6 @@ func (hn *HarnessNode) PrintErr(format string, a ...interface{}) {
hn.Cfg.Name, fmt.Sprintf(format, a...))
}
// MakeOutpoint returns the outpoint of the channel's funding transaction.
func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) {
fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint)
if err != nil {
return wire.OutPoint{}, err
}
return wire.OutPoint{
Hash: *fundingTxID,
Index: chanPoint.OutputIndex,
}, nil
}
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
// the outpoints, and saves them to harness node's internal state.
func (hn *HarnessNode) handleChannelEdgeUpdates(
@ -1903,37 +1777,6 @@ func (hn *HarnessNode) receiveTopologyClientStream(
}
}
// CheckChannelPolicy checks that the policy matches the expected one.
func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
return fmt.Errorf("expected base fee %v, got %v",
expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
}
if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
return fmt.Errorf("expected fee rate %v, got %v",
expectedPolicy.FeeRateMilliMsat,
policy.FeeRateMilliMsat)
}
if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
return fmt.Errorf("expected time lock delta %v, got %v",
expectedPolicy.TimeLockDelta,
policy.TimeLockDelta)
}
if policy.MinHtlc != expectedPolicy.MinHtlc {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return errors.New("edge should be disabled but isn't")
}
return nil
}
// handlePolicyUpdateWatchRequest checks that if the expected policy can be
// found either in the node's interval state or describe graph response. If
// found, it will signal the request by closing the event channel. Otherwise it
@ -2007,3 +1850,96 @@ func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
return policyUpdates
}
// renameFile is a helper to rename (log) files created during integration
// tests.
func renameFile(fromFileName, toFileName string) {
err := os.Rename(fromFileName, toFileName)
if err != nil {
fmt.Printf("could not rename %s to %s: %v\n",
fromFileName, toFileName, err)
}
}
// getFinalizedLogFilePrefix returns the finalize log filename.
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
pubKeyHex := hex.EncodeToString(
hn.PubKey[:logPubKeyBytes],
)
return fmt.Sprintf("%s/%d-%s-%s-%s",
GetLogDir(), hn.NodeID,
hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name, pubKeyHex)
}
// finalizeLogfile makes sure the log file cleanup function is initialized,
// even if no log file is created.
func finalizeLogfile(hn *HarnessNode, fileName string) {
if hn.logFile != nil {
hn.logFile.Close()
// If logoutput flag is not set, return early.
if !*logOutput {
return
}
newFileName := fmt.Sprintf("%v.log",
getFinalizedLogFilePrefix(hn),
)
renameFile(fileName, newFileName)
}
}
func finalizeEtcdLog(hn *HarnessNode) {
if hn.Cfg.DbBackend != BackendEtcd {
return
}
etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
getFinalizedLogFilePrefix(hn),
)
renameFile(etcdLogFileName, newEtcdLogFileName)
}
func addLogFile(hn *HarnessNode) (string, error) {
var fileName string
dir := GetLogDir()
fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID,
hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
// If the node's PubKey is not yet initialized, create a
// temporary file name. Later, after the PubKey has been
// initialized, the file can be moved to its final name with
// the PubKey included.
if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
hn.NodeID, hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name)
}
// Create file if not exists, otherwise append.
file, err := os.OpenFile(fileName,
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return fileName, err
}
// Pass node's stderr to both errb and the file.
w := io.MultiWriter(hn.cmd.Stderr, file)
hn.cmd.Stderr = w
// Pass the node's stdout only to the file.
hn.cmd.Stdout = file
// Let the node keep a reference to this file, such
// that we can add to it if necessary.
hn.logFile = file
return fileName, nil
}

View File

@ -1,10 +1,14 @@
package lntest
import (
"errors"
"flag"
"fmt"
"net"
"sync/atomic"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnrpc"
)
const (
@ -114,3 +118,47 @@ func GenerateBtcdListenerAddresses() (string, string) {
return fmt.Sprintf(listenerFormat, NextAvailablePort()),
fmt.Sprintf(listenerFormat, NextAvailablePort())
}
// MakeOutpoint returns the outpoint of the channel's funding transaction.
func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) {
fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint)
if err != nil {
return wire.OutPoint{}, err
}
return wire.OutPoint{
Hash: *fundingTxID,
Index: chanPoint.OutputIndex,
}, nil
}
// CheckChannelPolicy checks that the policy matches the expected one.
func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
return fmt.Errorf("expected base fee %v, got %v",
expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
}
if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
return fmt.Errorf("expected fee rate %v, got %v",
expectedPolicy.FeeRateMilliMsat,
policy.FeeRateMilliMsat)
}
if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
return fmt.Errorf("expected time lock delta %v, got %v",
expectedPolicy.TimeLockDelta,
policy.TimeLockDelta)
}
if policy.MinHtlc != expectedPolicy.MinHtlc {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return errors.New("edge should be disabled but isn't")
}
return nil
}