lightningd: hook forwards into the wait system.

This table doesn't have `id`, except as the implicit one in Sqlite3,
so we need to add it for postgres.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2023-10-28 13:41:01 +10:30
parent c4f2ada2ff
commit e58ae31947
5 changed files with 116 additions and 2 deletions

View File

@ -1,14 +1,62 @@
#include "config.h" #include "config.h"
#include <ccan/mem/mem.h> #include <ccan/mem/mem.h>
#include <ccan/tal/str/str.h>
#include <common/json_command.h> #include <common/json_command.h>
#include <common/json_param.h> #include <common/json_param.h>
#include <common/json_stream.h> #include <common/json_stream.h>
#include <inttypes.h>
#include <lightningd/forwards.h> #include <lightningd/forwards.h>
#include <lightningd/htlc_end.h> #include <lightningd/htlc_end.h>
#include <lightningd/jsonrpc.h> #include <lightningd/jsonrpc.h>
#include <lightningd/lightningd.h> #include <lightningd/lightningd.h>
#include <wallet/wallet.h> #include <wallet/wallet.h>
static u64 forward_index_inc(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel,
enum wait_index idx)
{
return wait_index_increment(ld, WAIT_SUBSYSTEM_FORWARD, idx,
"status", forward_status_name(status),
"in_channel", short_channel_id_to_str(tmpctx, &in_channel),
"=in_msat", tal_fmt(tmpctx, "%"PRIu64, in_amount.millisatoshis), /* Raw: JSON output */
"out_channel", out_channel ? short_channel_id_to_str(tmpctx, out_channel): NULL,
NULL);
}
void forward_index_deleted(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel)
{
forward_index_inc(ld, status, in_channel, in_amount, out_channel,
WAIT_INDEX_DELETED);
}
/* Fortuntely, dbids start at 1, not 0! */
u64 forward_index_created(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel)
{
return forward_index_inc(ld, status, in_channel, in_amount, out_channel,
WAIT_INDEX_CREATED);
}
u64 forward_index_update_status(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel)
{
return forward_index_inc(ld, status, in_channel, in_amount, out_channel,
WAIT_INDEX_UPDATED);
}
bool string_to_forward_status(const char *status_str, bool string_to_forward_status(const char *status_str,
size_t len, size_t len,
enum forward_status *status) enum forward_status *status)

View File

@ -5,6 +5,7 @@
#include <wire/onion_wire.h> #include <wire/onion_wire.h>
struct json_stream; struct json_stream;
struct lightningd;
struct sha256; struct sha256;
/* /!\ This is a DB ENUM, please do not change the numbering of any /* /!\ This is a DB ENUM, please do not change the numbering of any
@ -87,4 +88,20 @@ static inline const char *forward_style_name(enum forward_style style)
abort(); abort();
} }
/* wait() hooks in here */
void forward_index_deleted(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel);
u64 forward_index_created(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel);
u64 forward_index_update_status(struct lightningd *ld,
enum forward_status status,
struct short_channel_id in_channel,
struct amount_msat in_amount,
const struct short_channel_id *out_channel);
#endif /* LIGHTNING_LIGHTNINGD_FORWARDS_H */ #endif /* LIGHTNING_LIGHTNINGD_FORWARDS_H */

View File

