connectd: serve gossip_store file for the peer.

We actually intercept the gossip_timestamp_filter, so the gossip_store
mechanism inside the per-peer daemon never kicks off for normal connections.

The gossipwith tool doesn't set OPT_GOSSIP_QUERIES, so it gets both, but
that only effects one place.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-01-08 23:58:29 +10:30
parent 1f608acd4e
commit 029d65cf2e
10 changed files with 496 additions and 160 deletions

View File

@ -44,7 +44,7 @@ void gossip_setup_timestamp_filter(struct per_peer_state *pps,
lseek(pps->gossip_store_fd, 1, SEEK_SET);
}
static bool timestamp_filter(const struct per_peer_state *pps, u32 timestamp)
static bool timestamp_filter(const struct gossip_state *gs, u32 timestamp)
{
/* BOLT #7:
*
@ -53,8 +53,8 @@ static bool timestamp_filter(const struct per_peer_state *pps, u32 timestamp)
* `timestamp_range`.
*/
/* Note that we turn first_timestamp & timestamp_range into an inclusive range */
return timestamp >= pps->gs->timestamp_min
&& timestamp <= pps->gs->timestamp_max;
return timestamp >= gs->timestamp_min
&& timestamp <= gs->timestamp_max;
}
/* Not all the data we expected was there: rewind file */
@ -71,8 +71,7 @@ static void failed_read(int fd, int len)
lseek(fd, -len, SEEK_CUR);
}
static void reopen_gossip_store(struct per_peer_state *pps,
const u8 *msg)
static void reopen_gossip_store(int *gossip_store_fd, const u8 *msg)
{
u64 equivalent_offset;
int newfd;
@ -93,53 +92,59 @@ static void reopen_gossip_store(struct per_peer_state *pps,
equivalent_offset);
lseek(newfd, equivalent_offset, SEEK_SET);
close(pps->gossip_store_fd);
pps->gossip_store_fd = newfd;
close(*gossip_store_fd);
*gossip_store_fd = newfd;
}
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
u8 *gossip_store_iter(const tal_t *ctx,
int *gossip_store_fd,
struct gossip_state *gs,
struct gossip_rcvd_filter *grf,
size_t *off)
{
u8 *msg = NULL;
/* Don't read until we're initialized. */
if (!pps->gs)
return NULL;
while (!msg) {
struct gossip_hdr hdr;
u32 msglen, checksum, timestamp;
bool push;
int type, r;
r = read(pps->gossip_store_fd, &hdr, sizeof(hdr));
if (off)
r = pread(*gossip_store_fd, &hdr, sizeof(hdr), *off);
else
r = read(*gossip_store_fd, &hdr, sizeof(hdr));
if (r != sizeof(hdr)) {
/* We expect a 0 read here at EOF */
if (r != 0)
failed_read(pps->gossip_store_fd, r);
per_peer_state_reset_gossip_timer(pps);
if (r != 0 && off)
failed_read(*gossip_store_fd, r);
return NULL;
}
/* Skip any deleted entries. */
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
/* Skip over it. */
lseek(pps->gossip_store_fd,
be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_MASK,
SEEK_CUR);
continue;
}
msglen = be32_to_cpu(hdr.len);
push = (msglen & GOSSIP_STORE_LEN_PUSH_BIT);
msglen &= GOSSIP_STORE_LEN_MASK;
/* Skip any deleted entries. */
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
/* Skip over it. */
if (off)
*off += r + msglen;
else
lseek(*gossip_store_fd, msglen, SEEK_CUR);
continue;
}
checksum = be32_to_cpu(hdr.crc);
timestamp = be32_to_cpu(hdr.timestamp);
msg = tal_arr(ctx, u8, msglen);
r = read(pps->gossip_store_fd, msg, msglen);
if (off)
r = pread(*gossip_store_fd, msg, msglen, *off + r);
else
r = read(*gossip_store_fd, msg, msglen);
if (r != msglen) {
failed_read(pps->gossip_store_fd, r);
per_peer_state_reset_gossip_timer(pps);
if (!off)
failed_read(*gossip_store_fd, r);
return NULL;
}
@ -147,27 +152,74 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"
PRIi64": %s",
(s64)lseek(pps->gossip_store_fd,
off ? (s64)*off :
(s64)lseek(*gossip_store_fd,
0, SEEK_CUR) - msglen,
tal_hex(tmpctx, msg));
/* Definitely processing it now */
if (off)
*off += sizeof(hdr) + msglen;
/* Don't send back gossip they sent to us! */
if (gossip_rcvd_filter_del(pps->grf, msg)) {
if (gossip_rcvd_filter_del(grf, msg)) {
msg = tal_free(msg);
continue;
}
type = fromwire_peektype(msg);
if (type == WIRE_GOSSIP_STORE_ENDED)
reopen_gossip_store(pps, msg);
reopen_gossip_store(gossip_store_fd, msg);
/* Ignore gossipd internal messages. */
else if (type != WIRE_CHANNEL_ANNOUNCEMENT
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT)
msg = tal_free(msg);
else if (!push && !timestamp_filter(pps, timestamp))
else if (!push && !timestamp_filter(gs, timestamp))
msg = tal_free(msg);
}
return msg;
}
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
{
u8 *msg;
/* Don't read until we're initialized. */
if (!pps->gs)
return NULL;
/* FIXME: We are only caller using off == NULL */
msg = gossip_store_iter(ctx, &pps->gossip_store_fd,
pps->gs, pps->grf, NULL);
if (!msg)
per_peer_state_reset_gossip_timer(pps);
return msg;
}
size_t find_gossip_store_end(int gossip_store_fd, size_t off)
{
/* We cheat and read first two bytes of message too. */
struct {
struct gossip_hdr hdr;
be16 type;
} buf;
int r;
while ((r = read(gossip_store_fd, &buf,
sizeof(buf.hdr) + sizeof(buf.type)))
== sizeof(buf.hdr) + sizeof(buf.type)) {
u32 msglen = be32_to_cpu(buf.hdr.len) & GOSSIP_STORE_LEN_MASK;
/* Don't swallow end marker! */
if (buf.type == CPU_TO_BE16(WIRE_GOSSIP_STORE_ENDED))
break;
off += sizeof(buf.hdr) + msglen;
lseek(gossip_store_fd, off, SEEK_SET);
}
return off;
}

