connectd: prepare for multiple subd connections.

We still always have 1, but the infrastructure is now in place.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-03-22 19:22:13 +10:30
parent 005d69c463
commit fcd0b2eb42
4 changed files with 136 additions and 53 deletions

View File

@ -307,13 +307,12 @@ static struct peer *new_peer(struct daemon *daemon,
peer->id = *id;
peer->cs = *cs;
peer->final_msg = NULL;
peer->subd_in = NULL;
peer->subds = tal_arr(peer, struct subd *, 0);
peer->peer_in = NULL;
peer->sent_to_peer = NULL;
peer->urgent = false;
peer->told_to_close = false;
peer->peer_outq = msg_queue_new(peer, false);
peer->subd_outq = msg_queue_new(peer, false);
#if DEVELOPER
peer->dev_writes_enabled = NULL;
@ -323,7 +322,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->to_peer = conn;
/* Aim for connection to shuffle data back and forth: sets up
* peer->to_subd */
* peer->subds[0] */
if (!multiplex_subd_setup(peer, fd_for_subd))
return tal_free(peer);
@ -1792,7 +1791,7 @@ static void try_connect_peer(struct daemon *daemon,
existing = peer_htable_get(&daemon->peers, id);
if (existing) {
/* If it's exiting now, we've raced: reconnect after */
if (existing->to_subd
if (tal_count(existing->subds) != 0
&& existing->to_peer
&& !existing->told_to_close)
return;
@ -1892,7 +1891,7 @@ void peer_conn_closed(struct peer *peer)
struct connecting *connect = find_connecting(peer->daemon, &peer->id);
/* These should be closed already! */
assert(!peer->to_subd);
assert(!peer->subds);
assert(!peer->to_peer);
assert(peer->told_to_close);

View File

@ -5,6 +5,7 @@
#include <ccan/crypto/siphash24/siphash24.h>
#include <ccan/htable/htable_type.h>
#include <ccan/timer/timer.h>
#include <common/channel_id.h>
#include <common/crypto_state.h>
#include <common/node_id.h>
#include <common/pseudorand.h>
@ -52,8 +53,8 @@ struct peer {
/* Connection to the peer */
struct io_conn *to_peer;
/* Connection to the subdaemon */
struct io_conn *to_subd;
/* Connections to the subdaemons */
struct subd **subds;
/* Final message to send to peer (and hangup) */
u8 *final_msg;
@ -64,11 +65,11 @@ struct peer {
/* When socket has Nagle overridden */
bool urgent;
/* Input buffers. */
u8 *subd_in, *peer_in;
/* Input buffer. */
u8 *peer_in;
/* Output buffers. */
struct msg_queue *subd_outq, *peer_outq;
/* Output buffer. */
struct msg_queue *peer_outq;
/* Peer sent buffer (for freeing after sending) */
const u8 *sent_to_peer;

View File

@ -36,6 +36,26 @@
#include <wire/wire_io.h>
#include <wire/wire_sync.h>
struct subd {
/* Owner: we are in peer->subds[] */
struct peer *peer;
/* The temporary or permanant channel_id */
struct channel_id channel_id;
/* In passing, we can have a temporary one, too. */
struct channel_id *temporary_channel_id;
/* The actual connection to talk to it */
struct io_conn *conn;
/* Input buffer */
u8 *in;
/* Output buffer */
struct msg_queue *outq;
};
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES)
{
status_peer_io(LOG_IO_OUT, &peer->id, msg);
@ -51,10 +71,10 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap);
va_end(ap);
/* Close locally, send msg as final warning */
io_close(peer->to_subd);
peer->to_subd = NULL;
/* Close to any subdaemons. */
peer->subds = tal_free(peer->subds);
/* Send warning as final message. */
va_start(ap, fmt);
peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap);
va_end(ap);
@ -573,7 +593,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
msg = msg_dequeue(peer->peer_outq);
/* Is it time to send final? */
if (!msg && peer->final_msg && !peer->to_subd) {
if (!msg && peer->final_msg && !peer->subds) {
/* OK, send this then close. */
msg = peer->final_msg;
peer->final_msg = NULL;
@ -584,7 +604,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
/* Still nothing to send? */
if (!msg) {
/* We close once subds are all closed. */
if (!peer->to_subd) {
if (!peer->subds) {
set_closing_timer(peer, peer_conn);
return io_sock_shutdown(peer_conn);
}
@ -593,7 +613,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
msg = maybe_from_gossip_store(NULL, peer);
if (!msg) {
/* Tell them to read again, */
io_wake(&peer->subd_in);
io_wake(&peer->subds);
/* Wait for them to wake us */
return msg_queue_wait(peer_conn, peer->peer_outq,
@ -617,50 +637,59 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
}
static struct io_plan *read_from_subd(struct io_conn *subd_conn,
struct peer *peer);
struct subd *subd);
static struct io_plan *read_from_subd_done(struct io_conn *subd_conn,
struct peer *peer)
struct subd *subd)
{
/* Tell them to encrypt & write. */
msg_enqueue(peer->peer_outq, take(peer->subd_in));
peer->subd_in = NULL;
msg_enqueue(subd->peer->peer_outq, take(subd->in));
subd->in = NULL;
/* Wait for them to wake us */
return io_wait(subd_conn, &peer->subd_in, read_from_subd, peer);
return io_wait(subd_conn, &subd->peer->subds, read_from_subd, subd);
}
static struct io_plan *read_from_subd(struct io_conn *subd_conn,
struct peer *peer)
struct subd *subd)
{
return io_read_wire(subd_conn, peer, &peer->subd_in,
read_from_subd_done, peer);
return io_read_wire(subd_conn, subd, &subd->in,
read_from_subd_done, subd);
}
/* These four function handle peer->subd */
static struct io_plan *write_to_subd(struct io_conn *subd_conn,
struct peer *peer)
struct subd *subd)
{
const u8 *msg;
assert(peer->to_subd == subd_conn);
assert(subd->conn == subd_conn);
/* Pop tail of send queue */
msg = msg_dequeue(peer->subd_outq);
msg = msg_dequeue(subd->outq);
/* Nothing to send? */
if (!msg) {
/* If peer is closed, close this. */
if (!peer->to_peer)
if (!subd->peer->to_peer)
return io_close(subd_conn);
/* Tell them to read again. */
io_wake(&peer->peer_in);
io_wake(&subd->peer->peer_in);
/* Wait for them to wake us */
return msg_queue_wait(subd_conn, peer->subd_outq,
write_to_subd, peer);
return msg_queue_wait(subd_conn, subd->outq,
write_to_subd, subd);
}
return io_write_wire(subd_conn, take(msg), write_to_subd, peer);
return io_write_wire(subd_conn, take(msg), write_to_subd, subd);
}
/* FIXME: We only currently have one subd */
static struct subd *find_subd(struct peer *peer,
const struct channel_id *channel_id)
{
if (tal_count(peer->subds) == 0)
return NULL;
return peer->subds[0];
}
static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
@ -669,6 +698,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
struct peer *peer)
{
u8 *decrypted;
struct channel_id channel_id;
struct subd *subd;
decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs,
peer->peer_in);
@ -691,12 +722,39 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
if (handle_message_locally(peer, decrypted))
return read_hdr_from_peer(peer_conn, peer);
/* If there's no subd, discard and keep reading. */
if (!peer->to_subd)
/* After this we should be able to match to subd by channel_id */
if (!extract_channel_id(decrypted, &channel_id)) {
enum peer_wire type = fromwire_peektype(decrypted);
/* We won't log this anywhere else, so do it here. */
status_peer_io(LOG_IO_IN, &peer->id, decrypted);
/* Could be a all-channel error or warning? Log it
* more verbose, and hang up. */
if (type == WIRE_ERROR || type == WIRE_WARNING) {
char *desc = sanitize_error(tmpctx, decrypted, NULL);
status_peer_info(&peer->id,
"Received %s: %s",
peer_wire_name(type), desc);
return io_close(peer_conn);
}
/* This sets final_msg: will close after sending warning */
send_warning(peer, "Unexpected message %s: %s",
peer_wire_name(type),
tal_hex(tmpctx, decrypted));
io_wake(peer->peer_outq);
return read_hdr_from_peer(peer_conn, peer);
}
/* If we don't find a subdaemon for this, discard and keep reading. */
subd = find_subd(peer, &channel_id);
if (!subd)
return read_hdr_from_peer(peer_conn, peer);
/* Tell them to write. */
msg_enqueue(peer->subd_outq, take(decrypted));
msg_enqueue(subd->outq, take(decrypted));
/* Wait for them to wake us */
return io_wait(peer_conn, &peer->peer_in, read_hdr_from_peer, peer);
@ -734,21 +792,33 @@ static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
read_body_from_peer, peer);
}
static struct io_plan *subd_conn_init(struct io_conn *subd_conn, struct peer *peer)
static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
struct subd *subd)
{
peer->to_subd = subd_conn;
subd->conn = subd_conn;
return io_duplex(subd_conn,
read_from_subd(subd_conn, peer),
write_to_subd(subd_conn, peer));
read_from_subd(subd_conn, subd),
write_to_subd(subd_conn, subd));
}
static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer)
static void destroy_subd(struct subd *subd)
{
assert(subd_conn == peer->to_subd);
peer->to_subd = NULL;
/* In case they were waiting for this to send final_msg */
if (peer->final_msg)
msg_wake(peer->peer_outq);
struct peer *peer = subd->peer;
size_t pos;
for (pos = 0; peer->subds[pos] != subd; pos++)
assert(pos < tal_count(peer->subds));
tal_arr_remove(&peer->subds, pos);
/* Last one out frees array, sets to NULL as an indicator */
if (tal_count(peer->subds) == 0) {
peer->subds = tal_free(peer->subds);
/* In case they were waiting for this to send final_msg */
if (peer->final_msg)
msg_wake(peer->peer_outq);
}
/* Make sure we try to keep reading from peer, so we know if
* it hangs up! */
@ -765,7 +835,7 @@ void close_peer_conn(struct peer *peer)
peer->told_to_close = true;
/* Already dead? */
if (!peer->to_subd && !peer->to_peer) {
if (!peer->subds && !peer->to_peer) {
peer_conn_closed(peer);
return;
}
@ -777,14 +847,26 @@ void close_peer_conn(struct peer *peer)
bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
{
int fds[2];
struct subd *subd;
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
status_broken("Failed to create socketpair: %s",
strerror(errno));
return false;
}
peer->to_subd = io_new_conn(peer, fds[0], subd_conn_init, peer);
tal_add_destructor2(peer->to_subd, destroy_subd_conn, peer);
subd = tal(peer->subds, struct subd);
subd->peer = peer;
subd->outq = msg_queue_new(subd, false);
/* This sets subd->conn inside subd_conn_init */
io_new_conn(peer, fds[0], subd_conn_init, subd);
/* When conn dies, subd is freed. */
tal_steal(subd->conn, subd);
/* Connect it to the peer */
tal_arr_expand(&peer->subds, subd);
tal_add_destructor(subd, destroy_subd);
*fd_for_subd = fds[1];
return true;
}
@ -795,8 +877,9 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
peer->to_peer = NULL;
/* Flush internal connections if not already. */
if (peer->to_subd) {
msg_wake(peer->subd_outq);
if (peer->subds) {
for (size_t i = 0; i < tal_count(peer->subds); i++)
msg_wake(peer->subds[i]->outq);
return;
}
@ -824,7 +907,7 @@ void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES)
{
peer->told_to_close = true;
peer->final_msg = tal_dup_talarr(peer, u8, final_msg);
if (!peer->to_subd)
if (!peer->subds)
io_wake(peer->peer_outq);
}

View File

@ -440,7 +440,7 @@ def test_plugin_connected_hook_chaining(node_factory):
])
# FIXME: this error occurs *after* connection, so we connect then drop.
l3.daemon.wait_for_log(r"chan#1: peer_in WIRE_WARNING")
l3.daemon.wait_for_log(r"-connectd: peer_in WIRE_WARNING")
l3.daemon.wait_for_log(r"You are in reject list")
def check_disconnect():