From 05e64db6cdc24db4194c271dd34e3681d802536e Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Sat, 11 Mar 2017 15:31:17 +0100 Subject: [PATCH] gossip: Refactored non-local peers to use `daemon_conn` --- lightningd/gossip/gossip.c | 94 ++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 54 deletions(-) diff --git a/lightningd/gossip/gossip.c b/lightningd/gossip/gossip.c index 4184f2c7a..07119ec1f 100644 --- a/lightningd/gossip/gossip.c +++ b/lightningd/gossip/gossip.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -65,10 +66,7 @@ struct peer { bool gossip_sync; /* The peer owner will use this to talk to gossipd */ - int proxy_fd; - struct io_conn *proxy_conn; - u8 *proxy_in; - u8 **proxy_msg_out; + struct daemon_conn owner_conn; /* Are we the owner of the peer? */ bool local; @@ -97,7 +95,6 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg) peer->error = NULL; peer->local = true; peer->msg_out = tal_arr(peer, u8*, 0); - peer->proxy_msg_out = tal_arr(peer, u8*, 0); list_add_tail(&daemon->peers, &peer->list); tal_add_destructor(peer, destroy_peer); wake_pkt_out(peer); @@ -197,7 +194,10 @@ static void wake_pkt_out(struct peer *peer) peer->gossip_sync = true; new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), wake_pkt_out, peer); + /* Notify the peer-write loop */ io_wake(peer); + /* Notify the daemon_conn-write loop */ + io_wake(&peer->owner_conn); } /* Loop through the backlog of channel_{announcements,updates} and @@ -256,72 +256,57 @@ static bool has_even_bit(const u8 *bitmap) return false; } -static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer); -static struct io_plan *client_req_in(struct io_conn *conn, struct peer *peer) +/** + * owner_msg_in - Called by the `peer->owner_conn` upon receiving a + * message + */ +static struct io_plan *owner_msg_in(struct io_conn *conn, + struct daemon_conn *dc) { - /* TODO(cdecker) Handle incoming requests */ - status_trace("Received message from client %s", tal_hexstr(peer, peer->proxy_in, tal_count(peer->proxy_in))); - return recv_client_req(conn, peer); + struct peer *peer = container_of(dc, struct peer, owner_conn); + u8 *msg = dc->msg_in; + + int type = fromwire_peektype(msg); + if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE || + type == WIRE_NODE_ANNOUNCEMENT) { + handle_gossip_msg(peer->daemon->rstate, dc->msg_in); + } + return daemon_conn_read_next(conn, dc); } -static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer); -static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer) -{ - return io_read_wire(conn, peer, &peer->proxy_in, client_req_in, peer); -} - -static struct io_plan *client_dump_gossip(struct io_conn *conn, struct peer *peer) +/** + * nonlocal_dump_gossip - catch the nonlocal peer up with the latest gossip. + * + * Registered as `msg_queue_cleared_cb` by the `peer->owner_conn`. + */ +static struct io_plan *nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc) { struct queued_message *next; + struct peer *peer = container_of(dc, struct peer, owner_conn); + + + /* Make sure we are not connected directly */ + if (peer->local) + return io_out_wait(conn, peer, daemon_conn_write_next, dc); + next = next_broadcast_message(peer->daemon->rstate->broadcasts, &peer->broadcast_index); if (!next) { - return io_out_wait(conn, peer, client_pkt_out, peer); + return io_out_wait(conn, peer, daemon_conn_write_next, dc); } else { - return io_write_wire(conn, next->payload, client_dump_gossip, peer); + return io_write_wire(conn, next->payload, nonlocal_dump_gossip, dc); } } -static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer) -{ - u8 *out; - size_t n = tal_count(peer->proxy_msg_out); - if (n > 0) { - out = peer->proxy_msg_out[0]; - memmove(peer->proxy_msg_out, peer->proxy_msg_out + 1, (sizeof(*peer->proxy_msg_out)*(n-1))); - tal_resize(&peer->proxy_msg_out, n-1); - return peer_write_message(conn, &peer->pcs, take(out), pkt_out); - } - - if (peer->local) { - /* Not our turn, the local loop is taking care of broadcasts */ - /* Going to wake up in pkt_out since we mix time based and - * message based wakeups */ - return io_out_wait(conn, peer, client_pkt_out, peer); - } else if (peer->gossip_sync) { - /* Send any queued up broadcast messages */ - peer->gossip_sync = false; - return client_dump_gossip(conn, peer); - } else { - return io_out_wait(conn, peer, pkt_out, peer); - } -} - -static struct io_plan *peer_proxy_init(struct io_conn *conn, struct peer *peer) -{ - return io_duplex(conn, recv_client_req(conn, peer), - client_pkt_out(conn, peer)); -} - -static int peer_create_gossip_client(struct peer *peer) +static int peer_create_owner_conn(struct peer *peer) { int fds[2]; if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { return -1; } - peer->proxy_fd = fds[0]; - peer->proxy_conn = io_new_conn(peer, fds[0], peer_proxy_init, peer); + daemon_conn_init(peer, &peer->owner_conn, fds[0], owner_msg_in); + peer->owner_conn.msg_queue_cleared_cb = nonlocal_dump_gossip; return fds[1]; } @@ -354,7 +339,8 @@ static struct io_plan *peer_parse_init(struct io_conn *conn, return io_close(conn); } - client_fd = peer_create_gossip_client(peer); + client_fd = peer_create_owner_conn(peer); + if (client_fd == -1) { peer->error = tal_fmt(msg, "Internal error"); return io_close(conn);