mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 09:53:54 +01:00
106fbeae85
This commit moves the definition of timeout values into package `wait`, preparing the incoming merging of `lntemp` and `lntest`.
1929 lines
54 KiB
Go
1929 lines
54 KiB
Go
package lntest
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
"github.com/btcsuite/btcd/chaincfg"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
"github.com/lightningnetwork/lnd/chanbackup"
|
|
"github.com/lightningnetwork/lnd/lnrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/neutrinorpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/peersrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/watchtowerrpc"
|
|
"github.com/lightningnetwork/lnd/lnrpc/wtclientrpc"
|
|
"github.com/lightningnetwork/lnd/lntest/wait"
|
|
"github.com/lightningnetwork/lnd/macaroons"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/status"
|
|
"gopkg.in/macaroon.v2"
|
|
)
|
|
|
|
const (
|
|
// logPubKeyBytes is the number of bytes of the node's PubKey that will
|
|
// be appended to the log file name. The whole PubKey is too long and
|
|
// not really necessary to quickly identify what node produced which
|
|
// log file.
|
|
logPubKeyBytes = 4
|
|
|
|
// trickleDelay is the amount of time in milliseconds between each
|
|
// release of announcements by AuthenticatedGossiper to the network.
|
|
trickleDelay = 50
|
|
|
|
postgresDsn = "postgres://postgres:postgres@localhost:6432/%s?sslmode=disable"
|
|
|
|
// commitInterval specifies the maximum interval the graph database
|
|
// will wait between attempting to flush a batch of modifications to
|
|
// disk(db.batch-commit-interval).
|
|
commitInterval = 10 * time.Millisecond
|
|
|
|
DefaultTimeout = wait.DefaultTimeout
|
|
NodeStartTimeout = wait.NodeStartTimeout
|
|
ChannelOpenTimeout = wait.ChannelOpenTimeout
|
|
ChannelCloseTimeout = wait.ChannelCloseTimeout
|
|
)
|
|
|
|
var (
|
|
// numActiveNodes is the number of active nodes within the test network.
|
|
numActiveNodes = 0
|
|
numActiveNodesMtx sync.Mutex
|
|
)
|
|
|
|
func postgresDatabaseDsn(dbName string) string {
|
|
return fmt.Sprintf(postgresDsn, dbName)
|
|
}
|
|
|
|
// BackendConfig is an interface that abstracts away the specific chain backend
|
|
// node implementation.
|
|
type BackendConfig interface {
|
|
// GenArgs returns the arguments needed to be passed to LND at startup
|
|
// for using this node as a chain backend.
|
|
GenArgs() []string
|
|
|
|
// ConnectMiner is called to establish a connection to the test miner.
|
|
ConnectMiner() error
|
|
|
|
// DisconnectMiner is called to disconnect the miner.
|
|
DisconnectMiner() error
|
|
|
|
// Name returns the name of the backend type.
|
|
Name() string
|
|
|
|
// Credentials returns the rpc username, password and host for the
|
|
// backend.
|
|
Credentials() (string, string, string, error)
|
|
}
|
|
|
|
// NodeConfig is the basic interface a node configuration must implement.
|
|
type NodeConfig interface {
|
|
// BaseConfig returns the base node configuration struct.
|
|
BaseConfig() *BaseNodeConfig
|
|
|
|
// GenerateListeningPorts generates the ports to listen on designated
|
|
// for the current lightning network test.
|
|
GenerateListeningPorts()
|
|
|
|
// GenArgs generates a slice of command line arguments from the
|
|
// lightning node config struct.
|
|
GenArgs() []string
|
|
}
|
|
|
|
// BaseNodeConfig is the base node configuration.
|
|
type BaseNodeConfig struct {
|
|
Name string
|
|
|
|
// LogFilenamePrefix is used to prefix node log files. Can be used
|
|
// to store the current test case for simpler postmortem debugging.
|
|
LogFilenamePrefix string
|
|
|
|
BackendCfg BackendConfig
|
|
NetParams *chaincfg.Params
|
|
BaseDir string
|
|
ExtraArgs []string
|
|
|
|
DataDir string
|
|
LogDir string
|
|
TLSCertPath string
|
|
TLSKeyPath string
|
|
AdminMacPath string
|
|
ReadMacPath string
|
|
InvoiceMacPath string
|
|
|
|
HasSeed bool
|
|
Password []byte
|
|
|
|
P2PPort int
|
|
RPCPort int
|
|
RESTPort int
|
|
ProfilePort int
|
|
|
|
AcceptKeySend bool
|
|
AcceptAMP bool
|
|
|
|
FeeURL string
|
|
|
|
DbBackend DatabaseBackend
|
|
PostgresDsn string
|
|
}
|
|
|
|
func (cfg BaseNodeConfig) P2PAddr() string {
|
|
return fmt.Sprintf(ListenerFormat, cfg.P2PPort)
|
|
}
|
|
|
|
func (cfg BaseNodeConfig) RPCAddr() string {
|
|
return fmt.Sprintf(ListenerFormat, cfg.RPCPort)
|
|
}
|
|
|
|
func (cfg BaseNodeConfig) RESTAddr() string {
|
|
return fmt.Sprintf(ListenerFormat, cfg.RESTPort)
|
|
}
|
|
|
|
// DBDir returns the holding directory path of the graph database.
|
|
func (cfg BaseNodeConfig) DBDir() string {
|
|
return filepath.Join(cfg.DataDir, "graph", cfg.NetParams.Name)
|
|
}
|
|
|
|
func (cfg BaseNodeConfig) DBPath() string {
|
|
return filepath.Join(cfg.DBDir(), "channel.db")
|
|
}
|
|
|
|
func (cfg BaseNodeConfig) ChanBackupPath() string {
|
|
return filepath.Join(
|
|
cfg.DataDir, "chain", "bitcoin",
|
|
fmt.Sprintf(
|
|
"%v/%v", cfg.NetParams.Name,
|
|
chanbackup.DefaultBackupFileName,
|
|
),
|
|
)
|
|
}
|
|
|
|
// GenerateListeningPorts generates the ports to listen on designated for the
|
|
// current lightning network test.
|
|
func (cfg *BaseNodeConfig) GenerateListeningPorts() {
|
|
if cfg.P2PPort == 0 {
|
|
cfg.P2PPort = NextAvailablePort()
|
|
}
|
|
if cfg.RPCPort == 0 {
|
|
cfg.RPCPort = NextAvailablePort()
|
|
}
|
|
if cfg.RESTPort == 0 {
|
|
cfg.RESTPort = NextAvailablePort()
|
|
}
|
|
if cfg.ProfilePort == 0 {
|
|
cfg.ProfilePort = NextAvailablePort()
|
|
}
|
|
}
|
|
|
|
// BaseConfig returns the base node configuration struct.
|
|
func (cfg *BaseNodeConfig) BaseConfig() *BaseNodeConfig {
|
|
return cfg
|
|
}
|
|
|
|
// GenArgs generates a slice of command line arguments from the lightning node
|
|
// config struct.
|
|
func (cfg *BaseNodeConfig) GenArgs() []string {
|
|
var args []string
|
|
|
|
switch cfg.NetParams {
|
|
case &chaincfg.TestNet3Params:
|
|
args = append(args, "--bitcoin.testnet")
|
|
case &chaincfg.SimNetParams:
|
|
args = append(args, "--bitcoin.simnet")
|
|
case &chaincfg.RegressionNetParams:
|
|
args = append(args, "--bitcoin.regtest")
|
|
}
|
|
|
|
backendArgs := cfg.BackendCfg.GenArgs()
|
|
args = append(args, backendArgs...)
|
|
|
|
nodeArgs := []string{
|
|
"--bitcoin.active",
|
|
"--nobootstrap",
|
|
"--debuglevel=debug",
|
|
"--bitcoin.defaultchanconfs=1",
|
|
"--keep-failed-payment-attempts",
|
|
fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval),
|
|
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV),
|
|
fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()),
|
|
fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()),
|
|
fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()),
|
|
fmt.Sprintf("--listen=%v", cfg.P2PAddr()),
|
|
fmt.Sprintf("--externalip=%v", cfg.P2PAddr()),
|
|
fmt.Sprintf("--lnddir=%v", cfg.BaseDir),
|
|
fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath),
|
|
fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath),
|
|
fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath),
|
|
fmt.Sprintf("--trickledelay=%v", trickleDelay),
|
|
fmt.Sprintf("--profile=%d", cfg.ProfilePort),
|
|
fmt.Sprintf("--caches.rpc-graph-cache-duration=%d", 0),
|
|
}
|
|
args = append(args, nodeArgs...)
|
|
|
|
if !cfg.HasSeed {
|
|
args = append(args, "--noseedbackup")
|
|
}
|
|
|
|
if cfg.ExtraArgs != nil {
|
|
args = append(args, cfg.ExtraArgs...)
|
|
}
|
|
|
|
if cfg.AcceptKeySend {
|
|
args = append(args, "--accept-keysend")
|
|
}
|
|
|
|
if cfg.AcceptAMP {
|
|
args = append(args, "--accept-amp")
|
|
}
|
|
|
|
switch cfg.DbBackend {
|
|
case BackendEtcd:
|
|
args = append(args, "--db.backend=etcd")
|
|
args = append(args, "--db.etcd.embedded")
|
|
args = append(
|
|
args, fmt.Sprintf(
|
|
"--db.etcd.embedded_client_port=%v",
|
|
NextAvailablePort(),
|
|
),
|
|
)
|
|
args = append(
|
|
args, fmt.Sprintf(
|
|
"--db.etcd.embedded_peer_port=%v",
|
|
NextAvailablePort(),
|
|
),
|
|
)
|
|
args = append(
|
|
args, fmt.Sprintf(
|
|
"--db.etcd.embedded_log_file=%v",
|
|
path.Join(cfg.LogDir, "etcd.log"),
|
|
),
|
|
)
|
|
|
|
case BackendPostgres:
|
|
args = append(args, "--db.backend=postgres")
|
|
args = append(args, "--db.postgres.dsn="+cfg.PostgresDsn)
|
|
|
|
case BackendSqlite:
|
|
args = append(args, "--db.backend=sqlite")
|
|
args = append(args, fmt.Sprintf("--db.sqlite.busytimeout=%v",
|
|
wait.SqliteBusyTimeout))
|
|
}
|
|
|
|
if cfg.FeeURL != "" {
|
|
args = append(args, "--feeurl="+cfg.FeeURL)
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
// policyUpdateMap defines a type to store channel policy updates. It has the
|
|
// format,
|
|
//
|
|
// {
|
|
// "chanPoint1": {
|
|
// "advertisingNode1": [
|
|
// policy1, policy2, ...
|
|
// ],
|
|
// "advertisingNode2": [
|
|
// policy1, policy2, ...
|
|
// ]
|
|
// },
|
|
// "chanPoint2": ...
|
|
// }.
|
|
type policyUpdateMap map[string]map[string][]*lnrpc.RoutingPolicy
|
|
|
|
// HarnessNode represents an instance of lnd running within our test network
|
|
// harness. Each HarnessNode instance also fully embeds an RPC client in
|
|
// order to pragmatically drive the node.
|
|
type HarnessNode struct {
|
|
Cfg *BaseNodeConfig
|
|
|
|
// NodeID is a unique identifier for the node within a NetworkHarness.
|
|
NodeID int
|
|
|
|
// PubKey is the serialized compressed identity public key of the node.
|
|
// This field will only be populated once the node itself has been
|
|
// started via the start() method.
|
|
PubKey [33]byte
|
|
PubKeyStr string
|
|
|
|
// rpc holds a list of RPC clients.
|
|
rpc *RPCClients
|
|
|
|
// chanWatchRequests receives a request for watching a particular event
|
|
// for a given channel.
|
|
chanWatchRequests chan *chanWatchRequest
|
|
|
|
// For each outpoint, we'll track an integer which denotes the number of
|
|
// edges seen for that channel within the network. When this number
|
|
// reaches 2, then it means that both edge advertisements has propagated
|
|
// through the network.
|
|
openChans map[wire.OutPoint]int
|
|
openChanWatchers map[wire.OutPoint][]chan struct{}
|
|
|
|
closedChans map[wire.OutPoint]struct{}
|
|
closeChanWatchers map[wire.OutPoint][]chan struct{}
|
|
|
|
// policyUpdates stores a slice of seen polices by each advertising
|
|
// node and the outpoint.
|
|
policyUpdates policyUpdateMap
|
|
|
|
// backupDbDir is the path where a database backup is stored, if any.
|
|
backupDbDir string
|
|
|
|
// postgresDbName is the name of the postgres database where lnd data is
|
|
// stored in.
|
|
postgresDbName string
|
|
|
|
// runCtx is a context with cancel method. It's used to signal when the
|
|
// node needs to quit, and used as the parent context when spawning
|
|
// children contexts for RPC requests.
|
|
runCtx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
wg sync.WaitGroup
|
|
cmd *exec.Cmd
|
|
logFile *os.File
|
|
|
|
// TODO(yy): remove
|
|
lnrpc.LightningClient
|
|
lnrpc.WalletUnlockerClient
|
|
invoicesrpc.InvoicesClient
|
|
peersrpc.PeersClient
|
|
SignerClient signrpc.SignerClient
|
|
RouterClient routerrpc.RouterClient
|
|
WalletKitClient walletrpc.WalletKitClient
|
|
Watchtower watchtowerrpc.WatchtowerClient
|
|
WatchtowerClient wtclientrpc.WatchtowerClientClient
|
|
StateClient lnrpc.StateClient
|
|
ChainClient chainrpc.ChainNotifierClient
|
|
ChainKit chainrpc.ChainKitClient
|
|
NeutrinoClient neutrinorpc.NeutrinoKitClient
|
|
}
|
|
|
|
// RPCClients wraps a list of RPC clients into a single struct for easier
|
|
// access.
|
|
type RPCClients struct {
|
|
// conn is the underlying connection to the grpc endpoint of the node.
|
|
conn *grpc.ClientConn
|
|
|
|
LN lnrpc.LightningClient
|
|
WalletUnlocker lnrpc.WalletUnlockerClient
|
|
Invoice invoicesrpc.InvoicesClient
|
|
Signer signrpc.SignerClient
|
|
Router routerrpc.RouterClient
|
|
WalletKit walletrpc.WalletKitClient
|
|
Watchtower watchtowerrpc.WatchtowerClient
|
|
WatchtowerClient wtclientrpc.WatchtowerClientClient
|
|
State lnrpc.StateClient
|
|
ChainClient chainrpc.ChainNotifierClient
|
|
ChainKit chainrpc.ChainKitClient
|
|
NeutrinoClient neutrinorpc.NeutrinoKitClient
|
|
}
|
|
|
|
// Assert *HarnessNode implements the lnrpc.LightningClient interface.
|
|
var _ lnrpc.LightningClient = (*HarnessNode)(nil)
|
|
var _ lnrpc.WalletUnlockerClient = (*HarnessNode)(nil)
|
|
var _ invoicesrpc.InvoicesClient = (*HarnessNode)(nil)
|
|
var _ peersrpc.PeersClient = (*HarnessNode)(nil)
|
|
|
|
// nextNodeID generates a unique sequence to be used as the node's ID.
|
|
func nextNodeID() int {
|
|
numActiveNodesMtx.Lock()
|
|
defer numActiveNodesMtx.Unlock()
|
|
nodeNum := numActiveNodes
|
|
numActiveNodes++
|
|
|
|
return nodeNum
|
|
}
|
|
|
|
// newNode creates a new test lightning node instance from the passed config.
|
|
func newNode(cfg *BaseNodeConfig) (*HarnessNode, error) {
|
|
if cfg.BaseDir == "" {
|
|
var err error
|
|
cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
|
|
cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
|
|
cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
|
|
cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")
|
|
|
|
networkDir := filepath.Join(
|
|
cfg.DataDir, "chain", "bitcoin", cfg.NetParams.Name,
|
|
)
|
|
cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
|
|
cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
|
|
cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")
|
|
|
|
cfg.GenerateListeningPorts()
|
|
|
|
// Run all tests with accept keysend. The keysend code is very isolated
|
|
// and it is highly unlikely that it would affect regular itests when
|
|
// enabled.
|
|
cfg.AcceptKeySend = true
|
|
|
|
// Create temporary database.
|
|
var dbName string
|
|
if cfg.DbBackend == BackendPostgres {
|
|
var err error
|
|
dbName, err = createTempPgDb()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cfg.PostgresDsn = postgresDatabaseDsn(dbName)
|
|
}
|
|
|
|
return &HarnessNode{
|
|
Cfg: cfg,
|
|
NodeID: nextNodeID(),
|
|
chanWatchRequests: make(chan *chanWatchRequest),
|
|
openChans: make(map[wire.OutPoint]int),
|
|
openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
|
|
|
|
closedChans: make(map[wire.OutPoint]struct{}),
|
|
closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
|
|
|
|
policyUpdates: policyUpdateMap{},
|
|
|
|
postgresDbName: dbName,
|
|
}, nil
|
|
}
|
|
|
|
func createTempPgDb() (string, error) {
|
|
// Create random database name.
|
|
randBytes := make([]byte, 8)
|
|
_, err := rand.Read(randBytes)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
dbName := "itest_" + hex.EncodeToString(randBytes)
|
|
|
|
// Create database.
|
|
err = executePgQuery("CREATE DATABASE " + dbName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return dbName, nil
|
|
}
|
|
|
|
func executePgQuery(query string) error {
|
|
pool, err := pgxpool.Connect(
|
|
context.Background(),
|
|
postgresDatabaseDsn("postgres"),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to connect to database: %w", err)
|
|
}
|
|
defer pool.Close()
|
|
|
|
_, err = pool.Exec(context.Background(), query)
|
|
return err
|
|
}
|
|
|
|
// String gives the internal state of the node which is useful for debugging.
|
|
func (hn *HarnessNode) String() string {
|
|
type nodeCfg struct {
|
|
LogFilenamePrefix string
|
|
ExtraArgs []string
|
|
HasSeed bool
|
|
P2PPort int
|
|
RPCPort int
|
|
RESTPort int
|
|
ProfilePort int
|
|
AcceptKeySend bool
|
|
AcceptAMP bool
|
|
FeeURL string
|
|
}
|
|
|
|
nodeState := struct {
|
|
NodeID int
|
|
Name string
|
|
PubKey string
|
|
OpenChans map[string]int
|
|
ClosedChans map[string]struct{}
|
|
NodeCfg nodeCfg
|
|
}{
|
|
NodeID: hn.NodeID,
|
|
Name: hn.Cfg.Name,
|
|
PubKey: hn.PubKeyStr,
|
|
OpenChans: make(map[string]int),
|
|
ClosedChans: make(map[string]struct{}),
|
|
NodeCfg: nodeCfg{
|
|
LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
|
|
ExtraArgs: hn.Cfg.ExtraArgs,
|
|
HasSeed: hn.Cfg.HasSeed,
|
|
P2PPort: hn.Cfg.P2PPort,
|
|
RPCPort: hn.Cfg.RPCPort,
|
|
RESTPort: hn.Cfg.RESTPort,
|
|
AcceptKeySend: hn.Cfg.AcceptKeySend,
|
|
AcceptAMP: hn.Cfg.AcceptAMP,
|
|
FeeURL: hn.Cfg.FeeURL,
|
|
},
|
|
}
|
|
|
|
for outpoint, count := range hn.openChans {
|
|
nodeState.OpenChans[outpoint.String()] = count
|
|
}
|
|
for outpoint, count := range hn.closedChans {
|
|
nodeState.ClosedChans[outpoint.String()] = count
|
|
}
|
|
|
|
stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
|
|
if err != nil {
|
|
return fmt.Sprintf("\n encode node state with err: %v", err)
|
|
}
|
|
|
|
return fmt.Sprintf("\nnode state: %s", stateBytes)
|
|
}
|
|
|
|
// DBPath returns the filepath to the channeldb database file for this node.
|
|
func (hn *HarnessNode) DBPath() string {
|
|
return hn.Cfg.DBPath()
|
|
}
|
|
|
|
// DBDir returns the path for the directory holding channeldb file(s).
|
|
func (hn *HarnessNode) DBDir() string {
|
|
return hn.Cfg.DBDir()
|
|
}
|
|
|
|
// Name returns the name of this node set during initialization.
|
|
func (hn *HarnessNode) Name() string {
|
|
return hn.Cfg.Name
|
|
}
|
|
|
|
// TLSCertStr returns the path where the TLS certificate is stored.
|
|
func (hn *HarnessNode) TLSCertStr() string {
|
|
return hn.Cfg.TLSCertPath
|
|
}
|
|
|
|
// TLSKeyStr returns the path where the TLS key is stored.
|
|
func (hn *HarnessNode) TLSKeyStr() string {
|
|
return hn.Cfg.TLSKeyPath
|
|
}
|
|
|
|
// ChanBackupPath returns the fielpath to the on-disk channel.backup file for
|
|
// this node.
|
|
func (hn *HarnessNode) ChanBackupPath() string {
|
|
return hn.Cfg.ChanBackupPath()
|
|
}
|
|
|
|
// AdminMacPath returns the filepath to the admin.macaroon file for this node.
|
|
func (hn *HarnessNode) AdminMacPath() string {
|
|
return hn.Cfg.AdminMacPath
|
|
}
|
|
|
|
// ReadMacPath returns the filepath to the readonly.macaroon file for this node.
|
|
func (hn *HarnessNode) ReadMacPath() string {
|
|
return hn.Cfg.ReadMacPath
|
|
}
|
|
|
|
// InvoiceMacPath returns the filepath to the invoice.macaroon file for this
|
|
// node.
|
|
func (hn *HarnessNode) InvoiceMacPath() string {
|
|
return hn.Cfg.InvoiceMacPath
|
|
}
|
|
|
|
// startLnd handles the startup of lnd, creating log files, and possibly kills
|
|
// the process when needed.
|
|
func (hn *HarnessNode) startLnd(lndBinary string, lndError chan<- error) error {
|
|
args := hn.Cfg.GenArgs()
|
|
hn.cmd = exec.Command(lndBinary, args...)
|
|
|
|
// Redirect stderr output to buffer
|
|
var errb bytes.Buffer
|
|
hn.cmd.Stderr = &errb
|
|
|
|
// If the logoutput flag is passed, redirect output from the nodes to
|
|
// log files.
|
|
var (
|
|
fileName string
|
|
err error
|
|
)
|
|
if *LogOutput {
|
|
fileName, err = addLogFile(hn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := hn.cmd.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Launch a new goroutine which that bubbles up any potential fatal
|
|
// process errors to the goroutine running the tests.
|
|
hn.wg.Add(1)
|
|
go func() {
|
|
defer hn.wg.Done()
|
|
|
|
err := hn.cmd.Wait()
|
|
if err != nil {
|
|
lndError <- fmt.Errorf("%v\n%v", err, errb.String())
|
|
}
|
|
|
|
// Make sure log file is closed and renamed if necessary.
|
|
finalizeLogfile(hn, fileName)
|
|
|
|
// Rename the etcd.log file if the node was running on embedded
|
|
// etcd.
|
|
finalizeEtcdLog(hn)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start launches a new process running lnd. Additionally, the PID of the
|
|
// launched process is saved in order to possibly kill the process forcibly
|
|
// later.
|
|
//
|
|
// This may not clean up properly if an error is returned, so the caller should
|
|
// call shutdown() regardless of the return value.
|
|
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
|
|
wait bool) error {
|
|
|
|
// Init the runCtx.
|
|
ctxt, cancel := context.WithCancel(context.Background())
|
|
hn.runCtx = ctxt
|
|
hn.cancel = cancel
|
|
|
|
// Start lnd and prepare logs.
|
|
if err := hn.startLnd(lndBinary, lndError); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We may want to skip waiting for the node to come up (eg. the node
|
|
// is waiting to become the leader).
|
|
if !wait {
|
|
return nil
|
|
}
|
|
|
|
// Since Stop uses the LightningClient to stop the node, if we fail to
|
|
// get a connected client, we have to kill the process.
|
|
useMacaroons := !hn.Cfg.HasSeed
|
|
conn, err := hn.ConnectRPC(useMacaroons)
|
|
if err != nil {
|
|
err = fmt.Errorf("ConnectRPC err: %w", err)
|
|
cmdErr := hn.cmd.Process.Kill()
|
|
if cmdErr != nil {
|
|
err = fmt.Errorf("kill process got err: %w: %v",
|
|
cmdErr, err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Init all the RPC clients.
|
|
hn.InitRPCClients(conn)
|
|
|
|
if err := hn.WaitUntilStarted(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If the node was created with a seed, we will need to perform an
|
|
// additional step to unlock the wallet. The connection returned will
|
|
// only use the TLS certs, and can only perform operations necessary to
|
|
// unlock the daemon.
|
|
if hn.Cfg.HasSeed {
|
|
// TODO(yy): remove
|
|
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
|
|
return nil
|
|
}
|
|
|
|
return hn.initLightningClient()
|
|
}
|
|
|
|
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
|
|
func (hn *HarnessNode) WaitUntilStarted() error {
|
|
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
|
|
return s != lnrpc.WalletState_WAITING_TO_START
|
|
})
|
|
}
|
|
|
|
// WaitUntilStateReached waits until the given wallet state (or one of the
|
|
// states following it) has been reached.
|
|
func (hn *HarnessNode) WaitUntilStateReached(
|
|
desiredState lnrpc.WalletState) error {
|
|
|
|
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
|
|
return s >= desiredState
|
|
})
|
|
}
|
|
|
|
// WaitUntilServerActive waits until the lnd daemon is fully started.
|
|
func (hn *HarnessNode) WaitUntilServerActive() error {
|
|
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
|
|
return s == lnrpc.WalletState_SERVER_ACTIVE
|
|
})
|
|
}
|
|
|
|
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
|
|
// connection and setting up the wallet unlocker client. This is needed when
|
|
// a node that has recently been started was waiting to become the leader and
|
|
// we're at the point when we expect that it is the leader now (awaiting
|
|
// unlock).
|
|
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
|
|
var (
|
|
conn *grpc.ClientConn
|
|
connErr error
|
|
)
|
|
|
|
if err := wait.NoError(func() error {
|
|
conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed)
|
|
return connErr
|
|
}, timeout); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Init all the RPC clients.
|
|
hn.InitRPCClients(conn)
|
|
|
|
if err := hn.WaitUntilStarted(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If the node was created with a seed, we will need to perform an
|
|
// additional step to unlock the wallet. The connection returned will
|
|
// only use the TLS certs, and can only perform operations necessary to
|
|
// unlock the daemon.
|
|
if hn.Cfg.HasSeed {
|
|
// TODO(yy): remove
|
|
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
|
|
|
|
return nil
|
|
}
|
|
|
|
return hn.initLightningClient()
|
|
}
|
|
|
|
// initClientWhenReady waits until the main gRPC server is detected as active,
|
|
// then complete the normal HarnessNode gRPC connection creation. If the node
|
|
// is initialized stateless, the macaroon is returned so that the client can
|
|
// use it.
|
|
func (hn *HarnessNode) initClientWhenReady(stateless bool,
|
|
macBytes []byte) error {
|
|
|
|
// Wait for the wallet to finish unlocking, such that we can connect to
|
|
// it via a macaroon-authenticated rpc connection.
|
|
var (
|
|
conn *grpc.ClientConn
|
|
err error
|
|
)
|
|
if err = wait.NoError(func() error {
|
|
// If the node has been initialized stateless, we need to pass
|
|
// the macaroon to the client.
|
|
if stateless {
|
|
adminMac := &macaroon.Macaroon{}
|
|
err := adminMac.UnmarshalBinary(macBytes)
|
|
if err != nil {
|
|
return fmt.Errorf("unmarshal failed: %w", err)
|
|
}
|
|
conn, err = hn.ConnectRPCWithMacaroon(adminMac)
|
|
return err
|
|
}
|
|
|
|
// Normal initialization, we expect a macaroon to be in the
|
|
// file system.
|
|
conn, err = hn.ConnectRPC(true)
|
|
return err
|
|
}, DefaultTimeout); err != nil {
|
|
return fmt.Errorf("timeout while init client: %w", err)
|
|
}
|
|
|
|
// Init all the RPC clients.
|
|
hn.InitRPCClients(conn)
|
|
|
|
return hn.initLightningClient()
|
|
}
|
|
|
|
// Init initializes a harness node by passing the init request via rpc. After
|
|
// the request is submitted, this method will block until a
|
|
// macaroon-authenticated RPC connection can be established to the harness
|
|
// node. Once established, the new connection is used to initialize the
|
|
// LightningClient and subscribes the HarnessNode to topology changes.
|
|
func (hn *HarnessNode) Init(
|
|
initReq *lnrpc.InitWalletRequest) (*lnrpc.InitWalletResponse, error) {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
response, err := hn.rpc.WalletUnlocker.InitWallet(ctxt, initReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to init wallet: %w", err)
|
|
}
|
|
|
|
err = hn.initClientWhenReady(
|
|
initReq.StatelessInit, response.AdminMacaroon,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to init: %w", err)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// InitChangePassword initializes a harness node by passing the change password
|
|
// request via RPC. After the request is submitted, this method will block until
|
|
// a macaroon-authenticated RPC connection can be established to the harness
|
|
// node. Once established, the new connection is used to initialize the
|
|
// LightningClient and subscribes the HarnessNode to topology changes.
|
|
func (hn *HarnessNode) InitChangePassword(
|
|
chngPwReq *lnrpc.ChangePasswordRequest) (*lnrpc.ChangePasswordResponse,
|
|
error) {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
response, err := hn.rpc.WalletUnlocker.ChangePassword(ctxt, chngPwReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = hn.initClientWhenReady(
|
|
chngPwReq.StatelessInit, response.AdminMacaroon,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// Unlock attempts to unlock the wallet of the target HarnessNode. This method
|
|
// should be called after the restart of a HarnessNode that was created with a
|
|
// seed+password. Once this method returns, the HarnessNode will be ready to
|
|
// accept normal gRPC requests and harness command.
|
|
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
// Otherwise, we'll need to unlock the node before it's able to start
|
|
// up properly.
|
|
_, err := hn.rpc.WalletUnlocker.UnlockWallet(ctxt, unlockReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Now that the wallet has been unlocked, we'll wait for the RPC client
|
|
// to be ready, then establish the normal gRPC connection.
|
|
return hn.initClientWhenReady(false, nil)
|
|
}
|
|
|
|
// waitTillServerState makes a subscription to the server's state change and
|
|
// blocks until the server is in the targeted state.
|
|
func (hn *HarnessNode) waitTillServerState(
|
|
predicate func(state lnrpc.WalletState) bool) error {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
|
|
defer cancel()
|
|
|
|
client, err := hn.rpc.State.SubscribeState(
|
|
ctxt, &lnrpc.SubscribeStateRequest{},
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to state: %w", err)
|
|
}
|
|
|
|
errChan := make(chan error, 1)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
resp, err := client.Recv()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
|
|
if predicate(resp.State) {
|
|
close(done)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var lastErr error
|
|
for {
|
|
select {
|
|
case err := <-errChan:
|
|
lastErr = err
|
|
|
|
case <-done:
|
|
return nil
|
|
|
|
case <-time.After(NodeStartTimeout):
|
|
return fmt.Errorf("timeout waiting for state, "+
|
|
"got err from stream: %v", lastErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// InitRPCClients initializes a list of RPC clients for the node.
|
|
func (hn *HarnessNode) InitRPCClients(c *grpc.ClientConn) {
|
|
hn.rpc = &RPCClients{
|
|
conn: c,
|
|
LN: lnrpc.NewLightningClient(c),
|
|
Invoice: invoicesrpc.NewInvoicesClient(c),
|
|
Router: routerrpc.NewRouterClient(c),
|
|
WalletKit: walletrpc.NewWalletKitClient(c),
|
|
WalletUnlocker: lnrpc.NewWalletUnlockerClient(c),
|
|
Watchtower: watchtowerrpc.NewWatchtowerClient(c),
|
|
WatchtowerClient: wtclientrpc.NewWatchtowerClientClient(c),
|
|
Signer: signrpc.NewSignerClient(c),
|
|
State: lnrpc.NewStateClient(c),
|
|
ChainClient: chainrpc.NewChainNotifierClient(c),
|
|
ChainKit: chainrpc.NewChainKitClient(c),
|
|
NeutrinoClient: neutrinorpc.NewNeutrinoKitClient(c),
|
|
}
|
|
}
|
|
|
|
// initLightningClient blocks until the lnd server is fully started and
|
|
// subscribes the harness node to graph topology updates. This method also
|
|
// spawns a lightning network watcher for this node, which watches for topology
|
|
// changes.
|
|
func (hn *HarnessNode) initLightningClient() error {
|
|
// TODO(yy): remove
|
|
// Construct the LightningClient that will allow us to use the
|
|
// HarnessNode directly for normal rpc operations.
|
|
conn := hn.rpc.conn
|
|
hn.LightningClient = lnrpc.NewLightningClient(conn)
|
|
hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn)
|
|
hn.RouterClient = routerrpc.NewRouterClient(conn)
|
|
hn.WalletKitClient = walletrpc.NewWalletKitClient(conn)
|
|
hn.Watchtower = watchtowerrpc.NewWatchtowerClient(conn)
|
|
hn.WatchtowerClient = wtclientrpc.NewWatchtowerClientClient(conn)
|
|
hn.SignerClient = signrpc.NewSignerClient(conn)
|
|
hn.PeersClient = peersrpc.NewPeersClient(conn)
|
|
hn.StateClient = lnrpc.NewStateClient(conn)
|
|
hn.ChainClient = chainrpc.NewChainNotifierClient(conn)
|
|
hn.ChainKit = chainrpc.NewChainKitClient(conn)
|
|
hn.NeutrinoClient = neutrinorpc.NewNeutrinoKitClient(conn)
|
|
|
|
// Wait until the server is fully started.
|
|
if err := hn.WaitUntilServerActive(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set the harness node's pubkey to what the node claims in GetInfo.
|
|
// The RPC must have been started at this point.
|
|
if err := hn.FetchNodeInfo(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Launch the watcher that will hook into graph related topology change
|
|
// from the PoV of this node.
|
|
hn.wg.Add(1)
|
|
go hn.lightningNetworkWatcher()
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchNodeInfo queries an unlocked node to retrieve its public key.
|
|
func (hn *HarnessNode) FetchNodeInfo() error {
|
|
// Obtain the lnid of this node for quick identification purposes.
|
|
info, err := hn.rpc.LN.GetInfo(hn.runCtx, &lnrpc.GetInfoRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hn.PubKeyStr = info.IdentityPubkey
|
|
|
|
pubkey, err := hex.DecodeString(info.IdentityPubkey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
copy(hn.PubKey[:], pubkey)
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddToLogf adds a line of choice to the node's logfile. This is useful
|
|
// to interleave test output with output from the node.
|
|
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
|
|
// If this node was not set up with a log file, just return early.
|
|
if hn.logFile == nil {
|
|
return
|
|
}
|
|
|
|
desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
|
|
if _, err := hn.logFile.WriteString(desc); err != nil {
|
|
hn.PrintErrf("write to log err: %v", err)
|
|
}
|
|
}
|
|
|
|
// ReadMacaroon waits a given duration for the macaroon file to be created. If
|
|
// the file is readable within the timeout, its content is de-serialized as a
|
|
// macaroon and returned.
|
|
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
|
|
*macaroon.Macaroon, error) {
|
|
|
|
// Wait until macaroon file is created and has valid content before
|
|
// using it.
|
|
var mac *macaroon.Macaroon
|
|
err := wait.NoError(func() error {
|
|
macBytes, err := ioutil.ReadFile(macPath)
|
|
if err != nil {
|
|
return fmt.Errorf("error reading macaroon file: %v",
|
|
err)
|
|
}
|
|
|
|
newMac := &macaroon.Macaroon{}
|
|
if err = newMac.UnmarshalBinary(macBytes); err != nil {
|
|
return fmt.Errorf("error unmarshalling macaroon "+
|
|
"file: %v", err)
|
|
}
|
|
mac = newMac
|
|
|
|
return nil
|
|
}, timeout)
|
|
|
|
return mac, err
|
|
}
|
|
|
|
// ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to
|
|
// create a gRPC client connection.
|
|
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
|
|
*grpc.ClientConn, error) {
|
|
|
|
// Wait until TLS certificate is created and has valid content before
|
|
// using it, up to 30 sec.
|
|
var tlsCreds credentials.TransportCredentials
|
|
err := wait.NoError(func() error {
|
|
var err error
|
|
tlsCreds, err = credentials.NewClientTLSFromFile(
|
|
hn.Cfg.TLSCertPath, "",
|
|
)
|
|
return err
|
|
}, DefaultTimeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error reading TLS cert: %v", err)
|
|
}
|
|
|
|
opts := []grpc.DialOption{
|
|
grpc.WithBlock(),
|
|
grpc.WithTransportCredentials(tlsCreds),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
if mac == nil {
|
|
return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
|
|
}
|
|
macCred, err := macaroons.NewMacaroonCredential(mac)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error cloning mac: %v", err)
|
|
}
|
|
opts = append(opts, grpc.WithPerRPCCredentials(macCred))
|
|
|
|
return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
|
|
}
|
|
|
|
// ConnectRPC uses the TLS certificate and admin macaroon files written by the
|
|
// lnd node to create a gRPC client connection.
|
|
func (hn *HarnessNode) ConnectRPC(useMacs bool) (*grpc.ClientConn, error) {
|
|
// If we don't want to use macaroons, just pass nil, the next method
|
|
// will handle it correctly.
|
|
if !useMacs {
|
|
return hn.ConnectRPCWithMacaroon(nil)
|
|
}
|
|
|
|
// If we should use a macaroon, always take the admin macaroon as a
|
|
// default.
|
|
mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, DefaultTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return hn.ConnectRPCWithMacaroon(mac)
|
|
}
|
|
|
|
// SetExtraArgs assigns the ExtraArgs field for the node's configuration. The
|
|
// changes will take effect on restart.
|
|
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
|
|
hn.Cfg.ExtraArgs = extraArgs
|
|
}
|
|
|
|
// cleanup cleans up all the temporary files created by the node's process.
|
|
func (hn *HarnessNode) cleanup() error {
|
|
if hn.backupDbDir != "" {
|
|
err := os.RemoveAll(hn.backupDbDir)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to remove backup dir: %v",
|
|
err)
|
|
}
|
|
}
|
|
|
|
return os.RemoveAll(hn.Cfg.BaseDir)
|
|
}
|
|
|
|
// Stop attempts to stop the active lnd process.
|
|
func (hn *HarnessNode) stop() error {
|
|
// Do nothing if the process is not running.
|
|
if hn.runCtx == nil {
|
|
return nil
|
|
}
|
|
|
|
// If start() failed before creating clients, we will just wait for the
|
|
// child process to die.
|
|
if hn.rpc != nil && hn.rpc.LN != nil {
|
|
// Don't watch for error because sometimes the RPC connection
|
|
// gets closed before a response is returned.
|
|
req := lnrpc.StopRequest{}
|
|
|
|
err := wait.NoError(func() error {
|
|
_, err := hn.rpc.LN.StopDaemon(hn.runCtx, &req)
|
|
switch {
|
|
case err == nil:
|
|
return nil
|
|
|
|
// Try again if a recovery/rescan is in progress.
|
|
case strings.Contains(
|
|
err.Error(), "recovery in progress",
|
|
):
|
|
return err
|
|
|
|
default:
|
|
return nil
|
|
}
|
|
}, DefaultTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Stop the runCtx and wait for goroutines to finish.
|
|
hn.cancel()
|
|
|
|
// Wait for lnd process to exit.
|
|
err := wait.NoError(func() error {
|
|
if hn.cmd.ProcessState == nil {
|
|
return fmt.Errorf("process did not exit")
|
|
}
|
|
|
|
if !hn.cmd.ProcessState.Exited() {
|
|
return fmt.Errorf("process did not exit")
|
|
}
|
|
|
|
// Wait for goroutines to be finished.
|
|
hn.wg.Wait()
|
|
|
|
return nil
|
|
}, DefaultTimeout*2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hn.LightningClient = nil
|
|
hn.WalletUnlockerClient = nil
|
|
hn.Watchtower = nil
|
|
hn.WatchtowerClient = nil
|
|
hn.NeutrinoClient = nil
|
|
|
|
// Close any attempts at further grpc connections.
|
|
if hn.rpc.conn != nil {
|
|
err := status.Code(hn.rpc.conn.Close())
|
|
switch err {
|
|
case codes.OK:
|
|
return nil
|
|
|
|
// When the context is canceled above, we might get the
|
|
// following error as the context is no longer active.
|
|
case codes.Canceled:
|
|
return nil
|
|
|
|
case codes.Unknown:
|
|
return fmt.Errorf("unknown error attempting to stop "+
|
|
"grpc client: %v", err)
|
|
|
|
default:
|
|
return fmt.Errorf("error attempting to stop "+
|
|
"grpc client: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// shutdown stops the active lnd process and cleans up any temporary
|
|
// directories created along the way.
|
|
func (hn *HarnessNode) shutdown() error {
|
|
if err := hn.stop(); err != nil {
|
|
return err
|
|
}
|
|
if err := hn.cleanup(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// kill kills the lnd process.
|
|
func (hn *HarnessNode) kill() error {
|
|
return hn.cmd.Process.Kill()
|
|
}
|
|
|
|
type chanWatchType uint8
|
|
|
|
const (
|
|
// watchOpenChannel specifies that this is a request to watch an open
|
|
// channel event.
|
|
watchOpenChannel chanWatchType = iota
|
|
|
|
// watchCloseChannel specifies that this is a request to watch a close
|
|
// channel event.
|
|
watchCloseChannel
|
|
|
|
// watchPolicyUpdate specifies that this is a request to watch a policy
|
|
// update event.
|
|
watchPolicyUpdate
|
|
)
|
|
|
|
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
|
|
// notified once it's detected within the test Lightning Network, that a
|
|
// channel has either been added or closed.
|
|
type chanWatchRequest struct {
|
|
chanPoint wire.OutPoint
|
|
|
|
chanWatchType chanWatchType
|
|
|
|
eventChan chan struct{}
|
|
|
|
advertisingNode string
|
|
policy *lnrpc.RoutingPolicy
|
|
includeUnannounced bool
|
|
}
|
|
|
|
func (hn *HarnessNode) checkChanPointInGraph(chanPoint wire.OutPoint) bool {
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
chanGraph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
targetChanPoint := chanPoint.String()
|
|
for _, chanEdge := range chanGraph.Edges {
|
|
candidateChanPoint := chanEdge.ChanPoint
|
|
if targetChanPoint == candidateChanPoint {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// lightningNetworkWatcher is a goroutine which is able to dispatch
|
|
// notifications once it has been observed that a target channel has been
|
|
// closed or opened within the network. In order to dispatch these
|
|
// notifications, the GraphTopologySubscription client exposed as part of the
|
|
// gRPC interface is used.
|
|
func (hn *HarnessNode) lightningNetworkWatcher() {
|
|
defer hn.wg.Done()
|
|
|
|
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
|
|
|
|
// Start a goroutine to receive graph updates.
|
|
hn.wg.Add(1)
|
|
go func() {
|
|
defer hn.wg.Done()
|
|
err := hn.receiveTopologyClientStream(graphUpdates)
|
|
|
|
if err != nil {
|
|
hn.PrintErrf("receive topology client stream "+
|
|
"got err:%v", err)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
// A new graph update has just been received, so we'll examine
|
|
// the current set of registered clients to see if we can
|
|
// dispatch any requests.
|
|
case graphUpdate := <-graphUpdates:
|
|
hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
|
|
hn.handleClosedChannelUpdate(graphUpdate.ClosedChans)
|
|
// TODO(yy): handle node updates too
|
|
|
|
// A new watch request, has just arrived. We'll either be able
|
|
// to dispatch immediately, or need to add the client for
|
|
// processing later.
|
|
case watchRequest := <-hn.chanWatchRequests:
|
|
switch watchRequest.chanWatchType {
|
|
case watchOpenChannel:
|
|
// TODO(roasbeef): add update type also, checks
|
|
// for multiple of 2
|
|
hn.handleOpenChannelWatchRequest(watchRequest)
|
|
|
|
case watchCloseChannel:
|
|
hn.handleCloseChannelWatchRequest(watchRequest)
|
|
|
|
case watchPolicyUpdate:
|
|
hn.handlePolicyUpdateWatchRequest(watchRequest)
|
|
}
|
|
|
|
case <-hn.runCtx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForNetworkChannelOpen will block until a channel with the target
|
|
// outpoint is seen as being fully advertised within the network. A channel is
|
|
// considered "fully advertised" once both of its directional edges have been
|
|
// advertised within the test Lightning Network.
|
|
func (hn *HarnessNode) WaitForNetworkChannelOpen(
|
|
chanPoint *lnrpc.ChannelPoint) error {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
eventChan := make(chan struct{})
|
|
|
|
op, err := MakeOutpoint(chanPoint)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create outpoint for %v "+
|
|
"got err: %v", chanPoint, err)
|
|
}
|
|
|
|
hn.chanWatchRequests <- &chanWatchRequest{
|
|
chanPoint: op,
|
|
eventChan: eventChan,
|
|
chanWatchType: watchOpenChannel,
|
|
}
|
|
|
|
select {
|
|
case <-eventChan:
|
|
return nil
|
|
case <-ctxt.Done():
|
|
return fmt.Errorf("channel:%s not opened before timeout: %s",
|
|
op, hn)
|
|
}
|
|
}
|
|
|
|
// WaitForNetworkChannelClose will block until a channel with the target
|
|
// outpoint is seen as closed within the network. A channel is considered
|
|
// closed once a transaction spending the funding outpoint is seen within a
|
|
// confirmed block.
|
|
func (hn *HarnessNode) WaitForNetworkChannelClose(
|
|
chanPoint *lnrpc.ChannelPoint) error {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
eventChan := make(chan struct{})
|
|
|
|
op, err := MakeOutpoint(chanPoint)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create outpoint for %v "+
|
|
"got err: %v", chanPoint, err)
|
|
}
|
|
|
|
hn.chanWatchRequests <- &chanWatchRequest{
|
|
chanPoint: op,
|
|
eventChan: eventChan,
|
|
chanWatchType: watchCloseChannel,
|
|
}
|
|
|
|
select {
|
|
case <-eventChan:
|
|
return nil
|
|
case <-ctxt.Done():
|
|
return fmt.Errorf("channel:%s not closed before timeout: "+
|
|
"%s", op, hn)
|
|
}
|
|
}
|
|
|
|
// WaitForChannelPolicyUpdate will block until a channel policy with the target
|
|
// outpoint and advertisingNode is seen within the network.
|
|
func (hn *HarnessNode) WaitForChannelPolicyUpdate(
|
|
advertisingNode string, policy *lnrpc.RoutingPolicy,
|
|
chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) error {
|
|
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
eventChan := make(chan struct{})
|
|
|
|
op, err := MakeOutpoint(chanPoint)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create outpoint for %v"+
|
|
"got err: %v", chanPoint, err)
|
|
}
|
|
|
|
ticker := time.NewTicker(wait.PollInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
// Send a watch request every second.
|
|
case <-ticker.C:
|
|
// Did the event can close in the meantime? We want to
|
|
// avoid a "close of closed channel" panic since we're
|
|
// re-using the same event chan for multiple requests.
|
|
select {
|
|
case <-eventChan:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
hn.chanWatchRequests <- &chanWatchRequest{
|
|
chanPoint: op,
|
|
eventChan: eventChan,
|
|
chanWatchType: watchPolicyUpdate,
|
|
policy: policy,
|
|
advertisingNode: advertisingNode,
|
|
includeUnannounced: includeUnannounced,
|
|
}
|
|
|
|
case <-eventChan:
|
|
return nil
|
|
|
|
case <-ctxt.Done():
|
|
return fmt.Errorf("channel:%s policy not updated "+
|
|
"before timeout: [%s:%v] %s", op,
|
|
advertisingNode, policy, hn.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForBlockchainSync waits for the target node to be fully synchronized
|
|
// with the blockchain. If the passed context object has a set timeout, it will
|
|
// continually poll until the timeout has elapsed. In the case that the chain
|
|
// isn't synced before the timeout is up, this function will return an error.
|
|
func (hn *HarnessNode) WaitForBlockchainSync() error {
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
ticker := time.NewTicker(time.Millisecond * 100)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
resp, err := hn.rpc.LN.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.SyncedToChain {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-ctxt.Done():
|
|
return fmt.Errorf("timeout while waiting for " +
|
|
"blockchain sync")
|
|
case <-hn.runCtx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForBalance waits until the node sees the expected confirmed/unconfirmed
|
|
// balance within their wallet.
|
|
func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount,
|
|
confirmed bool) error {
|
|
|
|
req := &lnrpc.WalletBalanceRequest{}
|
|
|
|
var lastBalance btcutil.Amount
|
|
doesBalanceMatch := func() bool {
|
|
balance, err := hn.rpc.LN.WalletBalance(hn.runCtx, req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if confirmed {
|
|
lastBalance = btcutil.Amount(balance.ConfirmedBalance)
|
|
return btcutil.Amount(balance.ConfirmedBalance) ==
|
|
expectedBalance
|
|
}
|
|
|
|
lastBalance = btcutil.Amount(balance.UnconfirmedBalance)
|
|
return btcutil.Amount(balance.UnconfirmedBalance) ==
|
|
expectedBalance
|
|
}
|
|
|
|
err := wait.Predicate(doesBalanceMatch, DefaultTimeout)
|
|
if err != nil {
|
|
return fmt.Errorf("balances not synced after deadline: "+
|
|
"expected %v, only have %v", expectedBalance,
|
|
lastBalance)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PrintErrf prints an error to the console.
|
|
func (hn *HarnessNode) PrintErrf(format string, a ...interface{}) {
|
|
fmt.Printf("itest error from [node:%s]: %s\n", // nolint:forbidigo
|
|
hn.Cfg.Name, fmt.Sprintf(format, a...))
|
|
}
|
|
|
|
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
|
|
// the outpoints, and saves them to harness node's internal state.
|
|
func (hn *HarnessNode) handleChannelEdgeUpdates(
|
|
updates []*lnrpc.ChannelEdgeUpdate) {
|
|
|
|
// For each new channel, we'll increment the number of
|
|
// edges seen by one.
|
|
for _, newChan := range updates {
|
|
op, err := MakeOutpoint(newChan.ChanPoint)
|
|
if err != nil {
|
|
hn.PrintErrf("failed to create outpoint for %v "+
|
|
"got err: %v", newChan.ChanPoint, err)
|
|
return
|
|
}
|
|
hn.openChans[op]++
|
|
|
|
// For this new channel, if the number of edges seen is less
|
|
// than two, then the channel hasn't been fully announced yet.
|
|
if numEdges := hn.openChans[op]; numEdges < 2 {
|
|
return
|
|
}
|
|
|
|
// Otherwise, we'll notify all the registered watchers and
|
|
// remove the dispatched watchers.
|
|
for _, eventChan := range hn.openChanWatchers[op] {
|
|
close(eventChan)
|
|
}
|
|
delete(hn.openChanWatchers, op)
|
|
|
|
// Check whether there's a routing policy update. If so, save
|
|
// it to the node state.
|
|
if newChan.RoutingPolicy == nil {
|
|
continue
|
|
}
|
|
|
|
// Append the policy to the slice.
|
|
node := newChan.AdvertisingNode
|
|
policies := hn.policyUpdates[op.String()]
|
|
|
|
// If the map[op] is nil, we need to initialize the map first.
|
|
if policies == nil {
|
|
policies = make(map[string][]*lnrpc.RoutingPolicy)
|
|
}
|
|
policies[node] = append(
|
|
policies[node], newChan.RoutingPolicy,
|
|
)
|
|
hn.policyUpdates[op.String()] = policies
|
|
}
|
|
}
|
|
|
|
// handleOpenChannelWatchRequest processes a watch open channel request by
|
|
// checking the number of the edges seen for a given channel point. If the
|
|
// number is no less than 2 then the channel is considered open. Otherwise, we
|
|
// will attempt to find it in its channel graph. If neither can be found, the
|
|
// request is added to a watch request list than will be handled by
|
|
// handleChannelEdgeUpdates.
|
|
func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
|
|
targetChan := req.chanPoint
|
|
|
|
// If this is an open request, then it can be dispatched if the number
|
|
// of edges seen for the channel is at least two.
|
|
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
|
|
close(req.eventChan)
|
|
return
|
|
}
|
|
|
|
// Before we add the channel to our set of open clients, we'll check to
|
|
// see if the channel is already in the channel graph of the target
|
|
// node. This lets us handle the case where a node has already seen a
|
|
// channel before a notification has been requested, causing us to miss
|
|
// it.
|
|
chanFound := hn.checkChanPointInGraph(targetChan)
|
|
if chanFound {
|
|
close(req.eventChan)
|
|
return
|
|
}
|
|
|
|
// Otherwise, we'll add this to the list of open channel watchers for
|
|
// this out point.
|
|
hn.openChanWatchers[targetChan] = append(
|
|
hn.openChanWatchers[targetChan],
|
|
req.eventChan,
|
|
)
|
|
}
|
|
|
|
// handleClosedChannelUpdate takes a series of closed channel updates, extracts
|
|
// the outpoints, saves them to harness node's internal state, and notifies all
|
|
// registered clients.
|
|
func (hn *HarnessNode) handleClosedChannelUpdate(
|
|
updates []*lnrpc.ClosedChannelUpdate) {
|
|
|
|
// For each channel closed, we'll mark that we've detected a channel
|
|
// closure while lnd was pruning the channel graph.
|
|
for _, closedChan := range updates {
|
|
op, err := MakeOutpoint(closedChan.ChanPoint)
|
|
if err != nil {
|
|
hn.PrintErrf("failed to create outpoint for %v "+
|
|
"got err: %v", closedChan.ChanPoint, err)
|
|
return
|
|
}
|
|
|
|
hn.closedChans[op] = struct{}{}
|
|
|
|
// As the channel has been closed, we'll notify all register
|
|
// watchers.
|
|
for _, eventChan := range hn.closeChanWatchers[op] {
|
|
close(eventChan)
|
|
}
|
|
delete(hn.closeChanWatchers, op)
|
|
}
|
|
}
|
|
|
|
// handleCloseChannelWatchRequest processes a watch close channel request by
|
|
// checking whether the given channel point can be found in the node's internal
|
|
// state. If not, the request is added to a watch request list than will be
|
|
// handled by handleCloseChannelWatchRequest.
|
|
func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
|
|
targetChan := req.chanPoint
|
|
|
|
// If this is a close request, then it can be immediately dispatched if
|
|
// we've already seen a channel closure for this channel.
|
|
if _, ok := hn.closedChans[targetChan]; ok {
|
|
close(req.eventChan)
|
|
return
|
|
}
|
|
|
|
// Otherwise, we'll add this to the list of close channel watchers for
|
|
// this out point.
|
|
hn.closeChanWatchers[targetChan] = append(
|
|
hn.closeChanWatchers[targetChan],
|
|
req.eventChan,
|
|
)
|
|
}
|
|
|
|
type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
|
|
|
|
// newTopologyClient creates a topology client.
|
|
func (hn *HarnessNode) newTopologyClient(
|
|
ctx context.Context) (topologyClient, error) {
|
|
|
|
req := &lnrpc.GraphTopologySubscription{}
|
|
client, err := hn.rpc.LN.SubscribeChannelGraph(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s(%d): unable to create topology "+
|
|
"client: %v (%s)", hn.Name(), hn.NodeID, err,
|
|
time.Now().String())
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// receiveTopologyClientStream initializes a topologyClient to subscribe
|
|
// topology update events. Due to a race condition between the ChannelRouter
|
|
// starting and us making the subscription request, it's possible for our graph
|
|
// subscription to fail. In that case, we will retry the subscription until it
|
|
// succeeds or fail after 10 seconds.
|
|
//
|
|
// NOTE: must be run as a goroutine.
|
|
func (hn *HarnessNode) receiveTopologyClientStream(
|
|
receiver chan *lnrpc.GraphTopologyUpdate) error {
|
|
|
|
// Create a topology client to receive graph updates.
|
|
client, err := hn.newTopologyClient(hn.runCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("create topologyClient failed: %w", err)
|
|
}
|
|
|
|
// We use the context to time out when retrying graph subscription.
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
for {
|
|
update, err := client.Recv()
|
|
|
|
switch {
|
|
case err == nil:
|
|
// Good case. We will send the update to the receiver.
|
|
|
|
case strings.Contains(err.Error(), "router not started"):
|
|
// If the router hasn't been started, we will retry
|
|
// every 200 ms until it has been started or fail
|
|
// after the ctxt is timed out.
|
|
select {
|
|
case <-ctxt.Done():
|
|
return fmt.Errorf("graph subscription: " +
|
|
"router not started before timeout")
|
|
case <-time.After(wait.PollInterval):
|
|
case <-hn.runCtx.Done():
|
|
return nil
|
|
}
|
|
|
|
// Re-create the topology client.
|
|
client, err = hn.newTopologyClient(hn.runCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("create topologyClient "+
|
|
"failed: %v", err)
|
|
}
|
|
|
|
continue
|
|
|
|
case strings.Contains(err.Error(), "EOF"):
|
|
// End of subscription stream. Do nothing and quit.
|
|
return nil
|
|
|
|
case strings.Contains(err.Error(), context.Canceled.Error()):
|
|
// End of subscription stream. Do nothing and quit.
|
|
return nil
|
|
|
|
default:
|
|
// An expected error is returned, return and leave it
|
|
// to be handled by the caller.
|
|
return fmt.Errorf("graph subscription err: %w", err)
|
|
}
|
|
|
|
// Send the update or quit.
|
|
select {
|
|
case receiver <- update:
|
|
case <-hn.runCtx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// handlePolicyUpdateWatchRequest checks that if the expected policy can be
|
|
// found either in the node's interval state or describe graph response. If
|
|
// found, it will signal the request by closing the event channel. Otherwise it
|
|
// does nothing but returns nil.
|
|
func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
|
|
op := req.chanPoint
|
|
|
|
// Get a list of known policies for this chanPoint+advertisingNode
|
|
// combination. Start searching in the node state first.
|
|
policies, ok := hn.policyUpdates[op.String()][req.advertisingNode]
|
|
|
|
if !ok {
|
|
// If it cannot be found in the node state, try searching it
|
|
// from the node's DescribeGraph.
|
|
policyMap := hn.getChannelPolicies(req.includeUnannounced)
|
|
policies, ok = policyMap[op.String()][req.advertisingNode]
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
|
|
// Check if there's a matched policy.
|
|
for _, policy := range policies {
|
|
if CheckChannelPolicy(policy, req.policy) == nil {
|
|
close(req.eventChan)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// getChannelPolicies queries the channel graph and formats the policies into
|
|
// the format defined in type policyUpdateMap.
|
|
func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
|
|
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
|
|
defer cancel()
|
|
|
|
graph, err := hn.rpc.LN.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
|
|
IncludeUnannounced: include,
|
|
})
|
|
if err != nil {
|
|
hn.PrintErrf("DescribeGraph got err: %v", err)
|
|
return nil
|
|
}
|
|
|
|
policyUpdates := policyUpdateMap{}
|
|
|
|
for _, e := range graph.Edges {
|
|
policies := policyUpdates[e.ChanPoint]
|
|
|
|
// If the map[op] is nil, we need to initialize the map first.
|
|
if policies == nil {
|
|
policies = make(map[string][]*lnrpc.RoutingPolicy)
|
|
}
|
|
|
|
if e.Node1Policy != nil {
|
|
policies[e.Node1Pub] = append(
|
|
policies[e.Node1Pub], e.Node1Policy,
|
|
)
|
|
}
|
|
|
|
if e.Node2Policy != nil {
|
|
policies[e.Node2Pub] = append(
|
|
policies[e.Node2Pub], e.Node2Policy,
|
|
)
|
|
}
|
|
|
|
policyUpdates[e.ChanPoint] = policies
|
|
}
|
|
|
|
return policyUpdates
|
|
}
|
|
|
|
// renameFile is a helper to rename (log) files created during integration
|
|
// tests.
|
|
func renameFile(fromFileName, toFileName string) {
|
|
err := os.Rename(fromFileName, toFileName)
|
|
if err != nil {
|
|
fmt.Printf("could not rename %s to %s: %v\n", // nolint:forbidigo
|
|
fromFileName, toFileName, err)
|
|
}
|
|
}
|
|
|
|
// getFinalizedLogFilePrefix returns the finalize log filename.
|
|
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
|
|
pubKeyHex := hex.EncodeToString(
|
|
hn.PubKey[:logPubKeyBytes],
|
|
)
|
|
|
|
return fmt.Sprintf("%s/%d-%s-%s-%s",
|
|
GetLogDir(), hn.NodeID,
|
|
hn.Cfg.LogFilenamePrefix,
|
|
hn.Cfg.Name, pubKeyHex)
|
|
}
|
|
|
|
// finalizeLogfile makes sure the log file cleanup function is initialized,
|
|
// even if no log file is created.
|
|
func finalizeLogfile(hn *HarnessNode, fileName string) {
|
|
if hn.logFile != nil {
|
|
hn.logFile.Close()
|
|
|
|
// If logoutput flag is not set, return early.
|
|
if !*LogOutput {
|
|
return
|
|
}
|
|
|
|
newFileName := fmt.Sprintf("%v.log",
|
|
getFinalizedLogFilePrefix(hn),
|
|
)
|
|
|
|
renameFile(fileName, newFileName)
|
|
}
|
|
}
|
|
|
|
func finalizeEtcdLog(hn *HarnessNode) {
|
|
if hn.Cfg.DbBackend != BackendEtcd {
|
|
return
|
|
}
|
|
|
|
etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
|
|
newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
|
|
getFinalizedLogFilePrefix(hn),
|
|
)
|
|
|
|
renameFile(etcdLogFileName, newEtcdLogFileName)
|
|
}
|
|
|
|
func addLogFile(hn *HarnessNode) (string, error) {
|
|
var fileName string
|
|
|
|
dir := GetLogDir()
|
|
fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID,
|
|
hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
|
|
hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
|
|
|
|
// If the node's PubKey is not yet initialized, create a
|
|
// temporary file name. Later, after the PubKey has been
|
|
// initialized, the file can be moved to its final name with
|
|
// the PubKey included.
|
|
if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
|
|
fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
|
|
hn.NodeID, hn.Cfg.LogFilenamePrefix,
|
|
hn.Cfg.Name)
|
|
}
|
|
|
|
// Create file if not exists, otherwise append.
|
|
file, err := os.OpenFile(fileName,
|
|
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
|
|
if err != nil {
|
|
return fileName, err
|
|
}
|
|
|
|
// Pass node's stderr to both errb and the file.
|
|
w := io.MultiWriter(hn.cmd.Stderr, file)
|
|
hn.cmd.Stderr = w
|
|
|
|
// Pass the node's stdout only to the file.
|
|
hn.cmd.Stdout = file
|
|
|
|
// Let the node keep a reference to this file, such
|
|
// that we can add to it if necessary.
|
|
hn.logFile = file
|
|
|
|
return fileName, nil
|
|
}
|