diff --git a/connectd/connectd.c b/connectd/connectd.c index b70f9bafc..f3cb14afb 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -210,57 +210,6 @@ static void peer_connected_in(struct daemon *daemon, tal_free(connect); } -/*~ Every per-peer daemon needs a connection to the gossip daemon; this allows - * it to forward gossip to/from the peer. The gossip daemon needs to know a - * few of the features of the peer and its id (for reporting). - * - * Every peer also has read-only access to the gossip_store, which is handed - * out by gossipd too, and also a "gossip_state" indicating where we're up to. - * - * 'features' is a field in the `init` message, indicating properties of the - * node. - */ -static int get_gossipfd(struct daemon *daemon, - const struct node_id *id, - const u8 *their_features) -{ - bool gossip_queries_feature, success; - u8 *msg; - - /*~ The way features generally work is that both sides need to offer it; - * we always offer `gossip_queries`, but this check is explicit. */ - gossip_queries_feature - = feature_negotiated(daemon->our_features, their_features, - OPT_GOSSIP_QUERIES); - - /*~ We do this communication sync, since gossipd is our friend and - * it's easier. If gossipd fails, we fail. */ - msg = towire_gossipd_new_peer(NULL, id, gossip_queries_feature); - if (!wire_sync_write(GOSSIPCTL_FD, take(msg))) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "Failed writing to gossipctl: %s", - strerror(errno)); - - msg = wire_sync_read(tmpctx, GOSSIPCTL_FD); - if (!fromwire_gossipd_new_peer_reply(msg, &success)) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "Failed parsing msg gossipctl: %s", - tal_hex(tmpctx, msg)); - - /* Gossipd might run out of file descriptors, so it tells us, and we - * give up on connecting this peer. */ - if (!success) { - status_broken("Gossipd did not give us an fd: losing peer %s", - type_to_string(tmpctx, struct node_id, id)); - return -1; - } - - /* Otherwise, the next thing in the socket will be the file descriptor - * for the per-peer daemon. */ - return fdpass_recv(GOSSIPCTL_FD); - -} - /*~ This is an ad-hoc marshalling structure where we store arguments so we * can call peer_connected again. */ struct peer_reconnected { @@ -336,8 +285,6 @@ static struct io_plan *peer_reconnected(struct io_conn *conn, /*~ When we free a peer, we remove it from the daemon's hashtable */ static void destroy_peer(struct peer *peer, struct daemon *daemon) { - if (peer->gossip_fd >= 0) - close(peer->gossip_fd); peer_htable_del(&daemon->peers, peer); } @@ -398,6 +345,7 @@ struct io_plan *peer_connected(struct io_conn *conn, int unsup; size_t depender, missing; int subd_fd; + bool option_gossip_queries; peer = peer_htable_get(&daemon->peers, id); if (peer) @@ -455,12 +403,12 @@ struct io_plan *peer_connected(struct io_conn *conn, if (!peer) return io_close(conn); - /* If gossipd can't give us a file descriptor, we give up connecting. */ - peer->gossip_fd = get_gossipfd(daemon, id, their_features); - if (peer->gossip_fd < 0) { - close(subd_fd); - return tal_free(peer); - } + /* Tell gossipd it can ask query this new peer for gossip */ + option_gossip_queries = feature_negotiated(daemon->our_features, + their_features, + OPT_GOSSIP_QUERIES); + msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries); + daemon_conn_send(daemon->gossipd, take(msg)); /* Get ready for streaming gossip from the store */ setup_peer_gossip_store(peer, daemon->our_features, their_features); @@ -1846,6 +1794,10 @@ void peer_conn_closed(struct peer *peer) assert(!peer->to_peer); assert(peer->told_to_close); + /* Tell gossipd to stop asking this peer gossip queries */ + daemon_conn_send(peer->daemon->gossipd, + take(towire_gossipd_peer_gone(NULL, &peer->id))); + /* Wake up in case there's a reconnecting peer waiting in io_wait. */ io_wake(peer); diff --git a/connectd/connectd.h b/connectd/connectd.h index a630ff649..7b2eaa98d 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -81,9 +81,6 @@ struct peer { /* Random ping timer, to detect dead connections. */ struct oneshot *ping_timer; - /* FIXME: remove! */ - int gossip_fd; - #if DEVELOPER bool dev_read_enabled; /* If non-NULL, this counts down; 0 means disable */ diff --git a/connectd/connectd_gossipd_wire.csv b/connectd/connectd_gossipd_wire.csv index bdb9a75f5..195a73421 100644 --- a/connectd/connectd_gossipd_wire.csv +++ b/connectd/connectd_gossipd_wire.csv @@ -8,9 +8,9 @@ msgdata,gossipd_new_peer,id,node_id, # Did we negotiate OPT_GOSSIP_QUERIES? msgdata,gossipd_new_peer,gossip_queries_feature,bool, -# if success: + gossip fd -msgtype,gossipd_new_peer_reply,4100 -msgdata,gossipd_new_peer_reply,success,bool, +# peer is done +msgtype,gossipd_peer_gone,4101 +msgdata,gossipd_peer_gone,id,node_id, # connectd tells gossipd a gossip msg it received for peer. msgtype,gossipd_recv_gossip,4002 diff --git a/gossipd/gossipd.c b/gossipd/gossipd.c index d7797c068..5fef825a3 100644 --- a/gossipd/gossipd.c +++ b/gossipd/gossipd.c @@ -89,12 +89,6 @@ static void destroy_peer(struct peer *peer) node = get_node(peer->daemon->rstate, &peer->id); if (node) peer_disable_channels(peer->daemon, node); - - /* This is tricky: our lifetime is tied to the daemon_conn; it's our - * parent, so we are freed if it is, but we need to free it if we're - * freed manually. tal_free() treats this as a noop if it's already - * being freed */ - tal_free(peer->dc); } /* Search for a peer. */ @@ -347,103 +341,26 @@ static void handle_local_private_channel(struct daemon *daemon, const u8 *msg) } } -/*~ This is where the per-peer daemons send us messages. It's either forwarded - * gossip, or a request for information. We deliberately use non-overlapping - * message types so we can distinguish them. */ -static struct io_plan *peer_msg_in(struct io_conn *conn, - const u8 *msg, - struct peer *peer) +/*~ This is where connectd tells us about a new peer we might want to + * gossip with. */ +static void connectd_new_peer(struct daemon *daemon, const u8 *msg) { - /* These are messages relayed from peer */ - switch ((enum peer_wire)fromwire_peektype(msg)) { - /* These are not sent by peer (connectd sends us gossip msgs) */ - case WIRE_CHANNEL_ANNOUNCEMENT: - case WIRE_CHANNEL_UPDATE: - case WIRE_NODE_ANNOUNCEMENT: - case WIRE_QUERY_CHANNEL_RANGE: - case WIRE_REPLY_CHANNEL_RANGE: - case WIRE_QUERY_SHORT_CHANNEL_IDS: - case WIRE_REPLY_SHORT_CHANNEL_IDS_END: - case WIRE_WARNING: - case WIRE_INIT: - case WIRE_ERROR: - case WIRE_PING: - case WIRE_PONG: - case WIRE_OPEN_CHANNEL: - case WIRE_ACCEPT_CHANNEL: - case WIRE_FUNDING_CREATED: - case WIRE_FUNDING_SIGNED: - case WIRE_FUNDING_LOCKED: - case WIRE_SHUTDOWN: - case WIRE_CLOSING_SIGNED: - case WIRE_UPDATE_ADD_HTLC: - case WIRE_UPDATE_FULFILL_HTLC: - case WIRE_UPDATE_FAIL_HTLC: - case WIRE_UPDATE_FAIL_MALFORMED_HTLC: - case WIRE_COMMITMENT_SIGNED: - case WIRE_REVOKE_AND_ACK: - case WIRE_UPDATE_FEE: - case WIRE_UPDATE_BLOCKHEIGHT: - case WIRE_CHANNEL_REESTABLISH: - case WIRE_ANNOUNCEMENT_SIGNATURES: - case WIRE_GOSSIP_TIMESTAMP_FILTER: - case WIRE_TX_ADD_INPUT: - case WIRE_TX_REMOVE_INPUT: - case WIRE_TX_ADD_OUTPUT: - case WIRE_TX_REMOVE_OUTPUT: - case WIRE_TX_COMPLETE: - case WIRE_TX_SIGNATURES: - case WIRE_OPEN_CHANNEL2: - case WIRE_ACCEPT_CHANNEL2: - case WIRE_INIT_RBF: - case WIRE_ACK_RBF: - case WIRE_OBS2_ONION_MESSAGE: - case WIRE_ONION_MESSAGE: -#if EXPERIMENTAL_FEATURES - case WIRE_STFU: -#endif - status_broken("peer %s: relayed unexpected msg of type %s", - type_to_string(tmpctx, struct node_id, &peer->id), - peer_wire_name(fromwire_peektype(msg))); - return io_close(conn); - } - - /* Anything else should not have been sent to us: close on it */ - status_peer_broken(&peer->id, "unexpected cmd of type %i", - fromwire_peektype(msg)); - return io_close(conn); -} - -/*~ This is where connectd tells us about a new peer, and we hand back an fd for - * it to send us messages via peer_msg_in above */ -static struct io_plan *connectd_new_peer(struct io_conn *conn, - struct daemon *daemon, - const u8 *msg) -{ - struct peer *peer = tal(conn, struct peer); + struct peer *peer = tal(daemon, struct peer); struct node *node; - int fds[2]; if (!fromwire_gossipd_new_peer(msg, &peer->id, &peer->gossip_queries_feature)) { - status_broken("Bad new_peer msg from connectd: %s", + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "Bad new_peer msg from connectd: %s", tal_hex(tmpctx, msg)); - return io_close(conn); } - /* This can happen: we handle it gracefully, returning a `failed` msg. */ - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { - status_broken("Failed to create socketpair: %s", - strerror(errno)); - daemon_conn_send(daemon->connectd, - take(towire_gossipd_new_peer_reply(NULL, - false))); - goto done; + if (find_peer(daemon, &peer->id)) { + status_broken("Peer %s already here?", + type_to_string(tmpctx, struct node_id, &peer->id)); + tal_free(find_peer(daemon, &peer->id)); } - /* We might not have noticed old peer is dead; kill it now. */ - tal_free(find_peer(daemon, &peer->id)); - /* Populate the rest of the peer info. */ peer->daemon = daemon; peer->gossip_counter = 0; @@ -459,27 +376,30 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn, list_add_tail(&peer->daemon->peers, &peer->list); tal_add_destructor(peer, destroy_peer); - /* This is the new connection. */ - peer->dc = daemon_conn_new(daemon, fds[0], - peer_msg_in, - NULL, peer); - /* Free peer if conn closed (destroy_peer closes conn if peer freed) */ - tal_steal(peer->dc, peer); - node = get_node(daemon->rstate, &peer->id); if (node) peer_enable_channels(daemon, node); /* This sends the initial timestamp filter. */ seeker_setup_peer_gossip(daemon->seeker, peer); +} - /* Reply with success, and the new fd and gossip_state. */ - daemon_conn_send(daemon->connectd, - take(towire_gossipd_new_peer_reply(NULL, true))); - daemon_conn_send_fd(daemon->connectd, fds[1]); +static void connectd_peer_gone(struct daemon *daemon, const u8 *msg) +{ + struct node_id id; + struct peer *peer; -done: - return daemon_conn_read_next(conn, daemon->connectd); + if (!fromwire_gossipd_peer_gone(msg, &id)) { + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "Bad peer_gone msg from connectd: %s", + tal_hex(tmpctx, msg)); + } + + peer = find_peer(daemon, &id); + if (!peer) + status_broken("Peer %s already gone?", + type_to_string(tmpctx, struct node_id, &id)); + tal_free(peer); } /*~ lightningd asks us if we know any addresses for a given id. */ @@ -520,12 +440,11 @@ static void handle_recv_gossip(struct daemon *daemon, const u8 *outermsg) tal_hex(tmpctx, outermsg)); } - /* FIXME: happens when peer closes! */ peer = find_peer(daemon, &id); if (!peer) { - status_debug("connectd sent gossip msg %s for unknown peer %s", - peer_wire_name(fromwire_peektype(msg)), - type_to_string(tmpctx, struct node_id, &id)); + status_broken("connectd sent gossip msg %s for unknown peer %s", + peer_wire_name(fromwire_peektype(msg)), + type_to_string(tmpctx, struct node_id, &id)); return; } @@ -613,20 +532,17 @@ static struct io_plan *connectd_req(struct io_conn *conn, enum connectd_gossipd_wire t = fromwire_peektype(msg); switch (t) { - case WIRE_GOSSIPD_NEW_PEER: - return connectd_new_peer(conn, daemon, msg); - /* This is not for this fd! */ case WIRE_GOSSIPD_RECV_GOSSIP: + case WIRE_GOSSIPD_NEW_PEER: + case WIRE_GOSSIPD_PEER_GONE: /* We send these, don't receive them. */ - case WIRE_GOSSIPD_NEW_PEER_REPLY: case WIRE_GOSSIPD_SEND_GOSSIP: break; } - status_broken("Bad msg from connectd: %s", - tal_hex(tmpctx, msg)); - return io_close(conn); + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "Bad msg from connectd: %s", tal_hex(tmpctx, msg)); } /*~ connectd's input handler is very simple. */ @@ -641,10 +557,15 @@ static struct io_plan *connectd_gossip_req(struct io_conn *conn, handle_recv_gossip(daemon, msg); goto handled; - /* This is not for this fd! */ case WIRE_GOSSIPD_NEW_PEER: + connectd_new_peer(daemon, msg); + goto handled; + + case WIRE_GOSSIPD_PEER_GONE: + connectd_peer_gone(daemon, msg); + goto handled; + /* We send these, don't receive them. */ - case WIRE_GOSSIPD_NEW_PEER_REPLY: case WIRE_GOSSIPD_SEND_GOSSIP: break; } diff --git a/gossipd/gossipd.h b/gossipd/gossipd.h index 75b39b8c6..9a73c8974 100644 --- a/gossipd/gossipd.h +++ b/gossipd/gossipd.h @@ -109,9 +109,6 @@ struct peer { void (*query_channel_range_cb)(struct peer *peer, u32 first_blocknum, u32 number_of_blocks, const struct range_query_reply *replies); - - /* The daemon_conn used to queue messages to/from the peer. */ - struct daemon_conn *dc; }; /* Search for a peer. */