lightningd: hand fds to connectd, not receive them from connectd.

Before this patch:
1. connectd says it's connected (peer_connected)
2. we tell connectd we want to talk about each channel (peer_make_active)
3. connectd gives us an fd for each channel, and we connect it to a subd (peer_active)
4. OR, connectd says it sent something about a channel we didn't tell it about, with an fd (peer_active)

Now:
1. connectd says it's connected (peer_connected)
2. we start all appropriate subds and tell connectd to what channels/fds (peer_connect_subd).
3. if connectd says it sent something about a channel we didn't tell it about, we either tell
   it to hang up (peer_final_msg), or connect a new opening daemon (peer_connect_subd).

This is the minimal-size patch, which is why we create socket pairs in
so many places to use the existing functions.  Many cleanups are
possible, since the new flow is so simple.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-07-18 21:42:18 +09:30 committed by neil saitug
parent 430d6521a0
commit 41b379ed89
14 changed files with 380 additions and 288 deletions

View file

@ -1899,6 +1899,15 @@ static void dev_suppress_gossip(struct daemon *daemon, const u8 *msg)
}
#endif /* DEVELOPER */
static struct io_plan *recv_peer_connect_subd(struct io_conn *conn,
const u8 *msg,
int fd,
struct daemon *daemon)
{
peer_connect_subd(daemon, msg, fd);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *recv_req(struct io_conn *conn,
const u8 *msg,
struct daemon *daemon)
@ -1940,9 +1949,10 @@ static struct io_plan *recv_req(struct io_conn *conn,
send_custommsg(daemon, msg);
goto out;
case WIRE_CONNECTD_PEER_MAKE_ACTIVE:
peer_make_active(daemon, msg);
goto out;
case WIRE_CONNECTD_PEER_CONNECT_SUBD:
/* This comes with an fd */
return daemon_conn_read_with_fd(conn, daemon->master,
recv_peer_connect_subd, daemon);
case WIRE_CONNECTD_DEV_MEMLEAK:
#if DEVELOPER
@ -1958,7 +1968,7 @@ static struct io_plan *recv_req(struct io_conn *conn,
case WIRE_CONNECTD_INIT_REPLY:
case WIRE_CONNECTD_ACTIVATE_REPLY:
case WIRE_CONNECTD_PEER_CONNECTED:
case WIRE_CONNECTD_PEER_ACTIVE:
case WIRE_CONNECTD_PEER_SPOKE:
case WIRE_CONNECTD_CONNECT_FAILED:
case WIRE_CONNECTD_DEV_MEMLEAK_REPLY:
case WIRE_CONNECTD_PING_REPLY:

View file

@ -73,17 +73,16 @@ msgdata,connectd_peer_connected,features,u8,flen
msgtype,connectd_peer_disconnect_done,2006
msgdata,connectd_peer_disconnect_done,id,node_id,
# Master -> connectd: make peer active immediately (we want to talk)
msgtype,connectd_peer_make_active,2004
msgdata,connectd_peer_make_active,id,node_id,
msgdata,connectd_peer_make_active,channel_id,channel_id,
# Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd).
msgtype,connectd_peer_connect_subd,2004
msgdata,connectd_peer_connect_subd,id,node_id,
msgdata,connectd_peer_connect_subd,channel_id,channel_id,
# Connectd -> master: peer said something interesting (or you said make_active)
# Plus fd for peer daemon.
msgtype,connectd_peer_active,2005
msgdata,connectd_peer_active,id,node_id,
msgdata,connectd_peer_active,msgtype,?u16,
msgdata,connectd_peer_active,channel_id,channel_id,
# Connectd -> master: peer said something interesting
msgtype,connectd_peer_spoke,2005
msgdata,connectd_peer_spoke,id,node_id,
msgdata,connectd_peer_spoke,msgtype,u16,
msgdata,connectd_peer_spoke,channel_id,channel_id,
# master -> connectd: peer no longer wanted, you can disconnect.
msgtype,connectd_discard_peer,2015

1 #include <bitcoin/block.h>
73 msgtype,connectd_peer_active,2005 msgdata,connectd_peer_spoke,id,node_id,
74 msgdata,connectd_peer_active,id,node_id, msgdata,connectd_peer_spoke,msgtype,u16,
75 msgdata,connectd_peer_active,msgtype,?u16, msgdata,connectd_peer_spoke,channel_id,channel_id,
76 msgdata,connectd_peer_active,channel_id,channel_id, # master -> connectd: peer no longer wanted, you can disconnect.
77 # master -> connectd: peer no longer wanted, you can disconnect. msgtype,connectd_discard_peer,2015
78 msgtype,connectd_discard_peer,2015 msgdata,connectd_discard_peer,id,node_id,
79 msgdata,connectd_discard_peer,id,node_id, # master -> connectd: give message to peer and disconnect.
80 # master -> connectd: give message to peer and disconnect. msgtype,connectd_peer_final_msg,2003
81 msgtype,connectd_peer_final_msg,2003 msgdata,connectd_peer_final_msg,id,node_id,
82 msgdata,connectd_peer_final_msg,id,node_id, msgdata,connectd_peer_final_msg,len,u16,
83 msgdata,connectd_peer_final_msg,len,u16, msgdata,connectd_peer_final_msg,msg,u8,len
84 msgdata,connectd_peer_final_msg,msg,u8,len # master -> connectd: do you have a memleak?
85 # master -> connectd: do you have a memleak? msgtype,connectd_dev_memleak,2033
msgtype,connectd_dev_memleak,2033
86 msgtype,connectd_dev_memleak_reply,2133
87 msgdata,connectd_dev_memleak_reply,leak,bool,
88 # Ping/pong test. Waits for a reply if it expects one.

View file

@ -50,7 +50,7 @@ struct subd {
/* The opening revocation basepoint, for v2 channel_id. */
struct pubkey *opener_revocation_basepoint;
/* The actual connection to talk to it */
/* The actual connection to talk to it (NULL if it's not connected yet) */
struct io_conn *conn;
/* Input buffer */
@ -539,66 +539,6 @@ void send_custommsg(struct daemon *daemon, const u8 *msg)
inject_peer_msg(peer, take(custommsg));
}
/* FIXME: fwd decl */
static struct subd *multiplex_subd_setup(struct peer *peer,
const struct channel_id *channel_id,
int *fd_for_subd);
static struct subd *activate_subd(struct peer *peer,
const enum peer_wire *type,
const struct channel_id *channel_id)
{
int fd_for_subd;
u16 t, *tp;
struct subd *subd;
subd = multiplex_subd_setup(peer, channel_id, &fd_for_subd);
if (!subd)
return NULL;
/* wire routines want a u16, not an enum */
if (type) {
t = *type;
tp = &t;
} else {
tp = NULL;
}
/* We tell lightningd to fire up a subdaemon to handle this! */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_active(NULL, &peer->id,
tp,
channel_id)));
daemon_conn_send_fd(peer->daemon->master, fd_for_subd);
return subd;
}
void peer_make_active(struct daemon *daemon, const u8 *msg)
{
struct node_id id;
struct peer *peer;
struct channel_id channel_id;
if (!fromwire_connectd_peer_make_active(msg, &id, &channel_id))
master_badmsg(WIRE_CONNECTD_PEER_MAKE_ACTIVE, msg);
/* Races can happen: this might be gone by now. */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer)
return;
/* Could be disconnecting now */
if (!peer->to_peer)
return;
/* Could be made active already by receiving a message (esp reestablish!) */
if (find_subd(peer, &channel_id))
return;
if (!activate_subd(peer, NULL, &channel_id))
tal_free(peer);
}
static void handle_ping_in(struct peer *peer, const u8 *msg)
{
u8 *pong;
@ -1037,6 +977,41 @@ static struct io_plan *write_to_subd(struct io_conn *subd_conn,
return io_write_wire(subd_conn, take(msg), write_to_subd, subd);
}
static void destroy_subd(struct subd *subd)
{
struct peer *peer = subd->peer;
size_t pos;
for (pos = 0; peer->subds[pos] != subd; pos++)
assert(pos < tal_count(peer->subds));
tal_arr_remove(&peer->subds, pos);
/* Make sure we try to keep reading from peer (might
* have been waiting for write_to_subd) */
io_wake(&peer->peer_in);
}
static struct subd *new_subd(struct peer *peer,
const struct channel_id *channel_id)
{
struct subd *subd;
subd = tal(peer->subds, struct subd);
subd->peer = peer;
subd->outq = msg_queue_new(subd, false);
subd->channel_id = *channel_id;
subd->temporary_channel_id = NULL;
subd->opener_revocation_basepoint = NULL;
subd->conn = NULL;
/* Connect it to the peer */
tal_arr_expand(&peer->subds, subd);
tal_add_destructor(subd, destroy_subd);
return subd;
}
static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
struct peer *peer);
static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
@ -1094,15 +1069,18 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
return read_hdr_from_peer(peer_conn, peer);
}
/* If we don't find a subdaemon for this, activate a new one. */
/* If we don't find a subdaemon for this, crteat a new one. */
subd = find_subd(peer, &channel_id);
if (!subd) {
enum peer_wire t = fromwire_peektype(decrypted);
status_peer_debug(&peer->id, "Activating for message %s",
peer_wire_name(t));
subd = activate_subd(peer, &t, &channel_id);
if (!subd)
return io_close(peer_conn);
subd = new_subd(peer, &channel_id);
/* We tell lightningd to fire up a subdaemon to handle this! */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_spoke(NULL, &peer->id,
t,
&channel_id)));
}
/* Even if we just created it, call this to catch open_channel2 */
@ -1156,53 +1134,6 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
write_to_subd(subd_conn, subd));
}
static void destroy_subd(struct subd *subd)
{
struct peer *peer = subd->peer;
size_t pos;
for (pos = 0; peer->subds[pos] != subd; pos++)
assert(pos < tal_count(peer->subds));
tal_arr_remove(&peer->subds, pos);
/* Make sure we try to keep reading from peer (might
* have been waiting for write_to_subd) */
io_wake(&peer->peer_in);
}
static struct subd *multiplex_subd_setup(struct peer *peer,
const struct channel_id *channel_id,
int *fd_for_subd)
{
int fds[2];
struct subd *subd;
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
status_broken("Failed to create socketpair: %s",
strerror(errno));
return NULL;
}
subd = tal(NULL, struct subd);
subd->peer = peer;
subd->outq = msg_queue_new(subd, false);
subd->channel_id = *channel_id;
subd->temporary_channel_id = NULL;
subd->opener_revocation_basepoint = NULL;
/* This sets subd->conn inside subd_conn_init */
io_new_conn(peer->subds, fds[0], subd_conn_init, subd);
/* When conn dies, subd is freed. */
tal_steal(subd->conn, subd);
/* Connect it to the peer */
tal_arr_expand(&peer->subds, subd);
tal_add_destructor(subd, destroy_subd);
*fd_for_subd = fds[1];
return subd;
}
static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
{
assert(peer->to_peer == peer_conn);
@ -1239,6 +1170,38 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
write_to_peer(peer_conn, peer));
}
void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
{
struct node_id id;
struct peer *peer;
struct channel_id channel_id;
struct subd *subd;
if (!fromwire_connectd_peer_connect_subd(msg, &id, &channel_id))
master_badmsg(WIRE_CONNECTD_PEER_CONNECT_SUBD, msg);
/* Races can happen: this might be gone by now. */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer) {
close(fd);
return;
}
/* Could be disconnecting now */
if (!peer->to_peer) {
close(fd);
return;
}
/* If peer said something, we created this and queued msg. */
subd = find_subd(peer, &channel_id);
if (!subd)
subd = new_subd(peer, &channel_id);
assert(!subd->conn);
/* This sets subd->conn inside subd_conn_init */
io_new_conn(subd, fd, subd_conn_init, subd);
}
/* Lightningd says to send a ping */
void send_manual_ping(struct daemon *daemon, const u8 *msg)

