gossip: Marking peer as non-local and forwarding broadcasts

This includes some code duplication, but since the two write targets
are fundamentally different we might need to refactor a bit more to
unify them again.
This commit is contained in:
Christian Decker 2017-03-10 13:06:51 +01:00
parent 408d2f5170
commit 5721d7d194

View File

@ -68,6 +68,10 @@ struct peer {
int proxy_fd;
struct io_conn *proxy_conn;
u8 *proxy_in;
u8 **proxy_msg_out;
/* Are we the owner of the peer? */
bool local;
};
static void destroy_peer(struct peer *peer)
@ -89,7 +93,9 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg)
return tal_free(peer);
peer->daemon = daemon;
peer->error = NULL;
peer->local = true;
peer->msg_out = tal_arr(peer, u8*, 0);
peer->proxy_msg_out = tal_arr(peer, u8*, 0);
list_add_tail(&daemon->peers, &peer->list);
tal_add_destructor(peer, destroy_peer);
return peer;
@ -141,6 +147,7 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
/* Not our place to handle this, so we punt */
s = towire_gossipstatus_peer_nongossip(msg, peer->unique_id,
&peer->pcs.cs, msg);
peer->local = false;
status_send(s);
status_send_fd(io_conn_fd(conn));
return io_close(conn);
@ -201,6 +208,7 @@ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer)
static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
{
assert(peer->local);
/* First we process queued packets, if any */
u8 *out;
size_t n = tal_count(peer->msg_out);
@ -211,7 +219,7 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
return peer_write_message(conn, &peer->pcs, take(out), pkt_out);
}
if (peer->gossip_sync){
if (peer->gossip_sync && peer->local){
/* Send any queued up broadcast messages */
peer->gossip_sync = false;
return peer_dump_gossip(conn, peer);
@ -236,18 +244,63 @@ static bool has_even_bit(const u8 *bitmap)
static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer);
static struct io_plan *client_req_in(struct io_conn *conn, struct peer *peer)
{
/* Handle incoming requests */
/* TODO(cdecker) Handle incoming requests */
status_trace("Received message from client %s", tal_hexstr(peer, peer->proxy_in, tal_count(peer->proxy_in)));
return recv_client_req(conn, peer);
}
static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer);
static struct io_plan *recv_client_req(struct io_conn *conn, struct peer *peer)
{
return io_read_wire(conn, peer, &peer->proxy_in, client_req_in, peer);
}
static struct io_plan *client_dump_gossip(struct io_conn *conn, struct peer *peer)
{
struct queued_message *next;
next = next_broadcast_message(peer->daemon->rstate->broadcasts,
&peer->broadcast_index);
if (!next) {
new_reltimer(&peer->daemon->timers, peer, time_from_sec(30),
wake_pkt_out, peer);
return io_out_wait(conn, peer, client_pkt_out, peer);
} else {
return io_write_wire(conn, next->payload, client_dump_gossip, peer);
}
}
static struct io_plan *client_pkt_out(struct io_conn *conn, struct peer *peer)
{
u8 *out;
size_t n = tal_count(peer->proxy_msg_out);
if (n > 0) {
out = peer->proxy_msg_out[0];
memmove(peer->proxy_msg_out, peer->proxy_msg_out + 1, (sizeof(*peer->proxy_msg_out)*(n-1)));
tal_resize(&peer->proxy_msg_out, n-1);
return peer_write_message(conn, &peer->pcs, take(out), pkt_out);
}
if (peer->local) {
/* Not our turn, the local loop is taking care of broadcasts */
new_reltimer(&peer->daemon->timers, peer, time_from_sec(30),
wake_pkt_out, peer);
/* Going to wake up in pkt_out since we mix time based and
* message based wakeups */
return io_out_wait(conn, peer, client_pkt_out, peer);
} else if (peer->gossip_sync) {
/* Send any queued up broadcast messages */
peer->gossip_sync = false;
return client_dump_gossip(conn, peer);
} else {
return io_out_wait(conn, peer, pkt_out, peer);
}
}
static struct io_plan *peer_proxy_init(struct io_conn *conn, struct peer *peer)
{
return recv_client_req(conn, peer);
return io_duplex(conn, recv_client_req(conn, peer),
client_pkt_out(conn, peer));
}
static int peer_create_gossip_client(struct peer *peer)
@ -380,6 +433,7 @@ static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon,
out = towire_gossipctl_release_peer_reply(msg,
unique_id,
&peer->pcs.cs);
peer->local = false;
return io_write_wire(conn, out, release_peer_fd, peer);
}
}