test: add support for all networks in CAddress in messages.py

Also removes TorV2 from messages.py
See https://github.com/bitcoin/bitcoin/pull/22050

Co-authored-by: Matthew Zipkin <pinheadmz@gmail.com>
This commit is contained in:
brunoerg 2023-03-21 10:53:22 -03:00 committed by Matthew Zipkin
parent bc4f6b13fe
commit 80f64a3d40
No known key found for this signature in database
GPG key ID: E7E2984B6289C93A
2 changed files with 58 additions and 6 deletions

View file

@ -27,6 +27,7 @@ import random
import socket import socket
import struct import struct
import time import time
import unittest
from test_framework.siphash import siphash256 from test_framework.siphash import siphash256
from test_framework.util import assert_equal from test_framework.util import assert_equal
@ -77,6 +78,10 @@ def sha256(s):
return hashlib.sha256(s).digest() return hashlib.sha256(s).digest()
def sha3(s):
return hashlib.sha3_256(s).digest()
def hash256(s): def hash256(s):
return sha256(sha256(s)) return sha256(sha256(s))
@ -229,16 +234,25 @@ class CAddress:
# see https://github.com/bitcoin/bips/blob/master/bip-0155.mediawiki # see https://github.com/bitcoin/bips/blob/master/bip-0155.mediawiki
NET_IPV4 = 1 NET_IPV4 = 1
NET_IPV6 = 2
NET_TORV3 = 4
NET_I2P = 5 NET_I2P = 5
NET_CJDNS = 6
ADDRV2_NET_NAME = { ADDRV2_NET_NAME = {
NET_IPV4: "IPv4", NET_IPV4: "IPv4",
NET_I2P: "I2P" NET_IPV6: "IPv6",
NET_TORV3: "TorV3",
NET_I2P: "I2P",
NET_CJDNS: "CJDNS"
} }
ADDRV2_ADDRESS_LENGTH = { ADDRV2_ADDRESS_LENGTH = {
NET_IPV4: 4, NET_IPV4: 4,
NET_I2P: 32 NET_IPV6: 16,
NET_TORV3: 32,
NET_I2P: 32,
NET_CJDNS: 16
} }
I2P_PAD = "====" I2P_PAD = "===="
@ -285,7 +299,7 @@ class CAddress:
self.nServices = deser_compact_size(f) self.nServices = deser_compact_size(f)
self.net = struct.unpack("B", f.read(1))[0] self.net = struct.unpack("B", f.read(1))[0]
assert self.net in (self.NET_IPV4, self.NET_I2P) assert self.net in self.ADDRV2_NET_NAME
address_length = deser_compact_size(f) address_length = deser_compact_size(f)
assert address_length == self.ADDRV2_ADDRESS_LENGTH[self.net] assert address_length == self.ADDRV2_ADDRESS_LENGTH[self.net]
@ -293,14 +307,25 @@ class CAddress:
addr_bytes = f.read(address_length) addr_bytes = f.read(address_length)
if self.net == self.NET_IPV4: if self.net == self.NET_IPV4:
self.ip = socket.inet_ntoa(addr_bytes) self.ip = socket.inet_ntoa(addr_bytes)
else: elif self.net == self.NET_IPV6:
self.ip = socket.inet_ntop(socket.AF_INET6, addr_bytes)
elif self.net == self.NET_TORV3:
prefix = b".onion checksum"
version = bytes([3])
checksum = sha3(prefix + addr_bytes + version)[:2]
self.ip = b32encode(addr_bytes + checksum + version).decode("ascii").lower() + ".onion"
elif self.net == self.NET_I2P:
self.ip = b32encode(addr_bytes)[0:-len(self.I2P_PAD)].decode("ascii").lower() + ".b32.i2p" self.ip = b32encode(addr_bytes)[0:-len(self.I2P_PAD)].decode("ascii").lower() + ".b32.i2p"
elif self.net == self.NET_CJDNS:
self.ip = socket.inet_ntop(socket.AF_INET6, addr_bytes)
else:
raise Exception(f"Address type not supported")
self.port = struct.unpack(">H", f.read(2))[0] self.port = struct.unpack(">H", f.read(2))[0]
def serialize_v2(self): def serialize_v2(self):
"""Serialize in addrv2 format (BIP155)""" """Serialize in addrv2 format (BIP155)"""
assert self.net in (self.NET_IPV4, self.NET_I2P) assert self.net in self.ADDRV2_NET_NAME
r = b"" r = b""
r += struct.pack("<I", self.time) r += struct.pack("<I", self.time)
r += ser_compact_size(self.nServices) r += ser_compact_size(self.nServices)
@ -308,10 +333,20 @@ class CAddress:
r += ser_compact_size(self.ADDRV2_ADDRESS_LENGTH[self.net]) r += ser_compact_size(self.ADDRV2_ADDRESS_LENGTH[self.net])
if self.net == self.NET_IPV4: if self.net == self.NET_IPV4:
r += socket.inet_aton(self.ip) r += socket.inet_aton(self.ip)
else: elif self.net == self.NET_IPV6:
r += socket.inet_pton(socket.AF_INET6, self.ip)
elif self.net == self.NET_TORV3:
sfx = ".onion"
assert self.ip.endswith(sfx)
r += b32decode(self.ip[0:-len(sfx)], True)[0:32]
elif self.net == self.NET_I2P:
sfx = ".b32.i2p" sfx = ".b32.i2p"
assert self.ip.endswith(sfx) assert self.ip.endswith(sfx)
r += b32decode(self.ip[0:-len(sfx)] + self.I2P_PAD, True) r += b32decode(self.ip[0:-len(sfx)] + self.I2P_PAD, True)
elif self.net == self.NET_CJDNS:
r += socket.inet_pton(socket.AF_INET6, self.ip)
else:
raise Exception(f"Address type not supported")
r += struct.pack(">H", self.port) r += struct.pack(">H", self.port)
return r return r
@ -1852,3 +1887,19 @@ class msg_sendtxrcncl:
def __repr__(self): def __repr__(self):
return "msg_sendtxrcncl(version=%lu, salt=%lu)" %\ return "msg_sendtxrcncl(version=%lu, salt=%lu)" %\
(self.version, self.salt) (self.version, self.salt)
class TestFrameworkScript(unittest.TestCase):
def test_addrv2_encode_decode(self):
def check_addrv2(ip, net):
addr = CAddress()
addr.net, addr.ip = net, ip
ser = addr.serialize_v2()
actual = CAddress()
actual.deserialize_v2(BytesIO(ser))
self.assertEqual(actual, addr)
check_addrv2("1.65.195.98", CAddress.NET_IPV4)
check_addrv2("2001:41f0::62:6974:636f:696e", CAddress.NET_IPV6)
check_addrv2("2bqghnldu6mcug4pikzprwhtjjnsyederctvci6klcwzepnjd46ikjyd.onion", CAddress.NET_TORV3)
check_addrv2("255fhcp6ajvftnyo7bwz3an3t4a4brhopm3bamyh2iu5r3gnr2rq.b32.i2p", CAddress.NET_I2P)
check_addrv2("fc32:17ea:e415:c3bf:9808:149d:b5a2:c9aa", CAddress.NET_CJDNS)

View file

@ -76,6 +76,7 @@ TEST_FRAMEWORK_MODULES = [
"blocktools", "blocktools",
"ellswift", "ellswift",
"key", "key",
"messages",
"muhash", "muhash",
"ripemd160", "ripemd160",
"script", "script",