common/gossip_store: avoid fd pass for new store, use end marker.

This is also simpler.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2021-05-22 16:39:53 +09:30
parent c2a88912e1
commit 9dadcc858b
9 changed files with 43 additions and 139 deletions

View File

@ -60,6 +60,7 @@ CLOSINGD_COMMON_OBJS := \
common/version.o \
common/wire_error.o \
common/wireaddr.o \
gossipd/gossip_store_wiregen.o \
gossipd/gossipd_peerd_wiregen.o
lightningd/lightning_closingd: $(CLOSINGD_OBJS) $(WIRE_ONION_OBJS) $(CLOSINGD_COMMON_OBJS) $(WIRE_OBJS) $(BITCOIN_OBJS) $(HSMD_CLIENT_OBJS)

View File

@ -8,7 +8,11 @@
#include <common/status.h>
#include <common/utils.h>
#include <errno.h>
#include <fcntl.h>
#include <gossipd/gossip_store_wiregen.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <wire/peer_wire.h>
@ -72,6 +76,32 @@ static void failed_read(int fd, int len)
lseek(fd, -len, SEEK_CUR);
}
static void reopen_gossip_store(struct per_peer_state *pps,
const u8 *msg)
{
u64 equivalent_offset;
int newfd;
if (!fromwire_gossip_store_ended(msg, &equivalent_offset))
status_failed(STATUS_FAIL_GOSSIP_IO,
"Bad gossipd GOSSIP_STORE_ENDED msg: %s",
tal_hex(tmpctx, msg));
newfd = open(GOSSIP_STORE_FILENAME, O_RDONLY);
if (newfd < 0)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Cannot open %s: %s",
GOSSIP_STORE_FILENAME,
strerror(errno));
status_debug("gossip_store at end, new fd moved to %"PRIu64,
equivalent_offset);
lseek(newfd, equivalent_offset, SEEK_SET);
close(pps->gossip_store_fd);
pps->gossip_store_fd = newfd;
}
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
{
u8 *msg = NULL;
@ -132,9 +162,11 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
continue;
}
/* Ignore gossipd internal messages. */
type = fromwire_peektype(msg);
if (type != WIRE_CHANNEL_ANNOUNCEMENT
if (type == WIRE_GOSSIP_STORE_ENDED)
reopen_gossip_store(pps, msg);
/* Ignore gossipd internal messages. */
else if (type != WIRE_CHANNEL_ANNOUNCEMENT
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT)
msg = tal_free(msg);
@ -144,54 +176,3 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
return msg;
}
/* newfd is at offset 1. We need to adjust it to similar offset as our
* current one. */
void gossip_store_switch_fd(struct per_peer_state *pps,
int newfd, u64 offset_shorter)
{
u64 cur = lseek(pps->gossip_store_fd, 0, SEEK_CUR);
/* If we're already at end (common), we know where to go in new one. */
if (cur == lseek(pps->gossip_store_fd, 0, SEEK_END)) {
status_debug("gossip_store at end, new fd moved to %"PRIu64,
cur - offset_shorter);
assert(cur > offset_shorter);
lseek(newfd, cur - offset_shorter, SEEK_SET);
} else if (cur > offset_shorter) {
/* We're part way through. Worst case, we should move back by
* offset_shorter (that's how much the *end* moved), but in
* practice we'll probably end up retransmitting some stuff */
u64 target = cur - offset_shorter;
size_t num = 0;
status_debug("gossip_store new fd moving back %"PRIu64
" to %"PRIu64,
cur, target);
cur = 1;
while (cur < target) {
u32 msglen;
struct gossip_hdr hdr;
if (read(newfd, &hdr, sizeof(hdr)) != sizeof(hdr))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: "
"can't read hdr offset %"PRIu64
" in new store target %"PRIu64,
cur, target);
/* Skip over it. */
msglen = (be32_to_cpu(hdr.len)
& ~GOSSIP_STORE_LEN_DELETED_BIT);
cur = lseek(newfd, msglen, SEEK_CUR);
num++;
}
status_debug("gossip_store: skipped %zu records to %"PRIu64,
num, cur);
} else
status_debug("gossip_store new fd moving back %"PRIu64
" to start (offset_shorter=%"PRIu64")",
cur, offset_shorter);
close(pps->gossip_store_fd);
pps->gossip_store_fd = newfd;
}

View File

@ -116,13 +116,7 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
{
u8 *gossip;
u64 offset_shorter;
if (fromwire_gossipd_new_store_fd(msg, &offset_shorter)) {
gossip_store_switch_fd(pps, fdpass_recv(pps->gossip_fd),
offset_shorter);
goto out;
} else
/* It's a raw gossip msg: this copies or takes() */
gossip = tal_dup_talarr(tmpctx, u8, msg);
@ -133,10 +127,6 @@ void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
} else {
sync_crypto_write(pps, gossip);
}
out:
if (taken(msg))
tal_free(msg);
}
/* takes iff returns true */

