test: use f-strings in interface_*.py tests

This commit is contained in:
fanquake 2021-06-12 14:26:31 +08:00
parent 86d958262d
commit c2a5d560df
No known key found for this signature in database
GPG Key ID: 2EEB9F5CC09526C1
5 changed files with 54 additions and 54 deletions

View File

@ -87,12 +87,12 @@ class TestBitcoinCli(BitcoinTestFramework):
user, password = get_auth_cookie(self.nodes[0].datadir, self.chain)
self.log.info("Test -stdinrpcpass option")
assert_equal(BLOCKS, self.nodes[0].cli('-rpcuser={}'.format(user), '-stdinrpcpass', input=password).getblockcount())
assert_raises_process_error(1, 'Incorrect rpcuser or rpcpassword', self.nodes[0].cli('-rpcuser={}'.format(user), '-stdinrpcpass', input='foo').echo)
assert_equal(BLOCKS, self.nodes[0].cli(f'-rpcuser={user}', '-stdinrpcpass', input=password).getblockcount())
assert_raises_process_error(1, 'Incorrect rpcuser or rpcpassword', self.nodes[0].cli(f'-rpcuser={user}', '-stdinrpcpass', input='foo').echo)
self.log.info("Test -stdin and -stdinrpcpass")
assert_equal(['foo', 'bar'], self.nodes[0].cli('-rpcuser={}'.format(user), '-stdin', '-stdinrpcpass', input=password + '\nfoo\nbar').echo())
assert_raises_process_error(1, 'Incorrect rpcuser or rpcpassword', self.nodes[0].cli('-rpcuser={}'.format(user), '-stdin', '-stdinrpcpass', input='foo').echo)
assert_equal(['foo', 'bar'], self.nodes[0].cli(f'-rpcuser={user}', '-stdin', '-stdinrpcpass', input=f'{password}\nfoo\nbar').echo())
assert_raises_process_error(1, 'Incorrect rpcuser or rpcpassword', self.nodes[0].cli(f'-rpcuser={user}', '-stdin', '-stdinrpcpass', input='foo').echo)
self.log.info("Test connecting to a non-existing server")
assert_raises_process_error(1, "Could not connect to the server", self.nodes[0].cli('-rpcport=1').echo)
@ -150,8 +150,8 @@ class TestBitcoinCli(BitcoinTestFramework):
w1 = self.nodes[0].get_wallet_rpc(wallets[0])
w2 = self.nodes[0].get_wallet_rpc(wallets[1])
w3 = self.nodes[0].get_wallet_rpc(wallets[2])
rpcwallet2 = '-rpcwallet={}'.format(wallets[1])
rpcwallet3 = '-rpcwallet={}'.format(wallets[2])
rpcwallet2 = f'-rpcwallet={wallets[1]}'
rpcwallet3 = f'-rpcwallet={wallets[2]}'
w1.walletpassphrase(password, self.rpc_timeout)
w2.encryptwallet(password)
w1.sendtoaddress(w2.getnewaddress(), amounts[1])
@ -162,7 +162,7 @@ class TestBitcoinCli(BitcoinTestFramework):
self.log.info("Test -getinfo with multiple wallets and -rpcwallet returns specified wallet balance")
for i in range(len(wallets)):
cli_get_info_string = self.nodes[0].cli('-getinfo', '-rpcwallet={}'.format(wallets[i])).send_cli()
cli_get_info_string = self.nodes[0].cli('-getinfo', f'-rpcwallet={wallets[i]}').send_cli()
cli_get_info = cli_get_info_string_to_dict(cli_get_info_string)
assert 'Balances' not in cli_get_info_string
assert_equal(cli_get_info["Wallet"], wallets[i])
@ -296,7 +296,7 @@ class TestBitcoinCli(BitcoinTestFramework):
self.log.info("Test -version with node stopped")
self.stop_node(0)
cli_response = self.nodes[0].cli('-version').send_cli()
assert "{} RPC client version".format(self.config['environment']['PACKAGE_NAME']) in cli_response
assert f"{self.config['environment']['PACKAGE_NAME']} RPC client version" in cli_response
self.log.info("Test -rpcwait option successfully waits for RPC connection")
self.nodes[0].start() # start node without RPC connection

View File

