bkpr: add journal entry for offset account balances; report listbalances

When the node starts up, it records missing/updated account balances
to the 'channel' events... which is kinda fucked for wallet + external
events now that i think about it but these are all treated the same
anyway so it's fine.

This is the magic piece that lets your bookkeeping data startup ok on an
already running/established node.
This commit is contained in:
niftynei 2022-07-19 17:04:35 +09:30 committed by Rusty Russell
parent b7d85f1d0b
commit 29c6884468
4 changed files with 328 additions and 2 deletions

View File

@ -1,7 +1,9 @@
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/time/time.h>
#include <common/coin_mvt.h>
#include <common/json_param.h>
#include <common/json_stream.h>
#include <common/memleak.h>
#include <common/type_to_string.h>
#include <db/exec.h>
@ -26,11 +28,51 @@ static struct command_result *json_list_balances(struct command *cmd,
const jsmntok_t *params)
{
struct json_stream *res;
struct account **accts;
char *err;
if (!param(cmd, buf, params, NULL))
return command_param_failed();
res = jsonrpc_stream_success(cmd);
/* List of accts */
db_begin_transaction(db);
accts = list_accounts(cmd, db);
json_array_start(res, "accounts");
for (size_t i = 0; i < tal_count(accts); i++) {
struct acct_balance **balances;
err = account_get_balance(cmd, db,
accts[i]->name,
&balances);
if (err)
plugin_err(cmd->plugin,
"Get account balance returned err"
" for account %s: %s",
accts[i]->name, err);
/* Add it to the result data */
json_object_start(res, NULL);
json_add_string(res, "account_id", accts[i]->name);
json_array_start(res, "balances");
for (size_t j = 0; j < tal_count(balances); j++) {
json_object_start(res, NULL);
json_add_amount_msat_only(res, "balance",
balances[j]->balance);
json_add_string(res, "coin_type",
balances[j]->currency);
json_object_end(res);
}
json_array_end(res);
json_object_end(res);
}
json_array_end(res);
db_commit_transaction(db);
return command_finished(cmd, res);
}
@ -79,8 +121,13 @@ static struct command_result *json_balance_snapshot(struct command *cmd,
json_tok_full(buf, params));
snaps = tal_arr(cmd, struct account_snap, accounts_tok->size);
db_begin_transaction(db);
json_for_each_arr(i, acct_tok, accounts_tok) {
struct acct_balance **balances;
struct amount_msat balance;
struct account_snap s = snaps[i];
err = json_scan(cmd, buf, acct_tok,
"{account_id:%"
",balance_msat:%"
@ -99,9 +146,72 @@ static struct command_result *json_balance_snapshot(struct command *cmd,
plugin_log(cmd->plugin, LOG_DBG, "account %s has balance %s",
s.name,
type_to_string(tmpctx, struct amount_msat, &s.amt));
}
// FIXME: check balances are ok!
/* Find the account and verify the balance */
err = account_get_balance(cmd, db, s.name,
&balances);
if (err)
plugin_err(cmd->plugin,
"Get account balance returned err"
" for account %s: %s",
s.name, err);
/* FIXME: multiple currency balances */
balance = AMOUNT_MSAT(0);
for (size_t j = 0; j < tal_count(balances); j++) {
bool ok;
ok = amount_msat_add(&balance, balance,
balances[j]->balance);
assert(ok);
}
if (!amount_msat_eq(s.amt, balance)) {
struct account *acct;
struct channel_event ev;
plugin_log(cmd->plugin, LOG_UNUSUAL,
"Snapshot balance does not equal ondisk"
" reported %s, on disk %s (account %s)."
" Logging journal entry.",
type_to_string(tmpctx, struct amount_msat, &s.amt),
type_to_string(tmpctx, struct amount_msat, &balance),
s.name);
if (!amount_msat_sub(&ev.credit, s.amt, balance)) {
ev.credit = AMOUNT_MSAT(0);
if (!amount_msat_sub(&ev.debit, balance, s.amt))
plugin_err(cmd->plugin,
"Unable to sub amt");
} else
ev.debit = AMOUNT_MSAT(0);
/* Log a channel "journal entry" to get
* the balances inline */
acct = find_account(cmd, db, s.name);
if (!acct) {
plugin_log(cmd->plugin, LOG_INFORM,
"account %s not found, adding"
" along with new balance",
s.name);
/* FIXME: lookup peer id for channel? */
acct = new_account(cmd, s.name, NULL);
account_add(db, acct);
}
/* This is *not* a coin_mvt tag type */
ev.tag = "journal_entry";
ev.fees = AMOUNT_MSAT(0);
ev.currency = s.coin_type;
ev.part_id = 0;
memset(&ev.payment_id, 0, sizeof(struct sha256));
/* Use current time for this */
ev.timestamp = time_now().ts.tv_sec;
log_channel_event(db, acct, &ev);
}
}
db_commit_transaction(db);
return notification_handled(cmd);
}

