From 5721d7d19447eb7d2fbbcd82f1f4514715f74e4b Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Fri, 10 Mar 2017 13:06:51 +0100 Subject: [PATCH] gossip: Marking peer as non-local and forwarding broadcasts This includes some code duplication, but since the two write targets are fundamentally different we might need to refactor a bit more to unify them again. --- lightningd/gossip/gossip.c | 60 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/lightningd/gossip/gossip.c b/lightningd/gossip/gossip.c index 03e8cd98a..580a87b69 100644 --- a/lightningd/gossip/gossip.c +++ b/lightningd/gossip/gossip.c @@ -68,6 +68,10 @@ struct peer { int proxy_fd; struct io_conn *proxy_conn; u8 *proxy_in; + u8 **proxy_msg_out; + + /* Are we the owner of the peer? */ + bool local; }; static void destroy_peer(struct peer *peer) @@ -89,7 +93,9 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg) return tal_free(peer); peer->daemon = daemon; peer->error = NULL; + peer->local = true; peer->msg_out = tal_arr(peer, u8*, 0); + peer->proxy_msg_out = tal_arr(peer, u8*, 0); list_add_tail(&daemon->peers, &peer->list); tal_add_destructor(peer, destroy_peer); return peer; @@ -141,6 +147,7 @@ static struct io_plan *peer_msgin(struct io_conn *conn, /* Not our place to handle this, so we punt */ s = towire_gossipstatus_peer_nongossip(msg, peer->unique_id, &peer->pcs.cs, msg); + peer->local = false; status_send(s); status_send_fd(io_conn_fd(conn)); return io_close(conn); @@ -201,6 +208,7 @@ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer) static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer) { + assert(peer->local); /* First we process queued packets, if any */ u8 *out; size_t n = tal_count(peer->msg_out); @@ -211,7 +219,7 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer) return peer_write_message(conn, &peer->pcs, take(out), pkt_out); } - if (peer->gossip_sync){ + if (peer->gossip_sync && peer->local){ /* Send any queued up broadcast messages */ peer->gossip_sync = false; return peer_dump_gossip(conn, peer); @@ -236,18 +244,63 @@ static bool has_even_bit(const u8 *bitmap) static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer); static struct io_plan *client_req_in(struct io_conn *conn, struct peer *peer) { - /* Handle incoming requests */ + /* TODO(cdecker) Handle incoming requests */ + status_trace("Received message from client %s", tal_hexstr(peer, peer->proxy_in, tal_count(peer->proxy_in))); return recv_client_req(conn, peer); } +static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer); static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer) { return io_read_wire(conn, peer, &peer->proxy_in, client_req_in, peer); } +static struct io_plan *client_dump_gossip(struct io_conn *conn, struct peer *peer) +{ + struct queued_message *next; + next = next_broadcast_message(peer->daemon->rstate->broadcasts, + &peer->broadcast_index); + + if (!next) { + new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), + wake_pkt_out, peer); + return io_out_wait(conn, peer, client_pkt_out, peer); + } else { + return io_write_wire(conn, next->payload, client_dump_gossip, peer); + } +} + +static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer) +{ + u8 *out; + size_t n = tal_count(peer->proxy_msg_out); + if (n > 0) { + out = peer->proxy_msg_out[0]; + memmove(peer->proxy_msg_out, peer->proxy_msg_out + 1, (sizeof(*peer->proxy_msg_out)*(n-1))); + tal_resize(&peer->proxy_msg_out, n-1); + return peer_write_message(conn, &peer->pcs, take(out), pkt_out); + } + + if (peer->local) { + /* Not our turn, the local loop is taking care of broadcasts */ + new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), + wake_pkt_out, peer); + /* Going to wake up in pkt_out since we mix time based and + * message based wakeups */ + return io_out_wait(conn, peer, client_pkt_out, peer); + } else if (peer->gossip_sync) { + /* Send any queued up broadcast messages */ + peer->gossip_sync = false; + return client_dump_gossip(conn, peer); + } else { + return io_out_wait(conn, peer, pkt_out, peer); + } +} + static struct io_plan *peer_proxy_init(struct io_conn *conn, struct peer *peer) { - return recv_client_req(conn, peer); + return io_duplex(conn, recv_client_req(conn, peer), + client_pkt_out(conn, peer)); } static int peer_create_gossip_client(struct peer *peer) @@ -380,6 +433,7 @@ static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon, out = towire_gossipctl_release_peer_reply(msg, unique_id, &peer->pcs.cs); + peer->local = false; return io_write_wire(conn, out, release_peer_fd, peer); } }