lightningd/plugin_hook.c: Make db_write a chained hook.

Fixes: #4219

Changelog-Changed: Plugins: Multiple plugins can now register `db_write` hooks.
This commit is contained in:
ZmnSCPxj jxPCSnmZ 2020-11-23 13:05:56 +08:00 committed by Rusty Russell
parent 904e110554
commit 32de621886
4 changed files with 105 additions and 24 deletions

View File

@ -897,6 +897,14 @@ to error without
committing to the database!
This is the expected way to halt and catch fire.
`db_write` is a parallel-chained hook, i.e., multiple plugins can
register it, and all of them will be invoked simultaneously without
regard for order of registration.
The hook is considered handled if all registered plugins return
`{"result": "continue"}`.
If any plugin returns anything else, `lightningd` will error without
committing to the database.
### `invoice_payment`
This hook is called whenever a valid payment for an unpaid invoice has arrived.

View File

@ -294,20 +294,27 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook,
* annoying, and to make it clear that it's totally synchronous. */
/* Special synchronous hook for db */
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_SINGLE, NULL,
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_CHAIN, NULL,
NULL, NULL};
AUTODATA(hooks, &db_write_hook);
/* A `db_write` for one particular plugin hook. */
struct db_write_hook_req {
struct plugin *plugin;
struct plugin_hook_request *ph_req;
size_t *num_hooks;
};
static void db_hook_response(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok,
struct plugin_hook_request *ph_req)
struct db_write_hook_req *dwh_req)
{
const jsmntok_t *resulttok;
resulttok = json_get_member(buffer, toks, "result");
if (!resulttok)
fatal("Plugin returned an invalid response to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid response to the "
"db_write hook: %s", dwh_req->plugin->cmd, buffer);
/* We expect result: { 'result' : 'continue' }.
* Anything else we abort.
@ -315,13 +322,23 @@ static void db_hook_response(const char *buffer, const jsmntok_t *toks,
resulttok = json_get_member(buffer, resulttok, "result");
if (resulttok) {
if (!json_tok_streq(buffer, resulttok, "continue"))
fatal("Plugin returned failed db_write: %s.", buffer);
fatal("Plugin '%s' returned failed db_write: %s.",
dwh_req->plugin->cmd,
buffer);
} else
fatal("Plugin returned an invalid result to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid result to the db_write "
"hook: %s",
dwh_req->plugin->cmd,
buffer);
assert((*dwh_req->num_hooks) != 0);
--(*dwh_req->num_hooks);
/* If there are other runners, do not exit yet. */
if ((*dwh_req->num_hooks) != 0)
return;
/* We're done, exit exclusive loop. */
io_break(ph_req);
io_break(dwh_req->ph_req);
}
void plugin_hook_db_sync(struct db *db)
@ -332,35 +349,47 @@ void plugin_hook_db_sync(struct db *db)
void *ret;
struct plugin **plugins;
size_t i;
size_t num_hooks;
const char **changes = db_changes(db);
if (tal_count(hook->hooks) == 0)
num_hooks = tal_count(hook->hooks);
if (num_hooks == 0)
return;
plugins = notleak(tal_arr(NULL, struct plugin *,
tal_count(hook->hooks)));
for (i = 0; i < tal_count(hook->hooks); ++i)
num_hooks));
for (i = 0; i < num_hooks; ++i)
plugins[i] = hook->hooks[i]->plugin;
ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
ph_req);
ph_req->hook = hook;
ph_req->db = db;
ph_req->plugin = hook->hooks[0]->plugin;
ph_req->cb_arg = &num_hooks;
json_add_num(req->stream, "data_version", db_data_version_get(db));
for (i = 0; i < num_hooks; ++i) {
/* Create an object for this plugin. */
struct db_write_hook_req *dwh_req;
dwh_req = tal(ph_req, struct db_write_hook_req);
dwh_req->plugin = plugins[i];
dwh_req->ph_req = ph_req;
dwh_req->num_hooks = &num_hooks;
json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);
/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
dwh_req);
plugin_request_send(ph_req->plugin, req);
json_add_num(req->stream, "data_version",
db_data_version_get(db));
json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);
plugin_request_send(plugins[i], req);
}
/* We can be called on way out of an io_loop, which is already breaking.
* That will make this immediately return; save the break value and call
@ -371,7 +400,9 @@ void plugin_hook_db_sync(struct db *db)
assert(ret2 == ph_req);
io_break(ret);
}
assert(num_hooks == 0);
tal_free(plugins);
tal_free(ph_req);
}
static void add_deps(const char ***arr,

16
tests/plugins/dbdummy.py Executable file
View File

@ -0,0 +1,16 @@
#! /usr/bin/env python3
'''This plugin is a do-nothing backup plugin which just checks that we
can handle multiple backup plugins.
'''
from pyln.client import Plugin
plugin = Plugin()
@plugin.hook('db_write')
def db_write(plugin, **kwargs):
return {'result': 'continue'}
plugin.run()

View File

@ -478,6 +478,32 @@ def test_db_hook(node_factory, executor):
assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]
@unittest.skipIf(os.getenv('TEST_DB_PROVIDER', 'sqlite3') != 'sqlite3', "Only sqlite3 implements the db_write_hook currently")
def test_db_hook_multiple(node_factory, executor):
"""This tests the db hook for multiple-plugin case."""
dbfile = os.path.join(node_factory.directory, "dblog.sqlite3")
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/dblog.py'),
'important-plugin': os.path.join(os.getcwd(), 'tests/plugins/dbdummy.py'),
'dblog-file': dbfile})
# It should see the db being created, and sometime later actually get
# initted.
# This precedes startup, so needle already past
assert l1.daemon.is_in_log(r'plugin-dblog.py: deferring \d+ commands')
l1.daemon.logsearch_start = 0
l1.daemon.wait_for_log('plugin-dblog.py: replaying pre-init data:')
l1.daemon.wait_for_log('plugin-dblog.py: CREATE TABLE version \\(version INTEGER\\)')
l1.daemon.wait_for_log("plugin-dblog.py: initialized.* 'startup': True")
l1.stop()
# Databases should be identical.
db1 = sqlite3.connect(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'lightningd.sqlite3'))
db2 = sqlite3.connect(dbfile)
assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]
def test_utf8_passthrough(node_factory, executor):
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/utf8.py'),
'log-level': 'io'})