@ -24,8 +24,8 @@ class HTTPBasicsTest (BitcoinTestFramework):
# lowlevel check for http persistent connection #
#################################################
url = urllib.parse.urlparse(self.nodes[0].url)
authpair = url.username + ':' + url.password
headers = {"Authorization": "Basic " + str_to_b64str(authpair)}
authpair = f'{url.username}:{url.password}'
headers = {"Authorization": f"Basic {str_to_b64str(authpair)}"}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
@ -42,7 +42,7 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.close()
#same should be if we add keep-alive because this should be the std. behaviour
headers = {"Authorization": "Basic " + str_to_b64str(authpair), "Connection": "keep-alive"}
headers = {"Authorization": f"Basic {str_to_b64str(authpair)}", "Connection": "keep-alive"}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
@ -59,7 +59,7 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.close()
#now do the same with "Connection: close"
headers = {"Authorization": "Basic " + str_to_b64str(authpair), "Connection":"close"}
headers = {"Authorization": f"Basic {str_to_b64str(authpair)}", "Connection":"close"}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
@ -70,8 +70,8 @@ class HTTPBasicsTest (BitcoinTestFramework):
#node1 (2nd node) is running with disabled keep-alive option
urlNode1 = urllib.parse.urlparse(self.nodes[1].url)
authpair = urlNode1.username + ':' + urlNode1.password
headers = {"Authorization": "Basic " + str_to_b64str(authpair)}
authpair = f'{urlNode1.username}:{urlNode1.password}'
headers = {"Authorization": f"Basic {str_to_b64str(authpair)}"}
conn = http.client.HTTPConnection(urlNode1.hostname, urlNode1.port)
conn.connect()
@ -81,8 +81,8 @@ class HTTPBasicsTest (BitcoinTestFramework):
#node2 (third node) is running with standard keep-alive parameters which means keep-alive is on
urlNode2 = urllib.parse.urlparse(self.nodes[2].url)
authpair = urlNode2.username + ':' + urlNode2.password
headers = {"Authorization": "Basic " + str_to_b64str(authpair)}
authpair = f'{urlNode2.username}:{urlNode2.password}'
headers = {"Authorization": f"Basic {str_to_b64str(authpair)}"}
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
@ -94,13 +94,13 @@ class HTTPBasicsTest (BitcoinTestFramework):
# Check excessive request size
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
conn.request('GET', '/' + ('x'*1000), '', headers)
conn.request('GET', f'/{"x"*1000}', '', headers)
out1 = conn.getresponse()
assert_equal(out1.status, http.client.NOT_FOUND)
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
conn.request('GET', '/' + ('x'*10000), '', headers)
conn.request('GET', f'/{"x"*10000}', '', headers)
out1 = conn.getresponse()
assert_equal(out1.status, http.client.BAD_REQUEST)

View File

