improve clarity and up max ipv6 ASNs

This commit is contained in:
Baas 2022-04-10 11:28:50 +02:00
parent 87c7dcc60d
commit c457fb144c

View File

@ -10,12 +10,16 @@ import re
import sys
import dns.resolver
import collections
from typing import List, Dict, Union
NSEEDS=512
MAX_SEEDS_PER_ASN=2
MAX_SEEDS_PER_ASN = {
'ipv4': 2,
'ipv6': 10,
}
MIN_BLOCKS = 337600
MIN_BLOCKS = 730000
# These are hosts that have been observed to be behaving strangely (e.g.
# aggressively connecting to every node).
@ -40,9 +44,13 @@ PATTERN_AGENT = re.compile(
r"23.99"
r")")
def parseline(line):
def parseline(line: str) -> Union[dict, None]:
""" Parses a line from `seeds_main.txt` into a dictionary of details for that line.
or `None`, if the line could not be parsed.
"""
sline = line.split()
if len(sline) < 11:
# line too short to be valid, skip it.
return None
m = PATTERN_IPV4.match(sline[0])
sortkey = None
@ -107,25 +115,26 @@ def parseline(line):
'sortkey': sortkey,
}
def dedup(ips):
'''deduplicate by address,port'''
def dedup(ips: List[Dict]) -> List[Dict]:
""" Remove duplicates from `ips` where multiple ips share address and port. """
d = {}
for ip in ips:
d[ip['ip'],ip['port']] = ip
return list(d.values())
def filtermultiport(ips):
'''Filter out hosts with more nodes per IP'''
def filtermultiport(ips: List[Dict]) -> List[Dict]:
""" Filter out hosts with more nodes per IP"""
hist = collections.defaultdict(list)
for ip in ips:
hist[ip['sortkey']].append(ip)
return [value[0] for (key,value) in list(hist.items()) if len(value)==1]
def lookup_asn(net, ip):
'''
Look up the asn for an IP (4 or 6) address by querying cymru.com, or None
if it could not be found.
'''
def lookup_asn(net: str, ip: str) -> Union[int, None]:
""" Look up the asn for an `ip` address by querying cymru.com
on network `net` (e.g. ipv4 or ipv6).
Returns in integer ASN or None if it could not be found.
"""
try:
if net == 'ipv4':
ipaddr = ip
@ -147,20 +156,33 @@ def lookup_asn(net, ip):
return None
# Based on Greg Maxwell's seed_filter.py
def filterbyasn(ips, max_per_asn, max_per_net):
def filterbyasn(ips: List[Dict], max_per_asn: Dict, max_per_net: int) -> List[Dict]:
""" Prunes `ips` by
(a) trimming ips to have at most `max_per_net` ips from each net (e.g. ipv4, ipv6); and
(b) trimming ips to have at most `max_per_asn` ips from each asn in each net.
"""
# Sift out ips by type
ips_ipv46 = [ip for ip in ips if ip['net'] in ['ipv4', 'ipv6']]
ips_onion = [ip for ip in ips if ip['net'] == 'onion']
# Filter IPv46 by ASN, and limit to max_per_net per network
result = []
net_count = collections.defaultdict(int)
asn_count = collections.defaultdict(int)
for ip in ips_ipv46:
net_count: Dict[str, int] = collections.defaultdict(int)
asn_count: Dict[int, int] = collections.defaultdict(int)
for i, ip in enumerate(ips_ipv46):
if i % 10 == 0:
# give progress update
print(f"{i:6d}/{len(ips_ipv46)} [{100*i/len(ips_ipv46):04.1f}%]\r", file=sys.stderr, end='', flush=True)
if net_count[ip['net']] == max_per_net:
# do not add this ip as we already too many
# ips from this network
continue
asn = lookup_asn(ip['net'], ip['ip'])
if asn is None or asn_count[asn] == max_per_asn:
if asn is None or asn_count[asn] == max_per_asn[ip['net']]:
# do not add this ip as we already have too many
# ips from this ASN on this network
continue
asn_count[asn] += 1
net_count[ip['net']] += 1
@ -170,35 +192,36 @@ def filterbyasn(ips, max_per_asn, max_per_net):
result.extend(ips_onion[0:max_per_net])
return result
def ip_stats(ips):
hist = collections.defaultdict(int)
def ip_stats(ips: List[Dict]) -> str:
""" Format and return pretty string from `ips`. """
hist: Dict[str, int] = collections.defaultdict(int)
for ip in ips:
if ip is not None:
hist[ip['net']] += 1
return '%6d %6d %6d' % (hist['ipv4'], hist['ipv6'], hist['onion'])
return f"{hist['ipv4']:6d} {hist['ipv6']:6d} {hist['onion']:6d}"
def main():
lines = sys.stdin.readlines()
ips = [parseline(line) for line in lines]
print('\x1b[7m IPv4 IPv6 Onion Pass \x1b[0m', file=sys.stderr)
print('%s Initial' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Initial', file=sys.stderr)
# Skip entries with invalid address.
ips = [ip for ip in ips if ip is not None]
print('%s Skip entries with invalid address' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Skip entries with invalid address', file=sys.stderr)
# Skip duplicates (in case multiple seeds files were concatenated)
ips = dedup(ips)
print('%s After removing duplicates' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} After removing duplicates', file=sys.stderr)
# Skip entries from suspicious hosts.
ips = [ip for ip in ips if ip['ip'] not in SUSPICIOUS_HOSTS]
print('%s Skip entries from suspicious hosts' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Skip entries from suspicious hosts', file=sys.stderr)
# Enforce minimal number of blocks.
ips = [ip for ip in ips if ip['blocks'] >= MIN_BLOCKS]
print('%s Enforce minimal number of blocks' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Enforce minimal number of blocks', file=sys.stderr)
# Require service bit 1.
ips = [ip for ip in ips if (ip['service'] & 1) == 1]
print('%s Require service bit 1' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Require service bit 1', file=sys.stderr)
# Require at least 50% 30-day uptime for clearnet, 10% for onion.
req_uptime = {
'ipv4': 50,
@ -206,18 +229,18 @@ def main():
'onion': 10,
}
ips = [ip for ip in ips if ip['uptime'] > req_uptime[ip['net']]]
print('%s Require minimum uptime' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Require minimum uptime', file=sys.stderr)
# Require a known and recent user agent.
ips = [ip for ip in ips if PATTERN_AGENT.match(ip['agent'])]
print('%s Require a known and recent user agent' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Require a known and recent user agent', file=sys.stderr)
# Sort by availability (and use last success as tie breaker)
ips.sort(key=lambda x: (x['uptime'], x['lastsuccess'], x['ip']), reverse=True)
# Filter out hosts with multiple bitcoin ports, these are likely abusive
ips = filtermultiport(ips)
print('%s Filter out hosts with multiple bitcoin ports' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Filter out hosts with multiple bitcoin ports', file=sys.stderr)
# Look up ASNs and limit results, both per ASN and globally.
ips = filterbyasn(ips, MAX_SEEDS_PER_ASN, NSEEDS)
print('%s Look up ASNs and limit results per ASN and per net' % (ip_stats(ips)), file=sys.stderr)
print(f'{ip_stats(ips):s} Look up ASNs and limit results per ASN and per net', file=sys.stderr)
# Sort the results by IP address (for deterministic output).
ips.sort(key=lambda x: (x['net'], x['sortkey']))
for ip in ips: