gossipd: add timestamp in gossip store header.

(We don't increment the gossip_store version, since there are only a
few commits since the last time we did this).

This lets the reader simply filter messages; this is especially nice since
the channel_announcement timestamp is *derived*, not in the actual message.

This also creates a 'struct gossip_hdr' which makes the code a bit
clearer.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2019-06-04 03:47:25 +09:30
parent bad9734dc7
commit 948490ec58
9 changed files with 81 additions and 57 deletions

View file

@ -1,6 +1,5 @@
#include <assert.h>
#include <ccan/crc/crc.h>
#include <ccan/endian/endian.h>
#include <common/gossip_store.h>
#include <common/per_peer_state.h>
#include <common/status.h>
@ -19,26 +18,26 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
return NULL;
while (!msg) {
beint32_t hdr[2];
struct gossip_hdr hdr;
u32 msglen, checksum;
int type;
if (read(pps->gossip_store_fd, hdr, sizeof(hdr)) != sizeof(hdr)) {
if (read(pps->gossip_store_fd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
per_peer_state_reset_gossip_timer(pps);
return NULL;
}
/* Skip any deleted entries. */
if (be32_to_cpu(hdr[0]) & GOSSIP_STORE_LEN_DELETED_BIT) {
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
/* Skip over it. */
lseek(pps->gossip_store_fd,
be32_to_cpu(hdr[0]) & ~GOSSIP_STORE_LEN_DELETED_BIT,
be32_to_cpu(hdr.len) & ~GOSSIP_STORE_LEN_DELETED_BIT,
SEEK_CUR);
continue;
}
msglen = be32_to_cpu(hdr[0]);
checksum = be32_to_cpu(hdr[1]);
msglen = be32_to_cpu(hdr.len);
checksum = be32_to_cpu(hdr.crc);
msg = tal_arr(ctx, u8, msglen);
if (read(pps->gossip_store_fd, msg, msglen) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
@ -48,7 +47,7 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
(s64)lseek(pps->gossip_store_fd,
0, SEEK_CUR));
if (checksum != crc32c(0, msg, msglen))
if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"
PRIi64": %s",
@ -93,16 +92,16 @@ void gossip_store_switch_fd(struct per_peer_state *pps,
cur = 1;
while (cur < target) {
u32 msglen;
beint32_t hdr[2];
struct gossip_hdr hdr;
if (read(newfd, hdr, sizeof(hdr)) != sizeof(hdr))
if (read(newfd, &hdr, sizeof(hdr)) != sizeof(hdr))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: "
"can't read hdr offset %"PRIu64
" in new store target %"PRIu64,
cur, target);
/* Skip over it. */
msglen = (be32_to_cpu(hdr[0])
msglen = (be32_to_cpu(hdr.len)
& ~GOSSIP_STORE_LEN_DELETED_BIT);
cur = lseek(newfd, msglen, SEEK_CUR);
num++;

View file

@ -1,6 +1,7 @@
#ifndef LIGHTNING_COMMON_GOSSIP_STORE_H
#define LIGHTNING_COMMON_GOSSIP_STORE_H
#include "config.h"
#include <ccan/endian/endian.h>
#include <ccan/short_types/short_types.h>
#include <ccan/tal/tal.h>
@ -16,6 +17,15 @@ struct per_peer_state;
*/
#define GOSSIP_STORE_LEN_DELETED_BIT 0x80000000U
/**
* gossip_hdr -- On-disk format header.
*/
struct gossip_hdr {
beint32_t len; /* Length of message after header. */
beint32_t crc; /* crc of message of timestamp, after header. */
beint32_t timestamp; /* timestamp of msg. */
};
/**
* Direct store accessor: loads gossip msg from store.
*

View file

@ -17,7 +17,7 @@ int main(int argc, char *argv[])
{
int fd;
u8 version;
beint32_t belen, becsum;
struct gossip_hdr hdr;
size_t off;
setup_locale();
@ -42,10 +42,9 @@ int main(int argc, char *argv[])
printf("GOSSIP VERSION %u\n", version);
off = 1;
while (read(fd, &belen, sizeof(belen)) == sizeof(belen) &&
read(fd, &becsum, sizeof(becsum)) == sizeof(becsum)) {
while (read(fd, &hdr, sizeof(hdr)) == sizeof(hdr)) {
struct amount_sat sat;
u32 msglen = be32_to_cpu(belen);
u32 msglen = be32_to_cpu(hdr.len);
u8 *msg, *inner;
bool deleted = (msglen & GOSSIP_STORE_LEN_DELETED_BIT);
@ -54,7 +53,8 @@ int main(int argc, char *argv[])
if (read(fd, msg, msglen) != msglen)
errx(1, "%zu: Truncated file?", off);
if (be32_to_cpu(becsum) != crc32c(0, msg, msglen))
if (be32_to_cpu(hdr.crc)
!= crc32c(be32_to_cpu(hdr.timestamp), msg, msglen))
warnx("Checksum verification failed");
if (deleted) {
@ -63,14 +63,17 @@ int main(int argc, char *argv[])
printf("%zu: channel_amount: %s\n", off,
type_to_string(tmpctx, struct amount_sat, &sat));
} else if (fromwire_peektype(msg) == WIRE_CHANNEL_ANNOUNCEMENT) {
printf("%zu: channel_announcement: %s\n",
off, tal_hex(msg, msg));
printf("%zu: t=%u channel_announcement: %s\n",
off, be32_to_cpu(hdr.timestamp),
tal_hex(msg, msg));
} else if (fromwire_peektype(msg) == WIRE_CHANNEL_UPDATE) {
printf("%zu: channel_update: %s\n",
off, tal_hex(msg, msg));
printf("%zu: t=%u channel_update: %s\n",
off, be32_to_cpu(hdr.timestamp),
tal_hex(msg, msg));
} else if (fromwire_peektype(msg) == WIRE_NODE_ANNOUNCEMENT) {
printf("%zu: node_announcement: %s\n",
off, tal_hex(msg, msg));
printf("%zu: t=%u node_announcement: %s\n",
off, be32_to_cpu(hdr.timestamp),
tal_hex(msg, msg));
} else if (fromwire_peektype(msg) == WIRE_GOSSIPD_LOCAL_ADD_CHANNEL) {
printf("%zu: local_add_channel: %s\n",
off, tal_hex(msg, msg));
@ -82,7 +85,7 @@ int main(int argc, char *argv[])
warnx("%zu: Unknown message %u",
off, fromwire_peektype(msg));
}
off += sizeof(belen) + sizeof(becsum) + msglen;
off += sizeof(hdr) + msglen;
tal_free(msg);
}
return 0;

View file

@ -68,11 +68,13 @@ void insert_broadcast(struct broadcast_state **bstate,
{
u32 offset;
assert(bcast->timestamp);
/* If we're loading from the store, we already have index */
if (!bcast->index) {
u64 idx;
bcast->index = idx = gossip_store_add((*bstate)->gs, msg,
bcast->timestamp,
addendum);
if (!idx)
status_failed(STATUS_FAIL_INTERNAL_ERROR,

View file

@ -36,11 +36,15 @@ struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs,
struct list_head *peers);
/* Append a queued message for broadcast. Must be explicitly deleted.
* Also adds it to the gossip store.
/**
* Append a queued message for broadcast.
* @bstate: the broadcast state, will be replaced if we compact the store.
* @msg: the message to append.
* @addendum: an extra message (for GOSSIP_STORE_CHANNEL_AMOUNT after announce)
* @bcast: broadcast location.
*
* If it's a channel_announcement, caller sets addendum to the
* WIRE_GOSSIP_STORE_CHANNEL_AMOUNT to immediately follow the announcement.
* If @bcast.index is 0, it is written into the store and set.
* @bcast.timestamp must be set, and non-zero.
*/
void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,

View file

@ -50,21 +50,22 @@ static void gossip_store_destroy(struct gossip_store *gs)
close(gs->fd);
}
static bool append_msg(int fd, const u8 *msg, u64 *len)
static bool append_msg(int fd, const u8 *msg, u32 timestamp, u64 *len)
{
beint32_t hdr[2];
struct gossip_hdr hdr;
u32 msglen;
struct iovec iov[2];
msglen = tal_count(msg);
hdr[0] = cpu_to_be32(msglen);
hdr[1] = cpu_to_be32(crc32c(0, msg, msglen));
hdr.len = cpu_to_be32(msglen);
hdr.crc = cpu_to_be32(crc32c(timestamp, msg, msglen));
hdr.timestamp = cpu_to_be32(timestamp);
if (len)
*len += sizeof(hdr) + msglen;
/* Use writev so it will appear in store atomically */
iov[0].iov_base = hdr;
iov[0].iov_base = &hdr;
iov[0].iov_len = sizeof(hdr);
iov[1].iov_base = (void *)msg;
iov[1].iov_len = msglen;
@ -110,21 +111,21 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate)
static size_t transfer_store_msg(int from_fd, size_t from_off, int to_fd,
int *type)
{
beint32_t hdr[2];
struct gossip_hdr hdr;
u32 msglen;
u8 *msg;
const u8 *p;
size_t tmplen;
*type = -1;
if (pread(from_fd, hdr, sizeof(hdr), from_off) != sizeof(hdr)) {
if (pread(from_fd, &hdr, sizeof(hdr), from_off) != sizeof(hdr)) {
status_broken("Failed reading header from to gossip store @%zu"
": %s",
from_off, strerror(errno));
return 0;
}
msglen = be32_to_cpu(hdr[0]);
msglen = be32_to_cpu(hdr.len);
if (msglen & GOSSIP_STORE_LEN_DELETED_BIT) {
status_broken("Can't transfer deleted msg from gossip store @%zu",
from_off);
@ -133,7 +134,7 @@ static size_t transfer_store_msg(int from_fd, size_t from_off, int to_fd,
/* FIXME: Reuse buffer? */
msg = tal_arr(tmpctx, u8, sizeof(hdr) + msglen);
memcpy(msg, hdr, sizeof(hdr));
memcpy(msg, &hdr, sizeof(hdr));
if (pread(from_fd, msg + sizeof(hdr), msglen, from_off + sizeof(hdr))
!= msglen) {
status_broken("Failed reading %u from to gossip store @%zu"
@ -181,7 +182,7 @@ static bool add_local_unnannounced(int in_fd, int out_fd,
msg = towire_gossipd_local_add_channel(tmpctx, &c->scid,
&peer->id, c->sat);
if (!append_msg(out_fd, msg, len))
if (!append_msg(out_fd, msg, 0, len))
return false;
for (size_t i = 0; i < 2; i++) {
@ -340,6 +341,7 @@ bool gossip_store_maybe_compact(struct gossip_store *gs,
}
u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
u32 timestamp,
const u8 *addendum)
{
u64 off = gs->len;
@ -347,12 +349,12 @@ u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
/* Should never get here during loading! */
assert(gs->writable);
if (!append_msg(gs->fd, gossip_msg, &gs->len)) {
if (!append_msg(gs->fd, gossip_msg, timestamp, &gs->len)) {
status_broken("Failed writing to gossip store: %s",
strerror(errno));
return 0;
}
if (addendum && !append_msg(gs->fd, addendum, &gs->len)) {
if (addendum && !append_msg(gs->fd, addendum, 0, &gs->len)) {
status_broken("Failed writing addendum to gossip store: %s",
strerror(errno));
return 0;
@ -367,7 +369,7 @@ u64 gossip_store_add_private_update(struct gossip_store *gs, const u8 *update)
/* A local update for an unannounced channel: not broadcastable, but
* otherwise the same as a normal channel_update */
const u8 *pupdate = towire_gossip_store_private_update(tmpctx, update);
return gossip_store_add(gs, pupdate, NULL);
return gossip_store_add(gs, pupdate, 0, NULL);
}
void gossip_store_delete(struct gossip_store *gs,
@ -416,7 +418,7 @@ const u8 *gossip_store_get(const tal_t *ctx,
struct gossip_store *gs,
u64 offset)
{
beint32_t hdr[2];
struct gossip_hdr hdr;
u32 msglen, checksum;
u8 *msg;
@ -424,7 +426,7 @@ const u8 *gossip_store_get(const tal_t *ctx,
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't access offset %"PRIu64,
offset);
if (pread(gs->fd, hdr, sizeof(hdr), offset) != sizeof(hdr)) {
if (pread(gs->fd, &hdr, sizeof(hdr), offset) != sizeof(hdr)) {
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read hdr offset %"PRIu64
"/%"PRIu64": %s",
@ -432,15 +434,15 @@ const u8 *gossip_store_get(const tal_t *ctx,
}
/* FIXME: We should skip over these deleted entries! */
msglen = be32_to_cpu(hdr[0]) & ~GOSSIP_STORE_LEN_DELETED_BIT;
checksum = be32_to_cpu(hdr[1]);
msglen = be32_to_cpu(hdr.len) & ~GOSSIP_STORE_LEN_DELETED_BIT;
checksum = be32_to_cpu(hdr.crc);
msg = tal_arr(ctx, u8, msglen);
if (pread(gs->fd, msg, msglen, offset + sizeof(hdr)) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read len %u offset %"PRIu64
"/%"PRIu64, msglen, offset, gs->len);
if (checksum != crc32c(0, msg, msglen))
if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"PRIu64": %s",
offset, tal_hex(tmpctx, msg));
@ -476,7 +478,7 @@ int gossip_store_readonly_fd(struct gossip_store *gs)
void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
{
beint32_t hdr[2];
struct gossip_hdr hdr;
u32 msglen, checksum;
u8 *msg;
struct amount_sat satoshis;
@ -487,9 +489,9 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
u64 chan_ann_off = 0; /* Spurious gcc-9 (Ubuntu 9-20190402-1ubuntu1) 9.0.1 20190402 (experimental) warning */
gs->writable = false;
while (pread(gs->fd, hdr, sizeof(hdr), gs->len) == sizeof(hdr)) {
msglen = be32_to_cpu(hdr[0]) & ~GOSSIP_STORE_LEN_DELETED_BIT;
checksum = be32_to_cpu(hdr[1]);
while (pread(gs->fd, &hdr, sizeof(hdr), gs->len) == sizeof(hdr)) {
msglen = be32_to_cpu(hdr.len) & ~GOSSIP_STORE_LEN_DELETED_BIT;
checksum = be32_to_cpu(hdr.crc);
msg = tal_arr(tmpctx, u8, msglen);
if (pread(gs->fd, msg, msglen, gs->len+sizeof(hdr)) != msglen) {
@ -497,13 +499,13 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
goto truncate_nomsg;
}
if (checksum != crc32c(0, msg, msglen)) {
if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen)) {
bad = "Checksum verification failed";
goto truncate;
}
/* Skip deleted entries */
if (be32_to_cpu(hdr[0]) & GOSSIP_STORE_LEN_DELETED_BIT)
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT)
goto next;
switch (fromwire_peektype(msg)) {

View file

@ -35,7 +35,7 @@ u64 gossip_store_add_private_update(struct gossip_store *gs, const u8 *update);
* Add a gossip message to the gossip_store (and optional addendum)
*/
u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
const u8 *addendum);
u32 timestamp, const u8 *addendum);
/**

View file

@ -2517,7 +2517,7 @@ bool handle_local_add_channel(struct routing_state *rstate,
/* Create new (unannounced) channel */
chan = new_chan(rstate, &scid, &rstate->local_id, &remote_node_id, sat);
if (!index)
index = gossip_store_add(rstate->broadcasts->gs, msg, NULL);
index = gossip_store_add(rstate->broadcasts->gs, msg, 0, NULL);
chan->bcast.index = index;
return true;
}

View file

@ -865,25 +865,29 @@ def test_gossip_store_load(node_factory):
with open(os.path.join(l1.daemon.lightning_dir, 'gossip_store'), 'wb') as f:
f.write(bytearray.fromhex("05" # GOSSIP_STORE_VERSION
"000001b0" # len
"697dac9f" # csum
"dc5bef89" # csum
"5b8d9b44" # timestamp
"0100" # WIRE_CHANNEL_ANNOUNCEMENT
"bb8d7b6998cca3c2b3ce12a6bd73a8872c808bb48de2a30c5ad9cdf835905d1e27505755087e675fb517bbac6beb227629b694ea68f49d357458327138978ebfd7adfde1c69d0d2f497154256f6d5567a5cf2317c589e0046c0cc2b3e986cf9b6d3b44742bd57bce32d72cd1180a7f657795976130b20508b239976d3d4cdc4d0d6e6fbb9ab6471f664a662972e406f519eab8bce87a8c0365646df5acbc04c91540b4c7c518cec680a4a6af14dae1aca0fd5525220f7f0e96fcd2adef3c803ac9427fe71034b55a50536638820ef21903d09ccddd38396675b598587fa886ca711415c813fc6d69f46552b9a0a539c18f265debd0e2e286980a118ba349c216000043497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea33090000000013a63c0000b50001021bf3de4e84e3d52f9a3e36fbdcd2c4e8dbf203b9ce4fc07c2f03be6c21d0c67503f113414ebdc6c1fb0f33c99cd5a1d09dd79e7fdf2468cf1fe1af6674361695d203801fd8ab98032f11cc9e4916dd940417082727077609d5c7f8cc6e9a3ad25dd102517164b97ab46cee3826160841a36c46a2b7b9c74da37bdc070ed41ba172033a"
"0000000a" # len
"7a0168df" # csum
"00000000" # timestamp
"1005" # WIRE_GOSSIP_STORE_CHANNEL_AMOUNT
"0000000001000000"
"00000082" # len
"f56ae7ee" # csum
"389dc73e" # csum
"5b8d9b44" # timestamp
"0102" # WIRE_CHANNEL_UPDATE
"1ea7c2eadf8a29eb8690511a519b5656e29aa0a853771c4e38e65c5abf43d907295a915e69e451f4c7a0c3dc13dd943cfbe3ae88c0b96667cd7d58955dbfedcf43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea33090000000013a63c0000b500015b8d9b440000009000000000000003e8000003e800000001"
"00000095" # len
"3d934473" # csum
"ba5693e3" # csum
"5aab817c" # timestamp
"0101" # WIRE_NODE_ANNOUNCEMENT
"cf5d870bc7ecabcb7cd16898ef66891e5f0c6c5851bd85b670f03d325bc44d7544d367cd852e18ec03f7f4ff369b06860a3b12b07b29f36fb318ca11348bf8ec00005aab817c03f113414ebdc6c1fb0f33c99cd5a1d09dd79e7fdf2468cf1fe1af6674361695d23974b250757a7a6c6549544300000000000000000000000000000000000000000000000007010566933e2607"))
l1.start()
# May preceed the Started msg waited for in 'start'.
wait_for(lambda: l1.daemon.is_in_log('gossip_store: Read 1/1/1/0 cannounce/cupdate/nannounce/cdelete from store in 754 bytes'))
wait_for(lambda: l1.daemon.is_in_log('gossip_store: Read 1/1/1/0 cannounce/cupdate/nannounce/cdelete from store in 770 bytes'))
assert not l1.daemon.is_in_log('gossip_store.*truncating')