core-lightning/lightningd/wait.c
2024-07-31 14:42:58 +09:30

219 lines
5.2 KiB
C

/* Code to be notified when various standardized events happen. */
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/tal/str/str.h>
#include <common/json_command.h>
#include <common/overflows.h>
#include <db/exec.h>
#include <lightningd/jsonrpc.h>
#include <lightningd/lightningd.h>
#include <lightningd/wait.h>
struct waiter {
struct list_node list;
struct command *cmd;
/* These are pointers because of how param_ works */
enum wait_subsystem *subsystem;
enum wait_index *index;
u64 *nextval;
};
static const char *subsystem_names[] = {
"forwards",
"sendpays",
"invoices",
};
static const char *index_names[] = {
"created",
"updated",
"deleted",
};
/* This is part of the API, so no changing! */
const char *wait_index_name(enum wait_index index)
{
switch (index) {
case WAIT_INDEX_CREATED:
case WAIT_INDEX_UPDATED:
case WAIT_INDEX_DELETED:
return index_names[index];
}
abort();
}
const char *wait_subsystem_name(enum wait_subsystem subsystem)
{
switch (subsystem) {
case WAIT_SUBSYSTEM_FORWARD:
case WAIT_SUBSYSTEM_SENDPAY:
case WAIT_SUBSYSTEM_INVOICE:
return subsystem_names[subsystem];
}
abort();
}
static u64 *wait_index_ptr(struct lightningd *ld,
enum wait_subsystem subsystem,
enum wait_index index)
{
struct indexes *indexes;
assert(subsystem < ARRAY_SIZE(ld->indexes));
indexes = &ld->indexes[subsystem];
assert(index < ARRAY_SIZE(indexes->i));
return &indexes->i[index];
}
static void json_add_index(struct json_stream *response,
enum wait_subsystem subsystem,
enum wait_index index,
u64 val,
va_list *ap)
{
const char *name, *value;
json_add_string(response, "subsystem", wait_subsystem_name(subsystem));
json_add_u64(response, wait_index_name(index), val);
if (!ap)
return;
json_object_start(response, "details");
while ((name = va_arg(*ap, const char *)) != NULL) {
value = va_arg(*ap, const char *);
if (!value)
continue;
/* This is a hack! */
if (name[0] == '=') {
/* Copy in literallty! */
json_add_jsonstr(response, name + 1, value, strlen(value));
} else {
json_add_string(response, name, value);
}
}
json_object_end(response);
}
u64 wait_index_increment(struct lightningd *ld,
enum wait_subsystem subsystem,
enum wait_index index,
...)
{
struct waiter *i, *n;
va_list ap;
u64 *idxval = wait_index_ptr(ld, subsystem, index);
assert(!add_overflows_u64(*idxval, 1));
(*idxval)++;
/* FIXME: We can optimize this! It's always the max of the fields in
* the table, *unless* we delete one. So we can lazily write this on
* delete, and fix it up to MAX() when we startup. */
db_set_intvar(ld->wallet->db,
tal_fmt(tmpctx, "last_%s_%s_index",
wait_subsystem_name(subsystem),
wait_index_name(index)),
*idxval);
list_for_each_safe(&ld->wait_commands, i, n, list) {
struct json_stream *response;
if (*i->subsystem != subsystem)
continue;
if (*i->index != index)
continue;
if (*idxval < *i->nextval)
continue;
response = json_stream_success(i->cmd);
va_start(ap, index);
json_add_index(response, subsystem, index, *idxval, &ap);
va_end(ap);
/* Delete before freeing */
list_del_from(&ld->wait_commands, &i->list);
was_pending(command_success(i->cmd, response));
}
return *idxval;
}
static struct command_result *param_subsystem(struct command *cmd,
const char *name,
const char *buffer,
const jsmntok_t *tok,
enum wait_subsystem **subsystem)
{
for (size_t i = 0; i < ARRAY_SIZE(subsystem_names); i++) {
if (json_tok_streq(buffer, tok, subsystem_names[i])) {
*subsystem = tal(cmd, enum wait_subsystem);
**subsystem = i;
return NULL;
}
}
return command_fail_badparam(cmd, name, buffer, tok,
"unknown subsystem");
}
struct command_result *param_index(struct command *cmd,
const char *name,
const char *buffer,
const jsmntok_t *tok,
enum wait_index **index)
{
for (size_t i = 0; i < ARRAY_SIZE(index_names); i++) {
if (json_tok_streq(buffer, tok, index_names[i])) {
*index = tal(cmd, enum wait_index);
**index = i;
return NULL;
}
}
return command_fail_badparam(cmd, name, buffer, tok,
"unknown index");
}
static struct command_result *json_wait(struct command *cmd,
const char *buffer,
const jsmntok_t *obj UNNEEDED,
const jsmntok_t *params)
{
struct waiter *waiter = tal(cmd, struct waiter);
u64 val;
if (!param(cmd, buffer, params,
p_req("subsystem", param_subsystem,
&waiter->subsystem),
p_req("indexname", param_index, &waiter->index),
p_req("nextvalue", param_u64, &waiter->nextval),
NULL))
return command_param_failed();
/* Are we there already? Return immediately. */
val = *wait_index_ptr(cmd->ld, *waiter->subsystem, *waiter->index);
if (val >= *waiter->nextval) {
struct json_stream *response;
response = json_stream_success(cmd);
json_add_index(response,
*waiter->subsystem,
*waiter->index,
val, NULL);
return command_success(cmd, response);
}
waiter->cmd = cmd;
list_add_tail(&cmd->ld->wait_commands, &waiter->list);
return command_still_pending(cmd);
}
static const struct json_command wait_command = {
"wait",
json_wait,
};
AUTODATA(json_command, &wait_command);