From 3e2dea221ba80599968797bff7c8823f3254c596 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 25 Oct 2018 10:16:08 +1030 Subject: [PATCH] common/msg_queue: make it a tal object. This way there's no need for a context pointer, and freeing a msg_queue frees its contents, as expected. Signed-off-by: Rusty Russell --- channeld/channeld.c | 14 +++++++------- common/daemon_conn.c | 16 ++++++++-------- common/msg_queue.c | 18 +++++++++++------- common/msg_queue.h | 11 ++++------- lightningd/subd.c | 10 +++++----- lightningd/subd.h | 2 +- 6 files changed, 36 insertions(+), 35 deletions(-) diff --git a/channeld/channeld.c b/channeld/channeld.c index d3ec5988a..bf30f45b2 100644 --- a/channeld/channeld.c +++ b/channeld/channeld.c @@ -105,7 +105,7 @@ struct peer { /* Messages from master / gossipd: we queue them since we * might be waiting for a specific reply. */ - struct msg_queue from_master, from_gossipd; + struct msg_queue *from_master, *from_gossipd; struct timers timers; struct oneshot *commit_timer; @@ -736,7 +736,7 @@ static u8 *master_wait_sync_reply(const tal_t *ctx, enum channel_wire_type replytype) { return wait_sync_reply(ctx, msg, replytype, - MASTER_FD, &peer->from_master, "master"); + MASTER_FD, peer->from_master, "master"); } static u8 *gossipd_wait_sync_reply(const tal_t *ctx, @@ -744,7 +744,7 @@ static u8 *gossipd_wait_sync_reply(const tal_t *ctx, enum gossip_wire_type replytype) { return wait_sync_reply(ctx, msg, replytype, - GOSSIP_FD, &peer->from_gossipd, "gossipd"); + GOSSIP_FD, peer->from_gossipd, "gossipd"); } static u8 *foreign_channel_update(const tal_t *ctx, @@ -2704,8 +2704,8 @@ int main(int argc, char *argv[]) peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false; peer->announce_depth_reached = false; peer->channel_local_active = false; - msg_queue_init(&peer->from_master, peer); - msg_queue_init(&peer->from_gossipd, peer); + peer->from_master = msg_queue_new(peer); + peer->from_gossipd = msg_queue_new(peer); peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; peer->last_update_timestamp = 0; @@ -2745,7 +2745,7 @@ int main(int argc, char *argv[]) clean_tmpctx(); /* For simplicity, we process one event at a time. */ - msg = msg_dequeue(&peer->from_master); + msg = msg_dequeue(peer->from_master); if (msg) { status_trace("Now dealing with deferred %s", channel_wire_type_name( @@ -2761,7 +2761,7 @@ int main(int argc, char *argv[]) continue; } - msg = msg_dequeue(&peer->from_gossipd); + msg = msg_dequeue(peer->from_gossipd); if (msg) { status_trace("Now dealing with deferred gossip %u", fromwire_peektype(msg)); diff --git a/common/daemon_conn.c b/common/daemon_conn.c index 2103d078d..deb69dd53 100644 --- a/common/daemon_conn.c +++ b/common/daemon_conn.c @@ -10,7 +10,7 @@ struct daemon_conn { u8 *msg_in; /* Queue of outgoing messages */ - struct msg_queue out; + struct msg_queue *out; /* Underlying connection: we're freed if it closes, and vice versa */ struct io_conn *conn; @@ -47,7 +47,7 @@ static struct io_plan *daemon_conn_write_next(struct io_conn *conn, const u8 *msg; again: - msg = msg_dequeue(&dc->out); + msg = msg_dequeue(dc->out); if (msg) { int fd = msg_extract_fd(msg); if (fd >= 0) { @@ -61,7 +61,7 @@ again: if (dc->outq_empty(dc->arg)) goto again; } - return msg_queue_wait(conn, &dc->out, daemon_conn_write_next, dc); + return msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc); } bool daemon_conn_sync_flush(struct daemon_conn *dc) @@ -79,7 +79,7 @@ bool daemon_conn_sync_flush(struct daemon_conn *dc) return false; /* Flush existing messages. */ - while ((msg = msg_dequeue(&dc->out)) != NULL) { + while ((msg = msg_dequeue(dc->out)) != NULL) { int fd = msg_extract_fd(msg); if (fd >= 0) { tal_free(msg); @@ -123,7 +123,7 @@ struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd, dc->outq_empty = outq_empty; dc->arg = arg; dc->msg_in = NULL; - msg_queue_init(&dc->out, dc); + dc->out = msg_queue_new(dc); dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc); tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc); @@ -132,15 +132,15 @@ struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd, void daemon_conn_send(struct daemon_conn *dc, const u8 *msg) { - msg_enqueue(&dc->out, msg); + msg_enqueue(dc->out, msg); } void daemon_conn_send_fd(struct daemon_conn *dc, int fd) { - msg_enqueue_fd(&dc->out, fd); + msg_enqueue_fd(dc->out, fd); } void daemon_conn_wake(struct daemon_conn *dc) { - msg_wake(&dc->out); + msg_wake(dc->out); } diff --git a/common/msg_queue.c b/common/msg_queue.c index f5c934829..d5e82beaa 100644 --- a/common/msg_queue.c +++ b/common/msg_queue.c @@ -1,18 +1,22 @@ #include -#include #include #include #include -void msg_queue_init(struct msg_queue *q, const tal_t *ctx) +struct msg_queue { + const u8 **q; +}; + +struct msg_queue *msg_queue_new(const tal_t *ctx) { - q->q = tal_arr(ctx, const u8 *, 0); - q->ctx = ctx; + struct msg_queue *q = tal(ctx, struct msg_queue); + q->q = tal_arr(q, const u8 *, 0); + return q; } -static void do_enqueue(struct msg_queue *q, const u8 *add) +static void do_enqueue(struct msg_queue *q, const u8 *add TAKES) { - *tal_arr_expand(&q->q) = tal_dup_arr(q->ctx, u8, add, tal_count(add), 0); + *tal_arr_expand(&q->q) = tal_dup_arr(q, u8, add, tal_count(add), 0); /* In case someone is waiting */ io_wake(q); @@ -26,7 +30,7 @@ void msg_enqueue(struct msg_queue *q, const u8 *add) void msg_enqueue_fd(struct msg_queue *q, int fd) { - u8 *fdmsg = tal_arr(q->ctx, u8, 0); + u8 *fdmsg = tal_arr(q, u8, 0); towire_u16(&fdmsg, MSG_PASS_FD); towire_u32(&fdmsg, fd); do_enqueue(q, take(fdmsg)); diff --git a/common/msg_queue.h b/common/msg_queue.h index 46d6f41d4..a311408cb 100644 --- a/common/msg_queue.h +++ b/common/msg_queue.h @@ -4,19 +4,16 @@ #include "config.h" #include #include +#include /* Reserved type used to indicate we're actually passing an fd. */ #define MSG_PASS_FD 0xFFFF -struct msg_queue { - const u8 **q; - const tal_t *ctx; -}; - -void msg_queue_init(struct msg_queue *q, const tal_t *ctx); +/* Allocate a new msg queue. */ +struct msg_queue *msg_queue_new(const tal_t *ctx); /* If add is taken(), freed after sending. msg_wake() implied. */ -void msg_enqueue(struct msg_queue *q, const u8 *add); +void msg_enqueue(struct msg_queue *q, const u8 *add TAKES); /* Fd is closed after sending. msg_wake() implied. */ void msg_enqueue_fd(struct msg_queue *q, int fd); diff --git a/lightningd/subd.c b/lightningd/subd.c index c6a667879..41bb0dd17 100644 --- a/lightningd/subd.c +++ b/lightningd/subd.c @@ -577,12 +577,12 @@ static void destroy_subd(struct subd *sd) static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd) { - const u8 *msg = msg_dequeue(&sd->outq); + const u8 *msg = msg_dequeue(sd->outq); int fd; /* Nothing to do? Wait for msg_enqueue. */ if (!msg) - return msg_queue_wait(conn, &sd->outq, msg_send_next, sd); + return msg_queue_wait(conn, sd->outq, msg_send_next, sd); fd = msg_extract_fd(msg); if (fd >= 0) { @@ -655,7 +655,7 @@ static struct subd *new_subd(struct lightningd *ld, sd->errcb = errcb; sd->billboardcb = billboardcb; sd->fds_in = NULL; - msg_queue_init(&sd->outq, sd); + sd->outq = msg_queue_new(sd); tal_add_destructor(sd, destroy_subd); list_head_init(&sd->reqs); sd->channel = channel; @@ -723,12 +723,12 @@ void subd_send_msg(struct subd *sd, const u8 *msg_out) /* FIXME: We should use unique upper bits for each daemon, then * have generate-wire.py add them, just assert here. */ assert(!strstarts(sd->msgname(fromwire_peektype(msg_out)), "INVALID")); - msg_enqueue(&sd->outq, msg_out); + msg_enqueue(sd->outq, msg_out); } void subd_send_fd(struct subd *sd, int fd) { - msg_enqueue_fd(&sd->outq, fd); + msg_enqueue_fd(sd->outq, fd); } void subd_req_(const tal_t *ctx, diff --git a/lightningd/subd.h b/lightningd/subd.h index a0ea87a36..81730ef81 100644 --- a/lightningd/subd.h +++ b/lightningd/subd.h @@ -65,7 +65,7 @@ struct subd { bool talks_to_peer; /* Messages queue up here. */ - struct msg_queue outq; + struct msg_queue *outq; /* Callbacks for replies. */ struct list_head reqs;