ccan: update for more efficient ccan/io.

A fairly simple change: ccan/io will now call the underlying I/O
routines repeatedly until they indicate they are unfinished, *or* fail
with EAGAIN.  This should make a significant difference to large
nodes, which currently spend far too much time calling poll() to
discover a single fd is still writable (mainly, for streaming gossip).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Changelog-Changed: connectd: now should use far less CPU on large nodes.
This commit is contained in:
Rusty Russell 2024-06-24 21:13:23 +09:30 committed by ShahanaFarooqui
parent 7ef8645aad
commit 341b62eea3
8 changed files with 259 additions and 68 deletions

View File

@ -1,3 +1,3 @@
CCAN imported from http://ccodearchive.net.
CCAN version: init-2582-g2d188544
CCAN version: init-2586-gd4932820

View File

@ -9,7 +9,7 @@
#define BIT_ALIGN_DOWN(n) ((n) & ~(BITMAP_WORD_BITS - 1))
#define BIT_ALIGN_UP(n) BIT_ALIGN_DOWN((n) + BITMAP_WORD_BITS - 1)
void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m)
void bitmap_zero_range(bitmap *b, unsigned long n, unsigned long m)
{
unsigned long an = BIT_ALIGN_UP(n);
unsigned long am = BIT_ALIGN_DOWN(m);
@ -19,22 +19,22 @@ void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m)
assert(m >= n);
if (am < an) {
BITMAP_WORD(bitmap, n) &= ~bitmap_bswap(headmask & tailmask);
BITMAP_WORD(b, n) &= ~bitmap_bswap(headmask & tailmask);
return;
}
if (an > n)
BITMAP_WORD(bitmap, n) &= ~bitmap_bswap(headmask);
BITMAP_WORD(b, n) &= ~bitmap_bswap(headmask);
if (am > an)
memset(&BITMAP_WORD(bitmap, an), 0,
memset(&BITMAP_WORD(b, an), 0,
(am - an) / BITMAP_WORD_BITS * sizeof(bitmap_word));
if (m > am)
BITMAP_WORD(bitmap, m) &= ~bitmap_bswap(tailmask);
BITMAP_WORD(b, m) &= ~bitmap_bswap(tailmask);
}
void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m)
void bitmap_fill_range(bitmap *b, unsigned long n, unsigned long m)
{
unsigned long an = BIT_ALIGN_UP(n);
unsigned long am = BIT_ALIGN_DOWN(m);
@ -44,19 +44,19 @@ void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m)
assert(m >= n);
if (am < an) {
BITMAP_WORD(bitmap, n) |= bitmap_bswap(headmask & tailmask);
BITMAP_WORD(b, n) |= bitmap_bswap(headmask & tailmask);
return;
}
if (an > n)
BITMAP_WORD(bitmap, n) |= bitmap_bswap(headmask);
BITMAP_WORD(b, n) |= bitmap_bswap(headmask);
if (am > an)
memset(&BITMAP_WORD(bitmap, an), 0xff,
memset(&BITMAP_WORD(b, an), 0xff,
(am - an) / BITMAP_WORD_BITS * sizeof(bitmap_word));
if (m > am)
BITMAP_WORD(bitmap, m) |= bitmap_bswap(tailmask);
BITMAP_WORD(b, m) |= bitmap_bswap(tailmask);
}
static int bitmap_clz(bitmap_word w)
@ -76,7 +76,7 @@ static int bitmap_clz(bitmap_word w)
#endif
}
unsigned long bitmap_ffs(const bitmap *bitmap,
unsigned long bitmap_ffs(const bitmap *b,
unsigned long n, unsigned long m)
{
unsigned long an = BIT_ALIGN_UP(n);
@ -87,7 +87,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap,
assert(m >= n);
if (am < an) {
bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, n));
bitmap_word w = bitmap_bswap(BITMAP_WORD(b, n));
w &= (headmask & tailmask);
@ -95,7 +95,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap,
}
if (an > n) {
bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, n));
bitmap_word w = bitmap_bswap(BITMAP_WORD(b, n));
w &= headmask;
@ -104,7 +104,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap,
}
while (an < am) {
bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, an));
bitmap_word w = bitmap_bswap(BITMAP_WORD(b, an));
if (w)
return an + bitmap_clz(w);
@ -113,7 +113,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap,
}
if (m > am) {
bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, m));
bitmap_word w = bitmap_bswap(BITMAP_WORD(b, m));
w &= tailmask;

View File

