wait: new command to wait on indexes.

This will initially be for listinvoices, but can be expanded to other
list commands.

It's documented, but it makes promises which currently don't exist:

* listinvoice does not support `index` or `start` yet.
* It doesn't actually fire when invoices change yet.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Changelog-Added: JSON-RPC: `wait`: new generic command to wait for events.
This commit is contained in:
Rusty Russell 2023-07-22 13:56:36 +09:30
parent 8bcf65f02a
commit c2eadb88be
8 changed files with 361 additions and 0 deletions

View File

@ -104,6 +104,7 @@ MANPAGES := doc/lightning-cli.1 \
doc/lightning-txsend.7 \
doc/lightning-unreserveinputs.7 \
doc/lightning-utxopsbt.7 \
doc/lightning-wait.7 \
doc/lightning-waitinvoice.7 \
doc/lightning-waitanyinvoice.7 \
doc/lightning-waitblockheight.7 \

View File

@ -140,6 +140,7 @@ Core Lightning Documentation
lightning-unreserveinputs <lightning-unreserveinputs.7.md>
lightning-upgradewallet <lightning-upgradewallet.7.md>
lightning-utxopsbt <lightning-utxopsbt.7.md>
lightning-wait <lightning-wait.7.md>
lightning-waitanyinvoice <lightning-waitanyinvoice.7.md>
lightning-waitblockheight <lightning-waitblockheight.7.md>
lightning-waitinvoice <lightning-waitinvoice.7.md>

79
doc/lightning-wait.7.md Normal file
View File

