renepay: drive *all* progress from termination of struct pay_flow.

The main function here is payment_reconsider:
* Each payment has a list of pay_flow.
* This is populated in try_paying(), calling add_payflows & sendpay_new_flows.
* When we get a notification, we resolve a pay_flow using one of the pay_flow_failedxxx
  or pay_flow_succeeded functions.
* They call payment_reconsider() which cleans up finished flows decides what to do:
  often calling try_paying again.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2023-08-10 11:29:44 +09:30
parent d75d68cdde
commit d7b3bdd420
10 changed files with 516 additions and 290 deletions

View File

@ -31,10 +31,6 @@
struct pay_plugin *pay_plugin;
static void timer_kick(struct payment *payment);
static struct command_result *try_paying(struct command *cmd,
struct payment *payment);
void amount_msat_accumulate_(struct amount_msat *dst,
struct amount_msat src,
const char *dstname,
@ -68,12 +64,6 @@ static void memleak_mark(struct plugin *p, struct htable *memtable)
}
#endif
static void destroy_payflow(struct pay_flow *flow)
{
remove_htlc_payflow(pay_plugin->chan_extra_map,flow);
payflow_map_del(pay_plugin->payflow_map, flow);
}
static const char *init(struct plugin *p,
const char *buf UNUSED, const jsmntok_t *config UNUSED)
{
@ -123,45 +113,6 @@ static const char *init(struct plugin *p,
return NULL;
}
static void payment_settimer(struct payment *payment)
{
payment->rexmit_timer = tal_free(payment->rexmit_timer);
payment->rexmit_timer = plugin_timer(
pay_plugin->plugin,
time_from_msec(TIMER_COLLECT_FAILURES_MSEC),
timer_kick, payment);
}
/* Happens when timer goes off, but also works to arm timer if nothing to do */
static void timer_kick(struct payment *payment)
{
plugin_log(pay_plugin->plugin,LOG_DBG,"calling %s",__PRETTY_FUNCTION__);
switch (payment->status)
{
/* Some flows succeeded, we finish the payment. */
case PAYMENT_SUCCESS:
plugin_log(pay_plugin->plugin,LOG_DBG,"status is PAYMENT_SUCCESS");
payment_success(payment);
break;
/* Some flows failed, we retry. */
case PAYMENT_FAIL:
plugin_log(pay_plugin->plugin,LOG_DBG,"status is PAYMENT_FAIL");
payment_assert_delivering_incomplete(payment);
try_paying(payment->cmd, payment);
break;
/* Nothing has returned yet, we have to wait. */
case PAYMENT_PENDING:
plugin_log(pay_plugin->plugin,LOG_DBG,"status is PAYMENT_PENDING");
payment_assert_delivering_all(payment);
payment_settimer(payment);
break;
}
}
/* Sometimes we don't know exactly who to blame... */
static void handle_unhandleable_error(struct pay_flow *flow,
const char *what)
@ -179,9 +130,8 @@ static void handle_unhandleable_error(struct pay_flow *flow,
if (n == 1)
{
payflow_fail(flow);
payment_fail(flow->payment, PAY_UNPARSEABLE_ONION,
"Got %s from the destination", what);
/* This is a terminal error. */
pay_flow_failed_final(flow, PAY_UNPARSEABLE_ONION, what);
return;
}
/* FIXME: check chan_extra_map, since we might have succeeded though
@ -199,6 +149,8 @@ static void handle_unhandleable_error(struct pay_flow *flow,
debug_paynote(flow->payment, "... eliminated %s",
type_to_string(tmpctx, struct short_channel_id,
&flow->path_scids[n]));
pay_flow_failed(flow);
}
/* We hold onto the flow (and delete the timer) while we're waiting for
@ -214,13 +166,9 @@ static struct command_result *addgossip_done(struct command *cmd,
struct addgossip *adg)
{
plugin_log(pay_plugin->plugin,LOG_DBG,"calling %s",__PRETTY_FUNCTION__);
struct payment * payment = adg->flow->payment;
/* Release this: if it's the last flow we'll retry immediately */
payflow_fail(adg->flow);
pay_flow_finished_adding_gossip(adg->flow);
tal_free(adg);
payment_settimer(payment);
return command_still_pending(cmd);
}
@ -255,8 +203,6 @@ static struct command_result *submit_update(struct pay_flow *flow,
* we don't get a rexmit before this is complete. */
adg->scid = errscid;
adg->flow = flow;
/* Disable re-xmit until this returns */
payment->rexmit_timer = tal_free(payment->rexmit_timer);
debug_paynote(payment, "... extracted channel_update, telling gossipd");
plugin_log(pay_plugin->plugin, LOG_DBG, "(update = %s)", tal_hex(tmpctx, update));
@ -352,10 +298,6 @@ static struct command_result *flow_sendpay_failed(struct command *cmd,
debug_assert(payment);
/* This is a fail. */
if (payment->status != PAYMENT_SUCCESS)
payment->status=PAYMENT_FAIL;
if (json_scan(tmpctx, buf, err,
"{code:%,message:%}",
JSON_SCAN(json_to_jsonrpc_errcode, &errcode),
@ -374,45 +316,44 @@ static struct command_result *flow_sendpay_failed(struct command *cmd,
* We just disable this scid. */
tal_arr_expand(&payment->disabled, flow->path_scids[0]);
payflow_fail(flow);
pay_flow_failed(flow);
return command_still_pending(cmd);
}
static struct command_result *
sendpay_flows(struct command *cmd,
struct payment *p,
struct pay_flow **flows STEALS)
/* Kick off all pay_flows which are in state PAY_FLOW_NOT_STARTED */
static void sendpay_new_flows(struct payment *p)
{
plugin_log(pay_plugin->plugin,LOG_DBG,"calling %s",__PRETTY_FUNCTION__);
debug_paynote(p, "Sending out batch of %zu payments", tal_count(flows));
struct pay_flow *pf;
for (size_t i = 0; i < tal_count(flows); i++) {
const size_t path_lengh = tal_count(flows[i]->amounts);
debug_paynote(p, "sendpay flow groupid=%ld, partid=%ld, delivering=%s, probability=%.3lf",
flows[i]->key.groupid,
flows[i]->key.partid,
type_to_string(tmpctx,struct amount_msat,
&flows[i]->amounts[path_lengh-1]),
flows[i]->success_prob);
list_for_each(&p->flows, pf, list) {
if (pf->state != PAY_FLOW_NOT_STARTED)
continue;
debug_paynote(p, "sendpay flow groupid=%"PRIu64", partid=%"PRIu64", delivering=%s, probability=%.3lf",
pf->key.groupid,
pf->key.partid,
fmt_amount_msat(tmpctx, payflow_delivered(pf)),
pf->success_prob);
struct out_req *req;
req = jsonrpc_request_start(cmd->plugin, cmd, "sendpay",
/* FIXME: We don't actually want cmd to own this sendpay, so we use NULL here,
* but we should use a variant which allows us to set json id! */
req = jsonrpc_request_start(pay_plugin->plugin, NULL, "sendpay",
flow_sent, flow_sendpay_failed,
flows[i]);
pf);
json_array_start(req->js, "route");
for (size_t j = 0; j < tal_count(flows[i]->path_nodes); j++) {
for (size_t j = 0; j < tal_count(pf->path_nodes); j++) {
json_object_start(req->js, NULL);
json_add_node_id(req->js, "id",
&flows[i]->path_nodes[j]);
&pf->path_nodes[j]);
json_add_short_channel_id(req->js, "channel",
&flows[i]->path_scids[j]);
&pf->path_scids[j]);
json_add_amount_msat(req->js, "amount_msat",
flows[i]->amounts[j]);
pf->amounts[j]);
json_add_num(req->js, "direction",
flows[i]->path_dirs[j]);
pf->path_dirs[j]);
json_add_u32(req->js, "delay",
flows[i]->cltv_delays[j]);
pf->cltv_delays[j]);
json_add_string(req->js,"style","tlv");
json_object_end(req->js);
}
@ -423,7 +364,7 @@ sendpay_flows(struct command *cmd,
json_add_amount_msat(req->js, "amount_msat", p->amount);
json_add_u64(req->js, "partid", flows[i]->key.partid);
json_add_u64(req->js, "partid", pf->key.partid);
json_add_u64(req->js, "groupid", p->groupid);
if (p->payment_metadata)
@ -437,48 +378,25 @@ sendpay_flows(struct command *cmd,
if (p->description)
json_add_string(req->js, "description", p->description);
amount_msat_accumulate(&p->total_sent, flows[i]->amounts[0]);
amount_msat_accumulate(&p->total_delivering,
payflow_delivered(flows[i]));
send_outreq(pay_plugin->plugin, req);
/* Flow now owned by all_flows instead of req., in this way we
* can control the destruction occurs before we remove temporary
* channels from chan_extra_map. */
tal_steal(pay_plugin,flows[i]);
/* Let's keep record of this flow. */
payflow_map_add(pay_plugin->payflow_map,flows[i]);
/* record these HTLC along the flow path */
commit_htlc_payflow(pay_plugin->chan_extra_map,flows[i]);
/* Remove the HTLC from the chan_extra_map after finish. */
tal_add_destructor(flows[i], destroy_payflow);
send_outreq(cmd->plugin, req);
/* Now you're started! */
pf->state = PAY_FLOW_IN_PROGRESS;
}
/* Safety check. */
payment_assert_delivering_all(p);
tal_free(flows);
/* Get ready to process replies */
payment_settimer(p);
return command_still_pending(cmd);
}
static struct command_result *try_paying(struct command *cmd,
struct payment *payment)
const char *try_paying(const tal_t *ctx,
struct payment *payment,
enum jsonrpc_errcode *ecode)
{
plugin_log(pay_plugin->plugin,LOG_DBG,"calling %s",__PRETTY_FUNCTION__);
struct amount_msat feebudget, fees_spent, remaining;
payment->status = PAYMENT_PENDING;
if (time_after(time_now(), payment->stop_time))
return payment_fail(payment, PAY_STOPPED_RETRYING, "Timed out");
assert(payment->status == PAYMENT_PENDING);
/* Total feebudget */
if (!amount_msat_sub(&feebudget, payment->maxspend, payment->amount))
@ -531,27 +449,22 @@ static struct command_result *try_paying(struct command *cmd,
/* We let this return an unlikely path, as it's better to try once
* than simply refuse. Plus, models are not truth! */
gossmap_apply_localmods(pay_plugin->gossmap, payment->local_gossmods);
struct pay_flow **pay_flows = get_payflows(
payment,
remaining, feebudget,
/* is entire payment? */
amount_msat_eq(payment->total_delivering, AMOUNT_MSAT(0)),
&err_msg);
err_msg = add_payflows(tmpctx,
payment,
remaining, feebudget,
/* is entire payment? */
amount_msat_eq(payment->total_delivering, AMOUNT_MSAT(0)),
ecode);
gossmap_remove_localmods(pay_plugin->gossmap, payment->local_gossmods);
// plugin_log(pay_plugin->plugin,LOG_DBG,"get_payflows produced %s",fmt_payflows(tmpctx,pay_flows));
/* MCF cannot find a feasible route, we stop. */
if (!pay_flows)
{
return payment_fail(payment, PAY_ROUTE_NOT_FOUND,
"Failed to find a route, %s",
err_msg);
}
/* Now begin making payments */
if (err_msg)
return err_msg;
return sendpay_flows(cmd, payment, pay_flows);
/* Now begin making payments */
sendpay_new_flows(payment);
return NULL;
}
static void destroy_cmd_payment_ptr(struct command *cmd,
@ -568,6 +481,9 @@ static struct command_result *listpeerchannels_done(
struct payment *payment)
{
plugin_log(pay_plugin->plugin,LOG_DBG,"calling %s",__PRETTY_FUNCTION__);
const char *errmsg;
enum jsonrpc_errcode ecode;
if (!uncertainty_network_update_from_listpeerchannels(
pay_plugin->chan_extra_map,
pay_plugin->my_id,
@ -585,9 +501,15 @@ static struct command_result *listpeerchannels_done(
/* From now on, we keep a record of the payment, so persist it beyond this cmd. */
tal_steal(pay_plugin->plugin, payment);
/* When we terminate cmd for any reason, clear it from payment so we don't do it again. */
assert(cmd == payment->cmd);
tal_add_destructor2(cmd, destroy_cmd_payment_ptr, payment);
return try_paying(cmd, payment);
/* This looks for a route, and if OK, fires off the sendpay commands */
errmsg = try_paying(tmpctx, payment, &ecode);
if (errmsg)
return payment_fail(payment, ecode, "%s", errmsg);
return command_still_pending(cmd);
}
@ -1128,7 +1050,8 @@ static struct command_result *json_pay(struct command *cmd,
return send_outreq(cmd->plugin, req);
}
static void handle_sendpay_failure_payment(struct pay_flow *flow,
/* Terminates flow */
static void handle_sendpay_failure_payment(struct pay_flow *flow STEALS,
const char *message,
u32 erridx,
enum onion_wire onionerr,
@ -1136,12 +1059,12 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
{
struct short_channel_id errscid;
struct payment *p = flow->payment;
const u8 *update;
debug_assert(flow);
debug_assert(p);
/* Final node is usually a hard failure, but lightningd said
* TRY_OTHER_ROUTE? */
/* Final node is usually a hard failure */
if (erridx == tal_count(flow->path_scids)) {
debug_paynote(p,
"onion error %s from final node #%u: %s",
@ -1149,14 +1072,13 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
erridx,
message);
if (onionerr == WIRE_MPP_TIMEOUT)
if (onionerr == WIRE_MPP_TIMEOUT) {
pay_flow_failed(flow);
return;
}
debug_paynote(p,"final destination failure");
payment_fail(p, PAY_DESTINATION_PERM_FAIL,
"Destination said %s: %s",
onion_wire_name(onionerr),
message);
pay_flow_failed_final(flow, PAY_DESTINATION_PERM_FAIL, message);
return;
}
@ -1190,6 +1112,7 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
debug_paynote(p, "we're removing scid %s",
type_to_string(tmpctx,struct short_channel_id,&errscid));
tal_arr_expand(&p->disabled, errscid);
pay_flow_failed(flow);
return;
/* These can be fixed (maybe) by applying the included channel_update */
@ -1200,10 +1123,12 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
plugin_log(pay_plugin->plugin,LOG_DBG,"sendpay_failure, apply channel_update");
/* FIXME: Check scid! */
// TODO(eduardo): check
const u8 *update = channel_update_from_onion_error(tmpctx, raw);
update = channel_update_from_onion_error(tmpctx, raw);
if (update)
{
submit_update(flow, update, errscid);
/* Don't retry until we call pay_flow_finished_adding_gossip! */
pay_flow_failed_adding_gossip(flow);
return;
}
@ -1215,9 +1140,12 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
case WIRE_TEMPORARY_CHANNEL_FAILURE:
/* These also contain a channel_update, but in this case it's simply
* advisory, not necessary. */
const u8 *update = channel_update_from_onion_error(tmpctx, raw);
if (update)
update = channel_update_from_onion_error(tmpctx, raw);
if (update) {
submit_update(flow, update, errscid);
/* Don't retry until we call pay_flow_finished_adding_gossip! */
pay_flow_failed_adding_gossip(flow);
}
return;
@ -1233,7 +1161,7 @@ static void handle_sendpay_failure_payment(struct pay_flow *flow,
onionerr,
type_to_string(tmpctx,struct short_channel_id,&errscid));
tal_arr_expand(&p->disabled, errscid);
return;
pay_flow_failed(flow);
}
static void handle_sendpay_failure_flow(struct pay_flow *flow,
@ -1244,8 +1172,6 @@ static void handle_sendpay_failure_flow(struct pay_flow *flow,
debug_assert(flow);
struct payment * const p = flow->payment;
if (p->status != PAYMENT_SUCCESS)
p->status=PAYMENT_FAIL;
plugin_log(pay_plugin->plugin, LOG_UNUSUAL,
"onion error %s from node #%u %s: "
@ -1305,20 +1231,6 @@ static struct pay_flow *pay_flow_from_notification(const char *buf,
return payflow_map_get(pay_plugin->payflow_map, &key);
}
// TODO(eduardo): if I subscribe to a shutdown notification, the plugin takes
// forever to close and eventually it gets killed by force.
// static struct command_result *notification_shutdown(struct command *cmd,
// const char *buf,
// const jsmntok_t *params)
// {
// /* TODO(eduardo):
// * 1. at shutdown the `struct plugin *p` is not freed,
// * 2. `memleak_check` is called before we have the chance to get this
// * notification. */
// // plugin_log(pay_plugin->plugin,LOG_DBG,"received shutdown notification, freeing data.");
// pay_plugin->ctx = tal_free(pay_plugin->ctx);
// return notification_handled(cmd);
// }
static struct command_result *notification_sendpay_success(
struct command *cmd,
const char *buf,
@ -1395,10 +1307,9 @@ static struct command_result *notification_sendpay_failure(
case PAY_DESTINATION_PERM_FAIL:
break;
default:
payment_fail(flow->payment, errcode,
"Unexpected errocode from sendpay_failure: %.*s",
json_tok_full_len(params),
json_tok_full(buf, params));
pay_flow_failed_final(flow,
errcode,
"Unexpected errorcode from sendpay_failure");
goto done;
}
@ -1431,7 +1342,6 @@ static struct command_result *notification_sendpay_failure(
handle_sendpay_failure_payment(flow, msg, erridx, onionerr, raw);
done:
if(flow) payflow_fail(flow);
return notification_handled(cmd);
unhandleable:

View File

@ -69,7 +69,7 @@ struct pay_plugin {
/* Per-channel metadata: some persists between payments */
struct chan_extra_map *chan_extra_map;
/* Pending senpays. */
/* Pending sendpays (to match notifications to). */
struct payflow_map * payflow_map;
bool debug_mcf;
@ -110,4 +110,8 @@ void amount_msat_reduce_(struct amount_msat *dst,
const char *dstname,
const char *srcname);
/* Returns NULL if OK, otherwise an error msg and sets *ecode */
const char *try_paying(const tal_t *ctx,
struct payment *payment,
enum jsonrpc_errcode *ecode);
#endif /* LIGHTNING_PLUGINS_RENEPAY_PAY_H */

View File

@ -35,6 +35,12 @@
#define MAX_SHADOW_LEN 3
/* FIXME: rearrange to avoid predecls. */
static void remove_htlc_payflow(struct chan_extra_map *chan_extra_map,
struct pay_flow *flow);
static void commit_htlc_payflow(struct chan_extra_map *chan_extra_map,
const struct pay_flow *flow);
/* Returns CLTV, and fills in *shadow_fee, based on extending the path */
static u32 shadow_one_flow(const struct gossmap *gossmap,
const struct flow *f,
@ -198,26 +204,30 @@ static u32 *shadow_additions(const tal_t *ctx,
return final_cltvs;
}
/* Calculates delays and converts to scids. Frees flows. Caller is responsible
* for removing resultings flows from the chan_extra_map. */
static struct pay_flow **flows_to_pay_flows(struct payment *payment,
struct gossmap *gossmap,
struct flow **flows STEALS,
const u32 *final_cltvs,
u64 *next_partid)
static void destroy_payment_flow(struct pay_flow *pf)
{
struct pay_flow **pay_flows
= tal_arr(payment, struct pay_flow *, tal_count(flows));
list_del_from(&pf->payment->flows, &pf->list);
remove_htlc_payflow(pay_plugin->chan_extra_map, pf);
payflow_map_del(pay_plugin->payflow_map, pf);
}
/* Calculates delays and converts to scids, and links to the payment.
* Frees flows. */
static void convert_and_attach_flows(struct payment *payment,
struct gossmap *gossmap,
struct flow **flows STEALS,
const u32 *final_cltvs,
u64 *next_partid)
{
for (size_t i = 0; i < tal_count(flows); i++) {
struct flow *f = flows[i];
struct pay_flow *pf = tal(pay_flows, struct pay_flow);
struct pay_flow *pf = tal(payment, struct pay_flow);
size_t plen;
plen = tal_count(f->path);
pay_flows[i] = pf;
pf->payment = payment;
pf->state = PAY_FLOW_NOT_STARTED;
pf->key.partid = (*next_partid)++;
pf->key.groupid = payment->groupid;
pf->key.payment_hash = payment->payment_hash;
@ -243,10 +253,25 @@ static struct pay_flow **flows_to_pay_flows(struct payment *payment,
pf->amounts = tal_steal(pf, f->amounts);
pf->path_dirs = tal_steal(pf, f->dirs);
pf->success_prob = f->success_prob;
/* Payment keeps a list of its flows. */
list_add(&payment->flows, &pf->list);
/* Increase totals for payment */
amount_msat_accumulate(&payment->total_sent, pf->amounts[0]);
amount_msat_accumulate(&payment->total_delivering,
payflow_delivered(pf));
/* We keep a global map to identify notifications
* about this flow. */
payflow_map_add(pay_plugin->payflow_map, pf);
/* record these HTLC along the flow path */
commit_htlc_payflow(pay_plugin->chan_extra_map, pf);
tal_add_destructor(pf, destroy_payment_flow);
}
tal_free(flows);
return pay_flows;
}
static bitmap *make_disabled_bitmap(const tal_t *ctx,
@ -332,31 +357,28 @@ static bool disable_htlc_violations(struct payment *payment,
return disabled_some;
}
/* Get some payment flows to get this amount to destination, or NULL. */
struct pay_flow **get_payflows(struct payment *p,
struct amount_msat amount,
struct amount_msat feebudget,
bool is_entire_payment,
const char **err_msg)
const char *add_payflows(const tal_t *ctx,
struct payment *p,
struct amount_msat amount,
struct amount_msat feebudget,
bool is_entire_payment,
enum jsonrpc_errcode *ecode)
{
*err_msg = tal_fmt(tmpctx,"[no error]");
bitmap *disabled;
struct pay_flow **pay_flows;
const struct gossmap_node *src, *dst;
disabled = make_disabled_bitmap(tmpctx, pay_plugin->gossmap, p->disabled);
src = gossmap_find_node(pay_plugin->gossmap, &pay_plugin->my_id);
if (!src) {
debug_paynote(p, "We don't have any channels?");
*err_msg = tal_fmt(tmpctx,"We don't have any channels.");
goto fail;
*ecode = PAY_ROUTE_NOT_FOUND;
return tal_fmt(ctx, "We don't have any channels.");
}
dst = gossmap_find_node(pay_plugin->gossmap, &p->destination);
if (!dst) {
debug_paynote(p, "No trace of destination in network gossip");
*err_msg = tal_fmt(tmpctx,"Destination is unreacheable in the network gossip.");
goto fail;
*ecode = PAY_ROUTE_NOT_FOUND;
return tal_fmt(ctx, "Destination is unknown in the network gossip.");
}
for (;;) {
@ -380,10 +402,10 @@ struct pay_flow **get_payflows(struct payment *p,
"minflow couldn't find a feasible flow for %s",
type_to_string(tmpctx,struct amount_msat,&amount));
*err_msg = tal_fmt(tmpctx,
"minflow couldn't find a feasible flow for %s",
type_to_string(tmpctx,struct amount_msat,&amount));
goto fail;
*ecode = PAY_ROUTE_NOT_FOUND;
return tal_fmt(ctx,
"minflow couldn't find a feasible flow for %s",
type_to_string(tmpctx,struct amount_msat,&amount));
}
/* Are we unhappy? */
@ -404,12 +426,12 @@ struct pay_flow **get_payflows(struct payment *p,
debug_paynote(p, "Flows too expensive, fee = %s (max %s)",
type_to_string(tmpctx, struct amount_msat, &fee),
type_to_string(tmpctx, struct amount_msat, &feebudget));
*err_msg = tal_fmt(tmpctx,
"Fee exceeds our fee budget, "
"fee = %s (maxfee = %s)",
type_to_string(tmpctx, struct amount_msat, &fee),
type_to_string(tmpctx, struct amount_msat, &feebudget));
goto fail;
*ecode = PAY_ROUTE_TOO_EXPENSIVE;
return tal_fmt(ctx,
"Fee exceeds our fee budget, "
"fee = %s (maxfee = %s)",
type_to_string(tmpctx, struct amount_msat, &fee),
type_to_string(tmpctx, struct amount_msat, &feebudget));
}
too_delayed = (delay > p->maxdelay);
if (too_delayed) {
@ -419,11 +441,11 @@ struct pay_flow **get_payflows(struct payment *p,
/* FIXME: What is a sane limit? */
if (p->delay_feefactor > 1000) {
debug_paynote(p, "Giving up!");
*err_msg = tal_fmt(tmpctx,
"CLTV delay exceeds our CLTV budget, "
"delay = %"PRIu64" (maxdelay = %u)",
delay,p->maxdelay);
goto fail;
*ecode = PAY_ROUTE_TOO_EXPENSIVE;
return tal_fmt(ctx,
"CLTV delay exceeds our CLTV budget, "
"delay = %"PRIu64" (maxdelay = %u)",
delay, p->maxdelay);
}
p->delay_feefactor *= 2;
@ -448,18 +470,15 @@ struct pay_flow **get_payflows(struct payment *p,
* to make it look like it's going elsewhere */
final_cltvs = shadow_additions(tmpctx, pay_plugin->gossmap,
p, flows, is_entire_payment);
/* OK, we are happy with these flows: convert to
* pay_flows to outlive the current gossmap. */
pay_flows = flows_to_pay_flows(p, pay_plugin->gossmap,
flows, final_cltvs,
&p->next_partid);
break;
* pay_flows in the current payment, to outlive the
* current gossmap. */
convert_and_attach_flows(p, pay_plugin->gossmap,
flows, final_cltvs,
&p->next_partid);
return NULL;
}
return pay_flows;
fail:
return NULL;
}
const char *flow_path_to_str(const tal_t *ctx, const struct pay_flow *flow)
@ -538,7 +557,7 @@ const char* fmt_payflows(const tal_t *ctx,
return json_out_contents(jout,&len);
}
void remove_htlc_payflow(
static void remove_htlc_payflow(
struct chan_extra_map *chan_extra_map,
struct pay_flow *flow)
{
@ -573,7 +592,7 @@ void remove_htlc_payflow(
h->num_htlcs--;
}
}
void commit_htlc_payflow(
static void commit_htlc_payflow(
struct chan_extra_map *chan_extra_map,
const struct pay_flow *flow)
{
@ -607,19 +626,62 @@ struct amount_msat payflow_delivered(const struct pay_flow *flow)
return flow->amounts[tal_count(flow->amounts)-1];
}
struct pay_flow* payflow_fail(struct pay_flow *flow)
/* Remove (failed) payment from amounts. */
static void payment_remove_flowamount(const struct pay_flow *pf)
{
debug_assert(flow);
struct payment * p = flow->payment;
debug_assert(p);
if (p->status != PAYMENT_SUCCESS)
p->status = PAYMENT_FAIL;
amount_msat_reduce(&p->total_delivering, payflow_delivered(flow));
amount_msat_reduce(&p->total_sent, flow->amounts[0]);
/* Release the HTLCs in the uncertainty_network. */
return tal_free(flow);
amount_msat_reduce(&pf->payment->total_delivering,
payflow_delivered(pf));
amount_msat_reduce(&pf->payment->total_sent, pf->amounts[0]);
}
/* We've been notified that a pay_flow has failed */
void pay_flow_failed(struct pay_flow *pf)
{
assert(pf->state == PAY_FLOW_IN_PROGRESS);
pf->state = PAY_FLOW_FAILED;
payment_remove_flowamount(pf);
payment_reconsider(pf->payment);
}
/* We've been notified that a pay_flow has failed, payment is done. */
void pay_flow_failed_final(struct pay_flow *pf,
enum jsonrpc_errcode final_error,
const char *final_msg TAKES)
{
assert(pf->state == PAY_FLOW_IN_PROGRESS);
pf->state = PAY_FLOW_FAILED_FINAL;
pf->final_error = final_error;
pf->final_msg = tal_strdup(pf, final_msg);
payment_remove_flowamount(pf);
payment_reconsider(pf->payment);
}
/* We've been notified that a pay_flow has failed, adding gossip. */
void pay_flow_failed_adding_gossip(struct pay_flow *pf)
{
assert(pf->state == PAY_FLOW_IN_PROGRESS);
pf->state = PAY_FLOW_FAILED_GOSSIP_PENDING;
payment_remove_flowamount(pf);
}
/* We've finished adding gossip. */
void pay_flow_finished_adding_gossip(struct pay_flow *pf)
{
assert(pf->state == PAY_FLOW_FAILED_GOSSIP_PENDING);
pf->state = PAY_FLOW_FAILED;
payment_reconsider(pf->payment);
}
/* We've been notified that a pay_flow has succeeded. */
void pay_flow_succeeded(struct pay_flow *pf,
const struct preimage *preimage)
{
assert(pf->state == PAY_FLOW_IN_PROGRESS);
pf->state = PAY_FLOW_SUCCESS;
pf->payment_preimage = tal_dup(pf, struct preimage, preimage);
payment_reconsider(pf->payment);
}

View File

@ -8,9 +8,36 @@
#include <plugins/renepay/flow.h>
#include <plugins/renepay/payment.h>
/* There are several states a payment can be in */
enum pay_flow_state {
/* Created, but not sent to sendpay */
PAY_FLOW_NOT_STARTED,
/* Normally, here */
PAY_FLOW_IN_PROGRESS,
/* Failed: we've fed the data back to the uncertainly network. */
PAY_FLOW_FAILED,
/* Failed from the final node, so give up: see ->final_error. */
PAY_FLOW_FAILED_FINAL,
/* Failed, but still updating gossip. */
PAY_FLOW_FAILED_GOSSIP_PENDING,
/* Succeeded: see ->payment_preimage. */
PAY_FLOW_SUCCESS,
};
#define NUM_PAY_FLOW (PAY_FLOW_SUCCESS + 1)
/* This is like a struct flow, but independent of gossmap, and contains
* all we need to actually send the part payment. */
struct pay_flow {
/* Linked from payment->flows */
struct list_node list;
enum pay_flow_state state;
/* Iff state == PAY_FLOW_SUCCESS */
const struct preimage *payment_preimage;
/* Iff state == PAY_FAILED_FINAL */
enum jsonrpc_errcode final_error;
const char *final_msg;
/* So we can be an independent object for callbacks. */
struct payment * payment;
@ -76,21 +103,32 @@ HTABLE_DEFINE_TYPE(struct pay_flow,
payflow_get_key, payflow_key_hash, payflow_key_equal,
payflow_map);
/* Add one or more IN_PROGRESS pay_flow to payment. Return NULL if we did,
* otherwise an error message (and sets *ecode). */
const char *add_payflows(const tal_t *ctx,
struct payment *payment,
struct amount_msat amount,
struct amount_msat feebudget,
bool is_entire_payment,
enum jsonrpc_errcode *ecode);
struct pay_flow **get_payflows(struct payment *payment,
struct amount_msat amount,
struct amount_msat feebudget,
bool is_entire_payment,
const char **err_msg);
/* Each payflow is eventually terminated by one of these: */
void commit_htlc_payflow(
struct chan_extra_map *chan_extra_map,
const struct pay_flow *flow);
void remove_htlc_payflow(
struct chan_extra_map *chan_extra_map,
struct pay_flow *flow);
/* We've been notified that a pay_flow has failed */
void pay_flow_failed(struct pay_flow *pf STEALS);
/* We've been notified that a pay_flow has failed, payment is done. */
void pay_flow_failed_final(struct pay_flow *pf STEALS,
enum jsonrpc_errcode final_error,
const char *final_msg TAKES);
/* We've been notified that a pay_flow has failed, adding gossip. */
void pay_flow_failed_adding_gossip(struct pay_flow *pf STEALS);
/* We've finished adding gossip. */
void pay_flow_finished_adding_gossip(struct pay_flow *pf STEALS);
/* We've been notified that a pay_flow has succeeded. */
void pay_flow_succeeded(struct pay_flow *pf STEALS,
const struct preimage *preimage);
/* Formatting helpers */
const char *flow_path_to_str(const tal_t *ctx, const struct pay_flow *flow);
const char* fmt_payflows(const tal_t *ctx,
@ -99,9 +137,4 @@ const char* fmt_payflows(const tal_t *ctx,
/* How much does this flow deliver to destination? */
struct amount_msat payflow_delivered(const struct pay_flow *flow);
/* Removes amounts from payment and frees flow pointer.
* A possible destructor for flow would remove HTLCs from the
* uncertainty_network and remove the flow from any data structure. */
struct pay_flow* payflow_fail(struct pay_flow *flow);
#endif /* LIGHTNING_PLUGINS_RENEPAY_PAY_FLOW_H */

View File

@ -1,6 +1,8 @@
#include "config.h"
#include <ccan/ccan/tal/str/str.h>
#include <common/memleak.h>
#include <plugins/renepay/debug.h>
#include <plugins/renepay/pay_flow.h>
#include <plugins/renepay/payment.h>
struct payment *payment_new(const tal_t *ctx,
@ -47,6 +49,7 @@ struct payment *payment_new(const tal_t *ctx,
p->payment_secret = tal_dup_or_null(p, struct secret, payment_secret);
p->payment_metadata = tal_dup_talarr(p, u8, payment_metadata);
p->status=PAYMENT_PENDING;
list_head_init(&p->flows);
p->final_cltv=final_cltv;
// p->list=
p->description = tal_strdup_or_null(p, description);
@ -63,8 +66,8 @@ struct payment *payment_new(const tal_t *ctx,
p->local_gossmods = gossmap_localmods_new(p);
p->disabled = tal_arr(p,struct short_channel_id,0);
p->rexmit_timer = NULL;
p->next_partid=1;
p->progress_deadline = NULL;
return p;
}
@ -128,13 +131,8 @@ void payment_assert_delivering_all(const struct payment *p)
}
}
struct command_result *payment_success(struct payment *p)
static struct command_result *payment_success(struct payment *p)
{
debug_info("calling %s",__PRETTY_FUNCTION__);
p->status = PAYMENT_SUCCESS;
payment_assert_delivering_all(p);
/* We only finish command once: its destructor clears this. */
if (!p->cmd)
return NULL;
@ -162,12 +160,14 @@ struct command_result *payment_fail(
enum jsonrpc_errcode code,
const char *fmt, ...)
{
/* We only finish command once: its destructor clears this. */
if (!payment->cmd)
return NULL;
/* We usually get called because a flow failed, but we
* can also get called because we couldn't route any more
* or some strange error. */
payment->status = PAYMENT_FAIL;
if (payment->status != PAYMENT_SUCCESS)
payment->status = PAYMENT_FAIL;
/* We only finish command once: its destructor clears this. */
if (!payment->cmd)
return NULL;
va_list args;
va_start(args, fmt);
@ -184,15 +184,200 @@ u64 payment_parts(const struct payment *payment)
return payment->next_partid-1;
}
/* Either the payment succeeded or failed, we need to cleanup/set the plugin
* into a valid state before the next payment. */
void payment_cleanup(struct payment *payment)
void payment_reconsider(struct payment *payment)
{
debug_info("calling %s",__PRETTY_FUNCTION__);
// TODO(eduardo): it can happen that local_gossmods removed below
// contained a set of channels for which there is information in the
// uncertainty network (chan_extra_map) and that are part of some pending
// payflow (payflow_map). Handle this situation.
payment->local_gossmods = tal_free(payment->local_gossmods);
payment->rexmit_timer = tal_free(payment->rexmit_timer);
struct pay_flow *i, *next;
bool have_state[NUM_PAY_FLOW] = {false};
enum jsonrpc_errcode final_error, ecode;
const char *final_msg;
const char *errmsg;
plugin_log(pay_plugin->plugin, LOG_DBG, "payment_reconsider");
/* Harvest results and free up finished flows */
list_for_each_safe(&payment->flows, i, next, list) {
plugin_log(pay_plugin->plugin, LOG_DBG, "Flow in state %u", i->state);
have_state[i->state] = true;
switch (i->state) {
case PAY_FLOW_NOT_STARTED:
/* Can't happen: we start just after we add. */
plugin_err(pay_plugin->plugin, "flow not started?");
case PAY_FLOW_IN_PROGRESS:
/* Don't free, it's still going! */
continue;
case PAY_FLOW_FAILED:
break;
case PAY_FLOW_FAILED_FINAL:
final_error = i->final_error;
final_msg = tal_steal(tmpctx, i->final_msg);
break;
case PAY_FLOW_FAILED_GOSSIP_PENDING:
break;
case PAY_FLOW_SUCCESS:
if (payment->preimage) {
/* This should be impossible without breaking SHA256 */
if (!preimage_eq(payment->preimage,
i->payment_preimage)) {
plugin_err(pay_plugin->plugin,
"Impossible preimage clash for %s: %s and %s?",
type_to_string(tmpctx,
struct sha256,
&payment->payment_hash),
type_to_string(tmpctx,
struct preimage,
payment->preimage),
type_to_string(tmpctx,
struct preimage,
i->payment_preimage));
}
} else {
payment->preimage = tal_dup(payment, struct preimage,
i->payment_preimage);
}
break;
}
tal_free(i);
}
/* First, did one of these succeed? */
if (have_state[PAY_FLOW_SUCCESS]) {
plugin_log(pay_plugin->plugin, LOG_DBG, "one succeeded!");
switch (payment->status) {
case PAYMENT_PENDING:
/* The normal case: one part succeeded, we can succeed immediately */
payment_success(payment);
payment->status = PAYMENT_SUCCESS;
/* fall thru */
case PAYMENT_SUCCESS:
/* Since we already succeeded, cmd must be NULL */
assert(payment->cmd == NULL);
break;
case PAYMENT_FAIL:
/* OK, they told us it failed, but also
* succeeded? It's theoretically possible,
* but someone screwed up. */
plugin_log(pay_plugin->plugin, LOG_BROKEN,
"Destination %s succeeded payment %s"
" (preimage %s) after previous final failure?",
type_to_string(tmpctx, struct node_id,
&payment->destination),
type_to_string(tmpctx, struct sha256,
&payment->payment_hash),
type_to_string(tmpctx,
struct preimage,
payment->preimage));
break;
}
/* We don't need to do anything else. */
return;
}
/* One of these returned an error from the destination? */
if (have_state[PAY_FLOW_FAILED_FINAL]) {
plugin_log(pay_plugin->plugin, LOG_DBG, "one failed final!");
switch (payment->status) {
case PAYMENT_PENDING:
/* The normal case: we can fail immediately */
payment_fail(payment, final_error, "%s", final_msg);
/* fall thru */
case PAYMENT_FAIL:
/* Since we already failed, cmd must be NULL */
assert(payment->cmd == NULL);
break;
case PAYMENT_SUCCESS:
/* OK, they told us it failed, but also
* succeeded? It's theoretically possible,
* but someone screwed up. */
plugin_log(pay_plugin->plugin, LOG_BROKEN,
"Destination %s failed payment %s with %u/%s"
" after previous success?",
type_to_string(tmpctx, struct node_id,
&payment->destination),
type_to_string(tmpctx, struct sha256,
&payment->payment_hash),
final_error, final_msg);
break;
}
/* We don't need to do anything else. */
return;
}
/* Now, do we still care about retrying the payment? It could
* have terminated a while ago, and we're just collecting
* outstanding results. */
switch (payment->status) {
case PAYMENT_PENDING:
break;
case PAYMENT_FAIL:
case PAYMENT_SUCCESS:
assert(!payment->cmd);
plugin_log(pay_plugin->plugin, LOG_DBG, "payment already status %u!",
payment->status);
return;
}
/* Are we waiting on addgossip? We'll come back later when
* they call pay_flow_finished_adding_gossip. */
if (have_state[PAY_FLOW_FAILED_GOSSIP_PENDING]) {
plugin_log(pay_plugin->plugin, LOG_DBG,
"%s waiting on addgossip return",
type_to_string(tmpctx, struct sha256,
&payment->payment_hash));
return;
}
/* Do we still have pending payment parts? First time, we set
* up a deadline so we don't respond immediately to every
* return: it's better to gather a few failed flows before
* retrying. */
if (have_state[PAY_FLOW_IN_PROGRESS]) {
struct timemono now = time_mono();
/* If we don't have a deadline yet, set it now. */
if (!payment->progress_deadline) {
payment->progress_deadline = tal(payment, struct timemono);
*payment->progress_deadline = timemono_add(now,
time_from_msec(TIMER_COLLECT_FAILURES_MSEC));
plugin_log(pay_plugin->plugin, LOG_DBG, "Set deadline");
}
/* FIXME: add timemono_before to ccan/time */
if (time_less_(now.ts, payment->progress_deadline->ts)) {
/* Come back later. */
/* We don't care that this temporily looks like a leak; we don't even
* care if we end up with multiple outstanding. They just check
* the progress_deadline. */
plugin_log(pay_plugin->plugin, LOG_DBG, "Setting timer to kick us");
notleak(plugin_timer(pay_plugin->plugin,
timemono_between(*payment->progress_deadline, now),
payment_reconsider, payment));
return;
}
}
/* At this point, we may have some funds to deliver (or we
* could still be waiting). */
if (amount_msat_greater_eq(payment->total_delivering, payment->amount)) {
plugin_log(pay_plugin->plugin, LOG_DBG, "No more to deliver right now");
assert(have_state[PAY_FLOW_IN_PROGRESS]);
return;
}
/* If we had a deadline, reset it */
payment->progress_deadline = tal_free(payment->progress_deadline);
/* Before we do that, make sure we're not going over time. */
if (time_after(time_now(), payment->stop_time)) {
payment_fail(payment, PAY_STOPPED_RETRYING, "Timed out");
return;
}
plugin_log(pay_plugin->plugin, LOG_DBG, "Retrying payment");
errmsg = try_paying(tmpctx, payment, &ecode);
if (errmsg)
payment_fail(payment, ecode, "%s", errmsg);
}

View File

@ -8,12 +8,20 @@ enum payment_status {
PAYMENT_PENDING, PAYMENT_SUCCESS, PAYMENT_FAIL
};
struct payment {
/* Inside pay_plugin->payments list */
struct list_node list;
/* The command if still running (needed for timer func) */
/* Overall, how are we going? */
enum payment_status status;
/* The flows we are managing. */
struct list_head flows;
/* Deadline for flow status collection. */
struct timemono *progress_deadline;
/* The command if still running */
struct command *cmd;
/* Localmods to apply to gossip_map for our own use. */
@ -22,9 +30,6 @@ struct payment {
/* Channels we decided to disable for various reasons. */
struct short_channel_id *disabled;
/* Timers. */
struct plugin_timer *rexmit_timer;
/* Used in get_payflows to set ids to each pay_flow. */
u64 next_partid;
@ -66,9 +71,6 @@ struct payment {
/* Payment metadata, if specified by invoice. */
const u8 *payment_metadata;
/* To know if the last attempt failed, succeeded or is it pending. */
enum payment_status status;
u32 final_cltv;
/* Description and labels, if any. */
@ -142,14 +144,14 @@ void payment_note(struct payment *p, const char *fmt, ...);
void payment_assert_delivering_incomplete(const struct payment *p);
void payment_assert_delivering_all(const struct payment *p);
struct command_result *payment_success(struct payment *payment);
/* A flow has changed state, or we've hit a timeout: do something! */
void payment_reconsider(struct payment *p);
u64 payment_parts(const struct payment *payment);
struct command_result *payment_fail(
struct payment *payment,
enum jsonrpc_errcode code,
const char *fmt, ...);
u64 payment_parts(const struct payment *payment);
void payment_cleanup(struct payment *payment);
#endif /* LIGHTNING_PLUGINS_RENEPAY_PAYMENT_H */

View File

@ -17,6 +17,16 @@
#include <common/wireaddr.h>
#include <stdio.h>
/* AUTOGENERATED MOCKS START */
/* Generated stub for pay_plugin */
struct pay_plugin *pay_plugin;
/* Generated stub for try_paying */
const char *try_paying(const tal_t *ctx UNNEEDED,
struct payment *payment UNNEEDED,
enum jsonrpc_errcode *ecode UNNEEDED)
{ fprintf(stderr, "try_paying called!\n"); abort(); }
/* AUTOGENERATED MOCKS END */
static u8 empty_map[] = {
0
};

View File

@ -17,6 +17,16 @@
#include <inttypes.h>
#include <stdio.h>
/* AUTOGENERATED MOCKS START */
/* Generated stub for pay_plugin */
struct pay_plugin *pay_plugin;
/* Generated stub for try_paying */
const char *try_paying(const tal_t *ctx UNNEEDED,
struct payment *payment UNNEEDED,
enum jsonrpc_errcode *ecode UNNEEDED)
{ fprintf(stderr, "try_paying called!\n"); abort(); }
/* AUTOGENERATED MOCKS END */
static void swap(int *a, int *b)
{
int temp = *a;

View File

@ -16,6 +16,16 @@
#include "../uncertainty_network.c"
#include "../mcf.c"
/* AUTOGENERATED MOCKS START */
/* Generated stub for pay_plugin */
struct pay_plugin *pay_plugin;
/* Generated stub for try_paying */
const char *try_paying(const tal_t *ctx UNNEEDED,
struct payment *payment UNNEEDED,
enum jsonrpc_errcode *ecode UNNEEDED)
{ fprintf(stderr, "try_paying called!\n"); abort(); }
/* AUTOGENERATED MOCKS END */
static const u8 canned_map[] = {
0x0c, 0x80, 0x00, 0x01, 0xbc, 0x86, 0xe4, 0xbf, 0x95, 0x00, 0x00, 0x00, 0x00, 0x10, 0x08, 0x00,
0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,

View File

@ -59,7 +59,7 @@ def test_errors(node_factory, bitcoind):
node_factory.join_nodes([l1, l3, l5],
wait_for_announce=True, fundamount=1000000)
failmsg = r'Destination is unreacheable in the network gossip.'
failmsg = r'Destination is unknown in the network gossip.'
with pytest.raises(RpcError, match=failmsg):
l1.rpc.call('renepay', {'invstring': inv})
@ -208,7 +208,7 @@ def test_limits(node_factory):
# FIXME: pylightning should define these!
# PAY_STOPPED_RETRYING = 210
PAY_ROUTE_NOT_FOUND = 205
PAY_ROUTE_TOO_EXPENSIVE = 206
inv = l6.rpc.invoice("any", "any", 'description')
@ -217,7 +217,7 @@ def test_limits(node_factory):
with pytest.raises(RpcError, match=failmsg) as err:
l1.rpc.call(
'renepay', {'invstring': inv['bolt11'], 'amount_msat': 1000000, 'maxfee': 1})
assert err.value.error['code'] == PAY_ROUTE_NOT_FOUND
assert err.value.error['code'] == PAY_ROUTE_TOO_EXPENSIVE
# TODO(eduardo): which error code shall we use here?
# TODO(eduardo): shall we list attempts in renepay?
@ -228,7 +228,7 @@ def test_limits(node_factory):
with pytest.raises(RpcError, match=failmsg) as err:
l1.rpc.call(
'renepay', {'invstring': inv['bolt11'], 'amount_msat': 1000000, 'maxdelay': 0})
assert err.value.error['code'] == PAY_ROUTE_NOT_FOUND
assert err.value.error['code'] == PAY_ROUTE_TOO_EXPENSIVE
inv2 = l6.rpc.invoice("800000sat", "inv2", 'description')
l1.rpc.call(