View File

@ -187,6 +187,103 @@ static struct chain_event *find_chain_event(const tal_t *ctx,
return e;
}
char *account_get_balance(const tal_t *ctx,
struct db *db,
const char *acct_name,
struct acct_balance ***balances)
{
struct db_stmt *stmt;
stmt = db_prepare_v2(db, SQL("SELECT"
" CAST(SUM(ce.credit) AS BIGINT) as credit"
", CAST(SUM(ce.debit) AS BIGINT) as debit"
", ce.currency"
" FROM chain_events ce"
" LEFT OUTER JOIN accounts a"
" ON a.id = ce.account_id"
" WHERE a.name = ?"
" GROUP BY ce.currency"));
db_bind_text(stmt, 0, acct_name);
db_query_prepared(stmt);
*balances = tal_arr(ctx, struct acct_balance *, 0);
while (db_step(stmt)) {
struct acct_balance *bal;
bal = tal(*balances, struct acct_balance);
bal->currency = db_col_strdup(bal, stmt, "ce.currency");
db_col_amount_msat(stmt, "credit", &bal->credit);
db_col_amount_msat(stmt, "debit", &bal->debit);
tal_arr_expand(balances, bal);
}
tal_free(stmt);
stmt = db_prepare_v2(db, SQL("SELECT"
" CAST(SUM(ce.credit) AS BIGINT) as credit"
", CAST(SUM(ce.debit) AS BIGINT) as debit"
", ce.currency"
" FROM channel_events ce"
" LEFT OUTER JOIN accounts a"
" ON a.id = ce.account_id"
" WHERE a.name = ?"
" GROUP BY ce.currency"));
db_bind_text(stmt, 0, acct_name);
db_query_prepared(stmt);
while (db_step(stmt)) {
struct amount_msat amt;
struct acct_balance *bal = NULL;
char *currency;
currency = db_col_strdup(ctx, stmt, "ce.currency");
/* Find the currency entry from above */
for (size_t i = 0; i < tal_count(*balances); i++) {
if (streq((*balances)[i]->currency, currency)) {
bal = (*balances)[i];
break;
}
}
if (!bal) {
bal = tal(*balances, struct acct_balance);
bal->credit = AMOUNT_MSAT(0);
bal->debit = AMOUNT_MSAT(0);
bal->currency = tal_steal(bal, currency);
tal_arr_expand(balances, bal);
}
db_col_amount_msat(stmt, "credit", &amt);
if (!amount_msat_add(&bal->credit, bal->credit, amt)) {
tal_free(stmt);
return "overflow adding channel_event credits";
}
db_col_amount_msat(stmt, "debit", &amt);
if (!amount_msat_add(&bal->debit, bal->debit, amt)) {
tal_free(stmt);
return "overflow adding channel_event debits";
}
}
tal_free(stmt);
for (size_t i = 0; i < tal_count(*balances); i++) {
struct acct_balance *bal = (*balances)[i];
if (!amount_msat_sub(&bal->balance, bal->credit, bal->debit))
return tal_fmt(ctx,
"%s channel balance is negative? %s - %s",
bal->currency,
type_to_string(ctx, struct amount_msat,
&bal->credit),
type_to_string(ctx, struct amount_msat,
&bal->debit));
}
return NULL;
}
struct channel_event **account_get_channel_events(const tal_t *ctx,
struct db *db,
struct account *acct)

View File

@ -12,6 +12,13 @@ struct db;
enum mvt_tag;
struct onchain_fee;
struct acct_balance {
char *currency;
struct amount_msat credit;
struct amount_msat debit;
struct amount_msat balance;
};
/* Get all accounts */
struct account **list_accounts(const tal_t *ctx, struct db *db);
@ -30,6 +37,12 @@ struct chain_event **account_get_chain_events(const tal_t *ctx,
struct db *db,
struct account *acct);
/* Calculate the balances for an account */
char *account_get_balance(const tal_t *ctx,
struct db *db,
const char *acct_name,
struct acct_balance ***balances);
/* List all chain fees, for all accounts */
struct onchain_fee **list_chain_fees(const tal_t *ctx, struct db *db);

