From 88217369c2f2faf13808ec3730775dc72cbd7c24 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Thu, 2 Aug 2018 16:20:48 +0200 Subject: [PATCH] pytest: Move NodeFactory to utils.py We are starting to move things out of test_lightningd.py so this is a logical first step. --- tests/fixtures.py | 7 +- tests/test_lightningd.py | 161 +------------------------------- tests/utils.py | 196 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 160 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index a1392a303..01df94b26 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,11 +1,12 @@ from concurrent import futures -from test_lightningd import NodeFactory +from utils import NodeFactory import logging import os import pytest import re import shutil +import sys import tempfile import utils @@ -18,6 +19,10 @@ DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1" TEST_DEBUG = os.getenv("TEST_DEBUG", "0") == "1" +if TEST_DEBUG: + logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) + + # A dict in which we count how often a particular test has run so far. Used to # give each attempt its own numbered directory, and avoid clashes. __attempts = {} diff --git a/tests/test_lightningd.py b/tests/test_lightningd.py index 935223883..0d7768a31 100644 --- a/tests/test_lightningd.py +++ b/tests/test_lightningd.py @@ -2,7 +2,7 @@ from concurrent import futures from decimal import Decimal from ephemeral_port_reserve import reserve as reserve_port from flaky import flaky -from utils import wait_for +from utils import NodeFactory, wait_for, only_one import copy import json @@ -45,13 +45,6 @@ def to_json(arg): return json.loads(json.dumps(arg)) -def only_one(arr): - """Many JSON RPC calls return an array; often we only expect a single entry - """ - assert len(arr) == 1 - return arr[0] - - def setupBitcoind(directory): global bitcoind bitcoind = utils.BitcoinD(bitcoin_dir=directory, rpcport=None) @@ -103,156 +96,6 @@ def teardown_bitcoind(): bitcoind.proc.wait() -class NodeFactory(object): - """A factory to setup and start `lightningd` daemons. - """ - def __init__(self, testname, bitcoind, executor, directory=None): - self.testname = testname - self.next_id = 1 - self.nodes = [] - self.executor = executor - self.bitcoind = bitcoind - if directory is not None: - self.directory = directory - else: - self.directory = os.path.join(TEST_DIR, testname) - self.lock = threading.Lock() - - def split_options(self, opts): - """Split node options from cli options - - Some options are used to instrument the node wrapper and some are passed - to the daemon on the command line. Split them so we know where to use - them. - """ - node_opt_keys = [ - 'disconnect', - 'may_fail', - 'may_reconnect', - 'random_hsm', - 'fake_bitcoin_cli' - ] - node_opts = {k: v for k, v in opts.items() if k in node_opt_keys} - cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys} - return node_opts, cli_opts - - def get_next_port(self): - with self.lock: - return reserve_port() - - def get_nodes(self, num_nodes, opts=None): - """Start a number of nodes in parallel, each with its own options - """ - if opts is None: - # No opts were passed in, give some dummy opts - opts = [{} for _ in range(num_nodes)] - elif isinstance(opts, dict): - # A single dict was passed in, so we use these opts for all nodes - opts = [opts] * num_nodes - - assert len(opts) == num_nodes - - jobs = [] - for i in range(num_nodes): - node_opts, cli_opts = self.split_options(opts[i]) - jobs.append(self.executor.submit(self.get_node, options=cli_opts, **node_opts)) - - return [j.result() for j in jobs] - - def get_node(self, disconnect=None, options=None, may_fail=False, may_reconnect=False, random_hsm=False, fake_bitcoin_cli=False): - with self.lock: - node_id = self.next_id - self.next_id += 1 - port = self.get_next_port() - - lightning_dir = os.path.join( - self.directory, "lightning-{}/".format(node_id)) - - if os.path.exists(lightning_dir): - shutil.rmtree(lightning_dir) - - socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id) - daemon = utils.LightningD( - lightning_dir, self.bitcoind.bitcoin_dir, - port=port, random_hsm=random_hsm, node_id=node_id - ) - # If we have a disconnect string, dump it to a file for daemon. - if disconnect: - with open(os.path.join(lightning_dir, "dev_disconnect"), "w") as f: - f.write("\n".join(disconnect)) - daemon.opts["dev-disconnect"] = "dev_disconnect" - if DEVELOPER: - daemon.opts["dev-fail-on-subdaemon-fail"] = None - daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1" - if VALGRIND: - daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1" - if not may_reconnect: - daemon.opts["dev-no-reconnect"] = None - - if fake_bitcoin_cli: - cli = os.path.join(lightning_dir, "fake-bitcoin-cli") - with open(cli, "w") as text_file: - print("""#! /bin/sh - ! [ -f bitcoin-cli-fail ] || exit `cat bitcoin-cli-fail` - exec bitcoin-cli "$@" - """, file=text_file) - os.chmod(cli, os.stat(cli).st_mode | stat.S_IEXEC) - daemon.opts['bitcoin-cli'] = cli - - if options is not None: - daemon.opts.update(options) - - rpc = LightningRpc(socket_path, self.executor) - - node = utils.LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail, may_reconnect=may_reconnect) - self.nodes.append(node) - if VALGRIND: - node.daemon.cmd_prefix = [ - 'valgrind', - '-q', - '--trace-children=yes', - '--trace-children-skip=*bitcoin-cli*', - '--error-exitcode=7', - '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir) - ] - - try: - node.start() - except Exception: - node.daemon.stop() - raise - - # Cache `getinfo`, we'll be using it a lot - node.info = node.rpc.getinfo() - return node - - def killall(self, expected_successes): - """Returns true if every node we expected to succeed actually succeeded""" - unexpected_fail = False - for i in range(len(self.nodes)): - leaks = None - # leak detection upsets VALGRIND by reading uninitialized mem. - # If it's dead, we'll catch it below. - if not VALGRIND: - try: - # This also puts leaks in log. - leaks = self.nodes[i].rpc.dev_memleak()['leaks'] - except Exception: - pass - - try: - self.nodes[i].stop() - except Exception: - if expected_successes[i]: - unexpected_fail = True - - if leaks is not None and len(leaks) != 0: - raise Exception("Node {} has memory leaks: {}" - .format(self.nodes[i].daemon.lightning_dir, leaks)) - - return not unexpected_fail - - class BaseLightningDTests(unittest.TestCase): def setUp(self): bitcoin_dir = os.path.join(TEST_DIR, self._testMethodName, "bitcoind") @@ -260,7 +103,7 @@ class BaseLightningDTests(unittest.TestCase): # Most of the executor threads will be waiting for IO, so # let's have a few of them self.executor = futures.ThreadPoolExecutor(max_workers=20) - self.node_factory = NodeFactory(self._testMethodName, bitcoind, self.executor) + self.node_factory = NodeFactory(self._testMethodName, bitcoind, self.executor, directory=TEST_DIR) def getValgrindErrors(self, node): for error_file in os.listdir(node.daemon.lightning_dir): diff --git a/tests/utils.py b/tests/utils.py index 9c1e35104..d79d4216a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ import logging import os import re import sqlite3 +import stat import subprocess import threading import time @@ -9,6 +10,7 @@ import time from bitcoin.rpc import RawProxy as BitcoinProxy from decimal import Decimal from ephemeral_port_reserve import reserve +from lightning import LightningRpc, RpcError BITCOIND_CONFIG = { @@ -32,6 +34,7 @@ with open('config.vars') as configfile: DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1" TIMEOUT = int(os.getenv("TIMEOUT", "60")) +VALGRIND = os.getenv("VALGRIND", config['VALGRIND']) == "1" def wait_for(success, timeout=TIMEOUT, interval=0.1): @@ -52,6 +55,13 @@ def write_config(filename, opts, regtest_opts=None): f.write("{}={}\n".format(k, v)) +def only_one(arr): + """Many JSON RPC calls return an array; often we only expect a single entry + """ + assert len(arr) == 1 + return arr[0] + + class TailableProc(object): """A monitorable process that we can start, stop and tail. @@ -504,3 +514,189 @@ class LightningNode(object): def wait_channel_active(self, chanid): wait_for(lambda: self.is_channel_active(chanid), interval=1) + + +class NodeFactory(object): + """A factory to setup and start `lightningd` daemons. + """ + def __init__(self, testname, bitcoind, executor, directory): + self.testname = testname + self.next_id = 1 + self.nodes = [] + self.executor = executor + self.bitcoind = bitcoind + self.directory = directory + self.lock = threading.Lock() + + def split_options(self, opts): + """Split node options from cli options + + Some options are used to instrument the node wrapper and some are passed + to the daemon on the command line. Split them so we know where to use + them. + """ + node_opt_keys = [ + 'disconnect', + 'may_fail', + 'may_reconnect', + 'random_hsm', + 'fake_bitcoin_cli' + ] + node_opts = {k: v for k, v in opts.items() if k in node_opt_keys} + cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys} + return node_opts, cli_opts + + def get_next_port(self): + with self.lock: + return reserve() + + def get_nodes(self, num_nodes, opts=None): + """Start a number of nodes in parallel, each with its own options + """ + if opts is None: + # No opts were passed in, give some dummy opts + opts = [{} for _ in range(num_nodes)] + elif isinstance(opts, dict): + # A single dict was passed in, so we use these opts for all nodes + opts = [opts] * num_nodes + + assert len(opts) == num_nodes + + jobs = [] + for i in range(num_nodes): + node_opts, cli_opts = self.split_options(opts[i]) + jobs.append(self.executor.submit(self.get_node, options=cli_opts, **node_opts)) + + return [j.result() for j in jobs] + + def get_node(self, disconnect=None, options=None, may_fail=False, may_reconnect=False, random_hsm=False, + fake_bitcoin_cli=False): + with self.lock: + node_id = self.next_id + self.next_id += 1 + port = self.get_next_port() + + lightning_dir = os.path.join( + self.directory, "lightning-{}/".format(node_id)) + + if os.path.exists(lightning_dir): + shutil.rmtree(lightning_dir) + + socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id) + daemon = LightningD( + lightning_dir, self.bitcoind.bitcoin_dir, + port=port, random_hsm=random_hsm, node_id=node_id + ) + # If we have a disconnect string, dump it to a file for daemon. + if disconnect: + with open(os.path.join(lightning_dir, "dev_disconnect"), "w") as f: + f.write("\n".join(disconnect)) + daemon.opts["dev-disconnect"] = "dev_disconnect" + if DEVELOPER: + daemon.opts["dev-fail-on-subdaemon-fail"] = None + daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1" + if VALGRIND: + daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1" + if not may_reconnect: + daemon.opts["dev-no-reconnect"] = None + + if fake_bitcoin_cli: + cli = os.path.join(lightning_dir, "fake-bitcoin-cli") + with open(cli, "w") as text_file: + print("""#! /bin/sh + ! [ -f bitcoin-cli-fail ] || exit `cat bitcoin-cli-fail` + exec bitcoin-cli "$@" + """, file=text_file) + os.chmod(cli, os.stat(cli).st_mode | stat.S_IEXEC) + daemon.opts['bitcoin-cli'] = cli + + if options is not None: + daemon.opts.update(options) + + rpc = LightningRpc(socket_path, self.executor) + + node = LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail, + may_reconnect=may_reconnect) + self.nodes.append(node) + if VALGRIND: + node.daemon.cmd_prefix = [ + 'valgrind', + '-q', + '--trace-children=yes', + '--trace-children-skip=*bitcoin-cli*', + '--error-exitcode=7', + '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir) + ] + + try: + node.start() + except Exception: + node.daemon.stop() + raise + + # Cache `getinfo`, we'll be using it a lot + node.info = node.rpc.getinfo() + return node + + def line_graph(self, num_nodes, fundchannel=True, fundamount=10**6, announce=False): + """ Create nodes, connect them and optionally fund channels. + """ + nodes = self.get_nodes(num_nodes) + bitcoin = nodes[0].bitcoin + connections = [(nodes[i], nodes[i+1]) for i in range(0, num_nodes-1)] + + for src, dst in connections: + src.rpc.connect(dst.info['id'], 'localhost', dst.port) + + if not fundchannel: + return nodes + + # If we got here, we want to fund channels + for src, dst in connections: + addr = src.rpc.newaddr()['address'] + src.bitcoin.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8) + + bitcoin.generate_block(1) + for src, dst in connections: + wait_for(lambda: len(src.rpc.listfunds()['outputs']) > 0) + tx = src.rpc.fundchannel(dst.info['id'], fundamount) + wait_for(lambda: tx['txid'] in bitcoin.rpc.getrawmempool()) + + # Confirm all channels and wait for them to become usable + bitcoin.generate_block(1) + for src, dst in connections: + wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL') + scid = src.get_channel_scid(dst) + src.daemon.wait_for_log(r'Received channel_update for channel {scid}\(.\) now ACTIVE'.format(scid=scid)) + + if not announce: + return nodes + + bitcoin.generate_block(5) + return nodes + + def killall(self, expected_successes): + """Returns true if every node we expected to succeed actually succeeded""" + unexpected_fail = False + for i in range(len(self.nodes)): + leaks = None + # leak detection upsets VALGRIND by reading uninitialized mem. + # If it's dead, we'll catch it below. + if not VALGRIND: + try: + # This also puts leaks in log. + leaks = self.nodes[i].rpc.dev_memleak()['leaks'] + except Exception: + pass + + try: + self.nodes[i].stop() + except Exception: + if expected_successes[i]: + unexpected_fail = True + + if leaks is not None and len(leaks) != 0: + raise Exception("Node {} has memory leaks: {}" + .format(self.nodes[i].daemon.lightning_dir, leaks)) + + return not unexpected_fail