@ -57,7 +57,7 @@ class RESTTest (BitcoinTestFramework):
rest_uri += '.hex'
conn = http.client.HTTPConnection(self.url.hostname, self.url.port)
self.log.debug('%s %s %s', http_method, rest_uri, body)
self.log.debug(f'{http_method} {rest_uri} {body}')
if http_method == 'GET':
conn.request('GET', rest_uri)
elif http_method == 'POST':
@ -92,11 +92,11 @@ class RESTTest (BitcoinTestFramework):
self.log.info("Test the /tx URI")
json_obj = self.test_rest_request("/tx/{}".format(txid))
json_obj = self.test_rest_request(f"/tx/{txid}")
assert_equal(json_obj['txid'], txid)
# Check hex format response
hex_response = self.test_rest_request("/tx/{}".format(txid), req_type=ReqType.HEX, ret_type=RetType.OBJ)
hex_response = self.test_rest_request(f"/tx/{txid}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_greater_than_or_equal(int(hex_response.getheader('content-length')),
json_obj['size']*2)
@ -114,7 +114,7 @@ class RESTTest (BitcoinTestFramework):
assert_equal(self.nodes[1].getbalance(), Decimal("0.1"))
# Check chainTip response
json_obj = self.test_rest_request("/getutxos/{}-{}".format(*spending))
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(json_obj['chaintipHash'], bb_hash)
# Make sure there is one utxo
@ -123,7 +123,7 @@ class RESTTest (BitcoinTestFramework):
self.log.info("Query a spent TXO using the /getutxos URI")
json_obj = self.test_rest_request("/getutxos/{}-{}".format(*spent))
json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")
# Check chainTip response
assert_equal(json_obj['chaintipHash'], bb_hash)
@ -136,7 +136,7 @@ class RESTTest (BitcoinTestFramework):
self.log.info("Query two TXOs using the /getutxos URI")
json_obj = self.test_rest_request("/getutxos/{}-{}/{}-{}".format(*(spending + spent)))
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 1)
assert_equal(json_obj['bitmap'], "10")
@ -163,32 +163,32 @@ class RESTTest (BitcoinTestFramework):
# do a tx and don't sync
txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)
json_obj = self.test_rest_request("/tx/{}".format(txid))
json_obj = self.test_rest_request(f"/tx/{txid}")
# get the spent output to later check for utxo (should be spent by then)
spent = (json_obj['vin'][0]['txid'], json_obj['vin'][0]['vout'])
# get n of 0.1 outpoint
n, = filter_output_indices_by_value(json_obj['vout'], Decimal('0.1'))
spending = (txid, n)
json_obj = self.test_rest_request("/getutxos/{}-{}".format(*spending))
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 0)
json_obj = self.test_rest_request("/getutxos/checkmempool/{}-{}".format(*spending))
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request("/getutxos/{}-{}".format(*spent))
json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request("/getutxos/checkmempool/{}-{}".format(*spent))
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 0)
self.nodes[0].generate(1)
self.sync_all()
json_obj = self.test_rest_request("/getutxos/{}-{}".format(*spending))
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request("/getutxos/checkmempool/{}-{}".format(*spending))
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
# Do some invalid requests
@ -197,11 +197,11 @@ class RESTTest (BitcoinTestFramework):
self.test_rest_request("/getutxos/checkmempool", http_method='POST', req_type=ReqType.JSON, status=400, ret_type=RetType.OBJ)
# Test limits
long_uri = '/'.join(["{}-{}".format(txid, n_) for n_ in range(20)])
self.test_rest_request("/getutxos/checkmempool/{}".format(long_uri), http_method='POST', status=400, ret_type=RetType.OBJ)
long_uri = '/'.join([f"{txid}-{n_}" for n_ in range(20)])
self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=400, ret_type=RetType.OBJ)
long_uri = '/'.join(['{}-{}'.format(txid, n_) for n_ in range(15)])
self.test_rest_request("/getutxos/checkmempool/{}".format(long_uri), http_method='POST', status=200)
long_uri = '/'.join([f'{txid}-{n_}' for n_ in range(15)])
self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=200)
self.nodes[0].generate(1) # generate block to not affect upcoming tests
self.sync_all()
@ -215,42 +215,42 @@ class RESTTest (BitcoinTestFramework):
# Check result if block is not in the active chain
self.nodes[0].invalidateblock(bb_hash)
assert_equal(self.test_rest_request('/headers/1/{}'.format(bb_hash)), [])
self.test_rest_request('/block/{}'.format(bb_hash))
assert_equal(self.test_rest_request(f'/headers/1/{bb_hash}'), [])
self.test_rest_request(f'/block/{bb_hash}')
self.nodes[0].reconsiderblock(bb_hash)
# Check binary format
response = self.test_rest_request("/block/{}".format(bb_hash), req_type=ReqType.BIN, ret_type=RetType.OBJ)
response = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ)
assert_greater_than(int(response.getheader('content-length')), BLOCK_HEADER_SIZE)
response_bytes = response.read()
# Compare with block header
response_header = self.test_rest_request("/headers/1/{}".format(bb_hash), req_type=ReqType.BIN, ret_type=RetType.OBJ)
response_header = self.test_rest_request(f"/headers/1/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ)
assert_equal(int(response_header.getheader('content-length')), BLOCK_HEADER_SIZE)
response_header_bytes = response_header.read()
assert_equal(response_bytes[:BLOCK_HEADER_SIZE], response_header_bytes)
# Check block hex format
response_hex = self.test_rest_request("/block/{}".format(bb_hash), req_type=ReqType.HEX, ret_type=RetType.OBJ)
response_hex = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_greater_than(int(response_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
response_hex_bytes = response_hex.read().strip(b'\n')
assert_equal(response_bytes.hex().encode(), response_hex_bytes)
# Compare with hex block header
response_header_hex = self.test_rest_request("/headers/1/{}".format(bb_hash), req_type=ReqType.HEX, ret_type=RetType.OBJ)
response_header_hex = self.test_rest_request(f"/headers/1/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_greater_than(int(response_header_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
response_header_hex_bytes = response_header_hex.read(BLOCK_HEADER_SIZE*2)
assert_equal(response_bytes[:BLOCK_HEADER_SIZE].hex().encode(), response_header_hex_bytes)
# Check json format
block_json_obj = self.test_rest_request("/block/{}".format(bb_hash))
block_json_obj = self.test_rest_request(f"/block/{bb_hash}")
assert_equal(block_json_obj['hash'], bb_hash)
assert_equal(self.test_rest_request("/blockhashbyheight/{}".format(block_json_obj['height']))['blockhash'], bb_hash)
assert_equal(self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}")['blockhash'], bb_hash)
# Check hex/bin format
resp_hex = self.test_rest_request("/blockhashbyheight/{}".format(block_json_obj['height']), req_type=ReqType.HEX, ret_type=RetType.OBJ)
resp_hex = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_equal(resp_hex.read().decode('utf-8').rstrip(), bb_hash)
resp_bytes = self.test_rest_request("/blockhashbyheight/{}".format(block_json_obj['height']), req_type=ReqType.BIN, ret_type=RetType.BYTES)
resp_bytes = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
blockhash = resp_bytes[::-1].hex()
assert_equal(blockhash, bb_hash)
@ -264,7 +264,7 @@ class RESTTest (BitcoinTestFramework):
self.test_rest_request("/blockhashbyheight/", ret_type=RetType.OBJ, status=400)
# Compare with json block header
json_obj = self.test_rest_request("/headers/1/{}".format(bb_hash))
json_obj = self.test_rest_request(f"/headers/1/{bb_hash}")
assert_equal(len(json_obj), 1) # ensure that there is one header in the json response
assert_equal(json_obj[0]['hash'], bb_hash) # request/response hash should be the same
@ -276,7 +276,7 @@ class RESTTest (BitcoinTestFramework):
# See if we can get 5 headers in one response
self.nodes[1].generate(5)
self.sync_all()
json_obj = self.test_rest_request("/headers/5/{}".format(bb_hash))
json_obj = self.test_rest_request(f"/headers/5/{bb_hash}")
assert_equal(len(json_obj), 5) # now we should have 5 header objects
self.log.info("Test tx inclusion in the /mempool and /block URIs")
@ -306,13 +306,13 @@ class RESTTest (BitcoinTestFramework):
self.sync_all()
# Check if the 3 tx show up in the new block
json_obj = self.test_rest_request("/block/{}".format(newblockhash[0]))
json_obj = self.test_rest_request(f"/block/{newblockhash[0]}")
non_coinbase_txs = {tx['txid'] for tx in json_obj['tx']
if 'coinbase' not in tx['vin'][0]}
assert_equal(non_coinbase_txs, set(txs))
# Check the same but without tx details
json_obj = self.test_rest_request("/block/notxdetails/{}".format(newblockhash[0]))
json_obj = self.test_rest_request(f"/block/notxdetails/{newblockhash[0]}")
for tx in txs:
assert tx in json_obj['tx']

View File

@ -16,7 +16,7 @@ def expect_http_status(expected_http_status, expected_rpc_code,
fcn, *args):
try:
fcn(*args)
raise AssertionError("Expected RPC error %d, got none" % expected_rpc_code)
raise AssertionError(f"Expected RPC error {expected_rpc_code}, got none")
except JSONRPCException as exc:
assert_equal(exc.error["code"], expected_rpc_code)
assert_equal(exc.http_status, expected_http_status)

View File

@ -132,7 +132,7 @@ class ZMQTest (BitcoinTestFramework):
socket = self.ctx.socket(zmq.SUB)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
self.restart_node(0, [f"-zmqpub{topic}={address}" for topic, address in services] +
self.extra_args[0])
for i, sub in enumerate(subscribers):
@ -184,7 +184,7 @@ class ZMQTest (BitcoinTestFramework):
rawtx = subs[3]
num_blocks = 5
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all()
@ -504,7 +504,7 @@ class ZMQTest (BitcoinTestFramework):
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")
# 4) Moving forward, we apply the delta to our local view
# remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
@ -520,7 +520,7 @@ class ZMQTest (BitcoinTestFramework):
assert mempool_sequence > expected_sequence
r_gap += mempool_sequence - expected_sequence
else:
raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)