lightningd: allow a connection to specify db batching.

Previous commit was a hack which *always* batched where possible, this
is a more sophisticated opt-in varaint, with a timeout sanity check.

Final performance for cleaning up 1M pays/forwards/invoices:

```
$ time l1-cli autoclean-once succeededpays 1
{
   "autoclean": {
      "succeededpays": {
         "cleaned": 1000000,
         "uncleaned": 26895
      }
   }
}

real	6m9.828s
user	0m0.003s
sys	0m0.001s
$ time l2-cli autoclean-once succeededforwards 1
{
   "autoclean": {
      "succeededforwards": {
         "cleaned": 1000000,
         "uncleaned": 40
      }
   }
}

real	3m20.789s
user	0m0.004s
sys	0m0.001s
$ time l3-cli autoclean-once paidinvoices 1
{
   "autoclean": {
      "paidinvoices": {
         "cleaned": 1000000,
         "uncleaned": 0
      }
   }
}

real	6m47.941s
user	0m0.001s
sys	0m0.000s
```

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Changelog-Added: JSON-RPC: `batching` command to allow database transactions to cross multiple back-to-back JSON commands.
This commit is contained in:
Rusty Russell 2022-09-19 10:24:23 +09:30 committed by Christian Decker
parent 555b8a2f7a
commit fa7d732ba6
9 changed files with 167 additions and 11 deletions

View file

@ -9,6 +9,7 @@ MANPAGES := doc/lightning-cli.1 \
doc/lightningd-config.5 \
doc/lightning-addgossip.7 \
doc/lightning-autoclean-status.7 \
doc/lightning-batching.7 \
doc/lightning-bkpr-channelsapy.7 \
doc/lightning-bkpr-dumpincomecsv.7 \
doc/lightning-bkpr-inspect.7 \

View file

@ -31,6 +31,7 @@ Core Lightning Documentation
lightning-addgossip <lightning-addgossip.7.md>
lightning-autoclean-status <lightning-autoclean-status.7.md>
lightning-batching <lightning-batching.7.md>
lightning-bkpr-channelsapy <lightning-bkpr-channelsapy.7.md>
lightning-bkpr-dumpincomecsv <lightning-bkpr-dumpincomecsv.7.md>
lightning-bkpr-inspect <lightning-bkpr-inspect.7.md>

View file

@ -0,0 +1,55 @@
lightning-batching -- Command to allow database batching.
=========================================================
SYNOPSIS
--------
**batching** *enable*
DESCRIPTION
-----------
The **batching** RPC command allows (but does not guarantee!) database
commitments to be deferred when multiple commands are issued on this RPC
connection. This is only useful if many commands are being given at once, in
which case it can offer a performance improvement (the cost being that if
there is a crash, it's unclear how many of the commands will have been
persisted).
*enable* is *true* to enable batching, *false* to disable it (the
default).
EXAMPLE JSON REQUEST
--------------------
```json
{
"id": 82,
"method": "batching",
"params": {
"enable": true
}
}
```
RETURN VALUE
------------
[comment]: # (GENERATE-FROM-SCHEMA-START)
On success, an empty object is returned.
[comment]: # (GENERATE-FROM-SCHEMA-END)
On failure, one of the following error codes may be returned:
- -32602: Error in given parameters.
AUTHOR
------
Rusty Russell <<rusty@blockstream.com>> wrote the initial version of this man page.
RESOURCES
---------
Main web site: <https://github.com/ElementsProject/lightning>
[comment]: # ( SHA256STAMP:326e5801f65998e13e909d8b682e9fbc9824f3a43aa7da1d76b871882e52f293)

View file

@ -0,0 +1,14 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"required": [
"enable"
],
"properties": {
"enable": {
"type": "boolean",
"description": "Whether to enable or disable transaction batching"
}
}
}

View file

@ -0,0 +1,6 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"properties": {}
}

View file

