From 41b379ed897ad24bf2d68ce022eb15339e430761 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 18 Jul 2022 21:42:18 +0930 Subject: [PATCH] lightningd: hand fds to connectd, not receive them from connectd. Before this patch: 1. connectd says it's connected (peer_connected) 2. we tell connectd we want to talk about each channel (peer_make_active) 3. connectd gives us an fd for each channel, and we connect it to a subd (peer_active) 4. OR, connectd says it sent something about a channel we didn't tell it about, with an fd (peer_active) Now: 1. connectd says it's connected (peer_connected) 2. we start all appropriate subds and tell connectd to what channels/fds (peer_connect_subd). 3. if connectd says it sent something about a channel we didn't tell it about, we either tell it to hang up (peer_final_msg), or connect a new opening daemon (peer_connect_subd). This is the minimal-size patch, which is why we create socket pairs in so many places to use the existing functions. Many cleanups are possible, since the new flow is so simple. Signed-off-by: Rusty Russell --- connectd/connectd.c | 18 +- connectd/connectd_wire.csv | 19 +- connectd/multiplex.c | 187 +++++-------- connectd/multiplex.h | 2 +- lightningd/channel_control.c | 11 +- lightningd/channel_control.h | 2 +- lightningd/connect_control.c | 8 +- lightningd/dual_open_control.c | 68 +++-- lightningd/dual_open_control.h | 2 +- lightningd/opening_control.c | 23 +- lightningd/peer_control.c | 287 ++++++++++++-------- lightningd/peer_control.h | 3 +- lightningd/test/run-invoice-select-inchan.c | 19 +- wallet/test/run-wallet.c | 19 +- 14 files changed, 380 insertions(+), 288 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 510fe67e5..bf28c04e4 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -1899,6 +1899,15 @@ static void dev_suppress_gossip(struct daemon *daemon, const u8 *msg) } #endif /* DEVELOPER */ +static struct io_plan *recv_peer_connect_subd(struct io_conn *conn, + const u8 *msg, + int fd, + struct daemon *daemon) +{ + peer_connect_subd(daemon, msg, fd); + return daemon_conn_read_next(conn, daemon->master); +} + static struct io_plan *recv_req(struct io_conn *conn, const u8 *msg, struct daemon *daemon) @@ -1940,9 +1949,10 @@ static struct io_plan *recv_req(struct io_conn *conn, send_custommsg(daemon, msg); goto out; - case WIRE_CONNECTD_PEER_MAKE_ACTIVE: - peer_make_active(daemon, msg); - goto out; + case WIRE_CONNECTD_PEER_CONNECT_SUBD: + /* This comes with an fd */ + return daemon_conn_read_with_fd(conn, daemon->master, + recv_peer_connect_subd, daemon); case WIRE_CONNECTD_DEV_MEMLEAK: #if DEVELOPER @@ -1958,7 +1968,7 @@ static struct io_plan *recv_req(struct io_conn *conn, case WIRE_CONNECTD_INIT_REPLY: case WIRE_CONNECTD_ACTIVATE_REPLY: case WIRE_CONNECTD_PEER_CONNECTED: - case WIRE_CONNECTD_PEER_ACTIVE: + case WIRE_CONNECTD_PEER_SPOKE: case WIRE_CONNECTD_CONNECT_FAILED: case WIRE_CONNECTD_DEV_MEMLEAK_REPLY: case WIRE_CONNECTD_PING_REPLY: diff --git a/connectd/connectd_wire.csv b/connectd/connectd_wire.csv index 740c86a30..380542262 100644 --- a/connectd/connectd_wire.csv +++ b/connectd/connectd_wire.csv @@ -73,17 +73,16 @@ msgdata,connectd_peer_connected,features,u8,flen msgtype,connectd_peer_disconnect_done,2006 msgdata,connectd_peer_disconnect_done,id,node_id, -# Master -> connectd: make peer active immediately (we want to talk) -msgtype,connectd_peer_make_active,2004 -msgdata,connectd_peer_make_active,id,node_id, -msgdata,connectd_peer_make_active,channel_id,channel_id, +# Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd). +msgtype,connectd_peer_connect_subd,2004 +msgdata,connectd_peer_connect_subd,id,node_id, +msgdata,connectd_peer_connect_subd,channel_id,channel_id, -# Connectd -> master: peer said something interesting (or you said make_active) -# Plus fd for peer daemon. -msgtype,connectd_peer_active,2005 -msgdata,connectd_peer_active,id,node_id, -msgdata,connectd_peer_active,msgtype,?u16, -msgdata,connectd_peer_active,channel_id,channel_id, +# Connectd -> master: peer said something interesting +msgtype,connectd_peer_spoke,2005 +msgdata,connectd_peer_spoke,id,node_id, +msgdata,connectd_peer_spoke,msgtype,u16, +msgdata,connectd_peer_spoke,channel_id,channel_id, # master -> connectd: peer no longer wanted, you can disconnect. msgtype,connectd_discard_peer,2015 diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 78bdbacce..2b9aa522d 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -50,7 +50,7 @@ struct subd { /* The opening revocation basepoint, for v2 channel_id. */ struct pubkey *opener_revocation_basepoint; - /* The actual connection to talk to it */ + /* The actual connection to talk to it (NULL if it's not connected yet) */ struct io_conn *conn; /* Input buffer */ @@ -539,66 +539,6 @@ void send_custommsg(struct daemon *daemon, const u8 *msg) inject_peer_msg(peer, take(custommsg)); } -/* FIXME: fwd decl */ -static struct subd *multiplex_subd_setup(struct peer *peer, - const struct channel_id *channel_id, - int *fd_for_subd); - -static struct subd *activate_subd(struct peer *peer, - const enum peer_wire *type, - const struct channel_id *channel_id) -{ - int fd_for_subd; - u16 t, *tp; - struct subd *subd; - - subd = multiplex_subd_setup(peer, channel_id, &fd_for_subd); - if (!subd) - return NULL; - - /* wire routines want a u16, not an enum */ - if (type) { - t = *type; - tp = &t; - } else { - tp = NULL; - } - - /* We tell lightningd to fire up a subdaemon to handle this! */ - daemon_conn_send(peer->daemon->master, - take(towire_connectd_peer_active(NULL, &peer->id, - tp, - channel_id))); - daemon_conn_send_fd(peer->daemon->master, fd_for_subd); - return subd; -} - -void peer_make_active(struct daemon *daemon, const u8 *msg) -{ - struct node_id id; - struct peer *peer; - struct channel_id channel_id; - - if (!fromwire_connectd_peer_make_active(msg, &id, &channel_id)) - master_badmsg(WIRE_CONNECTD_PEER_MAKE_ACTIVE, msg); - - /* Races can happen: this might be gone by now. */ - peer = peer_htable_get(&daemon->peers, &id); - if (!peer) - return; - - /* Could be disconnecting now */ - if (!peer->to_peer) - return; - - /* Could be made active already by receiving a message (esp reestablish!) */ - if (find_subd(peer, &channel_id)) - return; - - if (!activate_subd(peer, NULL, &channel_id)) - tal_free(peer); -} - static void handle_ping_in(struct peer *peer, const u8 *msg) { u8 *pong; @@ -1037,6 +977,41 @@ static struct io_plan *write_to_subd(struct io_conn *subd_conn, return io_write_wire(subd_conn, take(msg), write_to_subd, subd); } +static void destroy_subd(struct subd *subd) +{ + struct peer *peer = subd->peer; + size_t pos; + + for (pos = 0; peer->subds[pos] != subd; pos++) + assert(pos < tal_count(peer->subds)); + + tal_arr_remove(&peer->subds, pos); + + /* Make sure we try to keep reading from peer (might + * have been waiting for write_to_subd) */ + io_wake(&peer->peer_in); +} + +static struct subd *new_subd(struct peer *peer, + const struct channel_id *channel_id) +{ + struct subd *subd; + + subd = tal(peer->subds, struct subd); + subd->peer = peer; + subd->outq = msg_queue_new(subd, false); + subd->channel_id = *channel_id; + subd->temporary_channel_id = NULL; + subd->opener_revocation_basepoint = NULL; + subd->conn = NULL; + + /* Connect it to the peer */ + tal_arr_expand(&peer->subds, subd); + tal_add_destructor(subd, destroy_subd); + + return subd; +} + static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, struct peer *peer); static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, @@ -1094,15 +1069,18 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, return read_hdr_from_peer(peer_conn, peer); } - /* If we don't find a subdaemon for this, activate a new one. */ + /* If we don't find a subdaemon for this, crteat a new one. */ subd = find_subd(peer, &channel_id); if (!subd) { enum peer_wire t = fromwire_peektype(decrypted); status_peer_debug(&peer->id, "Activating for message %s", peer_wire_name(t)); - subd = activate_subd(peer, &t, &channel_id); - if (!subd) - return io_close(peer_conn); + subd = new_subd(peer, &channel_id); + /* We tell lightningd to fire up a subdaemon to handle this! */ + daemon_conn_send(peer->daemon->master, + take(towire_connectd_peer_spoke(NULL, &peer->id, + t, + &channel_id))); } /* Even if we just created it, call this to catch open_channel2 */ @@ -1156,53 +1134,6 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn, write_to_subd(subd_conn, subd)); } -static void destroy_subd(struct subd *subd) -{ - struct peer *peer = subd->peer; - size_t pos; - - for (pos = 0; peer->subds[pos] != subd; pos++) - assert(pos < tal_count(peer->subds)); - - tal_arr_remove(&peer->subds, pos); - - /* Make sure we try to keep reading from peer (might - * have been waiting for write_to_subd) */ - io_wake(&peer->peer_in); -} - -static struct subd *multiplex_subd_setup(struct peer *peer, - const struct channel_id *channel_id, - int *fd_for_subd) -{ - int fds[2]; - struct subd *subd; - - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { - status_broken("Failed to create socketpair: %s", - strerror(errno)); - return NULL; - } - - subd = tal(NULL, struct subd); - subd->peer = peer; - subd->outq = msg_queue_new(subd, false); - subd->channel_id = *channel_id; - subd->temporary_channel_id = NULL; - subd->opener_revocation_basepoint = NULL; - /* This sets subd->conn inside subd_conn_init */ - io_new_conn(peer->subds, fds[0], subd_conn_init, subd); - /* When conn dies, subd is freed. */ - tal_steal(subd->conn, subd); - - /* Connect it to the peer */ - tal_arr_expand(&peer->subds, subd); - tal_add_destructor(subd, destroy_subd); - - *fd_for_subd = fds[1]; - return subd; -} - static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); @@ -1239,6 +1170,38 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, write_to_peer(peer_conn, peer)); } +void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd) +{ + struct node_id id; + struct peer *peer; + struct channel_id channel_id; + struct subd *subd; + + if (!fromwire_connectd_peer_connect_subd(msg, &id, &channel_id)) + master_badmsg(WIRE_CONNECTD_PEER_CONNECT_SUBD, msg); + + /* Races can happen: this might be gone by now. */ + peer = peer_htable_get(&daemon->peers, &id); + if (!peer) { + close(fd); + return; + } + + /* Could be disconnecting now */ + if (!peer->to_peer) { + close(fd); + return; + } + + /* If peer said something, we created this and queued msg. */ + subd = find_subd(peer, &channel_id); + if (!subd) + subd = new_subd(peer, &channel_id); + + assert(!subd->conn); + /* This sets subd->conn inside subd_conn_init */ + io_new_conn(subd, fd, subd_conn_init, subd); +} /* Lightningd says to send a ping */ void send_manual_ping(struct daemon *daemon, const u8 *msg) diff --git a/connectd/multiplex.h b/connectd/multiplex.h index ce2ed7b02..77a41fa2a 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -33,5 +33,5 @@ void send_manual_ping(struct daemon *daemon, const u8 *msg); void send_custommsg(struct daemon *daemon, const u8 *msg); /* Lightningd wants to talk to you. */ -void peer_make_active(struct daemon *daemon, const u8 *msg); +void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd); #endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */ diff --git a/lightningd/channel_control.c b/lightningd/channel_control.c index acf9fa13e..c5b16f6cd 100644 --- a/lightningd/channel_control.c +++ b/lightningd/channel_control.c @@ -595,7 +595,7 @@ static unsigned channel_msg(struct subd *sd, const u8 *msg, const int *fds) return 0; } -void peer_start_channeld(struct channel *channel, +bool peer_start_channeld(struct channel *channel, struct peer_fd *peer_fd, const u8 *fwd_msg, bool reconnected, @@ -639,7 +639,7 @@ void peer_start_channeld(struct channel *channel, strerror(errno)); channel_fail_reconnect_later(channel, "Failed to subdaemon channel"); - return; + return false; } htlcs = peer_htlcs(tmpctx, channel); @@ -677,7 +677,7 @@ void peer_start_channeld(struct channel *channel, REASON_LOCAL, "Could not get revocation secret %"PRIu64, num_revocations-1); - return; + return false; } /* Warn once. */ @@ -691,7 +691,7 @@ void peer_start_channeld(struct channel *channel, channel_internal_error(channel, "Could not load remote announcement" " signatures"); - return; + return false; } pbases = wallet_penalty_base_load_for_channel( @@ -706,7 +706,7 @@ void peer_start_channeld(struct channel *channel, channel_internal_error(channel, "Could not derive final_ext_key %"PRIu64, channel->final_key_idx); - return; + return false; } initmsg = towire_channeld_init(tmpctx, @@ -796,6 +796,7 @@ void peer_start_channeld(struct channel *channel, subd_send_msg(channel->owner, take(towire_channeld_funding_depth( NULL, channel->scid, channel->alias[LOCAL], 0))); + return true; } bool channel_tell_depth(struct lightningd *ld, diff --git a/lightningd/channel_control.h b/lightningd/channel_control.h index 365e87655..8f3b8ab19 100644 --- a/lightningd/channel_control.h +++ b/lightningd/channel_control.h @@ -10,7 +10,7 @@ struct lightningd; struct peer_fd; struct peer; -void peer_start_channeld(struct channel *channel, +bool peer_start_channeld(struct channel *channel, struct peer_fd *peer_fd, const u8 *fwd_msg, bool reconnected, diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index b50d16d47..c0f8670a5 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -471,7 +471,7 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd case WIRE_CONNECTD_DEV_MEMLEAK: case WIRE_CONNECTD_DEV_SUPPRESS_GOSSIP: case WIRE_CONNECTD_PEER_FINAL_MSG: - case WIRE_CONNECTD_PEER_MAKE_ACTIVE: + case WIRE_CONNECTD_PEER_CONNECT_SUBD: case WIRE_CONNECTD_PING: case WIRE_CONNECTD_SEND_ONIONMSG: case WIRE_CONNECTD_CUSTOMMSG_OUT: @@ -486,10 +486,8 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd peer_connected(connectd->ld, msg); break; - case WIRE_CONNECTD_PEER_ACTIVE: - if (tal_count(fds) != 1) - return 1; - peer_active(connectd->ld, msg, fds[0]); + case WIRE_CONNECTD_PEER_SPOKE: + peer_spoke(connectd->ld, msg); break; case WIRE_CONNECTD_PEER_DISCONNECT_DONE: diff --git a/lightningd/dual_open_control.c b/lightningd/dual_open_control.c index fd33c85dd..9ebc9f2ce 100644 --- a/lightningd/dual_open_control.c +++ b/lightningd/dual_open_control.c @@ -2585,6 +2585,7 @@ static struct command_result *json_openchannel_init(struct command *cmd, struct open_attempt *oa; struct lease_rates *rates; struct command_result *res; + int fds[2]; if (!param(cmd, buffer, params, p_req("id", param_node_id, &id), @@ -2743,10 +2744,28 @@ static struct command_result *json_openchannel_init(struct command *cmd, false, rates); - /* Tell connectd to hand us this so we can start dualopend */ + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + return command_fail(cmd, FUND_MAX_EXCEEDED, + "Failed to create socketpair: %s", + strerror(errno)); + } + + /* Start dualopend! */ + if (!peer_start_dualopend(peer, new_peer_fd(cmd, fds[0]), channel)) { + close(fds[1]); + /* FIXME: gets completed by failure path above! */ + return command_its_complicated("completed by peer_start_dualopend"); + } + + /* Go! */ + subd_send_msg(channel->owner, channel->open_attempt->open_msg); + + /* Tell connectd connect this to this channel id. */ subd_send_msg(peer->ld->connectd, - take(towire_connectd_peer_make_active(NULL, &peer->id, - &channel->cid))); + take(towire_connectd_peer_connect_subd(NULL, + &peer->id, + &channel->cid))); + subd_send_fd(peer->ld->connectd, fds[1]); return command_still_pending(cmd); } @@ -3109,6 +3128,7 @@ static struct command_result *json_queryrates(struct command *cmd, struct open_attempt *oa; u32 *our_upfront_shutdown_script_wallet_index; struct command_result *res; + int fds[2]; if (!param(cmd, buffer, params, p_req("id", param_node_id, &id), @@ -3210,13 +3230,30 @@ static struct command_result *json_queryrates(struct command *cmd, true, NULL); - /* Tell connectd to hand us this so we can start dualopend */ - subd_send_msg(peer->ld->connectd, - take(towire_connectd_peer_make_active(NULL, &peer->id, - &channel->cid))); - return command_still_pending(cmd); + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + return command_fail(cmd, FUND_MAX_EXCEEDED, + "Failed to create socketpair: %s", + strerror(errno)); + } -} + /* Start dualopend! */ + if (!peer_start_dualopend(peer, new_peer_fd(cmd, fds[0]), channel)) { + close(fds[1]); + /* FIXME: gets completed by failure path above! */ + return command_its_complicated("completed by peer_start_dualopend"); + } + + /* Go! */ + subd_send_msg(channel->owner, channel->open_attempt->open_msg); + + /* Tell connectd connect this to this channel id. */ + subd_send_msg(peer->ld->connectd, + take(towire_connectd_peer_connect_subd(NULL, + &peer->id, + &channel->cid))); + subd_send_fd(peer->ld->connectd, fds[1]); + return command_still_pending(cmd); + } static const struct json_command queryrates_command = { "dev-queryrates", @@ -3333,7 +3370,7 @@ bool peer_start_dualopend(struct peer *peer, return true; } -void peer_restart_dualopend(struct peer *peer, +bool peer_restart_dualopend(struct peer *peer, struct peer_fd *peer_fd, struct channel *channel) { @@ -3345,10 +3382,9 @@ void peer_restart_dualopend(struct peer *peer, u32 *local_shutdown_script_wallet_index; u8 *msg; - if (channel_unsaved(channel)) { - peer_start_dualopend(peer, peer_fd, channel); - return; - } + if (channel_unsaved(channel)) + return peer_start_dualopend(peer, peer_fd, channel); + hsmfd = hsm_get_client_fd(peer->ld, &peer->id, channel->dbid, HSM_CAP_COMMITMENT_POINT | HSM_CAP_SIGN_REMOTE_TX @@ -3371,7 +3407,7 @@ void peer_restart_dualopend(struct peer *peer, strerror(errno)); channel_fail_reconnect_later(channel, "Failed to subdaemon channel"); - return; + return false; } /* Find the max self delay and min htlc capacity */ @@ -3434,6 +3470,6 @@ void peer_restart_dualopend(struct peer *peer, inflight->lease_chan_max_msat, inflight->lease_chan_max_ppt); - subd_send_msg(channel->owner, take(msg)); + return true; } diff --git a/lightningd/dual_open_control.h b/lightningd/dual_open_control.h index aa623659f..7d34d9816 100644 --- a/lightningd/dual_open_control.h +++ b/lightningd/dual_open_control.h @@ -9,7 +9,7 @@ struct peer_fd; bool peer_start_dualopend(struct peer *peer, struct peer_fd *peer_fd, struct channel *channel); -void peer_restart_dualopend(struct peer *peer, +bool peer_restart_dualopend(struct peer *peer, struct peer_fd *peer_fd, struct channel *channel); diff --git a/lightningd/opening_control.c b/lightningd/opening_control.c index e2081e479..928d64325 100644 --- a/lightningd/opening_control.c +++ b/lightningd/opening_control.c @@ -1083,7 +1083,7 @@ static struct command_result *json_fundchannel_start(struct command *cmd, struct peer *peer; bool *announce_channel; u32 *feerate_per_kw, *mindepth; - + int fds[2]; struct amount_sat *amount; struct amount_msat *push_msat; u32 *upfront_shutdown_script_wallet_index; @@ -1236,10 +1236,25 @@ static struct command_result *json_fundchannel_start(struct command *cmd, &tmp_channel_id, fc->channel_flags); - /* Tell connectd to make this active; when it does, we can continue */ + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + return command_fail(cmd, FUND_MAX_EXCEEDED, + "Failed to create socketpair: %s", + strerror(errno)); + } + if (!peer_start_openingd(peer, new_peer_fd(cmd, fds[0]))) { + close(fds[1]); + /* FIXME: gets completed by failure path above! */ + return command_its_complicated("completed by peer_start_openingd"); + } + /* Tell it to start funding */ + subd_send_msg(peer->uncommitted_channel->open_daemon, fc->open_msg); + + /* Tell connectd connect this to this channel id. */ subd_send_msg(peer->ld->connectd, - take(towire_connectd_peer_make_active(NULL, &peer->id, - &tmp_channel_id))); + take(towire_connectd_peer_connect_subd(NULL, + &peer->id, + &peer->uncommitted_channel->cid))); + subd_send_fd(peer->ld->connectd, fds[1]); return command_still_pending(cmd); } diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index 93cfa0724..518e4cff2 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -1007,6 +1007,91 @@ peer_connected_serialize(struct peer_connected_hook_payload *payload, json_object_end(stream); /* .peer */ } +/* Talk to connectd about an active channel */ +static void connect_activate_subd(struct lightningd *ld, struct channel *channel) +{ + const u8 *error; + int fds[2]; + + /* If we have a canned error for this channel, send it now */ + if (channel->error) { + error = channel->error; + goto send_error; + } + + switch (channel->state) { + case ONCHAIN: + case FUNDING_SPEND_SEEN: + case CLOSINGD_COMPLETE: + case CLOSED: + /* Channel is active */ + abort(); + case AWAITING_UNILATERAL: + /* channel->error is not saved in db, so this can + * happen if we restart. */ + error = towire_errorfmt(tmpctx, &channel->cid, + "Awaiting unilateral close"); + goto send_error; + + case DUALOPEND_OPEN_INIT: + case DUALOPEND_AWAITING_LOCKIN: + assert(!channel->owner); + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + log_broken(channel->log, + "Failed to create socketpair: %s", + strerror(errno)); + error = towire_warningfmt(tmpctx, &channel->cid, + "Trouble in paradise?"); + goto send_error; + } + if (peer_restart_dualopend(channel->peer, + new_peer_fd(tmpctx, fds[0]), + channel)) + goto tell_connectd; + close(fds[1]); + return; + + case CHANNELD_AWAITING_LOCKIN: + case CHANNELD_NORMAL: + case CHANNELD_SHUTTING_DOWN: + case CLOSINGD_SIGEXCHANGE: + assert(!channel->owner); + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + log_broken(channel->log, + "Failed to create socketpair: %s", + strerror(errno)); + error = towire_warningfmt(tmpctx, &channel->cid, + "Trouble in paradise?"); + goto send_error; + } + if (peer_start_channeld(channel, + new_peer_fd(tmpctx, fds[0]), + NULL, true, + NULL)) { + goto tell_connectd; + } + close(fds[1]); + return; + } + abort(); + +tell_connectd: + subd_send_msg(ld->connectd, + take(towire_connectd_peer_connect_subd(NULL, + &channel->peer->id, + &channel->cid))); + subd_send_fd(ld->connectd, fds[1]); + return; + +send_error: + log_debug(channel->log, "Telling connectd to send error %s", + tal_hex(tmpctx, error)); + /* Get connectd to send error and close. */ + subd_send_msg(ld->connectd, + take(towire_connectd_peer_final_msg(NULL, &channel->peer->id, + error))); +} + static void peer_connected_hook_final(struct peer_connected_hook_payload *payload STEALS) { struct lightningd *ld = payload->ld; @@ -1041,23 +1126,27 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa /* Notify anyone who cares */ notify_connect(ld, &peer->id, payload->incoming, &addr); - list_for_each(&peer->channels, channel, list) { #if DEVELOPER - if (dev_disconnect_permanent(ld)) { + /* Developer hack to fail all channels on permfail line. */ + if (dev_disconnect_permanent(ld)) { + list_for_each(&peer->channels, channel, list) { channel_fail_permanent(channel, REASON_LOCAL, "dev_disconnect permfail"); - error = channel->error; - goto send_error; + subd_send_msg(ld->connectd, + take(towire_connectd_peer_final_msg(NULL, &peer->id, + channel->error))); } + return; + } #endif + /* connect appropriate subds for all (active) channels! */ + list_for_each(&peer->channels, channel, list) { if (channel_active(channel)) { - log_debug(channel->log, "Peer has reconnected, state %s: telling connectd to make active", + log_debug(channel->log, "Peer has reconnected, state %s: connecting subd", channel_state_name(channel)); - subd_send_msg(ld->connectd, - take(towire_connectd_peer_make_active(NULL, &peer->id, - &channel->cid))); + connect_activate_subd(ld, channel); } } return; @@ -1239,22 +1328,23 @@ void peer_connected(struct lightningd *ld, const u8 *msg) plugin_hook_call_peer_connected(ld, hook_payload); } -/* connectd tells us a peer has an interesting message, and hands us an - * fd to give to the correct subdaemon. Unlike peer_connected, this is racy: - * we might have just told it to disconnect peer. */ -void peer_active(struct lightningd *ld, const u8 *msg, int fd) +/* connectd tells us a peer has a message and we've not already attached + * a subd. Normally this is a race, but it happens for real when opening + * a new channel, or referring to a channel we no longer want to talk to + * it about. */ +void peer_spoke(struct lightningd *ld, const u8 *msg) { struct node_id id; - u16 *msgtype; + u16 msgtype; struct channel *channel; struct channel_id channel_id; struct peer *peer; bool dual_fund; u8 *error; - struct peer_fd *peer_fd = new_peer_fd(tmpctx, fd); + int fds[2]; - if (!fromwire_connectd_peer_active(msg, msg, &id, &msgtype, &channel_id)) - fatal("Connectd gave bad CONNECTD_PEER_ACTIVE message %s", + if (!fromwire_connectd_peer_spoke(msg, &id, &msgtype, &channel_id)) + fatal("Connectd gave bad CONNECTD_PEER_SPOKE message %s", tal_hex(msg, msg)); peer = peer_by_id(ld, &id); @@ -1274,83 +1364,44 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd) goto send_error; } - switch (channel->state) { - case ONCHAIN: - case FUNDING_SPEND_SEEN: - case CLOSINGD_COMPLETE: - goto channel_is_closed; - case CLOSED: - /* Channel should not have been loaded */ - abort(); - case AWAITING_UNILATERAL: { - /* channel->error is not saved in db, so this can - * happen if we restart. */ - error = towire_errorfmt(tmpctx, &channel->cid, - "Awaiting unilateral close"); - goto send_error; - } - case DUALOPEND_OPEN_INIT: - /* We asked for this, to open? */ - if (!msgtype - && channel->open_attempt - && channel->open_attempt->open_msg) { - if (peer_start_dualopend(peer, peer_fd, channel)) - subd_send_msg(channel->owner, channel->open_attempt->open_msg); - return; + /* If channel is active, we raced, so ignore this: + * subd will get it soon. */ + if (channel_active(channel)) + return; + + if (msgtype == WIRE_CHANNEL_REESTABLISH) { + log_debug(channel->log, + "Reestablish on %s channel: using channeld to reply", + channel_state_name(channel)); + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + log_broken(channel->log, + "Failed to create socketpair: %s", + strerror(errno)); + error = towire_warningfmt(tmpctx, &channel->cid, + "Trouble in paradise?"); + goto send_error; } - /* Fall through. */ - case DUALOPEND_AWAITING_LOCKIN: - assert(!channel->owner); - peer_restart_dualopend(peer, peer_fd, channel); - return; - case CHANNELD_AWAITING_LOCKIN: - case CHANNELD_NORMAL: - case CHANNELD_SHUTTING_DOWN: - case CLOSINGD_SIGEXCHANGE: - /* Maybe old owner was too slow exiting? */ - tal_free(channel->owner); - peer_start_channeld(channel, - peer_fd, - NULL, true, - NULL); + if (peer_start_channeld(channel, new_peer_fd(tmpctx, fds[0]), NULL, true, true)) { + goto tell_connectd; + } + /* FIXME: Send informative error? */ + close(fds[1]); return; } - abort(); + + /* Send generic error. */ + error = towire_errorfmt(tmpctx, &channel_id, + "channel in state %s", + channel_state_name(channel)); + goto send_error; } dual_fund = feature_negotiated(ld->our_features, peer->their_features, OPT_DUAL_FUND); - /* Did we ask for this? */ - if (!msgtype) { - /* If it was dual_fund, it will have peer_unsaved_channel above */ - if (dual_fund) { - log_broken(ld->log, "Unsolicited active df peer %s?", - type_to_string(tmpctx, struct node_id, - &peer->id)); - } else { - const struct uncommitted_channel *uc - = peer->uncommitted_channel; - - if (!uc->open_daemon - && uc->fc - && uc->fc->open_msg) { - if (peer_start_openingd(peer, peer_fd)) { - subd_send_msg(uc->open_daemon, - uc->fc->open_msg); - } - } else { - log_broken(ld->log, "Unsolicited active peer %s?", - type_to_string(tmpctx, struct node_id, - &peer->id)); - } - } - return; - } - /* OK, it's an unknown channel. Create a new one if they're trying. */ - switch (*msgtype) { + switch (msgtype) { case WIRE_OPEN_CHANNEL: if (dual_fund) { error = towire_errorfmt(tmpctx, &channel_id, @@ -1364,8 +1415,21 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd) } peer->uncommitted_channel = new_uncommitted_channel(peer); peer->uncommitted_channel->cid = channel_id; - peer_start_openingd(peer, peer_fd); - break; + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + log_broken(ld->log, + "Failed to create socketpair: %s", + strerror(errno)); + error = towire_warningfmt(tmpctx, &channel_id, + "Trouble in paradise?"); + goto send_error; + } + if (peer_start_openingd(peer, new_peer_fd(tmpctx, fds[0]))) { + goto tell_connectd; + } + /* FIXME: Send informative error? */ + close(fds[1]); + return; + case WIRE_OPEN_CHANNEL2: if (!dual_fund) { error = towire_errorfmt(tmpctx, &channel_id, @@ -1376,36 +1440,29 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd) peer->ld->config.fee_base, peer->ld->config.fee_per_satoshi); channel->cid = channel_id; - peer_start_dualopend(peer, peer_fd, channel); - break; - default: - log_peer_unusual(ld->log, &peer->id, - "Unknown channel %s for %s", - type_to_string(tmpctx, struct channel_id, - &channel_id), - peer_wire_name(*msgtype)); - error = towire_errorfmt(tmpctx, &channel_id, - "Unknown channel for %s", peer_wire_name(*msgtype)); - goto send_error; - break; - } - return; - -channel_is_closed: - if (msgtype && *msgtype == WIRE_CHANNEL_REESTABLISH) { - log_debug(channel->log, - "Reestablish on %s channel: using channeld to reply", - channel_state_name(channel)); - peer_start_channeld(channel, peer_fd, NULL, true, true); + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + log_broken(ld->log, + "Failed to create socketpair: %s", + strerror(errno)); + error = towire_warningfmt(tmpctx, &channel_id, + "Trouble in paradise?"); + goto send_error; + } + if (peer_start_dualopend(peer, new_peer_fd(tmpctx, fds[0]), channel)) + goto tell_connectd; + /* FIXME: Send informative error? */ + close(fds[1]); return; } - /* Retransmit error if we have one. Otherwise generic error. */ - error = channel->error; - if (!error) - error = towire_errorfmt(tmpctx, &channel_id, - "channel in state %s", - channel_state_name(channel)); + /* Weird message? Log and reply with error. */ + log_peer_unusual(ld->log, &peer->id, + "Unknown channel %s for %s", + type_to_string(tmpctx, struct channel_id, + &channel_id), + peer_wire_name(msgtype)); + error = towire_errorfmt(tmpctx, &channel_id, + "Unknown channel for %s", peer_wire_name(msgtype)); send_error: log_peer_debug(ld->log, &peer->id, "Telling connectd to send error %s", @@ -1414,6 +1471,12 @@ send_error: subd_send_msg(ld->connectd, take(towire_connectd_peer_final_msg(NULL, &peer->id, error))); + return; + +tell_connectd: + subd_send_msg(ld->connectd, + take(towire_connectd_peer_connect_subd(NULL, &id, &channel_id))); + subd_send_fd(ld->connectd, fds[1]); } struct disconnect_command { diff --git a/lightningd/peer_control.h b/lightningd/peer_control.h index e4d94777a..f7ff4abd5 100644 --- a/lightningd/peer_control.h +++ b/lightningd/peer_control.h @@ -80,7 +80,8 @@ struct peer *peer_from_json(struct lightningd *ld, /* connectd tells us what peer is doing */ void peer_connected(struct lightningd *ld, const u8 *msg); void peer_disconnect_done(struct lightningd *ld, const u8 *msg); -void peer_active(struct lightningd *ld, const u8 *msg, int peer_fd); +void peer_spoke(struct lightningd *ld, const u8 *msg); + /* May delete peer! */ void peer_channels_cleanup_on_disconnect(struct peer *peer); diff --git a/lightningd/test/run-invoice-select-inchan.c b/lightningd/test/run-invoice-select-inchan.c index 5e0c2174c..45c914bdc 100644 --- a/lightningd/test/run-invoice-select-inchan.c +++ b/lightningd/test/run-invoice-select-inchan.c @@ -220,15 +220,15 @@ bool fromwire_channel_id(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, /* Generated stub for fromwire_channeld_dev_memleak_reply */ bool fromwire_channeld_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) { fprintf(stderr, "fromwire_channeld_dev_memleak_reply called!\n"); abort(); } -/* Generated stub for fromwire_connectd_peer_active */ -bool fromwire_connectd_peer_active(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 **msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED) -{ fprintf(stderr, "fromwire_connectd_peer_active called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_connected */ bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_disconnect_done */ bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_spoke */ +bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); } /* Generated stub for fromwire_dualopend_dev_memleak_reply */ bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) { fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); } @@ -647,12 +647,12 @@ struct command_result *param_u64(struct command *cmd UNNEEDED, const char *name struct channel *peer_any_active_channel(struct peer *peer UNNEEDED, bool *others UNNEEDED) { fprintf(stderr, "peer_any_active_channel called!\n"); abort(); } /* Generated stub for peer_restart_dualopend */ -void peer_restart_dualopend(struct peer *peer UNNEEDED, +bool peer_restart_dualopend(struct peer *peer UNNEEDED, struct peer_fd *peer_fd UNNEEDED, struct channel *channel UNNEEDED) { fprintf(stderr, "peer_restart_dualopend called!\n"); abort(); } /* Generated stub for peer_start_channeld */ -void peer_start_channeld(struct channel *channel UNNEEDED, +bool peer_start_channeld(struct channel *channel UNNEEDED, struct peer_fd *peer_fd UNNEEDED, const u8 *fwd_msg UNNEEDED, bool reconnected UNNEEDED, @@ -693,6 +693,9 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED, void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED, void *replycb_data UNNEEDED) { fprintf(stderr, "subd_req_ called!\n"); abort(); } +/* Generated stub for subd_send_fd */ +void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED) +{ fprintf(stderr, "subd_send_fd called!\n"); abort(); } /* Generated stub for subd_send_msg */ void subd_send_msg(struct subd *sd UNNEEDED, const u8 *msg_out UNNEEDED) { fprintf(stderr, "subd_send_msg called!\n"); abort(); } @@ -711,12 +714,12 @@ u8 *towire_channeld_dev_memleak(const tal_t *ctx UNNEEDED) /* Generated stub for towire_channeld_dev_reenable_commit */ u8 *towire_channeld_dev_reenable_commit(const tal_t *ctx UNNEEDED) { fprintf(stderr, "towire_channeld_dev_reenable_commit called!\n"); abort(); } +/* Generated stub for towire_connectd_peer_connect_subd */ +u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED) +{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); } /* Generated stub for towire_connectd_peer_final_msg */ u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const u8 *msg UNNEEDED) { fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); } -/* Generated stub for towire_connectd_peer_make_active */ -u8 *towire_connectd_peer_make_active(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED) -{ fprintf(stderr, "towire_connectd_peer_make_active called!\n"); abort(); } /* Generated stub for towire_dualopend_dev_memleak */ u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED) { fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); } diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 7f0c46968..ccbc34fdf 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -151,15 +151,15 @@ bool fromwire_channeld_offer_htlc_reply(const tal_t *ctx UNNEEDED, const void *p /* Generated stub for fromwire_channeld_sending_commitsig */ bool fromwire_channeld_sending_commitsig(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, u64 *commitnum UNNEEDED, struct penalty_base **pbase UNNEEDED, struct fee_states **fee_states UNNEEDED, struct height_states **blockheight_states UNNEEDED, struct changed_htlc **changed UNNEEDED, struct bitcoin_signature *commit_sig UNNEEDED, struct bitcoin_signature **htlc_sigs UNNEEDED) { fprintf(stderr, "fromwire_channeld_sending_commitsig called!\n"); abort(); } -/* Generated stub for fromwire_connectd_peer_active */ -bool fromwire_connectd_peer_active(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 **msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED) -{ fprintf(stderr, "fromwire_connectd_peer_active called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_connected */ bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } /* Generated stub for fromwire_connectd_peer_disconnect_done */ bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED) { fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); } +/* Generated stub for fromwire_connectd_peer_spoke */ +bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED) +{ fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); } /* Generated stub for fromwire_dualopend_dev_memleak_reply */ bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) { fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); } @@ -633,12 +633,12 @@ void payment_succeeded(struct lightningd *ld UNNEEDED, struct htlc_out *hout UNN const struct preimage *rval UNNEEDED) { fprintf(stderr, "payment_succeeded called!\n"); abort(); } /* Generated stub for peer_restart_dualopend */ -void peer_restart_dualopend(struct peer *peer UNNEEDED, +bool peer_restart_dualopend(struct peer *peer UNNEEDED, struct peer_fd *peer_fd UNNEEDED, struct channel *channel UNNEEDED) { fprintf(stderr, "peer_restart_dualopend called!\n"); abort(); } /* Generated stub for peer_start_channeld */ -void peer_start_channeld(struct channel *channel UNNEEDED, +bool peer_start_channeld(struct channel *channel UNNEEDED, struct peer_fd *peer_fd UNNEEDED, const u8 *fwd_msg UNNEEDED, bool reconnected UNNEEDED, @@ -696,6 +696,9 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED, void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED, void *replycb_data UNNEEDED) { fprintf(stderr, "subd_req_ called!\n"); abort(); } +/* Generated stub for subd_send_fd */ +void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED) +{ fprintf(stderr, "subd_send_fd called!\n"); abort(); } /* Generated stub for subd_send_msg */ void subd_send_msg(struct subd *sd UNNEEDED, const u8 *msg_out UNNEEDED) { fprintf(stderr, "subd_send_msg called!\n"); abort(); } @@ -738,12 +741,12 @@ u8 *towire_channeld_offer_htlc(const tal_t *ctx UNNEEDED, struct amount_msat amo /* Generated stub for towire_channeld_sending_commitsig_reply */ u8 *towire_channeld_sending_commitsig_reply(const tal_t *ctx UNNEEDED) { fprintf(stderr, "towire_channeld_sending_commitsig_reply called!\n"); abort(); } +/* Generated stub for towire_connectd_peer_connect_subd */ +u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED) +{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); } /* Generated stub for towire_connectd_peer_final_msg */ u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const u8 *msg UNNEEDED) { fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); } -/* Generated stub for towire_connectd_peer_make_active */ -u8 *towire_connectd_peer_make_active(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED) -{ fprintf(stderr, "towire_connectd_peer_make_active called!\n"); abort(); } /* Generated stub for towire_dualopend_dev_memleak */ u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED) { fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); }