@ -58,37 +58,37 @@ static inline bitmap_word bitmap_bswap(bitmap_word w)
#define BITMAP_TAIL(_bm, _nbits) \
(BITMAP_TAILWORD(_bm, _nbits) & BITMAP_TAILBITS(_nbits))
static inline void bitmap_set_bit(bitmap *bitmap, unsigned long n)
static inline void bitmap_set_bit(bitmap *b, unsigned long n)
{
BITMAP_WORD(bitmap, n) |= BITMAP_WORDBIT(n);
BITMAP_WORD(b, n) |= BITMAP_WORDBIT(n);
}
static inline void bitmap_clear_bit(bitmap *bitmap, unsigned long n)
static inline void bitmap_clear_bit(bitmap *b, unsigned long n)
{
BITMAP_WORD(bitmap, n) &= ~BITMAP_WORDBIT(n);
BITMAP_WORD(b, n) &= ~BITMAP_WORDBIT(n);
}
static inline void bitmap_change_bit(bitmap *bitmap, unsigned long n)
static inline void bitmap_change_bit(bitmap *b, unsigned long n)
{
BITMAP_WORD(bitmap, n) ^= BITMAP_WORDBIT(n);
BITMAP_WORD(b, n) ^= BITMAP_WORDBIT(n);
}
static inline bool bitmap_test_bit(const bitmap *bitmap, unsigned long n)
static inline bool bitmap_test_bit(const bitmap *b, unsigned long n)
{
return !!(BITMAP_WORD(bitmap, n) & BITMAP_WORDBIT(n));
return !!(BITMAP_WORD(b, n) & BITMAP_WORDBIT(n));
}
void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m);
void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m);
void bitmap_zero_range(bitmap *b, unsigned long n, unsigned long m);
void bitmap_fill_range(bitmap *b, unsigned long n, unsigned long m);
static inline void bitmap_zero(bitmap *bitmap, unsigned long nbits)
static inline void bitmap_zero(bitmap *b, unsigned long nbits)
{
memset(bitmap, 0, bitmap_sizeof(nbits));
memset(b, 0, bitmap_sizeof(nbits));
}
static inline void bitmap_fill(bitmap *bitmap, unsigned long nbits)
static inline void bitmap_fill(bitmap *b, unsigned long nbits)
{
memset(bitmap, 0xff, bitmap_sizeof(nbits));
memset(b, 0xff, bitmap_sizeof(nbits));
}
static inline void bitmap_copy(bitmap *dst, const bitmap *src,
@ -161,37 +161,36 @@ static inline bool bitmap_subset(const bitmap *src1, const bitmap *src2,
return true;
}
static inline bool bitmap_full(const bitmap *bitmap, unsigned long nbits)
static inline bool bitmap_full(const bitmap *b, unsigned long nbits)
{
unsigned long i;
for (i = 0; i < BITMAP_HEADWORDS(nbits); i++) {
if (bitmap[i].w != -1UL)
if (b[i].w != -1UL)
return false;
}
if (BITMAP_HASTAIL(nbits) &&
(BITMAP_TAIL(bitmap, nbits) != BITMAP_TAILBITS(nbits)))
(BITMAP_TAIL(b, nbits) != BITMAP_TAILBITS(nbits)))
return false;
return true;
}
static inline bool bitmap_empty(const bitmap *bitmap, unsigned long nbits)
static inline bool bitmap_empty(const bitmap *b, unsigned long nbits)
{
unsigned long i;
for (i = 0; i < BITMAP_HEADWORDS(nbits); i++) {
if (bitmap[i].w != 0)
if (b[i].w != 0)
return false;
}
if (BITMAP_HASTAIL(nbits) && (BITMAP_TAIL(bitmap, nbits) != 0))
if (BITMAP_HASTAIL(nbits) && (BITMAP_TAIL(b, nbits) != 0))
return false;
return true;
}
unsigned long bitmap_ffs(const bitmap *bitmap,
unsigned long n, unsigned long m);
unsigned long bitmap_ffs(const bitmap *b, unsigned long n, unsigned long m);
/*
* Allocation functions
@ -221,26 +220,26 @@ static inline bitmap *bitmap_alloc1(unsigned long nbits)
return bitmap;
}
static inline bitmap *bitmap_realloc0(bitmap *bitmap,
static inline bitmap *bitmap_realloc0(bitmap *b,
unsigned long obits, unsigned long nbits)
{
bitmap = realloc(bitmap, bitmap_sizeof(nbits));
b = realloc(b, bitmap_sizeof(nbits));
if ((nbits > obits) && bitmap)
bitmap_zero_range(bitmap, obits, nbits);
if ((nbits > obits) && b)
bitmap_zero_range(b, obits, nbits);
return bitmap;
return b;
}
static inline bitmap *bitmap_realloc1(bitmap *bitmap,
static inline bitmap *bitmap_realloc1(bitmap *b,
unsigned long obits, unsigned long nbits)
{
bitmap = realloc(bitmap, bitmap_sizeof(nbits));
b = realloc(b, bitmap_sizeof(nbits));
if ((nbits > obits) && bitmap)
bitmap_fill_range(bitmap, obits, nbits);
if ((nbits > obits) && b)
bitmap_fill_range(b, obits, nbits);
return bitmap;
return b;
}
#endif /* CCAN_BITMAP_H_ */