View file

@ -33,5 +33,5 @@ void send_manual_ping(struct daemon *daemon, const u8 *msg);
void send_custommsg(struct daemon *daemon, const u8 *msg);
/* Lightningd wants to talk to you. */
void peer_make_active(struct daemon *daemon, const u8 *msg);
void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd);
#endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */

View file

@ -595,7 +595,7 @@ static unsigned channel_msg(struct subd *sd, const u8 *msg, const int *fds)
return 0;
}
void peer_start_channeld(struct channel *channel,
bool peer_start_channeld(struct channel *channel,
struct peer_fd *peer_fd,
const u8 *fwd_msg,
bool reconnected,
@ -639,7 +639,7 @@ void peer_start_channeld(struct channel *channel,
strerror(errno));
channel_fail_reconnect_later(channel,
"Failed to subdaemon channel");
return;
return false;
}
htlcs = peer_htlcs(tmpctx, channel);
@ -677,7 +677,7 @@ void peer_start_channeld(struct channel *channel,
REASON_LOCAL,
"Could not get revocation secret %"PRIu64,
num_revocations-1);
return;
return false;
}
/* Warn once. */
@ -691,7 +691,7 @@ void peer_start_channeld(struct channel *channel,
channel_internal_error(channel,
"Could not load remote announcement"
" signatures");
return;
return false;
}
pbases = wallet_penalty_base_load_for_channel(
@ -706,7 +706,7 @@ void peer_start_channeld(struct channel *channel,
channel_internal_error(channel,
"Could not derive final_ext_key %"PRIu64,
channel->final_key_idx);
return;
return false;
}
initmsg = towire_channeld_init(tmpctx,
@ -796,6 +796,7 @@ void peer_start_channeld(struct channel *channel,
subd_send_msg(channel->owner,
take(towire_channeld_funding_depth(
NULL, channel->scid, channel->alias[LOCAL], 0)));
return true;
}
bool channel_tell_depth(struct lightningd *ld,

View file

@ -10,7 +10,7 @@ struct lightningd;
struct peer_fd;
struct peer;
void peer_start_channeld(struct channel *channel,
bool peer_start_channeld(struct channel *channel,
struct peer_fd *peer_fd,
const u8 *fwd_msg,
bool reconnected,

View file

@ -471,7 +471,7 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd
case WIRE_CONNECTD_DEV_MEMLEAK:
case WIRE_CONNECTD_DEV_SUPPRESS_GOSSIP:
case WIRE_CONNECTD_PEER_FINAL_MSG:
case WIRE_CONNECTD_PEER_MAKE_ACTIVE:
case WIRE_CONNECTD_PEER_CONNECT_SUBD:
case WIRE_CONNECTD_PING:
case WIRE_CONNECTD_SEND_ONIONMSG:
case WIRE_CONNECTD_CUSTOMMSG_OUT:
@ -486,10 +486,8 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd
peer_connected(connectd->ld, msg);
break;
case WIRE_CONNECTD_PEER_ACTIVE:
if (tal_count(fds) != 1)
return 1;
peer_active(connectd->ld, msg, fds[0]);
case WIRE_CONNECTD_PEER_SPOKE:
peer_spoke(connectd->ld, msg);
break;
case WIRE_CONNECTD_PEER_DISCONNECT_DONE:

View file

@ -2585,6 +2585,7 @@ static struct command_result *json_openchannel_init(struct command *cmd,
struct open_attempt *oa;
struct lease_rates *rates;
struct command_result *res;
int fds[2];
if (!param(cmd, buffer, params,
p_req("id", param_node_id, &id),
@ -2743,10 +2744,28 @@ static struct command_result *json_openchannel_init(struct command *cmd,
false,
rates);
/* Tell connectd to hand us this so we can start dualopend */
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
return command_fail(cmd, FUND_MAX_EXCEEDED,
"Failed to create socketpair: %s",
strerror(errno));
}
/* Start dualopend! */
if (!peer_start_dualopend(peer, new_peer_fd(cmd, fds[0]), channel)) {
close(fds[1]);
/* FIXME: gets completed by failure path above! */
return command_its_complicated("completed by peer_start_dualopend");
}
/* Go! */
subd_send_msg(channel->owner, channel->open_attempt->open_msg);
/* Tell connectd connect this to this channel id. */
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_make_active(NULL, &peer->id,
&channel->cid)));
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
&channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
}
@ -3109,6 +3128,7 @@ static struct command_result *json_queryrates(struct command *cmd,
struct open_attempt *oa;
u32 *our_upfront_shutdown_script_wallet_index;
struct command_result *res;
int fds[2];
if (!param(cmd, buffer, params,
p_req("id", param_node_id, &id),
@ -3210,13 +3230,30 @@ static struct command_result *json_queryrates(struct command *cmd,
true,
NULL);
/* Tell connectd to hand us this so we can start dualopend */
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_make_active(NULL, &peer->id,
&channel->cid)));
return command_still_pending(cmd);
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
return command_fail(cmd, FUND_MAX_EXCEEDED,
"Failed to create socketpair: %s",
strerror(errno));
}
}
/* Start dualopend! */
if (!peer_start_dualopend(peer, new_peer_fd(cmd, fds[0]), channel)) {
close(fds[1]);
/* FIXME: gets completed by failure path above! */
return command_its_complicated("completed by peer_start_dualopend");
}
/* Go! */
subd_send_msg(channel->owner, channel->open_attempt->open_msg);
/* Tell connectd connect this to this channel id. */
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
&channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
}
static const struct json_command queryrates_command = {
"dev-queryrates",
@ -3333,7 +3370,7 @@ bool peer_start_dualopend(struct peer *peer,
return true;
}
void peer_restart_dualopend(struct peer *peer,
bool peer_restart_dualopend(struct peer *peer,
struct peer_fd *peer_fd,
struct channel *channel)
{
@ -3345,10 +3382,9 @@ void peer_restart_dualopend(struct peer *peer,
u32 *local_shutdown_script_wallet_index;
u8 *msg;
if (channel_unsaved(channel)) {
peer_start_dualopend(peer, peer_fd, channel);
return;
}
if (channel_unsaved(channel))
return peer_start_dualopend(peer, peer_fd, channel);
hsmfd = hsm_get_client_fd(peer->ld, &peer->id, channel->dbid,
HSM_CAP_COMMITMENT_POINT
| HSM_CAP_SIGN_REMOTE_TX
@ -3371,7 +3407,7 @@ void peer_restart_dualopend(struct peer *peer,
strerror(errno));
channel_fail_reconnect_later(channel,
"Failed to subdaemon channel");
return;
return false;
}
/* Find the max self delay and min htlc capacity */
@ -3434,6 +3470,6 @@ void peer_restart_dualopend(struct peer *peer,
inflight->lease_chan_max_msat,
inflight->lease_chan_max_ppt);
subd_send_msg(channel->owner, take(msg));
return true;
}

View file

@ -9,7 +9,7 @@ struct peer_fd;
bool peer_start_dualopend(struct peer *peer, struct peer_fd *peer_fd,
struct channel *channel);
void peer_restart_dualopend(struct peer *peer,
bool peer_restart_dualopend(struct peer *peer,
struct peer_fd *peer_fd,
struct channel *channel);

View file

@ -1083,7 +1083,7 @@ static struct command_result *json_fundchannel_start(struct command *cmd,
struct peer *peer;
bool *announce_channel;
u32 *feerate_per_kw, *mindepth;
int fds[2];
struct amount_sat *amount;
struct amount_msat *push_msat;
u32 *upfront_shutdown_script_wallet_index;
@ -1236,10 +1236,25 @@ static struct command_result *json_fundchannel_start(struct command *cmd,
&tmp_channel_id,
fc->channel_flags);
/* Tell connectd to make this active; when it does, we can continue */
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
return command_fail(cmd, FUND_MAX_EXCEEDED,
"Failed to create socketpair: %s",
strerror(errno));
}
if (!peer_start_openingd(peer, new_peer_fd(cmd, fds[0]))) {
close(fds[1]);
/* FIXME: gets completed by failure path above! */
return command_its_complicated("completed by peer_start_openingd");
}
/* Tell it to start funding */
subd_send_msg(peer->uncommitted_channel->open_daemon, fc->open_msg);
/* Tell connectd connect this to this channel id. */
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_make_active(NULL, &peer->id,
&tmp_channel_id)));
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
&peer->uncommitted_channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
}

View file

@ -1007,6 +1007,91 @@ peer_connected_serialize(struct peer_connected_hook_payload *payload,
json_object_end(stream); /* .peer */
}
/* Talk to connectd about an active channel */
static void connect_activate_subd(struct lightningd *ld, struct channel *channel)
{
const u8 *error;
int fds[2];
/* If we have a canned error for this channel, send it now */
if (channel->error) {
error = channel->error;
goto send_error;
}
switch (channel->state) {
case ONCHAIN:
case FUNDING_SPEND_SEEN:
case CLOSINGD_COMPLETE:
case CLOSED:
/* Channel is active */
abort();
case AWAITING_UNILATERAL:
/* channel->error is not saved in db, so this can
* happen if we restart. */
error = towire_errorfmt(tmpctx, &channel->cid,
"Awaiting unilateral close");
goto send_error;
case DUALOPEND_OPEN_INIT:
case DUALOPEND_AWAITING_LOCKIN:
assert(!channel->owner);
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
log_broken(channel->log,
"Failed to create socketpair: %s",
strerror(errno));
error = towire_warningfmt(tmpctx, &channel->cid,
"Trouble in paradise?");
goto send_error;
}
if (peer_restart_dualopend(channel->peer,
new_peer_fd(tmpctx, fds[0]),
channel))
goto tell_connectd;
close(fds[1]);
return;
case CHANNELD_AWAITING_LOCKIN:
case CHANNELD_NORMAL:
case CHANNELD_SHUTTING_DOWN:
case CLOSINGD_SIGEXCHANGE:
assert(!channel->owner);
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
log_broken(channel->log,
"Failed to create socketpair: %s",
strerror(errno));
error = towire_warningfmt(tmpctx, &channel->cid,
"Trouble in paradise?");
goto send_error;
}
if (peer_start_channeld(channel,
new_peer_fd(tmpctx, fds[0]),
NULL, true,
NULL)) {
goto tell_connectd;
}
close(fds[1]);
return;
}
abort();
tell_connectd:
subd_send_msg(ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&channel->peer->id,
&channel->cid)));
subd_send_fd(ld->connectd, fds[1]);
return;
send_error:
log_debug(channel->log, "Telling connectd to send error %s",
tal_hex(tmpctx, error));
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &channel->peer->id,
error)));
}
static void peer_connected_hook_final(struct peer_connected_hook_payload *payload STEALS)
{
struct lightningd *ld = payload->ld;
@ -1041,23 +1126,27 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
/* Notify anyone who cares */
notify_connect(ld, &peer->id, payload->incoming, &addr);
list_for_each(&peer->channels, channel, list) {
#if DEVELOPER
if (dev_disconnect_permanent(ld)) {
/* Developer hack to fail all channels on permfail line. */
if (dev_disconnect_permanent(ld)) {
list_for_each(&peer->channels, channel, list) {
channel_fail_permanent(channel, REASON_LOCAL,
"dev_disconnect permfail");
error = channel->error;
goto send_error;
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
channel->error)));
}
return;
}
#endif
/* connect appropriate subds for all (active) channels! */
list_for_each(&peer->channels, channel, list) {
if (channel_active(channel)) {
log_debug(channel->log, "Peer has reconnected, state %s: telling connectd to make active",
log_debug(channel->log, "Peer has reconnected, state %s: connecting subd",
channel_state_name(channel));
subd_send_msg(ld->connectd,
take(towire_connectd_peer_make_active(NULL, &peer->id,
&channel->cid)));
connect_activate_subd(ld, channel);
}
}
return;
@ -1239,22 +1328,23 @@ void peer_connected(struct lightningd *ld, const u8 *msg)
plugin_hook_call_peer_connected(ld, hook_payload);
}
/* connectd tells us a peer has an interesting message, and hands us an
* fd to give to the correct subdaemon. Unlike peer_connected, this is racy:
* we might have just told it to disconnect peer. */
void peer_active(struct lightningd *ld, const u8 *msg, int fd)
/* connectd tells us a peer has a message and we've not already attached
* a subd. Normally this is a race, but it happens for real when opening
* a new channel, or referring to a channel we no longer want to talk to
* it about. */
void peer_spoke(struct lightningd *ld, const u8 *msg)
{
struct node_id id;
u16 *msgtype;
u16 msgtype;
struct channel *channel;
struct channel_id channel_id;
struct peer *peer;
bool dual_fund;
u8 *error;
struct peer_fd *peer_fd = new_peer_fd(tmpctx, fd);
int fds[2];
if (!fromwire_connectd_peer_active(msg, msg, &id, &msgtype, &channel_id))
fatal("Connectd gave bad CONNECTD_PEER_ACTIVE message %s",
if (!fromwire_connectd_peer_spoke(msg, &id, &msgtype, &channel_id))
fatal("Connectd gave bad CONNECTD_PEER_SPOKE message %s",
tal_hex(msg, msg));
peer = peer_by_id(ld, &id);
@ -1274,83 +1364,44 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd)
goto send_error;
}
switch (channel->state) {
case ONCHAIN:
case FUNDING_SPEND_SEEN:
case CLOSINGD_COMPLETE:
goto channel_is_closed;
case CLOSED:
/* Channel should not have been loaded */
abort();
case AWAITING_UNILATERAL: {
/* channel->error is not saved in db, so this can
* happen if we restart. */
error = towire_errorfmt(tmpctx, &channel->cid,
"Awaiting unilateral close");
goto send_error;
}
case DUALOPEND_OPEN_INIT:
/* We asked for this, to open? */
if (!msgtype
&& channel->open_attempt
&& channel->open_attempt->open_msg) {
if (peer_start_dualopend(peer, peer_fd, channel))
subd_send_msg(channel->owner, channel->open_attempt->open_msg);
return;
/* If channel is active, we raced, so ignore this:
* subd will get it soon. */
if (channel_active(channel))
return;
if (msgtype == WIRE_CHANNEL_REESTABLISH) {
log_debug(channel->log,
"Reestablish on %s channel: using channeld to reply",
channel_state_name(channel));
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
log_broken(channel->log,
"Failed to create socketpair: %s",
strerror(errno));
error = towire_warningfmt(tmpctx, &channel->cid,
"Trouble in paradise?");
goto send_error;
}
/* Fall through. */
case DUALOPEND_AWAITING_LOCKIN:
assert(!channel->owner);
peer_restart_dualopend(peer, peer_fd, channel);
return;
case CHANNELD_AWAITING_LOCKIN:
case CHANNELD_NORMAL:
case CHANNELD_SHUTTING_DOWN:
case CLOSINGD_SIGEXCHANGE:
/* Maybe old owner was too slow exiting? */
tal_free(channel->owner);
peer_start_channeld(channel,
peer_fd,
NULL, true,
NULL);
if (peer_start_channeld(channel, new_peer_fd(tmpctx, fds[0]), NULL, true, true)) {
goto tell_connectd;
}
/* FIXME: Send informative error? */
close(fds[1]);
return;
}
abort();
/* Send generic error. */
error = towire_errorfmt(tmpctx, &channel_id,
"channel in state %s",
channel_state_name(channel));
goto send_error;
}
dual_fund = feature_negotiated(ld->our_features,
peer->their_features,
OPT_DUAL_FUND);
/* Did we ask for this? */
if (!msgtype) {
/* If it was dual_fund, it will have peer_unsaved_channel above */
if (dual_fund) {
log_broken(ld->log, "Unsolicited active df peer %s?",
type_to_string(tmpctx, struct node_id,
&peer->id));
} else {
const struct uncommitted_channel *uc
= peer->uncommitted_channel;
if (!uc->open_daemon
&& uc->fc
&& uc->fc->open_msg) {
if (peer_start_openingd(peer, peer_fd)) {
subd_send_msg(uc->open_daemon,
uc->fc->open_msg);
}
} else {
log_broken(ld->log, "Unsolicited active peer %s?",
type_to_string(tmpctx, struct node_id,
&peer->id));
}
}
return;
}
/* OK, it's an unknown channel. Create a new one if they're trying. */
switch (*msgtype) {
switch (msgtype) {
case WIRE_OPEN_CHANNEL:
if (dual_fund) {
error = towire_errorfmt(tmpctx, &channel_id,
@ -1364,8 +1415,21 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd)
}
peer->uncommitted_channel = new_uncommitted_channel(peer);
peer->uncommitted_channel->cid = channel_id;
peer_start_openingd(peer, peer_fd);
break;
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
log_broken(ld->log,
"Failed to create socketpair: %s",
strerror(errno));
error = towire_warningfmt(tmpctx, &channel_id,
"Trouble in paradise?");
goto send_error;
}
if (peer_start_openingd(peer, new_peer_fd(tmpctx, fds[0]))) {
goto tell_connectd;
}
/* FIXME: Send informative error? */
close(fds[1]);
return;
case WIRE_OPEN_CHANNEL2:
if (!dual_fund) {
error = towire_errorfmt(tmpctx, &channel_id,
@ -1376,36 +1440,29 @@ void peer_active(struct lightningd *ld, const u8 *msg, int fd)
peer->ld->config.fee_base,
peer->ld->config.fee_per_satoshi);
channel->cid = channel_id;
peer_start_dualopend(peer, peer_fd, channel);
break;
default:
log_peer_unusual(ld->log, &peer->id,
"Unknown channel %s for %s",
type_to_string(tmpctx, struct channel_id,
&channel_id),
peer_wire_name(*msgtype));
error = towire_errorfmt(tmpctx, &channel_id,
"Unknown channel for %s", peer_wire_name(*msgtype));
goto send_error;
break;
}
return;
channel_is_closed:
if (msgtype && *msgtype == WIRE_CHANNEL_REESTABLISH) {
log_debug(channel->log,
"Reestablish on %s channel: using channeld to reply",
channel_state_name(channel));
peer_start_channeld(channel, peer_fd, NULL, true, true);
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
log_broken(ld->log,
"Failed to create socketpair: %s",
strerror(errno));
error = towire_warningfmt(tmpctx, &channel_id,
"Trouble in paradise?");
goto send_error;
}
if (peer_start_dualopend(peer, new_peer_fd(tmpctx, fds[0]), channel))
goto tell_connectd;
/* FIXME: Send informative error? */
close(fds[1]);
return;
}
/* Retransmit error if we have one. Otherwise generic error. */
error = channel->error;
if (!error)
error = towire_errorfmt(tmpctx, &channel_id,
"channel in state %s",
channel_state_name(channel));
/* Weird message? Log and reply with error. */
log_peer_unusual(ld->log, &peer->id,
"Unknown channel %s for %s",
type_to_string(tmpctx, struct channel_id,
&channel_id),
peer_wire_name(msgtype));
error = towire_errorfmt(tmpctx, &channel_id,
"Unknown channel for %s", peer_wire_name(msgtype));
send_error:
log_peer_debug(ld->log, &peer->id, "Telling connectd to send error %s",
@ -1414,6 +1471,12 @@ send_error:
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
error)));
return;
tell_connectd:
subd_send_msg(ld->connectd,
take(towire_connectd_peer_connect_subd(NULL, &id, &channel_id)));
subd_send_fd(ld->connectd, fds[1]);
}
struct disconnect_command {

View file

@ -80,7 +80,8 @@ struct peer *peer_from_json(struct lightningd *ld,
/* connectd tells us what peer is doing */
void peer_connected(struct lightningd *ld, const u8 *msg);
void peer_disconnect_done(struct lightningd *ld, const u8 *msg);
void peer_active(struct lightningd *ld, const u8 *msg, int peer_fd);
void peer_spoke(struct lightningd *ld, const u8 *msg);
/* May delete peer! */
void peer_channels_cleanup_on_disconnect(struct peer *peer);

View file

@ -220,15 +220,15 @@ bool fromwire_channel_id(const u8 **cursor UNNEEDED, size_t *max UNNEEDED,
/* Generated stub for fromwire_channeld_dev_memleak_reply */
bool fromwire_channeld_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_channeld_dev_memleak_reply called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_active */
bool fromwire_connectd_peer_active(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 **msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_active called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_connected */
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_disconnect_done */
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_spoke */
bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); }
/* Generated stub for fromwire_dualopend_dev_memleak_reply */
bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); }
@ -647,12 +647,12 @@ struct command_result *param_u64(struct command *cmd UNNEEDED, const char *name
struct channel *peer_any_active_channel(struct peer *peer UNNEEDED, bool *others UNNEEDED)
{ fprintf(stderr, "peer_any_active_channel called!\n"); abort(); }
/* Generated stub for peer_restart_dualopend */
void peer_restart_dualopend(struct peer *peer UNNEEDED,
bool peer_restart_dualopend(struct peer *peer UNNEEDED,
struct peer_fd *peer_fd UNNEEDED,
struct channel *channel UNNEEDED)
{ fprintf(stderr, "peer_restart_dualopend called!\n"); abort(); }
/* Generated stub for peer_start_channeld */
void peer_start_channeld(struct channel *channel UNNEEDED,
bool peer_start_channeld(struct channel *channel UNNEEDED,
struct peer_fd *peer_fd UNNEEDED,
const u8 *fwd_msg UNNEEDED,
bool reconnected UNNEEDED,
@ -693,6 +693,9 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED,
void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED,
void *replycb_data UNNEEDED)
{ fprintf(stderr, "subd_req_ called!\n"); abort(); }
/* Generated stub for subd_send_fd */
void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED)
{ fprintf(stderr, "subd_send_fd called!\n"); abort(); }
/* Generated stub for subd_send_msg */
void subd_send_msg(struct subd *sd UNNEEDED, const u8 *msg_out UNNEEDED)
{ fprintf(stderr, "subd_send_msg called!\n"); abort(); }
@ -711,12 +714,12 @@ u8 *towire_channeld_dev_memleak(const tal_t *ctx UNNEEDED)
/* Generated stub for towire_channeld_dev_reenable_commit */
u8 *towire_channeld_dev_reenable_commit(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_dev_reenable_commit called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_final_msg */
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_make_active */
u8 *towire_connectd_peer_make_active(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_make_active called!\n"); abort(); }
/* Generated stub for towire_dualopend_dev_memleak */
u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); }

View file

@ -151,15 +151,15 @@ bool fromwire_channeld_offer_htlc_reply(const tal_t *ctx UNNEEDED, const void *p
/* Generated stub for fromwire_channeld_sending_commitsig */
bool fromwire_channeld_sending_commitsig(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, u64 *commitnum UNNEEDED, struct penalty_base **pbase UNNEEDED, struct fee_states **fee_states UNNEEDED, struct height_states **blockheight_states UNNEEDED, struct changed_htlc **changed UNNEEDED, struct bitcoin_signature *commit_sig UNNEEDED, struct bitcoin_signature **htlc_sigs UNNEEDED)
{ fprintf(stderr, "fromwire_channeld_sending_commitsig called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_active */
bool fromwire_connectd_peer_active(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 **msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_active called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_connected */
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_disconnect_done */
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_spoke */
bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); }
/* Generated stub for fromwire_dualopend_dev_memleak_reply */
bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); }
@ -633,12 +633,12 @@ void payment_succeeded(struct lightningd *ld UNNEEDED, struct htlc_out *hout UNN
const struct preimage *rval UNNEEDED)
{ fprintf(stderr, "payment_succeeded called!\n"); abort(); }
/* Generated stub for peer_restart_dualopend */
void peer_restart_dualopend(struct peer *peer UNNEEDED,
bool peer_restart_dualopend(struct peer *peer UNNEEDED,
struct peer_fd *peer_fd UNNEEDED,
struct channel *channel UNNEEDED)
{ fprintf(stderr, "peer_restart_dualopend called!\n"); abort(); }
/* Generated stub for peer_start_channeld */
void peer_start_channeld(struct channel *channel UNNEEDED,
bool peer_start_channeld(struct channel *channel UNNEEDED,
struct peer_fd *peer_fd UNNEEDED,
const u8 *fwd_msg UNNEEDED,
bool reconnected UNNEEDED,
@ -696,6 +696,9 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED,
void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED,
void *replycb_data UNNEEDED)
{ fprintf(stderr, "subd_req_ called!\n"); abort(); }
/* Generated stub for subd_send_fd */
void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED)
{ fprintf(stderr, "subd_send_fd called!\n"); abort(); }
/* Generated stub for subd_send_msg */
void subd_send_msg(struct subd *sd UNNEEDED, const u8 *msg_out UNNEEDED)
{ fprintf(stderr, "subd_send_msg called!\n"); abort(); }
@ -738,12 +741,12 @@ u8 *towire_channeld_offer_htlc(const tal_t *ctx UNNEEDED, struct amount_msat amo
/* Generated stub for towire_channeld_sending_commitsig_reply */
u8 *towire_channeld_sending_commitsig_reply(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_sending_commitsig_reply called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_final_msg */
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_make_active */
u8 *towire_connectd_peer_make_active(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_make_active called!\n"); abort(); }
/* Generated stub for towire_dualopend_dev_memleak */
u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_dualopend_dev_memleak called!\n"); abort(); }