coin moves: persist the coin movement index counter to disk

Should make it easier to track when coin moves in the plugin are
disjoint from what c-lightning says it's broadcast already.
This commit is contained in:
lisa neigut 2020-04-03 18:58:04 -05:00 committed by Rusty Russell
parent 44f747b29a
commit ffd9467f14
10 changed files with 77 additions and 15 deletions

View file

@ -32,8 +32,6 @@ const char *mvt_unit_str(enum mvt_unit_type unit)
return mvt_units[unit]; return mvt_units[unit];
} }
static u64 mvt_count = 0;
struct channel_coin_mvt *new_channel_coin_mvt(const tal_t *ctx, struct channel_coin_mvt *new_channel_coin_mvt(const tal_t *ctx,
struct bitcoin_txid *funding_txid, struct bitcoin_txid *funding_txid,
u32 funding_outnum, u32 funding_outnum,
@ -131,7 +129,8 @@ struct chain_coin_mvt *new_chain_coin_mvt_sat(const tal_t *ctx,
struct coin_mvt *finalize_chain_mvt(const tal_t *ctx, struct coin_mvt *finalize_chain_mvt(const tal_t *ctx,
const struct chain_coin_mvt *chain_mvt, const struct chain_coin_mvt *chain_mvt,
u32 timestamp, u32 timestamp,
struct node_id *node_id) struct node_id *node_id,
s64 count)
{ {
struct coin_mvt *mvt = tal(ctx, struct coin_mvt); struct coin_mvt *mvt = tal(ctx, struct coin_mvt);
@ -151,14 +150,15 @@ struct coin_mvt *finalize_chain_mvt(const tal_t *ctx,
mvt->blockheight = chain_mvt->blockheight; mvt->blockheight = chain_mvt->blockheight;
mvt->version = COIN_MVT_VERSION; mvt->version = COIN_MVT_VERSION;
mvt->node_id = node_id; mvt->node_id = node_id;
mvt->counter = mvt_count++; mvt->counter = count;
return mvt; return mvt;
} }
struct coin_mvt *finalize_channel_mvt(const tal_t *ctx, struct coin_mvt *finalize_channel_mvt(const tal_t *ctx,
const struct channel_coin_mvt *chan_mvt, const struct channel_coin_mvt *chan_mvt,
u32 timestamp, struct node_id *node_id) u32 timestamp, struct node_id *node_id,
s64 count)
{ {
struct coin_mvt *mvt = tal(ctx, struct coin_mvt); struct coin_mvt *mvt = tal(ctx, struct coin_mvt);
@ -179,7 +179,7 @@ struct coin_mvt *finalize_channel_mvt(const tal_t *ctx,
mvt->blockheight = 0; mvt->blockheight = 0;
mvt->version = COIN_MVT_VERSION; mvt->version = COIN_MVT_VERSION;
mvt->node_id = node_id; mvt->node_id = node_id;
mvt->counter = mvt_count++; mvt->counter = count;
return mvt; return mvt;
} }

View file

@ -123,7 +123,6 @@ struct coin_mvt {
struct node_id *node_id; struct node_id *node_id;
/* id of this movement (on this node) */ /* id of this movement (on this node) */
// FIXME: what if node restarts?
u64 counter; u64 counter;
}; };
@ -161,10 +160,12 @@ struct chain_coin_mvt *new_chain_coin_mvt_sat(const tal_t *ctx,
struct coin_mvt *finalize_chain_mvt(const tal_t *ctx, struct coin_mvt *finalize_chain_mvt(const tal_t *ctx,
const struct chain_coin_mvt *chain_mvt, const struct chain_coin_mvt *chain_mvt,
u32 timestamp, u32 timestamp,
struct node_id *node_id); struct node_id *node_id,
s64 mvt_count);
struct coin_mvt *finalize_channel_mvt(const tal_t *ctx, struct coin_mvt *finalize_channel_mvt(const tal_t *ctx,
const struct channel_coin_mvt *chan_mvt, const struct channel_coin_mvt *chan_mvt,
u32 timestamp, struct node_id *node_id); u32 timestamp, struct node_id *node_id,
s64 mvt_count);
const char *mvt_type_str(enum mvt_type type); const char *mvt_type_str(enum mvt_type type);
const char *mvt_tag_str(enum mvt_tag tag); const char *mvt_tag_str(enum mvt_tag tag);

View file

@ -1,14 +1,25 @@
#include <lightningd/coin_mvts.h> #include <lightningd/coin_mvts.h>
#include <lightningd/notification.h> #include <lightningd/notification.h>
static s64 update_count(struct lightningd *ld)
{
s64 count;
count = ++ld->coin_moves_count;
db_set_intvar(ld->wallet->db, "coin_moves_count", count);
return count;
}
void notify_channel_mvt(struct lightningd *ld, const struct channel_coin_mvt *mvt) void notify_channel_mvt(struct lightningd *ld, const struct channel_coin_mvt *mvt)
{ {
const struct coin_mvt *cm; const struct coin_mvt *cm;
u32 timestamp; u32 timestamp;
s64 count;
timestamp = time_now().ts.tv_sec; timestamp = time_now().ts.tv_sec;
count = update_count(ld);
cm = finalize_channel_mvt(mvt, mvt, timestamp, cm = finalize_channel_mvt(mvt, mvt, timestamp,
&ld->id); &ld->id, count);
notify_coin_mvt(ld, cm); notify_coin_mvt(ld, cm);
} }
@ -16,9 +27,25 @@ void notify_chain_mvt(struct lightningd *ld, const struct chain_coin_mvt *mvt)
{ {
const struct coin_mvt *cm; const struct coin_mvt *cm;
u32 timestamp; u32 timestamp;
s64 count;
timestamp = time_now().ts.tv_sec; timestamp = time_now().ts.tv_sec;
count = update_count(ld);
cm = finalize_chain_mvt(mvt, mvt, timestamp, cm = finalize_chain_mvt(mvt, mvt, timestamp,
&ld->id); &ld->id, count);
notify_coin_mvt(ld, cm); notify_coin_mvt(ld, cm);
} }
void coin_mvts_init_count(struct lightningd *ld)
{
s64 count;
db_begin_transaction(ld->wallet->db);
count = db_get_intvar(ld->wallet->db,
"coin_moves_count", -1);
db_commit_transaction(ld->wallet->db);
if (count == -1)
fatal("Something went wrong attempting to fetch"
"the latest `coin_moves_count` from the intvars "
"table");
ld->coin_moves_count = count;
}

View file

@ -7,4 +7,7 @@
void notify_channel_mvt(struct lightningd *ld, const struct channel_coin_mvt *mvt); void notify_channel_mvt(struct lightningd *ld, const struct channel_coin_mvt *mvt);
void notify_chain_mvt(struct lightningd *ld, const struct chain_coin_mvt *mvt); void notify_chain_mvt(struct lightningd *ld, const struct chain_coin_mvt *mvt);
/* Initialize the coin movement counter on lightningd */
void coin_mvts_init_count(struct lightningd *ld);
#endif /* LIGHTNING_LIGHTNINGD_COIN_MVTS_H */ #endif /* LIGHTNING_LIGHTNINGD_COIN_MVTS_H */

View file

@ -70,6 +70,7 @@
#include <lightningd/bitcoind.h> #include <lightningd/bitcoind.h>
#include <lightningd/chaintopology.h> #include <lightningd/chaintopology.h>
#include <lightningd/channel_control.h> #include <lightningd/channel_control.h>
#include <lightningd/coin_mvts.h>
#include <lightningd/connect_control.h> #include <lightningd/connect_control.h>
#include <lightningd/invoice.h> #include <lightningd/invoice.h>
#include <lightningd/io_loop_with_timers.h> #include <lightningd/io_loop_with_timers.h>
@ -825,6 +826,10 @@ int main(int argc, char *argv[])
* states, invoices, payments, blocks and bitcoin transactions. */ * states, invoices, payments, blocks and bitcoin transactions. */
ld->wallet = wallet_new(ld, ld->timers); ld->wallet = wallet_new(ld, ld->timers);
/*~ We keep track of how many 'coin moves' we've ever made.
* Initialize the starting value from the database here. */
coin_mvts_init_count(ld);
/*~ We keep a filter of scriptpubkeys we're interested in. */ /*~ We keep a filter of scriptpubkeys we're interested in. */
ld->owned_txfilter = txfilter_new(ld); ld->owned_txfilter = txfilter_new(ld);

View file

@ -268,6 +268,10 @@ struct lightningd {
alt_subdaemon_map alt_subdaemons; alt_subdaemon_map alt_subdaemons;
enum lightningd_state state; enum lightningd_state state;
/* Total number of coin moves we've seen, since
* coin move tracking was cool */
s64 coin_moves_count;
}; };
/* Turning this on allows a tal allocation to return NULL, rather than aborting. /* Turning this on allows a tal allocation to return NULL, rather than aborting.

View file

@ -28,6 +28,9 @@ size_t bigsize_put(u8 buf[BIGSIZE_MAX_LEN] UNNEEDED, bigsize_t v UNNEEDED)
void channel_notify_new_block(struct lightningd *ld UNNEEDED, void channel_notify_new_block(struct lightningd *ld UNNEEDED,
u32 block_height UNNEEDED) u32 block_height UNNEEDED)
{ fprintf(stderr, "channel_notify_new_block called!\n"); abort(); } { fprintf(stderr, "channel_notify_new_block called!\n"); abort(); }
/* Generated stub for coin_mvts_init_count */
void coin_mvts_init_count(struct lightningd *ld UNNEEDED)
{ fprintf(stderr, "coin_mvts_init_count called!\n"); abort(); }
/* Generated stub for connectd_activate */ /* Generated stub for connectd_activate */
void connectd_activate(struct lightningd *ld UNNEEDED) void connectd_activate(struct lightningd *ld UNNEEDED)
{ fprintf(stderr, "connectd_activate called!\n"); abort(); } { fprintf(stderr, "connectd_activate called!\n"); abort(); }

View file

@ -7,7 +7,7 @@ from pyln.proto import Invoice
from utils import ( from utils import (
DEVELOPER, only_one, sync_blockheight, TIMEOUT, wait_for, TEST_NETWORK, DEVELOPER, only_one, sync_blockheight, TIMEOUT, wait_for, TEST_NETWORK,
DEPRECATED_APIS, expected_peer_features, expected_node_features, account_balance, DEPRECATED_APIS, expected_peer_features, expected_node_features, account_balance,
check_coin_moves, first_channel_id check_coin_moves, first_channel_id, check_coin_moves_idx
) )
import json import json
@ -1391,9 +1391,9 @@ def test_coin_movement_notices(node_factory, bitcoind):
] ]
l1, l2, l3 = node_factory.line_graph(3, opts=[ l1, l2, l3 = node_factory.line_graph(3, opts=[
{}, {'may_reconnect': True},
{'plugin': os.path.join(os.getcwd(), 'tests/plugins/coin_movements.py')}, {'may_reconnect': True, 'plugin': os.path.join(os.getcwd(), 'tests/plugins/coin_movements.py')},
{} {'may_reconnect': True},
], wait_for_announce=True) ], wait_for_announce=True)
bitcoind.generate_block(5) bitcoind.generate_block(5)
@ -1432,6 +1432,9 @@ def test_coin_movement_notices(node_factory, bitcoind):
l2.rpc.sendpay(route, payment_hash21) l2.rpc.sendpay(route, payment_hash21)
l2.rpc.waitsendpay(payment_hash21) l2.rpc.waitsendpay(payment_hash21)
# restart to test index
l2.restart()
# close the channel down # close the channel down
chan1 = l2.get_channel_scid(l1) chan1 = l2.get_channel_scid(l1)
chan3 = l2.get_channel_scid(l3) chan3 = l2.get_channel_scid(l3)
@ -1460,3 +1463,4 @@ def test_coin_movement_notices(node_factory, bitcoind):
check_coin_moves(l2, chanid_1, l1_l2_mvts) check_coin_moves(l2, chanid_1, l1_l2_mvts)
check_coin_moves(l2, chanid_3, l2_l3_mvts) check_coin_moves(l2, chanid_3, l2_l3_mvts)
check_coin_moves(l2, 'wallet', l2_wallet_mvts) check_coin_moves(l2, 'wallet', l2_wallet_mvts)
check_coin_moves_idx(l2)

View file

@ -48,6 +48,19 @@ def check_coin_moves(n, account_id, expected_moves):
assert mv['blockheight'] is not None assert mv['blockheight'] is not None
def check_coin_moves_idx(n):
""" Just check that the counter increments smoothly"""
moves = n.rpc.call('listcoinmoves_plugin')['coin_moves']
idx = 0
for m in moves:
c_idx = m['movement_idx']
# verify that the index count increments smoothly here, also
if c_idx == 0 and idx == 0:
continue
assert c_idx == idx + 1
idx = c_idx
def account_balance(n, account_id): def account_balance(n, account_id):
moves = n.rpc.call('listcoinmoves_plugin')['coin_moves'] moves = n.rpc.call('listcoinmoves_plugin')['coin_moves']
chan_moves = [m for m in moves if m['account_id'] == account_id] chan_moves = [m for m in moves if m['account_id'] == account_id]

View file

@ -610,6 +610,8 @@ static struct migration dbmigrations[] = {
/* For incoming HTLCs, we now keep track of whether or not we provided /* For incoming HTLCs, we now keep track of whether or not we provided
* the preimage for it, or not. */ * the preimage for it, or not. */
{SQL("ALTER TABLE channel_htlcs ADD we_filled INTEGER;"), NULL}, {SQL("ALTER TABLE channel_htlcs ADD we_filled INTEGER;"), NULL},
/* We track the counter for coin_moves, as a convenience for notification consumers */
{SQL("INSERT INTO vars (name, intval) VALUES ('coin_moves_count', 0);"), NULL},
}; };
/* Leak tracking. */ /* Leak tracking. */