diff --git a/ccan/README b/ccan/README index 0cf5bcc5a..022f1e52c 100644 --- a/ccan/README +++ b/ccan/README @@ -1,3 +1,3 @@ CCAN imported from http://ccodearchive.net. -CCAN version: init-2582-g2d188544 +CCAN version: init-2586-gd4932820 diff --git a/ccan/ccan/bitmap/bitmap.c b/ccan/ccan/bitmap/bitmap.c index d254b20e5..ccc0cd21e 100644 --- a/ccan/ccan/bitmap/bitmap.c +++ b/ccan/ccan/bitmap/bitmap.c @@ -9,7 +9,7 @@ #define BIT_ALIGN_DOWN(n) ((n) & ~(BITMAP_WORD_BITS - 1)) #define BIT_ALIGN_UP(n) BIT_ALIGN_DOWN((n) + BITMAP_WORD_BITS - 1) -void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m) +void bitmap_zero_range(bitmap *b, unsigned long n, unsigned long m) { unsigned long an = BIT_ALIGN_UP(n); unsigned long am = BIT_ALIGN_DOWN(m); @@ -19,22 +19,22 @@ void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m) assert(m >= n); if (am < an) { - BITMAP_WORD(bitmap, n) &= ~bitmap_bswap(headmask & tailmask); + BITMAP_WORD(b, n) &= ~bitmap_bswap(headmask & tailmask); return; } if (an > n) - BITMAP_WORD(bitmap, n) &= ~bitmap_bswap(headmask); + BITMAP_WORD(b, n) &= ~bitmap_bswap(headmask); if (am > an) - memset(&BITMAP_WORD(bitmap, an), 0, + memset(&BITMAP_WORD(b, an), 0, (am - an) / BITMAP_WORD_BITS * sizeof(bitmap_word)); if (m > am) - BITMAP_WORD(bitmap, m) &= ~bitmap_bswap(tailmask); + BITMAP_WORD(b, m) &= ~bitmap_bswap(tailmask); } -void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m) +void bitmap_fill_range(bitmap *b, unsigned long n, unsigned long m) { unsigned long an = BIT_ALIGN_UP(n); unsigned long am = BIT_ALIGN_DOWN(m); @@ -44,19 +44,19 @@ void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m) assert(m >= n); if (am < an) { - BITMAP_WORD(bitmap, n) |= bitmap_bswap(headmask & tailmask); + BITMAP_WORD(b, n) |= bitmap_bswap(headmask & tailmask); return; } if (an > n) - BITMAP_WORD(bitmap, n) |= bitmap_bswap(headmask); + BITMAP_WORD(b, n) |= bitmap_bswap(headmask); if (am > an) - memset(&BITMAP_WORD(bitmap, an), 0xff, + memset(&BITMAP_WORD(b, an), 0xff, (am - an) / BITMAP_WORD_BITS * sizeof(bitmap_word)); if (m > am) - BITMAP_WORD(bitmap, m) |= bitmap_bswap(tailmask); + BITMAP_WORD(b, m) |= bitmap_bswap(tailmask); } static int bitmap_clz(bitmap_word w) @@ -76,7 +76,7 @@ static int bitmap_clz(bitmap_word w) #endif } -unsigned long bitmap_ffs(const bitmap *bitmap, +unsigned long bitmap_ffs(const bitmap *b, unsigned long n, unsigned long m) { unsigned long an = BIT_ALIGN_UP(n); @@ -87,7 +87,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap, assert(m >= n); if (am < an) { - bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, n)); + bitmap_word w = bitmap_bswap(BITMAP_WORD(b, n)); w &= (headmask & tailmask); @@ -95,7 +95,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap, } if (an > n) { - bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, n)); + bitmap_word w = bitmap_bswap(BITMAP_WORD(b, n)); w &= headmask; @@ -104,7 +104,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap, } while (an < am) { - bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, an)); + bitmap_word w = bitmap_bswap(BITMAP_WORD(b, an)); if (w) return an + bitmap_clz(w); @@ -113,7 +113,7 @@ unsigned long bitmap_ffs(const bitmap *bitmap, } if (m > am) { - bitmap_word w = bitmap_bswap(BITMAP_WORD(bitmap, m)); + bitmap_word w = bitmap_bswap(BITMAP_WORD(b, m)); w &= tailmask; diff --git a/ccan/ccan/bitmap/bitmap.h b/ccan/ccan/bitmap/bitmap.h index 543828010..e1bf4bb76 100644 --- a/ccan/ccan/bitmap/bitmap.h +++ b/ccan/ccan/bitmap/bitmap.h @@ -58,37 +58,37 @@ static inline bitmap_word bitmap_bswap(bitmap_word w) #define BITMAP_TAIL(_bm, _nbits) \ (BITMAP_TAILWORD(_bm, _nbits) & BITMAP_TAILBITS(_nbits)) -static inline void bitmap_set_bit(bitmap *bitmap, unsigned long n) +static inline void bitmap_set_bit(bitmap *b, unsigned long n) { - BITMAP_WORD(bitmap, n) |= BITMAP_WORDBIT(n); + BITMAP_WORD(b, n) |= BITMAP_WORDBIT(n); } -static inline void bitmap_clear_bit(bitmap *bitmap, unsigned long n) +static inline void bitmap_clear_bit(bitmap *b, unsigned long n) { - BITMAP_WORD(bitmap, n) &= ~BITMAP_WORDBIT(n); + BITMAP_WORD(b, n) &= ~BITMAP_WORDBIT(n); } -static inline void bitmap_change_bit(bitmap *bitmap, unsigned long n) +static inline void bitmap_change_bit(bitmap *b, unsigned long n) { - BITMAP_WORD(bitmap, n) ^= BITMAP_WORDBIT(n); + BITMAP_WORD(b, n) ^= BITMAP_WORDBIT(n); } -static inline bool bitmap_test_bit(const bitmap *bitmap, unsigned long n) +static inline bool bitmap_test_bit(const bitmap *b, unsigned long n) { - return !!(BITMAP_WORD(bitmap, n) & BITMAP_WORDBIT(n)); + return !!(BITMAP_WORD(b, n) & BITMAP_WORDBIT(n)); } -void bitmap_zero_range(bitmap *bitmap, unsigned long n, unsigned long m); -void bitmap_fill_range(bitmap *bitmap, unsigned long n, unsigned long m); +void bitmap_zero_range(bitmap *b, unsigned long n, unsigned long m); +void bitmap_fill_range(bitmap *b, unsigned long n, unsigned long m); -static inline void bitmap_zero(bitmap *bitmap, unsigned long nbits) +static inline void bitmap_zero(bitmap *b, unsigned long nbits) { - memset(bitmap, 0, bitmap_sizeof(nbits)); + memset(b, 0, bitmap_sizeof(nbits)); } -static inline void bitmap_fill(bitmap *bitmap, unsigned long nbits) +static inline void bitmap_fill(bitmap *b, unsigned long nbits) { - memset(bitmap, 0xff, bitmap_sizeof(nbits)); + memset(b, 0xff, bitmap_sizeof(nbits)); } static inline void bitmap_copy(bitmap *dst, const bitmap *src, @@ -161,37 +161,36 @@ static inline bool bitmap_subset(const bitmap *src1, const bitmap *src2, return true; } -static inline bool bitmap_full(const bitmap *bitmap, unsigned long nbits) +static inline bool bitmap_full(const bitmap *b, unsigned long nbits) { unsigned long i; for (i = 0; i < BITMAP_HEADWORDS(nbits); i++) { - if (bitmap[i].w != -1UL) + if (b[i].w != -1UL) return false; } if (BITMAP_HASTAIL(nbits) && - (BITMAP_TAIL(bitmap, nbits) != BITMAP_TAILBITS(nbits))) + (BITMAP_TAIL(b, nbits) != BITMAP_TAILBITS(nbits))) return false; return true; } -static inline bool bitmap_empty(const bitmap *bitmap, unsigned long nbits) +static inline bool bitmap_empty(const bitmap *b, unsigned long nbits) { unsigned long i; for (i = 0; i < BITMAP_HEADWORDS(nbits); i++) { - if (bitmap[i].w != 0) + if (b[i].w != 0) return false; } - if (BITMAP_HASTAIL(nbits) && (BITMAP_TAIL(bitmap, nbits) != 0)) + if (BITMAP_HASTAIL(nbits) && (BITMAP_TAIL(b, nbits) != 0)) return false; return true; } -unsigned long bitmap_ffs(const bitmap *bitmap, - unsigned long n, unsigned long m); +unsigned long bitmap_ffs(const bitmap *b, unsigned long n, unsigned long m); /* * Allocation functions @@ -221,26 +220,26 @@ static inline bitmap *bitmap_alloc1(unsigned long nbits) return bitmap; } -static inline bitmap *bitmap_realloc0(bitmap *bitmap, +static inline bitmap *bitmap_realloc0(bitmap *b, unsigned long obits, unsigned long nbits) { - bitmap = realloc(bitmap, bitmap_sizeof(nbits)); + b = realloc(b, bitmap_sizeof(nbits)); - if ((nbits > obits) && bitmap) - bitmap_zero_range(bitmap, obits, nbits); + if ((nbits > obits) && b) + bitmap_zero_range(b, obits, nbits); - return bitmap; + return b; } -static inline bitmap *bitmap_realloc1(bitmap *bitmap, +static inline bitmap *bitmap_realloc1(bitmap *b, unsigned long obits, unsigned long nbits) { - bitmap = realloc(bitmap, bitmap_sizeof(nbits)); + b = realloc(b, bitmap_sizeof(nbits)); - if ((nbits > obits) && bitmap) - bitmap_fill_range(bitmap, obits, nbits); + if ((nbits > obits) && b) + bitmap_fill_range(b, obits, nbits); - return bitmap; + return b; } #endif /* CCAN_BITMAP_H_ */ diff --git a/ccan/ccan/io/benchmarks/Makefile b/ccan/ccan/io/benchmarks/Makefile index 7dcf9bebc..0bc0ab46d 100644 --- a/ccan/ccan/io/benchmarks/Makefile +++ b/ccan/ccan/io/benchmarks/Makefile @@ -1,16 +1,17 @@ -ALL:=run-loop run-different-speed run-length-prefix +ALL:=run-loop run-different-speed run-length-prefix run-stream-many CCANDIR:=../../.. CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto LDFLAGS:=-O3 -flto LDLIBS:=-lrt -OBJS:=time.o poll.o io.o err.o timer.o list.o +OBJS:=time.o poll.o io.o err.o timer.o list.o ccan-tal.o ccan-take.o ccan-ilog.o default: $(ALL) run-loop: run-loop.o $(OBJS) run-different-speed: run-different-speed.o $(OBJS) run-length-prefix: run-length-prefix.o $(OBJS) +run-stream-many: run-stream-many.o $(OBJS) time.o: $(CCANDIR)/ccan/time/time.c $(CC) $(CFLAGS) -c -o $@ $< @@ -24,6 +25,12 @@ io.o: $(CCANDIR)/ccan/io/io.c $(CC) $(CFLAGS) -c -o $@ $< err.o: $(CCANDIR)/ccan/err/err.c $(CC) $(CFLAGS) -c -o $@ $< +ccan-ilog.o: $(CCANDIR)/ccan/ilog/ilog.c + $(CC) $(CFLAGS) -c -o $@ $< +ccan-tal.o: $(CCANDIR)/ccan/tal/tal.c + $(CC) $(CFLAGS) -c -o $@ $< +ccan-take.o: $(CCANDIR)/ccan/take/take.c + $(CC) $(CFLAGS) -c -o $@ $< clean: rm -f *.o $(ALL) diff --git a/ccan/ccan/io/benchmarks/run-stream-many.c b/ccan/ccan/io/benchmarks/run-stream-many.c new file mode 100644 index 000000000..d5dc41666 --- /dev/null +++ b/ccan/ccan/io/benchmarks/run-stream-many.c @@ -0,0 +1,132 @@ +/* Wait for many fds to connect, then try to stream the file to some of them in small chunks. + * + * This approximates the connectd behaviour in CLN, where we send gossip to peers. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* We expect num_expected connections, and how many will be writers */ +static size_t max_readers, max_writers; + +/* How many raeaders and writers still going */ +static size_t num_readers, num_writers; + +/* How many times to do the write */ +static size_t write_iterations; + +/* The buffer to write */ +static char writebuf[256]; + +/* We need this for readers, though we don't actually care! */ +static size_t len_ignored; + +struct timemono start_time; + +static void finished(void) +{ + struct timerel elapsed = timemono_since(start_time); + printf("Finished: %"PRIu64"usec\n", time_to_usec(elapsed)); + exit(0); +} + +static struct io_plan *write_loop(struct io_conn *conn, ptrint_t *iter) +{ + ptrdiff_t n = ptr2int(iter); + + if (n > write_iterations) { + --num_writers; + if (num_writers == 0) + finished(); + return io_wait(conn, conn, io_never, NULL); + } + return io_write(conn, writebuf, sizeof(writebuf), write_loop, int2ptr(n + 1)); +} + +static struct io_plan *read_loop(struct io_conn *conn, void *unused) +{ + return io_read_partial(conn, writebuf, sizeof(writebuf), &len_ignored, read_loop, unused); +} + +static void reader_failed(struct io_conn *conn, intptr_t *num) +{ + err(1, "Reader %zu/%zu", (size_t)ptr2int(num), max_readers); +} + +static void writer_failed(struct io_conn *conn, intptr_t *num) +{ + err(1, "Writer %zu/%zu", (size_t)ptr2int(num), max_writers); +} + +static struct io_plan *connection_in(struct io_conn *conn, void *sleep_on) +{ + if (num_readers < max_readers) { + printf("r"); + fflush(stdout); + num_readers++; + io_set_finish(conn, reader_failed, int2ptr(num_readers)); + return read_loop(conn, NULL); + } + + /* We assign writers last: not sure it matters, but it's more reflective + * of lightning where more recent connections tend to ask for gossip */ + num_writers++; + printf("w"); + fflush(stdout); + + io_set_finish(conn, writer_failed, int2ptr(num_writers)); + io_set_finish(conn, writer_failed, NULL); + if (num_writers < max_writers) + return io_wait(conn, sleep_on, write_loop, int2ptr(0)); + + /* Everyone is connected. Wake them and start final one */ + io_wake(sleep_on); + printf("Starting!\n"); + start_time = time_mono(); + return write_loop(conn, int2ptr(0)); +} + +int main(int argc, char *argv[]) +{ + int fd; + struct sockaddr_in s4; + int on = 1; + + if (argc != 5) + errx(1, "Usage: "); + + memset(&s4, 0, sizeof(s4)); + s4.sin_family = AF_INET; + s4.sin_port = htons(atol(argv[1])); + s4.sin_addr.s_addr = INADDR_ANY; + + max_readers = atol(argv[2]); + max_writers = atol(argv[3]); + write_iterations = atol(argv[4]) * (1024 * 1024 / sizeof(writebuf)); + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) + err(1, "Creating socket"); + + /* Re-use, please.. */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) + err(1, "Setting reuseaddr"); + + if (bind(fd, &s4, sizeof(s4)) != 0) + err(1, "Binding"); + + if (listen(fd, 1) != 0) + err(1, "Listening"); + + io_new_listener(NULL, fd, connection_in, &s4); + io_loop(NULL, NULL); + errx(1, "Sockets exited?"); +} diff --git a/ccan/ccan/io/io.c b/ccan/ccan/io/io.c index 3d00de045..58ae19bdb 100644 --- a/ccan/ccan/io/io.c +++ b/ccan/ccan/io/io.c @@ -384,9 +384,19 @@ void io_wake(const void *wait) backend_wake(wait); } -/* Returns false if this should not be touched (eg. freed). */ -static bool do_plan(struct io_conn *conn, struct io_plan *plan, - bool idle_on_epipe) +enum plan_result { + /* Destroyed, do not touch */ + FREED, + /* Worked, call again. */ + KEEP_GOING, + /* Failed with EAGAIN or did partial. */ + EXHAUSTED, + /* No longer interested in read (or write) */ + UNINTERESTED +}; + +static enum plan_result do_plan(struct io_conn *conn, struct io_plan *plan, + bool idle_on_epipe) { /* We shouldn't have polled for this event if this wasn't true! */ assert(plan->status == IO_POLLING_NOTSTARTED @@ -394,18 +404,26 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan, switch (plan->io(conn->fd.fd, &plan->arg)) { case -1: + /* This is expected, as we call optimistically! */ + if (errno == EAGAIN) + return EXHAUSTED; if (errno == EPIPE && idle_on_epipe) { plan->status = IO_UNSET; backend_new_plan(conn); - return false; + return UNINTERESTED; } io_close(conn); - return false; + return FREED; case 0: plan->status = IO_POLLING_STARTED; - return true; + /* If it started but didn't finish, don't call again. */ + return EXHAUSTED; case 1: - return next_plan(conn, plan); + if (!next_plan(conn, plan)) + return FREED; + if (plan->status == IO_POLLING_NOTSTARTED) + return KEEP_GOING; + return UNINTERESTED; default: /* IO should only return -1, 0 or 1 */ abort(); @@ -414,16 +432,43 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan, void io_ready(struct io_conn *conn, int pollflags) { - if (pollflags & POLLIN) - if (!do_plan(conn, &conn->plan[IO_IN], false)) - return; + enum plan_result res; - if (pollflags & POLLOUT) - /* If we're writing to a closed pipe, we need to wait for - * read to fail if we're duplex: we want to drain it! */ - do_plan(conn, &conn->plan[IO_OUT], - conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED - || conn->plan[IO_IN].status == IO_POLLING_STARTED); + if (pollflags & POLLIN) { + for (;;) { + res = do_plan(conn, &conn->plan[IO_IN], false); + switch (res) { + case FREED: + return; + case EXHAUSTED: + case UNINTERESTED: + goto try_write; + case KEEP_GOING: + continue; + } + abort(); + } + } + +try_write: + if (pollflags & POLLOUT) { + for (;;) { + /* If we're writing to a closed pipe, we need to wait for + * read to fail if we're duplex: we want to drain it! */ + res = do_plan(conn, &conn->plan[IO_OUT], + conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED + || conn->plan[IO_IN].status == IO_POLLING_STARTED); + switch (res) { + case FREED: + case EXHAUSTED: + case UNINTERESTED: + return; + case KEEP_GOING: + continue; + } + abort(); + } + } } void io_do_always(struct io_plan *plan) diff --git a/ccan/ccan/io/io.h b/ccan/ccan/io/io.h index cc92be2c5..5d084828b 100644 --- a/ccan/ccan/io/io.h +++ b/ccan/ccan/io/io.h @@ -59,7 +59,7 @@ struct io_conn; io_new_conn_((ctx), (fd), \ typesafe_cb_preargs(struct io_plan *, void *, \ (init), (arg), \ - struct io_conn *conn), \ + struct io_conn *), \ (void *)(arg)) struct io_conn *io_new_conn_(const tal_t *ctx, int fd, diff --git a/ccan/ccan/io/test/run-43-io_plan_in_started.c b/ccan/ccan/io/test/run-43-io_plan_in_started.c index f63f87790..74fcf55c4 100644 --- a/ccan/ccan/io/test/run-43-io_plan_in_started.c +++ b/ccan/ccan/io/test/run-43-io_plan_in_started.c @@ -18,9 +18,17 @@ static struct io_plan *init_in_conn(struct io_conn *conn, char *buf) return io_read(conn, buf, 2, in_conn_done, NULL); } +/* Every second time we say we're exhausted */ static int do_nothing(int fd, struct io_plan_arg *arg) { - return 1; + static bool read_once; + + read_once = !read_once; + if (read_once) + return 1; + + errno = EAGAIN; + return -1; } static struct io_plan *dummy_write(struct io_conn *conn,