diff --git a/ARCH.md b/ARCH.md index c4f94280c..3e4aba324 100644 --- a/ARCH.md +++ b/ARCH.md @@ -48,11 +48,11 @@ At a high level, some of the common interfaces fit together as follows: | (as ------------------ ---------------------- | ChannelMessageHandler)-> | ChannelManager | ----> | ManyChannelMonitor | v / ------------------ ---------------------- ---------------- / ^ (as EventsProvider) ^ -| PeerManager |- | \ / / ---------------- | -------\---/---------- - | ----------------------- / \ / - | | ChainWatchInterface | - v +--------------- / (as EventsProvider) +| PeerManager |- \ / +--------------- \ / + | ----------------------- \ / + | | ChainWatchInterface | v | ----------------------- --------- | | | Event | (as RoutingMessageHandler) v --------- diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 661bda39e..a1738444f 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -28,9 +28,8 @@ use bitcoin::hashes::Hash as TraitImport; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hash_types::{BlockHash, WPubkeyHash}; -use lightning::chain::chaininterface; use lightning::chain::transaction::OutPoint; -use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface}; +use lightning::chain::chaininterface::{BroadcasterInterface, ChainListener, ConfirmationTarget, FeeEstimator}; use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys}; use lightning::ln::channelmonitor; use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent}; @@ -83,7 +82,7 @@ impl Writer for VecWriter { struct TestChannelMonitor { pub logger: Arc, - pub simple_monitor: Arc, Arc, Arc, Arc>>, + pub simple_monitor: Arc, Arc, Arc>>, pub update_ret: Mutex>, // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization // logic will automatically force-close our channels for us (as we don't have an up-to-date @@ -94,9 +93,9 @@ struct TestChannelMonitor { pub should_update_manager: atomic::AtomicBool, } impl TestChannelMonitor { - pub fn new(chain_monitor: Arc, broadcaster: Arc, logger: Arc, feeest: Arc) -> Self { + pub fn new(broadcaster: Arc, logger: Arc, feeest: Arc) -> Self { Self { - simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)), + simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(broadcaster, logger.clone(), feeest)), logger, update_ret: Mutex::new(Ok(())), latest_monitors: Mutex::new(HashMap::new()), @@ -191,8 +190,7 @@ pub fn do_test(data: &[u8], out: Out) { macro_rules! make_node { ($node_id: expr) => { { let logger: Arc = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone())); - let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); - let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone())); + let monitor = Arc::new(TestChannelMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone())); let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) }); let mut config = UserConfig::default(); @@ -207,8 +205,7 @@ pub fn do_test(data: &[u8], out: Out) { macro_rules! reload_node { ($ser: expr, $node_id: expr, $old_monitors: expr) => { { let logger: Arc = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone())); - let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); - let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone())); + let monitor = Arc::new(TestChannelMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone())); let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) }); let mut config = UserConfig::default(); diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 98dc228d6..d37e961bf 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -144,14 +144,14 @@ impl<'a> std::hash::Hash for Peer<'a> { type ChannelMan = ChannelManager< EnforcingChannelKeys, - Arc, Arc, Arc, Arc>>, + Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; struct MoneyLossDetector<'a> { manager: Arc, monitor: Arc, Arc, Arc, Arc>>, + OutPoint, EnforcingChannelKeys, Arc, Arc, Arc>>, handler: PeerMan<'a>, peers: &'a RefCell<[bool; 256]>, @@ -165,7 +165,7 @@ struct MoneyLossDetector<'a> { impl<'a> MoneyLossDetector<'a> { pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc, - monitor: Arc, Arc, Arc, Arc>>, + monitor: Arc, Arc, Arc>>, handler: PeerMan<'a>) -> Self { MoneyLossDetector { manager, @@ -334,7 +334,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); let broadcast = Arc::new(TestBroadcaster{}); - let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone())); + let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(broadcast.clone(), Arc::clone(&logger), fee_est.clone())); let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) }); let mut config = UserConfig::default(); diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index a7818d7b8..2ab2e7d4a 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -35,7 +35,7 @@ //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator; //! type Logger = dyn lightning::util::logger::Logger; //! type ChainWatchInterface = dyn lightning::chain::chaininterface::ChainWatchInterface; -//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor, Arc, Arc, Arc>; +//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor, Arc, Arc>; //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; //! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; //! diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index fd724c1ca..8f4f97fc2 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -41,7 +41,7 @@ use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, Hold use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; use chain; -use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; +use chain::chaininterface::{ChainListener, ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; @@ -147,6 +147,11 @@ pub enum ChannelMonitorUpdateErr { /// Note that even when you fail a holder commitment transaction update, you must store the /// update to ensure you can claim from it in case of a duplicate copy of this ChannelMonitor /// broadcasts it (e.g distributed channel-monitor deployment) + /// + /// In case of distributed watchtowers deployment, the new version must be written to disk, as + /// state may have been stored but rejected due to a block forcing a commitment broadcast. This + /// storage is used to claim outputs of rejected state confirmed onchain by another watchtower, + /// lagging behind on block processing. PermanentFailure, } @@ -191,16 +196,14 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source }); /// `OutPoint` as the key, which will give you a ManyChannelMonitor implementation. /// /// (C-not exported) due to an unconstrained generic in `Key` -pub struct SimpleManyChannelMonitor +pub struct SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { /// The monitors pub monitors: Mutex>>, watch_events: Mutex, - chain_monitor: C, broadcaster: T, logger: L, fee_estimator: F @@ -267,12 +270,11 @@ impl WatchEventQueue { } } -impl - ChainListener for SimpleManyChannelMonitor +impl + ChainListener for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) { let mut watch_events = self.watch_events.lock().unwrap(); @@ -299,19 +301,17 @@ impl SimpleManyChannelMonitor +impl SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { /// Creates a new object which can be used to monitor several channels given the chain /// interface with which to register to receive notifications. - pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor { + pub fn new(broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), watch_events: Mutex::new(WatchEventQueue::new()), - chain_monitor, broadcaster, logger, fee_estimator: feeest, @@ -356,11 +356,10 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor +impl ManyChannelMonitor for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { type Keys = ChanSigner; @@ -387,11 +386,10 @@ impl events::EventsProvider for SimpleManyChannelMonitor +impl events::EventsProvider for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { fn get_and_clear_pending_events(&self) -> Vec { let mut pending_events = Vec::new(); @@ -402,11 +400,10 @@ impl chain::WatchEventProvider for SimpleManyChannelMonitor +impl chain::WatchEventProvider for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - C::Target: ChainWatchInterface, { fn release_pending_watch_events(&self) -> Vec { self.watch_events.lock().unwrap().dequeue_events() @@ -958,13 +955,6 @@ pub trait ManyChannelMonitor: Send + Sync { fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; /// Updates a monitor for the given `funding_txo`. - /// - /// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897. - /// - /// In case of distributed watchtowers deployment, even if an Err is return, the new version - /// must be written to disk, as state may have been stored but rejected due to a block forcing - /// a commitment broadcast. This storage is used to claim outputs of rejected state confirmed - /// onchain by another watchtower, lagging behind on block processing. fn update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr>; /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c164cd6bb..f9804f4b7 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -216,16 +216,12 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { }).unwrap(); } - let chain_watch = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); - let channel_monitor = test_utils::TestChannelMonitor::new(&chain_watch, self.tx_broadcaster.clone(), &self.logger, &feeest); + let channel_monitor = test_utils::TestChannelMonitor::new(self.tx_broadcaster.clone(), &self.logger, &feeest); for deserialized_monitor in deserialized_monitors.drain(..) { if let Err(_) = channel_monitor.add_monitor(deserialized_monitor.get_funding_txo().0, deserialized_monitor) { panic!(); } } - if chain_watch != *self.chain_monitor { - panic!(); - } } } } @@ -1125,7 +1121,7 @@ pub fn create_node_cfgs<'a>(node_count: usize, chanmon_cfgs: &'a Vec)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4433,7 +4432,7 @@ fn test_manager_serialize_deserialize_events() { fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; logger = test_utils::TestLogger::new(); - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chan_monitor = &new_chan_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4524,7 +4523,7 @@ fn test_simple_manager_serialize_deserialize() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chan_monitor = &new_chan_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4602,7 +4601,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chan_monitor = &new_chan_monitor; let mut node_0_stale_monitors = Vec::new(); @@ -5748,7 +5747,7 @@ fn test_key_derivation_params() { // We manually create the node configuration to backup the seed. let seed = [42; 32]; let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet); - let chan_monitor = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator); + let chan_monitor = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator); let node = NodeCfg { chain_monitor: &chanmon_cfgs[0].chain_monitor, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, chan_monitor, keys_manager, node_seed: seed }; let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs); node_cfgs.remove(0); @@ -7499,7 +7498,7 @@ fn test_data_loss_protect() { tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}; fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet); - monitor = test_utils::TestChannelMonitor::new(&chain_monitor, &tx_broadcaster, &logger, &fee_estimator); + monitor = test_utils::TestChannelMonitor::new(&tx_broadcaster, &logger, &fee_estimator); node_state_0 = { let mut channel_monitors = HashMap::new(); channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chan_monitor); @@ -8363,7 +8362,6 @@ fn test_update_err_monitor_lockdown() { // Copy SimpleManyChannelMonitor to simulate a watchtower and update block height of node 0 until its ChannelMonitor timeout HTLC onchain let logger = test_utils::TestLogger::with_id(format!("node {}", 0)); - let chain_monitor = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); let watchtower = { let monitors = nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); @@ -8372,7 +8370,7 @@ fn test_update_err_monitor_lockdown() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChannelMonitor::new(&chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.add_monitor(outpoint, new_monitor).is_ok()); watchtower }; @@ -8421,7 +8419,6 @@ fn test_concurrent_monitor_claim() { // Copy SimpleManyChannelMonitor to simulate watchtower Alice and update block height her ChannelMonitor timeout HTLC onchain let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice")); - let chain_monitor = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); let watchtower_alice = { let monitors = nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); @@ -8430,7 +8427,7 @@ fn test_concurrent_monitor_claim() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChannelMonitor::new(&chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.add_monitor(outpoint, new_monitor).is_ok()); watchtower }; @@ -8446,7 +8443,6 @@ fn test_concurrent_monitor_claim() { // Copy SimpleManyChannelMonitor to simulate watchtower Bob and make it receive a commitment update first. let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob")); - let chain_monitor = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); let watchtower_bob = { let monitors = nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); @@ -8455,7 +8451,7 @@ fn test_concurrent_monitor_claim() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChannelMonitor::new(&chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.add_monitor(outpoint, new_monitor).is_ok()); watchtower }; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 07dcb0b3a..4f4ddc23f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -62,18 +62,18 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestChannelMonitor<'a> { pub added_monitors: Mutex)>>, pub latest_monitor_update_id: Mutex>, - pub simple_monitor: channelmonitor::SimpleManyChannelMonitor, + pub simple_monitor: channelmonitor::SimpleManyChannelMonitor, pub update_ret: Mutex>, // If this is set to Some(), after the next return, we'll always return this until update_ret // is changed: pub next_update_ret: Mutex>>, } impl<'a> TestChannelMonitor<'a> { - pub fn new(chain_monitor: &'a chaininterface::ChainWatchInterface, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self { + pub fn new(broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self { Self { added_monitors: Mutex::new(Vec::new()), latest_monitor_update_id: Mutex::new(HashMap::new()), - simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, fee_estimator), + simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(broadcaster, logger, fee_estimator), update_ret: Mutex::new(Ok(())), next_update_ret: Mutex::new(None), }