bitcoin/test/functional/rpc_getblockstats.py

198 lines
8.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# Copyright (c) 2017-2021 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test getblockstats rpc call
#
from test_framework.blocktools import COINBASE_MATURITY
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
import json
import os
TESTSDIR = os.path.dirname(os.path.realpath(__file__))
class GetblockstatsTest(BitcoinTestFramework):
start_height = 101
max_stat_pos = 2
def add_options(self, parser):
parser.add_argument('--gen-test-data', dest='gen_test_data',
default=False, action='store_true',
help='Generate test data')
parser.add_argument('--test-data', dest='test_data',
default='data/rpc_getblockstats.json',
action='store', metavar='FILE',
help='Test data file')
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.supports_cli = False
def get_stats(self):
return [self.nodes[0].getblockstats(hash_or_height=self.start_height + i) for i in range(self.max_stat_pos+1)]
def generate_test_data(self, filename):
mocktime = 1525107225
self.nodes[0].setmocktime(mocktime)
self.nodes[0].createwallet(wallet_name='test')
privkey = self.nodes[0].get_deterministic_priv_key().key
self.nodes[0].importprivkey(privkey)
self.generate(self.nodes[0], COINBASE_MATURITY + 1)
address = self.nodes[0].get_deterministic_priv_key().address
self.nodes[0].sendtoaddress(address=address, amount=10, subtractfeefromamount=True)
self.generate(self.nodes[0], 1)
self.nodes[0].sendtoaddress(address=address, amount=10, subtractfeefromamount=True)
self.nodes[0].sendtoaddress(address=address, amount=10, subtractfeefromamount=False)
self.nodes[0].settxfee(amount=0.003)
self.nodes[0].sendtoaddress(address=address, amount=1, subtractfeefromamount=True)
# Send to OP_RETURN output to test its exclusion from statistics
self.nodes[0].send(outputs={"data": "21"})
self.sync_all()
self.generate(self.nodes[0], 1)
self.expected_stats = self.get_stats()
blocks = []
tip = self.nodes[0].getbestblockhash()
blockhash = None
height = 0
while tip != blockhash:
blockhash = self.nodes[0].getblockhash(height)
blocks.append(self.nodes[0].getblock(blockhash, 0))
height += 1
to_dump = {
'blocks': blocks,
'mocktime': int(mocktime),
'stats': self.expected_stats,
}
with open(filename, 'w', encoding="utf8") as f:
json.dump(to_dump, f, sort_keys=True, indent=2)
def load_test_data(self, filename):
with open(filename, 'r', encoding="utf8") as f:
d = json.load(f)
blocks = d['blocks']
mocktime = d['mocktime']
self.expected_stats = d['stats']
# Set the timestamps from the file so that the nodes can get out of Initial Block Download
self.nodes[0].setmocktime(mocktime)
self.sync_all()
for b in blocks:
self.nodes[0].submitblock(b)
def run_test(self):
test_data = os.path.join(TESTSDIR, self.options.test_data)
if self.options.gen_test_data:
self.generate_test_data(test_data)
else:
self.load_test_data(test_data)
self.sync_all()
stats = self.get_stats()
# Make sure all valid statistics are included but nothing else is
expected_keys = self.expected_stats[0].keys()
assert_equal(set(stats[0].keys()), set(expected_keys))
assert_equal(stats[0]['height'], self.start_height)
assert_equal(stats[self.max_stat_pos]['height'], self.start_height + self.max_stat_pos)
for i in range(self.max_stat_pos+1):
self.log.info('Checking block %d' % (i))
assert_equal(stats[i], self.expected_stats[i])
# Check selecting block by hash too
blockhash = self.expected_stats[i]['blockhash']
stats_by_hash = self.nodes[0].getblockstats(hash_or_height=blockhash)
assert_equal(stats_by_hash, self.expected_stats[i])
# Make sure each stat can be queried on its own
for stat in expected_keys:
for i in range(self.max_stat_pos+1):
result = self.nodes[0].getblockstats(hash_or_height=self.start_height + i, stats=[stat])
assert_equal(list(result.keys()), [stat])
if result[stat] != self.expected_stats[i][stat]:
self.log.info('result[%s] (%d) failed, %r != %r' % (
stat, i, result[stat], self.expected_stats[i][stat]))
assert_equal(result[stat], self.expected_stats[i][stat])
# Make sure only the selected statistics are included (more than one)
some_stats = {'minfee', 'maxfee'}
stats = self.nodes[0].getblockstats(hash_or_height=1, stats=list(some_stats))
assert_equal(set(stats.keys()), some_stats)
# Test invalid parameters raise the proper json exceptions
tip = self.start_height + self.max_stat_pos
assert_raises_rpc_error(-8, 'Target block height %d after current tip %d' % (tip+1, tip),
self.nodes[0].getblockstats, hash_or_height=tip+1)
assert_raises_rpc_error(-8, 'Target block height %d is negative' % (-1),
self.nodes[0].getblockstats, hash_or_height=-1)
# Make sure not valid stats aren't allowed
inv_sel_stat = 'asdfghjkl'
inv_stats = [
[inv_sel_stat],
['minfee', inv_sel_stat],
[inv_sel_stat, 'minfee'],
['minfee', inv_sel_stat, 'maxfee'],
]
for inv_stat in inv_stats:
assert_raises_rpc_error(-8, f"Invalid selected statistic '{inv_sel_stat}'",
self.nodes[0].getblockstats, hash_or_height=1, stats=inv_stat)
# Make sure we aren't always returning inv_sel_stat as the culprit stat
assert_raises_rpc_error(-8, f"Invalid selected statistic 'aaa{inv_sel_stat}'",
self.nodes[0].getblockstats, hash_or_height=1, stats=['minfee', f'aaa{inv_sel_stat}'])
# Mainchain's genesis block shouldn't be found on regtest
assert_raises_rpc_error(-5, 'Block not found', self.nodes[0].getblockstats,
hash_or_height='000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f')
# Invalid number of args
assert_raises_rpc_error(-1, 'getblockstats hash_or_height ( stats )', self.nodes[0].getblockstats, '00', 1, 2)
assert_raises_rpc_error(-1, 'getblockstats hash_or_height ( stats )', self.nodes[0].getblockstats)
self.log.info('Test block height 0')
genesis_stats = self.nodes[0].getblockstats(0)
assert_equal(genesis_stats["blockhash"], "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206")
assert_equal(genesis_stats["utxo_increase"], 1)
assert_equal(genesis_stats["utxo_size_inc"], 117)
assert_equal(genesis_stats["utxo_increase_actual"], 0)
assert_equal(genesis_stats["utxo_size_inc_actual"], 0)
self.log.info('Test tip including OP_RETURN')
tip_stats = self.nodes[0].getblockstats(tip)
assert_equal(tip_stats["utxo_increase"], 6)
assert_equal(tip_stats["utxo_size_inc"], 441)
assert_equal(tip_stats["utxo_increase_actual"], 4)
assert_equal(tip_stats["utxo_size_inc_actual"], 300)
self.log.info("Test when only header is known")
block = self.generateblock(self.nodes[0], output="raw(55)", transactions=[], submit=False)
self.nodes[0].submitheader(block["hex"])
assert_raises_rpc_error(-1, "Block not available (not fully downloaded)", lambda: self.nodes[0].getblockstats(block['hash']))
self.log.info('Test when block is missing')
(self.nodes[0].blocks_path / 'blk00000.dat').rename(self.nodes[0].blocks_path / 'blk00000.dat.backup')
assert_raises_rpc_error(-1, 'Block not found on disk', self.nodes[0].getblockstats, hash_or_height=1)
(self.nodes[0].blocks_path / 'blk00000.dat.backup').rename(self.nodes[0].blocks_path / 'blk00000.dat')
if __name__ == '__main__':
GetblockstatsTest(__file__).main()