bkpr: add two custom notifications that we listen for

It might be nice to let the bookkeeper keep track of external accounts
as well as the internal onchain wallet? To this end, we add some new
custom notifications, which the bookkeeper will ingest and add to its
ledger.

Suggested-By: @chrisguida

Changelog-Added: PLUGINS: `bookkeeper` now listens for two custom events: `utxo_deposit` and `utxo_spend`. This allows for 3rd party plugins to send onchain coin events to the `bookkeeper`.  See the new plugins/bkpr/README.md for details on how these work!
This commit is contained in:
niftynei 2024-07-01 13:35:01 -05:00 committed by Rusty Russell
parent cd4298de84
commit 0e99f2e718
6 changed files with 375 additions and 11 deletions

77
plugins/bkpr/README.md Normal file
View file

@ -0,0 +1,77 @@
The bookkeeper keeps track of coins moving through your Lightning node.
See the doc/PLUGINS.md#coin_movement section on the message that CLN emits for us to process.
// FIXME: add more detailed documenation for how bookkeeper works.
## 3rd Party Coin Movements
Bookeeper ingests 3rd party plugin notifications about on-chain movements that it should watch.
This allows for us to account for non-internal on-chain wallets in the single place, making `bookkeeper` your single source of truth for bitcoin for an organization or node-operator.
As a plugin writer, if you want to emit onchain events that the bookkeeper should track, you should emit an event with the following format:
```
{
"utxo_deposit": {
"account": "nifty's secret stash",
"transfer_from: null,
"outpoint": xxxx:x,
"amount_msat": "10000sat",
"coin_type": "bc",
"timestamp": xxxx,
"blockheight": xxx,
}
}
```
```
{
"utxo_spend": {
"account": "nifty's secret stash",
"outpoint": xxxx:x,
"spending_txid": xxxx,
"amount_msat": "10000sat",
"coin_type": "bc",
"timestamp": xxxx,
"blockheight": xxx,
}
}
```
## Withdrawing money (sending to a external account)
Sending money to an external account is a bit unintuitive in in the UTXO model that we're using to track coin moves; technically a send to an external account is a "deposit" to 3rd party's UTXO.
To account for these, `bookkeeper` expects to receive a `utxo_deposit` event for the creation of an output to a 3rd party. It's assumed that you'll issue these at transaction creation time, and that they won't be final until we receive notice of spend of the inputs in the tx that created them.
To notify that money is being sent to a 3rd party output, here's the event we'd expect.
The two keys here are the following:
- The `account` is `external`. This is a special account in `bookkeeper` and used for tracking external deposits (aka sends)
- The `transfer_from` field is set to the name of the account that is sending out the money.
```
{
"utxo_deposit": {
"account": "external",
"transfer_from": "nifty's secret stash",
"outpoint": xxxx:x,
"amount_msat": "10000sat",
"coin_type": "bc",
"timestamp": xxxx,
"blockheight": xxx,
}
}
```
## List of todos
List of things to check/work on, as a todo list.
- Transfers btw a 3rd party wallet and the internal CLN wallet? These should be registered as internal transfers and not show up in `listincome`

View file

