mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-03-15 15:39:09 +01:00
Replace use of ChainWatchInterface with WatchEvent
SimpleManyChannelMonitor is parameterized by ChainWatchInterface to signal what transactions and outputs to watch for on chain. The interface has grown to cover chain access (via get_chain_utxo) and block filtering (via filter_block and reentered), which has added complexity for implementations and users (see ChainWatchInterfaceUtil). Pull the watch functionality out as a first step to eliminating ChainWatchInterface entirely.
This commit is contained in:
parent
1ab0a1acc1
commit
bd39b20f64
4 changed files with 157 additions and 49 deletions
|
@ -9,6 +9,41 @@
|
|||
|
||||
//! Structs and traits which allow other parts of rust-lightning to interact with the blockchain.
|
||||
|
||||
use bitcoin::blockdata::script::Script;
|
||||
use bitcoin::hash_types::Txid;
|
||||
|
||||
use chain::transaction::OutPoint;
|
||||
|
||||
pub mod chaininterface;
|
||||
pub mod transaction;
|
||||
pub mod keysinterface;
|
||||
|
||||
/// An interface for providing [`WatchEvent`]s.
|
||||
///
|
||||
/// [`WatchEvent`]: enum.WatchEvent.html
|
||||
pub trait WatchEventProvider {
|
||||
/// Releases events produced since the last call. Subsequent calls must only return new events.
|
||||
fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
|
||||
}
|
||||
|
||||
/// An event indicating on-chain activity to watch for pertaining to a channel.
|
||||
pub enum WatchEvent {
|
||||
/// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
|
||||
/// condition.
|
||||
WatchTransaction {
|
||||
/// Identifier of the transaction.
|
||||
txid: Txid,
|
||||
|
||||
/// Spending condition for an output of the transaction.
|
||||
script_pubkey: Script,
|
||||
},
|
||||
/// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
|
||||
/// the spending condition.
|
||||
WatchOutput {
|
||||
/// Identifier for the output.
|
||||
outpoint: OutPoint,
|
||||
|
||||
/// Spending condition for the output.
|
||||
script_pubkey: Script,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,8 @@ use ln::chan_utils;
|
|||
use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HolderCommitmentTransaction, HTLCType};
|
||||
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
|
||||
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
|
||||
use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator};
|
||||
use chain;
|
||||
use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
|
||||
use chain::transaction::OutPoint;
|
||||
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
|
||||
use util::logger::Logger;
|
||||
|
@ -48,7 +49,7 @@ use util::ser::{Readable, MaybeReadable, Writer, Writeable, U48};
|
|||
use util::{byte_utils, events};
|
||||
use util::events::Event;
|
||||
|
||||
use std::collections::{HashMap, hash_map};
|
||||
use std::collections::{HashMap, HashSet, hash_map};
|
||||
use std::sync::Mutex;
|
||||
use std::{hash,cmp, mem};
|
||||
use std::ops::Deref;
|
||||
|
@ -198,12 +199,74 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
|
|||
{
|
||||
/// The monitors
|
||||
pub monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
|
||||
watch_events: Mutex<WatchEventQueue>,
|
||||
chain_monitor: C,
|
||||
broadcaster: T,
|
||||
logger: L,
|
||||
fee_estimator: F
|
||||
}
|
||||
|
||||
struct WatchEventQueue {
|
||||
watched: ChainWatchedUtil,
|
||||
events: Vec<chain::WatchEvent>,
|
||||
}
|
||||
|
||||
impl WatchEventQueue {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
watched: ChainWatchedUtil::new(),
|
||||
events: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
|
||||
if self.watched.register_tx(txid, script_pubkey) {
|
||||
self.events.push(chain::WatchEvent::WatchTransaction {
|
||||
txid: *txid,
|
||||
script_pubkey: script_pubkey.clone()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
|
||||
let (txid, index) = outpoint;
|
||||
if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
|
||||
self.events.push(chain::WatchEvent::WatchOutput {
|
||||
outpoint: OutPoint {
|
||||
txid: *txid,
|
||||
index: index as u16,
|
||||
},
|
||||
script_pubkey: script_pubkey.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
|
||||
let mut pending_events = Vec::with_capacity(self.events.len());
|
||||
pending_events.append(&mut self.events);
|
||||
pending_events
|
||||
}
|
||||
|
||||
fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
|
||||
let mut matched_txids = HashSet::new();
|
||||
txdata.iter().filter(|&&(_, tx)| {
|
||||
// A tx matches the filter if it either matches the filter directly (via does_match_tx)
|
||||
// or if it is a descendant of another matched transaction within the same block.
|
||||
let mut matched = self.watched.does_match_tx(tx);
|
||||
for input in tx.input.iter() {
|
||||
if matched || matched_txids.contains(&input.previous_output.txid) {
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if matched {
|
||||
matched_txids.insert(tx.txid());
|
||||
}
|
||||
matched
|
||||
}).map(|e| *e).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, C: Deref + Sync + Send>
|
||||
ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
|
||||
where T::Target: BroadcasterInterface,
|
||||
|
@ -212,24 +275,19 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync
|
|||
C::Target: ChainWatchInterface,
|
||||
{
|
||||
fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
|
||||
let mut reentered = true;
|
||||
while reentered {
|
||||
let matched_indexes = self.chain_monitor.filter_block(header, txdata);
|
||||
let matched_txn: Vec<_> = matched_indexes.iter().map(|index| txdata[*index]).collect();
|
||||
let last_seen = self.chain_monitor.reentered();
|
||||
{
|
||||
let mut monitors = self.monitors.lock().unwrap();
|
||||
for monitor in monitors.values_mut() {
|
||||
let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
|
||||
let mut watch_events = self.watch_events.lock().unwrap();
|
||||
let matched_txn = watch_events.filter_block(txdata);
|
||||
{
|
||||
let mut monitors = self.monitors.lock().unwrap();
|
||||
for monitor in monitors.values_mut() {
|
||||
let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
|
||||
|
||||
for (ref txid, ref outputs) in txn_outputs {
|
||||
for (idx, output) in outputs.iter().enumerate() {
|
||||
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
|
||||
}
|
||||
for (ref txid, ref outputs) in txn_outputs {
|
||||
for (idx, output) in outputs.iter().enumerate() {
|
||||
watch_events.watch_output((txid, idx), &output.script_pubkey);
|
||||
}
|
||||
}
|
||||
}
|
||||
reentered = last_seen != self.chain_monitor.reentered();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -252,6 +310,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
|
|||
pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C> {
|
||||
let res = SimpleManyChannelMonitor {
|
||||
monitors: Mutex::new(HashMap::new()),
|
||||
watch_events: Mutex::new(WatchEventQueue::new()),
|
||||
chain_monitor,
|
||||
broadcaster,
|
||||
logger,
|
||||
|
@ -263,6 +322,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
|
|||
|
||||
/// Adds or updates the monitor which monitors the channel referred to by the given key.
|
||||
pub fn add_monitor_by_key(&self, key: Key, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
|
||||
let mut watch_events = self.watch_events.lock().unwrap();
|
||||
let mut monitors = self.monitors.lock().unwrap();
|
||||
let entry = match monitors.entry(key) {
|
||||
hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")),
|
||||
|
@ -271,11 +331,11 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
|
|||
{
|
||||
let funding_txo = monitor.get_funding_txo();
|
||||
log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
|
||||
self.chain_monitor.install_watch_tx(&funding_txo.0.txid, &funding_txo.1);
|
||||
self.chain_monitor.install_watch_outpoint((funding_txo.0.txid, funding_txo.0.index as u32), &funding_txo.1);
|
||||
watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
|
||||
watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
|
||||
for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
|
||||
for (idx, script) in outputs.iter().enumerate() {
|
||||
self.chain_monitor.install_watch_outpoint((*txid, idx as u32), script);
|
||||
watch_events.watch_output((txid, idx), script);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -342,6 +402,17 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
|
|||
}
|
||||
}
|
||||
|
||||
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref, C: Deref> chain::WatchEventProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
|
||||
where T::Target: BroadcasterInterface,
|
||||
F::Target: FeeEstimator,
|
||||
L::Target: Logger,
|
||||
C::Target: ChainWatchInterface,
|
||||
{
|
||||
fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
|
||||
self.watch_events.lock().unwrap().dequeue_events()
|
||||
}
|
||||
}
|
||||
|
||||
/// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
|
||||
/// instead claiming it in its own individual transaction.
|
||||
pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
|
||||
|
@ -881,30 +952,14 @@ pub trait ManyChannelMonitor: Send + Sync {
|
|||
|
||||
/// Adds a monitor for the given `funding_txo`.
|
||||
///
|
||||
/// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
|
||||
/// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
|
||||
/// callbacks with the funding transaction, or any spends of it.
|
||||
///
|
||||
/// Further, the implementer must also ensure that each output returned in
|
||||
/// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about
|
||||
/// any spends of any of the outputs.
|
||||
///
|
||||
/// Any spends of outputs which should have been registered which aren't passed to
|
||||
/// ChannelMonitors via block_connected may result in FUNDS LOSS.
|
||||
/// Implementations must ensure that `monitor` receives block_connected calls for blocks with
|
||||
/// the funding transaction or any spends of it, as well as any spends of outputs returned by
|
||||
/// get_outputs_to_watch. Not doing so may result in LOST FUNDS.
|
||||
fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr>;
|
||||
|
||||
/// Updates a monitor for the given `funding_txo`.
|
||||
///
|
||||
/// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
|
||||
/// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
|
||||
/// callbacks with the funding transaction, or any spends of it.
|
||||
///
|
||||
/// Further, the implementer must also ensure that each output returned in
|
||||
/// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about
|
||||
/// any spends of any of the outputs.
|
||||
///
|
||||
/// Any spends of outputs which should have been registered which aren't passed to
|
||||
/// ChannelMonitors via block_connected may result in FUNDS LOSS.
|
||||
/// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897.
|
||||
///
|
||||
/// In case of distributed watchtowers deployment, even if an Err is return, the new version
|
||||
/// must be written to disk, as state may have been stored but rejected due to a block forcing
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
//! A bunch of useful utilities for building networks of nodes and exchanging messages between
|
||||
//! nodes for functional tests.
|
||||
|
||||
use chain;
|
||||
use chain::chaininterface;
|
||||
use chain::transaction::OutPoint;
|
||||
use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
|
||||
|
@ -80,9 +81,29 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he
|
|||
}
|
||||
|
||||
pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) {
|
||||
node.block_notifier.block_connected(block, height)
|
||||
use chain::WatchEventProvider;
|
||||
use chain::chaininterface::ChainListener;
|
||||
|
||||
let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
|
||||
process_chain_watch_events(&watch_events);
|
||||
|
||||
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
|
||||
loop {
|
||||
node.chan_monitor.simple_monitor.block_connected(&block.header, &txdata, height);
|
||||
|
||||
let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
|
||||
process_chain_watch_events(&watch_events);
|
||||
|
||||
if watch_events.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
node.node.block_connected(&block.header, &txdata, height);
|
||||
}
|
||||
|
||||
fn process_chain_watch_events(_events: &Vec<chain::WatchEvent>) {}
|
||||
|
||||
pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) {
|
||||
node.block_notifier.block_disconnected(header, height)
|
||||
}
|
||||
|
|
|
@ -417,9 +417,6 @@ fn test_1_conf_open() {
|
|||
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
|
||||
|
||||
let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known());
|
||||
assert!(nodes[0].chain_monitor.does_match_tx(&tx));
|
||||
assert!(nodes[1].chain_monitor.does_match_tx(&tx));
|
||||
|
||||
let block = Block {
|
||||
header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
|
||||
txdata: vec![tx],
|
||||
|
@ -2785,8 +2782,8 @@ fn claim_htlc_outputs_single_tx() {
|
|||
|
||||
#[test]
|
||||
fn test_htlc_on_chain_success() {
|
||||
// Test that in case of a unilateral close onchain, we detect the state of output thanks to
|
||||
// ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is
|
||||
// Test that in case of a unilateral close onchain, we detect the state of output and pass
|
||||
// the preimage backward accordingly. So here we test that ChannelManager is
|
||||
// broadcasting the right event to other nodes in payment path.
|
||||
// We test with two HTLCs simultaneously as that was not handled correctly in the past.
|
||||
// A --------------------> B ----------------------> C (preimage)
|
||||
|
@ -2964,8 +2961,8 @@ fn test_htlc_on_chain_success() {
|
|||
|
||||
#[test]
|
||||
fn test_htlc_on_chain_timeout() {
|
||||
// Test that in case of a unilateral close onchain, we detect the state of output thanks to
|
||||
// ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is
|
||||
// Test that in case of a unilateral close onchain, we detect the state of output and
|
||||
// timeout the HTLC backward accordingly. So here we test that ChannelManager is
|
||||
// broadcasting the right event to other nodes in payment path.
|
||||
// A ------------------> B ----------------------> C (timeout)
|
||||
// B's commitment tx C's commitment tx
|
||||
|
@ -5177,8 +5174,8 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
|
|||
|
||||
#[test]
|
||||
fn test_onchain_to_onchain_claim() {
|
||||
// Test that in case of channel closure, we detect the state of output thanks to
|
||||
// ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx.
|
||||
// Test that in case of channel closure, we detect the state of output and claim HTLC
|
||||
// on downstream peer's remote commitment tx.
|
||||
// First, have C claim an HTLC against its own latest commitment transaction.
|
||||
// Then, broadcast these to B, which should update the monitor downstream on the A<->B
|
||||
// channel.
|
||||
|
|
Loading…
Add table
Reference in a new issue