gossmap: add linear streaming interface.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2024-07-09 22:42:29 +09:30
parent ae5ad486ea
commit ba2bb5531d
2 changed files with 193 additions and 0 deletions

View file

@ -48,6 +48,9 @@ HTABLE_DEFINE_TYPE(ptrint_t, nodeidx_id, nodeid_hash, nodeidx_eq_id,
nodeidx_htable);
struct gossmap {
/* We updated this every time we reopen, so we know to update iterators! */
u64 generation;
/* The file descriptor and filename to monitor */
int fd;
/* NULL means we don't own fd */
@ -634,6 +637,7 @@ static bool reopen_store(struct gossmap *map, size_t ended_off)
close(map->fd);
map->fd = fd;
map->generation++;
return gossmap_refresh_mayfail(map, NULL);
}
@ -1043,6 +1047,7 @@ struct gossmap *gossmap_load(const tal_t *ctx, const char *filename,
size_t *num_channel_updates_rejected)
{
map = tal(ctx, struct gossmap);
map->generation = 0;
map->fname = tal_strdup(map, filename);
map->fd = open(map->fname, O_RDONLY);
if (map->fd < 0)
@ -1457,3 +1462,169 @@ size_t gossmap_lengths(const struct gossmap *map, size_t *total)
*total = map->map_size;
return map->map_end;
}
struct gossmap_iter {
u64 generation;
u64 offset;
};
/* For iterating the gossmap: returns iterator at start. */
struct gossmap_iter *gossmap_iter_new(const tal_t *ctx,
const struct gossmap *map)
{
struct gossmap_iter *iter = tal(ctx, struct gossmap_iter);
iter->generation = map->generation;
/* Skip version byte */
iter->offset = 1;
return iter;
}
/* Copy an existing iterator (same offset) */
struct gossmap_iter *gossmap_iter_dup(const tal_t *ctx,
const struct gossmap_iter *iter)
{
return tal_dup(ctx, struct gossmap_iter, iter);
}
/* FIXME: I tried returning a direct ptr into mmap where possible, but
* we would have to re-engineer packet paths to handle non-talarr msgs! */
const void *gossmap_stream_next(const tal_t *ctx,
const struct gossmap *map,
struct gossmap_iter *iter,
u32 *timestamp)
{
/* We grab hdr and type together. Beware alignment! */
struct hdr {
union {
struct gossip_hdr ghdr;
struct {
u8 raw_hdr[sizeof(struct gossip_hdr)];
be16 type;
} type;
} u;
};
struct hdr h;
/* If we have reopened (unlikely!), we need to restart iteration */
if (iter->generation != map->generation) {
iter->generation = map->generation;
/* Skip version byte */
iter->offset = 1;
}
while (iter->offset + sizeof(h.u.type) <= map->map_size) {
void *ret;
size_t len;
map_copy(map, iter->offset, &h, sizeof(h.u.type));
/* Make sure we can read entire record. */
len = be16_to_cpu(h.u.ghdr.len);
if (iter->offset + sizeof(h.u.ghdr) + len > map->map_size)
break;
/* Increase iterator now we're committed. */
iter->offset += sizeof(h.u.ghdr) + len;
/* Ignore deleted and dying */
if (be16_to_cpu(h.u.ghdr.flags) &
(GOSSIP_STORE_DELETED_BIT|GOSSIP_STORE_DYING_BIT))
continue;
/* Use a switch as insurance against addition of new public
* gossip messages! */
switch ((enum peer_wire)be16_to_cpu(h.u.type.type)) {
case WIRE_CHANNEL_ANNOUNCEMENT:
case WIRE_CHANNEL_UPDATE:
case WIRE_NODE_ANNOUNCEMENT:
ret = tal_arr(ctx, u8, len);
map_copy(map, iter->offset - len, ret, len);
if (timestamp)
*timestamp = be32_to_cpu(h.u.ghdr.timestamp);
return ret;
case WIRE_INIT:
case WIRE_ERROR:
case WIRE_WARNING:
case WIRE_PING:
case WIRE_PONG:
case WIRE_TX_ADD_INPUT:
case WIRE_TX_ADD_OUTPUT:
case WIRE_TX_REMOVE_INPUT:
case WIRE_TX_REMOVE_OUTPUT:
case WIRE_TX_COMPLETE:
case WIRE_TX_SIGNATURES:
case WIRE_TX_INIT_RBF:
case WIRE_TX_ACK_RBF:
case WIRE_TX_ABORT:
case WIRE_OPEN_CHANNEL:
case WIRE_ACCEPT_CHANNEL:
case WIRE_FUNDING_CREATED:
case WIRE_FUNDING_SIGNED:
case WIRE_CHANNEL_READY:
case WIRE_OPEN_CHANNEL2:
case WIRE_ACCEPT_CHANNEL2:
case WIRE_STFU:
case WIRE_SPLICE:
case WIRE_SPLICE_ACK:
case WIRE_SPLICE_LOCKED:
case WIRE_SHUTDOWN:
case WIRE_CLOSING_SIGNED:
case WIRE_UPDATE_ADD_HTLC:
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
case WIRE_UPDATE_BLOCKHEIGHT:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_PEER_STORAGE:
case WIRE_YOUR_PEER_STORAGE:
case WIRE_ANNOUNCEMENT_SIGNATURES:
case WIRE_QUERY_SHORT_CHANNEL_IDS:
case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
case WIRE_QUERY_CHANNEL_RANGE:
case WIRE_REPLY_CHANNEL_RANGE:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
case WIRE_ONION_MESSAGE:
break;
}
}
return NULL;
}
/* For fast-forwarding to the given timestamp */
void gossmap_iter_fast_forward(const struct gossmap *map,
struct gossmap_iter *iter,
u64 timestamp)
{
/* If we have reopened (unlikely!), we need to restart iteration */
if (iter->generation != map->generation) {
iter->generation = map->generation;
iter->offset = 1;
}
while (iter->offset + sizeof(struct gossip_hdr) <= map->map_size) {
struct gossip_hdr ghdr;
map_copy(map, iter->offset, &ghdr, sizeof(ghdr));
if (be32_to_cpu(ghdr.timestamp) >= timestamp)
break;
}
}
void gossmap_iter_end(const struct gossmap *map, struct gossmap_iter *iter)
{
if (iter->generation != map->generation)
iter->generation = map->generation;
iter->offset = map->map_end;
}
int gossmap_fd(const struct gossmap *map)
{
return map->fd;
}

