gossip: Implemented the staggered broadcast

This commit is contained in:
Christian Decker 2016-12-14 18:59:08 +01:00 committed by Rusty Russell
parent fca9bf1a16
commit 611f4833c5
3 changed files with 88 additions and 3 deletions

View File

@ -371,6 +371,7 @@ static struct lightningd_state *lightningd_state(void)
dstate->reexec = NULL;
dstate->external_ip = NULL;
dstate->announce = NULL;
list_head_init(&dstate->broadcast_queue);
return dstate;
}

View File

@ -147,5 +147,8 @@ struct lightningd_state {
/* Announce timer. */
struct oneshot *announce;
/* Outgoing messages queued for the staggered broadcast */
struct list_head broadcast_queue;
};
#endif /* LIGHTNING_DAEMON_LIGHTNING_H */

View File

@ -12,6 +12,24 @@
#include <ccan/tal/tal.h>
#include <secp256k1.h>
struct queued_message {
int type;
/* Unique tag specifying the msg origin */
void *tag;
/* Timestamp for `channel_update`s and `node_announcement`s, 0
* for `channel_announcement`s */
u32 timestamp;
/* Serialized payload */
u8 *payload;
struct list_node list;
/* who told us about this message? */
struct peer *origin;
};
u8 ipv4prefix[] = {
0x00, 0x00, 0x00, 0x00,
@ -59,6 +77,38 @@ static void broadcast(struct lightningd_state *dstate,
}
}
static void queue_broadcast(struct lightningd_state *dstate,
const int type,
const u32 timestamp,
const u8 *tag,
const u8 *payload,
struct peer *origin)
{
struct queued_message *el, *msg;
list_for_each(&dstate->broadcast_queue, el, list) {
if (el->type == type &&
tal_count(tag) == tal_count(el->tag) &&
memcmp(el->tag, tag, tal_count(tag)) == 0 &&
el->timestamp < timestamp){
/* Found a replacement */
el->payload = tal_free(el->payload);
el->payload = tal_dup_arr(el, u8, payload, tal_count(payload), 0);
el->timestamp = timestamp;
el->origin = origin;
return;
}
}
/* No match found, add a new message to the queue */
msg = tal(dstate, struct queued_message);
msg->type = type;
msg->timestamp = timestamp;
msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0);
msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0);
msg->origin = origin;
list_add_tail(&dstate->broadcast_queue, &msg->list);
}
static bool add_channel_direction(struct lightningd_state *dstate,
const struct pubkey *from,
const struct pubkey *to,
@ -110,7 +160,14 @@ void handle_channel_announcement(
}
serialized = towire_channel_announcement(msg, msg);
broadcast(peer->dstate, WIRE_CHANNEL_ANNOUNCEMENT, serialized, peer);
u8 *tag = tal_arr(msg, u8, 0);
towire_channel_id(&tag, &msg->channel_id);
queue_broadcast(peer->dstate,
WIRE_CHANNEL_ANNOUNCEMENT,
0, /* `channel_announcement`s do not have a timestamp */
tag,
serialized, peer);
tal_free(msg);
}
@ -159,7 +216,13 @@ void handle_channel_update(struct peer *peer, const struct msg_channel_update *m
msg->flags
);
broadcast(peer->dstate, WIRE_CHANNEL_UPDATE, serialized, peer);
u8 *tag = tal_arr(msg, u8, 0);
towire_channel_id(&tag, &msg->channel_id);
queue_broadcast(peer->dstate,
WIRE_CHANNEL_UPDATE,
msg->timestamp,
tag,
serialized, peer);
tal_free(msg);
}
@ -203,7 +266,13 @@ void handle_node_announcement(
node->port = msg->port;
memcpy(node->rgb_color, msg->rgb_color, 3);
broadcast(peer->dstate, WIRE_NODE_ANNOUNCEMENT, serialized, peer);
u8 *tag = tal_arr(msg, u8, 0);
towire_pubkey(&tag, &msg->node_id);
queue_broadcast(peer->dstate,
WIRE_NODE_ANNOUNCEMENT,
msg->timestamp,
tag,
serialized, peer);
tal_free(msg);
}
@ -335,9 +404,21 @@ void announce_channel(struct lightningd_state *dstate, struct peer *peer)
broadcast_channel_announcement(dstate, peer);
broadcast_channel_update(dstate, peer);
broadcast_node_announcement(dstate);
}
static void process_broadcast_queue(struct lightningd_state *dstate)
{
new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate);
struct queued_message *el;
while ((el = list_pop(&dstate->broadcast_queue, struct queued_message, list)) != NULL) {
broadcast(dstate, el->type, el->payload, NULL);
tal_free(el);
}
}
void setup_p2p_announce(struct lightningd_state *dstate)
{
new_reltimer(dstate, dstate, time_from_sec(30), announce, dstate);
new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate);
}