autoclean: call list in easy stages.

listforwards on a large node can easily run out of memory.  Sip, don't
gulp!

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2024-06-20 11:34:04 +09:30
parent c62e1f432f
commit ead211e5e4
2 changed files with 132 additions and 14 deletions

View file

@ -13,6 +13,7 @@ static struct clean_info *timer_cinfo;
static struct plugin *plugin;
/* This is NULL if it's running now. */
static struct plugin_timer *cleantimer;
static u64 max_entries_per_call = 10000;
enum subsystem_type {
FORWARDS,
@ -34,8 +35,8 @@ struct subsystem_ops {
/* "success" and "failure" names for JSON formatting. */
const char *names[NUM_SUBSYSTEM_VARIANTS];
/* name of "list" command */
const char *list_command;
/* name of system for wait and "list" */
const char *system_name;
/* Name of array inside "list" command return */
const char *arr_name;
@ -89,7 +90,7 @@ static void add_forward_del_fields(struct out_req *req,
static const struct subsystem_ops subsystem_ops[NUM_SUBSYSTEM_TYPES] = {
{ {"succeededforwards", "failedforwards"},
"listforwards",
"forwards",
"forwards",
"\"in_channel\":true,\"in_htlc_id\":true,\"resolved_time\":true,\"received_time\":true,\"status\":true",
"delforward",
@ -97,7 +98,7 @@ static const struct subsystem_ops subsystem_ops[NUM_SUBSYSTEM_TYPES] = {
add_forward_del_fields,
},
{ {"succeededpays", "failedpays"},
"listsendpays",
"sendpays",
"payments",
"\"created_at\":true,\"status\":true,\"payment_hash\":true,\"groupid\":true,\"partid\":true",
"delpay",
@ -105,7 +106,7 @@ static const struct subsystem_ops subsystem_ops[NUM_SUBSYSTEM_TYPES] = {
add_sendpays_del_fields,
},
{ {"paidinvoices", "expiredinvoices"},
"listinvoices",
"invoices",
"invoices",
"\"label\":true,\"status\":true,\"expires_at\":true,\"paid_at\":true",
"delinvoice",
@ -170,6 +171,10 @@ struct per_subsystem {
struct clean_info *cinfo;
enum subsystem_type type;
/* How far are we through the listing? */
u64 offset, max;
/* How many did we ignore? */
u64 num_uncleaned;
struct per_variant variants[NUM_SUBSYSTEM_VARIANTS];
};
@ -202,6 +207,7 @@ static const struct subsystem_ops *get_subsystem_ops(const struct per_subsystem
/* Mutual recursion */
static void do_clean_timer(void *unused);
static struct command_result *do_clean(struct clean_info *cinfo);
static struct clean_info *new_clean_info(const tal_t *ctx,
struct command *cmd)
@ -214,7 +220,6 @@ static struct clean_info *new_clean_info(const tal_t *ctx,
struct per_subsystem *ps = &cinfo->per_subsystem[i];
ps->cinfo = cinfo;
ps->type = i;
ps->num_uncleaned = 0;
for (enum subsystem_variant j = 0; j < NUM_SUBSYSTEM_VARIANTS; j++) {
struct per_variant *pv = &ps->variants[j];
@ -294,7 +299,8 @@ static struct command_result *clean_finished_one(struct clean_info *cinfo)
if (--cinfo->cleanup_reqs_remaining > 0)
return command_still_pending(cinfo->cmd);
return clean_finished(cinfo);
/* See if there are more entries we need to list. */
return do_clean(cinfo);
}
static struct command_result *del_done(struct command *cmd,
@ -508,6 +514,7 @@ static struct command_result *list_done(struct command *cmd,
send_outreq(plugin, req);
}
subsystem->offset += max_entries_per_call;
return clean_finished_one(subsystem->cinfo);
}
@ -516,8 +523,8 @@ static struct command_result *list_failed(struct command *cmd,
const jsmntok_t *result,
struct per_subsystem *subsystem)
{
plugin_err(plugin, "Failed '%s': '%.*s'",
get_subsystem_ops(subsystem)->list_command,
plugin_err(plugin, "Failed 'list%s': '%.*s'",
get_subsystem_ops(subsystem)->system_name,
json_tok_full_len(result),
json_tok_full(buf, result));
}
@ -532,9 +539,7 @@ static struct command_result *do_clean(struct clean_info *cinfo)
const char *filter;
const struct subsystem_ops *ops = get_subsystem_ops(ps);
ps->num_uncleaned = 0;
for (size_t j = 0; j < NUM_SUBSYSTEM_VARIANTS; j++) {
ps->variants[j].num_cleaned = 0;
if (ps->variants[j].age)
have_variant = true;
}
@ -543,13 +548,24 @@ static struct command_result *do_clean(struct clean_info *cinfo)
if (!have_variant)
continue;
/* Don't bother if we're past the end already. */
if (ps->offset >= ps->max)
continue;
filter = tal_fmt(tmpctx, "{\"%s\":[{%s}]}",
ops->arr_name, ops->list_filter);
req = jsonrpc_request_with_filter_start(plugin, NULL,
ops->list_command,
tal_fmt(tmpctx,
"list%s",
ops->system_name),
filter,
list_done, list_failed,
ps);
/* Don't overwhelm lightningd or us if there are millions of
* entries! */
json_add_string(req->js, "index", "created");
json_add_u64(req->js, "start", ps->offset);
json_add_u64(req->js, "limit", max_entries_per_call);
send_outreq(plugin, req);
cinfo->cleanup_reqs_remaining++;
}
@ -559,12 +575,77 @@ static struct command_result *do_clean(struct clean_info *cinfo)
return clean_finished(cinfo);
}
static struct command_result *wait_done(struct command *cmd,
const char *buf,
const jsmntok_t *result,
struct per_subsystem *ps)
{
const char *err;
err = json_scan(tmpctx, buf, result, "{created:%}",
JSON_SCAN(json_to_u64, &ps->max));
if (err)
plugin_err(plugin, "Failed parsing wait response: (%s): '%.*s'",
err,
json_tok_full_len(result),
json_tok_full(buf, result));
/* We do three of these, make sure they're all complete. */
assert(ps->cinfo->cleanup_reqs_remaining != 0);
if (--ps->cinfo->cleanup_reqs_remaining > 0)
return command_still_pending(ps->cinfo->cmd);
return do_clean(ps->cinfo);
}
static struct command_result *wait_failed(struct command *cmd,
const char *buf,
const jsmntok_t *result,
struct per_subsystem *subsystem)
{
plugin_err(plugin, "Failed wait '%s': '%.*s'",
get_subsystem_ops(subsystem)->system_name,
json_tok_full_len(result),
json_tok_full(buf, result));
}
static struct command_result *start_clean(struct clean_info *cinfo)
{
cinfo->cleanup_reqs_remaining = 0;
/* We have to get max indexes first. */
for (size_t i = 0; i < NUM_SUBSYSTEM_TYPES; i++) {
struct per_subsystem *ps = &cinfo->per_subsystem[i];
const struct subsystem_ops *ops = get_subsystem_ops(ps);
struct out_req *req;
/* Reset counters while we're here */
ps->num_uncleaned = 0;
for (enum subsystem_variant j = 0; j < NUM_SUBSYSTEM_VARIANTS; j++) {
struct per_variant *pv = &ps->variants[j];
pv->num_cleaned = 0;
}
ps->offset = 0;
req = jsonrpc_request_start(plugin, NULL,
"wait",
wait_done, wait_failed, ps);
json_add_string(req->js, "subsystem", ops->system_name);
json_add_string(req->js, "indexname", "created");
json_add_u64(req->js, "nextvalue", 0);
send_outreq(plugin, req);
cinfo->cleanup_reqs_remaining++;
}
return command_still_pending(cinfo->cmd);
}
/* Needs a different signature than do_clean */
static void do_clean_timer(void *unused)
{
assert(timer_cinfo->cleanup_reqs_remaining == 0);
cleantimer = NULL;
do_clean(timer_cinfo);
start_clean(timer_cinfo);
}
static struct command_result *param_subsystem(struct command *cmd,
@ -650,7 +731,7 @@ static struct command_result *json_autoclean_once(struct command *cmd,
cinfo = new_clean_info(cmd, cmd);
get_per_variant(cinfo, sv)->age = *age;
return do_clean(cinfo);
return start_clean(cinfo);
}
static void memleak_mark_timer_cinfo(struct plugin *plugin,
@ -768,5 +849,10 @@ int main(int argc, char *argv[])
"How old do expired invoices have to be before deletion (0 = never)",
u64_option, u64_jsonfmt_unless_zero,
&timer_cinfo->per_subsystem[INVOICES].variants[FAILURE].age),
plugin_option_dev_dynamic("dev-autoclean-max-batch",
"int",
"Maximum cleans to do at a time",
u64_option, u64_jsonfmt,
&max_entries_per_call),
NULL);
}

View file

@ -4352,3 +4352,35 @@ def test_plugin_startdir_lol(node_factory):
"""Though we fail to start many of them, we don't crash!"""
l1 = node_factory.get_node(broken_log='.*')
l1.rpc.plugin_startdir(os.path.join(os.getcwd(), 'tests/plugins'))
def test_autoclean_batch(node_factory):
l1 = node_factory.get_node(1)
# Many expired invoices
for i in range(100):
l1.rpc.invoice(amount_msat=12300, label=f'inv1{i}', description='description', expiry=1)
time.sleep(3)
l1.rpc.setconfig('dev-autoclean-max-batch', 2)
# Test manual clean
ret = l1.rpc.autoclean_once('expiredinvoices', 1)
assert ret == {'autoclean': {'expiredinvoices': {'cleaned': 100, 'uncleaned': 0}}}
for i in range(100):
l1.rpc.invoice(amount_msat=12300, label=f'inv2{i}', description='description', expiry=1)
time.sleep(3)
# Test cycle clean
assert (l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': False, 'cleaned': 100}}})
l1.rpc.setconfig('autoclean-expiredinvoices-age', 2)
assert (l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': True, 'cleaned': 100, 'age': 2}}})
l1.rpc.setconfig('autoclean-cycle', 5)
wait_for(lambda: l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': True, 'cleaned': 200, 'age': 2}}})