From fb814a7a9edd36af932cedf06f9b32da30dac9d8 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 1 Feb 2017 15:49:01 +0100 Subject: [PATCH] gossip: Adding announcement handling to the gossip subdaemon. We now have all the pieces to wire in the handling functionality to the new gossip subdaemon. --- daemon/routing.c | 2 +- lightningd/gossip/Makefile | 8 +++++- lightningd/gossip/gossip.c | 51 +++++++++++++++++++++++++++++++++++--- 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/daemon/routing.c b/daemon/routing.c index e70a5d0f7..31d3eee13 100644 --- a/daemon/routing.c +++ b/daemon/routing.c @@ -4,6 +4,7 @@ #include "packets.h" #include "pseudorand.h" #include "routing.h" +#include "wire/gen_peer_wire.h" #include #include #include @@ -11,7 +12,6 @@ #include #include #include -#include "wire/gen_peer_wire.h" /* 365.25 * 24 * 60 / 10 */ #define BLOCKS_PER_YEAR 52596 diff --git a/lightningd/gossip/Makefile b/lightningd/gossip/Makefile index b15ca40e3..68f7fd0c8 100644 --- a/lightningd/gossip/Makefile +++ b/lightningd/gossip/Makefile @@ -13,9 +13,15 @@ LIGHTNINGD_GOSSIP_CONTROL_SRC := lightningd/gossip/gen_gossip_control_wire.c \ lightningd/gossip/gen_gossip_status_wire.c LIGHTNINGD_GOSSIP_CONTROL_OBJS := $(LIGHTNINGD_GOSSIP_CONTROL_SRC:.c=.o) +# These should eventually be migrated to the lightningd directory, after +# deprecating the legacy daemons +LIGHTNINGD_GOSSIP_LEGACY_HEADERS := daemon/routing.h daemon/broadcast.h \ + daemon/log.h daemon/pseudorand.h + # lightningd/gossip needs these: LIGHTNINGD_GOSSIP_HEADERS := lightningd/gossip/gen_gossip_control_wire.h \ - lightningd/gossip/gen_gossip_status_wire.h + lightningd/gossip/gen_gossip_status_wire.h \ + $(LIGHTNINGD_GOSSIP_LEGACY_HEADERS) LIGHTNINGD_GOSSIP_SRC := lightningd/gossip/gossip.c \ $(LIGHTNINGD_GOSSIP_HEADERS:.h=.c) LIGHTNINGD_GOSSIP_OBJS := $(LIGHTNINGD_GOSSIP_SRC:.c=.o) diff --git a/lightningd/gossip/gossip.c b/lightningd/gossip/gossip.c index 0ef9bd811..b70aee818 100644 --- a/lightningd/gossip/gossip.c +++ b/lightningd/gossip/gossip.c @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -29,6 +31,9 @@ struct daemon { struct list_head peers; u8 *msg_in; + + /* Routing information */ + struct routing_state *rstate; }; struct peer { @@ -41,11 +46,15 @@ struct peer { /* File descriptor corresponding to conn. */ int fd; + /* Our connection (and owner) */ struct io_conn *conn; /* If this is non-NULL, it means we failed. */ const char *error; + + /* High water mark for the staggered broadcast */ + u64 broadcast_index; }; static void destroy_peer(struct peer *peer) @@ -83,9 +92,15 @@ static struct io_plan *peer_msgin(struct io_conn *conn, return io_close(conn); case WIRE_CHANNEL_ANNOUNCEMENT: + handle_channel_announcement(peer->daemon->rstate, msg, tal_count(msg)); + return peer_read_message(conn, peer->cs, peer_msgin); + case WIRE_NODE_ANNOUNCEMENT: + handle_node_announcement(peer->daemon->rstate, msg, tal_count(msg)); + return peer_read_message(conn, peer->cs, peer_msgin); + case WIRE_CHANNEL_UPDATE: - /* FIXME: Handle gossip! */ + handle_channel_update(peer->daemon->rstate, msg, tal_count(msg)); return peer_read_message(conn, peer->cs, peer_msgin); case WIRE_INIT: @@ -128,10 +143,33 @@ static struct io_plan *peer_msgin(struct io_conn *conn, return io_close(conn); } +/* Gets called by the outgoing IO loop when woken up. Sends messages + * to the peer if there are any queued. Also checks if we have any + * queued gossip messages and processes them. */ +static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer); + +/* Loop through the backlog of channel_{announcements,updates} and + * node_announcements, writing out one on each iteration. Once we are + * through wait for the broadcast interval and start again. */ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer) { - /* FIXME: Dump gossip here, then when done... */ - return peer_read_message(conn, peer->cs, peer_msgin); + struct queued_message *next; + next = next_broadcast_message( + peer->daemon->rstate->broadcasts, &peer->broadcast_index); + + if (!next) { + //FIXME(cdecker) Add wakeup timer once timers are refactored. + return io_out_wait(conn, peer, pkt_out, peer); + } else { + return peer_write_message(conn, peer->cs, next->payload, peer_dump_gossip); + } +} + +static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer) +{ + //FIXME(cdecker) Add logic to enable sending of non-broadcast messages + /* Send any queued up messages */ + return peer_dump_gossip(conn, peer); } static bool has_even_bit(const u8 *bitmap) @@ -182,7 +220,11 @@ static struct io_plan *peer_parse_init(struct io_conn *conn, */ status_send(towire_gossipstatus_peer_ready(msg, peer->unique_id)); - return peer_dump_gossip(conn, peer); + /* Need to go duplex here, otherwise backpressure would mean + * we both wait indefinitely */ + return io_duplex(conn, + peer_read_message(conn, peer->cs, peer_msgin), + peer_dump_gossip(conn, peer)); } static struct io_plan *peer_init_sent(struct io_conn *conn, struct peer *peer) @@ -308,6 +350,7 @@ int main(int argc, char *argv[]) } daemon = tal(NULL, struct daemon); + daemon->rstate = new_routing_state(daemon, NULL); list_head_init(&daemon->peers); daemon->msg_in = NULL;