View file

@ -258,6 +258,28 @@ struct gossmap_chan *gossmap_first_chan(const struct gossmap *map);
struct gossmap_chan *gossmap_next_chan(const struct gossmap *map,
struct gossmap_chan *prev);
/* For iterating the gossmap: returns iterator at start. */
struct gossmap_iter *gossmap_iter_new(const tal_t *ctx,
const struct gossmap *map);
/* Copy an existing iterator (same offset) */
struct gossmap_iter *gossmap_iter_dup(const tal_t *ctx,
const struct gossmap_iter *iter);
/* Get next message, and optional timestamp */
const void *gossmap_stream_next(const tal_t *ctx,
const struct gossmap *map,
struct gossmap_iter *iter,
u32 *timestamp);
/* For fast-forwarding to the given timestamp */
void gossmap_iter_fast_forward(const struct gossmap *map,
struct gossmap_iter *iter,
u64 timestamp);
/* Moves iterator to the end. */
void gossmap_iter_end(const struct gossmap *map, struct gossmap_iter *iter);
/* For debugging: returns length read, and total known length of file */
size_t gossmap_lengths(const struct gossmap *map, size_t *total);
/* Debugging: connectd wants to enumerate fds */
int gossmap_fd(const struct gossmap *map);
#endif /* LIGHTNING_COMMON_GOSSMAP_H */