contrib: Refactor verifbinaries to support subcommands

Prepares for the option to provide local binaries, sha256sums, and
signatures directly.
This commit is contained in:
Andrew Chow 2023-03-22 21:15:06 -04:00 committed by Cory Fields
parent 37c9fb7a59
commit 17575c0efa
2 changed files with 191 additions and 153 deletions

View file

@ -8,12 +8,12 @@ from pathlib import Path
def main():
"""Tests ordered roughly from faster to slower."""
expect_code(run_verify('0.32'), 4, "Nonexistent version should fail")
expect_code(run_verify('0.32.awefa.12f9h'), 11, "Malformed version should fail")
expect_code(run_verify('22.0 --min-good-sigs 20'), 9, "--min-good-sigs 20 should fail")
expect_code(run_verify("", "pub", '0.32'), 4, "Nonexistent version should fail")
expect_code(run_verify("", "pub", '0.32.awefa.12f9h'), 11, "Malformed version should fail")
expect_code(run_verify('--min-good-sigs 20', "pub", "22.0"), 9, "--min-good-sigs 20 should fail")
print("- testing multisig verification (22.0)", flush=True)
_220 = run_verify('22.0 --json')
_220 = run_verify("--json", "pub", "22.0")
try:
result = json.loads(_220.stdout.decode())
except Exception:
@ -29,12 +29,15 @@ def main():
assert v['bitcoin-22.0-x86_64-linux-gnu.tar.gz'] == '59ebd25dd82a51638b7a6bb914586201e67db67b919b2a1ff08925a7936d1b16'
def run_verify(extra: str) -> subprocess.CompletedProcess:
def run_verify(global_args: str, command: str, command_args: str) -> subprocess.CompletedProcess:
maybe_here = Path.cwd() / 'verify.py'
path = maybe_here if maybe_here.exists() else Path.cwd() / 'contrib' / 'verifybinaries' / 'verify.py'
if command == "pub":
command += " --cleanup"
return subprocess.run(
f"{path} --cleanup {extra}",
f"{path} {global_args} {command} {command_args}",
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

View file

@ -98,60 +98,6 @@ def bool_from_env(key, default=False) -> bool:
VERSION_FORMAT = "<major>.<minor>[.<patch>][-rc[0-9]][-platform]"
VERSION_EXAMPLE = "22.0-x86_64 or 0.21.0-rc2-osx"
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'version', type=str, help=(
f'version of the bitcoin release to download; of the format '
f'{VERSION_FORMAT}. Example: {VERSION_EXAMPLE}')
)
parser.add_argument(
'-v', '--verbose', action='store_true',
default=bool_from_env('BINVERIFY_VERBOSE'),
)
parser.add_argument(
'-q', '--quiet', action='store_true',
default=bool_from_env('BINVERIFY_QUIET'),
)
parser.add_argument(
'--cleanup', action='store_true',
default=bool_from_env('BINVERIFY_CLEANUP'),
help='if specified, clean up files afterwards'
)
parser.add_argument(
'--import-keys', action='store_true',
default=bool_from_env('BINVERIFY_IMPORTKEYS'),
help='if specified, ask to import each unknown builder key'
)
parser.add_argument(
'--require-all-hosts', action='store_true',
default=bool_from_env('BINVERIFY_REQUIRE_ALL_HOSTS'),
help=(
f'If set, require all hosts ({HOST1}, {HOST2}) to provide signatures. '
'(Sometimes bitcoin.org lags behind bitcoincore.org.)')
)
parser.add_argument(
'--min-good-sigs', type=int, action='store', nargs='?',
default=int(os.environ.get('BINVERIFY_MIN_GOOD_SIGS', 3)),
help=(
'The minimum number of good signatures to require successful termination.'),
)
parser.add_argument(
'--keyserver', action='store', nargs='?',
default=os.environ.get('BINVERIFY_KEYSERVER', 'hkp://keyserver.ubuntu.com'),
help='which keyserver to use',
)
parser.add_argument(
'--trusted-keys', action='store', nargs='?',
default=os.environ.get('BINVERIFY_TRUSTED_KEYS', ''),
help='A list of trusted signer GPG keys, separated by commas. Not "trusted keys" in the GPG sense.',
)
parser.add_argument(
'--json', action='store_true',
default=bool_from_env('BINVERIFY_JSON'),
help='If set, output the result as JSON',
)
def parse_version_string(version_str):
if version_str.startswith(VERSIONPREFIX): # remove version prefix
version_str = version_str[len(VERSIONPREFIX):]
@ -386,7 +332,7 @@ def get_files_from_hosts_and_compare(
return ReturnCode.SUCCESS
def check_multisig(sigfilename: str, args: argparse.Namespace):
def check_multisig(sigfilename: Path, args: argparse.Namespace) -> t.Tuple[int, str, t.List[SigData], t.List[SigData], t.List[SigData]]:
# check signature
#
# We don't write output to a file because this command will almost certainly
@ -423,12 +369,106 @@ def prompt_yn(prompt) -> bool:
got = input(prompt).lower()
return got == 'y'
def verify_shasums_signature(
signature_file_path: str, sums_file_path: str, args: argparse.Namespace
) -> t.Tuple[
ReturnCode, t.List[SigData], t.List[SigData], t.List[SigData], t.List[SigData]
]:
min_good_sigs = args.min_good_sigs
gpg_allowed_codes = [0, 2] # 2 is returned when untrusted signatures are present.
def main(args):
args = parser.parse_args()
if args.quiet:
log.setLevel(logging.WARNING)
gpg_retval, gpg_output, good, unknown, bad = check_multisig(signature_file_path, args)
if gpg_retval not in gpg_allowed_codes:
if gpg_retval == 1:
log.critical(f"Bad signature (code: {gpg_retval}).")
if gpg_retval == 2:
log.critical(
"gpg error. Do you have the Bitcoin Core binary release "
"signing key installed?")
else:
log.critical(f"unexpected GPG exit code ({gpg_retval})")
log.error(f"gpg output:\n{indent(gpg_output)}")
return (ReturnCode.INTEGRITY_FAILURE, [], [], [], [])
# Decide which keys we trust, though not "trust" in the GPG sense, but rather
# which pubkeys convince us that this sums file is legitimate. In other words,
# which pubkeys within the Bitcoin community do we trust for the purposes of
# binary verification?
trusted_keys = set()
if args.trusted_keys:
trusted_keys |= set(args.trusted_keys.split(','))
# Tally signatures and make sure we have enough goods to fulfill
# our threshold.
good_trusted = [sig for sig in good if sig.trusted or sig.key in trusted_keys]
good_untrusted = [sig for sig in good if sig not in good_trusted]
num_trusted = len(good_trusted) + len(good_untrusted)
log.info(f"got {num_trusted} good signatures")
if num_trusted < min_good_sigs:
log.info("Maybe you need to import "
f"(`gpg --keyserver {args.keyserver} --recv-keys <key-id>`) "
"some of the following keys: ")
log.info('')
for sig in unknown:
log.info(f" {sig.key} ({sig.name})")
log.info('')
log.error(
"not enough trusted sigs to meet threshold "
f"({num_trusted} vs. {min_good_sigs})")
return (ReturnCode.NOT_ENOUGH_GOOD_SIGS, [], [], [], [])
for sig in good_trusted:
log.info(f"GOOD SIGNATURE: {sig}")
for sig in good_untrusted:
log.info(f"GOOD SIGNATURE (untrusted): {sig}")
for sig in [sig for sig in good if sig.status == 'expired']:
log.warning(f"key {sig.key} for {sig.name} is expired")
for sig in bad:
log.warning(f"BAD SIGNATURE: {sig}")
for sig in unknown:
log.warning(f"UNKNOWN SIGNATURE: {sig}")
return (ReturnCode.SUCCESS, good_trusted, good_untrusted, unknown, bad)
def parse_sums_file(sums_file_path: Path, filename_filter: str) -> t.List[t.List[str]]:
# extract hashes/filenames of binaries to verify from hash file;
# each line has the following format: "<hash> <binary_filename>"
with open(sums_file_path, 'r', encoding='utf8') as hash_file:
return [line.split()[:2] for line in hash_file if filename_filter in line]
def verify_binary_hashes(hashes_to_verify: t.List[t.List[str]]) -> t.Tuple[ReturnCode, t.Dict[str, str]]:
offending_files = []
files_to_hashes = {}
for hash_expected, binary_filename in hashes_to_verify:
with open(binary_filename, 'rb') as binary_file:
hash_calculated = sha256(binary_file.read()).hexdigest()
if hash_calculated != hash_expected:
offending_files.append(binary_filename)
else:
files_to_hashes[binary_filename] = hash_calculated
if offending_files:
joined_files = '\n'.join(offending_files)
log.critical(
"Hashes don't match.\n"
f"Offending files:\n{joined_files}")
return (ReturnCode.INTEGRITY_FAILURE, files_to_hashes)
return (ReturnCode.SUCCESS, files_to_hashes)
def verify_published_handler(args: argparse.Namespace) -> ReturnCode:
WORKINGDIR = Path(tempfile.gettempdir()) / f"bitcoin_verify_binaries.{args.version}"
def cleanup():
@ -464,83 +504,25 @@ def main(args):
return got_sig_status
# Multi-sig verification is available after 22.0.
if version_tuple[0] >= 22:
min_good_sigs = args.min_good_sigs
gpg_allowed_codes = [0, 2] # 2 is returned when untrusted signatures are present.
got_sums_status = get_files_from_hosts_and_compare(
hosts, remote_sums_path, SUMS_FILENAME, args.require_all_hosts)
if got_sums_status != ReturnCode.SUCCESS:
return got_sums_status
gpg_retval, gpg_output, good, unknown, bad = check_multisig(SIGNATUREFILENAME, args)
else:
if version_tuple[0] < 22:
log.error("Version too old - single sig not supported. Use a previous "
"version of this script from the repo.")
return ReturnCode.BAD_VERSION
if gpg_retval not in gpg_allowed_codes:
if gpg_retval == 1:
log.critical(f"Bad signature (code: {gpg_retval}).")
if gpg_retval == 2:
log.critical(
"gpg error. Do you have the Bitcoin Core binary release "
"signing key installed?")
else:
log.critical(f"unexpected GPG exit code ({gpg_retval})")
got_sums_status = get_files_from_hosts_and_compare(
hosts, remote_sums_path, SUMS_FILENAME, args.require_all_hosts)
if got_sums_status != ReturnCode.SUCCESS:
return got_sums_status
log.error(f"gpg output:\n{indent(gpg_output)}")
cleanup()
return ReturnCode.INTEGRITY_FAILURE
# Verify the signature on the SHA256SUMS file
sigs_status, good_trusted, good_untrusted, unknown, bad = verify_shasums_signature(SIGNATUREFILENAME, SUMS_FILENAME, args)
if sigs_status != ReturnCode.SUCCESS:
if sigs_status == ReturnCode.INTEGRITY_FAILURE:
cleanup()
return sigs_status
# Decide which keys we trust, though not "trust" in the GPG sense, but rather
# which pubkeys convince us that this sums file is legitimate. In other words,
# which pubkeys within the Bitcoin community do we trust for the purposes of
# binary verification?
trusted_keys = set()
if args.trusted_keys:
trusted_keys |= set(args.trusted_keys.split(','))
# Tally signatures and make sure we have enough goods to fulfill
# our threshold.
good_trusted = {sig for sig in good if sig.trusted or sig.key in trusted_keys}
good_untrusted = [sig for sig in good if sig not in good_trusted]
num_trusted = len(good_trusted) + len(good_untrusted)
log.info(f"got {num_trusted} good signatures")
if num_trusted < min_good_sigs:
log.info("Maybe you need to import "
f"(`gpg --keyserver {args.keyserver} --recv-keys <key-id>`) "
"some of the following keys: ")
log.info('')
for sig in unknown:
log.info(f" {sig.key} ({sig.name})")
log.info('')
log.error(
"not enough trusted sigs to meet threshold "
f"({num_trusted} vs. {min_good_sigs})")
return ReturnCode.NOT_ENOUGH_GOOD_SIGS
for sig in good_trusted:
log.info(f"GOOD SIGNATURE: {sig}")
for sig in good_untrusted:
log.info(f"GOOD SIGNATURE (untrusted): {sig}")
for sig in [sig for sig in good if sig.status == 'expired']:
log.warning(f"key {sig.key} for {sig.name} is expired")
for sig in bad:
log.warning(f"BAD SIGNATURE: {sig}")
for sig in unknown:
log.warning(f"UNKNOWN SIGNATURE: {sig}")
# extract hashes/filenames of binaries to verify from hash file;
# each line has the following format: "<hash> <binary_filename>"
with open(SUMS_FILENAME, 'r', encoding='utf8') as hash_file:
hashes_to_verify = [line.split()[:2] for line in hash_file if os_filter in line]
# Extract hashes and filenames
hashes_to_verify = parse_sums_file(SUMS_FILENAME, os_filter)
remove_files([SUMS_FILENAME])
if not hashes_to_verify:
log.error("no files matched the platform specified")
@ -570,23 +552,10 @@ def main(args):
return ReturnCode.BINARY_DOWNLOAD_FAILED
# verify hashes
offending_files = []
files_to_hashes = {}
hashes_status, files_to_hashes = verify_binary_hashes(hashes_to_verify)
if hashes_status != ReturnCode.SUCCESS:
return hashes_status
for hash_expected, binary_filename in hashes_to_verify:
with open(binary_filename, 'rb') as binary_file:
hash_calculated = sha256(binary_file.read()).hexdigest()
if hash_calculated != hash_expected:
offending_files.append(binary_filename)
else:
files_to_hashes[binary_filename] = hash_calculated
if offending_files:
joined_files = '\n'.join(offending_files)
log.critical(
"Hashes don't match.\n"
f"Offending files:\n{joined_files}")
return ReturnCode.INTEGRITY_FAILURE
if args.cleanup:
cleanup()
@ -609,5 +578,71 @@ def main(args):
return ReturnCode.SUCCESS
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'-v', '--verbose', action='store_true',
default=bool_from_env('BINVERIFY_VERBOSE'),
)
parser.add_argument(
'-q', '--quiet', action='store_true',
default=bool_from_env('BINVERIFY_QUIET'),
)
parser.add_argument(
'--import-keys', action='store_true',
default=bool_from_env('BINVERIFY_IMPORTKEYS'),
help='if specified, ask to import each unknown builder key'
)
parser.add_argument(
'--min-good-sigs', type=int, action='store', nargs='?',
default=int(os.environ.get('BINVERIFY_MIN_GOOD_SIGS', 3)),
help=(
'The minimum number of good signatures to require successful termination.'),
)
parser.add_argument(
'--keyserver', action='store', nargs='?',
default=os.environ.get('BINVERIFY_KEYSERVER', 'hkp://keyserver.ubuntu.com'),
help='which keyserver to use',
)
parser.add_argument(
'--trusted-keys', action='store', nargs='?',
default=os.environ.get('BINVERIFY_TRUSTED_KEYS', ''),
help='A list of trusted signer GPG keys, separated by commas. Not "trusted keys" in the GPG sense.',
)
parser.add_argument(
'--json', action='store_true',
default=bool_from_env('BINVERIFY_JSON'),
help='If set, output the result as JSON',
)
subparsers = parser.add_subparsers(title="Commands", required=True, dest="command")
pub_parser = subparsers.add_parser("pub", help="Verify a published release.")
pub_parser.set_defaults(func=verify_published_handler)
pub_parser.add_argument(
'version', type=str, help=(
f'version of the bitcoin release to download; of the format '
f'{VERSION_FORMAT}. Example: {VERSION_EXAMPLE}')
)
pub_parser.add_argument(
'--cleanup', action='store_true',
default=bool_from_env('BINVERIFY_CLEANUP'),
help='if specified, clean up files afterwards'
)
pub_parser.add_argument(
'--require-all-hosts', action='store_true',
default=bool_from_env('BINVERIFY_REQUIRE_ALL_HOSTS'),
help=(
f'If set, require all hosts ({HOST1}, {HOST2}) to provide signatures. '
'(Sometimes bitcoin.org lags behind bitcoincore.org.)')
)
args = parser.parse_args()
if args.quiet:
log.setLevel(logging.WARNING)
return args.func(args)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
sys.exit(main())