commando: support commands larger than 64k.

This is needed for invoice, which can be asked to commit to giant descriptions
(though that's antisocial!).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-07-16 22:48:27 +09:30
parent 3fe246c2e7
commit 49df89556b
2 changed files with 116 additions and 26 deletions

View File

@ -15,7 +15,9 @@
#define COMMANDO_ERROR_REMOTE_AUTH 0x4c51
enum commando_msgtype {
COMMANDO_MSG_CMD = 0x4c4f,
/* Requests are split across multiple CONTINUES, then TERM. */
COMMANDO_MSG_CMD_CONTINUES = 0x4c4d,
COMMANDO_MSG_CMD_TERM = 0x4c4f,
/* Replies are split across multiple CONTINUES, then TERM. */
COMMANDO_MSG_REPLY_CONTINUES = 0x594b,
COMMANDO_MSG_REPLY_TERM = 0x594d,
@ -32,6 +34,7 @@ struct commando {
static struct plugin *plugin;
static struct commando **outgoing_commands;
static struct commando **incoming_commands;
/* NULL peer: don't care about peer. NULL id: don't care about id */
static struct commando *find_commando(struct commando **arr,
@ -83,9 +86,10 @@ struct reply {
size_t off, len;
};
/* Calls itself repeatedly: first time, result is NULL */
static struct command_result *send_response(struct command *command UNUSED,
const char *buf UNUSED,
const jsmntok_t *result UNUSED,
const jsmntok_t *result,
struct reply *reply)
{
size_t msglen = reply->len - reply->off;
@ -99,7 +103,7 @@ static struct command_result *send_response(struct command *command UNUSED,
msgtype = COMMANDO_MSG_REPLY_CONTINUES;
/* We need to make a copy first time before we call back, since
* plugin will reuse it! */
if (reply->off == 0)
if (!result)
reply->buf = tal_dup_talarr(reply, char, reply->buf);
} else {
if (msglen == 0) {
@ -140,7 +144,7 @@ static struct command_result *cmd_done(struct command *command,
reply->off = obj->start;
reply->len = obj->end;
return send_response(command, buf, obj, reply);
return send_response(command, NULL, NULL, reply);
}
static void commando_error(struct commando *incoming,
@ -248,6 +252,43 @@ static void try_command(struct node_id *peer,
send_outreq(plugin, req);
}
static void handle_incmd(struct node_id *peer,
u64 idnum,
const u8 *msg, size_t msglen,
bool terminal)
{
struct commando *incmd;
incmd = find_commando(incoming_commands, peer, NULL);
/* Don't let them buffer multiple commands: discard old. */
if (incmd && incmd->id != idnum)
incmd = tal_free(incmd);
if (!incmd) {
incmd = tal(plugin, struct commando);
incmd->id = idnum;
incmd->cmd = NULL;
incmd->peer = *peer;
incmd->contents = tal_arr(incmd, u8, 0);
tal_arr_expand(&incoming_commands, incmd);
tal_add_destructor2(incmd, destroy_commando, &incoming_commands);
}
/* 1MB should be enough for anybody! */
append_contents(incmd, msg, msglen, 1024*1024);
if (!terminal)
return;
if (!incmd->contents) {
plugin_log(plugin, LOG_UNUSUAL, "%s: ignoring oversize request",
node_id_to_hexstr(tmpctx, peer));
return;
}
try_command(peer, idnum, incmd->contents, tal_bytelen(incmd->contents));
}
static struct command_result *handle_reply(struct node_id *peer,
u64 idnum,
const u8 *msg, size_t msglen,
@ -283,7 +324,9 @@ static struct command_result *handle_reply(struct node_id *peer,
replystr = (const char *)ocmd->contents;
toks = json_parse_simple(ocmd, replystr, tal_bytelen(ocmd->contents));
if (!toks || toks[0].type != JSMN_OBJECT)
return command_fail(ocmd->cmd, COMMANDO_ERROR_LOCAL, "Reply was unparsable");
return command_fail(ocmd->cmd, COMMANDO_ERROR_LOCAL,
"Reply was unparsable: '%.*s'",
(int)tal_bytelen(ocmd->contents), replystr);
err = json_get_member(replystr, toks, "error");
if (err) {
@ -343,8 +386,10 @@ static struct command_result *handle_custommsg(struct command *cmd,
if (msg) {
switch (mtype) {
case COMMANDO_MSG_CMD:
try_command(&peer, idnum, msg, len);
case COMMANDO_MSG_CMD_CONTINUES:
case COMMANDO_MSG_CMD_TERM:
handle_incmd(&peer, idnum, msg, len,
mtype == COMMANDO_MSG_CMD_TERM);
break;
case COMMANDO_MSG_REPLY_CONTINUES:
case COMMANDO_MSG_REPLY_TERM:
@ -364,14 +409,31 @@ static const struct plugin_hook hooks[] = {
},
};
static struct command_result *send_success(struct command *command,
const char *buf,
const jsmntok_t *result,
struct commando *incoming)
{
return command_still_pending(command);
}
struct outgoing {
struct node_id peer;
size_t msg_off;
u8 **msgs;
};
static struct command_result *send_more_cmd(struct command *cmd,
const char *buf UNUSED,
const jsmntok_t *result UNUSED,
struct outgoing *outgoing)
{
struct out_req *req;
if (outgoing->msg_off == tal_count(outgoing->msgs)) {
tal_free(outgoing);
return command_still_pending(cmd);
}
req = jsonrpc_request_start(plugin, cmd, "sendcustommsg",
send_more_cmd, forward_error, outgoing);
json_add_node_id(req->js, "node_id", &outgoing->peer);
json_add_hex_talarr(req->js, "msg", outgoing->msgs[outgoing->msg_off++]);
return send_outreq(plugin, req);
}
static struct command_result *json_commando(struct command *cmd,
const char *buffer,
@ -381,9 +443,9 @@ static struct command_result *json_commando(struct command *cmd,
const char *method, *cparams;
const char *rune;
struct commando *ocmd;
struct out_req *req;
u8 *cmd_msg;
struct outgoing *outgoing;
char *json;
size_t jsonlen;
if (!param(cmd, buffer, params,
p_req("peer_id", param_node_id, &peer),
@ -410,28 +472,44 @@ static struct command_result *json_commando(struct command *cmd,
tal_append_fmt(&json, ",\"rune\":\"%s\"", rune);
tal_append_fmt(&json, "}");
cmd_msg = tal_arr(NULL, u8, 0);
towire_u16(&cmd_msg, COMMANDO_MSG_CMD);
towire_u64(&cmd_msg, ocmd->id);
towire(&cmd_msg, json, strlen(json));
req = jsonrpc_request_start(plugin, NULL, "sendcustommsg",
send_success, forward_error, ocmd);
json_add_node_id(req->js, "node_id", &ocmd->peer);
json_add_hex_talarr(req->js, "msg", cmd_msg);
tal_free(cmd_msg);
/* This is not a leak, but we don't keep a pointer. */
outgoing = notleak(tal(cmd, struct outgoing));
outgoing->peer = *peer;
outgoing->msg_off = 0;
/* 65000 per message gives sufficient headroom. */
jsonlen = tal_bytelen(json)-1;
outgoing->msgs = notleak(tal_arr(cmd, u8 *, (jsonlen + 64999) / 65000));
for (size_t i = 0; i < tal_count(outgoing->msgs); i++) {
u8 *cmd_msg = tal_arr(outgoing, u8, 0);
bool terminal = (i == tal_count(outgoing->msgs) - 1);
size_t off = i * 65000, len;
if (terminal)
len = jsonlen - off;
else
len = 65000;
towire_u16(&cmd_msg,
terminal ? COMMANDO_MSG_CMD_TERM
: COMMANDO_MSG_CMD_CONTINUES);
towire_u64(&cmd_msg, ocmd->id);
towire(&cmd_msg, json + off, len);
outgoing->msgs[i] = cmd_msg;
}
/* Keep memleak code happy! */
tal_free(peer);
tal_free(method);
tal_free(cparams);
return send_outreq(plugin, req);
return send_more_cmd(cmd, NULL, NULL, outgoing);
}
#if DEVELOPER
static void memleak_mark_globals(struct plugin *p, struct htable *memtable)
{
memleak_remove_region(memtable, outgoing_commands, tal_bytelen(outgoing_commands));
memleak_remove_region(memtable, incoming_commands, tal_bytelen(incoming_commands));
}
#endif
@ -439,6 +517,7 @@ static const char *init(struct plugin *p,
const char *buf UNUSED, const jsmntok_t *config UNUSED)
{
outgoing_commands = tal_arr(p, struct commando *, 0);
incoming_commands = tal_arr(p, struct commando *, 0);
plugin = p;
#if DEVELOPER
plugin_set_memleak_handler(p, memleak_mark_globals);

View File

@ -2587,3 +2587,14 @@ def test_commando(node_factory):
'params': {'level': 'io'}})
assert len(json.dumps(ret)) > 65535
# Command will go over multiple messages.
ret = l2.rpc.call(method='commando',
payload={'peer_id': l1.info['id'],
'method': 'invoice',
'params': {'amount_msat': 'any',
'label': 'label',
'description': 'A' * 200000,
'deschashonly': True}})
assert 'bolt11' in ret