gossipd: adjust peers' broadcast_offset when compacting store.
When we compact the store, we need to adjust the broadcast index for peers so they know where they're up to.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
commit 261921dee2
parent fdb42c3170

10 changed files with 75 additions and 14 deletions
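The adjustment itself is simple arithmetic: each connected peer keeps a cursor (`broadcast_index`) into the gossip store, and compaction shrinks the store by some amount (`gs->len - len` in the diff below), so every cursor has to be pulled back by that same amount, clamped at zero so a peer at worst re-receives a little gossip rather than skipping any. The standalone sketch below illustrates only that arithmetic; `demo_peer` and `demo_adjust_peers` are illustrative stand-ins, not the daemon's real types -- the actual implementation is `update_peers_broadcast_index()` in the diff that follows.

#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-in for a peer's read position into the broadcast store. */
struct demo_peer {
	uint64_t broadcast_index;
};

/* After compaction removes `offset` bytes in front of the live data, pull
 * every cursor back by the same amount.  A cursor that would go negative is
 * clamped to 0: that peer may see a few messages again, which is harmless,
 * whereas overshooting would make it skip gossip. */
static void demo_adjust_peers(struct demo_peer *peers, size_t n, uint32_t offset)
{
	for (size_t i = 0; i < n; i++) {
		if (peers[i].broadcast_index < offset)
			peers[i].broadcast_index = 0;
		else
			peers[i].broadcast_index -= offset;
	}
}

int main(void)
{
	struct demo_peer peers[] = { { 40 }, { 250 }, { 1000 } };

	/* Pretend compaction just shrank the store by 100 bytes. */
	demo_adjust_peers(peers, sizeof(peers) / sizeof(peers[0]), 100);

	for (size_t i = 0; i < sizeof(peers) / sizeof(peers[0]); i++)
		printf("peer %zu now at %llu\n", i,
		       (unsigned long long)peers[i].broadcast_index);
	return 0;
}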

gossipd/broadcast.c
@@ -19,12 +19,14 @@ static void destroy_broadcast_state(struct broadcast_state *bstate)
 }
 
 struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
-					    struct gossip_store *gs)
+					    struct gossip_store *gs,
+					    struct list_head *peers)
 {
 	struct broadcast_state *bstate = tal(rstate, struct broadcast_state);
 	uintmap_init(&bstate->broadcasts);
 	bstate->count = 0;
 	bstate->gs = gs;
+	bstate->peers = peers;
 	tal_add_destructor(bstate, destroy_broadcast_state);
 	return bstate;
 }
@@ -63,6 +65,8 @@ void insert_broadcast(struct broadcast_state **bstate,
 		      const u8 *msg,
 		      struct broadcastable *bcast)
 {
+	u32 offset;
+
 	/* If we're loading from the store, we already have index */
 	if (!bcast->index) {
 		u64 idx;
@@ -79,7 +83,9 @@ void insert_broadcast(struct broadcast_state **bstate,
 	insert_broadcast_nostore(*bstate, bcast);
 
 	/* If it compacts, it replaces *bstate */
-	gossip_store_maybe_compact((*bstate)->gs, bstate);
+	gossip_store_maybe_compact((*bstate)->gs, bstate, &offset);
+	if (offset)
+		update_peers_broadcast_index((*bstate)->peers, offset);
 }
 
 struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,

gossipd/broadcast.h
@@ -24,6 +24,7 @@ struct broadcast_state {
 	UINTMAP(struct broadcastable *) broadcasts;
 	size_t count;
 	struct gossip_store *gs;
+	struct list_head *peers;
 };
 
 static inline void broadcastable_init(struct broadcastable *bcast)
@@ -32,7 +33,8 @@ static inline void broadcastable_init(struct broadcastable *bcast)
 }
 
 struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
-					    struct gossip_store *gs);
+					    struct gossip_store *gs,
+					    struct list_head *peers);
 
 /* Append a queued message for broadcast. Must be explicitly deleted.
  * Also adds it to the gossip store. */
@@ -68,4 +70,6 @@ u64 broadcast_final_index(const struct broadcast_state *bstate);
 struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
 					      const char *abortstr);
 
+/* Callback for after we compacted the store */
+void update_peers_broadcast_index(struct list_head *peers, u32 offset);
 #endif /* LIGHTNING_GOSSIPD_BROADCAST_H */

gossipd/gossip_store.c
@@ -253,9 +253,13 @@ static bool add_local_unnannounced(int in_fd, int out_fd,
  *
  * Creates a new file, writes all the updates from the `broadcast_state`, and
  * then atomically swaps the files.
+ *
+ * Returns the amount of shrinkage in @offset on success, otherwise @offset
+ * is unchanged.
  */
 bool gossip_store_compact(struct gossip_store *gs,
-			  struct broadcast_state **bs)
+			  struct broadcast_state **bs,
+			  u32 *offset)
 {
 	size_t count = 0;
 	int fd;
@@ -274,7 +278,7 @@ bool gossip_store_compact(struct gossip_store *gs,
 		     "Compacting gossip_store with %zu entries, %zu of which are stale",
 		     gs->count, gs->count - oldb->count);
 
-	newb = new_broadcast_state(gs->rstate, gs);
+	newb = new_broadcast_state(gs->rstate, gs, oldb->peers);
 	fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600);
 
 	if (fd < 0) {
@@ -349,6 +353,7 @@ bool gossip_store_compact(struct gossip_store *gs,
 		     "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
 		     gs->count - count, count, len);
 	gs->count = count;
+	*offset = gs->len - len;
 	gs->len = len;
 	close(gs->fd);
 	gs->fd = fd;
@@ -368,8 +373,11 @@ disable:
 }
 
 void gossip_store_maybe_compact(struct gossip_store *gs,
-				struct broadcast_state **bs)
+				struct broadcast_state **bs,
+				u32 *offset)
 {
+	*offset = 0;
+
 	/* Don't compact while loading! */
 	if (!gs->writable)
 		return;
@@ -378,7 +386,7 @@ void gossip_store_maybe_compact(struct gossip_store *gs,
 	if (gs->count < (*bs)->count * 1.25)
 		return;
 
-	gossip_store_compact(gs, bs);
+	gossip_store_compact(gs, bs, offset);
 }
 
 u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg)

gossipd/gossip_store.h
@@ -51,12 +51,20 @@ const u8 *gossip_store_get(const tal_t *ctx,
  * If we need to compact the gossip store, do so.
  * @gs: the gossip store.
  * @bs: a pointer to the broadcast state: replaced if we compact it.
+ * @offset: the change in the store, if any.
+ *
+ * If @offset is non-zero on return, caller must update peers.
  */
 void gossip_store_maybe_compact(struct gossip_store *gs,
-				struct broadcast_state **bs);
+				struct broadcast_state **bs,
+				u32 *offset);
+
 
 /* Expose for dev-compact-gossip-store to force compaction. */
 bool gossip_store_compact(struct gossip_store *gs,
-			  struct broadcast_state **bs);
+			  struct broadcast_state **bs,
+			  u32 *offset);
 
+/* Callback for when gossip_store indexes move */
+
 #endif /* LIGHTNING_GOSSIPD_GOSSIP_STORE_H */

gossipd/gossipd.c
@@ -695,6 +695,21 @@ static u8 *handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg)
 	return NULL;
 }
 
+/*~ When we compact the gossip store, all the broadcast indexs move.
+ * We simply offset everyone, which means in theory they could retransmit
+ * some, but that's a lesser evil than skipping some. */
+void update_peers_broadcast_index(struct list_head *peers, u32 offset)
+{
+	struct peer *peer;
+
+	list_for_each(peers, peer, list) {
+		if (peer->broadcast_index < offset)
+			peer->broadcast_index = 0;
+		else
+			peer->broadcast_index -= offset;
+	}
+}
+
 /*~ We can send multiple replies when the peer queries for all channels in
  * a given range of blocks; each one indicates the range of blocks it covers. */
 static void reply_channel_range(struct peer *peer,
@@ -1956,6 +1971,7 @@ static struct io_plan *gossip_init(struct io_conn *conn,
 				   chainparams_by_chainhash(&daemon->chain_hash),
 				   &daemon->id,
 				   update_channel_interval * 2,
+				   &daemon->peers,
 				   dev_gossip_time,
 				   dev_unknown_channel_satoshis);
 
@@ -2565,8 +2581,15 @@ static struct io_plan *dev_compact_store(struct io_conn *conn,
 					 struct daemon *daemon,
 					 const u8 *msg)
 {
+	u32 offset;
 	bool done = gossip_store_compact(daemon->rstate->broadcasts->gs,
-					 &daemon->rstate->broadcasts);
+					 &daemon->rstate->broadcasts,
+					 &offset);
+
+	/* Peers keep an offset into where they are with gossip. */
+	if (done)
+		update_peers_broadcast_index(&daemon->peers, offset);
+
 	daemon_conn_send(daemon->master,
			 take(towire_gossip_dev_compact_store_reply(NULL,
								    done)));

gossipd/routing.c
@@ -181,12 +181,14 @@ struct routing_state *new_routing_state(const tal_t *ctx,
 					const struct chainparams *chainparams,
 					const struct node_id *local_id,
 					u32 prune_timeout,
+					struct list_head *peers,
 					const u32 *dev_gossip_time,
 					const struct amount_sat *dev_unknown_channel_satoshis)
 {
 	struct routing_state *rstate = tal(ctx, struct routing_state);
 	rstate->nodes = empty_node_map(rstate);
-	rstate->broadcasts = new_broadcast_state(rstate, gossip_store_new(rstate));
+	rstate->broadcasts
+		= new_broadcast_state(rstate, gossip_store_new(rstate), peers);
 	rstate->chainparams = chainparams;
 	rstate->local_id = *local_id;
 	rstate->prune_timeout = prune_timeout;

gossipd/routing.h
@@ -226,6 +226,7 @@ struct routing_state *new_routing_state(const tal_t *ctx,
 					const struct chainparams *chainparams,
 					const struct node_id *local_id,
 					u32 prune_timeout,
+					struct list_head *peers,
 					const u32 *dev_gossip_time,
 					const struct amount_sat *dev_unknown_channel_satoshis);
 

gossipd/test/run-bench-find_route.c
@@ -97,6 +97,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
 /* Generated stub for towire_gossip_store_node_announcement */
 u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
 { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
+/* Generated stub for update_peers_broadcast_index */
+void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
+{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
 /* Generated stub for wire_type_name */
 const char *wire_type_name(int e UNNEEDED)
 { fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -214,7 +217,7 @@ int main(int argc, char *argv[])
 	setup_tmpctx();
 
 	me = nodeid(0);
-	rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL);
+	rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL, NULL);
 	opt_register_noarg("--perfme", opt_set_bool, &perfme,
 			   "Run perfme-start and perfme-stop around benchmark");
 

gossipd/test/…
@@ -86,6 +86,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
 /* Generated stub for towire_gossip_store_node_announcement */
 u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
 { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
+/* Generated stub for update_peers_broadcast_index */
+void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
+{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
 /* Generated stub for wire_type_name */
 const char *wire_type_name(int e UNNEEDED)
 { fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -171,7 +174,7 @@ int main(void)
 		    strlen("02cca6c5c966fcf61d121e3a70e03a1cd9eeeea024b26ea666ce974d43b242e636"),
 		    &d);
 
-	rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL);
+	rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL);
 
 	/* [{'active': True, 'short_id': '6990:2:1/1', 'fee_per_kw': 10, 'delay': 5, 'message_flags': 0, 'channel_flags': 1, 'destination': '0230ad0e74ea03976b28fda587bb75bdd357a1938af4424156a18265167f5e40ae', 'source': '02ea622d5c8d6143f15ed3ce1d501dd0d3d09d3b1c83a44d0034949f8a9ab60f06', 'last_update': 1504064344}, */
 

gossipd/test/…
@@ -84,6 +84,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l
 /* Generated stub for towire_gossip_store_node_announcement */
 u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED)
 { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); }
+/* Generated stub for update_peers_broadcast_index */
+void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED)
+{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); }
 /* Generated stub for wire_type_name */
 const char *wire_type_name(int e UNNEEDED)
 { fprintf(stderr, "wire_type_name called!\n"); abort(); }
@@ -205,7 +208,7 @@ int main(void)
 
 	memset(&tmp, 'a', sizeof(tmp));
 	node_id_from_privkey(&tmp, &a);
-	rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL);
+	rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL);
 
 	new_node(rstate, &a);
 