mirror of
https://github.com/bitcoin/bips.git
synced 2024-11-19 01:40:05 +01:00
Add reference.py with test vectors
* reference.py contains the silent payment specific code * secp256k1.py for doing the EC operations * bech32m.py contains code for encoding/decoding bech32(m) addresses * bitcoin_utils.py contains some helper code, not specific to silent payments * send_and_receive_test_vectors.json contains the wallet unit test vectors Co-Authored-By: S3RK <1466284+S3RK@users.noreply.github.com> Co-Authored-By: Oghenovo Usiwoma <37949128+Eunovo@users.noreply.github.com> Co-authored-by: S.Blagogee <34041358+setavenger@users.noreply.github.com>
This commit is contained in:
parent
96f4e5a4c4
commit
33a99a1a17
@ -370,6 +370,58 @@ If using a seed/seed phrase only style backup, the user can recover the wallet's
|
||||
|
||||
Silent payments introduces a new address format and protocol for sending and as such is not compatible with older wallet software or wallets which have not implemented the silent payments protocol.
|
||||
|
||||
== Test Vectors ==
|
||||
|
||||
A [[bip-0352/send_and_receive_test_vectors.json|collection of test vectors in JSON format]] are provided, along with a [[bip-0352/reference.py|python reference implementation]]. Each test vector consists of a sending test case and corresponding receiving test case. This is to allow sending and receiving to be implemented separately. To ensure determinism while testing, sort the array of ''B<sub>m</sub>'' by amount (see the [[bip-0352/reference.py|reference implementation]]). Test cases use the following schema:
|
||||
|
||||
''' test_case '''
|
||||
|
||||
{
|
||||
"comment": "Comment describing the behavior being tested",
|
||||
"sending": [<array of sender test objects>],
|
||||
"receiving": [<array of recipient test objects>],
|
||||
}
|
||||
|
||||
''' sender '''
|
||||
|
||||
{
|
||||
"given": {
|
||||
"vin": [<array of vin objects with an added field for the private key. These objects are structured to match the `vin` output field from `getrawtransaction verbosity=2`>],
|
||||
"recipients": [<array of strings, where each string is a bech32m encoding representing a silent payment address>]
|
||||
},
|
||||
"expected": {
|
||||
"outputs": [<array of strings, where each string is a hex encoding of 32-byte X-only public key; contains all possible output sets, test must match a subset of size `n_outputs`>],
|
||||
"n_outouts": <integer for the exact number of expected outputs>,
|
||||
},
|
||||
}
|
||||
|
||||
''' recipient '''
|
||||
|
||||
{
|
||||
"given": {
|
||||
"vin": [<array of vin objects. These objects are structured to match the `vin` output field from `getrawtransaction verbosity=2`>],
|
||||
"key_material": {
|
||||
"scan_priv_key": <hex encoded scan private key>,
|
||||
"spend_priv_key": <hex encoded spend private key>,
|
||||
}
|
||||
"labels": [<array of ints, representing labels the receiver has used>],
|
||||
},
|
||||
"expected": {
|
||||
"addresses": [<array of bech32m strings, one for the silent payment address and each labeled address (if used)>],
|
||||
"outputs": [<array of outputs with tweak and signature; contains all possible output sets, tester must match a subset of size `n_outputs`>
|
||||
{
|
||||
"priv_key_tweak": <hex encoded private key tweak data>,
|
||||
"pub_key": <hex encoded X-only public key>,
|
||||
"signature": <hex encoded signature for the output (produced with spend_priv_key + priv_key_tweak)>
|
||||
},
|
||||
...
|
||||
],
|
||||
"n_outputs": <integer for the exact number of expected outputs>
|
||||
}
|
||||
}
|
||||
|
||||
Wallets should include inputs not in the ''[[#inputs-for-shared-secret-derivation|Inputs For Shared Secret Derivation]]'' list when testing to ensure that only inputs from the list are being used for shared secret derivation. Additionally, receiving wallets should include non-silent payment outputs for themselves in testing to ensure silent payments scanning does not interfere with regular outputs detection.
|
||||
|
||||
=== Functional tests ===
|
||||
|
||||
Below is a list of functional tests which should be included in sending and receiving implementations.
|
||||
|
135
bip-0352/bech32m.py
Normal file
135
bip-0352/bech32m.py
Normal file
@ -0,0 +1,135 @@
|
||||
# Copyright (c) 2017, 2020 Pieter Wuille
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
"""Reference implementation for Bech32/Bech32m and segwit addresses."""
|
||||
|
||||
|
||||
from enum import Enum
|
||||
|
||||
class Encoding(Enum):
|
||||
"""Enumeration type to list the various supported encodings."""
|
||||
BECH32 = 1
|
||||
BECH32M = 2
|
||||
|
||||
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
|
||||
BECH32M_CONST = 0x2bc830a3
|
||||
|
||||
def bech32_polymod(values):
|
||||
"""Internal function that computes the Bech32 checksum."""
|
||||
generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
|
||||
chk = 1
|
||||
for value in values:
|
||||
top = chk >> 25
|
||||
chk = (chk & 0x1ffffff) << 5 ^ value
|
||||
for i in range(5):
|
||||
chk ^= generator[i] if ((top >> i) & 1) else 0
|
||||
return chk
|
||||
|
||||
|
||||
def bech32_hrp_expand(hrp):
|
||||
"""Expand the HRP into values for checksum computation."""
|
||||
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
|
||||
|
||||
|
||||
def bech32_verify_checksum(hrp, data):
|
||||
"""Verify a checksum given HRP and converted data characters."""
|
||||
const = bech32_polymod(bech32_hrp_expand(hrp) + data)
|
||||
if const == 1:
|
||||
return Encoding.BECH32
|
||||
if const == BECH32M_CONST:
|
||||
return Encoding.BECH32M
|
||||
return None
|
||||
|
||||
def bech32_create_checksum(hrp, data, spec):
|
||||
"""Compute the checksum values given HRP and data."""
|
||||
values = bech32_hrp_expand(hrp) + data
|
||||
const = BECH32M_CONST if spec == Encoding.BECH32M else 1
|
||||
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ const
|
||||
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
|
||||
|
||||
|
||||
def bech32_encode(hrp, data, spec):
|
||||
"""Compute a Bech32 string given HRP and data values."""
|
||||
combined = data + bech32_create_checksum(hrp, data, spec)
|
||||
return hrp + '1' + ''.join([CHARSET[d] for d in combined])
|
||||
|
||||
def bech32_decode(bech):
|
||||
"""Validate a Bech32/Bech32m string, and determine HRP and data."""
|
||||
if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or
|
||||
(bech.lower() != bech and bech.upper() != bech)):
|
||||
return (None, None, None)
|
||||
bech = bech.lower()
|
||||
pos = bech.rfind('1')
|
||||
|
||||
# remove the requirement that bech32m be less than 90 chars
|
||||
if pos < 1 or pos + 7 > len(bech):
|
||||
return (None, None, None)
|
||||
if not all(x in CHARSET for x in bech[pos+1:]):
|
||||
return (None, None, None)
|
||||
hrp = bech[:pos]
|
||||
data = [CHARSET.find(x) for x in bech[pos+1:]]
|
||||
spec = bech32_verify_checksum(hrp, data)
|
||||
if spec is None:
|
||||
return (None, None, None)
|
||||
return (hrp, data[:-6], spec)
|
||||
|
||||
def convertbits(data, frombits, tobits, pad=True):
|
||||
"""General power-of-2 base conversion."""
|
||||
acc = 0
|
||||
bits = 0
|
||||
ret = []
|
||||
maxv = (1 << tobits) - 1
|
||||
max_acc = (1 << (frombits + tobits - 1)) - 1
|
||||
for value in data:
|
||||
if value < 0 or (value >> frombits):
|
||||
return None
|
||||
acc = ((acc << frombits) | value) & max_acc
|
||||
bits += frombits
|
||||
while bits >= tobits:
|
||||
bits -= tobits
|
||||
ret.append((acc >> bits) & maxv)
|
||||
if pad:
|
||||
if bits:
|
||||
ret.append((acc << (tobits - bits)) & maxv)
|
||||
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
|
||||
return None
|
||||
return ret
|
||||
|
||||
|
||||
def decode(hrp, addr):
|
||||
"""Decode a segwit address."""
|
||||
hrpgot, data, spec = bech32_decode(addr)
|
||||
if hrpgot != hrp:
|
||||
return (None, None)
|
||||
decoded = convertbits(data[1:], 5, 8, False)
|
||||
if decoded is None or len(decoded) < 2:
|
||||
return (None, None)
|
||||
if data[0] > 16:
|
||||
return (None, None)
|
||||
return (data[0], decoded)
|
||||
|
||||
|
||||
def encode(hrp, witver, witprog):
|
||||
"""Encode a segwit address."""
|
||||
spec = Encoding.BECH32 if witver == 0 else Encoding.BECH32M
|
||||
ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5), spec)
|
||||
if decode(hrp, ret) == (None, None):
|
||||
return None
|
||||
return ret
|
158
bip-0352/bitcoin_utils.py
Normal file
158
bip-0352/bitcoin_utils.py
Normal file
@ -0,0 +1,158 @@
|
||||
import hashlib
|
||||
import struct
|
||||
from io import BytesIO
|
||||
from secp256k1 import ECKey
|
||||
from typing import Union
|
||||
|
||||
|
||||
def from_hex(hex_string):
|
||||
"""Deserialize from a hex string representation (e.g. from RPC)"""
|
||||
return BytesIO(bytes.fromhex(hex_string))
|
||||
|
||||
|
||||
def ser_uint32(u: int) -> bytes:
|
||||
return u.to_bytes(4, "big")
|
||||
|
||||
|
||||
def ser_uint256(u):
|
||||
return u.to_bytes(32, 'little')
|
||||
|
||||
|
||||
def deser_uint256(f):
|
||||
return int.from_bytes(f.read(32), 'little')
|
||||
|
||||
|
||||
def deser_txid(txid: str):
|
||||
# recall that txids are serialized little-endian, but displayed big-endian
|
||||
# this means when converting from a human readable hex txid, we need to first
|
||||
# reverse it before deserializing it
|
||||
dixt = "".join(map(str.__add__, txid[-2::-2], txid[-1::-2]))
|
||||
return bytes.fromhex(dixt)
|
||||
|
||||
|
||||
def deser_compact_size(f: BytesIO):
|
||||
view = f.getbuffer()
|
||||
nbytes = view.nbytes;
|
||||
view.release()
|
||||
if (nbytes == 0):
|
||||
return 0 # end of stream
|
||||
|
||||
nit = struct.unpack("<B", f.read(1))[0]
|
||||
if nit == 253:
|
||||
nit = struct.unpack("<H", f.read(2))[0]
|
||||
elif nit == 254:
|
||||
nit = struct.unpack("<I", f.read(4))[0]
|
||||
elif nit == 255:
|
||||
nit = struct.unpack("<Q", f.read(8))[0]
|
||||
return nit
|
||||
|
||||
|
||||
def deser_string(f: BytesIO):
|
||||
nit = deser_compact_size(f)
|
||||
return f.read(nit)
|
||||
|
||||
|
||||
def deser_string_vector(f: BytesIO):
|
||||
nit = deser_compact_size(f)
|
||||
r = []
|
||||
for _ in range(nit):
|
||||
t = deser_string(f)
|
||||
r.append(t)
|
||||
return r
|
||||
|
||||
|
||||
class COutPoint:
|
||||
__slots__ = ("hash", "n",)
|
||||
|
||||
def __init__(self, hash=b"", n=0,):
|
||||
self.hash = hash
|
||||
self.n = n
|
||||
|
||||
def serialize(self):
|
||||
r = b""
|
||||
r += self.hash
|
||||
r += struct.pack("<I", self.n)
|
||||
return r
|
||||
|
||||
def deserialize(self, f):
|
||||
self.hash = f.read(32)
|
||||
self.n = struct.unpack("<I", f.read(4))[0]
|
||||
|
||||
|
||||
class VinInfo:
|
||||
__slots__ = ("outpoint", "scriptSig", "txinwitness", "prevout", "private_key")
|
||||
|
||||
def __init__(self, outpoint=None, scriptSig=b"", txinwitness=None, prevout=b"", private_key=None):
|
||||
if outpoint is None:
|
||||
self.outpoint = COutPoint()
|
||||
else:
|
||||
self.outpoint = outpoint
|
||||
if txinwitness is None:
|
||||
self.txinwitness = CTxInWitness()
|
||||
else:
|
||||
self.txinwitness = txinwitness
|
||||
if private_key is None:
|
||||
self.private_key = ECKey()
|
||||
else:
|
||||
self.private_key = private_key
|
||||
self.scriptSig = scriptSig
|
||||
self.prevout = prevout
|
||||
|
||||
|
||||
class CScriptWitness:
|
||||
__slots__ = ("stack",)
|
||||
|
||||
def __init__(self):
|
||||
# stack is a vector of strings
|
||||
self.stack = []
|
||||
|
||||
def is_null(self):
|
||||
if self.stack:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class CTxInWitness:
|
||||
__slots__ = ("scriptWitness",)
|
||||
|
||||
def __init__(self):
|
||||
self.scriptWitness = CScriptWitness()
|
||||
|
||||
def deserialize(self, f: BytesIO):
|
||||
self.scriptWitness.stack = deser_string_vector(f)
|
||||
return self
|
||||
|
||||
def is_null(self):
|
||||
return self.scriptWitness.is_null()
|
||||
|
||||
|
||||
def hash160(s: Union[bytes, bytearray]) -> bytes:
|
||||
return hashlib.new("ripemd160", hashlib.sha256(s).digest()).digest()
|
||||
|
||||
|
||||
def is_p2tr(spk: bytes) -> bool:
|
||||
if len(spk) != 34:
|
||||
return False
|
||||
# OP_1 OP_PUSHBYTES_32 <32 bytes>
|
||||
return (spk[0] == 0x51) & (spk[1] == 0x20)
|
||||
|
||||
|
||||
def is_p2wpkh(spk: bytes) -> bool:
|
||||
if len(spk) != 22:
|
||||
return False
|
||||
# OP_0 OP_PUSHBYTES_20 <20 bytes>
|
||||
return (spk[0] == 0x00) & (spk[1] == 0x14)
|
||||
|
||||
|
||||
def is_p2sh(spk: bytes) -> bool:
|
||||
if len(spk) != 23:
|
||||
return False
|
||||
# OP_HASH160 OP_PUSHBYTES_20 <20 bytes> OP_EQUAL
|
||||
return (spk[0] == 0xA9) & (spk[1] == 0x14) & (spk[-1] == 0x87)
|
||||
|
||||
|
||||
def is_p2pkh(spk: bytes) -> bool:
|
||||
if len(spk) != 25:
|
||||
return False
|
||||
# OP_DUP OP_HASH160 OP_PUSHBYTES_20 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG
|
||||
return (spk[0] == 0x76) & (spk[1] == 0xA9) & (spk[2] == 0x14) & (spk[-2] == 0x88) & (spk[-1] == 0xAC)
|
335
bip-0352/reference.py
Executable file
335
bip-0352/reference.py
Executable file
@ -0,0 +1,335 @@
|
||||
#!/usr/bin/env python3
|
||||
# For running the test vectors, run this script:
|
||||
# ./reference.py send_and_receive_test_vectors.json
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from typing import List, Tuple, Dict, cast
|
||||
from sys import argv, exit
|
||||
from functools import reduce
|
||||
from itertools import permutations
|
||||
|
||||
# local files
|
||||
from bech32m import convertbits, bech32_encode, decode, Encoding
|
||||
from secp256k1 import ECKey, ECPubKey, TaggedHash, NUMS_H
|
||||
from bitcoin_utils import (
|
||||
deser_txid,
|
||||
from_hex,
|
||||
hash160,
|
||||
is_p2pkh,
|
||||
is_p2sh,
|
||||
is_p2wpkh,
|
||||
is_p2tr,
|
||||
ser_uint32,
|
||||
COutPoint,
|
||||
CTxInWitness,
|
||||
VinInfo,
|
||||
)
|
||||
|
||||
|
||||
def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
|
||||
if is_p2pkh(vin.prevout):
|
||||
# skip the first 3 op_codes and grab the 20 byte hash
|
||||
# from the scriptPubKey
|
||||
spk_hash = vin.prevout[3:3 + 20]
|
||||
for i in range(len(vin.scriptSig), 0, -1):
|
||||
if i - 33 >= 0:
|
||||
# starting from the back, we move over the scriptSig with a 33 byte
|
||||
# window (to match a compressed pubkey). we hash this and check if it matches
|
||||
# the 20 byte has from the scriptPubKey. for standard scriptSigs, this will match
|
||||
# right away because the pubkey is the last item in the scriptSig.
|
||||
# if its a non-standard (malleated) scriptSig, we will still find the pubkey if its
|
||||
# a compressed pubkey.
|
||||
#
|
||||
# note: this is an incredibly inefficient implementation, for demonstration purposes only.
|
||||
pubkey_bytes = vin.scriptSig[i - 33:i]
|
||||
pubkey_hash = hash160(pubkey_bytes)
|
||||
if pubkey_hash == spk_hash:
|
||||
pubkey = ECPubKey().set(pubkey_bytes)
|
||||
if (pubkey.valid) & (pubkey.compressed):
|
||||
return pubkey
|
||||
if is_p2sh(vin.prevout):
|
||||
redeem_script = vin.scriptSig[1:]
|
||||
if is_p2wpkh(redeem_script):
|
||||
pubkey = ECPubKey().set(vin.txinwitness.scriptWitness.stack[-1])
|
||||
if (pubkey.valid) & (pubkey.compressed):
|
||||
return pubkey
|
||||
if is_p2wpkh(vin.prevout):
|
||||
txin = vin.txinwitness
|
||||
pubkey = ECPubKey().set(txin.scriptWitness.stack[-1])
|
||||
if (pubkey.valid) & (pubkey.compressed):
|
||||
return pubkey
|
||||
if is_p2tr(vin.prevout):
|
||||
witnessStack = vin.txinwitness.scriptWitness.stack
|
||||
if (len(witnessStack) >= 1):
|
||||
if (len(witnessStack) > 1 and witnessStack[-1][0] == 0x50):
|
||||
# Last item is annex
|
||||
witnessStack.pop()
|
||||
|
||||
if (len(witnessStack) > 1):
|
||||
# Script-path spend
|
||||
control_block = witnessStack[-1]
|
||||
# control block is <control byte> <32 byte internal key> and 0 or more <32 byte hash>
|
||||
internal_key = control_block[1:33]
|
||||
if (internal_key == NUMS_H.to_bytes(32, 'big')):
|
||||
# Skip if NUMS_H
|
||||
return ECPubKey()
|
||||
|
||||
pubkey = ECPubKey().set(vin.prevout[2:])
|
||||
if (pubkey.valid) & (pubkey.compressed):
|
||||
return pubkey
|
||||
|
||||
|
||||
return ECPubKey()
|
||||
|
||||
|
||||
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: ECPubKey) -> bytes:
|
||||
lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0]
|
||||
return TaggedHash("BIP0352/Inputs", lowest_outpoint.serialize() + cast(bytes, sum_input_pubkeys.get_bytes(False)))
|
||||
|
||||
|
||||
|
||||
def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "tsp", version: int = 0) -> str:
|
||||
data = convertbits(cast(bytes, B_scan.get_bytes(False)) + cast(bytes, B_m.get_bytes(False)), 8, 5)
|
||||
return bech32_encode(hrp, [version] + cast(List[int], data), Encoding.BECH32M)
|
||||
|
||||
|
||||
def generate_label(b_scan: ECKey, m: int) -> bytes:
|
||||
return TaggedHash("BIP0352/Label", b_scan.get_bytes() + ser_uint32(m))
|
||||
|
||||
|
||||
def create_labeled_silent_payment_address(b_scan: ECKey, B_spend: ECPubKey, m: int, hrp: str = "tsp", version: int = 0) -> str:
|
||||
G = ECKey().set(1).get_pubkey()
|
||||
B_scan = b_scan.get_pubkey()
|
||||
B_m = B_spend + generate_label(b_scan, m) * G
|
||||
labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version)
|
||||
|
||||
return labeled_address
|
||||
|
||||
|
||||
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[ECPubKey, ECPubKey]:
|
||||
_, data = decode(hrp, address)
|
||||
if data is None:
|
||||
return ECPubKey(), ECPubKey()
|
||||
B_scan = ECPubKey().set(data[:33])
|
||||
B_spend = ECPubKey().set(data[33:])
|
||||
|
||||
return B_scan, B_spend
|
||||
|
||||
|
||||
def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], input_hash: bytes, recipients: List[str], hrp="tsp") -> List[str]:
|
||||
G = ECKey().set(1).get_pubkey()
|
||||
negated_keys = []
|
||||
for key, is_xonly in input_priv_keys:
|
||||
k = ECKey().set(key.get_bytes())
|
||||
if is_xonly and k.get_pubkey().get_y() % 2 != 0:
|
||||
k.negate()
|
||||
negated_keys.append(k)
|
||||
|
||||
a_sum = sum(negated_keys)
|
||||
silent_payment_groups: Dict[ECPubKey, List[ECPubKey]] = {}
|
||||
for recipient in recipients:
|
||||
B_scan, B_m = decode_silent_payment_address(recipient, hrp=hrp)
|
||||
if B_scan in silent_payment_groups:
|
||||
silent_payment_groups[B_scan].append(B_m)
|
||||
else:
|
||||
silent_payment_groups[B_scan] = [B_m]
|
||||
|
||||
outputs = []
|
||||
for B_scan, B_m_values in silent_payment_groups.items():
|
||||
ecdh_shared_secret = input_hash * a_sum * B_scan
|
||||
k = 0
|
||||
for B_m in B_m_values:
|
||||
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
|
||||
P_km = B_m + t_k * G
|
||||
outputs.append(P_km.get_bytes().hex())
|
||||
k += 1
|
||||
|
||||
return list(set(outputs))
|
||||
|
||||
|
||||
def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, str] = {}) -> List[Dict[str, str]]:
|
||||
G = ECKey().set(1).get_pubkey()
|
||||
ecdh_shared_secret = input_hash * b_scan * A_sum
|
||||
k = 0
|
||||
wallet = []
|
||||
while True:
|
||||
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
|
||||
P_k = B_spend + t_k * G
|
||||
for output in outputs_to_check:
|
||||
if P_k == output:
|
||||
wallet.append({"pub_key": P_k.get_bytes().hex(), "priv_key_tweak": t_k.hex()})
|
||||
outputs_to_check.remove(output)
|
||||
k += 1
|
||||
break
|
||||
elif labels:
|
||||
m_G_sub = output - P_k
|
||||
if m_G_sub.get_bytes(False).hex() in labels:
|
||||
P_km = P_k + m_G_sub
|
||||
wallet.append({
|
||||
"pub_key": P_km.get_bytes().hex(),
|
||||
"priv_key_tweak": (ECKey().set(t_k).add(
|
||||
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
|
||||
)).get_bytes().hex(),
|
||||
})
|
||||
outputs_to_check.remove(output)
|
||||
k += 1
|
||||
break
|
||||
else:
|
||||
output.negate()
|
||||
m_G_sub = output - P_k
|
||||
if m_G_sub.get_bytes(False).hex() in labels:
|
||||
P_km = P_k + m_G_sub
|
||||
wallet.append({
|
||||
"pub_key": P_km.get_bytes().hex(),
|
||||
"priv_key_tweak": (ECKey().set(t_k).add(
|
||||
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
|
||||
)).get_bytes().hex(),
|
||||
})
|
||||
outputs_to_check.remove(output)
|
||||
k += 1
|
||||
break
|
||||
else:
|
||||
break
|
||||
return wallet
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(argv) != 2 or argv[1] in ('-h', '--help'):
|
||||
print("Usage: ./reference.py send_and_receive_test_vectors.json")
|
||||
exit(0)
|
||||
|
||||
with open(argv[1], "r") as f:
|
||||
test_data = json.loads(f.read())
|
||||
|
||||
# G , needed for generating the labels "database"
|
||||
G = ECKey().set(1).get_pubkey()
|
||||
for case in test_data:
|
||||
print(case["comment"])
|
||||
# Test sending
|
||||
for sending_test in case["sending"]:
|
||||
given = sending_test["given"]
|
||||
expected = sending_test["expected"]
|
||||
|
||||
vins = [
|
||||
VinInfo(
|
||||
outpoint=COutPoint(hash=deser_txid(input["txid"]), n=input["vout"]),
|
||||
scriptSig=bytes.fromhex(input["scriptSig"]),
|
||||
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
|
||||
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
|
||||
private_key=ECKey().set(bytes.fromhex(input["private_key"])),
|
||||
)
|
||||
for input in given["vin"]
|
||||
]
|
||||
# Conver the tuples to lists so they can be easily compared to the json list of lists from the given test vectors
|
||||
input_priv_keys = []
|
||||
input_pub_keys = []
|
||||
for vin in vins:
|
||||
pubkey = get_pubkey_from_input(vin)
|
||||
if not pubkey.valid:
|
||||
continue
|
||||
input_priv_keys.append((
|
||||
vin.private_key,
|
||||
is_p2tr(vin.prevout),
|
||||
))
|
||||
input_pub_keys.append(pubkey)
|
||||
|
||||
sending_outputs = []
|
||||
if (len(input_pub_keys) > 0):
|
||||
A_sum = reduce(lambda x, y: x + y, input_pub_keys)
|
||||
input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum)
|
||||
sending_outputs = create_outputs(input_priv_keys, input_hash, given["recipients"], hrp="sp")
|
||||
|
||||
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses
|
||||
# will produce different generated outputs if sending to multiple silent payment addresses belonging to the
|
||||
# same sender but with different labels. Because of this, expected["outputs"] contains all possible valid output sets,
|
||||
# based on all possible permutations of recipient address orderings. Must match exactly one of the possible output sets.
|
||||
assert(any(set(sending_outputs) == set(lst) for lst in expected["outputs"])), "Sending test failed"
|
||||
else:
|
||||
assert(sending_outputs == expected["outputs"][0] == []), "Sending test failed"
|
||||
|
||||
# Test receiving
|
||||
msg = hashlib.sha256(b"message").digest()
|
||||
aux = hashlib.sha256(b"random auxiliary data").digest()
|
||||
for receiving_test in case["receiving"]:
|
||||
given = receiving_test["given"]
|
||||
expected = receiving_test["expected"]
|
||||
outputs_to_check = [
|
||||
ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"]
|
||||
]
|
||||
vins = [
|
||||
VinInfo(
|
||||
outpoint=COutPoint(hash=deser_txid(input["txid"]), n=input["vout"]),
|
||||
scriptSig=bytes.fromhex(input["scriptSig"]),
|
||||
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
|
||||
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
|
||||
)
|
||||
for input in given["vin"]
|
||||
]
|
||||
# Check that the given inputs for the receiving test match what was generated during the sending test
|
||||
receiving_addresses = []
|
||||
b_scan = ECKey().set(bytes.fromhex(given["key_material"]["scan_priv_key"]))
|
||||
b_spend = ECKey().set(
|
||||
bytes.fromhex(given["key_material"]["spend_priv_key"])
|
||||
)
|
||||
B_scan = b_scan.get_pubkey()
|
||||
B_spend = b_spend.get_pubkey()
|
||||
receiving_addresses.append(
|
||||
encode_silent_payment_address(B_scan, B_spend, hrp="sp")
|
||||
)
|
||||
if given["labels"]:
|
||||
for label in given["labels"]:
|
||||
receiving_addresses.append(
|
||||
create_labeled_silent_payment_address(
|
||||
b_scan, B_spend, m=label, hrp="sp"
|
||||
)
|
||||
)
|
||||
|
||||
# Check that the silent payment addresses match for the given BIP32 seed and labels dictionary
|
||||
assert (receiving_addresses == expected["addresses"]), "Receiving addresses don't match"
|
||||
input_pub_keys = []
|
||||
for vin in vins:
|
||||
pubkey = get_pubkey_from_input(vin)
|
||||
if not pubkey.valid:
|
||||
continue
|
||||
input_pub_keys.append(pubkey)
|
||||
|
||||
add_to_wallet = []
|
||||
if (len(input_pub_keys) > 0):
|
||||
A_sum = reduce(lambda x, y: x + y, input_pub_keys)
|
||||
input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum)
|
||||
pre_computed_labels = {
|
||||
(generate_label(b_scan, label) * G).get_bytes(False).hex(): generate_label(b_scan, label).hex()
|
||||
for label in given["labels"]
|
||||
}
|
||||
add_to_wallet = scanning(
|
||||
b_scan=b_scan,
|
||||
B_spend=B_spend,
|
||||
A_sum=A_sum,
|
||||
input_hash=input_hash,
|
||||
outputs_to_check=outputs_to_check,
|
||||
labels=pre_computed_labels,
|
||||
)
|
||||
|
||||
# Check that the private key is correct for the found output public key
|
||||
for output in add_to_wallet:
|
||||
pub_key = ECPubKey().set(bytes.fromhex(output["pub_key"]))
|
||||
full_private_key = b_spend.add(bytes.fromhex(output["priv_key_tweak"]))
|
||||
if full_private_key.get_pubkey().get_y() % 2 != 0:
|
||||
full_private_key.negate()
|
||||
|
||||
sig = full_private_key.sign_schnorr(msg, aux)
|
||||
assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}"
|
||||
output["signature"] = sig.hex()
|
||||
|
||||
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses
|
||||
# will produce different generated outputs if sending to multiple silent payment addresses belonging to the
|
||||
# same sender but with different labels. Because of this, expected["outputs"] contains all possible valid output sets,
|
||||
# based on all possible permutations of recipient address orderings. Must match exactly one of the possible found output
|
||||
# sets in expected["outputs"]
|
||||
generated_set = {frozenset(d.items()) for d in add_to_wallet}
|
||||
expected_set = {frozenset(d.items()) for d in expected["outputs"]}
|
||||
assert generated_set == expected_set, "Receive test failed"
|
||||
|
||||
|
||||
print("All tests passed")
|
696
bip-0352/secp256k1.py
Normal file
696
bip-0352/secp256k1.py
Normal file
@ -0,0 +1,696 @@
|
||||
# Copyright (c) 2019 Pieter Wuille
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test-only secp256k1 elliptic curve implementation
|
||||
|
||||
WARNING: This code is slow, uses bad randomness, does not properly protect
|
||||
keys, and is trivially vulnerable to side channel attacks. Do not use for
|
||||
anything but tests."""
|
||||
import random
|
||||
import hashlib
|
||||
import hmac
|
||||
|
||||
def TaggedHash(tag, data):
|
||||
ss = hashlib.sha256(tag.encode('utf-8')).digest()
|
||||
ss += ss
|
||||
ss += data
|
||||
return hashlib.sha256(ss).digest()
|
||||
|
||||
def modinv(a, n):
|
||||
"""Compute the modular inverse of a modulo n
|
||||
|
||||
See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers.
|
||||
"""
|
||||
t1, t2 = 0, 1
|
||||
r1, r2 = n, a
|
||||
while r2 != 0:
|
||||
q = r1 // r2
|
||||
t1, t2 = t2, t1 - q * t2
|
||||
r1, r2 = r2, r1 - q * r2
|
||||
if r1 > 1:
|
||||
return None
|
||||
if t1 < 0:
|
||||
t1 += n
|
||||
return t1
|
||||
|
||||
def jacobi_symbol(n, k):
|
||||
"""Compute the Jacobi symbol of n modulo k
|
||||
|
||||
See http://en.wikipedia.org/wiki/Jacobi_symbol
|
||||
|
||||
For our application k is always prime, so this is the same as the Legendre symbol."""
|
||||
assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k"
|
||||
n %= k
|
||||
t = 0
|
||||
while n != 0:
|
||||
while n & 1 == 0:
|
||||
n >>= 1
|
||||
r = k & 7
|
||||
t ^= (r == 3 or r == 5)
|
||||
n, k = k, n
|
||||
t ^= (n & k & 3 == 3)
|
||||
n = n % k
|
||||
if k == 1:
|
||||
return -1 if t else 1
|
||||
return 0
|
||||
|
||||
def modsqrt(a, p):
|
||||
"""Compute the square root of a modulo p when p % 4 = 3.
|
||||
|
||||
The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm
|
||||
|
||||
Limiting this function to only work for p % 4 = 3 means we don't need to
|
||||
iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd
|
||||
is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4)
|
||||
|
||||
secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4.
|
||||
"""
|
||||
if p % 4 != 3:
|
||||
raise NotImplementedError("modsqrt only implemented for p % 4 = 3")
|
||||
sqrt = pow(a, (p + 1)//4, p)
|
||||
if pow(sqrt, 2, p) == a % p:
|
||||
return sqrt
|
||||
return None
|
||||
|
||||
def int_or_bytes(s):
|
||||
"Convert 32-bytes to int while accepting also int and returning it as is."
|
||||
if isinstance(s, bytes):
|
||||
assert(len(s) == 32)
|
||||
s = int.from_bytes(s, 'big')
|
||||
elif not isinstance(s, int):
|
||||
raise TypeError
|
||||
return s
|
||||
|
||||
class EllipticCurve:
|
||||
def __init__(self, p, a, b):
|
||||
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
|
||||
self.p = p
|
||||
self.a = a % p
|
||||
self.b = b % p
|
||||
|
||||
def affine(self, p1):
|
||||
"""Convert a Jacobian point tuple p1 to affine form, or None if at infinity.
|
||||
|
||||
An affine point is represented as the Jacobian (x, y, 1)"""
|
||||
x1, y1, z1 = p1
|
||||
if z1 == 0:
|
||||
return None
|
||||
inv = modinv(z1, self.p)
|
||||
inv_2 = (inv**2) % self.p
|
||||
inv_3 = (inv_2 * inv) % self.p
|
||||
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
|
||||
|
||||
def has_even_y(self, p1):
|
||||
"""Whether the point p1 has an even Y coordinate when expressed in affine coordinates."""
|
||||
return not (p1[2] == 0 or self.affine(p1)[1] & 1)
|
||||
|
||||
def negate(self, p1):
|
||||
"""Negate a Jacobian point tuple p1."""
|
||||
x1, y1, z1 = p1
|
||||
return (x1, (self.p - y1) % self.p, z1)
|
||||
|
||||
def on_curve(self, p1):
|
||||
"""Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
|
||||
x1, y1, z1 = p1
|
||||
z2 = pow(z1, 2, self.p)
|
||||
z4 = pow(z2, 2, self.p)
|
||||
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
|
||||
|
||||
def is_x_coord(self, x):
|
||||
"""Test whether x is a valid X coordinate on the curve."""
|
||||
x_3 = pow(x, 3, self.p)
|
||||
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
|
||||
|
||||
def lift_x(self, x):
|
||||
"""Given an X coordinate on the curve, return a corresponding affine point."""
|
||||
x_3 = pow(x, 3, self.p)
|
||||
v = x_3 + self.a * x + self.b
|
||||
y = modsqrt(v, self.p)
|
||||
if y is None:
|
||||
return None
|
||||
return (x, y, 1)
|
||||
|
||||
def double(self, p1):
|
||||
"""Double a Jacobian tuple p1
|
||||
|
||||
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling"""
|
||||
x1, y1, z1 = p1
|
||||
if z1 == 0:
|
||||
return (0, 1, 0)
|
||||
y1_2 = (y1**2) % self.p
|
||||
y1_4 = (y1_2**2) % self.p
|
||||
x1_2 = (x1**2) % self.p
|
||||
s = (4*x1*y1_2) % self.p
|
||||
m = 3*x1_2
|
||||
if self.a:
|
||||
m += self.a * pow(z1, 4, self.p)
|
||||
m = m % self.p
|
||||
x2 = (m**2 - 2*s) % self.p
|
||||
y2 = (m*(s - x2) - 8*y1_4) % self.p
|
||||
z2 = (2*y1*z1) % self.p
|
||||
return (x2, y2, z2)
|
||||
|
||||
def add_mixed(self, p1, p2):
|
||||
"""Add a Jacobian tuple p1 and an affine tuple p2
|
||||
|
||||
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
|
||||
x1, y1, z1 = p1
|
||||
x2, y2, z2 = p2
|
||||
assert(z2 == 1)
|
||||
# Adding to the point at infinity is a no-op
|
||||
if z1 == 0:
|
||||
return p2
|
||||
z1_2 = (z1**2) % self.p
|
||||
z1_3 = (z1_2 * z1) % self.p
|
||||
u2 = (x2 * z1_2) % self.p
|
||||
s2 = (y2 * z1_3) % self.p
|
||||
if x1 == u2:
|
||||
if (y1 != s2):
|
||||
# p1 and p2 are inverses. Return the point at infinity.
|
||||
return (0, 1, 0)
|
||||
# p1 == p2. The formulas below fail when the two points are equal.
|
||||
return self.double(p1)
|
||||
h = u2 - x1
|
||||
r = s2 - y1
|
||||
h_2 = (h**2) % self.p
|
||||
h_3 = (h_2 * h) % self.p
|
||||
u1_h_2 = (x1 * h_2) % self.p
|
||||
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
|
||||
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
|
||||
z3 = (h*z1) % self.p
|
||||
return (x3, y3, z3)
|
||||
|
||||
def add(self, p1, p2):
|
||||
"""Add two Jacobian tuples p1 and p2
|
||||
|
||||
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition"""
|
||||
x1, y1, z1 = p1
|
||||
x2, y2, z2 = p2
|
||||
# Adding the point at infinity is a no-op
|
||||
if z1 == 0:
|
||||
return p2
|
||||
if z2 == 0:
|
||||
return p1
|
||||
# Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1
|
||||
if z1 == 1:
|
||||
return self.add_mixed(p2, p1)
|
||||
if z2 == 1:
|
||||
return self.add_mixed(p1, p2)
|
||||
z1_2 = (z1**2) % self.p
|
||||
z1_3 = (z1_2 * z1) % self.p
|
||||
z2_2 = (z2**2) % self.p
|
||||
z2_3 = (z2_2 * z2) % self.p
|
||||
u1 = (x1 * z2_2) % self.p
|
||||
u2 = (x2 * z1_2) % self.p
|
||||
s1 = (y1 * z2_3) % self.p
|
||||
s2 = (y2 * z1_3) % self.p
|
||||
if u1 == u2:
|
||||
if (s1 != s2):
|
||||
# p1 and p2 are inverses. Return the point at infinity.
|
||||
return (0, 1, 0)
|
||||
# p1 == p2. The formulas below fail when the two points are equal.
|
||||
return self.double(p1)
|
||||
h = u2 - u1
|
||||
r = s2 - s1
|
||||
h_2 = (h**2) % self.p
|
||||
h_3 = (h_2 * h) % self.p
|
||||
u1_h_2 = (u1 * h_2) % self.p
|
||||
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
|
||||
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
|
||||
z3 = (h*z1*z2) % self.p
|
||||
return (x3, y3, z3)
|
||||
|
||||
def mul(self, ps):
|
||||
"""Compute a (multi) point multiplication
|
||||
|
||||
ps is a list of (Jacobian tuple, scalar) pairs.
|
||||
"""
|
||||
r = (0, 1, 0)
|
||||
for i in range(255, -1, -1):
|
||||
r = self.double(r)
|
||||
for (p, n) in ps:
|
||||
if ((n >> i) & 1):
|
||||
r = self.add(r, p)
|
||||
return r
|
||||
|
||||
SECP256K1_FIELD_SIZE = 2**256 - 2**32 - 977
|
||||
SECP256K1 = EllipticCurve(SECP256K1_FIELD_SIZE, 0, 7)
|
||||
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
|
||||
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
|
||||
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
|
||||
NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0
|
||||
|
||||
class ECPubKey():
|
||||
"""A secp256k1 public key"""
|
||||
|
||||
def __init__(self):
|
||||
"""Construct an uninitialized public key"""
|
||||
self.valid = False
|
||||
|
||||
def __repr__(self):
|
||||
return self.get_bytes().hex()
|
||||
|
||||
def __eq__(self, other):
|
||||
assert isinstance(other, ECPubKey)
|
||||
return self.get_bytes() == other.get_bytes()
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.get_bytes())
|
||||
|
||||
def set(self, data):
|
||||
"""Construct a public key from a serialization in compressed or uncompressed DER format or BIP340 format"""
|
||||
if (len(data) == 65 and data[0] == 0x04):
|
||||
p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
|
||||
self.valid = SECP256K1.on_curve(p)
|
||||
if self.valid:
|
||||
self.p = p
|
||||
self.compressed = False
|
||||
elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
|
||||
x = int.from_bytes(data[1:33], 'big')
|
||||
if SECP256K1.is_x_coord(x):
|
||||
p = SECP256K1.lift_x(x)
|
||||
# if the oddness of the y co-ord isn't correct, find the other
|
||||
# valid y
|
||||
if (p[1] & 1) != (data[0] & 1):
|
||||
p = SECP256K1.negate(p)
|
||||
self.p = p
|
||||
self.valid = True
|
||||
self.compressed = True
|
||||
else:
|
||||
self.valid = False
|
||||
elif (len(data) == 32):
|
||||
x = int.from_bytes(data[0:32], 'big')
|
||||
if SECP256K1.is_x_coord(x):
|
||||
p = SECP256K1.lift_x(x)
|
||||
# if the oddness of the y co-ord isn't correct, find the other
|
||||
# valid y
|
||||
if p[1]%2 != 0:
|
||||
p = SECP256K1.negate(p)
|
||||
self.p = p
|
||||
self.valid = True
|
||||
self.compressed = True
|
||||
else:
|
||||
self.valid = False
|
||||
else:
|
||||
self.valid = False
|
||||
return self
|
||||
|
||||
@property
|
||||
def is_compressed(self):
|
||||
return self.compressed
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return self.valid
|
||||
|
||||
def get_y(self):
|
||||
return SECP256K1.affine(self.p)[1]
|
||||
|
||||
def get_x(self):
|
||||
return SECP256K1.affine(self.p)[0]
|
||||
|
||||
def get_bytes(self, bip340=True):
|
||||
assert(self.valid)
|
||||
p = SECP256K1.affine(self.p)
|
||||
if p is None:
|
||||
return None
|
||||
if bip340:
|
||||
return bytes(p[0].to_bytes(32, 'big'))
|
||||
elif self.compressed:
|
||||
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
|
||||
else:
|
||||
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
|
||||
|
||||
def verify_ecdsa(self, sig, msg, low_s=True):
|
||||
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.
|
||||
|
||||
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
|
||||
ECDSA verifier algorithm"""
|
||||
assert(self.valid)
|
||||
|
||||
# Extract r and s from the DER formatted signature. Return false for
|
||||
# any DER encoding errors.
|
||||
if (sig[1] + 2 != len(sig)):
|
||||
return False
|
||||
if (len(sig) < 4):
|
||||
return False
|
||||
if (sig[0] != 0x30):
|
||||
return False
|
||||
if (sig[2] != 0x02):
|
||||
return False
|
||||
rlen = sig[3]
|
||||
if (len(sig) < 6 + rlen):
|
||||
return False
|
||||
if rlen < 1 or rlen > 33:
|
||||
return False
|
||||
if sig[4] >= 0x80:
|
||||
return False
|
||||
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
|
||||
return False
|
||||
r = int.from_bytes(sig[4:4+rlen], 'big')
|
||||
if (sig[4+rlen] != 0x02):
|
||||
return False
|
||||
slen = sig[5+rlen]
|
||||
if slen < 1 or slen > 33:
|
||||
return False
|
||||
if (len(sig) != 6 + rlen + slen):
|
||||
return False
|
||||
if sig[6+rlen] >= 0x80:
|
||||
return False
|
||||
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
|
||||
return False
|
||||
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
|
||||
|
||||
# Verify that r and s are within the group order
|
||||
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
|
||||
return False
|
||||
if low_s and s >= SECP256K1_ORDER_HALF:
|
||||
return False
|
||||
z = int.from_bytes(msg, 'big')
|
||||
|
||||
# Run verifier algorithm on r, s
|
||||
w = modinv(s, SECP256K1_ORDER)
|
||||
u1 = z*w % SECP256K1_ORDER
|
||||
u2 = r*w % SECP256K1_ORDER
|
||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
|
||||
if R is None or R[0] != r:
|
||||
return False
|
||||
return True
|
||||
|
||||
def verify_schnorr(self, sig, msg):
|
||||
assert(len(msg) == 32)
|
||||
assert(len(sig) == 64)
|
||||
assert(self.valid)
|
||||
r = int.from_bytes(sig[0:32], 'big')
|
||||
if r >= SECP256K1_FIELD_SIZE:
|
||||
return False
|
||||
s = int.from_bytes(sig[32:64], 'big')
|
||||
if s >= SECP256K1_ORDER:
|
||||
return False
|
||||
e = int.from_bytes(TaggedHash("BIP0340/challenge", sig[0:32] + self.get_bytes() + msg), 'big') % SECP256K1_ORDER
|
||||
R = SECP256K1.mul([(SECP256K1_G, s), (self.p, SECP256K1_ORDER - e)])
|
||||
if not SECP256K1.has_even_y(R):
|
||||
return False
|
||||
if ((r * R[2] * R[2]) % SECP256K1_FIELD_SIZE) != R[0]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __add__(self, other):
|
||||
"""Adds two ECPubKey points."""
|
||||
assert isinstance(other, ECPubKey)
|
||||
assert self.valid
|
||||
assert other.valid
|
||||
ret = ECPubKey()
|
||||
ret.p = SECP256K1.add(other.p, self.p)
|
||||
ret.valid = True
|
||||
ret.compressed = self.compressed
|
||||
return ret
|
||||
|
||||
def __radd__(self, other):
|
||||
"""Allows this ECPubKey to be added to 0 for sum()"""
|
||||
if other == 0:
|
||||
return self
|
||||
else:
|
||||
return self + other
|
||||
|
||||
def __mul__(self, other):
|
||||
"""Multiplies ECPubKey point with a scalar(int/32bytes/ECKey)."""
|
||||
if isinstance(other, ECKey):
|
||||
assert self.valid
|
||||
assert other.secret is not None
|
||||
multiplier = other.secret
|
||||
else:
|
||||
# int_or_bytes checks that other is `int` or `bytes`
|
||||
multiplier = int_or_bytes(other)
|
||||
|
||||
assert multiplier < SECP256K1_ORDER
|
||||
multiplier = multiplier % SECP256K1_ORDER
|
||||
ret = ECPubKey()
|
||||
ret.p = SECP256K1.mul([(self.p, multiplier)])
|
||||
ret.valid = True
|
||||
ret.compressed = self.compressed
|
||||
return ret
|
||||
|
||||
def __rmul__(self, other):
|
||||
"""Multiplies a scalar(int/32bytes/ECKey) with an ECPubKey point"""
|
||||
return self * other
|
||||
|
||||
def __sub__(self, other):
|
||||
"""Subtract one point from another"""
|
||||
assert isinstance(other, ECPubKey)
|
||||
assert self.valid
|
||||
assert other.valid
|
||||
ret = ECPubKey()
|
||||
ret.p = SECP256K1.add(self.p, SECP256K1.negate(other.p))
|
||||
ret.valid = True
|
||||
ret.compressed = self.compressed
|
||||
return ret
|
||||
|
||||
def tweak_add(self, tweak):
|
||||
assert(self.valid)
|
||||
t = int_or_bytes(tweak)
|
||||
if t >= SECP256K1_ORDER:
|
||||
return None
|
||||
tweaked = SECP256K1.affine(SECP256K1.mul([(self.p, 1), (SECP256K1_G, t)]))
|
||||
if tweaked is None:
|
||||
return None
|
||||
ret = ECPubKey()
|
||||
ret.p = tweaked
|
||||
ret.valid = True
|
||||
ret.compressed = self.compressed
|
||||
return ret
|
||||
|
||||
def mul(self, data):
|
||||
"""Multiplies ECPubKey point with scalar data."""
|
||||
assert self.valid
|
||||
other = ECKey()
|
||||
other.set(data, True)
|
||||
return self * other
|
||||
|
||||
def negate(self):
|
||||
self.p = SECP256K1.affine(SECP256K1.negate(self.p))
|
||||
|
||||
def rfc6979_nonce(key):
|
||||
"""Compute signing nonce using RFC6979."""
|
||||
v = bytes([1] * 32)
|
||||
k = bytes([0] * 32)
|
||||
k = hmac.new(k, v + b"\x00" + key, 'sha256').digest()
|
||||
v = hmac.new(k, v, 'sha256').digest()
|
||||
k = hmac.new(k, v + b"\x01" + key, 'sha256').digest()
|
||||
v = hmac.new(k, v, 'sha256').digest()
|
||||
return hmac.new(k, v, 'sha256').digest()
|
||||
|
||||
class ECKey():
|
||||
"""A secp256k1 private key"""
|
||||
|
||||
def __init__(self):
|
||||
self.valid = False
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.secret)
|
||||
|
||||
def __eq__(self, other):
|
||||
assert isinstance(other, ECKey)
|
||||
return self.secret == other.secret
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.secret)
|
||||
|
||||
def set(self, secret, compressed=True):
|
||||
"""Construct a private key object from either 32-bytes or an int secret and a compressed flag."""
|
||||
secret = int_or_bytes(secret)
|
||||
|
||||
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
|
||||
if self.valid:
|
||||
self.secret = secret
|
||||
self.compressed = compressed
|
||||
return self
|
||||
|
||||
def generate(self, compressed=True):
|
||||
"""Generate a random private key (compressed or uncompressed)."""
|
||||
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
|
||||
return self
|
||||
|
||||
def get_bytes(self):
|
||||
"""Retrieve the 32-byte representation of this key."""
|
||||
assert(self.valid)
|
||||
return self.secret.to_bytes(32, 'big')
|
||||
|
||||
def as_int(self):
|
||||
return self.secret
|
||||
|
||||
def from_int(self, secret, compressed=True):
|
||||
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
|
||||
if self.valid:
|
||||
self.secret = secret
|
||||
self.compressed = compressed
|
||||
|
||||
def __add__(self, other):
|
||||
"""Add key secrets. Returns compressed key."""
|
||||
assert isinstance(other, ECKey)
|
||||
assert other.secret > 0 and other.secret < SECP256K1_ORDER
|
||||
assert self.valid is True
|
||||
ret_data = ((self.secret + other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
|
||||
ret = ECKey()
|
||||
ret.set(ret_data, True)
|
||||
return ret
|
||||
|
||||
def __radd__(self, other):
|
||||
"""Allows this ECKey to be added to 0 for sum()"""
|
||||
if other == 0:
|
||||
return self
|
||||
else:
|
||||
return self + other
|
||||
|
||||
def __sub__(self, other):
|
||||
"""Subtract key secrets. Returns compressed key."""
|
||||
assert isinstance(other, ECKey)
|
||||
assert other.secret > 0 and other.secret < SECP256K1_ORDER
|
||||
assert self.valid is True
|
||||
ret_data = ((self.secret - other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
|
||||
ret = ECKey()
|
||||
ret.set(ret_data, True)
|
||||
return ret
|
||||
|
||||
def __mul__(self, other):
|
||||
"""Multiply a private key by another private key or multiply a public key by a private key. Returns compressed key."""
|
||||
if isinstance(other, ECKey):
|
||||
assert other.secret > 0 and other.secret < SECP256K1_ORDER
|
||||
assert self.valid is True
|
||||
ret_data = ((self.secret * other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
|
||||
ret = ECKey()
|
||||
ret.set(ret_data, True)
|
||||
return ret
|
||||
elif isinstance(other, ECPubKey):
|
||||
return other * self
|
||||
else:
|
||||
# ECKey().set() checks that other is an `int` or `bytes`
|
||||
assert self.valid
|
||||
second = ECKey().set(other, self.compressed)
|
||||
return self * second
|
||||
|
||||
def __rmul__(self, other):
|
||||
return self * other
|
||||
|
||||
def add(self, data):
|
||||
"""Add key to scalar data. Returns compressed key."""
|
||||
other = ECKey()
|
||||
other.set(data, True)
|
||||
return self + other
|
||||
|
||||
def mul(self, data):
|
||||
"""Multiply key secret with scalar data. Returns compressed key."""
|
||||
other = ECKey()
|
||||
other.set(data, True)
|
||||
return self * other
|
||||
|
||||
def negate(self):
|
||||
"""Negate a private key."""
|
||||
assert self.valid
|
||||
self.secret = SECP256K1_ORDER - self.secret
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return self.valid
|
||||
|
||||
@property
|
||||
def is_compressed(self):
|
||||
return self.compressed
|
||||
|
||||
def get_pubkey(self):
|
||||
"""Compute an ECPubKey object for this secret key."""
|
||||
assert(self.valid)
|
||||
ret = ECPubKey()
|
||||
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
|
||||
ret.p = p
|
||||
ret.valid = True
|
||||
ret.compressed = self.compressed
|
||||
return ret
|
||||
|
||||
def sign_ecdsa(self, msg, low_s=True, rfc6979=False):
|
||||
"""Construct a DER-encoded ECDSA signature with this key.
|
||||
|
||||
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
|
||||
ECDSA signer algorithm."""
|
||||
assert(self.valid)
|
||||
z = int.from_bytes(msg, 'big')
|
||||
# Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation)
|
||||
if rfc6979:
|
||||
k = int.from_bytes(rfc6979_nonce(self.secret.to_bytes(32, 'big') + msg), 'big')
|
||||
else:
|
||||
k = random.randrange(1, SECP256K1_ORDER)
|
||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
|
||||
r = R[0] % SECP256K1_ORDER
|
||||
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
|
||||
if low_s and s > SECP256K1_ORDER_HALF:
|
||||
s = SECP256K1_ORDER - s
|
||||
# Represent in DER format. The byte representations of r and s have
|
||||
# length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33
|
||||
# bytes).
|
||||
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
|
||||
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
|
||||
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb
|
||||
|
||||
def sign_schnorr(self, msg, aux=None):
|
||||
"""Create a Schnorr signature (see BIP340)."""
|
||||
if aux is None:
|
||||
aux = bytes(32)
|
||||
|
||||
assert self.valid
|
||||
assert len(msg) == 32
|
||||
assert len(aux) == 32
|
||||
|
||||
t = (self.secret ^ int.from_bytes(TaggedHash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big')
|
||||
kp = int.from_bytes(TaggedHash("BIP0340/nonce", t + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
|
||||
assert kp != 0
|
||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
|
||||
k = kp if SECP256K1.has_even_y(R) else SECP256K1_ORDER - kp
|
||||
e = int.from_bytes(TaggedHash("BIP0340/challenge", R[0].to_bytes(32, 'big') + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
|
||||
return R[0].to_bytes(32, 'big') + ((k + e * self.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
|
||||
|
||||
def tweak_add(self, tweak):
|
||||
"""Return a tweaked version of this private key."""
|
||||
assert(self.valid)
|
||||
t = int_or_bytes(tweak)
|
||||
if t >= SECP256K1_ORDER:
|
||||
return None
|
||||
tweaked = (self.secret + t) % SECP256K1_ORDER
|
||||
if tweaked == 0:
|
||||
return None
|
||||
ret = ECKey()
|
||||
ret.set(tweaked.to_bytes(32, 'big'), self.compressed)
|
||||
return ret
|
||||
|
||||
def generate_key_pair(secret=None, compressed=True):
|
||||
"""Convenience function to generate a private-public key pair."""
|
||||
d = ECKey()
|
||||
if secret:
|
||||
d.set(secret, compressed)
|
||||
else:
|
||||
d.generate(compressed)
|
||||
|
||||
P = d.get_pubkey()
|
||||
return d, P
|
||||
|
||||
def generate_bip340_key_pair():
|
||||
"""Convenience function to generate a BIP0340 private-public key pair."""
|
||||
d = ECKey()
|
||||
d.generate()
|
||||
P = d.get_pubkey()
|
||||
if P.get_y()%2 != 0:
|
||||
d.negate()
|
||||
P.negate()
|
||||
return d, P
|
||||
|
||||
def generate_schnorr_nonce():
|
||||
"""Generate a random valid BIP340 nonce.
|
||||
|
||||
See https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki.
|
||||
This implementation ensures the y-coordinate of the nonce point is even."""
|
||||
kp = random.randrange(1, SECP256K1_ORDER)
|
||||
assert kp != 0
|
||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
|
||||
k = kp if R[1] % 2 == 0 else SECP256K1_ORDER - kp
|
||||
k_key = ECKey()
|
||||
k_key.set(k.to_bytes(32, 'big'), True)
|
||||
return k_key
|
2673
bip-0352/send_and_receive_test_vectors.json
Normal file
2673
bip-0352/send_and_receive_test_vectors.json
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user