connectd: implement @ correctly.

dev_blackhole_fd was a hack, and doesn't work well now that we are async
(it worked for synchronous comms in the per-peer daemons, but now a
read could sneak through before we get to the next write).

So, add explicit flags and use them.  This is much easier now that we
have all peer comms in one place.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Author: Rusty Russell <rusty@rustcorp.com.au>
Date:   2022-01-11 11:46:10 +10:30
Parent: bb5beeddd7
Commit: a93c49ca65

5 changed files with 37 additions and 71 deletions
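For readers skimming the diff below: the forked blackhole process is replaced by two per-peer fields, dev_read_enabled and dev_writes_enabled, consulted directly in the async read and write paths. Here is a minimal, self-contained sketch of those semantics; the struct and helper are simplified stand-ins for illustration (connectd allocates the counter with ccan/tal, which calloc/malloc only approximate):

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-in for connectd's struct peer. */
struct peer {
        bool dev_read_enabled;
        /* If non-NULL, this counts down; 0 means disable */
        uint32_t *dev_writes_enabled;
};

/* Mirrors the check this commit adds to write_to_peer(): true means
 * "silently drop this message". */
static bool dev_write_swallowed(struct peer *peer)
{
        if (!peer->dev_writes_enabled)
                return false;           /* writes unrestricted */
        if (*peer->dev_writes_enabled == 0)
                return true;            /* disabled: swallow */
        (*peer->dev_writes_enabled)--;  /* still counting down */
        return false;
}

int main(void)
{
        struct peer peer = { .dev_read_enabled = true,
                             .dev_writes_enabled = NULL };

        /* '@' (BLACKHOLE): no reads, and every write swallowed. */
        peer.dev_read_enabled = false;
        peer.dev_writes_enabled = calloc(1, sizeof(uint32_t));
        assert(dev_write_swallowed(&peer));

        /* DISABLE_AFTER: exactly one more write goes out. */
        *peer.dev_writes_enabled = 1;
        assert(!dev_write_swallowed(&peer));
        assert(dev_write_swallowed(&peer));

        free(peer.dev_writes_enabled);
        printf("flag semantics OK\n");
        return 0;
}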

common/dev_disconnect.c

@@ -105,56 +105,4 @@ void dev_sabotage_fd(int fd, bool close_fd)
 	dup2(fds[1], fd);
 	close(fds[1]);
 }
-
-/* Replace fd with blackhole until dev_disconnect file is truncated. */
-void dev_blackhole_fd(int fd)
-{
-	int fds[2];
-	int i;
-	struct stat st;
-	int maxfd;
-
-	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
-		err(1, "dev_blackhole_fd: creating socketpair");
-
-	switch (fork()) {
-	case -1:
-		err(1, "dev_blackhole_fd: forking");
-	case 0:
-		/* Close everything but the dev_disconnect_fd, the socket
-		 * which is pretending to be the peer, and stderr.
-		 * The "correct" way to do this would be to move the
-		 * fds we want to preserve to the low end (0, 1, 2...)
-		 * of the fd space and then just do a single closefrom
-		 * call, but dup2 could fail with ENFILE (which is a
-		 * *system*-level error, i.e. the entire system has too
-		 * many processes with open files) and we have no
-		 * convenient way to inform the parent of the error.
-		 * So loop until we reach whichever is higher of fds[0]
-		 * or dev_disconnect_fd, and *then* closefrom after that.
-		 */
-		maxfd = (fds[0] > dev_disconnect_fd) ? fds[0] :
-			dev_disconnect_fd;
-		for (i = 0; i < maxfd; i++)
-			if (i != fds[0]
-			    && i != dev_disconnect_fd
-			    && i != STDERR_FILENO)
-				close(i);
-		closefrom(maxfd + 1);
-
-		/* Close once dev_disconnect file is truncated. */
-		for (;;) {
-			if (fstat(dev_disconnect_fd, &st) != 0)
-				err(1, "fstat of dev_disconnect_fd failed");
-			if (st.st_size == 0)
-				_exit(0);
-			sleep(1);
-		}
-	}
-
-	close(fds[0]);
-	dup2(fds[1], fd);
-	close(fds[1]);
-}
 #endif
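What the removed function did, at its core, was splice a fresh socketpair over the peer fd so that writes vanished and reads saw nothing. Below is a standalone sketch of just that fd-swap trick (variable names here are made up for the demo). The weakness the commit message points at: the swap only happens when a write triggers dev_disconnect, so in a single async daemon a read already pending on the real socket can still sneak through first.

#include <err.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
        int pair[2], fds[2];

        /* Stand-in for the real peer connection. */
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, pair) != 0)
                err(1, "socketpair(peer)");

        /* The hack: make a fresh socketpair and dup2 one end over the
         * peer fd.  Writes now land in fds[0]'s buffer, which nobody
         * reads, and reads see no data: a blackhole. */
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
                err(1, "socketpair(blackhole)");
        dup2(fds[1], pair[0]);
        close(fds[1]);

        /* This write succeeds but is swallowed (buffered, never read). */
        if (write(pair[0], "hello", 5) != 5)
                err(1, "write");
        printf("write swallowed by blackhole fd\n");

        /* But data already queued on the *old* socket could have been
         * read by the async loop before the swap -- the race this
         * commit removes by checking flags in the read/write paths. */
        return 0;
}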

common/dev_disconnect.h

@@ -25,9 +25,6 @@ enum dev_disconnect dev_disconnect(const struct node_id *id, int pkt_type);
 /* Make next write on fd fail as if they'd disconnected. */
 void dev_sabotage_fd(int fd, bool close_fd);
 
-/* No more data to arrive, what's written is swallowed. */
-void dev_blackhole_fd(int fd);
-
 /* For debug code to set in daemon. */
 void dev_disconnect_init(int fd);

connectd/connectd.c

@@ -358,6 +358,11 @@ static struct peer *new_peer(struct daemon *daemon,
 	peer->peer_outq = msg_queue_new(peer);
 	peer->subd_outq = msg_queue_new(peer);
 
+#if DEVELOPER
+	peer->dev_writes_enabled = NULL;
+	peer->dev_read_enabled = true;
+#endif
+
 	/* Aim for connection to shuffle data back and forth: sets up
 	 * peer->to_subd */
 	if (!multiplex_subd_setup(peer, fd_for_subd))

connectd/connectd.h

@@ -61,6 +61,12 @@ struct peer {
 	/* We stream from the gossip_store for them, when idle */
 	struct gossip_state gs;
 
+#if DEVELOPER
+	bool dev_read_enabled;
+
+	/* If non-NULL, this counts down; 0 means disable */
+	u32 *dev_writes_enabled;
+#endif
 };
 
 /*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key:

connectd/multiplex.c

@@ -155,20 +155,6 @@ static struct io_plan *after_final_msg(struct io_conn *peer_conn,
 	return io_close(peer_conn);
 }
 
-#if DEVELOPER
-static struct io_plan *write_to_peer(struct io_conn *peer_conn,
-                                     struct peer *peer);
-
-static struct io_plan *dev_leave_hanging(struct io_conn *peer_conn,
-                                         struct peer *peer)
-{
-	/* We don't tell the peer we're disconnecting, but from now on
-	 * our writes go nowhere, and there's nothing to read. */
-	dev_sabotage_fd(io_conn_fd(peer_conn), false);
-	return write_to_peer(peer_conn, peer);
-}
-#endif /* DEVELOPER */
-
 /* We're happy for the kernel to batch update and gossip messages, but a
  * commitment message, for example, should be instantly sent. There's no
  * great way of doing this, unfortunately.
@@ -287,15 +273,21 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
 		tal_free(msg);
 		return io_close(peer->to_peer);
 	case DEV_DISCONNECT_AFTER:
+		/* Disallow reads from now on */
+		peer->dev_read_enabled = false;
 		next = (void *)io_close_cb;
 		break;
 	case DEV_DISCONNECT_BLACKHOLE:
-		dev_blackhole_fd(io_conn_fd(peer->to_peer));
+		/* Disable both reads and writes from now on */
+		peer->dev_read_enabled = false;
+		peer->dev_writes_enabled = talz(peer, u32);
 		break;
 	case DEV_DISCONNECT_NORMAL:
 		break;
 	case DEV_DISCONNECT_DISABLE_AFTER:
-		next = dev_leave_hanging;
+		peer->dev_read_enabled = false;
+		peer->dev_writes_enabled = tal(peer, u32);
+		*peer->dev_writes_enabled = 1;
 		break;
 	}
 #endif
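Note the asymmetry between the two new cases above: talz() returns zeroed memory, so BLACKHOLE's counter starts at 0 and every subsequent write is swallowed, while DISABLE_AFTER uses tal() plus an explicit store of 1, letting exactly one more write (the packet that triggered the disconnect) go out. A small sketch of the same idea, with libc allocators standing in for ccan/tal:

#include <stdint.h>
#include <stdlib.h>

uint32_t *make_blackhole(void)
{
        /* talz(peer, u32) returns zeroed memory: counter == 0, so the
         * write path swallows everything from the first message on. */
        return calloc(1, sizeof(uint32_t));
}

uint32_t *make_disable_after(void)
{
        /* tal(peer, u32) is uninitialized, hence the explicit store:
         * one write still goes out, then the counter hits 0. */
        uint32_t *counter = malloc(sizeof(uint32_t));
        *counter = 1;
        return counter;
}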
@@ -426,6 +418,18 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
 		}
 	}
 
+	/* dev_disconnect can disable writes */
+#if DEVELOPER
+	if (peer->dev_writes_enabled) {
+		if (*peer->dev_writes_enabled == 0) {
+			tal_free(msg);
+			/* Continue, to drain queue */
+			return write_to_peer(peer_conn, peer);
+		}
+		(*peer->dev_writes_enabled)--;
+	}
+#endif
+
 	return encrypt_and_send(peer, take(msg), write_to_peer);
 }
@@ -488,6 +492,12 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
 	}
 	tal_free(peer->peer_in);
 
+	/* dev_disconnect can disable read */
+	if (!IFDEV(peer->dev_read_enabled, true)) {
+		tal_free(decrypted);
+		return read_hdr_from_peer(peer_conn, peer);
+	}
+
 	/* If we swallow this, just try again. */
 	if (handle_message_locally(peer, decrypted)) {
 		tal_free(decrypted);
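The IFDEV() test in the read path is what lets this compile when DEVELOPER is off, since dev_read_enabled only exists inside #if DEVELOPER: the macro expands to its first argument in developer builds and to the fallback value otherwise, so the field reference vanishes entirely from production builds. A sketch of the pattern (the shape of the real macro in the lightning tree, not a verbatim copy):

#include <stdbool.h>
#include <stdio.h>

#define DEVELOPER 1  /* flip to 0 to see the non-dev expansion */

#if DEVELOPER
#define IFDEV(dev, nondev) (dev)
#else
#define IFDEV(dev, nondev) (nondev)
#endif

struct peer {
        int fd;  /* always present */
#if DEVELOPER
        bool dev_read_enabled;
#endif
};

int main(void)
{
        struct peer peer = { .fd = -1 };
#if DEVELOPER
        peer.dev_read_enabled = false;
#endif
        /* In non-DEVELOPER builds this is just "true"; the discarded
         * macro argument is never compiled, so the missing field is
         * never referenced. */
        if (!IFDEV(peer.dev_read_enabled, true))
                printf("reads disabled\n");
        return 0;
}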