View File

@ -1,16 +1,17 @@
ALL:=run-loop run-different-speed run-length-prefix
ALL:=run-loop run-different-speed run-length-prefix run-stream-many
CCANDIR:=../../..
CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto
LDFLAGS:=-O3 -flto
LDLIBS:=-lrt
OBJS:=time.o poll.o io.o err.o timer.o list.o
OBJS:=time.o poll.o io.o err.o timer.o list.o ccan-tal.o ccan-take.o ccan-ilog.o
default: $(ALL)
run-loop: run-loop.o $(OBJS)
run-different-speed: run-different-speed.o $(OBJS)
run-length-prefix: run-length-prefix.o $(OBJS)
run-stream-many: run-stream-many.o $(OBJS)
time.o: $(CCANDIR)/ccan/time/time.c
$(CC) $(CFLAGS) -c -o $@ $<
@ -24,6 +25,12 @@ io.o: $(CCANDIR)/ccan/io/io.c
$(CC) $(CFLAGS) -c -o $@ $<
err.o: $(CCANDIR)/ccan/err/err.c
$(CC) $(CFLAGS) -c -o $@ $<
ccan-ilog.o: $(CCANDIR)/ccan/ilog/ilog.c
$(CC) $(CFLAGS) -c -o $@ $<
ccan-tal.o: $(CCANDIR)/ccan/tal/tal.c
$(CC) $(CFLAGS) -c -o $@ $<
ccan-take.o: $(CCANDIR)/ccan/take/take.c
$(CC) $(CFLAGS) -c -o $@ $<
clean:
rm -f *.o $(ALL)

View File

@ -0,0 +1,132 @@
/* Wait for many fds to connect, then try to stream the file to some of them in small chunks.
*
* This approximates the connectd behaviour in CLN, where we send gossip to peers.
*/
#include <ccan/io/io.h>
#include <ccan/ptrint/ptrint.h>
#include <ccan/time/time.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <err.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
/* We expect num_expected connections, and how many will be writers */
static size_t max_readers, max_writers;
/* How many raeaders and writers still going */
static size_t num_readers, num_writers;
/* How many times to do the write */
static size_t write_iterations;
/* The buffer to write */
static char writebuf[256];
/* We need this for readers, though we don't actually care! */
static size_t len_ignored;
struct timemono start_time;
static void finished(void)
{
struct timerel elapsed = timemono_since(start_time);
printf("Finished: %"PRIu64"usec\n", time_to_usec(elapsed));
exit(0);
}
static struct io_plan *write_loop(struct io_conn *conn, ptrint_t *iter)
{
ptrdiff_t n = ptr2int(iter);
if (n > write_iterations) {
--num_writers;
if (num_writers == 0)
finished();
return io_wait(conn, conn, io_never, NULL);
}
return io_write(conn, writebuf, sizeof(writebuf), write_loop, int2ptr(n + 1));
}
static struct io_plan *read_loop(struct io_conn *conn, void *unused)
{
return io_read_partial(conn, writebuf, sizeof(writebuf), &len_ignored, read_loop, unused);
}
static void reader_failed(struct io_conn *conn, intptr_t *num)
{
err(1, "Reader %zu/%zu", (size_t)ptr2int(num), max_readers);
}
static void writer_failed(struct io_conn *conn, intptr_t *num)
{
err(1, "Writer %zu/%zu", (size_t)ptr2int(num), max_writers);
}
static struct io_plan *connection_in(struct io_conn *conn, void *sleep_on)
{
if (num_readers < max_readers) {
printf("r");
fflush(stdout);
num_readers++;
io_set_finish(conn, reader_failed, int2ptr(num_readers));
return read_loop(conn, NULL);
}
/* We assign writers last: not sure it matters, but it's more reflective
* of lightning where more recent connections tend to ask for gossip */
num_writers++;
printf("w");
fflush(stdout);
io_set_finish(conn, writer_failed, int2ptr(num_writers));
io_set_finish(conn, writer_failed, NULL);
if (num_writers < max_writers)
return io_wait(conn, sleep_on, write_loop, int2ptr(0));
/* Everyone is connected. Wake them and start final one */
io_wake(sleep_on);
printf("Starting!\n");
start_time = time_mono();
return write_loop(conn, int2ptr(0));
}
int main(int argc, char *argv[])
{
int fd;
struct sockaddr_in s4;
int on = 1;
if (argc != 5)
errx(1, "Usage: <portnum> <num-idle> <num-streaming> <mb-streamed>");
memset(&s4, 0, sizeof(s4));
s4.sin_family = AF_INET;
s4.sin_port = htons(atol(argv[1]));
s4.sin_addr.s_addr = INADDR_ANY;
max_readers = atol(argv[2]);
max_writers = atol(argv[3]);
write_iterations = atol(argv[4]) * (1024 * 1024 / sizeof(writebuf));
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0)
err(1, "Creating socket");
/* Re-use, please.. */
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
err(1, "Setting reuseaddr");
if (bind(fd, &s4, sizeof(s4)) != 0)
err(1, "Binding");
if (listen(fd, 1) != 0)
err(1, "Listening");
io_new_listener(NULL, fd, connection_in, &s4);
io_loop(NULL, NULL);
errx(1, "Sockets exited?");
}

