broadcast: don't keep payload pointer.

Instead of holding each queued message's serialized payload in memory, read it back from the gossip store whenever the payload is actually needed.

MCP results from 5 runs, min-max(mean +/- stddev):
	store_load_msec:30189-52561(39416.4+/-8.8e+03)
	vsz_kb:1812904
	store_rewrite_sec:21.390000-27.070000(23.596+/-2.4)
	listnodes_sec:1.120000-1.230000(1.176+/-0.044)
	listchannels_sec:38.900000-50.580000(44.716+/-3.9)
	routing_sec:45.080000-48.160000(46.814+/-1.1)
	peer_write_all_sec:58.780000-87.150000(72.278+/-9.7)

MCP notable changes from previous patch (>1 stddev):
	-vsz_kb:2288784
	+vsz_kb:1812904
	-store_rewrite_sec:38.060000-39.130000(38.426+/-0.39)
	+store_rewrite_sec:21.390000-27.070000(23.596+/-2.4)
	-listnodes_sec:0.750000-0.850000(0.794+/-0.042)
	+listnodes_sec:1.120000-1.230000(1.176+/-0.044)
	-listchannels_sec:30.740000-31.760000(31.096+/-0.35)
	+listchannels_sec:38.900000-50.580000(44.716+/-3.9)
	-routing_sec:29.600000-33.560000(30.472+/-1.5)
	+routing_sec:45.080000-48.160000(46.814+/-1.1)
	-peer_write_all_sec:49.220000-52.690000(50.892+/-1.3)
	+peer_write_all_sec:58.780000-87.150000(72.278+/-9.7)

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2019-04-10 17:01:29 +09:30 committed by neil saitug
parent da845b660b
commit 6b9069ee28
5 changed files with 90 additions and 70 deletions

View File

