mirror of
https://github.com/ElementsProject/lightning.git
synced 2024-11-19 01:43:36 +01:00
plugins/sql: refresh listnodes and listchannels by monitoring the gossip_store.
It's quite a lot of code, but these are the most expensive commands we do so it's worth it. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
parent
aa3a1131aa
commit
9b08c4f25a
@ -218,7 +218,7 @@ plugins/sql-schema_gen.h: plugins/Makefile $(SQL_LISTRPCS_SCHEMAS)
|
||||
@$(call VERBOSE,GEN $@, (SEP=""; echo -n '"{'; for f in $(SQL_LISTRPCS); do echo -n "$$SEP\\\"$$f\\\":"; sed -e s/\"description\":\ *\".\*\"/\"\":\"\"/ -e s/\".*\":\ *{}/\"\":{}/ -e s/\"/\\\\\"/g < doc/schemas/$$f.schema.json | tr -d ' \n'; SEP=","; done; echo '}"') > $@)
|
||||
|
||||
plugins/sql.o: plugins/sql-schema_gen.h
|
||||
plugins/sql: $(PLUGIN_SQL_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS)
|
||||
plugins/sql: $(PLUGIN_SQL_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS) common/gossip_store.o gossipd/gossip_store_wiregen.o
|
||||
|
||||
# Generated from PLUGINS definition in plugins/Makefile
|
||||
ALL_C_HEADERS += plugins/list_of_builtin_plugins_gen.h
|
||||
|
296
plugins/sql.c
296
plugins/sql.c
@ -4,11 +4,19 @@
|
||||
#include <ccan/err/err.h>
|
||||
#include <ccan/strmap/strmap.h>
|
||||
#include <ccan/tal/str/str.h>
|
||||
#include <common/gossip_store.h>
|
||||
#include <common/json_param.h>
|
||||
#include <common/json_stream.h>
|
||||
#include <common/memleak.h>
|
||||
#include <common/type_to_string.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <gossipd/gossip_store_wiregen.h>
|
||||
#include <plugins/libplugin.h>
|
||||
#include <sqlite3.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* Minimized schemas. C23 #embed, Where Art Thou? */
|
||||
static const char schemas[] =
|
||||
@ -107,6 +115,8 @@ static STRMAP(struct table_desc *) tablemap;
|
||||
static size_t max_dbmem = 500000000;
|
||||
static struct sqlite3 *db;
|
||||
static const char *dbfilename;
|
||||
static int gosstore_fd = -1;
|
||||
static size_t gosstore_nodes_off = 0, gosstore_channels_off = 0;
|
||||
|
||||
/* It was tempting to put these in the schema, but they're really
|
||||
* just for our usage. Though that would allow us to autogen the
|
||||
@ -622,6 +632,285 @@ static struct command_result *default_refresh(struct command *cmd,
|
||||
return send_outreq(cmd->plugin, req);
|
||||
}
|
||||
|
||||
static bool extract_scid(int gosstore_fd, size_t off, u16 type,
|
||||
struct short_channel_id *scid)
|
||||
{
|
||||
be64 raw;
|
||||
|
||||
/* BOLT #7:
|
||||
* 1. type: 258 (`channel_update`)
|
||||
* 2. data:
|
||||
* * [`signature`:`signature`]
|
||||
* * [`chain_hash`:`chain_hash`]
|
||||
* * [`short_channel_id`:`short_channel_id`]
|
||||
*/
|
||||
/* Note that first two bytes are message type */
|
||||
const size_t update_scid_off = 2 + (64 + 32);
|
||||
|
||||
off += sizeof(struct gossip_hdr);
|
||||
/* For delete_chan scid immediately follows type */
|
||||
if (type == WIRE_GOSSIP_STORE_DELETE_CHAN)
|
||||
off += 2;
|
||||
else if (type == WIRE_GOSSIP_STORE_PRIVATE_UPDATE)
|
||||
/* Prepend header */
|
||||
off += 2 + 2 + update_scid_off;
|
||||
else if (type == WIRE_CHANNEL_UPDATE)
|
||||
off += update_scid_off;
|
||||
else
|
||||
abort();
|
||||
|
||||
if (pread(gosstore_fd, &raw, sizeof(raw), off) != sizeof(raw))
|
||||
return false;
|
||||
scid->u64 = be64_to_cpu(raw);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Note: this deletes up to two rows, one for each direction. */
|
||||
static void delete_channel_from_db(struct command *cmd,
|
||||
struct short_channel_id scid)
|
||||
{
|
||||
int err;
|
||||
char *errmsg;
|
||||
|
||||
err = sqlite3_exec(db,
|
||||
tal_fmt(tmpctx,
|
||||
"DELETE FROM channels"
|
||||
" WHERE short_channel_id = '%s'",
|
||||
short_channel_id_to_str(tmpctx, &scid)),
|
||||
NULL, NULL, &errmsg);
|
||||
if (err != SQLITE_OK)
|
||||
plugin_err(cmd->plugin, "Could not delete from channels: %s",
|
||||
errmsg);
|
||||
}
|
||||
|
||||
static struct command_result *channels_refresh(struct command *cmd,
|
||||
const struct table_desc *td,
|
||||
struct db_query *dbq);
|
||||
|
||||
static struct command_result *listchannels_one_done(struct command *cmd,
|
||||
const char *buf,
|
||||
const jsmntok_t *result,
|
||||
struct db_query *dbq)
|
||||
{
|
||||
const struct table_desc *td = dbq->tables[0];
|
||||
struct command_result *ret;
|
||||
|
||||
ret = process_json_result(cmd, buf, result, td);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
/* Continue to refresh more channels */
|
||||
return channels_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
static struct command_result *channels_refresh(struct command *cmd,
|
||||
const struct table_desc *td,
|
||||
struct db_query *dbq)
|
||||
{
|
||||
struct out_req *req;
|
||||
size_t msglen;
|
||||
u16 type, flags;
|
||||
|
||||
if (gosstore_fd == -1) {
|
||||
gosstore_fd = open("gossip_store", O_RDONLY);
|
||||
if (gosstore_fd == -1)
|
||||
plugin_err(cmd->plugin, "Could not open gossip_store: %s",
|
||||
strerror(errno));
|
||||
}
|
||||
|
||||
/* First time, set off to end and load from scratch */
|
||||
if (gosstore_channels_off == 0) {
|
||||
gosstore_channels_off = find_gossip_store_end(gosstore_fd, 1);
|
||||
return default_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
plugin_log(cmd->plugin, LOG_DBG, "Refreshing channels @%zu...",
|
||||
gosstore_channels_off);
|
||||
|
||||
/* OK, try catching up! */
|
||||
while (gossip_store_readhdr(gosstore_fd, gosstore_channels_off,
|
||||
&msglen, NULL, &flags, &type)) {
|
||||
struct short_channel_id scid;
|
||||
size_t off = gosstore_channels_off;
|
||||
|
||||
gosstore_channels_off += sizeof(struct gossip_hdr) + msglen;
|
||||
|
||||
if (flags & GOSSIP_STORE_DELETED_BIT)
|
||||
continue;
|
||||
|
||||
if (type == WIRE_GOSSIP_STORE_ENDED) {
|
||||
/* Force a reopen */
|
||||
gosstore_channels_off = gosstore_nodes_off = 0;
|
||||
close(gosstore_fd);
|
||||
gosstore_fd = -1;
|
||||
return channels_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
/* If we see a channel_announcement, we don't care until we
|
||||
* see the channel_update */
|
||||
if (type == WIRE_CHANNEL_UPDATE
|
||||
|| type == WIRE_GOSSIP_STORE_PRIVATE_UPDATE) {
|
||||
/* This can fail if entry not fully written yet. */
|
||||
if (!extract_scid(gosstore_fd, off, type, &scid)) {
|
||||
gosstore_channels_off = off;
|
||||
break;
|
||||
}
|
||||
|
||||
plugin_log(cmd->plugin, LOG_DBG, "Refreshing channel: %s",
|
||||
type_to_string(tmpctx, struct short_channel_id, &scid));
|
||||
/* FIXME: sqlite 3.24.0 (2018-06-04) added UPSERT, but
|
||||
* we don't require it. */
|
||||
delete_channel_from_db(cmd, scid);
|
||||
req = jsonrpc_request_start(cmd->plugin, cmd, "listchannels",
|
||||
listchannels_one_done,
|
||||
forward_error,
|
||||
dbq);
|
||||
json_add_short_channel_id(req->js, "short_channel_id", &scid);
|
||||
return send_outreq(cmd->plugin, req);
|
||||
} else if (type == WIRE_GOSSIP_STORE_DELETE_CHAN) {
|
||||
/* This can fail if entry not fully written yet. */
|
||||
if (!extract_scid(gosstore_fd, off, type, &scid)) {
|
||||
gosstore_channels_off = off;
|
||||
break;
|
||||
}
|
||||
plugin_log(cmd->plugin, LOG_DBG, "Deleting channel: %s",
|
||||
type_to_string(tmpctx, struct short_channel_id, &scid));
|
||||
delete_channel_from_db(cmd, scid);
|
||||
}
|
||||
}
|
||||
|
||||
return one_refresh_done(cmd, dbq);
|
||||
}
|
||||
|
||||
static struct command_result *nodes_refresh(struct command *cmd,
|
||||
const struct table_desc *td,
|
||||
struct db_query *dbq);
|
||||
|
||||
static struct command_result *listnodes_one_done(struct command *cmd,
|
||||
const char *buf,
|
||||
const jsmntok_t *result,
|
||||
struct db_query *dbq)
|
||||
{
|
||||
const struct table_desc *td = dbq->tables[0];
|
||||
struct command_result *ret;
|
||||
|
||||
ret = process_json_result(cmd, buf, result, td);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
/* Continue to refresh more nodes */
|
||||
return nodes_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
static void delete_node_from_db(struct command *cmd,
|
||||
const struct node_id *id)
|
||||
{
|
||||
int err;
|
||||
char *errmsg;
|
||||
|
||||
err = sqlite3_exec(db,
|
||||
tal_fmt(tmpctx,
|
||||
"DELETE FROM nodes"
|
||||
" WHERE nodeid = %s",
|
||||
node_id_to_hexstr(tmpctx, id)),
|
||||
NULL, NULL, &errmsg);
|
||||
if (err != SQLITE_OK)
|
||||
plugin_err(cmd->plugin, "Could not delete from nodes: %s",
|
||||
errmsg);
|
||||
}
|
||||
|
||||
static bool extract_node_id(int gosstore_fd, size_t off, u16 type,
|
||||
struct node_id *id)
|
||||
{
|
||||
/* BOLT #7:
|
||||
* 1. type: 257 (`node_announcement`)
|
||||
* 2. data:
|
||||
* * [`signature`:`signature`]
|
||||
* * [`u16`:`flen`]
|
||||
* * [`flen*byte`:`features`]
|
||||
* * [`u32`:`timestamp`]
|
||||
* * [`point`:`node_id`]
|
||||
*/
|
||||
const size_t feature_len_off = 2 + 64;
|
||||
be16 flen;
|
||||
size_t node_id_off;
|
||||
|
||||
off += sizeof(struct gossip_hdr);
|
||||
|
||||
if (pread(gosstore_fd, &flen, sizeof(flen), off + feature_len_off)
|
||||
!= sizeof(flen))
|
||||
return false;
|
||||
|
||||
node_id_off = off + feature_len_off + 2 + flen + 4;
|
||||
if (pread(gosstore_fd, id, sizeof(*id), node_id_off) != sizeof(*id))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static struct command_result *nodes_refresh(struct command *cmd,
|
||||
const struct table_desc *td,
|
||||
struct db_query *dbq)
|
||||
{
|
||||
struct out_req *req;
|
||||
size_t msglen;
|
||||
u16 type, flags;
|
||||
|
||||
if (gosstore_fd == -1) {
|
||||
gosstore_fd = open("gossip_store", O_RDONLY);
|
||||
if (gosstore_fd == -1)
|
||||
plugin_err(cmd->plugin, "Could not open gossip_store: %s",
|
||||
strerror(errno));
|
||||
}
|
||||
|
||||
/* First time, set off to end and load from scratch */
|
||||
if (gosstore_nodes_off == 0) {
|
||||
gosstore_nodes_off = find_gossip_store_end(gosstore_fd, 1);
|
||||
return default_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
/* OK, try catching up! */
|
||||
while (gossip_store_readhdr(gosstore_fd, gosstore_nodes_off,
|
||||
&msglen, NULL, &flags, &type)) {
|
||||
struct node_id id;
|
||||
size_t off = gosstore_nodes_off;
|
||||
|
||||
gosstore_nodes_off += sizeof(struct gossip_hdr) + msglen;
|
||||
|
||||
if (flags & GOSSIP_STORE_DELETED_BIT)
|
||||
continue;
|
||||
|
||||
if (type == WIRE_GOSSIP_STORE_ENDED) {
|
||||
/* Force a reopen */
|
||||
gosstore_nodes_off = gosstore_channels_off = 0;
|
||||
close(gosstore_fd);
|
||||
gosstore_fd = -1;
|
||||
return nodes_refresh(cmd, td, dbq);
|
||||
}
|
||||
|
||||
if (type == WIRE_NODE_ANNOUNCEMENT) {
|
||||
/* This can fail if entry not fully written yet. */
|
||||
if (!extract_node_id(gosstore_fd, off, type, &id)) {
|
||||
gosstore_nodes_off = off;
|
||||
break;
|
||||
}
|
||||
|
||||
/* FIXME: sqlite 3.24.0 (2018-06-04) added UPSERT, but
|
||||
* we don't require it. */
|
||||
delete_node_from_db(cmd, &id);
|
||||
req = jsonrpc_request_start(cmd->plugin, cmd, "listnodes",
|
||||
listnodes_one_done,
|
||||
forward_error,
|
||||
dbq);
|
||||
json_add_node_id(req->js, "id", &id);
|
||||
return send_outreq(cmd->plugin, req);
|
||||
}
|
||||
/* FIXME: Add WIRE_GOSSIP_STORE_DELETE_NODE marker! */
|
||||
}
|
||||
|
||||
return one_refresh_done(cmd, dbq);
|
||||
}
|
||||
|
||||
static struct command_result *refresh_tables(struct command *cmd,
|
||||
struct db_query *dbq)
|
||||
{
|
||||
@ -806,7 +1095,12 @@ static struct table_desc *new_table_desc(struct table_desc *parent,
|
||||
td->is_subobject = is_subobject;
|
||||
td->arrname = json_strdup(td, schemas, arrname);
|
||||
td->columns = tal_arr(td, struct column, 0);
|
||||
td->refresh = default_refresh;
|
||||
if (streq(td->name, "channels"))
|
||||
td->refresh = channels_refresh;
|
||||
else if (streq(td->name, "nodes"))
|
||||
td->refresh = nodes_refresh;
|
||||
else
|
||||
td->refresh = default_refresh;
|
||||
return td;
|
||||
}
|
||||
|
||||
|
@ -3734,7 +3734,7 @@ def test_sql(node_factory, bitcoind):
|
||||
|
||||
# And I need at least one HTLC in-flight so listpeers.channels.htlcs isn't empty:
|
||||
l3.rpc.plugin_start(os.path.join(os.getcwd(), 'tests/plugins/hold_invoice.py'),
|
||||
holdtime=10000)
|
||||
holdtime=TIMEOUT * 2)
|
||||
inv = l3.rpc.invoice(amount_msat=12300, label='inv3', description='description')
|
||||
route = l1.rpc.getroute(l3.info['id'], 12300, 1)['route']
|
||||
l1.rpc.sendpay(route, inv['payment_hash'], payment_secret=inv['payment_secret'])
|
||||
@ -3772,3 +3772,18 @@ def test_sql(node_factory, bitcoind):
|
||||
|
||||
with pytest.raises(RpcError, match='Unauthorized'):
|
||||
l2.rpc.sql("DELETE FROM forwards;")
|
||||
|
||||
assert len(l3.rpc.sql("SELECT * FROM channels;")['rows']) == 4
|
||||
# Check that channels gets refreshed!
|
||||
scid = l1.get_channel_scid(l2)
|
||||
l1.rpc.setchannel(scid, feebase=123)
|
||||
wait_for(lambda: l3.rpc.sql("SELECT short_channel_id FROM channels WHERE base_fee_millisatoshi = 123;")['rows'] == [[scid]])
|
||||
l3.daemon.wait_for_log("Refreshing channels...")
|
||||
l3.daemon.wait_for_log("Refreshing channel: {}".format(scid))
|
||||
|
||||
# This has to wait for the hold_invoice plugin to let go!
|
||||
l1.rpc.close(l2.info['id'])
|
||||
bitcoind.generate_block(13, wait_for_mempool=1)
|
||||
wait_for(lambda: len(l3.rpc.listchannels()['channels']) == 2)
|
||||
assert len(l3.rpc.sql("SELECT * FROM channels;")['rows']) == 2
|
||||
l3.daemon.wait_for_log("Deleting channel: {}".format(scid))
|
||||
|
Loading…
Reference in New Issue
Block a user