View File

@ -6,6 +6,8 @@
#include <ccan/tal/tal.h>
struct per_peer_state;
struct gossip_state;
struct gossip_rcvd_filter;
/**
* gossip_store -- On-disk storage related information
@ -43,10 +45,29 @@ struct gossip_hdr {
*/
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps);
/**
* Direct store accessor: loads gossip msg from store.
*
* Returns NULL if there are no more gossip msgs.
*/
u8 *gossip_store_iter(const tal_t *ctx,
int *gossip_store_fd,
struct gossip_state *gs,
struct gossip_rcvd_filter *grf,
size_t *off);
/**
* Sets up the tiemstamp filter once they told us to set it.(
*/
void gossip_setup_timestamp_filter(struct per_peer_state *pps,
u32 first_timestamp,
u32 timestamp_range);
/**
* Gossipd will be writing to this, and it's not atomic! Safest
* way to find the "end" is to walk through.
* @old_end: 1 if no previous end.
*/
size_t find_gossip_store_end(int gossip_store_fd, size_t old_end);
#endif /* LIGHTNING_COMMON_GOSSIP_STORE_H */

View File

@ -50,6 +50,7 @@ CONNECTD_COMMON_OBJS := \
common/ecdh_hsmd.o \
common/features.o \
common/status_wiregen.o \
common/gossip_store.o \
common/gossip_rcvd_filter.o \
common/key_derive.o \
common/memleak.o \
@ -71,6 +72,7 @@ CONNECTD_COMMON_OBJS := \
common/wireaddr.o \
common/wire_error.o \
gossipd/gossipd_wiregen.o \
gossipd/gossip_store_wiregen.o \
wire/onion$(EXP)_wiregen.o
lightningd/lightning_connectd: $(CONNECTD_OBJS) $(CONNECTD_COMMON_OBJS) $(BITCOIN_OBJS) $(WIRE_OBJS) $(HSMD_CLIENT_OBJS)

View File