View File

@ -290,6 +290,25 @@ static bool chain_events_eq(struct chain_event *e1, struct chain_event *e2)
return true;
}
static struct channel_event *make_channel_event(const tal_t *ctx,
char *tag,
struct amount_msat credit,
struct amount_msat debit,
char payment_char)
{
struct channel_event *ev = tal(ctx, struct channel_event);
memset(&ev->payment_id, payment_char, sizeof(struct sha256));
ev->credit = credit;
ev->debit = debit;
ev->fees = AMOUNT_MSAT(104);
ev->currency = "btc";
ev->timestamp = 1919191;
ev->part_id = 19;
ev->tag = tag;
return ev;
}
static struct chain_event *make_chain_event(const tal_t *ctx,
char *tag,
struct amount_msat credit,
@ -827,6 +846,92 @@ static bool test_chain_event_crud(const tal_t *ctx, struct plugin *p)
return true;
}
static bool test_account_balances(const tal_t *ctx, struct plugin *p)
{
struct db *db = db_setup(ctx, p, tmp_dsn(ctx));
struct node_id peer_id;
struct account *acct, *acct2;
struct chain_event *ev1;
struct acct_balance **balances;
char *err;
memset(&peer_id, 3, sizeof(struct node_id));
acct = new_account(ctx, tal_fmt(ctx, "example"), &peer_id);
acct2 = new_account(ctx, tal_fmt(ctx, "wallet"), &peer_id);
db_begin_transaction(db);
account_add(db, acct);
account_add(db, acct2);
/* +1000btc */
log_chain_event(db, acct,
make_chain_event(ctx, "one",
AMOUNT_MSAT(1000),
AMOUNT_MSAT(0),
'A', 1, '*'));
/* -999btc */
log_chain_event(db, acct,
make_chain_event(ctx, "two",
AMOUNT_MSAT(0),
AMOUNT_MSAT(999),
'A', 2, '*'));
/* -440btc */
log_channel_event(db, acct,
make_channel_event(ctx, "chan",
AMOUNT_MSAT(0),
AMOUNT_MSAT(440),
'C'));
/* 500btc */
log_channel_event(db, acct,
make_channel_event(ctx, "chan",
AMOUNT_MSAT(500),
AMOUNT_MSAT(0),
'D'));
/* +5000chf */
ev1 = make_chain_event(ctx, "two", AMOUNT_MSAT(5000), AMOUNT_MSAT(0),
'A', 3, '*');
ev1->currency = "chf";
log_chain_event(db, acct, ev1);
/* Add same chain event to a different account, shouldn't show */
log_chain_event(db, acct2, ev1);
err = account_get_balance(ctx, db, acct->name,
&balances);
CHECK_MSG(!err, err);
db_commit_transaction(db);
CHECK_MSG(!db_err, db_err);
/* Should have 2 balances */
CHECK(tal_count(balances) == 2);
CHECK(streq(balances[0]->currency, "btc"));
CHECK(amount_msat_eq(balances[0]->balance, AMOUNT_MSAT(500 - 440 + 1)));
CHECK(streq(balances[1]->currency, "chf"));
CHECK(amount_msat_eq(balances[1]->balance, AMOUNT_MSAT(5000)));
/* Should error if account balance is negative */
db_begin_transaction(db);
/* -5001chf */
ev1 = make_chain_event(ctx, "two",
AMOUNT_MSAT(0), AMOUNT_MSAT(5001),
'A', 4, '*');
ev1->currency = "chf";
log_chain_event(db, acct, ev1);
err = account_get_balance(ctx, db, acct->name,
&balances);
CHECK_MSG(err != NULL, "Expected err message");
CHECK(streq(err, "chf channel balance is negative? 5000msat - 5001msat"));
db_commit_transaction(db);
return true;
}
static bool test_account_crud(const tal_t *ctx, struct plugin *p)
{
struct db *db = db_setup(ctx, p, tmp_dsn(ctx));
@ -940,6 +1045,7 @@ int main(int argc, char *argv[])
ok &= test_account_crud(tmpctx, plugin);
ok &= test_channel_event_crud(tmpctx, plugin);
ok &= test_chain_event_crud(tmpctx, plugin);
ok &= test_account_balances(tmpctx, plugin);
ok &= test_onchain_fee_chan_close(tmpctx, plugin);
ok &= test_onchain_fee_chan_open(tmpctx, plugin);
ok &= test_onchain_fee_wallet_spend(tmpctx, plugin);