From ba2bb5531da450febd0557ea570b7dea66b6c637 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 9 Jul 2024 22:42:29 +0930 Subject: [PATCH] gossmap: add linear streaming interface. Signed-off-by: Rusty Russell --- common/gossmap.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++ common/gossmap.h | 22 ++++++ 2 files changed, 193 insertions(+) diff --git a/common/gossmap.c b/common/gossmap.c index fe2f420fd..76143018f 100644 --- a/common/gossmap.c +++ b/common/gossmap.c @@ -48,6 +48,9 @@ HTABLE_DEFINE_TYPE(ptrint_t, nodeidx_id, nodeid_hash, nodeidx_eq_id, nodeidx_htable); struct gossmap { + /* We updated this every time we reopen, so we know to update iterators! */ + u64 generation; + /* The file descriptor and filename to monitor */ int fd; /* NULL means we don't own fd */ @@ -634,6 +637,7 @@ static bool reopen_store(struct gossmap *map, size_t ended_off) close(map->fd); map->fd = fd; + map->generation++; return gossmap_refresh_mayfail(map, NULL); } @@ -1043,6 +1047,7 @@ struct gossmap *gossmap_load(const tal_t *ctx, const char *filename, size_t *num_channel_updates_rejected) { map = tal(ctx, struct gossmap); + map->generation = 0; map->fname = tal_strdup(map, filename); map->fd = open(map->fname, O_RDONLY); if (map->fd < 0) @@ -1457,3 +1462,169 @@ size_t gossmap_lengths(const struct gossmap *map, size_t *total) *total = map->map_size; return map->map_end; } + +struct gossmap_iter { + u64 generation; + u64 offset; +}; + +/* For iterating the gossmap: returns iterator at start. */ +struct gossmap_iter *gossmap_iter_new(const tal_t *ctx, + const struct gossmap *map) +{ + struct gossmap_iter *iter = tal(ctx, struct gossmap_iter); + iter->generation = map->generation; + /* Skip version byte */ + iter->offset = 1; + + return iter; +} + +/* Copy an existing iterator (same offset) */ +struct gossmap_iter *gossmap_iter_dup(const tal_t *ctx, + const struct gossmap_iter *iter) +{ + return tal_dup(ctx, struct gossmap_iter, iter); +} + +/* FIXME: I tried returning a direct ptr into mmap where possible, but + * we would have to re-engineer packet paths to handle non-talarr msgs! */ +const void *gossmap_stream_next(const tal_t *ctx, + const struct gossmap *map, + struct gossmap_iter *iter, + u32 *timestamp) +{ + /* We grab hdr and type together. Beware alignment! */ + struct hdr { + union { + struct gossip_hdr ghdr; + struct { + u8 raw_hdr[sizeof(struct gossip_hdr)]; + be16 type; + } type; + } u; + }; + struct hdr h; + + /* If we have reopened (unlikely!), we need to restart iteration */ + if (iter->generation != map->generation) { + iter->generation = map->generation; + /* Skip version byte */ + iter->offset = 1; + } + + while (iter->offset + sizeof(h.u.type) <= map->map_size) { + void *ret; + size_t len; + + map_copy(map, iter->offset, &h, sizeof(h.u.type)); + + /* Make sure we can read entire record. */ + len = be16_to_cpu(h.u.ghdr.len); + if (iter->offset + sizeof(h.u.ghdr) + len > map->map_size) + break; + + /* Increase iterator now we're committed. */ + iter->offset += sizeof(h.u.ghdr) + len; + + /* Ignore deleted and dying */ + if (be16_to_cpu(h.u.ghdr.flags) & + (GOSSIP_STORE_DELETED_BIT|GOSSIP_STORE_DYING_BIT)) + continue; + + /* Use a switch as insurance against addition of new public + * gossip messages! */ + switch ((enum peer_wire)be16_to_cpu(h.u.type.type)) { + case WIRE_CHANNEL_ANNOUNCEMENT: + case WIRE_CHANNEL_UPDATE: + case WIRE_NODE_ANNOUNCEMENT: + ret = tal_arr(ctx, u8, len); + map_copy(map, iter->offset - len, ret, len); + if (timestamp) + *timestamp = be32_to_cpu(h.u.ghdr.timestamp); + return ret; + + case WIRE_INIT: + case WIRE_ERROR: + case WIRE_WARNING: + case WIRE_PING: + case WIRE_PONG: + case WIRE_TX_ADD_INPUT: + case WIRE_TX_ADD_OUTPUT: + case WIRE_TX_REMOVE_INPUT: + case WIRE_TX_REMOVE_OUTPUT: + case WIRE_TX_COMPLETE: + case WIRE_TX_SIGNATURES: + case WIRE_TX_INIT_RBF: + case WIRE_TX_ACK_RBF: + case WIRE_TX_ABORT: + case WIRE_OPEN_CHANNEL: + case WIRE_ACCEPT_CHANNEL: + case WIRE_FUNDING_CREATED: + case WIRE_FUNDING_SIGNED: + case WIRE_CHANNEL_READY: + case WIRE_OPEN_CHANNEL2: + case WIRE_ACCEPT_CHANNEL2: + case WIRE_STFU: + case WIRE_SPLICE: + case WIRE_SPLICE_ACK: + case WIRE_SPLICE_LOCKED: + case WIRE_SHUTDOWN: + case WIRE_CLOSING_SIGNED: + case WIRE_UPDATE_ADD_HTLC: + case WIRE_UPDATE_FULFILL_HTLC: + case WIRE_UPDATE_FAIL_HTLC: + case WIRE_UPDATE_FAIL_MALFORMED_HTLC: + case WIRE_COMMITMENT_SIGNED: + case WIRE_REVOKE_AND_ACK: + case WIRE_UPDATE_FEE: + case WIRE_UPDATE_BLOCKHEIGHT: + case WIRE_CHANNEL_REESTABLISH: + case WIRE_PEER_STORAGE: + case WIRE_YOUR_PEER_STORAGE: + case WIRE_ANNOUNCEMENT_SIGNATURES: + case WIRE_QUERY_SHORT_CHANNEL_IDS: + case WIRE_REPLY_SHORT_CHANNEL_IDS_END: + case WIRE_QUERY_CHANNEL_RANGE: + case WIRE_REPLY_CHANNEL_RANGE: + case WIRE_GOSSIP_TIMESTAMP_FILTER: + case WIRE_ONION_MESSAGE: + break; + } + } + return NULL; +} + +/* For fast-forwarding to the given timestamp */ +void gossmap_iter_fast_forward(const struct gossmap *map, + struct gossmap_iter *iter, + u64 timestamp) +{ + /* If we have reopened (unlikely!), we need to restart iteration */ + if (iter->generation != map->generation) { + iter->generation = map->generation; + iter->offset = 1; + } + + while (iter->offset + sizeof(struct gossip_hdr) <= map->map_size) { + struct gossip_hdr ghdr; + + map_copy(map, iter->offset, &ghdr, sizeof(ghdr)); + + if (be32_to_cpu(ghdr.timestamp) >= timestamp) + break; + } +} + +void gossmap_iter_end(const struct gossmap *map, struct gossmap_iter *iter) +{ + if (iter->generation != map->generation) + iter->generation = map->generation; + + iter->offset = map->map_end; +} + +int gossmap_fd(const struct gossmap *map) +{ + return map->fd; +} diff --git a/common/gossmap.h b/common/gossmap.h index b42045337..1d46be9a7 100644 --- a/common/gossmap.h +++ b/common/gossmap.h @@ -258,6 +258,28 @@ struct gossmap_chan *gossmap_first_chan(const struct gossmap *map); struct gossmap_chan *gossmap_next_chan(const struct gossmap *map, struct gossmap_chan *prev); +/* For iterating the gossmap: returns iterator at start. */ +struct gossmap_iter *gossmap_iter_new(const tal_t *ctx, + const struct gossmap *map); +/* Copy an existing iterator (same offset) */ +struct gossmap_iter *gossmap_iter_dup(const tal_t *ctx, + const struct gossmap_iter *iter); + +/* Get next message, and optional timestamp */ +const void *gossmap_stream_next(const tal_t *ctx, + const struct gossmap *map, + struct gossmap_iter *iter, + u32 *timestamp); +/* For fast-forwarding to the given timestamp */ +void gossmap_iter_fast_forward(const struct gossmap *map, + struct gossmap_iter *iter, + u64 timestamp); +/* Moves iterator to the end. */ +void gossmap_iter_end(const struct gossmap *map, struct gossmap_iter *iter); + /* For debugging: returns length read, and total known length of file */ size_t gossmap_lengths(const struct gossmap *map, size_t *total); + +/* Debugging: connectd wants to enumerate fds */ +int gossmap_fd(const struct gossmap *map); #endif /* LIGHTNING_COMMON_GOSSMAP_H */