@ -10,9 +10,7 @@
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/asort/asort.h>
#include <ccan/crypto/siphash24/siphash24.h>
#include <ccan/fdpass/fdpass.h>
#include <ccan/htable/htable_type.h>
#include <ccan/noerr/noerr.h>
#include <ccan/tal/str/str.h>
#include <common/bech32.h>
@ -20,9 +18,10 @@
#include <common/daemon_conn.h>
#include <common/dev_disconnect.h>
#include <common/ecdh_hsmd.h>
#include <common/gossip_rcvd_filter.h>
#include <common/gossip_store.h>
#include <common/jsonrpc_errors.h>
#include <common/memleak.h>
#include <common/pseudorand.h>
#include <common/status.h>
#include <common/subdaemon.h>
#include <common/timeout.h>
@ -61,97 +60,6 @@
#define INITIAL_WAIT_SECONDS 1
#define MAX_WAIT_SECONDS 300
/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
* already connected (by peer->id). */
/*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key:
*/
static const struct node_id *peer_keyof(const struct peer *peer)
{
return &peer->id;
}
/*~ We also need to define a hashing function. siphash24 is a fast yet
* cryptographic hash in ccan/crypto/siphash24; we might be able to get away
* with a slightly faster hash with fewer guarantees, but it's good hygiene to
* use this unless it's a proven bottleneck. siphash_seed() is a function in
* common/pseudorand which sets up a seed for our hashing; it's different
* every time the program is run. */
static size_t node_id_hash(const struct node_id *id)
{
return siphash24(siphash_seed(), id->k, sizeof(id->k));
}
/*~ We also define an equality function: is this element equal to this key? */
static bool peer_eq_node_id(const struct peer *peer,
const struct node_id *id)
{
return node_id_eq(&peer->id, id);
}
/*~ This defines 'struct peer_htable' which contains 'struct peer' pointers. */
HTABLE_DEFINE_TYPE(struct peer,
peer_keyof,
node_id_hash,
peer_eq_node_id,
peer_htable);
/*~ This is the global state, like `struct lightningd *ld` in lightningd. */
struct daemon {
/* Who am I? */
struct node_id id;
/* pubkey equivalent. */
struct pubkey mykey;
/* Base for timeout timers, and how long to wait for init msg */
struct timers timers;
u32 timeout_secs;
/* Peers that we've handed to `lightningd`, which it hasn't told us
* have disconnected. */
struct peer_htable peers;
/* Peers we are trying to reach */
struct list_head connecting;
/* Connection to main daemon. */
struct daemon_conn *master;
/* Allow localhost to be considered "public": DEVELOPER-only option,
* but for simplicity we don't #if DEVELOPER-wrap it here. */
bool dev_allow_localhost;
/* We support use of a SOCKS5 proxy (e.g. Tor) */
struct addrinfo *proxyaddr;
/* They can tell us we must use proxy even for non-Tor addresses. */
bool always_use_proxy;
/* There are DNS seeds we can use to look up node addresses as a last
* resort, but doing so leaks our address so can be disabled. */
bool use_dns;
/* The address that the broken response returns instead of
* NXDOMAIN. NULL if we have not detected a broken resolver. */
struct sockaddr *broken_resolver_response;
/* File descriptors to listen on once we're activated. */
struct listen_fd *listen_fds;
/* Allow to define the default behavior of tor services calls*/
bool use_v3_autotor;
/* Our features, as lightningd told us */
struct feature_set *our_features;
/* Subdaemon to proxy websocket requests. */
char *websocket_helper;
/* If non-zero, port to listen for websocket connections. */
u16 websocket_port;
};
/* Peers we're trying to reach: we iterate through addrs until we succeed
* or fail. */
struct connecting {
@ -448,6 +356,7 @@ static struct peer *new_peer(struct daemon *daemon,
{
struct peer *peer = tal(daemon, struct peer);
peer->daemon = daemon;
peer->id = *id;
peer->cs = *cs;
peer->final_msg = NULL;
@ -457,6 +366,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->urgent = false;
peer->peer_outq = msg_queue_new(peer);
peer->subd_outq = msg_queue_new(peer);
peer->grf = new_gossip_rcvd_filter(peer);
/* Aim for connection to shuffle data back and forth: sets up
* peer->to_subd */
@ -466,6 +376,8 @@ static struct peer *new_peer(struct daemon *daemon,
peer->to_peer = tal_steal(peer, conn);
peer_htable_add(&daemon->peers, peer);
tal_add_destructor2(peer, destroy_peer, daemon);
peer->gs = NULL;
return peer;
}
@ -550,6 +462,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
return tal_free(peer);
}
/* Get ready for streaming gossip from the store */
setup_peer_gossip_store(peer, daemon->our_features, their_features);
/* Create message to tell master peer has connected. */
msg = towire_connectd_peer_connected(NULL, id, addr, incoming,
pps, their_features);
@ -1600,6 +1515,7 @@ static void connect_init(struct daemon *daemon, const u8 *msg)
enum addr_listen_announce *proposed_listen_announce;
struct wireaddr *announcable;
char *tor_password;
bool dev_fast_gossip;
bool dev_disconnect;
/* Fields which require allocation are allocated off daemon */
@ -1617,12 +1533,18 @@ static void connect_init(struct daemon *daemon, const u8 *msg)
&daemon->timeout_secs,
&daemon->websocket_helper,
&daemon->websocket_port,
&dev_fast_gossip,
&dev_disconnect)) {
/* This is a helper which prints the type expected and the actual
* message, then exits (it should never be called!). */
master_badmsg(WIRE_CONNECTD_INIT, msg);
}
#if DEVELOPER
/*~ Clearly mark this as a developer-only flag! */
daemon->dev_fast_gossip = dev_fast_gossip;
#endif
if (!pubkey_from_node_id(&daemon->mykey, &daemon->id))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Invalid id for me %s",
@ -2111,6 +2033,8 @@ int main(int argc, char *argv[])
list_head_init(&daemon->connecting);
daemon->listen_fds = tal_arr(daemon, struct listen_fd, 0);
timers_init(&daemon->timers, time_mono());
daemon->gossip_store_fd = -1;
/* stdin == control */
daemon->master = daemon_conn_new(daemon, STDIN_FILENO, recv_req, NULL,
daemon);

