diff --git a/daemon/lightningd.c b/daemon/lightningd.c index dd73facfa..abde82c70 100644 --- a/daemon/lightningd.c +++ b/daemon/lightningd.c @@ -371,6 +371,7 @@ static struct lightningd_state *lightningd_state(void) dstate->reexec = NULL; dstate->external_ip = NULL; dstate->announce = NULL; + list_head_init(&dstate->broadcast_queue); return dstate; } diff --git a/daemon/lightningd.h b/daemon/lightningd.h index 52975afed..5c327b7ac 100644 --- a/daemon/lightningd.h +++ b/daemon/lightningd.h @@ -147,5 +147,8 @@ struct lightningd_state { /* Announce timer. */ struct oneshot *announce; + + /* Outgoing messages queued for the staggered broadcast */ + struct list_head broadcast_queue; }; #endif /* LIGHTNING_DAEMON_LIGHTNING_H */ diff --git a/daemon/p2p_announce.c b/daemon/p2p_announce.c index a384c608f..d48446eba 100644 --- a/daemon/p2p_announce.c +++ b/daemon/p2p_announce.c @@ -12,6 +12,24 @@ #include #include +struct queued_message { + int type; + + /* Unique tag specifying the msg origin */ + void *tag; + + /* Timestamp for `channel_update`s and `node_announcement`s, 0 + * for `channel_announcement`s */ + u32 timestamp; + + /* Serialized payload */ + u8 *payload; + + struct list_node list; + + /* who told us about this message? */ + struct peer *origin; +}; u8 ipv4prefix[] = { 0x00, 0x00, 0x00, 0x00, @@ -59,6 +77,38 @@ static void broadcast(struct lightningd_state *dstate, } } +static void queue_broadcast(struct lightningd_state *dstate, + const int type, + const u32 timestamp, + const u8 *tag, + const u8 *payload, + struct peer *origin) +{ + struct queued_message *el, *msg; + list_for_each(&dstate->broadcast_queue, el, list) { + if (el->type == type && + tal_count(tag) == tal_count(el->tag) && + memcmp(el->tag, tag, tal_count(tag)) == 0 && + el->timestamp < timestamp){ + /* Found a replacement */ + el->payload = tal_free(el->payload); + el->payload = tal_dup_arr(el, u8, payload, tal_count(payload), 0); + el->timestamp = timestamp; + el->origin = origin; + return; + } + } + + /* No match found, add a new message to the queue */ + msg = tal(dstate, struct queued_message); + msg->type = type; + msg->timestamp = timestamp; + msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0); + msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0); + msg->origin = origin; + list_add_tail(&dstate->broadcast_queue, &msg->list); +} + static bool add_channel_direction(struct lightningd_state *dstate, const struct pubkey *from, const struct pubkey *to, @@ -110,7 +160,14 @@ void handle_channel_announcement( } serialized = towire_channel_announcement(msg, msg); - broadcast(peer->dstate, WIRE_CHANNEL_ANNOUNCEMENT, serialized, peer); + + u8 *tag = tal_arr(msg, u8, 0); + towire_channel_id(&tag, &msg->channel_id); + queue_broadcast(peer->dstate, + WIRE_CHANNEL_ANNOUNCEMENT, + 0, /* `channel_announcement`s do not have a timestamp */ + tag, + serialized, peer); tal_free(msg); } @@ -159,7 +216,13 @@ void handle_channel_update(struct peer *peer, const struct msg_channel_update *m msg->flags ); - broadcast(peer->dstate, WIRE_CHANNEL_UPDATE, serialized, peer); + u8 *tag = tal_arr(msg, u8, 0); + towire_channel_id(&tag, &msg->channel_id); + queue_broadcast(peer->dstate, + WIRE_CHANNEL_UPDATE, + msg->timestamp, + tag, + serialized, peer); tal_free(msg); } @@ -203,7 +266,13 @@ void handle_node_announcement( node->port = msg->port; memcpy(node->rgb_color, msg->rgb_color, 3); - broadcast(peer->dstate, WIRE_NODE_ANNOUNCEMENT, serialized, peer); + u8 *tag = tal_arr(msg, u8, 0); + towire_pubkey(&tag, &msg->node_id); + queue_broadcast(peer->dstate, + WIRE_NODE_ANNOUNCEMENT, + msg->timestamp, + tag, + serialized, peer); tal_free(msg); } @@ -335,9 +404,21 @@ void announce_channel(struct lightningd_state *dstate, struct peer *peer) broadcast_channel_announcement(dstate, peer); broadcast_channel_update(dstate, peer); broadcast_node_announcement(dstate); + +} + +static void process_broadcast_queue(struct lightningd_state *dstate) +{ + new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate); + struct queued_message *el; + while ((el = list_pop(&dstate->broadcast_queue, struct queued_message, list)) != NULL) { + broadcast(dstate, el->type, el->payload, NULL); + tal_free(el); + } } void setup_p2p_announce(struct lightningd_state *dstate) { new_reltimer(dstate, dstate, time_from_sec(30), announce, dstate); + new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate); }