from bitcoin.rpc import RawProxy as BitcoinProxy
from btcproxy import BitcoinRpcProxy
from collections import OrderedDict
from decimal import Decimal
from ephemeral_port_reserve import reserve
from lightning import LightningRpc

import json
import logging
import os
import random
import re
import shutil
import sqlite3
import string
import subprocess
import threading
import time


BITCOIND_CONFIG = {
    "regtest": 1,
    "rpcuser": "rpcuser",
    "rpcpassword": "rpcpass",
}

LIGHTNINGD_CONFIG = OrderedDict({
    "log-level": "debug",
    "cltv-delta": 6,
    "cltv-final": 5,
    "watchtime-blocks": 5,
    "rescan": 1,
    'disable-dns': None,
})

with open('config.vars') as configfile:
    config = dict([(line.rstrip().split('=', 1)) for line in configfile])

DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1"
EXPERIMENTAL_FEATURES = os.getenv("EXPERIMENTAL_FEATURES", config['EXPERIMENTAL_FEATURES']) == "1"
TIMEOUT = int(os.getenv("TIMEOUT", "60"))
VALGRIND = os.getenv("VALGRIND", config['VALGRIND']) == "1"
SLOW_MACHINE = os.getenv("SLOW_MACHINE", "0") == "1"


def wait_for(success, timeout=TIMEOUT):
    start_time = time.time()
    interval = 0.25
    while not success() and time.time() < start_time + timeout:
        time.sleep(interval)
        interval *= 2
        if interval > 5:
            interval = 5
    if time.time() > start_time + timeout:
        raise ValueError("Error waiting for {}".format(success))
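
# Usage sketch (comments only, nothing runs at import time): tests poll for
# conditions rather than sleeping fixed intervals, e.g.
#
#   wait_for(lambda: txid in bitcoind.rpc.getrawmempool())
#
# The predicate is retried with exponential backoff, capped at 5 seconds,
# until it returns True or `timeout` seconds have elapsed.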


def write_config(filename, opts, regtest_opts=None):
    with open(filename, 'w') as f:
        for k, v in opts.items():
            f.write("{}={}\n".format(k, v))
        if regtest_opts:
            f.write("[regtest]\n")
            for k, v in regtest_opts.items():
                f.write("{}={}\n".format(k, v))
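
# For illustration, write_config('bitcoin.conf', BITCOIND_CONFIG,
# {'rpcport': 18443}) would produce (port value hypothetical):
#
#   regtest=1
#   rpcuser=rpcuser
#   rpcpassword=rpcpass
#   [regtest]
#   rpcport=18443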


def only_one(arr):
    """Many JSON RPC calls return an array; often we only expect a single entry
    """
    assert len(arr) == 1
    return arr[0]


def sync_blockheight(bitcoind, nodes):
    height = bitcoind.rpc.getblockchaininfo()['blocks']
    for n in nodes:
        wait_for(lambda: n.rpc.getinfo()['blockheight'] == height)


def wait_channel_quiescent(n1, n2):
    wait_for(lambda: only_one(only_one(n1.rpc.listpeers(n2.info['id'])['peers'])['channels'])['htlcs'] == [])
    wait_for(lambda: only_one(only_one(n2.rpc.listpeers(n1.info['id'])['peers'])['channels'])['htlcs'] == [])


def get_tx_p2wsh_outnum(bitcoind, tx, amount):
    """Get output number of this tx which is p2wsh of amount"""
    decoded = bitcoind.rpc.decoderawtransaction(tx, True)

    for out in decoded['vout']:
        if out['scriptPubKey']['type'] == 'witness_v0_scripthash':
            if out['value'] == Decimal(amount) / 10**8:
                return out['n']

    return None
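
# Note the unit conversion: `amount` is in satoshis while bitcoind reports
# output values in BTC, hence the Decimal(amount) / 10**8 comparison.
# Caller sketch (values hypothetical):
#
#   outnum = get_tx_p2wsh_outnum(bitcoind, rawtx, 10**6)  # looks for 0.01 BTC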


class TailableProc(object):
    """A monitorable process that we can start, stop and tail.

    This is the base class for the daemons. It allows us to directly
    tail the processes and react to their output.
    """

    def __init__(self, outputDir=None, verbose=True):
        self.logs = []
        self.logs_cond = threading.Condition(threading.RLock())
        self.env = os.environ.copy()
        self.running = False
        self.proc = None
        self.outputDir = outputDir
        self.logsearch_start = 0

        # Should we be logging lines we read from stdout?
        self.verbose = verbose

        # A filter function that'll tell us whether to filter out the line (not
        # pass it to the log matcher and not print it to stdout).
        self.log_filter = lambda line: False

    def start(self):
        """Start the underlying process and start monitoring it.
        """
        logging.debug("Starting '%s'", " ".join(self.cmd_line))
        self.proc = subprocess.Popen(self.cmd_line, stdout=subprocess.PIPE, env=self.env)
        self.thread = threading.Thread(target=self.tail)
        self.thread.daemon = True
        self.thread.start()
        self.running = True

    def save_log(self):
        if self.outputDir:
            logpath = os.path.join(self.outputDir, 'log')
            with open(logpath, 'w') as f:
                for l in self.logs:
                    f.write(l + '\n')

    def stop(self, timeout=10):
        self.save_log()
        self.proc.terminate()

        # Now give it some time to react to the signal
        rc = self.proc.wait(timeout)

        if rc is None:
            self.proc.kill()

        self.proc.wait()
        self.thread.join()

        if self.proc.returncode:
            raise ValueError("Process '{}' did not cleanly shutdown: return code {}".format(self.proc.pid, rc))

        return self.proc.returncode

    def kill(self):
        """Kill process without giving it warning."""
        self.proc.kill()
        self.proc.wait()
        self.thread.join()

    def tail(self):
        """Tail the stdout of the process and remember it.

        Stores the lines of output produced by the process in
        self.logs and signals that a new line was read so that it can
        be picked up by consumers.
        """
        for line in iter(self.proc.stdout.readline, ''):
            if len(line) == 0:
                break
            if self.log_filter(line.decode('ASCII')):
                continue
            if self.verbose:
                logging.debug("%s: %s", self.prefix, line.decode().rstrip())
            with self.logs_cond:
                self.logs.append(str(line.rstrip()))
                self.logs_cond.notifyAll()
        self.running = False
        self.proc.stdout.close()

    def is_in_log(self, regex, start=0):
        """Look for `regex` in the logs."""
        ex = re.compile(regex)
        for l in self.logs[start:]:
            if ex.search(l):
                logging.debug("Found '%s' in logs", regex)
                return l

        logging.debug("Did not find '%s' in logs", regex)
        return None

    def wait_for_logs(self, regexs, timeout=TIMEOUT):
        """Look for `regexs` in the logs.

        We tail the stdout of the process and look for each regex in `regexs`,
        starting from last of the previous waited-for log entries (if any).  We
        fail if the timeout is exceeded or if the underlying process
        exits before all the `regexs` were found.

        If timeout is None, no time-out is applied.
        """
        logging.debug("Waiting for {} in the logs".format(regexs))
        exs = [re.compile(r) for r in regexs]
        start_time = time.time()
        pos = self.logsearch_start
        while True:
            if timeout is not None and time.time() > start_time + timeout:
                print("Time-out: can't find {} in logs".format(exs))
                for r in exs:
                    if self.is_in_log(r):
                        print("({} was previously in logs!)".format(r))
                raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
            elif not self.running:
                raise ValueError('Process died while waiting for logs')

            with self.logs_cond:
                if pos >= len(self.logs):
                    self.logs_cond.wait(1)
                    continue

                for r in exs.copy():
                    self.logsearch_start = pos + 1
                    if r.search(self.logs[pos]):
                        logging.debug("Found '%s' in logs", r)
                        exs.remove(r)
                        break
                if len(exs) == 0:
                    return self.logs[pos]
                pos += 1

    def wait_for_log(self, regex, timeout=TIMEOUT):
        """Look for `regex` in the logs.

        Convenience wrapper for the common case of only seeking a single entry.
        """
        return self.wait_for_logs([regex], timeout)
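
# Usage sketch (illustrative): subclasses such as BitcoinD below set
# `self.cmd_line` and `self.prefix`, then tests synchronize on log output:
#
#   proc.start()
#   proc.wait_for_log(r'Server started')   # blocks until the regex matches
#   proc.stop()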


class SimpleBitcoinProxy:
    """Wrapper for BitcoinProxy to reconnect.

    Long wait times between calls to the Bitcoin RPC could result in
    `bitcoind` closing the connection, so here we just create
    throwaway connections. This is easier than reaching into the RPC
    library to close, reopen and reauth upon failure.
    """
    def __init__(self, btc_conf_file, *args, **kwargs):
        self.__btc_conf_file__ = btc_conf_file

    def __getattr__(self, name):
        if name.startswith('__') and name.endswith('__'):
            # Python internal stuff
            raise AttributeError

        # Create a callable to do the actual call
        proxy = BitcoinProxy(btc_conf_file=self.__btc_conf_file__)

        def f(*args):
            return proxy._call(name, *args)

        # Make debuggers show <function bitcoin.rpc.name> rather than <function
        # bitcoin.rpc.<lambda>>
        f.__name__ = name
        return f
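
# For illustration: each attribute lookup creates a fresh RawProxy, so every
# call below uses its own throwaway connection (the conf path is hypothetical):
#
#   proxy = SimpleBitcoinProxy(btc_conf_file='/tmp/bitcoind-test/bitcoin.conf')
#   height = proxy.getblockcount()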


class BitcoinD(TailableProc):

    def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None):
        TailableProc.__init__(self, bitcoin_dir, verbose=False)

        if rpcport is None:
            rpcport = reserve()

        self.bitcoin_dir = bitcoin_dir
        self.rpcport = rpcport
        self.prefix = 'bitcoind'

        regtestdir = os.path.join(bitcoin_dir, 'regtest')
        if not os.path.exists(regtestdir):
            os.makedirs(regtestdir)

        self.cmd_line = [
            'bitcoind',
            '-datadir={}'.format(bitcoin_dir),
            '-printtoconsole',
            '-server',
            '-logtimestamps',
            '-nolisten',
        ]
        # For up to and including 0.16.1, this needs to be in main section.
        BITCOIND_CONFIG['rpcport'] = rpcport
        # For after 0.16.1 (eg. 3f398d7a17f136cd4a67998406ca41a124ae2966), this
        # needs its own [regtest] section.
        BITCOIND_REGTEST = {'rpcport': rpcport}
        btc_conf_file = os.path.join(bitcoin_dir, 'bitcoin.conf')
        write_config(btc_conf_file, BITCOIND_CONFIG, BITCOIND_REGTEST)
        self.rpc = SimpleBitcoinProxy(btc_conf_file=btc_conf_file)

    def start(self):
        TailableProc.start(self)
        self.wait_for_log("Done loading", timeout=TIMEOUT)

        logging.info("BitcoinD started")

    def generate_block(self, numblocks=1):
        # As of 0.16, generate() is removed; use generatetoaddress.
        return self.rpc.generatetoaddress(numblocks, self.rpc.getnewaddress())
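
# Typical use in a fixture (sketch; the directory is the default, so any path
# works):
#
#   bitcoind = BitcoinD(bitcoin_dir='/tmp/bitcoind-test')
#   bitcoind.start()
#   bitcoind.generate_block(101)   # mature a coinbase so outputs are spendable
#
# Blocks are mined to a fresh wallet address via generatetoaddress.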


class LightningD(TailableProc):
    def __init__(self, lightning_dir, bitcoind, port=9735, random_hsm=False, node_id=0):
        TailableProc.__init__(self, lightning_dir)
        self.executable = 'lightningd/lightningd'
        self.lightning_dir = lightning_dir
        self.port = port
        self.cmd_prefix = []
        self.disconnect_file = None

        self.rpcproxy = BitcoinRpcProxy(bitcoind)

        self.opts = LIGHTNINGD_CONFIG.copy()
        opts = {
            'lightning-dir': lightning_dir,
            'addr': '127.0.0.1:{}'.format(port),
            'allow-deprecated-apis': 'false',
            'network': 'regtest',
            'ignore-fee-limits': 'false',
            'bitcoin-rpcuser': BITCOIND_CONFIG['rpcuser'],
            'bitcoin-rpcpassword': BITCOIND_CONFIG['rpcpassword'],
        }

        for k, v in opts.items():
            self.opts[k] = v

        if not os.path.exists(lightning_dir):
            os.makedirs(lightning_dir)

        # Last 32-bytes of final part of dir -> seed.
        seed = (bytes(re.search('([^/]+)/*$', lightning_dir).group(1), encoding='utf-8') + bytes(32))[:32]
        if not random_hsm:
            with open(os.path.join(lightning_dir, 'hsm_secret'), 'wb') as f:
                f.write(seed)
        if DEVELOPER:
            self.opts['dev-broadcast-interval'] = 1000
            self.opts['dev-bitcoind-poll'] = 1
        self.prefix = 'lightningd-%d' % (node_id)

    def cleanup(self):
        # To force blackhole to exit, disconnect file must be truncated!
        if self.disconnect_file:
            with open(self.disconnect_file, "w") as f:
                f.truncate()

    @property
    def cmd_line(self):
        opts = []
        for k, v in self.opts.items():
            if v is None:
                opts.append("--{}".format(k))
            elif isinstance(v, list):
                for i in v:
                    opts.append("--{}={}".format(k, i))
            else:
                opts.append("--{}={}".format(k, v))

        return self.cmd_prefix + [self.executable] + opts
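
    # For illustration, with the defaults above the property expands to
    # something like (flag-style options such as disable-dns carry no value;
    # the elided middle depends on the opts dict):
    #
    #   lightningd/lightningd --log-level=debug --cltv-delta=6 ...
    #       --disable-dns --lightning-dir=/tmp/... --addr=127.0.0.1:<port>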

    def start(self):
        self.rpcproxy.start()
        self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport
        TailableProc.start(self)
        self.wait_for_log("Server started with public key")
        logging.info("LightningD started")

    def wait(self, timeout=10):
        """Wait for the daemon to stop for up to timeout seconds

        Returns the returncode of the process, None if the process did
        not return before the timeout triggers.
        """
        self.proc.wait(timeout)
        self.rpcproxy.stop()
        return self.proc.returncode


class LightningNode(object):
    def __init__(self, daemon, rpc, btc, executor, may_fail=False, may_reconnect=False):
        self.rpc = rpc
        self.daemon = daemon
        self.bitcoin = btc
        self.executor = executor
        self.may_fail = may_fail
        self.may_reconnect = may_reconnect

    def connect(self, remote_node):
        self.rpc.connect(remote_node.info['id'], '127.0.0.1', remote_node.daemon.port)

    def is_connected(self, remote_node):
        return remote_node.info['id'] in [p['id'] for p in self.rpc.listpeers()['peers']]

    def openchannel(self, remote_node, capacity, addrtype="p2sh-segwit", confirm=True, wait_for_announce=True, connect=True):
        addr, wallettxid = self.fundwallet(10 * capacity, addrtype)

        if connect and not self.is_connected(remote_node):
            self.connect(remote_node)

        fundingtx = self.rpc.fundchannel(remote_node.info['id'], capacity)

        # Wait for the funding transaction to be in bitcoind's mempool
        wait_for(lambda: fundingtx['txid'] in self.bitcoin.rpc.getrawmempool())

        if confirm or wait_for_announce:
            self.bitcoin.generate_block(1)

        if wait_for_announce:
            self.bitcoin.generate_block(5)

        if confirm or wait_for_announce:
            self.daemon.wait_for_log(
                r'Funding tx {} depth'.format(fundingtx['txid']))

        return {'address': addr, 'wallettxid': wallettxid, 'fundingtx': fundingtx}

    def fundwallet(self, sats, addrtype="p2sh-segwit"):
        addr = self.rpc.newaddr(addrtype)['address']
        txid = self.bitcoin.rpc.sendtoaddress(addr, sats / 10**8)
        self.bitcoin.generate_block(1)
        self.daemon.wait_for_log('Owning output .* txid {}'.format(txid))
        return addr, txid
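
    # Walkthrough (illustrative): wait_for_announce=True mines 1 + 5 blocks
    # because a channel becomes usable after one confirmation but is only
    # announced to the network at announcement depth (six confirmations), e.g.
    #
    #   l1.openchannel(l2, 10**6)   # fund wallet, connect, open, confirm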

    def getactivechannels(self):
        return [c for c in self.rpc.listchannels()['channels'] if c['active']]

    def db_query(self, query, use_copy=True):
        orig = os.path.join(self.daemon.lightning_dir, "lightningd.sqlite3")
        if use_copy:
            copy = os.path.join(self.daemon.lightning_dir, "lightningd-copy.sqlite3")
            shutil.copyfile(orig, copy)
            db = sqlite3.connect(copy)
        else:
            db = sqlite3.connect(orig)

        db.row_factory = sqlite3.Row
        c = db.cursor()
        c.execute(query)
        rows = c.fetchall()
        result = []
        for row in rows:
            result.append(dict(zip(row.keys(), row)))

        db.commit()
        c.close()
        db.close()
        return result

    # Assumes node is stopped!
    def db_manip(self, query):
        db = sqlite3.connect(os.path.join(self.daemon.lightning_dir, "lightningd.sqlite3"))
        db.row_factory = sqlite3.Row
        c = db.cursor()
        c.execute(query)
        db.commit()
        c.close()
        db.close()
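
    # Example (sketch; the table name assumes the lightningd schema of this
    # era):
    #
    #   rows = node.db_query('SELECT COUNT(*) AS c FROM outputs;')
    #   assert rows[0]['c'] > 0
    #
    # With the default use_copy=True this reads a snapshot, so it is safe
    # while the daemon runs; db_manip, by contrast, assumes a stopped node.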

    def start(self):
        self.daemon.start()
        # Cache `getinfo`, we'll be using it a lot
        self.info = self.rpc.getinfo()
        # This shortcut is sufficient for our simple tests.
        self.port = self.info['binding'][0]['port']

    def stop(self, timeout=10):
        """Attempt to do a clean shutdown, but kill if it hangs
        """
        # Tell the daemon to stop
        try:
            # May fail if the process already died
            self.rpc.stop()
        except Exception:
            pass

        rc = self.daemon.wait(timeout)

        # If it did not stop be more insistent
        if rc is None:
            rc = self.daemon.stop()

        self.daemon.save_log()
        self.daemon.cleanup()

        if rc != 0 and not self.may_fail:
            raise ValueError("Node did not exit cleanly, rc={}".format(rc))
        else:
            return rc

    def restart(self, timeout=10, clean=True):
        """Stop and restart the lightning node.

        Keyword arguments:
        timeout: number of seconds to wait for a shutdown
        clean: whether to issue a `stop` RPC command before killing
        """
        if clean:
            self.stop(timeout)
        else:
            self.daemon.stop()

        self.start()

    def fund_channel(self, l2, amount, wait_for_active=True):
        # Give yourself some funds to work with
        addr = self.rpc.newaddr()['address']
        self.bitcoin.rpc.sendtoaddress(addr, (amount + 1000000) / 10**8)
        numfunds = len(self.rpc.listfunds()['outputs'])
        self.bitcoin.generate_block(1)
        wait_for(lambda: len(self.rpc.listfunds()['outputs']) > numfunds)

        # Now go ahead and open a channel
        num_tx = len(self.bitcoin.rpc.getrawmempool())
        tx = self.rpc.fundchannel(l2.info['id'], amount)['tx']
        wait_for(lambda: len(self.bitcoin.rpc.getrawmempool()) == num_tx + 1)
        self.bitcoin.generate_block(1)

        # Hacky way to find our output.
        scid = "{}x1x{}".format(self.bitcoin.rpc.getblockcount(),
                                get_tx_p2wsh_outnum(self.bitcoin, tx, amount))

        if wait_for_active:
            # We wait until gossipd sees both local updates, as well as status NORMAL,
            # so it can definitely route through.
            self.daemon.wait_for_logs([r'update for channel {}/0 now ACTIVE'
                                       .format(scid),
                                       r'update for channel {}/1 now ACTIVE'
                                       .format(scid),
                                       'to CHANNELD_NORMAL'])
            l2.daemon.wait_for_logs([r'update for channel {}/0 now ACTIVE'
                                     .format(scid),
                                     r'update for channel {}/1 now ACTIVE'
                                     .format(scid),
                                     'to CHANNELD_NORMAL'])

        return scid
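
    # The scid assembled above is "<blockheight>x<txindex>x<outnum>". Using
    # txindex 1 is an assumption: it holds when the freshly mined block
    # contains only the coinbase (index 0) and our funding tx (index 1).
    # Sketch:
    #
    #   scid = l1.fund_channel(l2, 10**6)   # e.g. '103x1x0'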

    def subd_pid(self, subd):
        """Get the process id of the given subdaemon, eg channeld or gossipd"""
        ex = re.compile(r'lightning_{}.*: pid ([0-9]*),'.format(subd))
        # Make sure we get latest one if it's restarted!
        for l in reversed(self.daemon.logs):
            group = ex.search(l)
            if group:
                return group.group(1)
        raise ValueError("No daemon {} found".format(subd))

    def channel_state(self, other):
        """Return the state of the channel to the other node.

        Returns None if there is no such peer, or a channel hasn't been funded
        yet.
        """
        peers = self.rpc.listpeers(other.info['id'])['peers']
        if not peers or 'channels' not in peers[0]:
            return None
        channel = peers[0]['channels'][0]
        return channel['state']

    def get_channel_scid(self, other):
        """Get the short_channel_id for the channel to the other node.
        """
        peers = self.rpc.listpeers(other.info['id'])['peers']
        if not peers or 'channels' not in peers[0]:
            return None
        channel = peers[0]['channels'][0]
        return channel['short_channel_id']

    def is_channel_active(self, chanid):
        channels = self.rpc.listchannels()['channels']
        active = [(c['short_channel_id'], c['channel_flags']) for c in channels if c['active']]
        return (chanid, 0) in active and (chanid, 1) in active

    def wait_for_channel_onchain(self, peerid):
        txid = only_one(only_one(self.rpc.listpeers(peerid)['peers'])['channels'])['scratch_txid']
        wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool())

    def wait_channel_active(self, chanid):
        wait_for(lambda: self.is_channel_active(chanid))

    # This waits until gossipd sees channel_update in both directions
    # (or for local channels, at least a local announcement)
    def wait_for_routes(self, channel_ids):
        # Could happen in any order...
        self.daemon.wait_for_logs(['Received channel_update for channel {}/0'.format(c)
                                   for c in channel_ids]
                                  + ['Received channel_update for channel {}/1'.format(c)
                                     for c in channel_ids])

    def pay(self, dst, amt, label=None):
        if not label:
            label = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(20))

        rhash = dst.rpc.invoice(amt, label, label)['payment_hash']
        invoices = dst.rpc.listinvoices(label)['invoices']
        assert len(invoices) == 1 and invoices[0]['status'] == 'unpaid'

        routestep = {
            'msatoshi': amt,
            'id': dst.info['id'],
            'delay': 5,
            'channel': '1x1x1'
        }

        def wait_pay():
            # Up to 10 seconds for payment to succeed.
            start_time = time.time()
            while dst.rpc.listinvoices(label)['invoices'][0]['status'] != 'paid':
                if time.time() > start_time + 10:
                    raise TimeoutError('Payment timed out')
                time.sleep(0.1)

        # sendpay is async now
        self.rpc.sendpay([routestep], rhash)
        # wait for sendpay to comply
        self.rpc.waitsendpay(rhash)
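
    # Usage sketch: l1.pay(l2, 10**5) has l2 issue an invoice for 100000
    # msatoshi, then sends a one-element route addressed to l2 via 'id' (the
    # fixed '1x1x1' scid appears to be a stand-in for a direct peer) and
    # blocks in waitsendpay until the HTLC resolves.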

    # Note: this feeds through the smoother in update_feerate, so changing
    # it on a running daemon may not give expected result!
    def set_feerates(self, feerates, wait_for_effect=True):
        # (bitcoind returns bitcoin per kb, so these are * 4)
        def mock_estimatesmartfee(r):
            params = r['params']
            if params == [2, 'CONSERVATIVE']:
                feerate = feerates[0] * 4
            elif params == [4, 'ECONOMICAL']:
                feerate = feerates[1] * 4
            elif params == [100, 'ECONOMICAL']:
                feerate = feerates[2] * 4
            else:
                raise ValueError()
            return {
                'id': r['id'],
                'error': None,
                'result': {
                    'feerate': Decimal(feerate) / 10**8
                },
            }
        self.daemon.rpcproxy.mock_rpc('estimatesmartfee', mock_estimatesmartfee)

        # Technically, this waits until it's called, not until it's processed.
        # We wait until all three levels have been called.
        if wait_for_effect:
            wait_for(lambda: self.daemon.rpcproxy.mock_counts['estimatesmartfee'] >= 3)
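
    # For illustration: the tuple maps onto the three estimatesmartfee queries
    # lightningd issues: feerates[0] answers (2, 'CONSERVATIVE'), feerates[1]
    # answers (4, 'ECONOMICAL') and feerates[2] answers (100, 'ECONOMICAL'),
    # each multiplied by 4 and converted to BTC-per-kb. Sketch:
    #
    #   node.set_feerates((15000, 7500, 3750), wait_for_effect=True)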

    def wait_for_onchaind_broadcast(self, name, resolve=None):
        """Wait for onchaind to drop tx name to resolve (if any)"""
        if resolve:
            r = self.daemon.wait_for_log('Broadcasting {} .* to resolve {}'
                                         .format(name, resolve))
        else:
            r = self.daemon.wait_for_log('Broadcasting {} .* to resolve '
                                         .format(name))

        rawtx = re.search(r'.* \(([0-9a-fA-F]*)\) ', r).group(1)
        txid = self.bitcoin.rpc.decoderawtransaction(rawtx, True)['txid']

        wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool())


class NodeFactory(object):
    """A factory to setup and start `lightningd` daemons.
    """
    def __init__(self, testname, bitcoind, executor, directory):
        self.testname = testname
        self.next_id = 1
        self.nodes = []
        self.executor = executor
        self.bitcoind = bitcoind
        self.directory = directory
        self.lock = threading.Lock()

    def split_options(self, opts):
        """Split node options from cli options

        Some options are used to instrument the node wrapper and some are passed
        to the daemon on the command line. Split them so we know where to use
        them.
        """
        node_opt_keys = [
            'disconnect',
            'may_fail',
            'may_reconnect',
            'random_hsm',
            'log_all_io',
        ]
        node_opts = {k: v for k, v in opts.items() if k in node_opt_keys}
        cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys}
        return node_opts, cli_opts

    def get_next_port(self):
        with self.lock:
            return reserve()

    def get_nodes(self, num_nodes, opts=None):
        """Start a number of nodes in parallel, each with its own options
        """
        if opts is None:
            # No opts were passed in, give some dummy opts
            opts = [{} for _ in range(num_nodes)]
        elif isinstance(opts, dict):
            # A single dict was passed in, so we use these opts for all nodes
            opts = [opts] * num_nodes

        assert len(opts) == num_nodes

        jobs = []
        for i in range(num_nodes):
            node_opts, cli_opts = self.split_options(opts[i])
            jobs.append(self.executor.submit(self.get_node, options=cli_opts, **node_opts))

        return [j.result() for j in jobs]
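
    # For illustration: wrapper options and daemon options may be mixed in one
    # dict; split_options routes them. `node_factory` is a hypothetical
    # fixture holding a NodeFactory instance:
    #
    #   l1, l2 = node_factory.get_nodes(2, opts={'may_reconnect': True,
    #                                            'watchtime-blocks': 10})
    #
    # 'may_reconnect' instruments the wrapper while 'watchtime-blocks' lands
    # on the lightningd command line.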

    def get_node(self, disconnect=None, options=None, may_fail=False,
                 may_reconnect=False, random_hsm=False,
                 feerates=(15000, 7500, 3750), start=True, log_all_io=False):
        with self.lock:
            node_id = self.next_id
            self.next_id += 1
        port = self.get_next_port()

        lightning_dir = os.path.join(
            self.directory, "lightning-{}/".format(node_id))

        if os.path.exists(lightning_dir):
            shutil.rmtree(lightning_dir)

        socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id)
        daemon = LightningD(
            lightning_dir, self.bitcoind,
            port=port, random_hsm=random_hsm, node_id=node_id
        )
        # If we have a disconnect string, dump it to a file for daemon.
        if disconnect:
            daemon.disconnect_file = os.path.join(lightning_dir, "dev_disconnect")
            with open(daemon.disconnect_file, "w") as f:
                f.write("\n".join(disconnect))
            daemon.opts["dev-disconnect"] = "dev_disconnect"
        if log_all_io:
            assert DEVELOPER
            daemon.env["LIGHTNINGD_DEV_LOG_IO"] = "1"
            daemon.opts["log-level"] = "io"
        if DEVELOPER:
            daemon.opts["dev-fail-on-subdaemon-fail"] = None
            daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1"
            if os.getenv("DEBUG_SUBD"):
                daemon.opts["dev-debugger"] = os.getenv("DEBUG_SUBD")
            if VALGRIND:
                daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1"
            if not may_reconnect:
                daemon.opts["dev-no-reconnect"] = None

        if options is not None:
            daemon.opts.update(options)

        rpc = LightningRpc(socket_path, self.executor)

        node = LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail,
                             may_reconnect=may_reconnect)

        # Regtest estimatefee are unusable, so override.
        node.set_feerates(feerates, False)

        self.nodes.append(node)
        if VALGRIND:
            node.daemon.cmd_prefix = [
                'valgrind',
                '-q',
                '--trace-children=yes',
                '--trace-children-skip=*python*,*bitcoin-cli*',
                '--error-exitcode=7',
                '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir)
            ]

        if start:
            try:
                node.start()
            except Exception:
                node.daemon.stop()
                raise

        return node

    def line_graph(self, num_nodes, fundchannel=True, fundamount=10**6, wait_for_announce=False, opts=None, announce_channels=True):
        """Create nodes, connect them and optionally fund channels.
        """
        assert not (wait_for_announce and not announce_channels), "You've asked to wait for an announcement that's not coming. (wait_for_announce=True,announce_channels=False)"
        nodes = self.get_nodes(num_nodes, opts=opts)
        bitcoin = nodes[0].bitcoin
        connections = [(nodes[i], nodes[i + 1]) for i in range(0, num_nodes - 1)]

        for src, dst in connections:
            src.rpc.connect(dst.info['id'], 'localhost', dst.port)

        # If we're returning now, make sure dst all show connections in
        # getpeers.
        if not fundchannel:
            for src, dst in connections:
                dst.daemon.wait_for_log('openingd-{} chan #[0-9]*: Handed peer, entering loop'.format(src.info['id']))
            return nodes

        # If we got here, we want to fund channels
        for src, dst in connections:
            addr = src.rpc.newaddr()['address']
            src.bitcoin.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8)

        bitcoin.generate_block(1)
        for src, dst in connections:
            wait_for(lambda: len(src.rpc.listfunds()['outputs']) > 0)
            tx = src.rpc.fundchannel(dst.info['id'], fundamount, announce=announce_channels)
            wait_for(lambda: tx['txid'] in bitcoin.rpc.getrawmempool())

        # Confirm all channels and wait for them to become usable
        bitcoin.generate_block(1)
        scids = []
        for src, dst in connections:
            wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL')
            scid = src.get_channel_scid(dst)
            src.daemon.wait_for_log(r'Received channel_update for channel {scid}/. now ACTIVE'.format(scid=scid))
            scids.append(scid)

        if not wait_for_announce:
            return nodes

        bitcoin.generate_block(5)

        def both_dirs_ready(n, scid):
            resp = n.rpc.listchannels(scid)
            return [a['active'] for a in resp['channels']] == [True, True]

        # Make sure everyone sees all channels: we can cheat and
        # simply check the ends (since it's a line).
        wait_for(lambda: both_dirs_ready(nodes[0], scids[-1]))
        wait_for(lambda: both_dirs_ready(nodes[-1], scids[0]))

        # Make sure we have all node announcements, too (just check ends)
        for n in nodes:
            for end in (nodes[0], nodes[-1]):
                wait_for(lambda: 'alias' in only_one(end.rpc.listnodes(n.info['id'])['nodes']))

        return nodes
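
    # Usage sketch: set up a three-node chain l1 -> l2 -> l3 with funded,
    # announced channels, ready for multi-hop payment tests:
    #
    #   l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True)
    #
    # (node_factory stands for whatever fixture instantiates NodeFactory.)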

    def killall(self, expected_successes):
        """Returns true if every node we expected to succeed actually succeeded"""
        unexpected_fail = False
        for i in range(len(self.nodes)):
            leaks = None
            # leak detection upsets VALGRIND by reading uninitialized mem.
            # If it's dead, we'll catch it below.
            if not VALGRIND:
                try:
                    # This also puts leaks in log.
                    leaks = self.nodes[i].rpc.dev_memleak()['leaks']
                except Exception:
                    pass

            try:
                self.nodes[i].stop()
            except Exception:
                if expected_successes[i]:
                    unexpected_fail = True

            if leaks is not None and len(leaks) != 0:
                raise Exception("Node {} has memory leaks: {}".format(
                    self.nodes[i].daemon.lightning_dir,
                    json.dumps(leaks, sort_keys=True, indent=4)
                ))

        return not unexpected_fail