diff --git a/channeld/channeld.c b/channeld/channeld.c index 63f161ace..b9a660524 100644 --- a/channeld/channeld.c +++ b/channeld/channeld.c @@ -1123,7 +1123,7 @@ static void send_ping(struct peer *peer) exit(0); } - peer_write_no_delay(peer->pps, take(make_ping(NULL, 1, 0))); + peer_write(peer->pps, take(make_ping(NULL, 1, 0))); peer->expecting_pong = PONG_EXPECTED_PROBING; set_ping_timer(peer); } @@ -1415,7 +1415,7 @@ static void send_commit(struct peer *peer) msg = towire_commitment_signed(NULL, &peer->channel_id, &commit_sig.s, raw_sigs(tmpctx, htlc_sigs)); - peer_write_no_delay(peer->pps, take(msg)); + peer_write(peer->pps, take(msg)); maybe_send_shutdown(peer); @@ -1583,7 +1583,7 @@ static void send_revocation(struct peer *peer, WIRE_CHANNELD_GOT_COMMITSIG_REPLY); /* Now we can finally send revoke_and_ack to peer */ - peer_write_no_delay(peer->pps, take(msg)); + peer_write(peer->pps, take(msg)); } static void handle_peer_commit_sig(struct peer *peer, const u8 *msg) @@ -3631,7 +3631,7 @@ static void handle_send_ping(struct peer *peer, const u8 *msg) if (tal_count(ping) > 65535) status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping"); - peer_write_no_delay(peer->pps, take(ping)); + peer_write(peer->pps, take(ping)); /* Since we're doing this manually, kill and restart timer. */ status_debug("sending ping expecting %sresponse", diff --git a/common/peer_io.c b/common/peer_io.c index 995920fd5..587a14901 100644 --- a/common/peer_io.c +++ b/common/peer_io.c @@ -22,50 +22,6 @@ void peer_write(struct per_peer_state *pps, const void *msg TAKES) peer_failed_connection_lost(); } -/* We're happy for the kernel to batch update and gossip messages, but a - * commitment message, for example, should be instantly sent. There's no - * great way of doing this, unfortunately. - * - * Setting TCP_NODELAY on Linux flushes the socket, which really means - * we'd want to toggle on then off it *after* sending. But Linux has - * TCP_CORK. On FreeBSD, it seems (looking at source) not to, so - * there we'd want to set it before the send, and reenable it - * afterwards. Even if this is wrong on other non-Linux platforms, it - * only means one extra packet. - */ -void peer_write_no_delay(struct per_peer_state *pps, const void *msg TAKES) -{ - int val; - int opt; - const char *optname; - static bool complained = false; - -#ifdef TCP_CORK - opt = TCP_CORK; - optname = "TCP_CORK"; -#elif defined(TCP_NODELAY) - opt = TCP_NODELAY; - optname = "TCP_NODELAY"; -#else -#error "Please report platform with neither TCP_CORK nor TCP_NODELAY?" -#endif - - val = 1; - if (setsockopt(pps->peer_fd, IPPROTO_TCP, opt, &val, sizeof(val)) != 0) { - /* This actually happens in testing, where we blackhole the fd */ - if (!complained) { - status_unusual("setsockopt %s=1: %s", - optname, - strerror(errno)); - complained = true; - } - } - peer_write(pps, msg); - - val = 0; - setsockopt(pps->peer_fd, IPPROTO_TCP, opt, &val, sizeof(val)); -} - u8 *peer_read(const tal_t *ctx, struct per_peer_state *pps) { u8 *dec = wire_sync_read(ctx, pps->peer_fd); diff --git a/common/peer_io.h b/common/peer_io.h index 0ca6670ac..621e71db3 100644 --- a/common/peer_io.h +++ b/common/peer_io.h @@ -9,9 +9,6 @@ struct per_peer_state; /* Exits with peer_failed_connection_lost() if write fails. */ void peer_write(struct per_peer_state *pps, const void *msg TAKES); -/* Same, but disabled nagle for this message. */ -void peer_write_no_delay(struct per_peer_state *pps, const void *msg TAKES); - /* Exits with peer_failed_connection_lost() if can't read packet. */ u8 *peer_read(const tal_t *ctx, struct per_peer_state *pps); diff --git a/connectd/connectd.c b/connectd/connectd.c index 3789ec128..2fe095399 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -454,6 +454,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->subd_in = NULL; peer->peer_in = NULL; peer->sent_to_peer = NULL; + peer->urgent = false; peer->peer_outq = msg_queue_new(peer); peer->subd_outq = msg_queue_new(peer); diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 52e43c765..ccbe5854b 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -10,8 +10,11 @@ #include #include #include +#include +#include #include #include +#include #include #include @@ -48,15 +51,118 @@ static struct io_plan *dev_leave_hanging(struct io_conn *peer_conn, } #endif /* DEVELOPER */ +/* We're happy for the kernel to batch update and gossip messages, but a + * commitment message, for example, should be instantly sent. There's no + * great way of doing this, unfortunately. + * + * Setting TCP_NODELAY on Linux flushes the socket, which really means + * we'd want to toggle on then off it *after* sending. But Linux has + * TCP_CORK. On FreeBSD, it seems (looking at source) not to, so + * there we'd want to set it before the send, and reenable it + * afterwards. Even if this is wrong on other non-Linux platforms, it + * only means one extra packet. + */ +static void set_urgent_flag(struct peer *peer, bool urgent) +{ + int val; + int opt; + const char *optname; + static bool complained = false; + + if (urgent == peer->urgent) + return; + +#ifdef TCP_CORK + opt = TCP_CORK; + optname = "TCP_CORK"; +#elif defined(TCP_NODELAY) + opt = TCP_NODELAY; + optname = "TCP_NODELAY"; +#else +#error "Please report platform with neither TCP_CORK nor TCP_NODELAY?" +#endif + + val = urgent; + if (setsockopt(io_conn_fd(peer->to_peer), + IPPROTO_TCP, opt, &val, sizeof(val)) != 0) { + /* This actually happens in testing, where we blackhole the fd */ + if (!complained) { + status_unusual("setsockopt %s=1: %s", + optname, + strerror(errno)); + complained = true; + } + } + peer->urgent = urgent; +} + +static bool is_urgent(enum peer_wire type) +{ + switch (type) { + case WIRE_INIT: + case WIRE_ERROR: + case WIRE_WARNING: + case WIRE_TX_ADD_INPUT: + case WIRE_TX_ADD_OUTPUT: + case WIRE_TX_REMOVE_INPUT: + case WIRE_TX_REMOVE_OUTPUT: + case WIRE_TX_COMPLETE: + case WIRE_TX_SIGNATURES: + case WIRE_OPEN_CHANNEL: + case WIRE_ACCEPT_CHANNEL: + case WIRE_FUNDING_CREATED: + case WIRE_FUNDING_SIGNED: + case WIRE_FUNDING_LOCKED: + case WIRE_OPEN_CHANNEL2: + case WIRE_ACCEPT_CHANNEL2: + case WIRE_INIT_RBF: + case WIRE_ACK_RBF: + case WIRE_SHUTDOWN: + case WIRE_CLOSING_SIGNED: + case WIRE_UPDATE_ADD_HTLC: + case WIRE_UPDATE_FULFILL_HTLC: + case WIRE_UPDATE_FAIL_HTLC: + case WIRE_UPDATE_FAIL_MALFORMED_HTLC: + case WIRE_UPDATE_FEE: + case WIRE_UPDATE_BLOCKHEIGHT: + case WIRE_CHANNEL_REESTABLISH: + case WIRE_ANNOUNCEMENT_SIGNATURES: + case WIRE_CHANNEL_ANNOUNCEMENT: + case WIRE_NODE_ANNOUNCEMENT: + case WIRE_CHANNEL_UPDATE: + case WIRE_QUERY_SHORT_CHANNEL_IDS: + case WIRE_REPLY_SHORT_CHANNEL_IDS_END: + case WIRE_QUERY_CHANNEL_RANGE: + case WIRE_REPLY_CHANNEL_RANGE: + case WIRE_GOSSIP_TIMESTAMP_FILTER: + case WIRE_OBS2_ONION_MESSAGE: + case WIRE_ONION_MESSAGE: +#if EXPERIMENTAL_FEATURES + case WIRE_STFU: +#endif + return false; + + /* These are time-sensitive, and so send without delay. */ + case WIRE_PING: + case WIRE_PONG: + case WIRE_COMMITMENT_SIGNED: + case WIRE_REVOKE_AND_ACK: + return true; + }; + + /* plugins can inject other messages; assume not urgent. */ + return false; +} + static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES, struct io_plan *(*next) (struct io_conn *peer_conn, struct peer *peer)) { -#if DEVELOPER int type = fromwire_peektype(msg); +#if DEVELOPER switch (dev_disconnect(&peer->id, type)) { case DEV_DISCONNECT_BEFORE: if (taken(msg)) @@ -75,6 +181,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer, break; } #endif + set_urgent_flag(peer, is_urgent(type)); /* We free this and the encrypted version in next write_to_peer */ peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg); diff --git a/connectd/multiplex.h b/connectd/multiplex.h index 12ccfa082..ccd037939 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -20,6 +20,9 @@ struct peer { /* Final message to send to peer (and hangup) */ u8 *final_msg; + /* When we write something which wants Nagle overridden */ + bool urgent; + /* Input buffers. */ u8 *subd_in, *peer_in; diff --git a/tests/test_pay.py b/tests/test_pay.py index 5516099d9..569b56f7b 100644 --- a/tests/test_pay.py +++ b/tests/test_pay.py @@ -2339,7 +2339,8 @@ def test_channel_receivable(node_factory, bitcoind): assert l2.rpc.listpeers()['peers'][0]['channels'][0]['receivable_msat'] == Millisatoshi(0) l1.rpc.waitsendpay(payment_hash, TIMEOUT) - # Make sure l2 thinks it's all over. + # Make sure both think it's all over. + wait_for(lambda: len(l1.rpc.listpeers()['peers'][0]['channels'][0]['htlcs']) == 0) wait_for(lambda: len(l2.rpc.listpeers()['peers'][0]['channels'][0]['htlcs']) == 0) # Now, reverse should work similarly. inv = l1.rpc.invoice('any', 'inv', 'for testing')