connectd: do nagle by packet type.

channeld can't do it any more: it's using local sockets.  Connectd
can do it, and simply does it by type.

Amazingly, on my machine the timing change *always* caused
test_channel_receivable() to fail, due to a latent race.

Includes feedback from @cdecker.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-01-08 23:56:29 +10:30
parent 7a514112ec
commit e37a638c0c
7 changed files with 118 additions and 53 deletions

View File

@ -1123,7 +1123,7 @@ static void send_ping(struct peer *peer)
exit(0);
}
peer_write_no_delay(peer->pps, take(make_ping(NULL, 1, 0)));
peer_write(peer->pps, take(make_ping(NULL, 1, 0)));
peer->expecting_pong = PONG_EXPECTED_PROBING;
set_ping_timer(peer);
}
@ -1415,7 +1415,7 @@ static void send_commit(struct peer *peer)
msg = towire_commitment_signed(NULL, &peer->channel_id,
&commit_sig.s,
raw_sigs(tmpctx, htlc_sigs));
peer_write_no_delay(peer->pps, take(msg));
peer_write(peer->pps, take(msg));
maybe_send_shutdown(peer);
@ -1583,7 +1583,7 @@ static void send_revocation(struct peer *peer,
WIRE_CHANNELD_GOT_COMMITSIG_REPLY);
/* Now we can finally send revoke_and_ack to peer */
peer_write_no_delay(peer->pps, take(msg));
peer_write(peer->pps, take(msg));
}
static void handle_peer_commit_sig(struct peer *peer, const u8 *msg)
@ -3631,7 +3631,7 @@ static void handle_send_ping(struct peer *peer, const u8 *msg)
if (tal_count(ping) > 65535)
status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping");
peer_write_no_delay(peer->pps, take(ping));
peer_write(peer->pps, take(ping));
/* Since we're doing this manually, kill and restart timer. */
status_debug("sending ping expecting %sresponse",

View File

@ -22,50 +22,6 @@ void peer_write(struct per_peer_state *pps, const void *msg TAKES)
peer_failed_connection_lost();
}
/* We're happy for the kernel to batch update and gossip messages, but a
* commitment message, for example, should be instantly sent. There's no
* great way of doing this, unfortunately.
*
* Setting TCP_NODELAY on Linux flushes the socket, which really means
* we'd want to toggle on then off it *after* sending. But Linux has
* TCP_CORK. On FreeBSD, it seems (looking at source) not to, so
* there we'd want to set it before the send, and reenable it
* afterwards. Even if this is wrong on other non-Linux platforms, it
* only means one extra packet.
*/
void peer_write_no_delay(struct per_peer_state *pps, const void *msg TAKES)
{
int val;
int opt;
const char *optname;
static bool complained = false;
#ifdef TCP_CORK
opt = TCP_CORK;
optname = "TCP_CORK";
#elif defined(TCP_NODELAY)
opt = TCP_NODELAY;
optname = "TCP_NODELAY";
#else
#error "Please report platform with neither TCP_CORK nor TCP_NODELAY?"
#endif
val = 1;
if (setsockopt(pps->peer_fd, IPPROTO_TCP, opt, &val, sizeof(val)) != 0) {
/* This actually happens in testing, where we blackhole the fd */
if (!complained) {
status_unusual("setsockopt %s=1: %s",
optname,
strerror(errno));
complained = true;
}
}
peer_write(pps, msg);
val = 0;
setsockopt(pps->peer_fd, IPPROTO_TCP, opt, &val, sizeof(val));
}
u8 *peer_read(const tal_t *ctx, struct per_peer_state *pps)
{
u8 *dec = wire_sync_read(ctx, pps->peer_fd);

View File

@ -9,9 +9,6 @@ struct per_peer_state;
/* Exits with peer_failed_connection_lost() if write fails. */
void peer_write(struct per_peer_state *pps, const void *msg TAKES);
/* Same, but disabled nagle for this message. */
void peer_write_no_delay(struct per_peer_state *pps, const void *msg TAKES);
/* Exits with peer_failed_connection_lost() if can't read packet. */
u8 *peer_read(const tal_t *ctx, struct per_peer_state *pps);

View File

@ -454,6 +454,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->subd_in = NULL;
peer->peer_in = NULL;
peer->sent_to_peer = NULL;
peer->urgent = false;
peer->peer_outq = msg_queue_new(peer);
peer->subd_outq = msg_queue_new(peer);

View File

@ -10,8 +10,11 @@
#include <common/utils.h>
#include <connectd/multiplex.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <wire/peer_wire.h>
#include <wire/wire.h>
#include <wire/wire_io.h>
@ -48,15 +51,118 @@ static struct io_plan *dev_leave_hanging(struct io_conn *peer_conn,
}
#endif /* DEVELOPER */
/* We're happy for the kernel to batch update and gossip messages, but a
* commitment message, for example, should be instantly sent. There's no
* great way of doing this, unfortunately.
*
* Setting TCP_NODELAY on Linux flushes the socket, which really means
* we'd want to toggle on then off it *after* sending. But Linux has
* TCP_CORK. On FreeBSD, it seems (looking at source) not to, so
* there we'd want to set it before the send, and reenable it
* afterwards. Even if this is wrong on other non-Linux platforms, it
* only means one extra packet.
*/
static void set_urgent_flag(struct peer *peer, bool urgent)
{
int val;
int opt;
const char *optname;
static bool complained = false;
if (urgent == peer->urgent)
return;
#ifdef TCP_CORK
opt = TCP_CORK;
optname = "TCP_CORK";
#elif defined(TCP_NODELAY)
opt = TCP_NODELAY;
optname = "TCP_NODELAY";
#else
#error "Please report platform with neither TCP_CORK nor TCP_NODELAY?"
#endif
val = urgent;
if (setsockopt(io_conn_fd(peer->to_peer),
IPPROTO_TCP, opt, &val, sizeof(val)) != 0) {
/* This actually happens in testing, where we blackhole the fd */
if (!complained) {
status_unusual("setsockopt %s=1: %s",
optname,
strerror(errno));
complained = true;
}
}
peer->urgent = urgent;
}
static bool is_urgent(enum peer_wire type)
{
switch (type) {
case WIRE_INIT:
case WIRE_ERROR:
case WIRE_WARNING:
case WIRE_TX_ADD_INPUT:
case WIRE_TX_ADD_OUTPUT:
case WIRE_TX_REMOVE_INPUT:
case WIRE_TX_REMOVE_OUTPUT:
case WIRE_TX_COMPLETE:
case WIRE_TX_SIGNATURES:
case WIRE_OPEN_CHANNEL:
case WIRE_ACCEPT_CHANNEL:
case WIRE_FUNDING_CREATED:
case WIRE_FUNDING_SIGNED:
case WIRE_FUNDING_LOCKED:
case WIRE_OPEN_CHANNEL2:
case WIRE_ACCEPT_CHANNEL2:
case WIRE_INIT_RBF:
case WIRE_ACK_RBF:
case WIRE_SHUTDOWN:
case WIRE_CLOSING_SIGNED:
case WIRE_UPDATE_ADD_HTLC:
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_UPDATE_FEE:
case WIRE_UPDATE_BLOCKHEIGHT:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_ANNOUNCEMENT_SIGNATURES:
case WIRE_CHANNEL_ANNOUNCEMENT:
case WIRE_NODE_ANNOUNCEMENT:
case WIRE_CHANNEL_UPDATE:
case WIRE_QUERY_SHORT_CHANNEL_IDS:
case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
case WIRE_QUERY_CHANNEL_RANGE:
case WIRE_REPLY_CHANNEL_RANGE:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
case WIRE_OBS2_ONION_MESSAGE:
case WIRE_ONION_MESSAGE:
#if EXPERIMENTAL_FEATURES
case WIRE_STFU:
#endif
return false;
/* These are time-sensitive, and so send without delay. */
case WIRE_PING:
case WIRE_PONG:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
return true;
};
/* plugins can inject other messages; assume not urgent. */
return false;
}
static struct io_plan *encrypt_and_send(struct peer *peer,
const u8 *msg TAKES,
struct io_plan *(*next)
(struct io_conn *peer_conn,
struct peer *peer))
{
#if DEVELOPER
int type = fromwire_peektype(msg);
#if DEVELOPER
switch (dev_disconnect(&peer->id, type)) {
case DEV_DISCONNECT_BEFORE:
if (taken(msg))
@ -75,6 +181,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
break;
}
#endif
set_urgent_flag(peer, is_urgent(type));
/* We free this and the encrypted version in next write_to_peer */
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);

View File

@ -20,6 +20,9 @@ struct peer {
/* Final message to send to peer (and hangup) */
u8 *final_msg;
/* When we write something which wants Nagle overridden */
bool urgent;
/* Input buffers. */
u8 *subd_in, *peer_in;

View File

@ -2339,7 +2339,8 @@ def test_channel_receivable(node_factory, bitcoind):
assert l2.rpc.listpeers()['peers'][0]['channels'][0]['receivable_msat'] == Millisatoshi(0)
l1.rpc.waitsendpay(payment_hash, TIMEOUT)
# Make sure l2 thinks it's all over.
# Make sure both think it's all over.
wait_for(lambda: len(l1.rpc.listpeers()['peers'][0]['channels'][0]['htlcs']) == 0)
wait_for(lambda: len(l2.rpc.listpeers()['peers'][0]['channels'][0]['htlcs']) == 0)
# Now, reverse should work similarly.
inv = l1.rpc.invoice('any', 'inv', 'for testing')