connectd: implement "transient" connections.

Currently, anything which doesn't have a live channel is considered transient.
We free this first under stress, and also if they're still connecting.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2024-05-14 14:01:44 +09:30
parent 9aed594177
commit 8268df9a4b
4 changed files with 99 additions and 29 deletions

View file

@ -152,7 +152,7 @@ static struct peer *new_peer(struct daemon *daemon,
const u8 *their_features,
enum is_websocket is_websocket,
struct io_conn *conn STEALS,
bool deliberate_connection,
enum connection_prio prio,
int *fd_for_subd)
{
struct peer *peer = tal(daemon, struct peer);
@ -169,7 +169,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->peer_outq = msg_queue_new(peer, false);
peer->last_recv_time = time_now();
peer->is_websocket = is_websocket;
peer->deliberate_connection = deliberate_connection;
peer->prio = prio;
peer->dev_writes_enabled = NULL;
peer->dev_read_enabled = true;
@ -202,7 +202,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
int subd_fd;
bool option_gossip_queries;
struct connecting *connect;
bool deliberate_connection;
enum connection_prio prio;
/* We remove any previous connection immediately, on the assumption it's dead */
peer = peer_htable_get(daemon->peers, id);
@ -257,18 +257,22 @@ struct io_plan *peer_connected(struct io_conn *conn,
}
if (connect) {
deliberate_connection = true;
if (connect->transient)
prio = PRIO_TRANSIENT;
else
prio = PRIO_DELIBERATE;
/*~ Now we've connected, disable the callback which would
* cause us to to try the next address on failure. */
io_set_finish(connect->conn, NULL, NULL);
tal_free(connect);
} else {
deliberate_connection = false;
prio = PRIO_UNSOLICITED;
}
/* This contains the per-peer state info; gossipd fills in pps->gs */
peer = new_peer(daemon, id, cs, their_features, is_websocket, conn,
deliberate_connection, &subd_fd);
prio, &subd_fd);
/* Only takes over conn if it succeeds. */
if (!peer)
return io_close(conn);
@ -386,23 +390,31 @@ static struct io_plan *conn_in(struct io_conn *conn,
* Note, again, the notleak() to avoid our simplistic leak detection
* code from thinking `conn` (which we don't keep a pointer to) is
* leaked */
return responder_handshake(notleak(conn), &daemon->mykey,
return responder_handshake(notleak_with_children(conn), &daemon->mykey,
&conn_in_arg->addr, timeout,
conn_in_arg->is_websocket,
handshake_in_success, daemon);
}
/* How much is peer worth (when considering disconnet)? */
static size_t peer_score(const struct peer *peer)
/* How much is peer worth (when considering disconnect)? */
static size_t peer_score(enum connection_prio prio,
struct subd **subds)
{
#define PEER_SCORE_MAX 2
#define PEER_SCORE_MAX 3
/* We deliberately connected to it? Highest prio */
if (peer->deliberate_connection)
return 2;
/* It has subds now? Higher prio */
if (tal_count(peer->subds))
switch (prio) {
case PRIO_DELIBERATE:
/* We definitely want this one */
return 3;
case PRIO_TRANSIENT:
/* We're explicitly told to dispose of these! */
return 0;
case PRIO_UNSOLICITED:
/* It has subds now? Higher prio */
if (tal_count(subds))
return 2;
return 1;
}
return 0;
}
@ -413,13 +425,31 @@ void close_random_connection(struct daemon *daemon)
struct peer *peer, *best_peer = NULL;
size_t best_peer_score = PEER_SCORE_MAX + 1;
struct peer_htable_iter it;
struct connecting *c;
struct connecting_htable_iter cit;
bool closed_connect_attempt = false;
/* First, close all transient connection attempts in-flight */
for (c = connecting_htable_first(daemon->connecting, &cit);
c;
c = connecting_htable_next(daemon->connecting, &cit)) {
if (!c->transient)
continue;
status_debug("due to stress, closing transient connect attempt to %s",
fmt_node_id(tmpctx, &c->id));
/* This tells destructor why it was closed */
errno = EMFILE;
tal_free(c);
closed_connect_attempt = true;
}
/* Prefer ones with no subds (just chatting), or failing that,
* ones we didn't deliberately connect to. */
peer = peer_htable_pick(daemon->peers, pseudorand_u64(), &it);
for (size_t i = 0; i < peer_htable_count(daemon->peers); i++) {
size_t score = peer_score(peer);
size_t score = peer_score(peer->prio, peer->subds);
if (score < best_peer_score) {
best_peer = peer;
best_peer_score = score;
@ -432,11 +462,17 @@ void close_random_connection(struct daemon *daemon)
peer = peer_htable_first(daemon->peers, &it);
}
if (best_peer) {
status_debug("due to stress, randomly closing peer %s (score %zu)",
fmt_node_id(tmpctx, &best_peer->id), best_peer_score);
io_close(best_peer->to_peer);
}
if (!best_peer)
return;
/* Don't close active peer if we closed an attempt */
if (closed_connect_attempt
&& best_peer_score > peer_score(PRIO_UNSOLICITED, NULL))
return;
status_debug("due to stress, randomly closing peer %s (score %zu)",
fmt_node_id(tmpctx, &best_peer->id), best_peer_score);
io_close(best_peer->to_peer);
}
/*~ When we get a direct connection in we set up its network address
@ -667,6 +703,8 @@ static void destroy_io_conn(struct io_conn *conn, struct connecting *connect)
errstr = "peer closed connection";
if (streq(connect->connstate, "Cryptographic handshake"))
errstr = "peer closed connection (wrong key?)";
} else if (errno == EMFILE) {
errstr = "Terminated due to too many connections";
}
add_errors_to_error_list(connect,
@ -891,6 +929,12 @@ static void try_connect_one_addr(struct connecting *connect)
}
fd = socket(af, SOCK_STREAM, 0);
/* If we're out of fds, and can drop one, re-try */
if (fd < 0 && errno == EMFILE) {
close_random_connection(connect->daemon);
fd = socket(af, SOCK_STREAM, 0);
}
if (fd < 0) {
tal_append_fmt(&connect->errors,
"%s: opening %i socket gave %s. ",
@ -1631,7 +1675,8 @@ static void try_connect_peer(struct daemon *daemon,
const struct node_id *id,
struct wireaddr *gossip_addrs,
struct wireaddr_internal *addrhint STEALS,
bool dns_fallback)
bool dns_fallback,
bool transient)
{
struct wireaddr_internal *addrs;
bool use_proxy = daemon->always_use_proxy;
@ -1641,8 +1686,9 @@ static void try_connect_peer(struct daemon *daemon,
/* Already existing? Must have crossed over, it'll know soon. */
peer = peer_htable_get(daemon->peers, id);
if (peer) {
/* Note now that we explicitly tried to connect */
peer->deliberate_connection = true;
/* Note if we explicitly tried to connect non-transiently */
if (!transient)
peer->prio = PRIO_DELIBERATE;
return;
}
@ -1716,6 +1762,7 @@ static void try_connect_peer(struct daemon *daemon,
connect->addrhint = tal_steal(connect, addrhint);
connect->errors = tal_strdup(connect, "");
connect->conn = NULL;
connect->transient = transient;
connecting_htable_add(daemon->connecting, connect);
tal_add_destructor(connect, destroy_connecting);
@ -1730,13 +1777,15 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg)
struct wireaddr_internal *addrhint;
struct wireaddr *addrs;
bool dns_fallback;
bool transient;
if (!fromwire_connectd_connect_to_peer(tmpctx, msg,
&id, &addrs, &addrhint,
&dns_fallback))
&dns_fallback,
&transient))
master_badmsg(WIRE_CONNECTD_CONNECT_TO_PEER, msg);
try_connect_peer(daemon, &id, addrs, addrhint, dns_fallback);
try_connect_peer(daemon, &id, addrs, addrhint, dns_fallback, transient);
}
/* lightningd tells us a peer should be disconnected. */

View file

@ -40,6 +40,16 @@ enum pong_expect_type {
PONG_EXPECTED_PROBING = 2,
};
/*~ We classify connections by loose priority */
enum connection_prio {
/* We deliberately connected to them. */
PRIO_DELIBERATE,
/* They connected to us, unsolicited. */
PRIO_UNSOLICITED,
/* We connected, but transiently. */
PRIO_TRANSIENT,
};
/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
* already connected (by peer->id). */
struct peer {
@ -90,8 +100,8 @@ struct peer {
/* Last time we received traffic */
struct timeabs last_recv_time;
/* Were we explicitly told to connect to this peer? */
bool deliberate_connection;
/* How important does this peer seem to be? */
enum connection_prio prio;
bool dev_read_enabled;
/* If non-NULL, this counts down; 0 means disable */
@ -144,6 +154,9 @@ struct connecting {
/* Accumulated errors */
char *errors;
/* Is this a transient connection? */
bool transient;
};
static const struct node_id *connecting_keyof(const struct connecting *connecting)

View file

@ -55,6 +55,7 @@ msgdata,connectd_connect_to_peer,len,u32,
msgdata,connectd_connect_to_peer,addrs,wireaddr,len
msgdata,connectd_connect_to_peer,addrhint,?wireaddr_internal,
msgdata,connectd_connect_to_peer,dns_fallback,bool,
msgdata,connectd_connect_to_peer,transient,bool,
# Connectd->master: connect failed.
msgtype,connectd_connect_failed,2020

1 #include <bitcoin/block.h>
55 msgdata,connectd_connect_failed,failcode,enum jsonrpc_errcode, msgdata,connectd_connect_failed,id,node_id,
56 msgdata,connectd_connect_failed,failreason,wirestring, msgdata,connectd_connect_failed,failcode,enum jsonrpc_errcode,
57 msgdata,connectd_connect_failed,addrhint,?wireaddr_internal, msgdata,connectd_connect_failed,failreason,wirestring,
58 msgdata,connectd_connect_failed,addrhint,?wireaddr_internal,
59 # Connectd -> master: we got a peer.
60 msgtype,connectd_peer_connected,2002
61 msgdata,connectd_peer_connected,id,node_id,

View file

@ -292,16 +292,23 @@ static void gossipd_got_addrs(struct subd *subd,
{
struct wireaddr *addrs;
u8 *connectmsg;
struct peer *peer;
bool transient;
if (!fromwire_gossipd_get_addrs_reply(tmpctx, msg, &addrs))
fatal("Gossipd gave bad GOSSIPD_GET_ADDRS_REPLY %s",
tal_hex(msg, msg));
/* We consider this transient unless we have a channel */
peer = peer_by_id(d->ld, &d->id);
transient = !peer || !peer_any_channel(peer, channel_state_wants_peercomms, NULL);
connectmsg = towire_connectd_connect_to_peer(NULL,
&d->id,
addrs,
d->addrhint,
d->dns_fallback);
d->dns_fallback,
transient);
subd_send_msg(d->ld->connectd, take(connectmsg));
tal_free(d);
}