@ -83,6 +83,9 @@ struct json_connection {
/* Are notifications enabled? */
bool notifications_enabled;
/* Are we allowed to batch database commitments? */
bool db_batching;
/* Our json_streams (owned by the commands themselves while running).
* Since multiple streams could start returning data at once, we
* always service these in order, freeing once empty. */
@ -1006,6 +1009,7 @@ static struct io_plan *read_json(struct io_conn *conn,
{
bool complete;
bool in_transaction = false;
struct timemono start_time = time_mono();
if (jcon->len_read)
log_io(jcon->log, LOG_IO_IN, NULL, "",
@ -1063,8 +1067,24 @@ again:
jsmn_init(&jcon->input_parser);
toks_reset(jcon->input_toks);
if (jcon->used)
/* Do we have more already read? */
if (jcon->used) {
if (!jcon->db_batching) {
db_commit_transaction(jcon->ld->wallet->db);
in_transaction = false;
} else {
/* FIXME: io_always() should interleave with
* real IO, and then we should rotate order we
* service fds in, to avoid starvation. */
if (time_greater(timemono_between(time_mono(),
start_time),
time_from_msec(250))) {
db_commit_transaction(jcon->ld->wallet->db);
return io_always(conn, read_json, jcon);
}
}
goto again;
}
read_more:
if (in_transaction)
@ -1090,6 +1110,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn,
jsmn_init(&jcon->input_parser);
jcon->input_toks = toks_alloc(jcon);
jcon->notifications_enabled = false;
jcon->db_batching = false;
list_head_init(&jcon->commands);
/* We want to log on destruction, so we free this in destructor. */
@ -1436,7 +1457,6 @@ static const struct json_command check_command = {
"Don't run {command_to_check}, just verify parameters.",
.verbose = "check command_to_check [parameters...]\n"
};
AUTODATA(json_command, &check_command);
static struct command_result *json_notifications(struct command *cmd,
@ -1461,7 +1481,32 @@ static const struct json_command notifications_command = {
"notifications",
"utility",
json_notifications,
"Enable notifications for {level} (or 'false' to disable)",
"{enable} notifications",
};
AUTODATA(json_command, &notifications_command);
static struct command_result *json_batching(struct command *cmd,
const char *buffer,
const jsmntok_t *obj UNNEEDED,
const jsmntok_t *params)
{
bool *enable;
if (!param(cmd, buffer, params,
p_req("enable", param_bool, &enable),
NULL))
return command_param_failed();
/* Catch the case where they sent this command then hung up. */
if (cmd->jcon)
cmd->jcon->db_batching = *enable;
return command_success(cmd, json_stream_success(cmd));
}
static const struct json_command batching_command = {
"batching",
"utility",
json_batching,
"Database transaction batching {enable}",
};
AUTODATA(json_command, &batching_command);

View file

@ -552,6 +552,9 @@ static const char *init(struct plugin *p,
rpc_scan_datastore_str(plugin, datastore_path(tmpctx, i, "num"),
JSON_SCAN(json_to_u64, &total_cleaned[i]));
}
/* Optimization FTW! */
rpc_enable_batching(p);
return NULL;
}

View file

@ -520,17 +520,16 @@ static const jsmntok_t *read_rpc_reply(const tal_t *ctx,
return toks;
}
static const char *rpc_scan_core(const tal_t *ctx,
/* Send request, return response, set resp/len to reponse */
static const jsmntok_t *sync_req(const tal_t *ctx,
struct plugin *plugin,
const char *method,
const struct json_out *params TAKES,
const char *guide,
va_list ap)
const char **resp)
{
bool error;
const jsmntok_t *contents;
int reqlen;
const char *p;
struct json_out *jout = json_out_new(tmpctx);
json_out_start(jout, NULL, '{');
@ -542,12 +541,27 @@ static const char *rpc_scan_core(const tal_t *ctx,
tal_free(params);
finish_and_send_json(plugin->rpc_conn->fd, jout);
read_rpc_reply(tmpctx, plugin, &contents, &error, &reqlen);
read_rpc_reply(ctx, plugin, &contents, &error, &reqlen);
if (error)
plugin_err(plugin, "Got error reply to %s: '%.*s'",
method, reqlen, membuf_elems(&plugin->rpc_conn->mb));
method, reqlen, membuf_elems(&plugin->rpc_conn->mb));
p = membuf_consume(&plugin->rpc_conn->mb, reqlen);
*resp = membuf_consume(&plugin->rpc_conn->mb, reqlen);
return contents;
}
/* Returns contents of scanning guide on 'result' */
static const char *rpc_scan_core(const tal_t *ctx,
struct plugin *plugin,
const char *method,
const struct json_out *params TAKES,
const char *guide,
va_list ap)
{
const jsmntok_t *contents;
const char *p;
contents = sync_req(tmpctx, plugin, method, params, &p);
return json_scanv(ctx, p, contents, guide, ap);
}
@ -631,6 +645,21 @@ bool rpc_scan_datastore_hex(struct plugin *plugin,
return ret;
}
void rpc_enable_batching(struct plugin *plugin)
{
const char *p;
struct json_out *params;
params = json_out_new(NULL);
json_out_start(params, NULL, '{');
json_out_add(params, "enable", false, "true");
json_out_end(params, '}');
json_out_finished(params);
/* We don't actually care about (empty) response */
sync_req(tmpctx, plugin, "batching", take(params), &p);
}
static struct command_result *datastore_fail(struct command *command,
const char *buf,
const jsmntok_t *result,

View file

@ -283,6 +283,8 @@ bool rpc_scan_datastore_hex(struct plugin *plugin,
const char *path,
...);
/* This sets batching of database commitments */
void rpc_enable_batching(struct plugin *plugin);
/* Send an async rpc request to lightningd. */
struct command_result *send_outreq(struct plugin *plugin,