@ -13,57 +13,49 @@
#include <gossipd/gossip_store.h> #include <gossipd/gossip_store.h>
#include <wire/gen_peer_wire.h> #include <wire/gen_peer_wire.h>
struct queued_message { static void destroy_broadcast_state(struct broadcast_state *bstate)
struct broadcastable *bcast; {
uintmap_clear(&bstate->broadcasts);
}
/* Serialized payload */ struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
const u8 *payload; struct gossip_store *gs)
};
struct broadcast_state *new_broadcast_state(struct routing_state *rstate)
{ {
struct broadcast_state *bstate = tal(rstate, struct broadcast_state); struct broadcast_state *bstate = tal(rstate, struct broadcast_state);
uintmap_init(&bstate->broadcasts); uintmap_init(&bstate->broadcasts);
bstate->count = 0; bstate->count = 0;
bstate->gs = gossip_store_new(rstate); bstate->gs = gs;
tal_add_destructor(bstate, destroy_broadcast_state);
return bstate; return bstate;
} }
void broadcast_del(struct broadcast_state *bstate, void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast) struct broadcastable *bcast)
{ {
const struct queued_message *q const struct broadcastable *b
= uintmap_del(&bstate->broadcasts, bcast->index); = uintmap_del(&bstate->broadcasts, bcast->index);
if (q != NULL) { if (b != NULL) {
assert(q->bcast == bcast); assert(b == bcast);
tal_free(q);
bstate->count--; bstate->count--;
broadcast_state_check(bstate, "broadcast_del"); broadcast_state_check(bstate, "broadcast_del");
bcast->index = 0; bcast->index = 0;
} }
} }
static struct queued_message *new_queued_message(struct broadcast_state *bstate, static void add_broadcast(struct broadcast_state *bstate,
const u8 *payload, struct broadcastable *bcast)
struct broadcastable *bcast)
{ {
struct queued_message *msg = tal(bstate, struct queued_message);
assert(payload);
assert(bcast); assert(bcast);
assert(bcast->index); assert(bcast->index);
msg->payload = payload; if (!uintmap_add(&bstate->broadcasts, bcast->index, bcast))
msg->bcast = bcast;
if (!uintmap_add(&bstate->broadcasts, bcast->index, msg))
abort(); abort();
bstate->count++; bstate->count++;
return msg;
} }
void insert_broadcast_nostore(struct broadcast_state *bstate, void insert_broadcast_nostore(struct broadcast_state *bstate,
const u8 *msg,
struct broadcastable *bcast) struct broadcastable *bcast)
{ {
new_queued_message(bstate, msg, bcast); add_broadcast(bstate, bcast);
broadcast_state_check(bstate, "insert_broadcast"); broadcast_state_check(bstate, "insert_broadcast");
} }
@ -84,40 +76,38 @@ void insert_broadcast(struct broadcast_state **bstate,
assert(idx == bcast->index); assert(idx == bcast->index);
} }
insert_broadcast_nostore(*bstate, msg, bcast); insert_broadcast_nostore(*bstate, bcast);
/* If it compacts, it replaces *bstate */ /* If it compacts, it replaces *bstate */
gossip_store_maybe_compact((*bstate)->gs, bstate); gossip_store_maybe_compact((*bstate)->gs, bstate);
} }
const u8 *pop_first_broadcast(struct broadcast_state *bstate, struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
struct broadcastable **bcast) u32 *last_index)
{ {
u64 idx; struct broadcastable *b;
const u8 *msg; u64 idx = *last_index;
struct queued_message *q = uintmap_first(&bstate->broadcasts, &idx);
if (!q) b = uintmap_after(&bstate->broadcasts, &idx);
if (!b)
return NULL; return NULL;
/* Assert no overflow */
*bcast = q->bcast; *last_index = idx;
msg = q->payload; assert(*last_index == idx);
return b;
broadcast_del(bstate, *bcast);
return msg;
} }
const u8 *next_broadcast(struct broadcast_state *bstate, const u8 *next_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max, u32 timestamp_min, u32 timestamp_max,
u32 *last_index) u32 *last_index)
{ {
struct queued_message *m; struct broadcastable *b;
u64 idx = *last_index;
while ((m = uintmap_after(&bstate->broadcasts, &idx)) != NULL) { while ((b = next_broadcast_raw(bstate, last_index)) != NULL) {
if (m->bcast->timestamp >= timestamp_min if (b->timestamp >= timestamp_min
&& m->bcast->timestamp <= timestamp_max) { && b->timestamp <= timestamp_max) {
*last_index = idx; return gossip_store_get(ctx, bstate->gs, b->index);
return m->payload;
} }
} }
return NULL; return NULL;

View File

@ -11,12 +11,6 @@
struct routing_state; struct routing_state;
struct broadcast_state {
UINTMAP(struct queued_message *) broadcasts;
size_t count;
struct gossip_store *gs;
};
/* This is nested inside a node, chan or half_chan; rewriting the store can /* This is nested inside a node, chan or half_chan; rewriting the store can
* cause it to change! */ * cause it to change! */
struct broadcastable { struct broadcastable {
@ -26,13 +20,19 @@ struct broadcastable {
u32 timestamp; u32 timestamp;
}; };
struct broadcast_state {
UINTMAP(struct broadcastable *) broadcasts;
size_t count;
struct gossip_store *gs;
};
static inline void broadcastable_init(struct broadcastable *bcast) static inline void broadcastable_init(struct broadcastable *bcast)
{ {
bcast->index = 0; bcast->index = 0;
} }
struct broadcast_state *new_broadcast_state(struct routing_state *rstate); struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs);
/* Append a queued message for broadcast. Must be explicitly deleted. /* Append a queued message for broadcast. Must be explicitly deleted.
* Also adds it to the gossip store. */ * Also adds it to the gossip store. */
@ -42,27 +42,27 @@ void insert_broadcast(struct broadcast_state **bstate,
/* Add to broadcast, but not store: for gossip store compaction. */ /* Add to broadcast, but not store: for gossip store compaction. */
void insert_broadcast_nostore(struct broadcast_state *bstate, void insert_broadcast_nostore(struct broadcast_state *bstate,
const u8 *msg,
struct broadcastable *bcast); struct broadcastable *bcast);
/* Delete a broadcast: not usually needed, since destructor does it */ /* Delete a broadcast: not usually needed, since destructor does it */
void broadcast_del(struct broadcast_state *bstate, void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast); struct broadcastable *bcast);
/* Return the next broadcastable entry; doesn't load it. */
struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
u32 *last_index);
/* Return the broadcast with index >= *last_index, timestamp >= min and <= max /* Return the broadcast with index >= *last_index, timestamp >= min and <= max
* and update *last_index. * and update *last_index.
* There's no broadcast with index 0. */ * There's no broadcast with index 0. */
const u8 *next_broadcast(struct broadcast_state *bstate, const u8 *next_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max, u32 timestamp_min, u32 timestamp_max,
u32 *last_index); u32 *last_index);
/* index of last entry. */ /* index of last entry. */
u64 broadcast_final_index(const struct broadcast_state *bstate); u64 broadcast_final_index(const struct broadcast_state *bstate);
/* Return and remove first element: used by gossip_store_compact */
const u8 *pop_first_broadcast(struct broadcast_state *bstate,
struct broadcastable **bcast);
/* Returns b if all OK, otherwise aborts if abortstr non-NULL, otherwise returns /* Returns b if all OK, otherwise aborts if abortstr non-NULL, otherwise returns
* NULL. */ * NULL. */
struct broadcast_state *broadcast_state_check(struct broadcast_state *b, struct broadcast_state *broadcast_state_check(struct broadcast_state *b,

View File

@ -208,12 +208,12 @@ bool gossip_store_compact(struct gossip_store *gs,
{ {
size_t count = 0; size_t count = 0;
int fd; int fd;
const u8 *msg;
struct node *self; struct node *self;
u64 len = sizeof(gs->version); u64 len = sizeof(gs->version);
struct broadcastable *bcast; struct broadcastable *bcast;
struct broadcast_state *oldb = *bs; struct broadcast_state *oldb = *bs;
struct broadcast_state *newb; struct broadcast_state *newb;
u32 idx = 0;
if (gs->disable_compaction) if (gs->disable_compaction)
return false; return false;
@ -223,7 +223,7 @@ bool gossip_store_compact(struct gossip_store *gs,
"Compacting gossip_store with %zu entries, %zu of which are stale", "Compacting gossip_store with %zu entries, %zu of which are stale",
gs->count, gs->count - oldb->count); gs->count, gs->count - oldb->count);
newb = new_broadcast_state(gs->rstate); newb = new_broadcast_state(gs->rstate, gs);
fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600); fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600);
if (fd < 0) { if (fd < 0) {
@ -238,14 +238,45 @@ bool gossip_store_compact(struct gossip_store *gs,
goto unlink_disable; goto unlink_disable;
} }
while ((msg = pop_first_broadcast(oldb, &bcast)) != NULL) { /* Copy entries one at a time. */
while ((bcast = next_broadcast_raw(oldb, &idx)) != NULL) {
beint32_t belen, becsum;
u32 msglen;
u8 *msg;
/* FIXME: optimize both read and allocation */
if (lseek(gs->fd, bcast->index, SEEK_SET) < 0
|| read(gs->fd, &belen, sizeof(belen)) != sizeof(belen)
|| read(gs->fd, &becsum, sizeof(becsum)) != sizeof(becsum)) {
status_broken("Failed reading header from to gossip store @%u"
": %s",
bcast->index, strerror(errno));
goto unlink_disable;
}
msglen = be32_to_cpu(belen);
msg = tal_arr(tmpctx, u8, sizeof(belen) + sizeof(becsum) + msglen);
memcpy(msg, &belen, sizeof(belen));
memcpy(msg + sizeof(belen), &becsum, sizeof(becsum));
if (read(gs->fd, msg + sizeof(belen) + sizeof(becsum), msglen)
!= msglen) {
status_broken("Failed reading %u from to gossip store @%u"
": %s",
msglen, bcast->index, strerror(errno));
goto unlink_disable;
}
broadcast_del(oldb, bcast);
bcast->index = len; bcast->index = len;
insert_broadcast_nostore(newb, msg, bcast); insert_broadcast_nostore(newb, bcast);
if (!gossip_store_append(fd, gs->rstate, msg, &len)) {
if (write(fd, msg, msglen + sizeof(belen) + sizeof(becsum))
!= msglen + sizeof(belen) + sizeof(becsum)) {
status_broken("Failed writing to gossip store: %s", status_broken("Failed writing to gossip store: %s",
strerror(errno)); strerror(errno));
goto unlink_disable; goto unlink_disable;
} }
len += sizeof(belen) + sizeof(becsum) + msglen;
count++; count++;
} }
@ -266,8 +297,8 @@ bool gossip_store_compact(struct gossip_store *gs,
} }
status_trace( status_trace(
"Compaction completed: dropped %zu messages, new count %zu", "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
gs->count - count, count); gs->count - count, count, len);
gs->count = count; gs->count = count;
gs->len = len; gs->len = len;
close(gs->fd); close(gs->fd);
@ -359,7 +390,7 @@ const u8 *gossip_store_get(const tal_t *ctx,
msglen = be32_to_cpu(belen); msglen = be32_to_cpu(belen);
checksum = be32_to_cpu(becsum); checksum = be32_to_cpu(becsum);
msg = tal_arr(ctx, u8, msglen); msg = tal_arr(tmpctx, u8, msglen);
if (read(gs->fd, msg, msglen) != msglen) if (read(gs->fd, msg, msglen) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR, status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read len %u offset %"PRIu64 "gossip_store: can't read len %u offset %"PRIu64

View File

@ -1127,13 +1127,13 @@ static void maybe_queue_gossip(struct peer *peer)
* only needs to keep an index; this returns the next gossip message * only needs to keep an index; this returns the next gossip message
* which is past the previous index and within the timestamp: it * which is past the previous index and within the timestamp: it
* also updates `broadcast_index`. */ * also updates `broadcast_index`. */
next = next_broadcast(peer->daemon->rstate->broadcasts, next = next_broadcast(NULL, peer->daemon->rstate->broadcasts,
peer->gossip_timestamp_min, peer->gossip_timestamp_min,
peer->gossip_timestamp_max, peer->gossip_timestamp_max,
&peer->broadcast_index); &peer->broadcast_index);
if (next) { if (next) {
queue_peer_msg(peer, next); queue_peer_msg(peer, take(next));
return; return;
} }

View File

@ -154,7 +154,7 @@ struct routing_state *new_routing_state(const tal_t *ctx,
{ {
struct routing_state *rstate = tal(ctx, struct routing_state); struct routing_state *rstate = tal(ctx, struct routing_state);
rstate->nodes = empty_node_map(rstate); rstate->nodes = empty_node_map(rstate);
rstate->broadcasts = new_broadcast_state(rstate); rstate->broadcasts = new_broadcast_state(rstate, gossip_store_new(rstate));
rstate->chainparams = chainparams; rstate->chainparams = chainparams;
rstate->local_id = *local_id; rstate->local_id = *local_id;
rstate->prune_timeout = prune_timeout; rstate->prune_timeout = prune_timeout;
@ -1919,7 +1919,6 @@ void memleak_remove_routing_tables(struct htable *memtable,
memleak_remove_htable(memtable, &rstate->nodes->raw); memleak_remove_htable(memtable, &rstate->nodes->raw);
memleak_remove_htable(memtable, &rstate->pending_node_map->raw); memleak_remove_htable(memtable, &rstate->pending_node_map->raw);
memleak_remove_uintmap(memtable, &rstate->broadcasts->broadcasts);
for (n = node_map_first(rstate->nodes, &nit); for (n = node_map_first(rstate->nodes, &nit);
n; n;