pytest: Move NodeFactory to utils.py

We are starting to move things out of test_lightningd.py so this is a logical
first step.
Christian Decker 2018-08-02 16:20:48 +02:00 committed by Rusty Russell
parent 6dae525c07
commit 88217369c2
3 changed files with 204 additions and 160 deletions


@@ -1,11 +1,12 @@
from concurrent import futures
-from test_lightningd import NodeFactory
+from utils import NodeFactory
import logging
import os
import pytest
import re
import shutil
import sys
import tempfile
import utils
@@ -18,6 +19,10 @@ DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1"
TEST_DEBUG = os.getenv("TEST_DEBUG", "0") == "1"
if TEST_DEBUG:
    logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+# A dict in which we count how often a particular test has run so far. Used to
+# give each attempt its own numbered directory, and avoid clashes.
+__attempts = {}
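
A minimal sketch of how such a counter can back per-attempt directories (the `directory` fixture and `test_base_dir` argument below are illustrative assumptions, not part of this commit):

# Hypothetical sketch: a per-test directory fixture built on __attempts,
# giving each run of a test its own numbered directory.
@pytest.fixture
def directory(request, test_base_dir):
    test_name = request.node.name
    __attempts[test_name] = __attempts.get(test_name, 0) + 1
    yield os.path.join(test_base_dir, '{}_{}'.format(test_name, __attempts[test_name]))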


@@ -2,7 +2,7 @@ from concurrent import futures
from decimal import Decimal
from ephemeral_port_reserve import reserve as reserve_port
from flaky import flaky
-from utils import wait_for
+from utils import NodeFactory, wait_for, only_one
import copy
import json
@@ -45,13 +45,6 @@ def to_json(arg):
    return json.loads(json.dumps(arg))

-def only_one(arr):
-    """Many JSON RPC calls return an array; often we only expect a single entry
-    """
-    assert len(arr) == 1
-    return arr[0]
-
def setupBitcoind(directory):
    global bitcoind
    bitcoind = utils.BitcoinD(bitcoin_dir=directory, rpcport=None)
@@ -103,156 +96,6 @@ def teardown_bitcoind():
    bitcoind.proc.wait()

-class NodeFactory(object):
-    """A factory to setup and start `lightningd` daemons.
-    """
-    def __init__(self, testname, bitcoind, executor, directory=None):
-        self.testname = testname
-        self.next_id = 1
-        self.nodes = []
-        self.executor = executor
-        self.bitcoind = bitcoind
-        if directory is not None:
-            self.directory = directory
-        else:
-            self.directory = os.path.join(TEST_DIR, testname)
-        self.lock = threading.Lock()
-
-    def split_options(self, opts):
-        """Split node options from cli options
-
-        Some options are used to instrument the node wrapper and some are passed
-        to the daemon on the command line. Split them so we know where to use
-        them.
-        """
-        node_opt_keys = [
-            'disconnect',
-            'may_fail',
-            'may_reconnect',
-            'random_hsm',
-            'fake_bitcoin_cli'
-        ]
-        node_opts = {k: v for k, v in opts.items() if k in node_opt_keys}
-        cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys}
-        return node_opts, cli_opts
-
-    def get_next_port(self):
-        with self.lock:
-            return reserve_port()
-
-    def get_nodes(self, num_nodes, opts=None):
-        """Start a number of nodes in parallel, each with its own options
-        """
-        if opts is None:
-            # No opts were passed in, give some dummy opts
-            opts = [{} for _ in range(num_nodes)]
-        elif isinstance(opts, dict):
-            # A single dict was passed in, so we use these opts for all nodes
-            opts = [opts] * num_nodes
-
-        assert len(opts) == num_nodes
-
-        jobs = []
-        for i in range(num_nodes):
-            node_opts, cli_opts = self.split_options(opts[i])
-            jobs.append(self.executor.submit(self.get_node, options=cli_opts, **node_opts))
-
-        return [j.result() for j in jobs]
-
-    def get_node(self, disconnect=None, options=None, may_fail=False, may_reconnect=False, random_hsm=False, fake_bitcoin_cli=False):
-        with self.lock:
-            node_id = self.next_id
-            self.next_id += 1
-        port = self.get_next_port()
-
-        lightning_dir = os.path.join(
-            self.directory, "lightning-{}/".format(node_id))
-
-        if os.path.exists(lightning_dir):
-            shutil.rmtree(lightning_dir)
-
-        socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id)
-        daemon = utils.LightningD(
-            lightning_dir, self.bitcoind.bitcoin_dir,
-            port=port, random_hsm=random_hsm, node_id=node_id
-        )
-        # If we have a disconnect string, dump it to a file for daemon.
-        if disconnect:
-            with open(os.path.join(lightning_dir, "dev_disconnect"), "w") as f:
-                f.write("\n".join(disconnect))
-            daemon.opts["dev-disconnect"] = "dev_disconnect"
-        if DEVELOPER:
-            daemon.opts["dev-fail-on-subdaemon-fail"] = None
-            daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1"
-            if VALGRIND:
-                daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1"
-            if not may_reconnect:
-                daemon.opts["dev-no-reconnect"] = None
-
-        if fake_bitcoin_cli:
-            cli = os.path.join(lightning_dir, "fake-bitcoin-cli")
-            with open(cli, "w") as text_file:
-                print("""#! /bin/sh
-! [ -f bitcoin-cli-fail ] || exit `cat bitcoin-cli-fail`
-exec bitcoin-cli "$@"
-""", file=text_file)
-            os.chmod(cli, os.stat(cli).st_mode | stat.S_IEXEC)
-            daemon.opts['bitcoin-cli'] = cli
-
-        if options is not None:
-            daemon.opts.update(options)
-
-        rpc = LightningRpc(socket_path, self.executor)
-
-        node = utils.LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail, may_reconnect=may_reconnect)
-        self.nodes.append(node)
-        if VALGRIND:
-            node.daemon.cmd_prefix = [
-                'valgrind',
-                '-q',
-                '--trace-children=yes',
-                '--trace-children-skip=*bitcoin-cli*',
-                '--error-exitcode=7',
-                '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir)
-            ]
-
-        try:
-            node.start()
-        except Exception:
-            node.daemon.stop()
-            raise
-
-        # Cache `getinfo`, we'll be using it a lot
-        node.info = node.rpc.getinfo()
-        return node
-
-    def killall(self, expected_successes):
-        """Returns true if every node we expected to succeed actually succeeded"""
-        unexpected_fail = False
-        for i in range(len(self.nodes)):
-            leaks = None
-            # leak detection upsets VALGRIND by reading uninitialized mem.
-            # If it's dead, we'll catch it below.
-            if not VALGRIND:
-                try:
-                    # This also puts leaks in log.
-                    leaks = self.nodes[i].rpc.dev_memleak()['leaks']
-                except Exception:
-                    pass
-
-            try:
-                self.nodes[i].stop()
-            except Exception:
-                if expected_successes[i]:
-                    unexpected_fail = True
-
-            if leaks is not None and len(leaks) != 0:
-                raise Exception("Node {} has memory leaks: {}"
-                                .format(self.nodes[i].daemon.lightning_dir, leaks))
-
-        return not unexpected_fail
-

class BaseLightningDTests(unittest.TestCase):
    def setUp(self):
        bitcoin_dir = os.path.join(TEST_DIR, self._testMethodName, "bitcoind")
@@ -260,7 +103,7 @@ class BaseLightningDTests(unittest.TestCase):
        # Most of the executor threads will be waiting for IO, so
        # let's have a few of them
        self.executor = futures.ThreadPoolExecutor(max_workers=20)
-        self.node_factory = NodeFactory(self._testMethodName, bitcoind, self.executor)
+        self.node_factory = NodeFactory(self._testMethodName, bitcoind, self.executor, directory=TEST_DIR)

    def getValgrindErrors(self, node):
        for error_file in os.listdir(node.daemon.lightning_dir):


@@ -2,6 +2,7 @@ import logging
import os
import re
import sqlite3
+import stat
import subprocess
import threading
import time
@@ -9,6 +10,7 @@ import time
from bitcoin.rpc import RawProxy as BitcoinProxy
from decimal import Decimal
from ephemeral_port_reserve import reserve
+from lightning import LightningRpc, RpcError
BITCOIND_CONFIG = {
@@ -32,6 +34,7 @@ with open('config.vars') as configfile:
DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1"
TIMEOUT = int(os.getenv("TIMEOUT", "60"))
+VALGRIND = os.getenv("VALGRIND", config['VALGRIND']) == "1"
def wait_for(success, timeout=TIMEOUT, interval=0.1):
@@ -52,6 +55,13 @@ def write_config(filename, opts, regtest_opts=None):
            f.write("{}={}\n".format(k, v))
+def only_one(arr):
+    """Many JSON RPC calls return an array; often we only expect a single entry
+    """
+    assert len(arr) == 1
+    return arr[0]
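# Illustrative usage (not part of this commit): collapse a one-element RPC
# result, failing loudly if the single-entry assumption is violated, e.g.
#   peer = only_one(l1.rpc.listpeers(l2.info['id'])['peers'])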
class TailableProc(object):
"""A monitorable process that we can start, stop and tail.
@@ -504,3 +514,189 @@ class LightningNode(object):
    def wait_channel_active(self, chanid):
        wait_for(lambda: self.is_channel_active(chanid), interval=1)

+
+class NodeFactory(object):
+    """A factory to setup and start `lightningd` daemons.
+    """
+    def __init__(self, testname, bitcoind, executor, directory):
+        self.testname = testname
+        self.next_id = 1
+        self.nodes = []
+        self.executor = executor
+        self.bitcoind = bitcoind
+        self.directory = directory
+        self.lock = threading.Lock()
+
+    def split_options(self, opts):
+        """Split node options from cli options
+
+        Some options are used to instrument the node wrapper and some are passed
+        to the daemon on the command line. Split them so we know where to use
+        them.
+        """
+        node_opt_keys = [
+            'disconnect',
+            'may_fail',
+            'may_reconnect',
+            'random_hsm',
+            'fake_bitcoin_cli'
+        ]
+        node_opts = {k: v for k, v in opts.items() if k in node_opt_keys}
+        cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys}
+        return node_opts, cli_opts
+    def get_next_port(self):
+        with self.lock:
+            return reserve()
+
+    def get_nodes(self, num_nodes, opts=None):
+        """Start a number of nodes in parallel, each with its own options
+        """
+        if opts is None:
+            # No opts were passed in, give some dummy opts
+            opts = [{} for _ in range(num_nodes)]
+        elif isinstance(opts, dict):
+            # A single dict was passed in, so we use these opts for all nodes
+            opts = [opts] * num_nodes
+
+        assert len(opts) == num_nodes
+
+        jobs = []
+        for i in range(num_nodes):
+            node_opts, cli_opts = self.split_options(opts[i])
+            jobs.append(self.executor.submit(self.get_node, options=cli_opts, **node_opts))
+
+        return [j.result() for j in jobs]
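    # Illustrative usage (not part of this commit): opts may be omitted, a
    # single dict shared by all nodes, or one dict per node, e.g.
    #   l1, l2 = node_factory.get_nodes(2, opts={'may_reconnect': True})
    #   l1, l2 = node_factory.get_nodes(2, opts=[{'may_fail': True}, {}])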
+    def get_node(self, disconnect=None, options=None, may_fail=False, may_reconnect=False, random_hsm=False,
+                 fake_bitcoin_cli=False):
+        with self.lock:
+            node_id = self.next_id
+            self.next_id += 1
+        port = self.get_next_port()
+
+        lightning_dir = os.path.join(
+            self.directory, "lightning-{}/".format(node_id))
+
+        if os.path.exists(lightning_dir):
+            shutil.rmtree(lightning_dir)
+
+        socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id)
+        daemon = LightningD(
+            lightning_dir, self.bitcoind.bitcoin_dir,
+            port=port, random_hsm=random_hsm, node_id=node_id
+        )
+        # If we have a disconnect string, dump it to a file for daemon.
+        if disconnect:
+            with open(os.path.join(lightning_dir, "dev_disconnect"), "w") as f:
+                f.write("\n".join(disconnect))
+            daemon.opts["dev-disconnect"] = "dev_disconnect"
+        if DEVELOPER:
+            daemon.opts["dev-fail-on-subdaemon-fail"] = None
+            daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1"
+            if VALGRIND:
+                daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1"
+            if not may_reconnect:
+                daemon.opts["dev-no-reconnect"] = None
+
+        if fake_bitcoin_cli:
+            cli = os.path.join(lightning_dir, "fake-bitcoin-cli")
+            with open(cli, "w") as text_file:
+                print("""#! /bin/sh
+! [ -f bitcoin-cli-fail ] || exit `cat bitcoin-cli-fail`
+exec bitcoin-cli "$@"
+""", file=text_file)
+            os.chmod(cli, os.stat(cli).st_mode | stat.S_IEXEC)
+            daemon.opts['bitcoin-cli'] = cli
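        # Illustrative (not part of this commit): the generated wrapper execs
        # the real bitcoin-cli unless a bitcoin-cli-fail file exists, so a test
        # can force RPC failures by writing an exit code into it, e.g.
        #   with open(os.path.join(l1.daemon.lightning_dir, 'bitcoin-cli-fail'), 'w') as f:
        #       f.write('1')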
+        if options is not None:
+            daemon.opts.update(options)
+
+        rpc = LightningRpc(socket_path, self.executor)
+
+        node = LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail,
+                             may_reconnect=may_reconnect)
+        self.nodes.append(node)
+        if VALGRIND:
+            node.daemon.cmd_prefix = [
+                'valgrind',
+                '-q',
+                '--trace-children=yes',
+                '--trace-children-skip=*bitcoin-cli*',
+                '--error-exitcode=7',
+                '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir)
+            ]
+
+        try:
+            node.start()
+        except Exception:
+            node.daemon.stop()
+            raise
+
+        # Cache `getinfo`, we'll be using it a lot
+        node.info = node.rpc.getinfo()
+        return node
+    def line_graph(self, num_nodes, fundchannel=True, fundamount=10**6, announce=False):
+        """ Create nodes, connect them and optionally fund channels.
+        """
+        nodes = self.get_nodes(num_nodes)
+        bitcoin = nodes[0].bitcoin
+        connections = [(nodes[i], nodes[i+1]) for i in range(0, num_nodes-1)]
+
+        for src, dst in connections:
+            src.rpc.connect(dst.info['id'], 'localhost', dst.port)
+
+        if not fundchannel:
+            return nodes
+
+        # If we got here, we want to fund channels
+        for src, dst in connections:
+            addr = src.rpc.newaddr()['address']
+            src.bitcoin.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8)
+
+        bitcoin.generate_block(1)
+        for src, dst in connections:
+            wait_for(lambda: len(src.rpc.listfunds()['outputs']) > 0)
+            tx = src.rpc.fundchannel(dst.info['id'], fundamount)
+            wait_for(lambda: tx['txid'] in bitcoin.rpc.getrawmempool())
+
+        # Confirm all channels and wait for them to become usable
+        bitcoin.generate_block(1)
+        for src, dst in connections:
+            wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL')
+            scid = src.get_channel_scid(dst)
+            src.daemon.wait_for_log(r'Received channel_update for channel {scid}\(.\) now ACTIVE'.format(scid=scid))
+
+        if not announce:
+            return nodes
+
+        bitcoin.generate_block(5)
+        return nodes
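    # Illustrative usage (not part of this commit): a three-node route with
    # funded, announced channels, e.g.
    #   l1, l2, l3 = node_factory.line_graph(3, announce=True)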
+    def killall(self, expected_successes):
+        """Returns true if every node we expected to succeed actually succeeded"""
+        unexpected_fail = False
+        for i in range(len(self.nodes)):
+            leaks = None
+            # leak detection upsets VALGRIND by reading uninitialized mem.
+            # If it's dead, we'll catch it below.
+            if not VALGRIND:
+                try:
+                    # This also puts leaks in log.
+                    leaks = self.nodes[i].rpc.dev_memleak()['leaks']
+                except Exception:
+                    pass
+
+            try:
+                self.nodes[i].stop()
+            except Exception:
+                if expected_successes[i]:
+                    unexpected_fail = True
+
+            if leaks is not None and len(leaks) != 0:
+                raise Exception("Node {} has memory leaks: {}"
+                                .format(self.nodes[i].daemon.lightning_dir, leaks))
+
+        return not unexpected_fail
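
A rough sketch of how a test harness drives the factory end to end, mirroring the BaseLightningDTests flow above (the `bitcoind`, `executor` and `TEST_DIR` names are assumptions carried over from that class, and the may_fail bookkeeping is how tearDown uses killall there):

# Hypothetical setup/teardown: nodes marked may_fail are not expected to
# stop cleanly; every other node is.
factory = NodeFactory('test_example', bitcoind, executor, directory=TEST_DIR)
l1, l2 = factory.get_nodes(2)
# ... exercise the nodes ...
assert factory.killall([not n.may_fail for n in factory.nodes])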