@ -1519,7 +1519,8 @@ parse_and_log_chain_move(struct command *cmd,
/* Go see if there's any deposits to an external /* Go see if there's any deposits to an external
* that are now confirmed */ * that are now confirmed */
/* FIXME: might need updating when we can splice? */ /* FIXME: might need updating when we can splice? */
maybe_closeout_external_deposits(db, e); maybe_closeout_external_deposits(db, e->spending_txid,
e->blockheight);
db_commit_transaction(db); db_commit_transaction(db);
} }
@ -1674,6 +1675,173 @@ static char *parse_tags(const tal_t *ctx,
return NULL; return NULL;
} }
static struct command_result *json_utxo_deposit(struct command *cmd, const char *buf, const jsmntok_t *params)
{
const char *move_tag ="utxo_deposit";
struct chain_event *ev = tal(cmd, struct chain_event);
struct account *acct;
const char *err;
err = json_scan(tmpctx, buf, params,
"{payload:{utxo_deposit:{"
"account:%"
",transfer_from:%"
",outpoint:%"
",amount_msat:%"
",coin_type:%"
",timestamp:%"
",blockheight:%"
"}}}",
JSON_SCAN_TAL(tmpctx, json_strdup, &ev->acct_name),
JSON_SCAN_TAL(tmpctx, json_strdup, &ev->origin_acct),
JSON_SCAN(json_to_outpoint, &ev->outpoint),
JSON_SCAN(json_to_msat, &ev->credit),
JSON_SCAN_TAL(tmpctx, json_strdup, &ev->currency),
JSON_SCAN(json_to_u64, &ev->timestamp),
JSON_SCAN(json_to_u32, &ev->blockheight));
if (err)
plugin_err(cmd->plugin,
"`%s` payload did not scan %s: %.*s",
move_tag, err, json_tok_full_len(params),
json_tok_full(buf, params));
/* Log the thing */
db_begin_transaction(db);
acct = find_account(tmpctx, db, ev->acct_name);
if (!acct) {
acct = new_account(tmpctx, ev->acct_name, NULL);
account_add(db, acct);
}
ev->tag = "deposit";
ev->ignored = false;
ev->stealable = false;
ev->rebalance = false;
ev->debit = AMOUNT_MSAT(0);
ev->output_value = ev->credit;
ev->spending_txid = NULL;
ev->payment_id = NULL;
ev->desc = NULL;
plugin_log(cmd->plugin, LOG_DBG, "%s (%s|%s) %s -%s %"PRIu64" %d %s",
move_tag, ev->tag, ev->acct_name,
fmt_amount_msat(tmpctx, ev->credit),
fmt_amount_msat(tmpctx, ev->debit),
ev->timestamp, ev->blockheight,
fmt_bitcoin_outpoint(tmpctx, &ev->outpoint));
if (!log_chain_event(db, acct, ev)) {
db_commit_transaction(db);
/* This is not a new event, do nothing */
return notification_handled(cmd);
}
/* Can we calculate any onchain fees now? */
err = maybe_update_onchain_fees(cmd, db, &ev->outpoint.txid);
db_commit_transaction(db);
if (err)
plugin_err(cmd->plugin,
"Unable to update onchain fees %s",
err);
/* FIXME: do account close checks, when allow onchain close to externals? */
return notification_handled(cmd);;
}
static struct command_result *json_utxo_spend(struct command *cmd, const char *buf, const jsmntok_t *params)
{
const char *move_tag ="utxo_spend";
struct account *acct;
struct chain_event *ev = tal(cmd, struct chain_event);
const char *err, *acct_name;
ev->spending_txid = tal(ev, struct bitcoin_txid);
err = json_scan(tmpctx, buf, params,
"{payload:{utxo_spend:{"
"account:%"
",outpoint:%"
",spending_txid:%"
",amount_msat:%"
",coin_type:%"
",timestamp:%"
",blockheight:%"
"}}}",
JSON_SCAN_TAL(tmpctx, json_strdup, &acct_name),
JSON_SCAN(json_to_outpoint, &ev->outpoint),
JSON_SCAN(json_to_txid, ev->spending_txid),
JSON_SCAN(json_to_msat, &ev->debit),
JSON_SCAN_TAL(tmpctx, json_strdup, &ev->currency),
JSON_SCAN(json_to_u64, &ev->timestamp),
JSON_SCAN(json_to_u32, &ev->blockheight));
if (err)
plugin_err(cmd->plugin,
"`%s` payload did not scan %s: %.*s",
move_tag, err, json_tok_full_len(params),
json_tok_full(buf, params));
/* Log the thing */
db_begin_transaction(db);
acct = find_account(tmpctx, db, acct_name);
if (!acct) {
acct = new_account(tmpctx, acct_name, NULL);
account_add(db, acct);
}
ev->origin_acct = NULL;
ev->tag = "withdrawal";
ev->ignored = false;
ev->stealable = false;
ev->rebalance = false;
ev->credit = AMOUNT_MSAT(0);
ev->output_value = ev->debit;
ev->payment_id = NULL;
ev->desc = NULL;
plugin_log(cmd->plugin, LOG_DBG, "%s (%s|%s) %s -%s %"PRIu64" %d %s %s",
move_tag, ev->tag, acct_name,
fmt_amount_msat(tmpctx, ev->credit),
fmt_amount_msat(tmpctx, ev->debit),
ev->timestamp, ev->blockheight,
fmt_bitcoin_outpoint(tmpctx, &ev->outpoint),
fmt_bitcoin_txid(tmpctx, ev->spending_txid));
if (!log_chain_event(db, acct, ev)) {
db_commit_transaction(db);
/* This is not a new event, do nothing */
return notification_handled(cmd);
}
err = maybe_update_onchain_fees(cmd, db, ev->spending_txid);
if (err) {
db_commit_transaction(db);
plugin_err(cmd->plugin,
"Unable to update onchain fees %s",
err);
}
err = maybe_update_onchain_fees(cmd, db, &ev->outpoint.txid);
if (err) {
db_commit_transaction(db);
plugin_err(cmd->plugin,
"Unable to update onchain fees %s",
err);
}
/* Go see if there's any deposits to an external
* that are now confirmed */
/* FIXME: might need updating when we can splice? */
maybe_closeout_external_deposits(db, ev->spending_txid,
ev->blockheight);
db_commit_transaction(db);
/* FIXME: do account close checks, when allow onchain close to externals? */
return notification_handled(cmd);;
}
static struct command_result *json_coin_moved(struct command *cmd, static struct command_result *json_coin_moved(struct command *cmd,
const char *buf, const char *buf,
const jsmntok_t *params) const jsmntok_t *params)
@ -1749,7 +1917,15 @@ const struct plugin_notification notifs[] = {
{ {
"balance_snapshot", "balance_snapshot",
json_balance_snapshot, json_balance_snapshot,
} },
{
"utxo_deposit",
json_utxo_deposit,
},
{
"utxo_spend",
json_utxo_spend,
},
}; };
static const struct plugin_command commands[] = { static const struct plugin_command commands[] = {

View file

@ -1950,11 +1950,12 @@ finished:
} }
void maybe_closeout_external_deposits(struct db *db, void maybe_closeout_external_deposits(struct db *db,
struct chain_event *ev) const struct bitcoin_txid *txid,
u32 blockheight)
{ {
struct db_stmt *stmt; struct db_stmt *stmt;
assert(ev->spending_txid); assert(txid);
stmt = db_prepare_v2(db, SQL("SELECT " stmt = db_prepare_v2(db, SQL("SELECT "
" e.id" " e.id"
" FROM chain_events e" " FROM chain_events e"
@ -1966,7 +1967,7 @@ void maybe_closeout_external_deposits(struct db *db,
/* Blockheight for unconfirmeds is zero */ /* Blockheight for unconfirmeds is zero */
db_bind_int(stmt, 0); db_bind_int(stmt, 0);
db_bind_txid(stmt, ev->spending_txid); db_bind_txid(stmt, txid);
db_bind_text(stmt, EXTERNAL_ACCT); db_bind_text(stmt, EXTERNAL_ACCT);
db_query_prepared(stmt); db_query_prepared(stmt);
@ -1979,7 +1980,7 @@ void maybe_closeout_external_deposits(struct db *db,
" blockheight = ?" " blockheight = ?"
" WHERE id = ?")); " WHERE id = ?"));
db_bind_int(update_stmt, ev->blockheight); db_bind_int(update_stmt, blockheight);
db_bind_u64(update_stmt, id); db_bind_u64(update_stmt, id);
db_exec_prepared_v2(take(update_stmt)); db_exec_prepared_v2(take(update_stmt));
} }
@ -1994,7 +1995,7 @@ bool log_chain_event(struct db *db,
struct db_stmt *stmt; struct db_stmt *stmt;
/* We're responsible for de-duping chain events! */ /* We're responsible for de-duping chain events! */
if (find_chain_event(e, db, acct, if (find_chain_event(tmpctx, db, acct,
&e->outpoint, e->spending_txid, &e->outpoint, e->spending_txid,
e->tag)) e->tag))
return false; return false;

View file

@ -208,7 +208,9 @@ void add_payment_hash_desc(struct db *db,
* *
* This method updates the blockheight on these events to the * This method updates the blockheight on these events to the
* height an input was spent into */ * height an input was spent into */
void maybe_closeout_external_deposits(struct db *db, struct chain_event *ev); void maybe_closeout_external_deposits(struct db *db,
const struct bitcoin_txid *txid,
u32 blockheight);
/* Keep track of rebalancing payments (payments paid to/from ourselves. /* Keep track of rebalancing payments (payments paid to/from ourselves.
* Returns true if was rebalance */ * Returns true if was rebalance */
@ -224,9 +226,10 @@ void log_channel_event(struct db *db,
struct channel_event *e); struct channel_event *e);
/* Log a chain event. /* Log a chain event.
* Returns true if inserted, false if already exists */ * Returns true if inserted, false if already exists;
* ctx is for allocating objects onto chain_event `e` */
bool log_chain_event(struct db *db, bool log_chain_event(struct db *db,
const struct account *acct, const struct account *acct,
struct chain_event *e); struct chain_event *e);
#endif /* LIGHTNING_PLUGINS_BKPR_RECORDER_H */ #endif /* LIGHTNING_PLUGINS_BKPR_RECORDER_H */

View file

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from pyln.client import Plugin
plugin = Plugin()
UTXO_DEPOSIT_TAG = "utxo_deposit"
UTXO_SPEND_TAG = "utxo_spend"
@plugin.method("sendspend")
def emit_spend(plugin, acct, outpoint, txid, amount, **kwargs):
"""Emit a 'utxo_spend' movement
"""
utxo_spend = {
"account": acct,
"outpoint": outpoint,
"spending_txid": txid,
"amount_msat": amount,
"coin_type": "bcrt",
"timestamp": 1679955976,
"blockheight": 111,
}
plugin.notify(UTXO_SPEND_TAG, {UTXO_SPEND_TAG: utxo_spend})
@plugin.method("senddeposit")
def emit_deposit(plugin, acct, is_withdraw, outpoint, amount, **kwargs):
"""Emit a 'utxo_deposit' movement
"""
transfer_from = None
if is_withdraw:
acct = "external"
transfer_from = acct
utxo_deposit = {
"account": acct,
"transfer_from": transfer_from,
"outpoint": outpoint,
"amount_msat": amount,
"coin_type": "bcrt",
"timestamp": 1679955976,
"blockheight": 111,
}
plugin.notify(UTXO_DEPOSIT_TAG, {UTXO_DEPOSIT_TAG: utxo_deposit})
plugin.add_notification_topic(UTXO_DEPOSIT_TAG)
plugin.add_notification_topic(UTXO_SPEND_TAG)
plugin.run()

View file

@ -892,3 +892,58 @@ def test_bookkeeper_lease_fee_dupe_migration(node_factory):
accts_db = Sqlite3Db(accts_db_path) accts_db = Sqlite3Db(accts_db_path)
assert accts_db.query('SELECT tag from channel_events where tag = \'lease_fee\';') == [{'tag': 'lease_fee'}] assert accts_db.query('SELECT tag from channel_events where tag = \'lease_fee\';') == [{'tag': 'lease_fee'}]
def test_bookkeeper_custom_notifs(node_factory):
# FIXME: what happens if we send internal funds to 'external' wallet?
plugin = os.path.join(
os.path.dirname(__file__), "plugins", "bookkeeper_custom_coins.py"
)
l1, l2 = node_factory.line_graph(2, opts=[{'plugin': plugin}, {}])
outpoint_in = 'aa' * 32 + ':0'
spend_txid = 'bb' * 32
amount = 180000000
withdraw_amt = 55555000
fee = 2000
change_deposit = 'bb' * 32 + ':0'
external_deposit = 'bb' * 32 + ':1'
acct = "nifty's secret stash"
l1.rpc.senddeposit(acct, False, outpoint_in, amount)
l1.rpc.sendspend(acct, outpoint_in, spend_txid, amount)
l1.daemon.wait_for_log(r"utxo_deposit \(deposit|nifty's secret stash\) .* -0msat 1679955976 111 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa:0")
l1.daemon.wait_for_log(r"utxo_spend \(withdrawal|nifty's secret stash\) 0msat -12345678000msat 1679955976 111 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa:0 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
# balance should be zero
bals = l1.rpc.bkpr_listbalances()['accounts']
for bal in bals:
if bal['account'] == acct:
# FIXME: how to account for withdraw to external
assert only_one(bal['balances'])['balance_msat'] == Millisatoshi(0)
l1.rpc.senddeposit(acct, False, change_deposit, amount - withdraw_amt - fee)
l1.daemon.wait_for_log(r"utxo_deposit \(deposit|nifty's secret stash\) .* -0msat 1679955976 111 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb:0")
# balance should be equal to amount
events = l1.rpc.bkpr_listaccountevents(acct)['events']
for bal in l1.rpc.bkpr_listbalances()['accounts']:
if bal['account'] == acct:
assert only_one(bal['balances'])['balance_msat'] == Millisatoshi(amount - fee - withdraw_amt)
onchain_fee_one = only_one([x['credit_msat'] for x in events if x['type'] == 'onchain_fee'])
assert onchain_fee_one == fee + withdraw_amt
l1.rpc.senddeposit(acct, True, external_deposit, withdraw_amt)
l1.daemon.wait_for_log(r"utxo_deposit \(deposit|external\) .* -0msat 1679955976 111 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb:1")
events = l1.rpc.bkpr_listaccountevents(acct)['events']
onchain_fees = [x for x in events if x['type'] == 'onchain_fee']
assert len(onchain_fees) == 2
assert onchain_fees[0]['credit_msat'] == onchain_fee_one
assert onchain_fees[1]['debit_msat'] == withdraw_amt
# This should not blow up
incomes = l1.rpc.bkpr_listincome()['income_events']
acct_fee = only_one([inc['debit_msat'] for inc in incomes if inc['account'] == acct and inc['tag'] == 'onchain_fee'])
assert acct_fee == Millisatoshi(fee)