View File

@ -459,7 +459,7 @@ bool gossip_store_compact(struct gossip_store *gs)
{
size_t count = 0, deleted = 0;
int fd;
u64 off, len = sizeof(gs->version), oldlen, idx;
u64 off, len = sizeof(gs->version), idx;
struct offmap *offmap;
struct gossip_hdr hdr;
struct offmap_iter oit;
@ -572,18 +572,15 @@ bool gossip_store_compact(struct gossip_store *gs)
deleted, count, len);
/* Write end marker now new one is ready */
oldlen = gs->len;
append_msg(gs->fd, towire_gossip_store_ended(tmpctx, len),
0, false, &oldlen);
0, false, &gs->len);
gs->count = count;
gs->deleted = 0;
off = gs->len - len;
gs->len = len;
close(gs->fd);
gs->fd = fd;
update_peers_broadcast_index(gs->peers, off);
return true;
unlink_disable:

View File

@ -285,32 +285,6 @@ static u8 *handle_channel_update_msg(struct peer *peer, const u8 *msg)
return NULL;
}
/*~ When we compact the gossip store, all the broadcast indexs move.
* We simply offset everyone, which means in theory they could retransmit
* some, but that's a lesser evil than skipping some. */
void update_peers_broadcast_index(struct list_head *peers, u32 offset)
{
struct peer *peer, *next;
list_for_each_safe(peers, peer, next, list) {
int gs_fd;
/*~ Since store has been compacted, they need a new fd for the
* new store. We also tell them how much this is shrunk, so
* they can (approximately) tell where to start in the new store.
*/
gs_fd = gossip_store_readonly_fd(peer->daemon->rstate->gs);
if (gs_fd < 0) {
status_broken("Can't get read-only gossip store fd:"
" killing peer");
tal_free(peer);
} else {
u8 *msg = towire_gossipd_new_store_fd(NULL, offset);
daemon_conn_send(peer->dc, take(msg));
daemon_conn_send_fd(peer->dc, gs_fd);
}
}
}
/*~ For simplicity, all pings and pongs are forwarded to us here in gossipd. */
static u8 *handle_ping(struct peer *peer, const u8 *ping)
{
@ -787,7 +761,6 @@ static struct io_plan *peer_msg_in(struct io_conn *conn,
/* These are the ones we send, not them */
case WIRE_GOSSIPD_GET_UPDATE_REPLY:
case WIRE_GOSSIPD_NEW_STORE_FD:
break;
}

View File

@ -22,11 +22,6 @@ msgdata,gossipd_local_channel_update,fee_base_msat,u32,
msgdata,gossipd_local_channel_update,fee_proportional_millionths,u32,
msgdata,gossipd_local_channel_update,htlc_maximum_msat,amount_msat,
# Update your gossip_store fd: + gossip_store_fd
msgtype,gossipd_new_store_fd,3505
# How much shorter the new store is, so you can offset streaming.
msgdata,gossipd_new_store_fd,offset_shorter,u64,
# Send this channel_announcement
msgtype,gossipd_local_channel_announcement,3506
msgdata,gossipd_local_channel_announcement,len,u16,

1 # These must be distinct from WIRE_CHANNEL_ANNOUNCEMENT etc. gossip msgs!
22 msgtype,gossipd_new_store_fd,3505 msgtype,gossipd_local_channel_announcement,3506
23 # How much shorter the new store is, so you can offset streaming. msgdata,gossipd_local_channel_announcement,len,u16,
24 msgdata,gossipd_new_store_fd,offset_shorter,u64, msgdata,gossipd_local_channel_announcement,cannount,u8,len
# Send this channel_announcement
msgtype,gossipd_local_channel_announcement,3506
msgdata,gossipd_local_channel_announcement,len,u16,
msgdata,gossipd_local_channel_announcement,cannount,u8,len
25
26
27

View File

@ -24,7 +24,6 @@ const char *gossipd_peerd_wire_name(int e)
case WIRE_GOSSIPD_GET_UPDATE: return "WIRE_GOSSIPD_GET_UPDATE";
case WIRE_GOSSIPD_GET_UPDATE_REPLY: return "WIRE_GOSSIPD_GET_UPDATE_REPLY";
case WIRE_GOSSIPD_LOCAL_CHANNEL_UPDATE: return "WIRE_GOSSIPD_LOCAL_CHANNEL_UPDATE";
case WIRE_GOSSIPD_NEW_STORE_FD: return "WIRE_GOSSIPD_NEW_STORE_FD";
case WIRE_GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT: return "WIRE_GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT";
}
@ -38,7 +37,6 @@ bool gossipd_peerd_wire_is_defined(u16 type)
case WIRE_GOSSIPD_GET_UPDATE:;
case WIRE_GOSSIPD_GET_UPDATE_REPLY:;
case WIRE_GOSSIPD_LOCAL_CHANNEL_UPDATE:;
case WIRE_GOSSIPD_NEW_STORE_FD:;
case WIRE_GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT:;
return true;
}
@ -135,30 +133,6 @@ bool fromwire_gossipd_local_channel_update(const void *p, struct short_channel_i
return cursor != NULL;
}
/* WIRE: GOSSIPD_NEW_STORE_FD */
/* Update your gossip_store fd: + gossip_store_fd */
u8 *towire_gossipd_new_store_fd(const tal_t *ctx, u64 offset_shorter)
{
u8 *p = tal_arr(ctx, u8, 0);
towire_u16(&p, WIRE_GOSSIPD_NEW_STORE_FD);
/* How much shorter the new store is */
towire_u64(&p, offset_shorter);
return memcheck(p, tal_count(p));
}
bool fromwire_gossipd_new_store_fd(const void *p, u64 *offset_shorter)
{
const u8 *cursor = p;
size_t plen = tal_count(p);
if (fromwire_u16(&cursor, &plen) != WIRE_GOSSIPD_NEW_STORE_FD)
return false;
/* How much shorter the new store is */
*offset_shorter = fromwire_u64(&cursor, &plen);
return cursor != NULL;
}
/* WIRE: GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT */
/* Send this channel_announcement */
u8 *towire_gossipd_local_channel_announcement(const tal_t *ctx, const u8 *cannount)
@ -187,4 +161,4 @@ bool fromwire_gossipd_local_channel_announcement(const tal_t *ctx, const void *p
fromwire_u8_array(&cursor, &plen, *cannount, len);
return cursor != NULL;
}
// SHA256STAMP:3ffcd3b7d7815b6fbeaadc1b3b7235190eb584284f47e46ab8518eac91fd71b5
// SHA256STAMP:2ef99c782b9877add7912c680d3a48bed3372c6a6fe2410716651dbe777493eb

