common/msg_queue: make it a tal object.

This way there's no need for a context pointer, and freeing a msg_queue
frees its contents, as expected.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2018-10-25 10:16:08 +10:30
parent 3c97f3954e
commit 3e2dea221b
6 changed files with 36 additions and 35 deletions

View File

@ -105,7 +105,7 @@ struct peer {
/* Messages from master / gossipd: we queue them since we /* Messages from master / gossipd: we queue them since we
* might be waiting for a specific reply. */ * might be waiting for a specific reply. */
struct msg_queue from_master, from_gossipd; struct msg_queue *from_master, *from_gossipd;
struct timers timers; struct timers timers;
struct oneshot *commit_timer; struct oneshot *commit_timer;
@ -736,7 +736,7 @@ static u8 *master_wait_sync_reply(const tal_t *ctx,
enum channel_wire_type replytype) enum channel_wire_type replytype)
{ {
return wait_sync_reply(ctx, msg, replytype, return wait_sync_reply(ctx, msg, replytype,
MASTER_FD, &peer->from_master, "master"); MASTER_FD, peer->from_master, "master");
} }
static u8 *gossipd_wait_sync_reply(const tal_t *ctx, static u8 *gossipd_wait_sync_reply(const tal_t *ctx,
@ -744,7 +744,7 @@ static u8 *gossipd_wait_sync_reply(const tal_t *ctx,
enum gossip_wire_type replytype) enum gossip_wire_type replytype)
{ {
return wait_sync_reply(ctx, msg, replytype, return wait_sync_reply(ctx, msg, replytype,
GOSSIP_FD, &peer->from_gossipd, "gossipd"); GOSSIP_FD, peer->from_gossipd, "gossipd");
} }
static u8 *foreign_channel_update(const tal_t *ctx, static u8 *foreign_channel_update(const tal_t *ctx,
@ -2704,8 +2704,8 @@ int main(int argc, char *argv[])
peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false; peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false;
peer->announce_depth_reached = false; peer->announce_depth_reached = false;
peer->channel_local_active = false; peer->channel_local_active = false;
msg_queue_init(&peer->from_master, peer); peer->from_master = msg_queue_new(peer);
msg_queue_init(&peer->from_gossipd, peer); peer->from_gossipd = msg_queue_new(peer);
peer->next_commit_sigs = NULL; peer->next_commit_sigs = NULL;
peer->shutdown_sent[LOCAL] = false; peer->shutdown_sent[LOCAL] = false;
peer->last_update_timestamp = 0; peer->last_update_timestamp = 0;
@ -2745,7 +2745,7 @@ int main(int argc, char *argv[])
clean_tmpctx(); clean_tmpctx();
/* For simplicity, we process one event at a time. */ /* For simplicity, we process one event at a time. */
msg = msg_dequeue(&peer->from_master); msg = msg_dequeue(peer->from_master);
if (msg) { if (msg) {
status_trace("Now dealing with deferred %s", status_trace("Now dealing with deferred %s",
channel_wire_type_name( channel_wire_type_name(
@ -2761,7 +2761,7 @@ int main(int argc, char *argv[])
continue; continue;
} }
msg = msg_dequeue(&peer->from_gossipd); msg = msg_dequeue(peer->from_gossipd);
if (msg) { if (msg) {
status_trace("Now dealing with deferred gossip %u", status_trace("Now dealing with deferred gossip %u",
fromwire_peektype(msg)); fromwire_peektype(msg));

View File

@ -10,7 +10,7 @@ struct daemon_conn {
u8 *msg_in; u8 *msg_in;
/* Queue of outgoing messages */ /* Queue of outgoing messages */
struct msg_queue out; struct msg_queue *out;
/* Underlying connection: we're freed if it closes, and vice versa */ /* Underlying connection: we're freed if it closes, and vice versa */
struct io_conn *conn; struct io_conn *conn;
@ -47,7 +47,7 @@ static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
const u8 *msg; const u8 *msg;
again: again:
msg = msg_dequeue(&dc->out); msg = msg_dequeue(dc->out);
if (msg) { if (msg) {
int fd = msg_extract_fd(msg); int fd = msg_extract_fd(msg);
if (fd >= 0) { if (fd >= 0) {
@ -61,7 +61,7 @@ again:
if (dc->outq_empty(dc->arg)) if (dc->outq_empty(dc->arg))
goto again; goto again;
} }
return msg_queue_wait(conn, &dc->out, daemon_conn_write_next, dc); return msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc);
} }
bool daemon_conn_sync_flush(struct daemon_conn *dc) bool daemon_conn_sync_flush(struct daemon_conn *dc)
@ -79,7 +79,7 @@ bool daemon_conn_sync_flush(struct daemon_conn *dc)
return false; return false;
/* Flush existing messages. */ /* Flush existing messages. */
while ((msg = msg_dequeue(&dc->out)) != NULL) { while ((msg = msg_dequeue(dc->out)) != NULL) {
int fd = msg_extract_fd(msg); int fd = msg_extract_fd(msg);
if (fd >= 0) { if (fd >= 0) {
tal_free(msg); tal_free(msg);
@ -123,7 +123,7 @@ struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
dc->outq_empty = outq_empty; dc->outq_empty = outq_empty;
dc->arg = arg; dc->arg = arg;
dc->msg_in = NULL; dc->msg_in = NULL;
msg_queue_init(&dc->out, dc); dc->out = msg_queue_new(dc);
dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc); dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc);
tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc); tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc);
@ -132,15 +132,15 @@ struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
void daemon_conn_send(struct daemon_conn *dc, const u8 *msg) void daemon_conn_send(struct daemon_conn *dc, const u8 *msg)
{ {
msg_enqueue(&dc->out, msg); msg_enqueue(dc->out, msg);
} }
void daemon_conn_send_fd(struct daemon_conn *dc, int fd) void daemon_conn_send_fd(struct daemon_conn *dc, int fd)
{ {
msg_enqueue_fd(&dc->out, fd); msg_enqueue_fd(dc->out, fd);
} }
void daemon_conn_wake(struct daemon_conn *dc) void daemon_conn_wake(struct daemon_conn *dc)
{ {
msg_wake(&dc->out); msg_wake(dc->out);
} }

View File

@ -1,18 +1,22 @@
#include <assert.h> #include <assert.h>
#include <ccan/take/take.h>
#include <common/msg_queue.h> #include <common/msg_queue.h>
#include <common/utils.h> #include <common/utils.h>
#include <wire/wire.h> #include <wire/wire.h>
void msg_queue_init(struct msg_queue *q, const tal_t *ctx) struct msg_queue {
const u8 **q;
};
struct msg_queue *msg_queue_new(const tal_t *ctx)
{ {
q->q = tal_arr(ctx, const u8 *, 0); struct msg_queue *q = tal(ctx, struct msg_queue);
q->ctx = ctx; q->q = tal_arr(q, const u8 *, 0);
return q;
} }
static void do_enqueue(struct msg_queue *q, const u8 *add) static void do_enqueue(struct msg_queue *q, const u8 *add TAKES)
{ {
*tal_arr_expand(&q->q) = tal_dup_arr(q->ctx, u8, add, tal_count(add), 0); *tal_arr_expand(&q->q) = tal_dup_arr(q, u8, add, tal_count(add), 0);
/* In case someone is waiting */ /* In case someone is waiting */
io_wake(q); io_wake(q);
@ -26,7 +30,7 @@ void msg_enqueue(struct msg_queue *q, const u8 *add)
void msg_enqueue_fd(struct msg_queue *q, int fd) void msg_enqueue_fd(struct msg_queue *q, int fd)
{ {
u8 *fdmsg = tal_arr(q->ctx, u8, 0); u8 *fdmsg = tal_arr(q, u8, 0);
towire_u16(&fdmsg, MSG_PASS_FD); towire_u16(&fdmsg, MSG_PASS_FD);
towire_u32(&fdmsg, fd); towire_u32(&fdmsg, fd);
do_enqueue(q, take(fdmsg)); do_enqueue(q, take(fdmsg));

View File

@ -4,19 +4,16 @@
#include "config.h" #include "config.h"
#include <ccan/io/io.h> #include <ccan/io/io.h>
#include <ccan/short_types/short_types.h> #include <ccan/short_types/short_types.h>
#include <ccan/take/take.h>
/* Reserved type used to indicate we're actually passing an fd. */ /* Reserved type used to indicate we're actually passing an fd. */
#define MSG_PASS_FD 0xFFFF #define MSG_PASS_FD 0xFFFF
struct msg_queue { /* Allocate a new msg queue. */
const u8 **q; struct msg_queue *msg_queue_new(const tal_t *ctx);
const tal_t *ctx;
};
void msg_queue_init(struct msg_queue *q, const tal_t *ctx);
/* If add is taken(), freed after sending. msg_wake() implied. */ /* If add is taken(), freed after sending. msg_wake() implied. */
void msg_enqueue(struct msg_queue *q, const u8 *add); void msg_enqueue(struct msg_queue *q, const u8 *add TAKES);
/* Fd is closed after sending. msg_wake() implied. */ /* Fd is closed after sending. msg_wake() implied. */
void msg_enqueue_fd(struct msg_queue *q, int fd); void msg_enqueue_fd(struct msg_queue *q, int fd);

View File

@ -577,12 +577,12 @@ static void destroy_subd(struct subd *sd)
static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd) static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd)
{ {
const u8 *msg = msg_dequeue(&sd->outq); const u8 *msg = msg_dequeue(sd->outq);
int fd; int fd;
/* Nothing to do? Wait for msg_enqueue. */ /* Nothing to do? Wait for msg_enqueue. */
if (!msg) if (!msg)
return msg_queue_wait(conn, &sd->outq, msg_send_next, sd); return msg_queue_wait(conn, sd->outq, msg_send_next, sd);
fd = msg_extract_fd(msg); fd = msg_extract_fd(msg);
if (fd >= 0) { if (fd >= 0) {
@ -655,7 +655,7 @@ static struct subd *new_subd(struct lightningd *ld,
sd->errcb = errcb; sd->errcb = errcb;
sd->billboardcb = billboardcb; sd->billboardcb = billboardcb;
sd->fds_in = NULL; sd->fds_in = NULL;
msg_queue_init(&sd->outq, sd); sd->outq = msg_queue_new(sd);
tal_add_destructor(sd, destroy_subd); tal_add_destructor(sd, destroy_subd);
list_head_init(&sd->reqs); list_head_init(&sd->reqs);
sd->channel = channel; sd->channel = channel;
@ -723,12 +723,12 @@ void subd_send_msg(struct subd *sd, const u8 *msg_out)
/* FIXME: We should use unique upper bits for each daemon, then /* FIXME: We should use unique upper bits for each daemon, then
* have generate-wire.py add them, just assert here. */ * have generate-wire.py add them, just assert here. */
assert(!strstarts(sd->msgname(fromwire_peektype(msg_out)), "INVALID")); assert(!strstarts(sd->msgname(fromwire_peektype(msg_out)), "INVALID"));
msg_enqueue(&sd->outq, msg_out); msg_enqueue(sd->outq, msg_out);
} }
void subd_send_fd(struct subd *sd, int fd) void subd_send_fd(struct subd *sd, int fd)
{ {
msg_enqueue_fd(&sd->outq, fd); msg_enqueue_fd(sd->outq, fd);
} }
void subd_req_(const tal_t *ctx, void subd_req_(const tal_t *ctx,

View File

@ -65,7 +65,7 @@ struct subd {
bool talks_to_peer; bool talks_to_peer;
/* Messages queue up here. */ /* Messages queue up here. */
struct msg_queue outq; struct msg_queue *outq;
/* Callbacks for replies. */ /* Callbacks for replies. */
struct list_head reqs; struct list_head reqs;