mirror of
https://github.com/ElementsProject/lightning.git
synced 2025-03-03 02:39:28 +01:00
plugin: Add request muxing to the plugin subsystem
This commit is contained in:
parent
d0de6e59c6
commit
084224f134
1 changed files with 113 additions and 1 deletions
|
@ -1,5 +1,6 @@
|
|||
#include "lightningd/plugin.h"
|
||||
|
||||
#include <ccan/intmap/intmap.h>
|
||||
#include <ccan/io/io.h>
|
||||
#include <ccan/list/list.h>
|
||||
#include <ccan/pipecmd/pipecmd.h>
|
||||
|
@ -13,6 +14,7 @@ struct plugin {
|
|||
char *cmd;
|
||||
struct io_conn *stdin_conn, *stdout_conn;
|
||||
bool stop;
|
||||
struct plugins *plugins;
|
||||
|
||||
/* Stuff we read */
|
||||
char *buffer;
|
||||
|
@ -23,8 +25,27 @@ struct plugin {
|
|||
const char *outbuf;
|
||||
};
|
||||
|
||||
struct plugin_request {
|
||||
u64 id;
|
||||
/* Method to be called */
|
||||
const char *method;
|
||||
|
||||
/* JSON encoded params, either a dict or an array */
|
||||
const char *json_params;
|
||||
const char *response;
|
||||
const jsmntok_t *resulttok, *errortok;
|
||||
|
||||
/* The response handler to be called on success or error */
|
||||
void (*cb)(const struct plugin_request *, void *);
|
||||
void *arg;
|
||||
};
|
||||
|
||||
struct plugins {
|
||||
struct plugin **plugins;
|
||||
size_t pending_init;
|
||||
|
||||
/* Currently pending requests by their request ID */
|
||||
UINTMAP(struct plugin_request *) pending_requests;
|
||||
};
|
||||
|
||||
struct json_output {
|
||||
|
@ -46,6 +67,7 @@ void plugin_register(struct plugins *plugins, const char* path TAKES)
|
|||
tal_resize(&plugins->plugins, n+1);
|
||||
p = tal(plugins, struct plugin);
|
||||
plugins->plugins[n] = p;
|
||||
p->plugins = plugins;
|
||||
p->cmd = tal_strdup(p, path);
|
||||
}
|
||||
|
||||
|
@ -59,6 +81,13 @@ static bool plugin_read_json_one(struct plugin *plugin)
|
|||
{
|
||||
jsmntok_t *toks;
|
||||
bool valid;
|
||||
u64 id;
|
||||
const jsmntok_t *idtok, *resulttok, *errortok;
|
||||
struct plugin_request *request;
|
||||
|
||||
/* FIXME: This could be done more efficiently by storing the
|
||||
* toks and doing an incremental parse, like lightning-cli
|
||||
* does. */
|
||||
toks = json_parse_input(plugin->buffer, plugin->used, &valid);
|
||||
if (!toks) {
|
||||
if (!valid) {
|
||||
|
@ -75,7 +104,35 @@ static bool plugin_read_json_one(struct plugin *plugin)
|
|||
return false;
|
||||
}
|
||||
|
||||
/* FIXME(cdecker) Call dispatch to handle this message. */
|
||||
resulttok = json_get_member(plugin->buffer, toks, "result");
|
||||
errortok = json_get_member(plugin->buffer, toks, "error");
|
||||
idtok = json_get_member(plugin->buffer, toks, "id");
|
||||
|
||||
/* FIXME(cdecker) Kill the plugin if either of these fails */
|
||||
if (!idtok) {
|
||||
return false;
|
||||
} else if (!resulttok && !errortok) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/* We only send u64 ids, so if this fails it's a critical error */
|
||||
if (!json_to_u64(plugin->buffer, idtok, &id)) {
|
||||
/* FIXME (cdecker) Log an error message and kill the plugin */
|
||||
return false;
|
||||
}
|
||||
|
||||
request = uintmap_get(&plugin->plugins->pending_requests, id);
|
||||
|
||||
if (!request) {
|
||||
/* FIXME(cdecker) Log an error and kill the plugin */
|
||||
return false;
|
||||
}
|
||||
|
||||
/* We expect the request->cb to copy if needed */
|
||||
request->response = plugin->buffer;
|
||||
request->errortok = errortok;
|
||||
request->resulttok = resulttok;
|
||||
request->cb(request, request->arg);
|
||||
|
||||
/* Move this object out of the buffer */
|
||||
memmove(plugin->buffer, plugin->buffer + toks[0].end,
|
||||
|
@ -126,6 +183,44 @@ static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED,
|
|||
plugin_write_json, plugin);
|
||||
}
|
||||
|
||||
static void plugin_request_send_(
|
||||
struct plugin *plugin, const char *method TAKES, const char *params TAKES,
|
||||
void (*cb)(const struct plugin_request *, void *), void *arg)
|
||||
{
|
||||
static u64 next_request_id = 0;
|
||||
struct plugin_request *req = tal(plugin, struct plugin_request);
|
||||
struct json_output *out = tal(plugin, struct json_output);
|
||||
u64 request_id = next_request_id++;
|
||||
|
||||
req->id = request_id;
|
||||
req->method = tal_strdup(req, method);
|
||||
req->json_params = tal_strdup(req, params);
|
||||
req->cb = cb;
|
||||
req->arg = arg;
|
||||
|
||||
/* Add to map so we can find it later when routing the response */
|
||||
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
|
||||
|
||||
/* Wrap the request in the JSON-RPC request object */
|
||||
out->json = tal_fmt(out, "{"
|
||||
"\"jsonrpc\": \"2.0\", "
|
||||
"\"method\": \"%s\", "
|
||||
"\"params\" : %s, "
|
||||
"\"id\" : %" PRIu64 " }\n",
|
||||
method, params, request_id);
|
||||
|
||||
/* Queue and notify the writer */
|
||||
list_add_tail(&plugin->output, &out->list);
|
||||
io_wake(plugin);
|
||||
}
|
||||
|
||||
#define plugin_request_send(plugin, method, params, cb, arg) \
|
||||
plugin_request_send_( \
|
||||
(plugin), (method), (params), \
|
||||
typesafe_cb_preargs(void, void *, (cb), (arg), \
|
||||
const struct plugin_request *), \
|
||||
(arg))
|
||||
|
||||
static struct io_plan *plugin_conn_init(struct io_conn *conn,
|
||||
struct plugin *plugin)
|
||||
{
|
||||
|
@ -146,10 +241,24 @@ static struct io_plan *plugin_conn_init(struct io_conn *conn,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback for the plugin_init request.
|
||||
*/
|
||||
static void plugin_init_cb(const struct plugin_request *req, struct plugin *plugin)
|
||||
{
|
||||
/* Check if all plugins are initialized, and break if they are */
|
||||
plugin->plugins->pending_init--;
|
||||
if (plugin->plugins->pending_init == 0)
|
||||
io_break(plugin->plugins);
|
||||
}
|
||||
|
||||
void plugins_init(struct plugins *plugins)
|
||||
{
|
||||
struct plugin *p;
|
||||
char **cmd;
|
||||
plugins->pending_init = tal_count(plugins->plugins);
|
||||
uintmap_init(&plugins->pending_requests);
|
||||
|
||||
/* Spawn the plugin processes before entering the io_loop */
|
||||
for (size_t i=0; i<tal_count(plugins->plugins); i++) {
|
||||
p = plugins->plugins[i];
|
||||
|
@ -166,7 +275,10 @@ void plugins_init(struct plugins *plugins)
|
|||
* write-only on p->stdout */
|
||||
io_new_conn(p, p->stdout, plugin_conn_init, p);
|
||||
io_new_conn(p, p->stdin, plugin_conn_init, p);
|
||||
plugin_request_send(p, "init", "[]", plugin_init_cb, p);
|
||||
}
|
||||
if (plugins->pending_init > 0)
|
||||
io_loop(NULL, NULL);
|
||||
}
|
||||
|
||||
void json_add_opt_plugins(struct json_stream *response,
|
||||
|
|
Loading…
Add table
Reference in a new issue