#! /usr/bin/python3 from pyln.spec.bolt7 import (channel_announcement, channel_update, node_announcement) from pyln.proto import ShortChannelId, PublicKey from typing import Any, Dict, List, Optional import io import struct # These duplicate constants in lightning/common/gossip_store.h GOSSIP_STORE_VERSION = 9 GOSSIP_STORE_LEN_DELETED_BIT = 0x80000000 GOSSIP_STORE_LEN_PUSH_BIT = 0x40000000 GOSSIP_STORE_LEN_MASK = (~(GOSSIP_STORE_LEN_PUSH_BIT | GOSSIP_STORE_LEN_DELETED_BIT)) # These duplicate constants in lightning/gossipd/gossip_store_wiregen.h WIRE_GOSSIP_STORE_PRIVATE_CHANNEL = 4104 WIRE_GOSSIP_STORE_PRIVATE_UPDATE = 4102 WIRE_GOSSIP_STORE_DELETE_CHAN = 4103 WIRE_GOSSIP_STORE_ENDED = 4105 class GossipStoreHeader(object): def __init__(self, buf: bytes): length, self.crc, self.timestamp = struct.unpack('>III', buf) self.deleted = (length & GOSSIP_STORE_LEN_DELETED_BIT) != 0 self.length = (length & GOSSIP_STORE_LEN_MASK) class GossmapHalfchannel(object): """One direction of a GossmapChannel.""" def __init__(self, timestamp: int, cltv_expiry_delta: int, htlc_minimum_msat: int, htlc_maximum_msat: int, fee_base_msat: int, fee_proportional_millionths: int): self.timestamp: int = timestamp self.cltv_expiry_delta: int = cltv_expiry_delta self.htlc_minimum_msat: int = htlc_minimum_msat self.htlc_maximum_msat: Optional[int] = htlc_maximum_msat self.fee_base_msat: int = fee_base_msat self.fee_proportional_millionths: int = fee_proportional_millionths class GossmapChannel(object): """A channel: fields of channel_announcement are in .fields, optional updates are in .updates_fields, which can be None if there has been no channel update.""" def __init__(self, fields: Dict[str, Any], announce_offset: int, scid, node1_id: bytes, node2_id: bytes, is_private: bool): self.fields = fields self.announce_offset = announce_offset self.is_private = is_private self.scid = scid self.node1_id = node1_id self.node2_id = node2_id self.updates_fields: List[Optional[Dict[str, Any]]] = [None, None] self.updates_offset: List[Optional[int]] = [None, None] self.capacity = None # TODO: where do we get this? self.half_channels: List[GossmapHalfchannel] = [None, None] def update_channel(self, direction: int, fields: List[Optional[Dict[str, Any]]] = [None, None], off: List[Optional[int]] = [None, None]): self.updates_fields[direction] = fields self.updates_offset = off half = GossmapHalfchannel(fields['timestamp'], fields['cltv_expiry_delta'], fields['htlc_minimum_msat'], fields.get('htlc_maximum_msat', None), fields['fee_base_msat'], fields['fee_proportional_millionths']) self.half_channels[direction] = half def get_half_channel(self, direction: int): """ returns the GossmapHalfchannel if known by channel_update """ if not 0 <= direction <= 1: raise ValueError("direction can only be 0 or 1") return self.half_channels[direction] class GossmapNodeId(object): def __init__(self, buf: bytes): if len(buf) != 33 or (buf[0] != 2 and buf[0] != 3): raise ValueError("{} is not a valid node_id".format(buf.hex())) self.nodeid = buf def to_pubkey(self) -> PublicKey: return PublicKey(self.nodeid) def __eq__(self, other): if not isinstance(other, GossmapNodeId): return False return self.nodeid == other.nodeid def __hash__(self): return self.nodeid.__hash__() def __repr__(self): return "GossmapNodeId[0x{}]".format(self.nodeid.hex()) def from_str(self, s: str): if s.startswith('0x'): s = s[2:] if len(s) != 67: raise ValueError(f"{s} is not a valid hexstring of a node_id") return GossmapNodeId(bytes.fromhex(s)) class GossmapNode(object): """A node: fields of node_announcement are in .announce_fields, which can be None of there has been no node announcement. .channels is a list of the GossmapChannels attached to this node. """ def __init__(self, node_id: GossmapNodeId): self.announce_fields: Optional[Dict[str, Any]] = None self.announce_offset = None self.channels = [] self.node_id = node_id class Gossmap(object): """Class to represent the gossip map of the network""" def __init__(self, store_filename: str = "gossip_store"): self.store_filename = store_filename self.store_file = open(store_filename, "rb") self.store_buf = bytes() self.nodes: Dict[bytes, GossmapNode] = {} self.channels: Dict[ShortChannelId, GossmapChannel] = {} version = self.store_file.read(1) if version[0] != GOSSIP_STORE_VERSION: raise ValueError("Invalid gossip store version {}".format(version)) self.bytes_read = 1 self.refresh() def _new_channel(self, fields: Dict[str, Any], announce_offset: int, scid: ShortChannelId, node1_id: GossmapNodeId, node2_id: GossmapNodeId, is_private: bool): c = GossmapChannel(fields, announce_offset, scid, node1_id, node2_id, is_private) if node1_id not in self.nodes: self.nodes[node1_id] = GossmapNode(node1_id) if node2_id not in self.nodes: self.nodes[node2_id] = GossmapNode(node2_id) self.channels[scid] = c self.nodes[node1_id].channels.append(c) self.nodes[node2_id].channels.append(c) def _del_channel(self, scid: ShortChannelId): c = self.channels[scid] n1 = self.nodes[c.node1_id] n2 = self.nodes[c.node2_id] n1.channels.remove(c) n2.channels.remove(c) # Beware self-channels n1-n1! if len(n1.channels) == 0 and n1 != n2: del self.nodes[c.node1_id] if len(n2.channels): del self.nodes[c.node2_id] def add_channel(self, rec: bytes, off: int, is_private: bool): fields = channel_announcement.read(io.BytesIO(rec[2:]), {}) self._new_channel(fields, off, ShortChannelId.from_int(fields['short_channel_id']), GossmapNodeId(fields['node_id_1']), GossmapNodeId(fields['node_id_2']), is_private) def get_channel(self, short_channel_id: ShortChannelId): """ Resolves a channel by its short channel id """ if type(short_channel_id) == str: short_channel_id = ShortChannelId.from_str(short_channel_id) return self.channels.get(short_channel_id) def get_node(self, node_id: GossmapNodeId): """ Resolves a node by its public key node_id """ if type(node_id) == str: node_id = GossmapNodeId.from_str(node_id) return self.nodes.get(node_id) def update_channel(self, rec: bytes, off: int): fields = channel_update.read(io.BytesIO(rec[2:]), {}) direction = fields['channel_flags'] & 1 c = self.channels[ShortChannelId.from_int(fields['short_channel_id'])] c.update_channel(direction, fields, off) def add_node_announcement(self, rec: bytes, off: int): fields = node_announcement.read(io.BytesIO(rec[2:]), {}) node_id = GossmapNodeId(fields['node_id']) self.nodes[node_id].announce_fields = fields self.nodes[node_id].announce_offset = off def reopen_store(self): """FIXME: Implement!""" assert False def remove_channel_by_deletemsg(self, rec: bytes): scid, = struct.unpack(">Q", rec[2:]) # It might have already been deleted when we skipped it. if scid in self.channels: self._del_channel(scid) def _pull_bytes(self, length: int) -> bool: """Pull bytes from file into our internal buffer""" if len(self.store_buf) < length: self.store_buf += self.store_file.read(length - len(self.store_buf)) return len(self.store_buf) >= length def _read_record(self) -> Optional[bytes]: """If a whole record is not in the file, returns None. If deleted, returns empty.""" if not self._pull_bytes(12): return None hdr = GossipStoreHeader(self.store_buf[:12]) if not self._pull_bytes(12 + hdr.length): return None self.bytes_read += len(self.store_buf) ret = self.store_buf[12:] self.store_buf = bytes() if hdr.deleted: ret = bytes() return ret def refresh(self): """Catch up with any changes to the gossip store""" while True: off = self.bytes_read rec = self._read_record() # EOF? if rec is None: break # Deleted? if len(rec) == 0: continue rectype, = struct.unpack(">H", rec[:2]) if rectype == channel_announcement.number: self.add_channel(rec, off, False) elif rectype == WIRE_GOSSIP_STORE_PRIVATE_CHANNEL: self.add_channel(rec[2 + 8 + 2:], off + 2 + 8 + 2, True) elif rectype == channel_update.number: self.update_channel(rec, off) elif rectype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE: self.update_channel(rec[2 + 2:], off + 2 + 2) elif rectype == WIRE_GOSSIP_STORE_DELETE_CHAN: self.remove_channel_by_deletemsg(rec) elif rectype == node_announcement.number: self.add_node_announcement(rec, off) elif rectype == WIRE_GOSSIP_STORE_ENDED: self.reopen_store() else: continue