View File

@ -2,14 +2,155 @@
#define LIGHTNING_CONNECTD_CONNECTD_H
#include "config.h"
#include <bitcoin/pubkey.h>
#include <ccan/crypto/siphash24/siphash24.h>
#include <ccan/htable/htable_type.h>
#include <ccan/timer/timer.h>
#include <common/crypto_state.h>
#include <common/node_id.h>
#include <common/pseudorand.h>
struct io_conn;
struct connecting;
struct daemon;
struct node_id;
struct wireaddr_internal;
/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
* already connected (by peer->id). */
struct peer {
/* Main daemon */
struct daemon *daemon;
/* The pubkey of the node */
struct node_id id;
/* Counters and keys for symmetric crypto */
struct crypto_state cs;
/* Connection to the peer */
struct io_conn *to_peer;
/* Connection to the subdaemon */
struct io_conn *to_subd;
/* Final message to send to peer (and hangup) */
u8 *final_msg;
/* When we write something which wants Nagle overridden */
bool urgent;
/* Input buffers. */
u8 *subd_in, *peer_in;
/* Output buffers. */
struct msg_queue *subd_outq, *peer_outq;
/* Peer sent buffer (for freeing after sending) */
const u8 *sent_to_peer;
/* Gossip store. */
struct gossip_state *gs;
/* FIXME: move into gs. */
struct gossip_rcvd_filter *grf;
size_t gossip_store_off;
struct oneshot *gossip_timer;
};
/*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key:
*/
static const struct node_id *peer_keyof(const struct peer *peer)
{
return &peer->id;
}
/*~ We also need to define a hashing function. siphash24 is a fast yet
* cryptographic hash in ccan/crypto/siphash24; we might be able to get away
* with a slightly faster hash with fewer guarantees, but it's good hygiene to
* use this unless it's a proven bottleneck. siphash_seed() is a function in
* common/pseudorand which sets up a seed for our hashing; it's different
* every time the program is run. */
static size_t node_id_hash(const struct node_id *id)
{
return siphash24(siphash_seed(), id->k, sizeof(id->k));
}
/*~ We also define an equality function: is this element equal to this key? */
static bool peer_eq_node_id(const struct peer *peer,
const struct node_id *id)
{
return node_id_eq(&peer->id, id);
}
/*~ This defines 'struct peer_htable' which contains 'struct peer' pointers. */
HTABLE_DEFINE_TYPE(struct peer,
peer_keyof,
node_id_hash,
peer_eq_node_id,
peer_htable);
/*~ This is the global state, like `struct lightningd *ld` in lightningd. */
struct daemon {
/* Who am I? */
struct node_id id;
/* pubkey equivalent. */
struct pubkey mykey;
/* Base for timeout timers, and how long to wait for init msg */
struct timers timers;
u32 timeout_secs;
/* Peers that we've handed to `lightningd`, which it hasn't told us
* have disconnected. */
struct peer_htable peers;
/* Peers we are trying to reach */
struct list_head connecting;
/* Connection to main daemon. */
struct daemon_conn *master;
/* Allow localhost to be considered "public": DEVELOPER-only option,
* but for simplicity we don't #if DEVELOPER-wrap it here. */
bool dev_allow_localhost;
/* We support use of a SOCKS5 proxy (e.g. Tor) */
struct addrinfo *proxyaddr;
/* They can tell us we must use proxy even for non-Tor addresses. */
bool always_use_proxy;
/* There are DNS seeds we can use to look up node addresses as a last
* resort, but doing so leaks our address so can be disabled. */
bool use_dns;
/* The address that the broken response returns instead of
* NXDOMAIN. NULL if we have not detected a broken resolver. */
struct sockaddr *broken_resolver_response;
/* File descriptors to listen on once we're activated. */
struct listen_fd *listen_fds;
/* Allow to define the default behavior of tor services calls*/
bool use_v3_autotor;
/* Our features, as lightningd told us */
struct feature_set *our_features;
/* Subdaemon to proxy websocket requests. */
char *websocket_helper;
/* If non-zero, port to listen for websocket connections. */
u16 websocket_port;
/* The gossip_store */
int gossip_store_fd;
size_t gossip_store_end;
#if DEVELOPER
/* Hack to speed up gossip timer */
bool dev_fast_gossip;
#endif
};
/* Called by io_tor_connect once it has a connection out. */
struct io_plan *connection_out(struct io_conn *conn, struct connecting *connect);

