gossip: Refactored non-local peers to use daemon_conn

This commit is contained in:
Christian Decker 2017-03-11 15:31:17 +01:00
parent 1af6262822
commit 05e64db6cd

View File

@ -16,6 +16,7 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <lightningd/connection.h>
#include <lightningd/cryptomsg.h>
#include <lightningd/debug.h>
#include <lightningd/gossip/gen_gossip_wire.h>
@ -65,10 +66,7 @@ struct peer {
bool gossip_sync;
/* The peer owner will use this to talk to gossipd */
int proxy_fd;
struct io_conn *proxy_conn;
u8 *proxy_in;
u8 **proxy_msg_out;
struct daemon_conn owner_conn;
/* Are we the owner of the peer? */
bool local;
@ -97,7 +95,6 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg)
peer->error = NULL;
peer->local = true;
peer->msg_out = tal_arr(peer, u8*, 0);
peer->proxy_msg_out = tal_arr(peer, u8*, 0);
list_add_tail(&daemon->peers, &peer->list);
tal_add_destructor(peer, destroy_peer);
wake_pkt_out(peer);
@ -197,7 +194,10 @@ static void wake_pkt_out(struct peer *peer)
peer->gossip_sync = true;
new_reltimer(&peer->daemon->timers, peer, time_from_sec(30),
wake_pkt_out, peer);
/* Notify the peer-write loop */
io_wake(peer);
/* Notify the daemon_conn-write loop */
io_wake(&peer->owner_conn);
}
/* Loop through the backlog of channel_{announcements,updates} and
@ -256,72 +256,57 @@ static bool has_even_bit(const u8 *bitmap)
return false;
}
static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer);
static struct io_plan *client_req_in(struct io_conn *conn, struct peer *peer)
/**
* owner_msg_in - Called by the `peer->owner_conn` upon receiving a
* message
*/
static struct io_plan *owner_msg_in(struct io_conn *conn,
struct daemon_conn *dc)
{
/* TODO(cdecker) Handle incoming requests */
status_trace("Received message from client %s", tal_hexstr(peer, peer->proxy_in, tal_count(peer->proxy_in)));
return recv_client_req(conn, peer);
struct peer *peer = container_of(dc, struct peer, owner_conn);
u8 *msg = dc->msg_in;
int type = fromwire_peektype(msg);
if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE ||
type == WIRE_NODE_ANNOUNCEMENT) {
handle_gossip_msg(peer->daemon->rstate, dc->msg_in);
}
return daemon_conn_read_next(conn, dc);
}
static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer);
static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer)
{
return io_read_wire(conn, peer, &peer->proxy_in, client_req_in, peer);
}
static struct io_plan *client_dump_gossip(struct io_conn *conn, struct peer *peer)
/**
* nonlocal_dump_gossip - catch the nonlocal peer up with the latest gossip.
*
* Registered as `msg_queue_cleared_cb` by the `peer->owner_conn`.
*/
static struct io_plan *nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
{
struct queued_message *next;
struct peer *peer = container_of(dc, struct peer, owner_conn);
/* Make sure we are not connected directly */
if (peer->local)
return io_out_wait(conn, peer, daemon_conn_write_next, dc);
next = next_broadcast_message(peer->daemon->rstate->broadcasts,
&peer->broadcast_index);
if (!next) {
return io_out_wait(conn, peer, client_pkt_out, peer);
return io_out_wait(conn, peer, daemon_conn_write_next, dc);
} else {
return io_write_wire(conn, next->payload, client_dump_gossip, peer);
return io_write_wire(conn, next->payload, nonlocal_dump_gossip, dc);
}
}
static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer)
{
u8 *out;
size_t n = tal_count(peer->proxy_msg_out);
if (n > 0) {
out = peer->proxy_msg_out[0];
memmove(peer->proxy_msg_out, peer->proxy_msg_out + 1, (sizeof(*peer->proxy_msg_out)*(n-1)));
tal_resize(&peer->proxy_msg_out, n-1);
return peer_write_message(conn, &peer->pcs, take(out), pkt_out);
}
if (peer->local) {
/* Not our turn, the local loop is taking care of broadcasts */
/* Going to wake up in pkt_out since we mix time based and
* message based wakeups */
return io_out_wait(conn, peer, client_pkt_out, peer);
} else if (peer->gossip_sync) {
/* Send any queued up broadcast messages */
peer->gossip_sync = false;
return client_dump_gossip(conn, peer);
} else {
return io_out_wait(conn, peer, pkt_out, peer);
}
}
static struct io_plan *peer_proxy_init(struct io_conn *conn, struct peer *peer)
{
return io_duplex(conn, recv_client_req(conn, peer),
client_pkt_out(conn, peer));
}
static int peer_create_gossip_client(struct peer *peer)
static int peer_create_owner_conn(struct peer *peer)
{
int fds[2];
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
return -1;
}
peer->proxy_fd = fds[0];
peer->proxy_conn = io_new_conn(peer, fds[0], peer_proxy_init, peer);
daemon_conn_init(peer, &peer->owner_conn, fds[0], owner_msg_in);
peer->owner_conn.msg_queue_cleared_cb = nonlocal_dump_gossip;
return fds[1];
}
@ -354,7 +339,8 @@ static struct io_plan *peer_parse_init(struct io_conn *conn,
return io_close(conn);
}
client_fd = peer_create_gossip_client(peer);
client_fd = peer_create_owner_conn(peer);
if (client_fd == -1) {
peer->error = tal_fmt(msg, "Internal error");
return io_close(conn);