mirror of
https://github.com/ElementsProject/lightning.git
synced 2025-03-03 18:57:06 +01:00
gossipd: send per-peer daemons offsets into gossip store.
Instead of reading the store ourselves, we can just send them an offset. This saves gossipd a lot of work, putting it where it belongs (in the daemon responsible for the specific peer). MCP bench results: store_load_msec:28509-31001(29206.6+/-9.4e+02) vsz_kb:580004-580016(580006+/-4.8) store_rewrite_sec:11.640000-12.730000(11.908+/-0.41) listnodes_sec:1.790000-1.880000(1.83+/-0.032) listchannels_sec:21.180000-21.950000(21.476+/-0.27) routing_sec:2.210000-11.160000(7.126+/-3.1) peer_write_all_sec:36.270000-41.200000(38.168+/-1.9) Signficant savings in streaming gossip: -peer_write_all_sec:48.160000-51.480000(49.608+/-1.1) +peer_write_all_sec:35.780000-37.980000(36.43+/-0.81) Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
parent
0e37ac2433
commit
f5a218f9d1
12 changed files with 46 additions and 27 deletions
|
@ -47,6 +47,7 @@ CHANNELD_COMMON_OBJS := \
|
||||||
common/features.o \
|
common/features.o \
|
||||||
common/gen_status_wire.o \
|
common/gen_status_wire.o \
|
||||||
common/gen_peer_status_wire.o \
|
common/gen_peer_status_wire.o \
|
||||||
|
common/gossip_store.o \
|
||||||
common/htlc_state.o \
|
common/htlc_state.o \
|
||||||
common/htlc_tx.o \
|
common/htlc_tx.o \
|
||||||
common/htlc_wire.o \
|
common/htlc_wire.o \
|
||||||
|
|
|
@ -3068,7 +3068,8 @@ int main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
status_trace("Now dealing with deferred gossip %u",
|
status_trace("Now dealing with deferred gossip %u",
|
||||||
fromwire_peektype(msg));
|
fromwire_peektype(msg));
|
||||||
handle_gossip_msg(PEER_FD, &peer->cs, take(msg));
|
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
|
||||||
|
&peer->cs, take(msg));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3111,7 +3112,8 @@ int main(int argc, char *argv[])
|
||||||
fdpass_recv(GOSSIP_FD));
|
fdpass_recv(GOSSIP_FD));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
handle_gossip_msg(PEER_FD, &peer->cs, take(msg));
|
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
|
||||||
|
&peer->cs, take(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,7 @@ CLOSINGD_COMMON_OBJS := \
|
||||||
common/derive_basepoints.o \
|
common/derive_basepoints.o \
|
||||||
common/gen_peer_status_wire.o \
|
common/gen_peer_status_wire.o \
|
||||||
common/gen_status_wire.o \
|
common/gen_status_wire.o \
|
||||||
|
common/gossip_store.o \
|
||||||
common/htlc_wire.o \
|
common/htlc_wire.o \
|
||||||
common/key_derive.o \
|
common/key_derive.o \
|
||||||
common/memleak.o \
|
common/memleak.o \
|
||||||
|
|
|
@ -107,7 +107,8 @@ static u8 *closing_read_peer_msg(const tal_t *ctx,
|
||||||
fdpass_recv(GOSSIP_FD));
|
fdpass_recv(GOSSIP_FD));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
handle_gossip_msg(PEER_FD, cs, take(msg));
|
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
|
||||||
|
cs, take(msg));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!handle_peer_gossip_or_error(PEER_FD, GOSSIP_FD,
|
if (!handle_peer_gossip_or_error(PEER_FD, GOSSIP_FD,
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
#include <common/crypto_sync.h>
|
#include <common/crypto_sync.h>
|
||||||
|
#include <common/gossip_store.h>
|
||||||
#include <common/peer_failed.h>
|
#include <common/peer_failed.h>
|
||||||
#include <common/read_peer_msg.h>
|
#include <common/read_peer_msg.h>
|
||||||
#include <common/status.h>
|
#include <common/status.h>
|
||||||
|
@ -81,11 +82,15 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
|
||||||
return !channel_id_eq(expected, actual);
|
return !channel_id_eq(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handle_gossip_msg(int peer_fd, struct crypto_state *cs, const u8 *msg TAKES)
|
void handle_gossip_msg(int peer_fd, int gossip_store_fd,
|
||||||
|
struct crypto_state *cs, const u8 *msg TAKES)
|
||||||
{
|
{
|
||||||
u8 *gossip;
|
u8 *gossip;
|
||||||
|
u64 offset;
|
||||||
|
|
||||||
if (!fromwire_gossipd_send_gossip(tmpctx, msg, &gossip)) {
|
if (fromwire_gossipd_send_gossip_from_store(msg, &offset))
|
||||||
|
gossip = gossip_store_read(tmpctx, gossip_store_fd, offset);
|
||||||
|
else if (!fromwire_gossipd_send_gossip(tmpctx, msg, &gossip)) {
|
||||||
status_broken("Got bad message from gossipd: %s",
|
status_broken("Got bad message from gossipd: %s",
|
||||||
tal_hex(msg, msg));
|
tal_hex(msg, msg));
|
||||||
peer_failed_connection_lost();
|
peer_failed_connection_lost();
|
||||||
|
|
|
@ -71,7 +71,9 @@ bool handle_peer_gossip_or_error(int peer_fd, int gossip_fd, int gossip_store_fd
|
||||||
const u8 *msg TAKES);
|
const u8 *msg TAKES);
|
||||||
|
|
||||||
/* We got this message from gossipd: forward/quit as it asks. */
|
/* We got this message from gossipd: forward/quit as it asks. */
|
||||||
void handle_gossip_msg(int peer_fd, struct crypto_state *cs,
|
void handle_gossip_msg(int peer_fd,
|
||||||
|
int gossip_store_fd,
|
||||||
|
struct crypto_state *cs,
|
||||||
const u8 *msg TAKES);
|
const u8 *msg TAKES);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -104,17 +104,16 @@ struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
const u8 *next_broadcast(const tal_t *ctx,
|
struct broadcastable *next_broadcast(struct broadcast_state *bstate,
|
||||||
struct broadcast_state *bstate,
|
u32 timestamp_min, u32 timestamp_max,
|
||||||
u32 timestamp_min, u32 timestamp_max,
|
u32 *last_index)
|
||||||
u32 *last_index)
|
|
||||||
{
|
{
|
||||||
struct broadcastable *b;
|
struct broadcastable *b;
|
||||||
|
|
||||||
while ((b = next_broadcast_raw(bstate, last_index)) != NULL) {
|
while ((b = next_broadcast_raw(bstate, last_index)) != NULL) {
|
||||||
if (b->timestamp >= timestamp_min
|
if (b->timestamp >= timestamp_min
|
||||||
&& b->timestamp <= timestamp_max) {
|
&& b->timestamp <= timestamp_max) {
|
||||||
return gossip_store_get(ctx, bstate->gs, b->index);
|
return b;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -176,13 +175,15 @@ struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
|
||||||
u32 index = 0;
|
u32 index = 0;
|
||||||
u64 htlc_minimum_msat;
|
u64 htlc_minimum_msat;
|
||||||
struct pubkey_set pubkeys;
|
struct pubkey_set pubkeys;
|
||||||
|
struct broadcastable *bcast;
|
||||||
/* We actually only need a set, not a map. */
|
/* We actually only need a set, not a map. */
|
||||||
UINTMAP(u64 *) channels;
|
UINTMAP(u64 *) channels;
|
||||||
|
|
||||||
pubkey_set_init(&pubkeys);
|
pubkey_set_init(&pubkeys);
|
||||||
uintmap_init(&channels);
|
uintmap_init(&channels);
|
||||||
|
|
||||||
while ((msg = next_broadcast(b, 0, UINT32_MAX, &index)) != NULL) {
|
while ((bcast = next_broadcast_raw(b, &index)) != NULL) {
|
||||||
|
msg = gossip_store_get(b, b->gs, b->index);
|
||||||
if (fromwire_channel_announcement(tmpctx, msg, &sig, &sig, &sig,
|
if (fromwire_channel_announcement(tmpctx, msg, &sig, &sig, &sig,
|
||||||
&sig, &features, &chain_hash,
|
&sig, &features, &chain_hash,
|
||||||
&scid, &node_id_1, &node_id_2,
|
&scid, &node_id_1, &node_id_2,
|
||||||
|
|
|
@ -61,10 +61,9 @@ struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
|
||||||
/* Return the broadcast with index >= *last_index, timestamp >= min and <= max
|
/* Return the broadcast with index >= *last_index, timestamp >= min and <= max
|
||||||
* and update *last_index.
|
* and update *last_index.
|
||||||
* There's no broadcast with index 0. */
|
* There's no broadcast with index 0. */
|
||||||
const u8 *next_broadcast(const tal_t *ctx,
|
struct broadcastable *next_broadcast(struct broadcast_state *bstate,
|
||||||
struct broadcast_state *bstate,
|
u32 timestamp_min, u32 timestamp_max,
|
||||||
u32 timestamp_min, u32 timestamp_max,
|
u32 *last_index);
|
||||||
u32 *last_index);
|
|
||||||
|
|
||||||
/* index of last entry. */
|
/* index of last entry. */
|
||||||
u64 broadcast_final_index(const struct broadcast_state *bstate);
|
u64 broadcast_final_index(const struct broadcast_state *bstate);
|
||||||
|
|
|
@ -8,11 +8,15 @@ gossipd_get_update_reply,3601
|
||||||
gossipd_get_update_reply,,len,u16
|
gossipd_get_update_reply,,len,u16
|
||||||
gossipd_get_update_reply,,update,len*u8
|
gossipd_get_update_reply,,update,len*u8
|
||||||
|
|
||||||
# Gossipd can tell channeld etc about gossip to fwd.
|
# Gossipd can tell channeld etc about raw messages to fwd.
|
||||||
gossipd_send_gossip,3502
|
gossipd_send_gossip,3502
|
||||||
gossipd_send_gossip,,len,u16
|
gossipd_send_gossip,,len,u16
|
||||||
gossipd_send_gossip,,gossip,len*u8
|
gossipd_send_gossip,,gossip,len*u8
|
||||||
|
|
||||||
|
# But usually gossipd just gives an offset into the gossip_store
|
||||||
|
gossipd_send_gossip_from_store,3506
|
||||||
|
gossipd_send_gossip_from_store,,offset,u64
|
||||||
|
|
||||||
# Both sides have seen the funding tx being locked, but we have not
|
# Both sides have seen the funding tx being locked, but we have not
|
||||||
# yet reached the announcement depth. So we add the channel locally so
|
# yet reached the announcement depth. So we add the channel locally so
|
||||||
# we (and peer) can update it already.
|
# we (and peer) can update it already.
|
||||||
|
|
|
|
@ -235,15 +235,15 @@ static void queue_peer_msg(struct peer *peer, const u8 *msg TAKES)
|
||||||
daemon_conn_send(peer->dc, take(send));
|
daemon_conn_send(peer->dc, take(send));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Load a message from the gossip_store, and queue to send. */
|
/*~ We have a shortcut for messages from the store: we send the offset, and
|
||||||
|
* the other daemon reads and sends, saving us much work. */
|
||||||
static void queue_peer_from_store(struct peer *peer,
|
static void queue_peer_from_store(struct peer *peer,
|
||||||
const struct broadcastable *bcast)
|
const struct broadcastable *bcast)
|
||||||
{
|
{
|
||||||
const u8 *msg;
|
const u8 *msg = towire_gossipd_send_gossip_from_store(NULL,
|
||||||
|
bcast->index);
|
||||||
|
|
||||||
msg = gossip_store_get(NULL, peer->daemon->rstate->broadcasts->gs,
|
daemon_conn_send(peer->dc, take(msg));
|
||||||
bcast->index);
|
|
||||||
queue_peer_msg(peer, take(msg));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This pokes daemon_conn, which calls dump_gossip: the NULL gossip_timer
|
/* This pokes daemon_conn, which calls dump_gossip: the NULL gossip_timer
|
||||||
|
@ -1214,7 +1214,7 @@ static void maybe_create_next_scid_reply(struct peer *peer)
|
||||||
/*~ If we're supposed to be sending gossip, do so now. */
|
/*~ If we're supposed to be sending gossip, do so now. */
|
||||||
static void maybe_queue_gossip(struct peer *peer)
|
static void maybe_queue_gossip(struct peer *peer)
|
||||||
{
|
{
|
||||||
const u8 *next;
|
struct broadcastable *next;
|
||||||
|
|
||||||
/* If the gossip timer is still running, don't send. */
|
/* If the gossip timer is still running, don't send. */
|
||||||
if (peer->gossip_timer)
|
if (peer->gossip_timer)
|
||||||
|
@ -1230,13 +1230,13 @@ static void maybe_queue_gossip(struct peer *peer)
|
||||||
* only needs to keep an index; this returns the next gossip message
|
* only needs to keep an index; this returns the next gossip message
|
||||||
* which is past the previous index and within the timestamp: it
|
* which is past the previous index and within the timestamp: it
|
||||||
* also updates `broadcast_index`. */
|
* also updates `broadcast_index`. */
|
||||||
next = next_broadcast(NULL, peer->daemon->rstate->broadcasts,
|
next = next_broadcast(peer->daemon->rstate->broadcasts,
|
||||||
peer->gossip_timestamp_min,
|
peer->gossip_timestamp_min,
|
||||||
peer->gossip_timestamp_max,
|
peer->gossip_timestamp_max,
|
||||||
&peer->broadcast_index);
|
&peer->broadcast_index);
|
||||||
|
|
||||||
if (next) {
|
if (next) {
|
||||||
queue_peer_msg(peer, take(next));
|
queue_peer_from_store(peer, next);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1686,6 +1686,7 @@ static struct io_plan *peer_msg_in(struct io_conn *conn,
|
||||||
case WIRE_GOSSIPD_GET_UPDATE_REPLY:
|
case WIRE_GOSSIPD_GET_UPDATE_REPLY:
|
||||||
case WIRE_GOSSIPD_SEND_GOSSIP:
|
case WIRE_GOSSIPD_SEND_GOSSIP:
|
||||||
case WIRE_GOSSIPD_NEW_STORE_FD:
|
case WIRE_GOSSIPD_NEW_STORE_FD:
|
||||||
|
case WIRE_GOSSIPD_SEND_GOSSIP_FROM_STORE:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ OPENINGD_COMMON_OBJS := \
|
||||||
common/funding_tx.o \
|
common/funding_tx.o \
|
||||||
common/gen_status_wire.o \
|
common/gen_status_wire.o \
|
||||||
common/gen_peer_status_wire.o \
|
common/gen_peer_status_wire.o \
|
||||||
|
common/gossip_store.o \
|
||||||
common/htlc_wire.o \
|
common/htlc_wire.o \
|
||||||
common/initial_channel.o \
|
common/initial_channel.o \
|
||||||
common/initial_commit_tx.o \
|
common/initial_commit_tx.o \
|
||||||
|
|
|
@ -383,7 +383,8 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
|
||||||
fdpass_recv(GOSSIP_FD));
|
fdpass_recv(GOSSIP_FD));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
handle_gossip_msg(PEER_FD, &state->cs, take(msg));
|
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
|
||||||
|
&state->cs, take(msg));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1347,7 +1348,7 @@ static void handle_gossip_in(struct state *state)
|
||||||
status_failed(STATUS_FAIL_GOSSIP_IO,
|
status_failed(STATUS_FAIL_GOSSIP_IO,
|
||||||
"Reading gossip: %s", strerror(errno));
|
"Reading gossip: %s", strerror(errno));
|
||||||
|
|
||||||
handle_gossip_msg(PEER_FD, &state->cs, take(msg));
|
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD, &state->cs, take(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*~ Is this message of type `error` with the special zero-id
|
/*~ Is this message of type `error` with the special zero-id
|
||||||
|
|
Loading…
Add table
Reference in a new issue