View File

@ -18,8 +18,6 @@ enum gossipd_peerd_wire {
WIRE_GOSSIPD_GET_UPDATE_REPLY = 3601,
/* Send this channel_update. */
WIRE_GOSSIPD_LOCAL_CHANNEL_UPDATE = 3504,
/* Update your gossip_store fd: + gossip_store_fd */
WIRE_GOSSIPD_NEW_STORE_FD = 3505,
/* Send this channel_announcement */
WIRE_GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT = 3506,
};
@ -52,11 +50,6 @@ bool fromwire_gossipd_get_update_reply(const tal_t *ctx, const void *p, u8 **upd
u8 *towire_gossipd_local_channel_update(const tal_t *ctx, const struct short_channel_id *short_channel_id, bool disable, u16 cltv_expiry_delta, struct amount_msat htlc_minimum_msat, u32 fee_base_msat, u32 fee_proportional_millionths, struct amount_msat htlc_maximum_msat);
bool fromwire_gossipd_local_channel_update(const void *p, struct short_channel_id *short_channel_id, bool *disable, u16 *cltv_expiry_delta, struct amount_msat *htlc_minimum_msat, u32 *fee_base_msat, u32 *fee_proportional_millionths, struct amount_msat *htlc_maximum_msat);
/* WIRE: GOSSIPD_NEW_STORE_FD */
/* Update your gossip_store fd: + gossip_store_fd */
u8 *towire_gossipd_new_store_fd(const tal_t *ctx, u64 offset_shorter);
bool fromwire_gossipd_new_store_fd(const void *p, u64 *offset_shorter);
/* WIRE: GOSSIPD_LOCAL_CHANNEL_ANNOUNCEMENT */
/* Send this channel_announcement */
u8 *towire_gossipd_local_channel_announcement(const tal_t *ctx, const u8 *cannount);
@ -64,4 +57,4 @@ bool fromwire_gossipd_local_channel_announcement(const tal_t *ctx, const void *p
#endif /* LIGHTNING_GOSSIPD_GOSSIPD_PEERD_WIREGEN_H */
// SHA256STAMP:3ffcd3b7d7815b6fbeaadc1b3b7235190eb584284f47e46ab8518eac91fd71b5
// SHA256STAMP:2ef99c782b9877add7912c680d3a48bed3372c6a6fe2410716651dbe777493eb

View File

@ -81,8 +81,8 @@ OPENINGD_COMMON_OBJS := \
common/version.o \
common/wire_error.o \
common/wireaddr.o \
gossipd/gossipd_peerd_wiregen.o \
lightningd/gossip_msg.o
gossipd/gossip_store_wiregen.o \
gossipd/gossipd_peerd_wiregen.o
lightningd/lightning_openingd: $(OPENINGD_OBJS) $(OPENINGD_COMMON_OBJS) $(WIRE_OBJS) $(BITCOIN_OBJS) $(HSMD_CLIENT_OBJS)