@ -20,6 +20,7 @@ struct waiter {
static const char *subsystem_names[] = { static const char *subsystem_names[] = {
"forwards",
"sendpays", "sendpays",
"invoices", "invoices",
}; };
@ -45,8 +46,8 @@ const char *wait_index_name(enum wait_index index)
const char *wait_subsystem_name(enum wait_subsystem subsystem) const char *wait_subsystem_name(enum wait_subsystem subsystem)
{ {
switch (subsystem) { switch (subsystem) {
case WAIT_SUBSYSTEM_FORWARD:
case WAIT_SUBSYSTEM_SENDPAY: case WAIT_SUBSYSTEM_SENDPAY:
return subsystem_names[subsystem];
case WAIT_SUBSYSTEM_INVOICE: case WAIT_SUBSYSTEM_INVOICE:
return subsystem_names[subsystem]; return subsystem_names[subsystem];
} }

View File

@ -5,8 +5,9 @@
struct lightningd; struct lightningd;
/* This WAIT_SUBSYSTEM_X corresponds to listX */ /* This WAIT_SUBSYSTEM_X corresponds to listXs */
enum wait_subsystem { enum wait_subsystem {
WAIT_SUBSYSTEM_FORWARD,
WAIT_SUBSYSTEM_SENDPAY, WAIT_SUBSYSTEM_SENDPAY,
WAIT_SUBSYSTEM_INVOICE, WAIT_SUBSYSTEM_INVOICE,
}; };

View File

@ -76,6 +76,12 @@ static void migrate_invoice_created_index_var(struct lightningd *ld,
static void migrate_initialize_payment_wait_indexes(struct lightningd *ld, static void migrate_initialize_payment_wait_indexes(struct lightningd *ld,
struct db *db); struct db *db);
static void migrate_forwards_add_rowid(struct lightningd *ld,
struct db *db);
static void migrate_initialize_forwards_wait_indexes(struct lightningd *ld,
struct db *db);
/* Do not reorder or remove elements from this array, it is used to /* Do not reorder or remove elements from this array, it is used to
* migrate existing databases from a previous state, based on the * migrate existing databases from a previous state, based on the
* string indices */ * string indices */
@ -1000,6 +1006,10 @@ static struct migration dbmigrations[] = {
{SQL("ALTER TABLE payments ADD updated_index BIGINT DEFAULT 0"), NULL}, {SQL("ALTER TABLE payments ADD updated_index BIGINT DEFAULT 0"), NULL},
{SQL("CREATE INDEX payments_update_idx ON payments (updated_index)"), NULL}, {SQL("CREATE INDEX payments_update_idx ON payments (updated_index)"), NULL},
{NULL, migrate_initialize_payment_wait_indexes}, {NULL, migrate_initialize_payment_wait_indexes},
{NULL, migrate_forwards_add_rowid},
{SQL("ALTER TABLE forwards ADD updated_index BIGINT DEFAULT 0"), NULL},
{SQL("CREATE INDEX forwards_updated_idx ON forwards (updated_index)"), NULL},
{NULL, migrate_initialize_forwards_wait_indexes},
}; };
/** /**
@ -1778,6 +1788,43 @@ static void migrate_initialize_payment_wait_indexes(struct lightningd *ld,
"MAX(id)"); "MAX(id)");
} }
static void migrate_forwards_add_rowid(struct lightningd *ld,
struct db *db)
{
struct db_stmt *stmt;
/* sqlite3 has implicit "rowid" column already */
if (streq(db->config->name, "sqlite3"))
return;
stmt = db_prepare_v2(db, SQL("ALTER TABLE forwards ADD rowid BIGINT"));
db_exec_prepared_v2(take(stmt));
/* Yes, I got ChatGPT to write this for me! */
stmt = db_prepare_v2(db, SQL("WITH numbered_rows AS ("
" SELECT in_channel_scid, in_htlc_id, row_number() OVER () AS rn"
" FROM forwards)"
" UPDATE forwards"
" SET rowid = numbered_rows.rn"
" FROM numbered_rows"
" WHERE forwards.in_channel_scid = numbered_rows.in_channel_scid"
" AND forwards.in_htlc_id = numbered_rows.in_htlc_id;"));
db_exec_prepared_v2(take(stmt));
stmt = db_prepare_v2(db, SQL("CREATE INDEX forwards_created_idx ON forwards (rowid)"));
db_exec_prepared_v2(take(stmt));
}
static void migrate_initialize_forwards_wait_indexes(struct lightningd *ld,
struct db *db)
{
migrate_initialize_wait_indexes(db,
WAIT_SUBSYSTEM_FORWARD,
WAIT_INDEX_CREATED,
SQL("SELECT MAX(rowid) FROM forwards;"),
"MAX(rowid)");
}
static void complain_unfixed(struct lightningd *ld, static void complain_unfixed(struct lightningd *ld,
enum channel_state state, enum channel_state state,
u64 id, u64 id,