@ -0,0 +1,79 @@
lightning-wait -- Command to wait for creations, changes and deletions
======================================================================
SYNOPSIS
--------
**wait** *subsystem* *indexname* *nextvalue*
DESCRIPTION
-----------
The **wait** RPC command returns once the index given by *indexname*
in *subsystem* reaches or exceeds *nextvalue*. All indexes start at 0, when no
events have happened (**wait** with a *nextvalue* of 0 is a way of getting
the current index, though naturally this is racy!).
*indexname* is one of `created`, `updated` or `deleted`:
- `created` is incremented by one for every new object.
- `updated` is incremented by one every time an object is changed.
- `deleted` is incremented by one every time an object is deleted.
*subsystem* is one of:
- `invoices`: corresponding to `listinvoices`.
RELIABILITY
-----------
Indices can go forward by more than one; in particlar, if multiple
objects were created and the one deleted, you could see this effect.
Similarly, there are some places (e.g. invoice expiration) where we
can update multiple entries at once.
Indices only monotoncally increase.
USAGE
-----
The **wait** RPC is used to track changes in the system. Consider
tracking invoices being paid or expiring. The simplest (and
inefficient method) would be:
1. Call `listinvoices` to get the current state of all invoices, and
remember the highest `updated_index`. Say it was 5.
2. Call `wait invoices updated 6`.
3. When it returns, call `listinvoices` again to see what changed.
This is obviously inefficient, so there are two optimizations:
1. Call `listinvoices` with `index=updated` and `start=6` to only see invoices
with `updated_index` greater than or equal to 6.
2. `wait` itself may also return some limited subset of fields from the list
command (it can't do this in all cases); for `invoices` this is `label`
and `status`, allowing many callers to avoid the `listinvoices` call.
RETURN VALUE
------------
FIXME
On error the returned object will contain `code` and `message` properties,
with `code` being one of the following:
- -32602: If the given parameters are wrong.
AUTHOR
------
Rusty Russell <<rusty@rustcorp.com.au>> is mainly
responsible.
SEE ALSO
--------
lightning-listinvoice(7)
RESOURCES
---------
Main web site: <https://github.com/ElementsProject/lightning>

View File

@ -39,6 +39,7 @@ LIGHTNINGD_SRC := \
lightningd/routehint.c \
lightningd/runes.c \
lightningd/subd.c \
lightningd/wait.c \
lightningd/watch.c
LIGHTNINGD_SRC_NOHDR := \

View File

@ -213,6 +213,7 @@ static struct lightningd *new_lightningd(const tal_t *ctx)
list_head_init(&ld->ping_commands);
list_head_init(&ld->disconnect_commands);
list_head_init(&ld->waitblockheight_commands);
list_head_init(&ld->wait_commands);
/*~ Tal also explicitly supports arrays: it stores the number of
* elements, which can be accessed with tal_count() (or tal_bytelen()

View File

@ -6,6 +6,7 @@
#include <lightningd/htlc_set.h>
#include <lightningd/options.h>
#include <lightningd/peer_control.h>
#include <lightningd/wait.h>
#include <signal.h>
#include <sys/stat.h>
#include <wallet/wallet.h>
@ -237,6 +238,8 @@ struct lightningd {
struct list_head ping_commands;
/* Outstanding disconnect commands. */
struct list_head disconnect_commands;
/* Outstanding wait commands */
struct list_head wait_commands;
/* Maintained by invoices.c */
struct invoices *invoices;
@ -265,6 +268,9 @@ struct lightningd {
/* Announce names in config as DNS records (recently BOLT 7 addition) */
bool announce_dns;
/* Indexes used by all the wait infra */
struct indexes indexes[NUM_WAIT_SUBSYSTEM];
#if DEVELOPER
/* If we want to debug a subdaemon/plugin. */
char *dev_debug_subprocess;

216
lightningd/wait.c Normal file
View File

@ -0,0 +1,216 @@
/* Code to be notified when various standardized events happen. */
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/tal/str/str.h>
#include <common/json_command.h>
#include <common/overflows.h>
#include <db/exec.h>
#include <lightningd/jsonrpc.h>
#include <lightningd/lightningd.h>
#include <lightningd/wait.h>
struct waiter {
struct list_node list;
struct command *cmd;
/* These are pointers because of how param_ works */
enum wait_subsystem *subsystem;
enum wait_index *index;
u64 *nextval;
};
static const char *subsystem_names[] = {
"invoices",
};
static const char *index_names[] = {
"created",
"updated",
"deleted",
};
/* This is part of the API, so no changing! */
const char *wait_index_name(enum wait_index index)
{
switch (index) {
case WAIT_INDEX_CREATED:
case WAIT_INDEX_UPDATED:
case WAIT_INDEX_DELETED:
return index_names[index];
}
abort();
}
const char *wait_subsystem_name(enum wait_subsystem subsystem)
{
switch (subsystem) {
case WAIT_SUBSYSTEM_INVOICE:
return subsystem_names[subsystem];
}
abort();
}
static u64 *wait_index_ptr(struct lightningd *ld,
enum wait_subsystem subsystem,
enum wait_index index)
{
struct indexes *indexes;
assert(subsystem < ARRAY_SIZE(ld->indexes));
indexes = &ld->indexes[subsystem];
assert(index < ARRAY_SIZE(indexes->i));
return &indexes->i[index];
}
static void json_add_index(struct json_stream *response,
enum wait_subsystem subsystem,
enum wait_index index,
u64 val,
va_list *ap)
{
const char *name, *value;
json_add_string(response, "subsystem", wait_subsystem_name(subsystem));
json_add_u64(response, wait_index_name(index), val);
if (!ap)
return;
json_object_start(response, "details");
while ((name = va_arg(*ap, const char *)) != NULL) {
value = va_arg(*ap, const char *);
if (!value)
continue;
/* This is a hack! */
if (name[0] == '=') {
/* Copy in literallty! */
json_add_jsonstr(response, name + 1, value, strlen(value));
} else {
json_add_string(response, name, value);
}
}
json_object_end(response);
}
u64 wait_index_increment(struct lightningd *ld,
enum wait_subsystem subsystem,
enum wait_index index,
...)
{
struct waiter *i, *n;
va_list ap;
u64 *idxval = wait_index_ptr(ld, subsystem, index);
assert(!add_overflows_u64(*idxval, 1));
(*idxval)++;
/* FIXME: We can optimize this! It's always the max of the fields in
* the table, *unless* we delete one. So we can lazily write this on
* delete, and fix it up to MAX() when we startup. */
db_set_intvar(ld->wallet->db,
tal_fmt(tmpctx, "last_%s_%s_index",
wait_subsystem_name(subsystem),
wait_index_name(index)),
*idxval);
list_for_each_safe(&ld->wait_commands, i, n, list) {
struct json_stream *response;
if (*i->subsystem != subsystem)
continue;
if (*i->index != index)
continue;
if (*idxval < *i->nextval)
continue;
response = json_stream_success(i->cmd);
va_start(ap, index);
json_add_index(response, subsystem, index, *idxval, &ap);
va_end(ap);
/* Delete before freeing */
list_del_from(&ld->wait_commands, &i->list);
was_pending(command_success(i->cmd, response));
}
return *idxval;
}
static struct command_result *param_subsystem(struct command *cmd,
const char *name,
const char *buffer,
const jsmntok_t *tok,
enum wait_subsystem **subsystem)
{
for (size_t i = 0; i < ARRAY_SIZE(subsystem_names); i++) {
if (json_tok_streq(buffer, tok, subsystem_names[i])) {
*subsystem = tal(cmd, enum wait_subsystem);
**subsystem = i;
return NULL;
}
}
return command_fail_badparam(cmd, name, buffer, tok,
"unknown subsystem");
}
struct command_result *param_index(struct command *cmd,
const char *name,
const char *buffer,
const jsmntok_t *tok,
enum wait_index **index)
{
for (size_t i = 0; i < ARRAY_SIZE(index_names); i++) {
if (json_tok_streq(buffer, tok, index_names[i])) {
*index = tal(cmd, enum wait_index);
**index = i;
return NULL;
}
}
return command_fail_badparam(cmd, name, buffer, tok,
"unknown index");
}
static struct command_result *json_wait(struct command *cmd,
const char *buffer,
const jsmntok_t *obj UNNEEDED,
const jsmntok_t *params)
{
struct waiter *waiter = tal(cmd, struct waiter);
u64 val;
if (!param(cmd, buffer, params,
p_req("subsystem", param_subsystem,
&waiter->subsystem),
p_req("indexname", param_index, &waiter->index),
p_req("nextvalue", param_u64, &waiter->nextval),
NULL))
return command_param_failed();
/* Are we there already? Return immediately. */
val = *wait_index_ptr(cmd->ld, *waiter->subsystem, *waiter->index);
if (val >= *waiter->nextval) {
struct json_stream *response;
response = json_stream_success(cmd);
json_add_index(response,
*waiter->subsystem,
*waiter->index,
val, NULL);
return command_success(cmd, response);
}
waiter->cmd = cmd;
list_add_tail(&cmd->ld->wait_commands, &waiter->list);
return command_still_pending(cmd);
}
static const struct json_command wait_command = {
"wait",
"utility",
json_wait,
"Wait for {subsystem} {indexname} to reach or exceed {value})"
};
AUTODATA(json_command, &wait_command);

56
lightningd/wait.h Normal file
View File

@ -0,0 +1,56 @@
#ifndef LIGHTNING_LIGHTNINGD_WAIT_H
#define LIGHTNING_LIGHTNINGD_WAIT_H
#include "config.h"
#include <common/json_param.h>
struct lightningd;
/* This WAIT_SUBSYSTEM_X corresponds to listX */
enum wait_subsystem {
WAIT_SUBSYSTEM_INVOICE
};
#define NUM_WAIT_SUBSYSTEM (WAIT_SUBSYSTEM_INVOICE+1)
enum wait_index {
WAIT_INDEX_CREATED,
WAIT_INDEX_UPDATED,
WAIT_INDEX_DELETED,
};
#define NUM_WAIT_INDEX (WAIT_INDEX_DELETED+1)
/**
* structure for keeping created/updated/deleted indexes in the db
*/
struct indexes {
u64 i[NUM_WAIT_INDEX];
};
/* Get a string */
const char *wait_index_name(enum wait_index index);
const char *wait_subsystem_name(enum wait_subsystem subsystem);
/**
* wait_index_increment - increment an index, tell waiters.
* @ld: the lightningd
* @subsystem: subsystem for index
* @index: which index
* ...: name/value pairs, followed by NULL.
*
* Increase index, write to db, wake any waiters, give them any name/value pairs.
* If the value is NULL, omit that name.
* If the name starts with '=', the value is a JSON literal (and skip over the =)
*
* Returns the updated index value (always > 0).
*/
u64 LAST_ARG_NULL wait_index_increment(struct lightningd *ld,
enum wait_subsystem subsystem,
enum wait_index index,
...);
/* For passing in index parameters. */
struct command_result *param_index(struct command *cmd, const char *name,
const char *buffer,
const jsmntok_t *tok,
enum wait_index **index);
#endif /* LIGHTNING_LIGHTNINGD_WAIT_H */