View File

@ -384,9 +384,19 @@ void io_wake(const void *wait)
backend_wake(wait);
}
/* Returns false if this should not be touched (eg. freed). */
static bool do_plan(struct io_conn *conn, struct io_plan *plan,
bool idle_on_epipe)
enum plan_result {
/* Destroyed, do not touch */
FREED,
/* Worked, call again. */
KEEP_GOING,
/* Failed with EAGAIN or did partial. */
EXHAUSTED,
/* No longer interested in read (or write) */
UNINTERESTED
};
static enum plan_result do_plan(struct io_conn *conn, struct io_plan *plan,
bool idle_on_epipe)
{
/* We shouldn't have polled for this event if this wasn't true! */
assert(plan->status == IO_POLLING_NOTSTARTED
@ -394,18 +404,26 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan,
switch (plan->io(conn->fd.fd, &plan->arg)) {
case -1:
/* This is expected, as we call optimistically! */
if (errno == EAGAIN)
return EXHAUSTED;
if (errno == EPIPE && idle_on_epipe) {
plan->status = IO_UNSET;
backend_new_plan(conn);
return false;
return UNINTERESTED;
}
io_close(conn);
return false;
return FREED;
case 0:
plan->status = IO_POLLING_STARTED;
return true;
/* If it started but didn't finish, don't call again. */
return EXHAUSTED;
case 1:
return next_plan(conn, plan);
if (!next_plan(conn, plan))
return FREED;
if (plan->status == IO_POLLING_NOTSTARTED)
return KEEP_GOING;
return UNINTERESTED;
default:
/* IO should only return -1, 0 or 1 */
abort();
@ -414,16 +432,43 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan,
void io_ready(struct io_conn *conn, int pollflags)
{
if (pollflags & POLLIN)
if (!do_plan(conn, &conn->plan[IO_IN], false))
return;
enum plan_result res;
if (pollflags & POLLOUT)
/* If we're writing to a closed pipe, we need to wait for
* read to fail if we're duplex: we want to drain it! */
do_plan(conn, &conn->plan[IO_OUT],
conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
|| conn->plan[IO_IN].status == IO_POLLING_STARTED);
if (pollflags & POLLIN) {
for (;;) {
res = do_plan(conn, &conn->plan[IO_IN], false);
switch (res) {
case FREED:
return;
case EXHAUSTED:
case UNINTERESTED:
goto try_write;
case KEEP_GOING:
continue;
}
abort();
}
}
try_write:
if (pollflags & POLLOUT) {
for (;;) {
/* If we're writing to a closed pipe, we need to wait for
* read to fail if we're duplex: we want to drain it! */
res = do_plan(conn, &conn->plan[IO_OUT],
conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
|| conn->plan[IO_IN].status == IO_POLLING_STARTED);
switch (res) {
case FREED:
case EXHAUSTED:
case UNINTERESTED:
return;
case KEEP_GOING:
continue;
}
abort();
}
}
}
void io_do_always(struct io_plan *plan)

View File

@ -59,7 +59,7 @@ struct io_conn;
io_new_conn_((ctx), (fd), \
typesafe_cb_preargs(struct io_plan *, void *, \
(init), (arg), \
struct io_conn *conn), \
struct io_conn *), \
(void *)(arg))
struct io_conn *io_new_conn_(const tal_t *ctx, int fd,

View File

@ -18,9 +18,17 @@ static struct io_plan *init_in_conn(struct io_conn *conn, char *buf)
return io_read(conn, buf, 2, in_conn_done, NULL);
}
/* Every second time we say we're exhausted */
static int do_nothing(int fd, struct io_plan_arg *arg)
{
return 1;
static bool read_once;
read_once = !read_once;
if (read_once)
return 1;
errno = EAGAIN;
return -1;
}
static struct io_plan *dummy_write(struct io_conn *conn,