View File

@ -21,6 +21,7 @@ msgdata,connectd_init,use_v3_autotor,bool,
msgdata,connectd_init,timeout_secs,u32,
msgdata,connectd_init,websocket_helper,wirestring,
msgdata,connectd_init,websocket_port,u16,
msgdata,connectd_init,dev_fast_gossip,bool,
# If this is set, then fd 5 is dev_disconnect_fd.
msgdata,connectd_init,dev_disconnect,bool,

1 #include <bitcoin/block.h>
21 msgdata,connectd_init,websocket_helper,wirestring,
22 msgdata,connectd_init,websocket_port,u16,
23 # If this is set, then fd 5 is dev_disconnect_fd. msgdata,connectd_init,dev_fast_gossip,bool,
24 # If this is set, then fd 5 is dev_disconnect_fd.
25 msgdata,connectd_init,dev_disconnect,bool,
26 # Connectd->master, here are the addresses I bound, can announce.
27 msgtype,connectd_init_reply,2100

View File

@ -2,17 +2,28 @@
* itself, and the subdaemons. */
#include "config.h"
#include <assert.h>
#include <bitcoin/block.h>
#include <bitcoin/chainparams.h>
#include <ccan/io/io.h>
#include <common/cryptomsg.h>
#include <common/dev_disconnect.h>
#include <common/features.h>
#include <common/gossip_constants.h>
#include <common/gossip_rcvd_filter.h>
#include <common/gossip_store.h>
#include <common/per_peer_state.h>
#include <common/status.h>
#include <common/timeout.h>
#include <common/utils.h>
#include <common/wire_error.h>
#include <connectd/connectd.h>
#include <connectd/multiplex.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <wire/peer_wire.h>
#include <wire/wire.h>
@ -23,6 +34,117 @@ void queue_peer_msg(struct peer *peer, const u8 *msg TAKES)
msg_enqueue(peer->peer_outq, msg);
}
/* Send warning, close connection to peer */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap);
va_end(ap);
/* Close locally, send msg as final warning */
io_close(peer->to_subd);
va_start(ap, fmt);
peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap);
va_end(ap);
}
/* Either for initial setup, or when they ask by timestamp */
static bool setup_gossip_filter(struct peer *peer,
u32 first_timestamp,
u32 timestamp_range)
{
bool immediate_sync;
/* If this is the first filter, we gossip sync immediately. */
if (!peer->gs) {
peer->gs = tal(peer, struct gossip_state);
peer->gs->next_gossip = time_mono();
immediate_sync = true;
} else
immediate_sync = false;
/* BOLT #7:
*
* The receiver:
* - SHOULD send all gossip messages whose `timestamp` is greater or
* equal to `first_timestamp`, and less than `first_timestamp` plus
* `timestamp_range`.
* - MAY wait for the next outgoing gossip flush to send these.
* ...
* - SHOULD restrict future gossip messages to those whose `timestamp`
* is greater or equal to `first_timestamp`, and less than
* `first_timestamp` plus `timestamp_range`.
*/
peer->gs->timestamp_min = first_timestamp;
peer->gs->timestamp_max = first_timestamp + timestamp_range - 1;
/* Make sure we never leave it on an impossible value. */
if (peer->gs->timestamp_max < peer->gs->timestamp_min)
peer->gs->timestamp_max = UINT32_MAX;
peer->gossip_store_off = 1;
return immediate_sync;
}
/* This is called once we need it: otherwise, the gossip_store may not exist,
* since we start at the same time as gossipd itself. */
static void setup_gossip_store(struct daemon *daemon)
{
daemon->gossip_store_fd = open(GOSSIP_STORE_FILENAME, O_RDONLY);
if (daemon->gossip_store_fd < 0)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Opening gossip_store %s: %s",
GOSSIP_STORE_FILENAME, strerror(errno));
/* gossipd will be writing to this, and it's not atomic! Safest
* way to find the "end" is to walk through. */
daemon->gossip_store_end
= find_gossip_store_end(daemon->gossip_store_fd, 1);
}
void setup_peer_gossip_store(struct peer *peer,
const struct feature_set *our_features,
const u8 *their_features)
{
/* Lazy setup */
if (peer->daemon->gossip_store_fd == -1)
setup_gossip_store(peer->daemon);
peer->gossip_timer = NULL;
/* BOLT #7:
*
* A node:
* - if the `gossip_queries` feature is negotiated:
* - MUST NOT relay any gossip messages it did not generate itself,
* unless explicitly requested.
*/
if (feature_negotiated(our_features, their_features, OPT_GOSSIP_QUERIES))
return;
setup_gossip_filter(peer, 0, UINT32_MAX);
/* BOLT #7:
*
* - upon receiving an `init` message with the
* `initial_routing_sync` flag set to 1:
* - SHOULD send gossip messages for all known channels and
* nodes, as if they were just received.
* - if the `initial_routing_sync` flag is set to 0, OR if the
* initial sync was completed:
* - SHOULD resume normal operation, as specified in the
* following [Rebroadcasting](#rebroadcasting) section.
*/
if (!feature_offered(their_features, OPT_INITIAL_ROUTING_SYNC)) {
/* During tests, particularly, we find that the gossip_store
* moves fast, so make sure it really does start at the end. */
peer->gossip_store_off
= find_gossip_store_end(peer->daemon->gossip_store_fd,
peer->daemon->gossip_store_end);
}
}
/* These four function handle subd->peer */
static struct io_plan *after_final_msg(struct io_conn *peer_conn,
struct peer *peer)
@ -191,6 +313,87 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
next, peer);
}
/* Kicks off write_to_peer() to look for more gossip to send from store */
static void wake_gossip(struct peer *peer)
{
peer->gossip_timer = NULL;
io_wake(peer->peer_outq);
}
/* If we are streaming gossip, get something from gossip store */
static u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
{
u8 *msg;
/* Not streaming yet? */
if (!peer->gs)
return NULL;
/* Still waiting for timer? */
if (peer->gossip_timer != NULL)
return NULL;
msg = gossip_store_iter(ctx, &peer->daemon->gossip_store_fd,
peer->gs, peer->grf, &peer->gossip_store_off);
/* Cache highest valid offset (FIXME: doesn't really work when
* gossip_store gets rewritten!) */
if (peer->gossip_store_off > peer->daemon->gossip_store_end)
peer->daemon->gossip_store_end = peer->gossip_store_off;
if (msg) {
status_peer_io(LOG_IO_OUT, &peer->id, msg);
return msg;
}
/* BOLT #7:
*
* A node:
*...
* - SHOULD flush outgoing gossip messages once every 60 seconds,
* independently of the arrival times of the messages.
* - Note: this results in staggered announcements that are unique
* (not duplicated).
*/
/* We do 60 seconds from *start*, not from *now* */
peer->gs->next_gossip
= timemono_add(time_mono(),
time_from_sec(GOSSIP_FLUSH_INTERVAL(
peer->daemon->dev_fast_gossip)));
peer->gossip_timer = new_abstimer(&peer->daemon->timers, peer,
peer->gs->next_gossip,
wake_gossip, peer);
return NULL;
}
/* We only handle gossip_timestamp_filter for now */
static bool handle_message_locally(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain_hash;
u32 first_timestamp, timestamp_range;
/* We remember these so we don't rexmit them */
if (is_msg_gossip_broadcast(msg))
gossip_rcvd_filter_add(peer->grf, msg);
if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
&first_timestamp,
&timestamp_range)) {
return false;
}
if (!bitcoin_blkid_eq(&chainparams->genesis_blockhash, &chain_hash)) {
send_warning(peer, "gossip_timestamp_filter for bad chain: %s",
tal_hex(tmpctx, msg));
return true;
}
/* Returns true the first time. */
if (setup_gossip_filter(peer, first_timestamp, timestamp_range))
wake_gossip(peer);
return true;
}
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
struct peer *peer)
{
@ -211,12 +414,16 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
peer->final_msg,
after_final_msg);
}
/* Tell them to read again, */
io_wake(&peer->subd_in);
/* If they want us to send gossip, do so now. */
msg = maybe_from_gossip_store(NULL, peer);
if (!msg) {
/* Tell them to read again, */
io_wake(&peer->subd_in);
/* Wait for them to wake us */
return msg_queue_wait(peer_conn, peer->peer_outq,
write_to_peer, peer);
/* Wait for them to wake us */
return msg_queue_wait(peer_conn, peer->peer_outq,
write_to_peer, peer);
}
}
return encrypt_and_send(peer, take(msg), write_to_peer);
@ -278,6 +485,12 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
return io_close(peer_conn);
tal_free(peer->peer_in);
/* If we swallow this, just try again. */
if (handle_message_locally(peer, decrypted)) {
tal_free(decrypted);
return read_hdr_from_peer(peer_conn, peer);
}
/* Tell them to write. */
msg_enqueue(peer->subd_outq, take(decrypted));

