lnd/lntest/harness_node.go
Olaoluwa Osuntokun 2a18263336
lntest: keep all failed payments by default for HarnessNode
In this commit, we fix a detected flake: we go to make a payment, and
then want to assert that 2 attempts we required. The existing logic
assumes that the success and the payment will still be on disk. With a
recent PR, we now delete failed payments by default, but after the fact,
in a non-atomic fashion.

We fix this issue simply by having all the nodes keep around failed
payments for the sake of all the old itests that assumed this
information would always be on disk.

Introduced in: https://github.com/lightningnetwork/lnd/pull/6438.

Fixes https://github.com/lightningnetwork/lnd/issues/6711.
2022-07-07 19:28:30 -07:00

1908 lines
53 KiB
Go

package lntest
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/wire"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/peersrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnrpc/watchtowerrpc"
"github.com/lightningnetwork/lnd/lnrpc/wtclientrpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/macaroon.v2"
)
const (
// logPubKeyBytes is the number of bytes of the node's PubKey that will
// be appended to the log file name. The whole PubKey is too long and
// not really necessary to quickly identify what node produced which
// log file.
logPubKeyBytes = 4
// trickleDelay is the amount of time in milliseconds between each
// release of announcements by AuthenticatedGossiper to the network.
trickleDelay = 50
postgresDsn = "postgres://postgres:postgres@localhost:6432/%s?sslmode=disable"
// commitInterval specifies the maximum interval the graph database
// will wait between attempting to flush a batch of modifications to
// disk(db.batch-commit-interval).
commitInterval = 10 * time.Millisecond
)
var (
// numActiveNodes is the number of active nodes within the test network.
numActiveNodes = 0
numActiveNodesMtx sync.Mutex
)
func postgresDatabaseDsn(dbName string) string {
return fmt.Sprintf(postgresDsn, dbName)
}
// BackendConfig is an interface that abstracts away the specific chain backend
// node implementation.
type BackendConfig interface {
// GenArgs returns the arguments needed to be passed to LND at startup
// for using this node as a chain backend.
GenArgs() []string
// ConnectMiner is called to establish a connection to the test miner.
ConnectMiner() error
// DisconnectMiner is called to disconnect the miner.
DisconnectMiner() error
// Name returns the name of the backend type.
Name() string
// Credentials returns the rpc username, password and host for the
// backend.
Credentials() (string, string, string, error)
}
// NodeConfig is the basic interface a node configuration must implement.
type NodeConfig interface {
// BaseConfig returns the base node configuration struct.
BaseConfig() *BaseNodeConfig
// GenerateListeningPorts generates the ports to listen on designated
// for the current lightning network test.
GenerateListeningPorts()
// GenArgs generates a slice of command line arguments from the
// lightning node config struct.
GenArgs() []string
}
// BaseNodeConfig is the base node configuration.
type BaseNodeConfig struct {
Name string
// LogFilenamePrefix is used to prefix node log files. Can be used
// to store the current test case for simpler postmortem debugging.
LogFilenamePrefix string
BackendCfg BackendConfig
NetParams *chaincfg.Params
BaseDir string
ExtraArgs []string
DataDir string
LogDir string
TLSCertPath string
TLSKeyPath string
AdminMacPath string
ReadMacPath string
InvoiceMacPath string
HasSeed bool
Password []byte
P2PPort int
RPCPort int
RESTPort int
ProfilePort int
AcceptKeySend bool
AcceptAMP bool
FeeURL string
DbBackend DatabaseBackend
PostgresDsn string
}
func (cfg BaseNodeConfig) P2PAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.P2PPort)
}
func (cfg BaseNodeConfig) RPCAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.RPCPort)
}
func (cfg BaseNodeConfig) RESTAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.RESTPort)
}
// DBDir returns the holding directory path of the graph database.
func (cfg BaseNodeConfig) DBDir() string {
return filepath.Join(cfg.DataDir, "graph", cfg.NetParams.Name)
}
func (cfg BaseNodeConfig) DBPath() string {
return filepath.Join(cfg.DBDir(), "channel.db")
}
func (cfg BaseNodeConfig) ChanBackupPath() string {
return filepath.Join(
cfg.DataDir, "chain", "bitcoin",
fmt.Sprintf(
"%v/%v", cfg.NetParams.Name,
chanbackup.DefaultBackupFileName,
),
)
}
// GenerateListeningPorts generates the ports to listen on designated for the
// current lightning network test.
func (cfg *BaseNodeConfig) GenerateListeningPorts() {
if cfg.P2PPort == 0 {
cfg.P2PPort = NextAvailablePort()
}
if cfg.RPCPort == 0 {
cfg.RPCPort = NextAvailablePort()
}
if cfg.RESTPort == 0 {
cfg.RESTPort = NextAvailablePort()
}
if cfg.ProfilePort == 0 {
cfg.ProfilePort = NextAvailablePort()
}
}
// BaseConfig returns the base node configuration struct.
func (cfg *BaseNodeConfig) BaseConfig() *BaseNodeConfig {
return cfg
}
// GenArgs generates a slice of command line arguments from the lightning node
// config struct.
func (cfg *BaseNodeConfig) GenArgs() []string {
var args []string
switch cfg.NetParams {
case &chaincfg.TestNet3Params:
args = append(args, "--bitcoin.testnet")
case &chaincfg.SimNetParams:
args = append(args, "--bitcoin.simnet")
case &chaincfg.RegressionNetParams:
args = append(args, "--bitcoin.regtest")
}
backendArgs := cfg.BackendCfg.GenArgs()
args = append(args, backendArgs...)
nodeArgs := []string{
"--bitcoin.active",
"--nobootstrap",
"--debuglevel=debug",
"--bitcoin.defaultchanconfs=1",
"--keep-failed-payment-attempts",
fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval),
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV),
fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()),
fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()),
fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()),
fmt.Sprintf("--listen=%v", cfg.P2PAddr()),
fmt.Sprintf("--externalip=%v", cfg.P2PAddr()),
fmt.Sprintf("--lnddir=%v", cfg.BaseDir),
fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath),
fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath),
fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath),
fmt.Sprintf("--trickledelay=%v", trickleDelay),
fmt.Sprintf("--profile=%d", cfg.ProfilePort),
fmt.Sprintf("--caches.rpc-graph-cache-duration=%d", 0),
}
args = append(args, nodeArgs...)
if !cfg.HasSeed {
args = append(args, "--noseedbackup")
}
if cfg.ExtraArgs != nil {
args = append(args, cfg.ExtraArgs...)
}
if cfg.AcceptKeySend {
args = append(args, "--accept-keysend")
}
if cfg.AcceptAMP {
args = append(args, "--accept-amp")
}
switch cfg.DbBackend {
case BackendEtcd:
args = append(args, "--db.backend=etcd")
args = append(args, "--db.etcd.embedded")
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_client_port=%v",
NextAvailablePort(),
),
)
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_peer_port=%v",
NextAvailablePort(),
),
)
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_log_file=%v",
path.Join(cfg.LogDir, "etcd.log"),
),
)
case BackendPostgres:
args = append(args, "--db.backend=postgres")
args = append(args, "--db.postgres.dsn="+cfg.PostgresDsn)
}
if cfg.FeeURL != "" {
args = append(args, "--feeurl="+cfg.FeeURL)
}
return args
}
// policyUpdateMap defines a type to store channel policy updates. It has the
// format,
// {
// "chanPoint1": {
// "advertisingNode1": [
// policy1, policy2, ...
// ],
// "advertisingNode2": [
// policy1, policy2, ...
// ]
// },
// "chanPoint2": ...
// }.
type policyUpdateMap map[string]map[string][]*lnrpc.RoutingPolicy
// HarnessNode represents an instance of lnd running within our test network
// harness. Each HarnessNode instance also fully embeds an RPC client in
// order to pragmatically drive the node.
type HarnessNode struct {
Cfg *BaseNodeConfig
// NodeID is a unique identifier for the node within a NetworkHarness.
NodeID int
// PubKey is the serialized compressed identity public key of the node.
// This field will only be populated once the node itself has been
// started via the start() method.
PubKey [33]byte
PubKeyStr string
// rpc holds a list of RPC clients.
rpc *RPCClients
// chanWatchRequests receives a request for watching a particular event
// for a given channel.
chanWatchRequests chan *chanWatchRequest
// For each outpoint, we'll track an integer which denotes the number of
// edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has propagated
// through the network.
openChans map[wire.OutPoint]int
openChanWatchers map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{}
closeChanWatchers map[wire.OutPoint][]chan struct{}
// policyUpdates stores a slice of seen polices by each advertising
// node and the outpoint.
policyUpdates policyUpdateMap
// backupDbDir is the path where a database backup is stored, if any.
backupDbDir string
// postgresDbName is the name of the postgres database where lnd data is
// stored in.
postgresDbName string
// runCtx is a context with cancel method. It's used to signal when the
// node needs to quit, and used as the parent context when spawning
// children contexts for RPC requests.
runCtx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
cmd *exec.Cmd
logFile *os.File
// TODO(yy): remove
lnrpc.LightningClient
lnrpc.WalletUnlockerClient
invoicesrpc.InvoicesClient
peersrpc.PeersClient
SignerClient signrpc.SignerClient
RouterClient routerrpc.RouterClient
WalletKitClient walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
StateClient lnrpc.StateClient
ChainClient chainrpc.ChainNotifierClient
}
// RPCClients wraps a list of RPC clients into a single struct for easier
// access.
type RPCClients struct {
// conn is the underlying connection to the grpc endpoint of the node.
conn *grpc.ClientConn
LN lnrpc.LightningClient
WalletUnlocker lnrpc.WalletUnlockerClient
Invoice invoicesrpc.InvoicesClient
Signer signrpc.SignerClient
Router routerrpc.RouterClient
WalletKit walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
State lnrpc.StateClient
ChainClient chainrpc.ChainNotifierClient
}
// Assert *HarnessNode implements the lnrpc.LightningClient interface.
var _ lnrpc.LightningClient = (*HarnessNode)(nil)
var _ lnrpc.WalletUnlockerClient = (*HarnessNode)(nil)
var _ invoicesrpc.InvoicesClient = (*HarnessNode)(nil)
var _ peersrpc.PeersClient = (*HarnessNode)(nil)
// nextNodeID generates a unique sequence to be used as the node's ID.
func nextNodeID() int {
numActiveNodesMtx.Lock()
defer numActiveNodesMtx.Unlock()
nodeNum := numActiveNodes
numActiveNodes++
return nodeNum
}
// newNode creates a new test lightning node instance from the passed config.
func newNode(cfg *BaseNodeConfig) (*HarnessNode, error) {
if cfg.BaseDir == "" {
var err error
cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node")
if err != nil {
return nil, err
}
}
cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")
networkDir := filepath.Join(
cfg.DataDir, "chain", "bitcoin", cfg.NetParams.Name,
)
cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")
cfg.GenerateListeningPorts()
// Run all tests with accept keysend. The keysend code is very isolated
// and it is highly unlikely that it would affect regular itests when
// enabled.
cfg.AcceptKeySend = true
// Create temporary database.
var dbName string
if cfg.DbBackend == BackendPostgres {
var err error
dbName, err = createTempPgDb()
if err != nil {
return nil, err
}
cfg.PostgresDsn = postgresDatabaseDsn(dbName)
}
return &HarnessNode{
Cfg: cfg,
NodeID: nextNodeID(),
chanWatchRequests: make(chan *chanWatchRequest),
openChans: make(map[wire.OutPoint]int),
openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}),
closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
policyUpdates: policyUpdateMap{},
postgresDbName: dbName,
}, nil
}
func createTempPgDb() (string, error) {
// Create random database name.
randBytes := make([]byte, 8)
_, err := rand.Read(randBytes)
if err != nil {
return "", err
}
dbName := "itest_" + hex.EncodeToString(randBytes)
// Create database.
err = executePgQuery("CREATE DATABASE " + dbName)
if err != nil {
return "", err
}
return dbName, nil
}
func executePgQuery(query string) error {
pool, err := pgxpool.Connect(
context.Background(),
postgresDatabaseDsn("postgres"),
)
if err != nil {
return fmt.Errorf("unable to connect to database: %v", err)
}
defer pool.Close()
_, err = pool.Exec(context.Background(), query)
return err
}
// String gives the internal state of the node which is useful for debugging.
func (hn *HarnessNode) String() string {
type nodeCfg struct {
LogFilenamePrefix string
ExtraArgs []string
HasSeed bool
P2PPort int
RPCPort int
RESTPort int
ProfilePort int
AcceptKeySend bool
AcceptAMP bool
FeeURL string
}
nodeState := struct {
NodeID int
Name string
PubKey string
OpenChans map[string]int
ClosedChans map[string]struct{}
NodeCfg nodeCfg
}{
NodeID: hn.NodeID,
Name: hn.Cfg.Name,
PubKey: hn.PubKeyStr,
OpenChans: make(map[string]int),
ClosedChans: make(map[string]struct{}),
NodeCfg: nodeCfg{
LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
ExtraArgs: hn.Cfg.ExtraArgs,
HasSeed: hn.Cfg.HasSeed,
P2PPort: hn.Cfg.P2PPort,
RPCPort: hn.Cfg.RPCPort,
RESTPort: hn.Cfg.RESTPort,
AcceptKeySend: hn.Cfg.AcceptKeySend,
AcceptAMP: hn.Cfg.AcceptAMP,
FeeURL: hn.Cfg.FeeURL,
},
}
for outpoint, count := range hn.openChans {
nodeState.OpenChans[outpoint.String()] = count
}
for outpoint, count := range hn.closedChans {
nodeState.ClosedChans[outpoint.String()] = count
}
stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
if err != nil {
return fmt.Sprintf("\n encode node state with err: %v", err)
}
return fmt.Sprintf("\nnode state: %s", stateBytes)
}
// DBPath returns the filepath to the channeldb database file for this node.
func (hn *HarnessNode) DBPath() string {
return hn.Cfg.DBPath()
}
// DBDir returns the path for the directory holding channeldb file(s).
func (hn *HarnessNode) DBDir() string {
return hn.Cfg.DBDir()
}
// Name returns the name of this node set during initialization.
func (hn *HarnessNode) Name() string {
return hn.Cfg.Name
}
// TLSCertStr returns the path where the TLS certificate is stored.
func (hn *HarnessNode) TLSCertStr() string {
return hn.Cfg.TLSCertPath
}
// TLSKeyStr returns the path where the TLS key is stored.
func (hn *HarnessNode) TLSKeyStr() string {
return hn.Cfg.TLSKeyPath
}
// ChanBackupPath returns the fielpath to the on-disk channel.backup file for
// this node.
func (hn *HarnessNode) ChanBackupPath() string {
return hn.Cfg.ChanBackupPath()
}
// AdminMacPath returns the filepath to the admin.macaroon file for this node.
func (hn *HarnessNode) AdminMacPath() string {
return hn.Cfg.AdminMacPath
}
// ReadMacPath returns the filepath to the readonly.macaroon file for this node.
func (hn *HarnessNode) ReadMacPath() string {
return hn.Cfg.ReadMacPath
}
// InvoiceMacPath returns the filepath to the invoice.macaroon file for this
// node.
func (hn *HarnessNode) InvoiceMacPath() string {
return hn.Cfg.InvoiceMacPath
}
// startLnd handles the startup of lnd, creating log files, and possibly kills
// the process when needed.
func (hn *HarnessNode) startLnd(lndBinary string, lndError chan<- error) error {
args := hn.Cfg.GenArgs()
hn.cmd = exec.Command(lndBinary, args...)
// Redirect stderr output to buffer
var errb bytes.Buffer
hn.cmd.Stderr = &errb
// If the logoutput flag is passed, redirect output from the nodes to
// log files.
var (
fileName string
err error
)
if *logOutput {
fileName, err = addLogFile(hn)
if err != nil {
return err
}
}
if err := hn.cmd.Start(); err != nil {
return err
}
// Launch a new goroutine which that bubbles up any potential fatal
// process errors to the goroutine running the tests.
hn.wg.Add(1)
go func() {
defer hn.wg.Done()
err := hn.cmd.Wait()
if err != nil {
lndError <- fmt.Errorf("%v\n%v", err, errb.String())
}
// Make sure log file is closed and renamed if necessary.
finalizeLogfile(hn, fileName)
// Rename the etcd.log file if the node was running on embedded
// etcd.
finalizeEtcdLog(hn)
}()
return nil
}
// Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly
// later.
//
// This may not clean up properly if an error is returned, so the caller should
// call shutdown() regardless of the return value.
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
wait bool) error {
// Init the runCtx.
ctxt, cancel := context.WithCancel(context.Background())
hn.runCtx = ctxt
hn.cancel = cancel
// Start lnd and prepare logs.
if err := hn.startLnd(lndBinary, lndError); err != nil {
return err
}
// We may want to skip waiting for the node to come up (eg. the node
// is waiting to become the leader).
if !wait {
return nil
}
// Since Stop uses the LightningClient to stop the node, if we fail to
// get a connected client, we have to kill the process.
useMacaroons := !hn.Cfg.HasSeed
conn, err := hn.ConnectRPC(useMacaroons)
if err != nil {
err = fmt.Errorf("ConnectRPC err: %w", err)
cmdErr := hn.cmd.Process.Kill()
if cmdErr != nil {
err = fmt.Errorf("kill process got err: %w: %v",
cmdErr, err)
}
return err
}
// Init all the RPC clients.
hn.InitRPCClients(conn)
if err := hn.WaitUntilStarted(); err != nil {
return err
}
// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
// unlock the daemon.
if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil
}
return hn.initLightningClient()
}
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
func (hn *HarnessNode) WaitUntilStarted() error {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s != lnrpc.WalletState_WAITING_TO_START
})
}
// WaitUntilStateReached waits until the given wallet state (or one of the
// states following it) has been reached.
func (hn *HarnessNode) WaitUntilStateReached(
desiredState lnrpc.WalletState) error {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s >= desiredState
})
}
// WaitUntilServerActive waits until the lnd daemon is fully started.
func (hn *HarnessNode) WaitUntilServerActive() error {
return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
return s == lnrpc.WalletState_SERVER_ACTIVE
})
}
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
// connection and setting up the wallet unlocker client. This is needed when
// a node that has recently been started was waiting to become the leader and
// we're at the point when we expect that it is the leader now (awaiting
// unlock).
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
var (
conn *grpc.ClientConn
connErr error
)
if err := wait.NoError(func() error {
conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed)
return connErr
}, timeout); err != nil {
return err
}
// Init all the RPC clients.
hn.InitRPCClients(conn)
if err := hn.WaitUntilStarted(); err != nil {
return err
}
// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
// unlock the daemon.
if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil
}
return hn.initLightningClient()
}
// initClientWhenReady waits until the main gRPC server is detected as active,
// then complete the normal HarnessNode gRPC connection creation. If the node
// is initialized stateless, the macaroon is returned so that the client can
// use it.
func (hn *HarnessNode) initClientWhenReady(stateless bool,
macBytes []byte) error {
// Wait for the wallet to finish unlocking, such that we can connect to
// it via a macaroon-authenticated rpc connection.
var (
conn *grpc.ClientConn
err error
)
if err = wait.NoError(func() error {
// If the node has been initialized stateless, we need to pass
// the macaroon to the client.
if stateless {
adminMac := &macaroon.Macaroon{}
err := adminMac.UnmarshalBinary(macBytes)
if err != nil {
return fmt.Errorf("unmarshal failed: %w", err)
}
conn, err = hn.ConnectRPCWithMacaroon(adminMac)
return err
}
// Normal initialization, we expect a macaroon to be in the
// file system.
conn, err = hn.ConnectRPC(true)
return err
}, DefaultTimeout); err != nil {
return fmt.Errorf("timeout while init client: %w", err)
}
// Init all the RPC clients.
hn.InitRPCClients(conn)
return hn.initLightningClient()
}
// Init initializes a harness node by passing the init request via rpc. After
// the request is submitted, this method will block until a
// macaroon-authenticated RPC connection can be established to the harness
// node. Once established, the new connection is used to initialize the
// LightningClient and subscribes the HarnessNode to topology changes.
func (hn *HarnessNode) Init(
initReq *lnrpc.InitWalletRequest) (*lnrpc.InitWalletResponse, error) {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
response, err := hn.rpc.WalletUnlocker.InitWallet(ctxt, initReq)
if err != nil {
return nil, fmt.Errorf("failed to init wallet: %w", err)
}
err = hn.initClientWhenReady(
initReq.StatelessInit, response.AdminMacaroon,
)
if err != nil {
return nil, fmt.Errorf("failed to init: %w", err)
}
return response, nil
}
// InitChangePassword initializes a harness node by passing the change password
// request via RPC. After the request is submitted, this method will block until
// a macaroon-authenticated RPC connection can be established to the harness
// node. Once established, the new connection is used to initialize the
// LightningClient and subscribes the HarnessNode to topology changes.
func (hn *HarnessNode) InitChangePassword(
chngPwReq *lnrpc.ChangePasswordRequest) (*lnrpc.ChangePasswordResponse,
error) {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
response, err := hn.rpc.WalletUnlocker.ChangePassword(ctxt, chngPwReq)
if err != nil {
return nil, err
}
err = hn.initClientWhenReady(
chngPwReq.StatelessInit, response.AdminMacaroon,
)
if err != nil {
return nil, err
}
return response, nil
}
// Unlock attempts to unlock the wallet of the target HarnessNode. This method
// should be called after the restart of a HarnessNode that was created with a
// seed+password. Once this method returns, the HarnessNode will be ready to
// accept normal gRPC requests and harness command.
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
// Otherwise, we'll need to unlock the node before it's able to start
// up properly.
_, err := hn.rpc.WalletUnlocker.UnlockWallet(ctxt, unlockReq)
if err != nil {
return err
}
// Now that the wallet has been unlocked, we'll wait for the RPC client
// to be ready, then establish the normal gRPC connection.
return hn.initClientWhenReady(false, nil)
}
// waitTillServerState makes a subscription to the server's state change and
// blocks until the server is in the targeted state.
func (hn *HarnessNode) waitTillServerState(
predicate func(state lnrpc.WalletState) bool) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
defer cancel()
client, err := hn.rpc.State.SubscribeState(
ctxt, &lnrpc.SubscribeStateRequest{},
)
if err != nil {
return fmt.Errorf("failed to subscribe to state: %w", err)
}
errChan := make(chan error, 1)
done := make(chan struct{})
go func() {
for {
resp, err := client.Recv()
if err != nil {
errChan <- err
return
}
if predicate(resp.State) {
close(done)
return
}
}
}()
var lastErr error
for {
select {
case err := <-errChan:
lastErr = err
case <-done:
return nil
case <-time.After(NodeStartTimeout):
return fmt.Errorf("timeout waiting for state, "+
"got err from stream: %v", lastErr)
}
}
}
// InitRPCClients initializes a list of RPC clients for the node.
func (hn *HarnessNode) InitRPCClients(c *grpc.ClientConn) {
hn.rpc = &RPCClients{
conn: c,
LN: lnrpc.NewLightningClient(c),
Invoice: invoicesrpc.NewInvoicesClient(c),
Router: routerrpc.NewRouterClient(c),
WalletKit: walletrpc.NewWalletKitClient(c),
WalletUnlocker: lnrpc.NewWalletUnlockerClient(c),
Watchtower: watchtowerrpc.NewWatchtowerClient(c),
WatchtowerClient: wtclientrpc.NewWatchtowerClientClient(c),
Signer: signrpc.NewSignerClient(c),
State: lnrpc.NewStateClient(c),
ChainClient: chainrpc.NewChainNotifierClient(c),
}
}
// initLightningClient blocks until the lnd server is fully started and
// subscribes the harness node to graph topology updates. This method also
// spawns a lightning network watcher for this node, which watches for topology
// changes.
func (hn *HarnessNode) initLightningClient() error {
// TODO(yy): remove
// Construct the LightningClient that will allow us to use the
// HarnessNode directly for normal rpc operations.
conn := hn.rpc.conn
hn.LightningClient = lnrpc.NewLightningClient(conn)
hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn)
hn.RouterClient = routerrpc.NewRouterClient(conn)
hn.WalletKitClient = walletrpc.NewWalletKitClient(conn)
hn.Watchtower = watchtowerrpc.NewWatchtowerClient(conn)
hn.WatchtowerClient = wtclientrpc.NewWatchtowerClientClient(conn)
hn.SignerClient = signrpc.NewSignerClient(conn)
hn.PeersClient = peersrpc.NewPeersClient(conn)
hn.StateClient = lnrpc.NewStateClient(conn)
hn.ChainClient = chainrpc.NewChainNotifierClient(conn)
// Wait until the server is fully started.
if err := hn.WaitUntilServerActive(); err != nil {
return err
}
// Set the harness node's pubkey to what the node claims in GetInfo.
// The RPC must have been started at this point.
if err := hn.FetchNodeInfo(); err != nil {
return err
}
// Launch the watcher that will hook into graph related topology change
// from the PoV of this node.
hn.wg.Add(1)
go hn.lightningNetworkWatcher()
return nil
}
// FetchNodeInfo queries an unlocked node to retrieve its public key.
func (hn *HarnessNode) FetchNodeInfo() error {
// Obtain the lnid of this node for quick identification purposes.
info, err := hn.rpc.LN.GetInfo(hn.runCtx, &lnrpc.GetInfoRequest{})
if err != nil {
return err
}
hn.PubKeyStr = info.IdentityPubkey
pubkey, err := hex.DecodeString(info.IdentityPubkey)
if err != nil {
return err
}
copy(hn.PubKey[:], pubkey)
return nil
}
// AddToLogf adds a line of choice to the node's logfile. This is useful
// to interleave test output with output from the node.
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
// If this node was not set up with a log file, just return early.
if hn.logFile == nil {
return
}
desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
if _, err := hn.logFile.WriteString(desc); err != nil {
hn.PrintErrf("write to log err: %v", err)
}
}
// ReadMacaroon waits a given duration for the macaroon file to be created. If
// the file is readable within the timeout, its content is de-serialized as a
// macaroon and returned.
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
*macaroon.Macaroon, error) {
// Wait until macaroon file is created and has valid content before
// using it.
var mac *macaroon.Macaroon
err := wait.NoError(func() error {
macBytes, err := ioutil.ReadFile(macPath)
if err != nil {
return fmt.Errorf("error reading macaroon file: %v",
err)
}
newMac := &macaroon.Macaroon{}
if err = newMac.UnmarshalBinary(macBytes); err != nil {
return fmt.Errorf("error unmarshalling macaroon "+
"file: %v", err)
}
mac = newMac
return nil
}, timeout)
return mac, err
}
// ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to
// create a gRPC client connection.
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
*grpc.ClientConn, error) {
// Wait until TLS certificate is created and has valid content before
// using it, up to 30 sec.
var tlsCreds credentials.TransportCredentials
err := wait.NoError(func() error {
var err error
tlsCreds, err = credentials.NewClientTLSFromFile(
hn.Cfg.TLSCertPath, "",
)
return err
}, DefaultTimeout)
if err != nil {
return nil, fmt.Errorf("error reading TLS cert: %v", err)
}
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(tlsCreds),
}
ctx, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
if mac == nil {
return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
}
macCred, err := macaroons.NewMacaroonCredential(mac)
if err != nil {
return nil, fmt.Errorf("error cloning mac: %v", err)
}
opts = append(opts, grpc.WithPerRPCCredentials(macCred))
return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
}
// ConnectRPC uses the TLS certificate and admin macaroon files written by the
// lnd node to create a gRPC client connection.
func (hn *HarnessNode) ConnectRPC(useMacs bool) (*grpc.ClientConn, error) {
// If we don't want to use macaroons, just pass nil, the next method
// will handle it correctly.
if !useMacs {
return hn.ConnectRPCWithMacaroon(nil)
}
// If we should use a macaroon, always take the admin macaroon as a
// default.
mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, DefaultTimeout)
if err != nil {
return nil, err
}
return hn.ConnectRPCWithMacaroon(mac)
}
// SetExtraArgs assigns the ExtraArgs field for the node's configuration. The
// changes will take effect on restart.
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
hn.Cfg.ExtraArgs = extraArgs
}
// cleanup cleans up all the temporary files created by the node's process.
func (hn *HarnessNode) cleanup() error {
if hn.backupDbDir != "" {
err := os.RemoveAll(hn.backupDbDir)
if err != nil {
return fmt.Errorf("unable to remove backup dir: %v",
err)
}
}
return os.RemoveAll(hn.Cfg.BaseDir)
}
// Stop attempts to stop the active lnd process.
func (hn *HarnessNode) stop() error {
// Do nothing if the process is not running.
if hn.runCtx == nil {
return nil
}
// If start() failed before creating clients, we will just wait for the
// child process to die.
if hn.rpc != nil && hn.rpc.LN != nil {
// Don't watch for error because sometimes the RPC connection
// gets closed before a response is returned.
req := lnrpc.StopRequest{}
err := wait.NoError(func() error {
_, err := hn.rpc.LN.StopDaemon(hn.runCtx, &req)
switch {
case err == nil:
return nil
// Try again if a recovery/rescan is in progress.
case strings.Contains(
err.Error(), "recovery in progress",
):
return err
default:
return nil
}
}, DefaultTimeout)
if err != nil {
return err
}
}
// Stop the runCtx and wait for goroutines to finish.
hn.cancel()
// Wait for lnd process to exit.
err := wait.NoError(func() error {
if hn.cmd.ProcessState == nil {
return fmt.Errorf("process did not exit")
}
if !hn.cmd.ProcessState.Exited() {
return fmt.Errorf("process did not exit")
}
// Wait for goroutines to be finished.
hn.wg.Wait()
return nil
}, DefaultTimeout*2)
if err != nil {
return err
}
hn.LightningClient = nil
hn.WalletUnlockerClient = nil
hn.Watchtower = nil
hn.WatchtowerClient = nil
// Close any attempts at further grpc connections.
if hn.rpc.conn != nil {
err := status.Code(hn.rpc.conn.Close())
switch err {
case codes.OK:
return nil
// When the context is canceled above, we might get the
// following error as the context is no longer active.
case codes.Canceled:
return nil
case codes.Unknown:
return fmt.Errorf("unknown error attempting to stop "+
"grpc client: %v", err)
default:
return fmt.Errorf("error attempting to stop "+
"grpc client: %v", err)
}
}
return nil
}
// shutdown stops the active lnd process and cleans up any temporary
// directories created along the way.
func (hn *HarnessNode) shutdown() error {
if err := hn.stop(); err != nil {
return err
}
if err := hn.cleanup(); err != nil {
return err
}
return nil
}
// kill kills the lnd process.
func (hn *HarnessNode) kill() error {
return hn.cmd.Process.Kill()
}
type chanWatchType uint8
const (
// watchOpenChannel specifies that this is a request to watch an open
// channel event.
watchOpenChannel chanWatchType = iota
// watchCloseChannel specifies that this is a request to watch a close
// channel event.
watchCloseChannel
// watchPolicyUpdate specifies that this is a request to watch a policy
// update event.
watchPolicyUpdate
)
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
// notified once it's detected within the test Lightning Network, that a
// channel has either been added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanWatchType chanWatchType
eventChan chan struct{}
advertisingNode string
policy *lnrpc.RoutingPolicy
includeUnannounced bool
}
func (hn *HarnessNode) checkChanPointInGraph(chanPoint wire.OutPoint) bool {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
chanGraph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{})
if err != nil {
return false
}
targetChanPoint := chanPoint.String()
for _, chanEdge := range chanGraph.Edges {
candidateChanPoint := chanEdge.ChanPoint
if targetChanPoint == candidateChanPoint {
return true
}
}
return false
}
// lightningNetworkWatcher is a goroutine which is able to dispatch
// notifications once it has been observed that a target channel has been
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
func (hn *HarnessNode) lightningNetworkWatcher() {
defer hn.wg.Done()
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
// Start a goroutine to receive graph updates.
hn.wg.Add(1)
go func() {
defer hn.wg.Done()
err := hn.receiveTopologyClientStream(graphUpdates)
if err != nil {
hn.PrintErrf("receive topology client stream "+
"got err:%v", err)
}
}()
for {
select {
// A new graph update has just been received, so we'll examine
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
hn.handleClosedChannelUpdate(graphUpdate.ClosedChans)
// TODO(yy): handle node updates too
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-hn.chanWatchRequests:
switch watchRequest.chanWatchType {
case watchOpenChannel:
// TODO(roasbeef): add update type also, checks
// for multiple of 2
hn.handleOpenChannelWatchRequest(watchRequest)
case watchCloseChannel:
hn.handleCloseChannelWatchRequest(watchRequest)
case watchPolicyUpdate:
hn.handlePolicyUpdateWatchRequest(watchRequest)
}
case <-hn.runCtx.Done():
return
}
}
}
// WaitForNetworkChannelOpen will block until a channel with the target
// outpoint is seen as being fully advertised within the network. A channel is
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (hn *HarnessNode) WaitForNetworkChannelOpen(
chanPoint *lnrpc.ChannelPoint) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
eventChan := make(chan struct{})
op, err := MakeOutpoint(chanPoint)
if err != nil {
return fmt.Errorf("failed to create outpoint for %v "+
"got err: %v", chanPoint, err)
}
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchOpenChannel,
}
select {
case <-eventChan:
return nil
case <-ctxt.Done():
return fmt.Errorf("channel:%s not opened before timeout: %s",
op, hn)
}
}
// WaitForNetworkChannelClose will block until a channel with the target
// outpoint is seen as closed within the network. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (hn *HarnessNode) WaitForNetworkChannelClose(
chanPoint *lnrpc.ChannelPoint) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
eventChan := make(chan struct{})
op, err := MakeOutpoint(chanPoint)
if err != nil {
return fmt.Errorf("failed to create outpoint for %v "+
"got err: %v", chanPoint, err)
}
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchCloseChannel,
}
select {
case <-eventChan:
return nil
case <-ctxt.Done():
return fmt.Errorf("channel:%s not closed before timeout: "+
"%s", op, hn)
}
}
// WaitForChannelPolicyUpdate will block until a channel policy with the target
// outpoint and advertisingNode is seen within the network.
func (hn *HarnessNode) WaitForChannelPolicyUpdate(
advertisingNode string, policy *lnrpc.RoutingPolicy,
chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) error {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
eventChan := make(chan struct{})
op, err := MakeOutpoint(chanPoint)
if err != nil {
return fmt.Errorf("failed to create outpoint for %v"+
"got err: %v", chanPoint, err)
}
ticker := time.NewTicker(wait.PollInterval)
defer ticker.Stop()
for {
select {
// Send a watch request every second.
case <-ticker.C:
// Did the event can close in the meantime? We want to
// avoid a "close of closed channel" panic since we're
// re-using the same event chan for multiple requests.
select {
case <-eventChan:
return nil
default:
}
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchPolicyUpdate,
policy: policy,
advertisingNode: advertisingNode,
includeUnannounced: includeUnannounced,
}
case <-eventChan:
return nil
case <-ctxt.Done():
return fmt.Errorf("channel:%s policy not updated "+
"before timeout: [%s:%v] %s", op,
advertisingNode, policy, hn.String())
}
}
}
// WaitForBlockchainSync waits for the target node to be fully synchronized
// with the blockchain. If the passed context object has a set timeout, it will
// continually poll until the timeout has elapsed. In the case that the chain
// isn't synced before the timeout is up, this function will return an error.
func (hn *HarnessNode) WaitForBlockchainSync() error {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
resp, err := hn.rpc.LN.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
if err != nil {
return err
}
if resp.SyncedToChain {
return nil
}
select {
case <-ctxt.Done():
return fmt.Errorf("timeout while waiting for " +
"blockchain sync")
case <-hn.runCtx.Done():
return nil
case <-ticker.C:
}
}
}
// WaitForBalance waits until the node sees the expected confirmed/unconfirmed
// balance within their wallet.
func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount,
confirmed bool) error {
req := &lnrpc.WalletBalanceRequest{}
var lastBalance btcutil.Amount
doesBalanceMatch := func() bool {
balance, err := hn.rpc.LN.WalletBalance(hn.runCtx, req)
if err != nil {
return false
}
if confirmed {
lastBalance = btcutil.Amount(balance.ConfirmedBalance)
return btcutil.Amount(balance.ConfirmedBalance) ==
expectedBalance
}
lastBalance = btcutil.Amount(balance.UnconfirmedBalance)
return btcutil.Amount(balance.UnconfirmedBalance) ==
expectedBalance
}
err := wait.Predicate(doesBalanceMatch, DefaultTimeout)
if err != nil {
return fmt.Errorf("balances not synced after deadline: "+
"expected %v, only have %v", expectedBalance,
lastBalance)
}
return nil
}
// PrintErrf prints an error to the console.
func (hn *HarnessNode) PrintErrf(format string, a ...interface{}) {
fmt.Printf("itest error from [node:%s]: %s\n", // nolint:forbidigo
hn.Cfg.Name, fmt.Sprintf(format, a...))
}
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
// the outpoints, and saves them to harness node's internal state.
func (hn *HarnessNode) handleChannelEdgeUpdates(
updates []*lnrpc.ChannelEdgeUpdate) {
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range updates {
op, err := MakeOutpoint(newChan.ChanPoint)
if err != nil {
hn.PrintErrf("failed to create outpoint for %v "+
"got err: %v", newChan.ChanPoint, err)
return
}
hn.openChans[op]++
// For this new channel, if the number of edges seen is less
// than two, then the channel hasn't been fully announced yet.
if numEdges := hn.openChans[op]; numEdges < 2 {
return
}
// Otherwise, we'll notify all the registered watchers and
// remove the dispatched watchers.
for _, eventChan := range hn.openChanWatchers[op] {
close(eventChan)
}
delete(hn.openChanWatchers, op)
// Check whether there's a routing policy update. If so, save
// it to the node state.
if newChan.RoutingPolicy == nil {
continue
}
// Append the policy to the slice.
node := newChan.AdvertisingNode
policies := hn.policyUpdates[op.String()]
// If the map[op] is nil, we need to initialize the map first.
if policies == nil {
policies = make(map[string][]*lnrpc.RoutingPolicy)
}
policies[node] = append(
policies[node], newChan.RoutingPolicy,
)
hn.policyUpdates[op.String()] = policies
}
}
// handleOpenChannelWatchRequest processes a watch open channel request by
// checking the number of the edges seen for a given channel point. If the
// number is no less than 2 then the channel is considered open. Otherwise, we
// will attempt to find it in its channel graph. If neither can be found, the
// request is added to a watch request list than will be handled by
// handleChannelEdgeUpdates.
func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is an open request, then it can be dispatched if the number
// of edges seen for the channel is at least two.
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(req.eventChan)
return
}
// Before we add the channel to our set of open clients, we'll check to
// see if the channel is already in the channel graph of the target
// node. This lets us handle the case where a node has already seen a
// channel before a notification has been requested, causing us to miss
// it.
chanFound := hn.checkChanPointInGraph(targetChan)
if chanFound {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of open channel watchers for
// this out point.
hn.openChanWatchers[targetChan] = append(
hn.openChanWatchers[targetChan],
req.eventChan,
)
}
// handleClosedChannelUpdate takes a series of closed channel updates, extracts
// the outpoints, saves them to harness node's internal state, and notifies all
// registered clients.
func (hn *HarnessNode) handleClosedChannelUpdate(
updates []*lnrpc.ClosedChannelUpdate) {
// For each channel closed, we'll mark that we've detected a channel
// closure while lnd was pruning the channel graph.
for _, closedChan := range updates {
op, err := MakeOutpoint(closedChan.ChanPoint)
if err != nil {
hn.PrintErrf("failed to create outpoint for %v "+
"got err: %v", closedChan.ChanPoint, err)
return
}
hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify all register
// watchers.
for _, eventChan := range hn.closeChanWatchers[op] {
close(eventChan)
}
delete(hn.closeChanWatchers, op)
}
}
// handleCloseChannelWatchRequest processes a watch close channel request by
// checking whether the given channel point can be found in the node's internal
// state. If not, the request is added to a watch request list than will be
// handled by handleCloseChannelWatchRequest.
func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is a close request, then it can be immediately dispatched if
// we've already seen a channel closure for this channel.
if _, ok := hn.closedChans[targetChan]; ok {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of close channel watchers for
// this out point.
hn.closeChanWatchers[targetChan] = append(
hn.closeChanWatchers[targetChan],
req.eventChan,
)
}
type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
// newTopologyClient creates a topology client.
func (hn *HarnessNode) newTopologyClient(
ctx context.Context) (topologyClient, error) {
req := &lnrpc.GraphTopologySubscription{}
client, err := hn.rpc.LN.SubscribeChannelGraph(ctx, req)
if err != nil {
return nil, fmt.Errorf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
time.Now().String())
}
return client, nil
}
// receiveTopologyClientStream initializes a topologyClient to subscribe
// topology update events. Due to a race condition between the ChannelRouter
// starting and us making the subscription request, it's possible for our graph
// subscription to fail. In that case, we will retry the subscription until it
// succeeds or fail after 10 seconds.
//
// NOTE: must be run as a goroutine.
func (hn *HarnessNode) receiveTopologyClientStream(
receiver chan *lnrpc.GraphTopologyUpdate) error {
// Create a topology client to receive graph updates.
client, err := hn.newTopologyClient(hn.runCtx)
if err != nil {
return fmt.Errorf("create topologyClient failed: %w", err)
}
// We use the context to time out when retrying graph subscription.
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
for {
update, err := client.Recv()
switch {
case err == nil:
// Good case. We will send the update to the receiver.
case strings.Contains(err.Error(), "router not started"):
// If the router hasn't been started, we will retry
// every 200 ms until it has been started or fail
// after the ctxt is timed out.
select {
case <-ctxt.Done():
return fmt.Errorf("graph subscription: " +
"router not started before timeout")
case <-time.After(wait.PollInterval):
case <-hn.runCtx.Done():
return nil
}
// Re-create the topology client.
client, err = hn.newTopologyClient(hn.runCtx)
if err != nil {
return fmt.Errorf("create topologyClient "+
"failed: %v", err)
}
continue
case strings.Contains(err.Error(), "EOF"):
// End of subscription stream. Do nothing and quit.
return nil
case strings.Contains(err.Error(), context.Canceled.Error()):
// End of subscription stream. Do nothing and quit.
return nil
default:
// An expected error is returned, return and leave it
// to be handled by the caller.
return fmt.Errorf("graph subscription err: %w", err)
}
// Send the update or quit.
select {
case receiver <- update:
case <-hn.runCtx.Done():
return nil
}
}
}
// handlePolicyUpdateWatchRequest checks that if the expected policy can be
// found either in the node's interval state or describe graph response. If
// found, it will signal the request by closing the event channel. Otherwise it
// does nothing but returns nil.
func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
op := req.chanPoint
// Get a list of known policies for this chanPoint+advertisingNode
// combination. Start searching in the node state first.
policies, ok := hn.policyUpdates[op.String()][req.advertisingNode]
if !ok {
// If it cannot be found in the node state, try searching it
// from the node's DescribeGraph.
policyMap := hn.getChannelPolicies(req.includeUnannounced)
policies, ok = policyMap[op.String()][req.advertisingNode]
if !ok {
return
}
}
// Check if there's a matched policy.
for _, policy := range policies {
if CheckChannelPolicy(policy, req.policy) == nil {
close(req.eventChan)
return
}
}
}
// getChannelPolicies queries the channel graph and formats the policies into
// the format defined in type policyUpdateMap.
func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()
graph, err := hn.rpc.LN.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
IncludeUnannounced: include,
})
if err != nil {
hn.PrintErrf("DescribeGraph got err: %v", err)
return nil
}
policyUpdates := policyUpdateMap{}
for _, e := range graph.Edges {
policies := policyUpdates[e.ChanPoint]
// If the map[op] is nil, we need to initialize the map first.
if policies == nil {
policies = make(map[string][]*lnrpc.RoutingPolicy)
}
if e.Node1Policy != nil {
policies[e.Node1Pub] = append(
policies[e.Node1Pub], e.Node1Policy,
)
}
if e.Node2Policy != nil {
policies[e.Node2Pub] = append(
policies[e.Node2Pub], e.Node2Policy,
)
}
policyUpdates[e.ChanPoint] = policies
}
return policyUpdates
}
// renameFile is a helper to rename (log) files created during integration
// tests.
func renameFile(fromFileName, toFileName string) {
err := os.Rename(fromFileName, toFileName)
if err != nil {
fmt.Printf("could not rename %s to %s: %v\n", // nolint:forbidigo
fromFileName, toFileName, err)
}
}
// getFinalizedLogFilePrefix returns the finalize log filename.
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
pubKeyHex := hex.EncodeToString(
hn.PubKey[:logPubKeyBytes],
)
return fmt.Sprintf("%s/%d-%s-%s-%s",
GetLogDir(), hn.NodeID,
hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name, pubKeyHex)
}
// finalizeLogfile makes sure the log file cleanup function is initialized,
// even if no log file is created.
func finalizeLogfile(hn *HarnessNode, fileName string) {
if hn.logFile != nil {
hn.logFile.Close()
// If logoutput flag is not set, return early.
if !*logOutput {
return
}
newFileName := fmt.Sprintf("%v.log",
getFinalizedLogFilePrefix(hn),
)
renameFile(fileName, newFileName)
}
}
func finalizeEtcdLog(hn *HarnessNode) {
if hn.Cfg.DbBackend != BackendEtcd {
return
}
etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
getFinalizedLogFilePrefix(hn),
)
renameFile(etcdLogFileName, newEtcdLogFileName)
}
func addLogFile(hn *HarnessNode) (string, error) {
var fileName string
dir := GetLogDir()
fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID,
hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
// If the node's PubKey is not yet initialized, create a
// temporary file name. Later, after the PubKey has been
// initialized, the file can be moved to its final name with
// the PubKey included.
if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
hn.NodeID, hn.Cfg.LogFilenamePrefix,
hn.Cfg.Name)
}
// Create file if not exists, otherwise append.
file, err := os.OpenFile(fileName,
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return fileName, err
}
// Pass node's stderr to both errb and the file.
w := io.MultiWriter(hn.cmd.Stderr, file)
hn.cmd.Stderr = w
// Pass the node's stdout only to the file.
hn.cmd.Stdout = file
// Let the node keep a reference to this file, such
// that we can add to it if necessary.
hn.logFile = file
return fileName, nil
}