plugin: Migrate request creation to json_stream

We can use the internal buffering of the json_stream instead of
manually building JSON-RPC calls. This makes it a lot easier to handle
these requests.

Notice that we do not flush concurrently and still buffer all the
things, but it avoids double-buffering things.

Signed-off-by: Christian Decker <decker.christian@gmail.com>
This commit is contained in:
Christian Decker 2018-11-29 15:41:09 +01:00 committed by Rusty Russell
parent dc4fb650dc
commit 230730eca4

View File

@ -42,9 +42,10 @@ struct plugin {
char *buffer;
size_t used, len_read;
/* Stuff we write */
struct list_head output;
const char *outbuf;
/* Our json_streams. Since multiple streams could start
* returning data at once, we always service these in order,
* freeing once empty. */
struct json_stream **js_arr;
struct log *log;
@ -69,6 +70,7 @@ struct plugin_request {
const char *json_params;
const char *response;
const jsmntok_t *resulttok, *errortok, *toks;
struct json_stream *stream;
/* The response handler to be called on success or error */
void (*cb)(const struct plugin_request *, void *);
@ -90,11 +92,6 @@ struct plugins {
struct timers timers;
};
struct json_output {
struct list_node list;
const char *json;
};
/* Represents a pending JSON-RPC request that was forwarded to a
* plugin and is currently waiting for it to return the result. */
struct plugin_rpc_request {
@ -138,7 +135,8 @@ void plugin_register(struct plugins *plugins, const char* path TAKES)
list_add_tail(&plugins->plugins, &p->list);
p->plugins = plugins;
p->cmd = tal_strdup(p, path);
p->outbuf = NULL;
p->js_arr = tal_arr(p, struct json_stream *, 0);
p->used = 0;
p->log = new_log(p, plugins->log_book, "plugin-%s",
path_basename(tmpctx, p->cmd));
@ -201,6 +199,62 @@ static void PRINTF_FMT(2,3) plugin_kill(struct plugin *plugin, char *fmt, ...)
tal_free(plugin);
}
/**
* Create the header of a JSON-RPC request and return open stream.
*
* This is a partial request, missing the params element, which the
* caller needs to add. We can't open it yet since we don't know
* whether it is supposed to be an object (name-value pairs) or an
* array.
*/
static struct plugin_request *
plugin_request_new_(struct plugin *plugin, const char *method,
void (*cb)(const struct plugin_request *, void *),
void *arg)
{
static u64 next_request_id = 0;
struct plugin_request *req = tal(plugin, struct plugin_request);
u64 request_id = next_request_id++;
req->id = request_id;
req->method = tal_strdup(req, method);
req->cb = cb;
req->arg = arg;
req->plugin = plugin;
/* We will not concurrently drain, if we do we must set the
* writer to non-NULL */
req->stream = new_json_stream(req, NULL);
/* Add to map so we can find it later when routing the response */
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
json_object_start(req->stream, NULL);
json_add_string(req->stream, "jsonrpc", "2.0");
json_add_string(req->stream, "method", method);
json_add_u64(req->stream, "id", request_id);
return req;
}
#define plugin_request_new(plugin, method, cb, arg) \
plugin_request_new_( \
(plugin), (method), \
typesafe_cb_preargs(void, void *, (cb), (arg), \
const struct plugin_request *), \
(arg))
/**
* Given a request, send it to the plugin.
*/
static void plugin_request_queue(struct plugin_request *req)
{
/* Finish the `params` object and submit the request */
json_object_end(req->stream); /* root element */
json_stream_append(req->stream, "\n\n");
*tal_arr_expand(&req->plugin->js_arr) = req->stream;
io_wake(req->plugin);
}
/**
* Try to parse a complete message from the plugin's buffer.
*
@ -300,71 +354,31 @@ static struct io_plan *plugin_read_json(struct io_conn *conn UNUSED,
&plugin->len_read, plugin_read_json, plugin);
}
static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED,
/* Mutual recursion */
static struct io_plan *plugin_write_json(struct io_conn *conn,
struct plugin *plugin);
static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin)
{
size_t pending = tal_count(plugin->js_arr);
/* Remove js and shift all remainig over */
tal_free(plugin->js_arr[0]);
memmove(plugin->js_arr, plugin->js_arr + 1, (pending - 1) * sizeof(plugin->js_arr[0]));
tal_resize(&plugin->js_arr, pending-1);
return plugin_write_json(conn, plugin);
}
static struct io_plan *plugin_write_json(struct io_conn *conn,
struct plugin *plugin)
{
struct json_output *out;
if (plugin->outbuf)
plugin->outbuf = tal_free(plugin->outbuf);
out = list_pop(&plugin->output, struct json_output, list);
if (!out) {
if (plugin->stop) {
return io_close(conn);
} else {
return io_out_wait(plugin->stdin_conn, plugin,
plugin_write_json, plugin);
}
if (tal_count(plugin->js_arr)) {
return json_stream_output(plugin->js_arr[0], plugin->stdin_conn, plugin_stream_complete, plugin);
}
/* We have a message we'd like to send */
plugin->outbuf = tal_steal(plugin, out->json);
tal_free(out);
return io_write(conn, plugin->outbuf, strlen(plugin->outbuf),
plugin_write_json, plugin);
return io_out_wait(conn, plugin, plugin_write_json, plugin);
}
static void plugin_request_send_(
struct plugin *plugin, const char *method TAKES, const char *params TAKES,
void (*cb)(const struct plugin_request *, void *), void *arg)
{
static u64 next_request_id = 0;
struct plugin_request *req = tal(plugin, struct plugin_request);
struct json_output *out = tal(plugin, struct json_output);
u64 request_id = next_request_id++;
req->id = request_id;
req->method = tal_strdup(req, method);
req->json_params = tal_strdup(req, params);
req->cb = cb;
req->arg = arg;
req->plugin = plugin;
/* Add to map so we can find it later when routing the response */
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
/* Wrap the request in the JSON-RPC request object. Terminate
* with an empty line that serves as a hint that the JSON
* object is done. */
out->json = tal_fmt(out, "{"
"\"jsonrpc\": \"2.0\", "
"\"method\": \"%s\", "
"\"params\" : %s, "
"\"id\" : %" PRIu64 " }\n\n",
method, params, request_id);
/* Queue and notify the writer */
list_add_tail(&plugin->output, &out->list);
io_wake(plugin);
}
#define plugin_request_send(plugin, method, params, cb, arg) \
plugin_request_send_( \
(plugin), (method), (params), \
typesafe_cb_preargs(void, void *, (cb), (arg), \
const struct plugin_request *), \
(arg))
static struct io_plan *plugin_stdin_conn_init(struct io_conn *conn,
struct plugin *plugin)
{
@ -505,6 +519,7 @@ static void plugin_rpcmethod_dispatch(struct command *cmd, const char *buffer,
struct plugin_rpc_request *request;
struct plugins *plugins = cmd->ld->plugins;
struct plugin *plugin;
struct plugin_request *req;
if (cmd->mode == CMD_USAGE) {
cmd->usage = "[params]";
@ -549,7 +564,9 @@ found:
assert(request->plugin);
tal_steal(request->plugin, request);
plugin_request_send(request->plugin, request->method, request->params, plugin_rpcmethod_cb, request);
req = plugin_request_new(request->plugin, request->method, plugin_rpcmethod_cb, request);
json_stream_append_fmt(req->stream, ", \"params\": %s", request->params);
plugin_request_queue(req);
command_still_pending(cmd);
}
@ -735,6 +752,7 @@ void plugins_init(struct plugins *plugins)
char **cmd;
int stdin, stdout;
struct timer *expired;
struct plugin_request *req;
plugins->pending_manifests = 0;
uintmap_init(&plugins->pending_requests);
@ -748,17 +766,17 @@ void plugins_init(struct plugins *plugins)
if (p->pid == -1)
fatal("error starting plugin '%s': %s", p->cmd,
strerror(errno));
list_head_init(&p->output);
p->buffer = tal_arr(p, char, 64);
p->used = 0;
p->stop = false;
/* Create two connections, one read-only on top of p->stdin, and one
* write-only on p->stdout */
io_new_conn(p, stdout, plugin_stdout_conn_init, p);
io_new_conn(p, stdin, plugin_stdin_conn_init, p);
plugin_request_send(p, "getmanifest", "[]", plugin_manifest_cb, p);
req = plugin_request_new(p, "getmanifest", plugin_manifest_cb, p);
json_array_start(req->stream, "params");
json_array_end(req->stream);
plugin_request_queue(req);
plugins->pending_manifests++;
p->timeout_timer = new_reltimer(
&plugins->timers, p, time_from_sec(PLUGIN_MANIFEST_TIMEOUT),
@ -787,19 +805,25 @@ static void plugin_config_cb(const struct plugin_request *req,
static void plugin_config(struct plugin *plugin)
{
struct plugin_opt *opt;
bool first = true;
const char *name, *sep;
char *conf = tal_fmt(tmpctx, "{\n \"options\": {");
const char *name;
struct plugin_request *req;
/* No writer since we don't flush concurrently. */
req = plugin_request_new(plugin, "init", plugin_config_cb, plugin);
json_object_start(req->stream, "params"); /* start of .params */
/* Add .params.options */
json_object_start(req->stream, "options");
list_for_each(&plugin->plugin_opts, opt, list) {
/* Trim the `--` that we added before */
name = opt->name + 2;
/* Separator between elements in the same object */
sep = first?"":",";
first = false;
tal_append_fmt(&conf, "%s\n \"%s\": \"%s\"", sep, name, opt->value);
json_add_string(req->stream, name, opt->value);
}
tal_append_fmt(&conf, "\n }\n}");
plugin_request_send(plugin, "init", conf, plugin_config_cb, plugin);
json_object_end(req->stream); /* end of .params.options */
json_object_end(req->stream); /* end of .params */
plugin_request_queue(req);
}
void plugins_config(struct plugins *plugins)