View File

@ -6,32 +6,9 @@
#include <common/msg_queue.h>
#include <common/node_id.h>
struct peer {
struct node_id id;
/* Counters and keys for symmetric crypto */
struct crypto_state cs;
/* Connection to the peer */
struct io_conn *to_peer;
/* Connection to the subdaemon */
struct io_conn *to_subd;
/* Final message to send to peer (and hangup) */
u8 *final_msg;
/* When we write something which wants Nagle overridden */
bool urgent;
/* Input buffers. */
u8 *subd_in, *peer_in;
/* Output buffers. */
struct msg_queue *subd_outq, *peer_outq;
/* Peer sent buffer (for freeing after sending) */
const u8 *sent_to_peer;
};
struct peer;
struct io_conn;
struct feature_set;
/* Set up peer->to_subd; sets fd_for_subd to pass to lightningd. */
bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd);
@ -47,4 +24,7 @@ void multiplex_final_msg(struct peer *peer,
/* Inject a message into the output stream */
void queue_peer_msg(struct peer *peer, const u8 *msg TAKES);
void setup_peer_gossip_store(struct peer *peer,
const struct feature_set *our_features,
const u8 *their_features);
#endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */

View File

@ -392,6 +392,7 @@ int connectd_init(struct lightningd *ld)
ld->config.connection_timeout_secs,
websocket_helper_path,
ld->websocket_port,
IFDEV(ld->dev_fast_gossip, false),
IFDEV(ld->dev_disconnect_fd >= 0, false));
subd_req(ld->connectd, ld->connectd, take(msg), -1, 0,

View File

@ -1335,7 +1335,8 @@ def test_gossipwith(node_factory):
num_msgs += 1
# one channel announcement, two channel_updates, two node announcements.
assert num_msgs == 5
# FIXME: Currently gets double gossip!
assert num_msgs == 5 * 2
def test_gossip_notices_close(node_factory, bitcoind):