From e9f1934e5b3cd76bdfb92395c018a564d26c80b3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Oct 2021 04:14:35 +0000 Subject: [PATCH 1/8] Move MonitorEvent serialization to TLV-enum-upgradable from custom --- lightning/src/chain/channelmonitor.rs | 13 ++++++++++--- lightning/src/util/ser_macros.rs | 10 ++++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index c483d01f3..4c7da902b 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -132,6 +132,10 @@ pub enum MonitorEvent { /// A monitor event that the Channel's commitment transaction was confirmed. CommitmentTxConfirmed(OutPoint), } +impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ; + (2, HTLCEvent), + (4, CommitmentTxConfirmed), +); /// Simple structure sent back by `chain::Watch` when an HTLC from a forward channel is detected on /// chain. Used to update the corresponding HTLC in the backward channel. Failing to pass the @@ -891,6 +895,7 @@ impl Writeable for ChannelMonitorImpl { write_tlv_fields!(writer, { (1, self.funding_spend_confirmed, option), (3, self.htlcs_resolved_on_chain, vec_type), + (5, self.pending_monitor_events, vec_type), }); Ok(()) @@ -3000,14 +3005,15 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> } let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3))); + let mut pending_monitor_events = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); for _ in 0..pending_monitor_events_len { let ev = match ::read(reader)? { 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::CommitmentTxConfirmed(funding_info.0), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.push(ev); + pending_monitor_events.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -3068,6 +3074,7 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (3, htlcs_resolved_on_chain, vec_type), + (5, pending_monitor_events, vec_type), }); let mut secp_ctx = Secp256k1::new(); @@ -3107,7 +3114,7 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> current_holder_commitment_number, payment_preimages, - pending_monitor_events, + pending_monitor_events: pending_monitor_events.unwrap(), pending_events, onchain_events_awaiting_threshold_conf, diff --git a/lightning/src/util/ser_macros.rs b/lightning/src/util/ser_macros.rs index b80ca537b..0827a023a 100644 --- a/lightning/src/util/ser_macros.rs +++ b/lightning/src/util/ser_macros.rs @@ -455,9 +455,12 @@ macro_rules! _impl_writeable_tlv_based_enum_common { macro_rules! impl_writeable_tlv_based_enum_upgradable { ($st: ident, $(($variant_id: expr, $variant_name: ident) => {$(($type: expr, $field: ident, $fieldty: tt)),* $(,)*} - ),* $(,)*) => { + ),* $(,)* + $(; + $(($tuple_variant_id: expr, $tuple_variant_name: ident)),* $(,)*)*) => { _impl_writeable_tlv_based_enum_common!($st, - $(($variant_id, $variant_name) => {$(($type, $field, $fieldty)),*}),*; ); + $(($variant_id, $variant_name) => {$(($type, $field, $fieldty)),*}),*; + $($(($tuple_variant_id, $tuple_variant_name)),*)*); impl ::util::ser::MaybeReadable for $st { fn read(reader: &mut R) -> Result, ::ln::msgs::DecodeError> { @@ -481,6 +484,9 @@ macro_rules! impl_writeable_tlv_based_enum_upgradable { }; f() }),* + $($($tuple_variant_id => { + Ok(Some($st::$tuple_variant_name(Readable::read(reader)?))) + }),*)* _ if id % 2 == 1 => Ok(None), _ => Err(DecodeError::UnknownRequiredFeature), } From df8bde9b2efdd12dcb5a4b8d5d5d2507af402598 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 8 Oct 2021 22:54:32 +0000 Subject: [PATCH 2/8] Move the two-AtomicUsize counter in peer_handler to a util struct We also take this opportunity to drop byte_utils::le64_to_array, as our MSRV now supports the native to_le_bytes() call. --- lightning/src/ln/peer_handler.rs | 27 ++++---------------- lightning/src/util/atomic_counter.rs | 31 +++++++++++++++++++++++ lightning/src/util/byte_utils.rs | 15 ----------- lightning/src/util/chacha20poly1305rfc.rs | 10 +++----- lightning/src/util/mod.rs | 1 + 5 files changed, 41 insertions(+), 43 deletions(-) create mode 100644 lightning/src/util/atomic_counter.rs diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 1815d4a35..ebb0322f8 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -24,7 +24,7 @@ use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use util::ser::{VecWriter, Writeable, Writer}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; -use util::byte_utils; +use util::atomic_counter::AtomicCounter; use util::events::{MessageSendEvent, MessageSendEventsProvider}; use util::logger::Logger; use routing::network_graph::NetGraphMsgHandler; @@ -33,7 +33,6 @@ use prelude::*; use io; use alloc::collections::LinkedList; use sync::{Arc, Mutex}; -use core::sync::atomic::{AtomicUsize, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; @@ -343,12 +342,6 @@ struct PeerHolder { node_id_to_descriptor: HashMap, } -#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] -fn _check_usize_is_32_or_64() { - // See below, less than 32 bit pointers may be unsafe here! - unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); } -} - /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static /// lifetimes). Other times you can afford a reference, which is more efficient, in which case @@ -394,10 +387,7 @@ pub struct PeerManager P }), our_node_secret, ephemeral_key_midstate, - peer_counter_low: AtomicUsize::new(0), - peer_counter_high: AtomicUsize::new(0), + peer_counter: AtomicCounter::new(), logger, custom_message_handler, } @@ -509,14 +498,8 @@ impl P fn get_ephemeral_key(&self) -> SecretKey { let mut ephemeral_hash = self.ephemeral_key_midstate.clone(); - let low = self.peer_counter_low.fetch_add(1, Ordering::AcqRel); - let high = if low == 0 { - self.peer_counter_high.fetch_add(1, Ordering::AcqRel) - } else { - self.peer_counter_high.load(Ordering::Acquire) - }; - ephemeral_hash.input(&byte_utils::le64_to_array(low as u64)); - ephemeral_hash.input(&byte_utils::le64_to_array(high as u64)); + let counter = self.peer_counter.get_increment(); + ephemeral_hash.input(&counter.to_le_bytes()); SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!") } diff --git a/lightning/src/util/atomic_counter.rs b/lightning/src/util/atomic_counter.rs new file mode 100644 index 000000000..81cc1f496 --- /dev/null +++ b/lightning/src/util/atomic_counter.rs @@ -0,0 +1,31 @@ +//! A simple atomic counter that uses AtomicUsize to give a u64 counter. + +#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] +compile_error!("We need at least 32-bit pointers for atomic counter (and to have enough memory to run LDK)"); + +use core::sync::atomic::{AtomicUsize, Ordering}; + +pub(crate) struct AtomicCounter { + // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64 + // bits we will never realistically count into high: + counter_low: AtomicUsize, + counter_high: AtomicUsize, +} + +impl AtomicCounter { + pub(crate) fn new() -> Self { + Self { + counter_low: AtomicUsize::new(0), + counter_high: AtomicUsize::new(0), + } + } + pub(crate) fn get_increment(&self) -> u64 { + let low = self.counter_low.fetch_add(1, Ordering::AcqRel) as u64; + let high = if low == 0 { + self.counter_high.fetch_add(1, Ordering::AcqRel) as u64 + } else { + self.counter_high.load(Ordering::Acquire) as u64 + }; + (high << 32) | low + } +} diff --git a/lightning/src/util/byte_utils.rs b/lightning/src/util/byte_utils.rs index 0c6530f29..1ab6384e3 100644 --- a/lightning/src/util/byte_utils.rs +++ b/lightning/src/util/byte_utils.rs @@ -70,20 +70,6 @@ pub fn be64_to_array(u: u64) -> [u8; 8] { v } -#[inline] -pub fn le64_to_array(u: u64) -> [u8; 8] { - let mut v = [0; 8]; - v[0] = ((u >> 8*0) & 0xff) as u8; - v[1] = ((u >> 8*1) & 0xff) as u8; - v[2] = ((u >> 8*2) & 0xff) as u8; - v[3] = ((u >> 8*3) & 0xff) as u8; - v[4] = ((u >> 8*4) & 0xff) as u8; - v[5] = ((u >> 8*5) & 0xff) as u8; - v[6] = ((u >> 8*6) & 0xff) as u8; - v[7] = ((u >> 8*7) & 0xff) as u8; - v -} - #[cfg(test)] mod tests { use super::*; @@ -96,6 +82,5 @@ mod tests { assert_eq!(be32_to_array(0xdeadbeef), [0xde, 0xad, 0xbe, 0xef]); assert_eq!(be48_to_array(0xdeadbeef1bad), [0xde, 0xad, 0xbe, 0xef, 0x1b, 0xad]); assert_eq!(be64_to_array(0xdeadbeef1bad1dea), [0xde, 0xad, 0xbe, 0xef, 0x1b, 0xad, 0x1d, 0xea]); - assert_eq!(le64_to_array(0xdeadbeef1bad1dea), [0xea, 0x1d, 0xad, 0x1b, 0xef, 0xbe, 0xad, 0xde]); } } diff --git a/lightning/src/util/chacha20poly1305rfc.rs b/lightning/src/util/chacha20poly1305rfc.rs index 3908116cc..fdd51e757 100644 --- a/lightning/src/util/chacha20poly1305rfc.rs +++ b/lightning/src/util/chacha20poly1305rfc.rs @@ -16,8 +16,6 @@ mod real_chachapoly { use util::poly1305::Poly1305; use bitcoin::hashes::cmp::fixed_time_eq; - use util::byte_utils; - #[derive(Clone, Copy)] pub struct ChaCha20Poly1305RFC { cipher: ChaCha20, @@ -67,8 +65,8 @@ mod real_chachapoly { self.mac.input(output); ChaCha20Poly1305RFC::pad_mac_16(&mut self.mac, self.data_len); self.finished = true; - self.mac.input(&byte_utils::le64_to_array(self.aad_len)); - self.mac.input(&byte_utils::le64_to_array(self.data_len as u64)); + self.mac.input(&self.aad_len.to_le_bytes()); + self.mac.input(&(self.data_len as u64).to_le_bytes()); self.mac.raw_result(out_tag); } @@ -82,8 +80,8 @@ mod real_chachapoly { self.data_len += input.len(); ChaCha20Poly1305RFC::pad_mac_16(&mut self.mac, self.data_len); - self.mac.input(&byte_utils::le64_to_array(self.aad_len)); - self.mac.input(&byte_utils::le64_to_array(self.data_len as u64)); + self.mac.input(&self.aad_len.to_le_bytes()); + self.mac.input(&(self.data_len as u64).to_le_bytes()); let mut calc_tag = [0u8; 16]; self.mac.raw_result(&mut calc_tag); diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index cc0c3192a..34e661901 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,6 +20,7 @@ pub mod errors; pub mod ser; pub mod message_signing; +pub(crate) mod atomic_counter; pub(crate) mod byte_utils; pub(crate) mod chacha20; #[cfg(feature = "fuzztarget")] From 4500270488e6ed918c5f6e07310eb4a384eb6e21 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Oct 2021 18:51:49 +0000 Subject: [PATCH 3/8] Move ChannelManager::monitor_updated to a MonitorEvent In the next commit we'll need ChainMonitor to "see" when a monitor persistence completes, which means `monitor_updated` needs to move to `ChainMonitor`. The simplest way to then communicate that information to `ChannelManager` is via `MonitorEvet`s, which seems to line up ok, even if they're now constructed by multiple different places. --- fuzz/src/chanmon_consistency.rs | 24 ++++++---- lightning/src/chain/chainmonitor.rs | 29 +++++++++++- lightning/src/chain/channelmonitor.rs | 32 +++++++++++-- lightning/src/chain/mod.rs | 25 ++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 46 +++++++++---------- lightning/src/ln/channelmanager.rs | 33 +++++-------- 6 files changed, 123 insertions(+), 66 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f3b952ff9..63e179ac4 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -855,22 +855,26 @@ pub fn do_test(data: &[u8], out: Out) { 0x08 => { if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[0].channel_monitor_updated(&chan_1_funding, *id); + monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[0].process_monitor_events(); } }, 0x09 => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[1].channel_monitor_updated(&chan_1_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[1].process_monitor_events(); } }, 0x0a => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[1].channel_monitor_updated(&chan_2_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[1].process_monitor_events(); } }, 0x0b => { if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[2].channel_monitor_updated(&chan_2_funding, *id); + monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[2].process_monitor_events(); } }, @@ -1077,16 +1081,20 @@ pub fn do_test(data: &[u8], out: Out) { *monitor_c.persister.update_ret.lock().unwrap() = Ok(()); if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[0].channel_monitor_updated(&chan_1_funding, *id); + monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[0].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[1].channel_monitor_updated(&chan_1_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[1].channel_monitor_updated(&chan_2_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[2].channel_monitor_updated(&chan_2_funding, *id); + monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[2].process_monitor_events(); } // Next, make sure peers are all connected to each other diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index f1ce0f79a..9e92264b0 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -38,7 +38,7 @@ use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::{RwLock, RwLockReadGuard}; +use sync::{RwLock, RwLockReadGuard, Mutex}; use core::ops::Deref; /// `Persist` defines behavior for persisting channel monitors: this could mean @@ -134,6 +134,7 @@ pub struct ChainMonitor>, } impl ChainMonitor @@ -207,6 +208,7 @@ where C::Target: chain::Filter, logger, fee_estimator: feeest, persister, + pending_monitor_events: Mutex::new(Vec::new()), } } @@ -262,6 +264,29 @@ where C::Target: chain::Filter, self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor } + /// Indicates the persistence of a [`ChannelMonitor`] has completed after + /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. + /// + /// All ChannelMonitor updates up to and including highest_applied_update_id must have been + /// fully committed in every copy of the given channels' ChannelMonitors. + /// + /// Note that there is no effect to calling with a highest_applied_update_id other than the + /// current latest ChannelMonitorUpdate and one call to this function after multiple + /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field + /// exists largely only to prevent races between this and concurrent update_monitor calls. + /// + /// Thus, the anticipated use is, at a high level: + /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the + /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, + /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], + /// 2) once all remote copies are updated, you call this function with the update_id that + /// completed, and once it is the latest the Channel will be re-enabled. + pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) { + self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + funding_txo, monitor_update_id: highest_applied_update_id + }); + } + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { use util::events::EventsProvider; @@ -431,7 +456,7 @@ where C::Target: chain::Filter, } fn release_pending_monitor_events(&self) -> Vec { - let mut pending_monitor_events = Vec::new(); + let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 4c7da902b..a7506acb4 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -131,8 +131,29 @@ pub enum MonitorEvent { /// A monitor event that the Channel's commitment transaction was confirmed. CommitmentTxConfirmed(OutPoint), + + /// Indicates a [`ChannelMonitor`] update has completed. See + /// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used. + /// + /// [`ChannelMonitorUpdateErr::TemporaryFailure`]: super::ChannelMonitorUpdateErr::TemporaryFailure + UpdateCompleted { + /// The funding outpoint of the [`ChannelMonitor`] that was updated + funding_txo: OutPoint, + /// The Update ID from [`ChannelMonitorUpdate::update_id`] which was applied or + /// [`ChannelMonitor::get_latest_update_id`]. + /// + /// Note that this should only be set to a given update's ID if all previous updates for the + /// same [`ChannelMonitor`] have been applied and persisted. + monitor_update_id: u64, + }, } -impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ; +impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, + // Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor + (0, UpdateCompleted) => { + (0, funding_txo, required), + (2, monitor_update_id, required), + }, +; (2, HTLCEvent), (4, CommitmentTxConfirmed), ); @@ -854,14 +875,19 @@ impl Writeable for ChannelMonitorImpl { writer.write_all(&payment_preimage.0[..])?; } - writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?; + writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::CommitmentTxConfirmed(_) => true, + _ => false, + }).count() as u64).to_be_bytes())?; for event in self.pending_monitor_events.iter() { match event { MonitorEvent::HTLCEvent(upd) => { 0u8.write(writer)?; upd.write(writer)?; }, - MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)? + MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below } } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index f22d152ec..55755073b 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -182,9 +182,10 @@ pub enum ChannelMonitorUpdateErr { /// our state failed, but is expected to succeed at some point in the future). /// /// Such a failure will "freeze" a channel, preventing us from revoking old states or - /// submitting new commitment transactions to the counterparty. Once the update(s) which failed - /// have been successfully applied, ChannelManager::channel_monitor_updated can be used to - /// restore the channel to an operational state. + /// submitting new commitment transactions to the counterparty. Once the update(s) that failed + /// have been successfully applied, a [`MonitorEvent::UpdateCompleted`] event should be returned + /// via [`Watch::release_pending_monitor_events`] which will then restore the channel to an + /// operational state. /// /// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If /// you return a TemporaryFailure you must ensure that it is written to disk safely before @@ -198,13 +199,14 @@ pub enum ChannelMonitorUpdateErr { /// the channel which would invalidate previous ChannelMonitors are not made when a channel has /// been "frozen". /// - /// Note that even if updates made after TemporaryFailure succeed you must still call - /// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel - /// operation. + /// Note that even if updates made after TemporaryFailure succeed you must still provide a + /// [`MonitorEvent::UpdateCompleted`] to ensure you have the latest monitor and re-enable + /// normal channel operation. Note that this is normally generated through a call to + /// [`ChainMonitor::channel_monitor_updated`]. /// - /// Note that the update being processed here will not be replayed for you when you call - /// ChannelManager::channel_monitor_updated, so you must store the update itself along - /// with the persisted ChannelMonitor on your own local disk prior to returning a + /// Note that the update being processed here will not be replayed for you when you return a + /// [`MonitorEvent::UpdateCompleted`] event via [`Watch::release_pending_monitor_events`], so + /// you must store the update itself on your own local disk prior to returning a /// TemporaryFailure. You may, of course, employ a journaling approach, storing only the /// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at /// reload-time. @@ -212,6 +214,8 @@ pub enum ChannelMonitorUpdateErr { /// For deployments where a copy of ChannelMonitors and other local state are backed up in a /// remote location (with local copies persisted immediately), it is anticipated that all /// updates will return TemporaryFailure until the remote copies could be updated. + /// + /// [`ChainMonitor::channel_monitor_updated`]: chainmonitor::ChainMonitor::channel_monitor_updated TemporaryFailure, /// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a /// different watchtower and cannot update with all watchtowers that were previously informed @@ -280,6 +284,9 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. + /// + /// For details on asynchronous [`ChannelMonitor`] updating and returning + /// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`]. fn release_pending_monitor_events(&self) -> Vec; } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 6d62f8db9..819c71f4f 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -177,7 +177,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); @@ -330,7 +330,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { // Now fix monitor updating... chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); macro_rules! disconnect_reconnect_peers { () => { { @@ -628,7 +628,7 @@ fn test_monitor_update_fail_cs() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let responses = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(responses.len(), 2); @@ -662,7 +662,7 @@ fn test_monitor_update_fail_cs() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); @@ -722,7 +722,7 @@ fn test_monitor_update_fail_no_rebroadcast() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -783,7 +783,7 @@ fn test_monitor_update_raa_while_paused() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -909,7 +909,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // update_add update. chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); check_added_monitors!(nodes[1], 1); @@ -1148,7 +1148,7 @@ fn test_monitor_update_fail_reestablish() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1231,7 +1231,7 @@ fn raa_no_response_awaiting_raa_state() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); // nodes[1] should be AwaitingRAA here! check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1348,7 +1348,7 @@ fn claim_while_disconnected_monitor_update_fail() { // receiving the commitment update from A, and the resulting commitment dances. chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events(); @@ -1457,7 +1457,7 @@ fn monitor_failed_no_reestablish_response() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1550,7 +1550,7 @@ fn first_message_on_recv_ordering() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -1634,7 +1634,7 @@ fn test_monitor_update_fail_claim() { // Now restore monitor updating on the 0<->1 channel and claim the funds on B. let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1729,7 +1729,7 @@ fn test_monitor_update_on_pending_forwards() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1790,7 +1790,7 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1849,7 +1849,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let events = nodes[0].node.get_and_clear_pending_events(); @@ -1881,7 +1881,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first { @@ -1966,7 +1966,7 @@ fn test_path_paused_mpp() { // And check that, after we successfully update the monitor for chan_2 we can pass the second // HTLC along to nodes[3] and claim the whole payment back to nodes[0]. let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None); @@ -2302,7 +2302,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { // not occur prior to #756). chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&funding_txo, mon_id); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). @@ -2500,14 +2500,14 @@ fn test_temporary_error_during_shutdown() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id())); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id())); let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); @@ -2596,10 +2596,10 @@ fn double_temp_error() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_1); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); - nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_2); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2); // Complete the first HTLC. let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 77faac089..72f0bebdf 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3385,27 +3385,7 @@ impl ChannelMana self.our_network_pubkey.clone() } - /// Restores a single, given channel to normal operation after a - /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update - /// operation. - /// - /// All ChannelMonitor updates up to and including highest_applied_update_id must have been - /// fully committed in every copy of the given channels' ChannelMonitors. - /// - /// Note that there is no effect to calling with a highest_applied_update_id other than the - /// current latest ChannelMonitorUpdate and one call to this function after multiple - /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field - /// exists largely only to prevent races between this and concurrent update_monitor calls. - /// - /// Thus, the anticipated use is, at a high level: - /// 1) You register a chain::Watch with this ChannelManager, - /// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of - /// said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures - /// any time it cannot do so instantly, - /// 3) update(s) are applied to each remote copy of a ChannelMonitor, - /// 4) once all remote copies are updated, you call this function with the update_id that - /// completed, and once it is the latest the Channel will be re-enabled. - pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { + fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let chan_restoration_res; @@ -4135,6 +4115,9 @@ impl ChannelMana }); } }, + MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => { + self.channel_monitor_updated(&funding_txo, monitor_update_id); + }, } } @@ -4145,6 +4128,14 @@ impl ChannelMana has_pending_monitor_events } + /// In chanmon_consistency_target, we'd like to be able to restore monitor updating without + /// handling all pending events (i.e. not PendingHTLCsForwardable). Thus, we expose monitor + /// update events as a separate process method here. + #[cfg(feature = "fuzztarget")] + pub fn process_monitor_events(&self) { + self.process_pending_monitor_events(); + } + /// Check the holding cell in each channel and free any pending HTLCs in them if possible. /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor /// update was applied. From 89ad05954891276dfb8524be904af78a8ec7ee82 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Oct 2021 23:59:47 +0000 Subject: [PATCH 4/8] Use an opaque type to describe monitor updates in Persist In the next commit, we'll be originating monitor updates both from the ChainMonitor and from the ChannelManager, making simple sequential update IDs impossible. Further, the existing async monitor update API was somewhat hard to work with - instead of being able to generate monitor_updated callbacks whenever a persistence process finishes, you had to ensure you only did so at least once all previous updates had also been persisted. Here we eat the complexity for the user by moving to an opaque type for monitor updates, tracking which updates are in-flight for the user and only generating monitor-persisted events once all pending updates have been committed. --- fuzz/src/chanmon_consistency.rs | 18 +- fuzz/src/utils/test_persister.rs | 5 +- lightning-persister/src/lib.rs | 12 +- lightning/src/chain/chainmonitor.rs | 169 ++++++++++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 92 +++++----- lightning/src/util/test_utils.rs | 13 +- 6 files changed, 206 insertions(+), 103 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 63e179ac4..28bab5a28 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -855,25 +855,25 @@ pub fn do_test(data: &[u8], out: Out) { 0x08 => { if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[0].process_monitor_events(); } }, 0x09 => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[1].process_monitor_events(); } }, 0x0a => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[1].process_monitor_events(); } }, 0x0b => { if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[2].process_monitor_events(); } }, @@ -1075,25 +1075,25 @@ pub fn do_test(data: &[u8], out: Out) { // Test that no channel is in a stuck state where neither party can send funds even // after we resolve all pending events. // First make sure there are no pending monitor updates, resetting the error state - // and calling channel_monitor_updated for each monitor. + // and calling force_channel_monitor_updated for each monitor. *monitor_a.persister.update_ret.lock().unwrap() = Ok(()); *monitor_b.persister.update_ret.lock().unwrap() = Ok(()); *monitor_c.persister.update_ret.lock().unwrap() = Ok(()); if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[0].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[2].process_monitor_events(); } diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs index f02f8587a..4c18d8261 100644 --- a/fuzz/src/utils/test_persister.rs +++ b/fuzz/src/utils/test_persister.rs @@ -1,5 +1,6 @@ use lightning::chain; use lightning::chain::{chainmonitor, channelmonitor}; +use lightning::chain::chainmonitor::MonitorUpdateId; use lightning::chain::transaction::OutPoint; use lightning::util::enforcing_trait_impls::EnforcingSigner; @@ -9,11 +10,11 @@ pub struct TestPersister { pub update_ret: Mutex>, } impl chainmonitor::Persist for TestPersister { - fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { self.update_ret.lock().unwrap().clone() } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { self.update_ret.lock().unwrap().clone() } } diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 88b103c71..4ba6d96ea 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -159,13 +159,13 @@ impl FilesystemPersister { } impl chainmonitor::Persist for FilesystemPersister { - fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) } - fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) @@ -296,6 +296,8 @@ mod tests { nodes[1].node.force_close_channel(&chan.2).unwrap(); check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed); let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); + let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap(); + let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap(); // Set the persister's directory to read-only, which should result in // returning a permanent failure when we then attempt to persist a @@ -309,7 +311,7 @@ mod tests { txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 }; - match persister.persist_new_channel(test_txo, &added_monitors[0].1) { + match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) { Err(ChannelMonitorUpdateErr::PermanentFailure) => {}, _ => panic!("unexpected result from persisting new channel") } @@ -333,6 +335,8 @@ mod tests { nodes[1].node.force_close_channel(&chan.2).unwrap(); check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed); let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); + let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap(); + let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap(); // Create the persister with an invalid directory name and test that the // channel fails to open because the directories fail to be created. There @@ -344,7 +348,7 @@ mod tests { txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 }; - match persister.persist_new_channel(test_txo, &added_monitors[0].1) { + match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) { Err(ChannelMonitorUpdateErr::PermanentFailure) => {}, _ => panic!("unexpected result from persisting new channel") } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9e92264b0..323598f20 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -33,44 +33,75 @@ use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, Monit use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; +use util::errors::APIError; use util::events; use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::{RwLock, RwLockReadGuard, Mutex}; +use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +enum UpdateOrigin { + OffChain(u64), +} + +/// An opaque identifier describing a specific [`Persist`] method call. +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +pub struct MonitorUpdateId { + contents: UpdateOrigin, +} + +impl MonitorUpdateId { + pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self { + Self { contents: UpdateOrigin::OffChain(update.update_id) } + } + pub(crate) fn from_new_monitor(monitor: &ChannelMonitor) -> Self { + Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) } + } +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// -/// Note that for every new monitor, you **must** persist the new `ChannelMonitor` -/// to disk/backups. And, on every update, you **must** persist either the -/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk -/// of situations such as revoking a transaction, then crashing before this -/// revocation can be persisted, then unintentionally broadcasting a revoked -/// transaction and losing money. This is a risk because previous channel states -/// are toxic, so it's important that whatever channel state is persisted is -/// kept up-to-date. +/// Each method can return three possible values: +/// * If persistence (including any relevant `fsync()` calls) happens immediately, the +/// implementation should return `Ok(())`, indicating normal channel operation should continue. +/// * If persistence happens asynchronously, implementations should first ensure the +/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return +/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the +/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be +/// called with the corresponding [`MonitorUpdateId`]. +/// +/// Note that unlike the direct [`chain::Watch`] interface, +/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// +/// * If persistence fails for some reason, implementations should return +/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be +/// closed without broadcasting the latest state. See +/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details. pub trait Persist { - /// Persist a new channel's data. The data can be stored any way you want, but - /// the identifier provided by Rust-Lightning is the channel's outpoint (and - /// it is up to you to maintain a correct mapping between the outpoint and the - /// stored channel data). Note that you **must** persist every new monitor to - /// disk. See the `Persist` trait documentation for more details. + /// Persist a new channel's data. The data can be stored any way you want, but the identifier + /// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct + /// mapping between the outpoint and the stored channel data). Note that you **must** persist + /// every new monitor to disk. + /// + /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], + /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; - /// Update one channel's data. The provided `ChannelMonitor` has already - /// applied the given update. + /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given + /// update. /// - /// Note that on every update, you **must** persist either the - /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See - /// the `Persist` trait documentation for more details. + /// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the + /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more + /// details. /// /// If an implementer chooses to persist the updates only, they need to make /// sure that all the updates are applied to the `ChannelMonitors` *before* @@ -84,16 +115,33 @@ pub trait Persist { /// them in batches. The size of each monitor grows `O(number of state updates)` /// whereas updates are small and `O(1)`. /// + /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], + /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. + /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`, /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; } struct MonitorHolder { monitor: ChannelMonitor, + /// The full set of pending monitor updates for this Channel. + /// + /// Note that this lock must be held during updates to prevent a race where we call + /// update_persisted_channel, the user returns a TemporaryFailure, and then calls + /// channel_monitor_updated immediately, racing our insertion of the pending update into the + /// contained Vec. + pending_monitor_updates: Mutex>, +} + +impl MonitorHolder { + fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard>) -> bool { + pending_monitor_updates_lock.iter().any(|update_id| + if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false }) + } } /// A read-only reference to a current ChannelMonitor. @@ -267,23 +315,61 @@ where C::Target: chain::Filter, /// Indicates the persistence of a [`ChannelMonitor`] has completed after /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. /// - /// All ChannelMonitor updates up to and including highest_applied_update_id must have been - /// fully committed in every copy of the given channels' ChannelMonitors. - /// - /// Note that there is no effect to calling with a highest_applied_update_id other than the - /// current latest ChannelMonitorUpdate and one call to this function after multiple - /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field - /// exists largely only to prevent races between this and concurrent update_monitor calls. - /// /// Thus, the anticipated use is, at a high level: /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], - /// 2) once all remote copies are updated, you call this function with the update_id that - /// completed, and once it is the latest the Channel will be re-enabled. - pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) { + /// 2) once all remote copies are updated, you call this function with the + /// `completed_update_id` that completed, and once all pending updates have completed the + /// channel will be re-enabled. + // Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't + // care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We + // only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s. + /// + /// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently + /// registered [`ChannelMonitor`]s. + pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> { + let monitors = self.monitors.read().unwrap(); + let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else { + return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) }); + }; + let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap(); + pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); + + match completed_update_id { + MonitorUpdateId { .. } => { + // Note that we only check for `UpdateOrigin::OffChain` failures here - if + // we're being told that a `UpdateOrigin::OffChain` monitor update completed, + // we only care about ensuring we don't tell the `ChannelManager` to restore + // the channel to normal operation until all `UpdateOrigin::OffChain` updates + // complete. + // If there's some `UpdateOrigin::ChainSync` update still pending that's okay + // - we can still update our channel state, just as long as we don't return + // `MonitorEvent`s from the monitor back to the `ChannelManager` until they + // complete. + let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); + if monitor_is_pending_updates { + // If there are still monitor updates pending, we cannot yet construct an + // UpdateCompleted event. + return Ok(()); + } + self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + funding_txo, + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); + } + } + Ok(()) + } + + /// This wrapper avoids having to update some of our tests for now as they assume the direct + /// chain::Watch API wherein we mark a monitor fully-updated by just calling + /// channel_monitor_updated once with the highest ID. + #[cfg(any(test, feature = "fuzztarget"))] + pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { - funding_txo, monitor_update_id: highest_applied_update_id + funding_txo, + monitor_update_id, }); } @@ -397,12 +483,16 @@ where C::Target: chain::Filter, return Err(ChannelMonitorUpdateErr::PermanentFailure)}, hash_map::Entry::Vacant(e) => e, }; - let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor); + let update_id = MonitorUpdateId::from_new_monitor(&monitor); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id); if persist_res.is_err() { log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res); } if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) { return persist_res; + } else if persist_res.is_err() { + pending_monitor_updates.push(update_id); } { let funding_txo = monitor.get_funding_txo(); @@ -412,7 +502,7 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(MonitorHolder { monitor }); + entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) }); persist_res } @@ -442,8 +532,13 @@ where C::Target: chain::Filter, } // Even if updating the monitor returns an error, the monitor's state will // still be changed. So, persist the updated monitor despite the error. - let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor); - if let Err(ref e) = persist_res { + let update_id = MonitorUpdateId::from_monitor_update(&update); + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id); + if let Err(e) = persist_res { + if e == ChannelMonitorUpdateErr::TemporaryFailure { + pending_monitor_updates.push(update_id); + } log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e); } if update_res.is_err() { diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 819c71f4f..0b1c16d35 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -176,8 +176,8 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { } chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); @@ -329,8 +329,8 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { // Now fix monitor updating... chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); macro_rules! disconnect_reconnect_peers { () => { { @@ -627,8 +627,8 @@ fn test_monitor_update_fail_cs() { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let responses = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(responses.len(), 2); @@ -661,8 +661,8 @@ fn test_monitor_update_fail_cs() { } chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); @@ -721,8 +721,8 @@ fn test_monitor_update_fail_no_rebroadcast() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -782,8 +782,8 @@ fn test_monitor_update_raa_while_paused() { check_added_monitors!(nodes[0], 1); chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -908,8 +908,8 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // Restore monitor updating, ensuring we immediately get a fail-back update and a // update_add update. chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); check_added_monitors!(nodes[1], 1); @@ -1147,8 +1147,8 @@ fn test_monitor_update_fail_reestablish() { .contents.flags & 2, 0); // The "disabled" bit should be unset as we just reconnected chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1230,8 +1230,8 @@ fn raa_no_response_awaiting_raa_state() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); // nodes[1] should be AwaitingRAA here! check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1347,8 +1347,8 @@ fn claim_while_disconnected_monitor_update_fail() { // Now un-fail the monitor, which will result in B sending its original commitment update, // receiving the commitment update from A, and the resulting commitment dances. chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events(); @@ -1456,8 +1456,8 @@ fn monitor_failed_no_reestablish_response() { let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1549,8 +1549,8 @@ fn first_message_on_recv_ordering() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Previous monitor update failure prevented generation of RAA".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -1633,8 +1633,8 @@ fn test_monitor_update_fail_claim() { commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true); // Now restore monitor updating on the 0<->1 channel and claim the funds on B. - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1728,8 +1728,8 @@ fn test_monitor_update_on_pending_forwards() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1789,8 +1789,8 @@ fn monitor_update_claim_fail_no_response() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Temporary failure claiming HTLC, treating as success: Failed to update ChannelMonitor".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1848,8 +1848,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: check_added_monitors!(nodes[0], 1); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let events = nodes[0].node.get_and_clear_pending_events(); @@ -1880,8 +1880,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: } chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first { @@ -1965,8 +1965,8 @@ fn test_path_paused_mpp() { // And check that, after we successfully update the monitor for chan_2 we can pass the second // HTLC along to nodes[3] and claim the whole payment back to nodes[0]. - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None); @@ -2301,8 +2301,8 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { // If we finish updating the monitor, we should free the holding cell right away (this did // not occur prior to #756). chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id); + let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). @@ -2499,15 +2499,15 @@ fn test_temporary_error_during_shutdown() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id())); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id())); let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); @@ -2586,7 +2586,7 @@ fn double_temp_error() { // `claim_funds` results in a ChannelMonitorUpdate. assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let (funding_tx, latest_update_1) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); // Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`, @@ -2595,11 +2595,11 @@ fn double_temp_error() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1); + let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2); // Complete the first HTLC. let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 15eaa7d46..f0e453f9b 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -12,6 +12,7 @@ use chain::WatchedOutput; use chain::chaininterface; use chain::chaininterface::ConfirmationTarget; use chain::chainmonitor; +use chain::chainmonitor::MonitorUpdateId; use chain::channelmonitor; use chain::channelmonitor::MonitorEvent; use chain::transaction::OutPoint; @@ -88,7 +89,7 @@ impl keysinterface::KeysInterface for OnlyReadsKeysInterface { pub struct TestChainMonitor<'a> { pub added_monitors: Mutex)>>, - pub latest_monitor_update_id: Mutex>, + pub latest_monitor_update_id: Mutex>, pub chain_monitor: chainmonitor::ChainMonitor>, pub keys_manager: &'a TestKeysInterface, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a @@ -116,7 +117,8 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), self.keys_manager).unwrap().1; assert!(new_monitor == monitor); - self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, monitor.get_latest_update_id())); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), + (funding_txo, monitor.get_latest_update_id(), MonitorUpdateId::from_new_monitor(&monitor))); self.added_monitors.lock().unwrap().push((funding_txo, monitor)); self.chain_monitor.watch_channel(funding_txo, new_monitor) } @@ -136,7 +138,8 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } else { panic!(); } } - self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, update.update_id)); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), + (funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(&update))); let update_res = self.chain_monitor.update_channel(funding_txo, update); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... @@ -179,7 +182,7 @@ impl TestPersister { } } impl chainmonitor::Persist for TestPersister { - fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; @@ -187,7 +190,7 @@ impl chainmonitor::Persist for TestPersiste ret } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; From 5c2ff2cb30ef1639c80b275eea209a289dd91b77 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 13 Oct 2021 20:05:48 +0000 Subject: [PATCH 5/8] Persist `ChannelMonitor`s after new blocks are connected This resolves several user complaints (and issues in the sample node) where startup is substantially delayed as we're always waiting for the chain data to sync. Further, in an upcoming PR, we'll be reloading pending payments from ChannelMonitors on restart, at which point we'll need the change here which avoids handling events until after the user has confirmed the `ChannelMonitor` has been persisted to disk. It will avoid a race where we * send a payment/HTLC (persisting the monitor to disk with the HTLC pending), * force-close the channel, removing the channel entry from the ChannelManager entirely, * persist the ChannelManager, * connect a block which contains a fulfill of the HTLC, generating a claim event, * handle the claim event while the `ChannelMonitor` is being persisted, * persist the ChannelManager (before the CHannelMonitor is persisted fully), * restart, reloading the HTLC as a pending payment in the ChannelManager, which now has no references to it except from the ChannelMonitor which still has the pending HTLC, * replay the block connection, generating a duplicate PaymentSent event. --- fuzz/src/utils/test_persister.rs | 2 +- lightning-persister/src/lib.rs | 7 +- lightning/src/chain/chainmonitor.rs | 152 +++++++++++++++++++++----- lightning/src/chain/channelmonitor.rs | 20 +++- lightning/src/chain/mod.rs | 4 + lightning/src/ln/channelmanager.rs | 43 +++++--- lightning/src/util/test_utils.rs | 2 +- 7 files changed, 184 insertions(+), 46 deletions(-) diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs index 4c18d8261..7ca1ff96d 100644 --- a/fuzz/src/utils/test_persister.rs +++ b/fuzz/src/utils/test_persister.rs @@ -14,7 +14,7 @@ impl chainmonitor::Persist for TestPersister { self.update_ret.lock().unwrap().clone() } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option, _data: &channelmonitor::ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { self.update_ret.lock().unwrap().clone() } } diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 4ba6d96ea..eba069248 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -159,13 +159,18 @@ impl FilesystemPersister { } impl chainmonitor::Persist for FilesystemPersister { + // TODO: We really need a way for the persister to inform the user that its time to crash/shut + // down once these start returning failure. + // A PermanentFailure implies we need to shut down since we're force-closing channels without + // even broadcasting! + fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) } - fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 323598f20..d99d6708a 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; +use util::atomic_counter::AtomicCounter; use util::logger::Logger; use util::errors::APIError; use util::events; @@ -41,10 +42,19 @@ use ln::channelmanager::ChannelDetails; use prelude::*; use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; +use core::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone, Copy, Hash, PartialEq, Eq)] +/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents +/// entirely opaque. enum UpdateOrigin { + /// An update that was generated by the `ChannelManager` (via our `chain::Watch` + /// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field + /// and [`ChannelMonitor::get_latest_update_id`]. OffChain(u64), + /// An update that was generated during blockchain processing. The ID here is specific to the + /// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs. + ChainSync(u64), } /// An opaque identifier describing a specific [`Persist`] method call. @@ -103,6 +113,12 @@ pub trait Persist { /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more /// details. /// + /// During blockchain synchronization operations, this may be called with no + /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted. + /// Note that after the full [`ChannelMonitor`] is persisted any previous + /// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be + /// applied to the persisted [`ChannelMonitor`] as they were already applied. + /// /// If an implementer chooses to persist the updates only, they need to make /// sure that all the updates are applied to the `ChannelMonitors` *before* /// the set of channel monitors is given to the `ChannelManager` @@ -123,7 +139,7 @@ pub trait Persist { /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; + fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; } struct MonitorHolder { @@ -134,7 +150,24 @@ struct MonitorHolder { /// update_persisted_channel, the user returns a TemporaryFailure, and then calls /// channel_monitor_updated immediately, racing our insertion of the pending update into the /// contained Vec. + /// + /// Beyond the synchronization of updates themselves, we cannot handle user events until after + /// any chain updates have been stored on disk. Thus, we scan this list when returning updates + /// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still + /// being persisted fully to disk after a chain update. + /// + /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor + /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping + /// the pending payment entry, and then reloading before the monitor is persisted, resulting in + /// the ChannelManager re-adding the same payment entry, before the same block is replayed, + /// resulting in a duplicate PaymentSent event. pending_monitor_updates: Mutex>, + /// When the user returns a PermanentFailure error from an update_persisted_channel call during + /// block processing, we inform the ChannelManager that the channel should be closed + /// asynchronously. In order to ensure no further changes happen before the ChannelManager has + /// processed the closure event, we set this to true and return PermanentFailure for any other + /// chain::Watch events. + channel_perm_failed: AtomicBool, } impl MonitorHolder { @@ -142,6 +175,10 @@ impl MonitorHolder { pending_monitor_updates_lock.iter().any(|update_id| if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false }) } + fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard>) -> bool { + pending_monitor_updates_lock.iter().any(|update_id| + if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false }) + } } /// A read-only reference to a current ChannelMonitor. @@ -177,11 +214,17 @@ pub struct ChainMonitor, { monitors: RwLock>>, + /// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a + /// unique ID, which we calculate by simply getting the next value from this counter. Note that + /// the ID is never persisted so it's ok that they reset on restart. + sync_persistence_id: AtomicCounter, chain_source: Option, broadcaster: T, logger: L, fee_estimator: F, persister: P, + /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly + /// from the user and not from a [`ChannelMonitor`]. pending_monitor_events: Mutex>, } @@ -206,26 +249,50 @@ where C::Target: chain::Filter, FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let mut dependent_txdata = Vec::new(); - let monitor_states = self.monitors.read().unwrap(); - for monitor_state in monitor_states.values() { - let mut txn_outputs = process(&monitor_state.monitor, txdata); + { + let monitor_states = self.monitors.write().unwrap(); + for (funding_outpoint, monitor_state) in monitor_states.iter() { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering and recurse - // if it indicates that there are dependent transactions within the block - // that had not been previously included in txdata. - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - if let Some(tx) = chain_source.register_output(output) { - dependent_txdata.push(tx); + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { + Ok(()) => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + Err(ChannelMonitorUpdateErr::PermanentFailure) => { + monitor_state.channel_perm_failed.store(true, Ordering::Release); + self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint)); + }, + Err(ChannelMonitorUpdateErr::TemporaryFailure) => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + }, + } + } + + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering and recurse + // if it indicates that there are dependent transactions within the block + // that had not been previously included in txdata. + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + if let Some(tx) = chain_source.register_output(output) { + dependent_txdata.push(tx); + } } } } @@ -251,6 +318,7 @@ where C::Target: chain::Filter, pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { Self { monitors: RwLock::new(HashMap::new()), + sync_persistence_id: AtomicCounter::new(), chain_source, broadcaster, logger, @@ -337,7 +405,7 @@ where C::Target: chain::Filter, pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); match completed_update_id { - MonitorUpdateId { .. } => { + MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => { // Note that we only check for `UpdateOrigin::OffChain` failures here - if // we're being told that a `UpdateOrigin::OffChain` monitor update completed, // we only care about ensuring we don't tell the `ChannelManager` to restore @@ -348,8 +416,9 @@ where C::Target: chain::Filter, // `MonitorEvent`s from the monitor back to the `ChannelManager` until they // complete. let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); - if monitor_is_pending_updates { - // If there are still monitor updates pending, we cannot yet construct an + if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) { + // If there are still monitor updates pending (or an old monitor update + // finished after a later one perm-failed), we cannot yet construct an // UpdateCompleted event. return Ok(()); } @@ -357,7 +426,12 @@ where C::Target: chain::Filter, funding_txo, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }); - } + }, + MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { + // We've already done everything we need to, the next time + // release_pending_monitor_events is called, any events for this ChannelMonitor + // will be returned if there's no more SyncPersistId events left. + }, } Ok(()) } @@ -502,7 +576,11 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) }); + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + channel_perm_failed: AtomicBool::new(false), + }); persist_res } @@ -534,15 +612,19 @@ where C::Target: chain::Filter, // still be changed. So, persist the updated monitor despite the error. let update_id = MonitorUpdateId::from_monitor_update(&update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id); + let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id); if let Err(e) = persist_res { if e == ChannelMonitorUpdateErr::TemporaryFailure { pending_monitor_updates.push(update_id); + } else { + monitor_state.channel_perm_failed.store(true, Ordering::Release); } log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e); } if update_res.is_err() { Err(ChannelMonitorUpdateErr::PermanentFailure) + } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) { + Err(ChannelMonitorUpdateErr::PermanentFailure) } else { persist_res } @@ -553,7 +635,23 @@ where C::Target: chain::Filter, fn release_pending_monitor_events(&self) -> Vec { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { - pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); + let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); + if is_pending_monitor_update { + log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); + } else { + if monitor_state.channel_perm_failed.load(Ordering::Acquire) { + // If a `UpdateOrigin::ChainSync` persistence failed with `PermanantFailure`, + // we don't really know if the latest `ChannelMonitor` state is on disk or not. + // We're supposed to hold monitor updates until the latest state is on disk to + // avoid duplicate events, but the user told us persistence is screw-y and may + // not complete. We can't hold events forever because we may learn some payment + // preimage, so instead we just log and hope the user complied with the + // `PermanentFailure` requirements of having at least the local-disk copy + // updated. + log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!"); + } + pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); + } } pending_monitor_events } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index a7506acb4..7fad40d9c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -146,9 +146,16 @@ pub enum MonitorEvent { /// same [`ChannelMonitor`] have been applied and persisted. monitor_update_id: u64, }, + + /// Indicates a [`ChannelMonitor`] update has failed. See + /// [`ChannelMonitorUpdateErr::PermanentFailure`] for more information on how this is used. + /// + /// [`ChannelMonitorUpdateErr::PermanentFailure`]: super::ChannelMonitorUpdateErr::PermanentFailure + UpdateFailed(OutPoint), } impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, - // Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor + // Note that UpdateCompleted and UpdateFailed are currently never serialized to disk as they are + // generated only in ChainMonitor (0, UpdateCompleted) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -156,6 +163,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ; (2, HTLCEvent), (4, CommitmentTxConfirmed), + (6, UpdateFailed), ); /// Simple structure sent back by `chain::Watch` when an HTLC from a forward channel is detected on @@ -649,7 +657,17 @@ pub(crate) struct ChannelMonitorImpl { payment_preimages: HashMap, + // Note that `MonitorEvent`s MUST NOT be generated during update processing, only generated + // during chain data processing. This prevents a race in `ChainMonitor::update_channel` (and + // presumably user implementations thereof as well) where we update the in-memory channel + // object, then before the persistence finishes (as it's all under a read-lock), we return + // pending events to the user or to the relevant `ChannelManager`. Then, on reload, we'll have + // the pre-event state here, but have processed the event in the `ChannelManager`. + // Note that because the `event_lock` in `ChainMonitor` is only taken in + // block/transaction-connected events and *not* during block/transaction-disconnected events, + // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec, + pending_events: Vec, // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 55755073b..25e5a97d2 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -285,6 +285,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no + /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted + /// to disk. + /// /// For details on asynchronous [`ChannelMonitor`] updating and returning /// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`]. fn release_pending_monitor_events(&self) -> Vec; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 72f0bebdf..1f3ad5541 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -4090,7 +4090,8 @@ impl ChannelMana self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } }, - MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => { + MonitorEvent::CommitmentTxConfirmed(funding_outpoint) | + MonitorEvent::UpdateFailed(funding_outpoint) => { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; let by_id = &mut channel_state.by_id; @@ -4106,7 +4107,12 @@ impl ChannelMana msg: update }); } - self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed); + let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { + ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } + } else { + ClosureReason::CommitmentTxConfirmed + }; + self.issue_channel_close_events(&chan, reason); pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { @@ -5440,20 +5446,25 @@ impl Writeable f /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation /// is: -/// 1) Deserialize all stored ChannelMonitors. -/// 2) Deserialize the ChannelManager by filling in this struct and calling: -/// <(BlockHash, ChannelManager)>::read(reader, args) -/// This may result in closing some Channels if the ChannelMonitor is newer than the stored -/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted. -/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same -/// way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and -/// ChannelMonitor::get_funding_txo(). -/// 4) Reconnect blocks on your ChannelMonitors. -/// 5) Disconnect/connect blocks on the ChannelManager. -/// 6) Move the ChannelMonitors into your local chain::Watch. +/// 1) Deserialize all stored [`ChannelMonitor`]s. +/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling: +/// `<(BlockHash, ChannelManager)>::read(reader, args)` +/// This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored +/// [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted. +/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the +/// same way you would handle a [`chain::Filter`] call using +/// [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`]. +/// 4) Reconnect blocks on your [`ChannelMonitor`]s. +/// 5) Disconnect/connect blocks on the [`ChannelManager`]. +/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk. +/// Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you +/// will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in +/// the next step. +/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a +/// [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`]. /// -/// Note that the ordering of #4-6 is not of importance, however all three must occur before you -/// call any other methods on the newly-deserialized ChannelManager. +/// Note that the ordering of #4-7 is not of importance, however all four must occur before you +/// call any other methods on the newly-deserialized [`ChannelManager`]. /// /// Note that because some channels may be closed during deserialization, it is critical that you /// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to @@ -5461,6 +5472,8 @@ impl Writeable f /// broadcast), and then later deserialize a newer version of the same ChannelManager (which will /// not force-close the same channels but consider them live), you may end up revoking a state for /// which you've already broadcasted the transaction. +/// +/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> where M::Target: chain::Watch, T::Target: BroadcasterInterface, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index f0e453f9b..7ec794555 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -190,7 +190,7 @@ impl chainmonitor::Persist for TestPersiste ret } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option, _data: &channelmonitor::ChannelMonitor, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; From 3016ed2d9146bfff116f035404b333ba950a1195 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 10 Oct 2021 18:02:17 +0000 Subject: [PATCH 6/8] Update test_dup_htlc_onchain_fails_on_reload for new persist API ChannelMonitors now require that they be re-persisted before MonitorEvents be provided to the ChannelManager, the exact thing that test_dup_htlc_onchain_fails_on_reload was testing for when it *didn't* happen. As such, test_dup_htlc_onchain_fails_on_reload is now testing that we bahve correctly when the API guarantees are not met, something we don't need to do. Here, we adapt it to test the new API requirements through ChainMonitor's calls to the Persist trait instead. --- lightning/src/ln/functional_tests.rs | 78 +++++++++++++++++++--------- lightning/src/util/test_utils.rs | 10 +++- 2 files changed, 62 insertions(+), 26 deletions(-) diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index ca4fa3b35..63bf52b41 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -12,7 +12,7 @@ //! claim outputs on-chain. use chain; -use chain::{Confirm, Listen, Watch}; +use chain::{Confirm, Listen, Watch, ChannelMonitorUpdateErr}; use chain::channelmonitor; use chain::channelmonitor::{ChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; use chain::transaction::OutPoint; @@ -4099,21 +4099,15 @@ fn test_no_txn_manager_serialize_deserialize() { send_payment(&nodes[0], &[&nodes[1]], 1000000); } -#[test] -fn test_dup_htlc_onchain_fails_on_reload() { +fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool) { // When a Channel is closed, any outbound HTLCs which were relayed through it are simply // dropped when the Channel is. From there, the ChannelManager relies on the ChannelMonitor // having a copy of the relevant fail-/claim-back data and processes the HTLC fail/claim when // the ChannelMonitor tells it to. // - // If, due to an on-chain event, an HTLC is failed/claimed, and then we serialize the - // ChannelManager, we generally expect there not to be a duplicate HTLC fail/claim (eg via a - // PaymentPathFailed event appearing). However, because we may not serialize the relevant - // ChannelMonitor at the same time, this isn't strictly guaranteed. In order to provide this - // consistency, the ChannelManager explicitly tracks pending-onchain-resolution outbound HTLCs - // and de-duplicates ChannelMonitor events. - // - // This tests that explicit tracking behavior. + // If, due to an on-chain event, an HTLC is failed/claimed, we should avoid providing the + // ChannelManager the HTLC event until after the monitor is re-persisted. This should prevent a + // duplicate HTLC fail/claim (e.g. via a PaymentPathFailed event). let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); @@ -4122,7 +4116,7 @@ fn test_dup_htlc_onchain_fails_on_reload() { let nodes_0_deserialized: ChannelManager; let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2; + let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); // Route a payment, but force-close the channel before the HTLC fulfill message arrives at // nodes[0]. @@ -4140,35 +4134,59 @@ fn test_dup_htlc_onchain_fails_on_reload() { let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); assert_eq!(node_txn.len(), 3); assert_eq!(node_txn[0], node_txn[1]); + check_spends!(node_txn[1], funding_tx); + check_spends!(node_txn[2], node_txn[1]); assert!(nodes[1].node.claim_funds(payment_preimage)); check_added_monitors!(nodes[1], 1); let mut header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[1].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; - connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[1].clone(), node_txn[2].clone()]}); + connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[1].clone()]}); check_closed_broadcast!(nodes[1], true); check_added_monitors!(nodes[1], 1); check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed); let claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); header.prev_blockhash = nodes[0].best_block_hash(); - connect_block(&nodes[0], &Block { header, txdata: vec![node_txn[1].clone(), node_txn[2].clone()]}); + connect_block(&nodes[0], &Block { header, txdata: vec![node_txn[1].clone()]}); - // Serialize out the ChannelMonitor before connecting the on-chain claim transactions. This is - // fairly normal behavior as ChannelMonitor(s) are often not re-serialized when on-chain events - // happen, unlike ChannelManager which tends to be re-serialized after any relevant event(s). - let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); + // Now connect the HTLC claim transaction with the ChainMonitor-generated ChannelMonitor update + // returning TemporaryFailure. This should cause the claim event to never make its way to the + // ChannelManager. + chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); + chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); header.prev_blockhash = nodes[0].best_block_hash(); - let claim_block = Block { header, txdata: claim_txn}; + let claim_block = Block { header, txdata: claim_txn }; connect_block(&nodes[0], &claim_block); + + let funding_txo = OutPoint { txid: funding_tx.txid(), index: 0 }; + let mon_updates: Vec<_> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap() + .get_mut(&funding_txo).unwrap().drain().collect(); + assert_eq!(mon_updates.len(), 1); + assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + + // If we persist the ChannelManager here, we should get the PaymentSent event after + // deserialization. + let mut chan_manager_serialized = test_utils::TestVecWriter(Vec::new()); + if !persist_manager_post_event { + nodes[0].node.write(&mut chan_manager_serialized).unwrap(); + } + + // Now persist the ChannelMonitor and inform the ChainMonitor that we're done, generating the + // payment sent event. + chanmon_cfgs[0].persister.set_update_ret(Ok(())); + let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); + get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_updates[0]).unwrap(); expect_payment_sent!(nodes[0], payment_preimage); - // ChannelManagers generally get re-serialized after any relevant event(s). Since we just - // connected a highly-relevant block, it likely gets serialized out now. - let mut chan_manager_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].node.write(&mut chan_manager_serialized).unwrap(); + // If we persist the ChannelManager after we get the PaymentSent event, we shouldn't get it + // twice. + if persist_manager_post_event { + nodes[0].node.write(&mut chan_manager_serialized).unwrap(); + } // Now reload nodes[0]... persister = test_utils::TestPersister::new(); @@ -4200,6 +4218,12 @@ fn test_dup_htlc_onchain_fails_on_reload() { check_added_monitors!(nodes[0], 1); nodes[0].node = &nodes_0_deserialized; + if persist_manager_post_event { + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + } else { + expect_payment_sent!(nodes[0], payment_preimage); + } + // Note that if we re-connect the block which exposed nodes[0] to the payment preimage (but // which the current ChannelMonitor has not seen), the ChannelManager's de-duplication of // payment events should kick in, leaving us with no pending events here. @@ -4208,6 +4232,12 @@ fn test_dup_htlc_onchain_fails_on_reload() { assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); } +#[test] +fn test_dup_htlc_onchain_fails_on_reload() { + do_test_dup_htlc_onchain_fails_on_reload(true); + do_test_dup_htlc_onchain_fails_on_reload(false); +} + #[test] fn test_manager_serialize_deserialize_events() { // This test makes sure the events field in ChannelManager survives de/serialization diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 7ec794555..5a125929f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -163,13 +163,16 @@ pub struct TestPersister { /// If this is set to Some(), after the next return, we'll always return this until update_ret /// is changed: pub next_update_ret: Mutex>>, - + /// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the + /// MonitorUpdateId here. + pub chain_sync_monitor_persistences: Mutex>>, } impl TestPersister { pub fn new() -> Self { Self { update_ret: Mutex::new(Ok(())), next_update_ret: Mutex::new(None), + chain_sync_monitor_persistences: Mutex::new(HashMap::new()), } } @@ -190,11 +193,14 @@ impl chainmonitor::Persist for TestPersiste ret } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option, _data: &channelmonitor::ChannelMonitor, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, funding_txo: OutPoint, update: &Option, _data: &channelmonitor::ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; } + if update.is_none() { + self.chain_sync_monitor_persistences.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id); + } ret } } From e6bc2b541e8bccab7d72c8fa6828c5675a1a7431 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 14 Oct 2021 23:38:08 +0000 Subject: [PATCH 7/8] Always release `MonitorEvent`s to `ChannelManager` after 3 blocks If we have a `ChannelMonitor` update from an on-chain event which returns a `TemporaryFailure`, we block `MonitorEvent`s from that `ChannelMonitor` until the update is persisted. This prevents duplicate payment send events to the user after payments get reloaded from monitors on restart. However, if the event being avoided isn't going to generate a PaymentSent, but instead result in us claiming an HTLC from an upstream channel (ie the HTLC was forwarded), then the result of a user delaying the event is that we delay getting our money, not a duplicate event. Because user persistence may take an arbitrary amount of time, we need to bound the amount of time we can possibly wait to return events, which we do here by bounding it to 3 blocks. Thanks to Val for catching this in review. --- lightning/src/chain/chainmonitor.rs | 63 ++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index d99d6708a..13a4ac378 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid; use chain; use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput}; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::atomic_counter::AtomicCounter; @@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails; use prelude::*; use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[derive(Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents @@ -168,6 +168,13 @@ struct MonitorHolder { /// processed the closure event, we set this to true and return PermanentFailure for any other /// chain::Watch events. channel_perm_failed: AtomicBool, + /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present + /// in `pending_monitor_updates`. + /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain + /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up + /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel + /// forever, risking funds loss. + last_chain_persist_height: AtomicUsize, } impl MonitorHolder { @@ -226,6 +233,8 @@ pub struct ChainMonitor>, + /// The best block height seen, used as a proxy for the passage of time. + highest_chain_height: AtomicUsize, } impl ChainMonitor @@ -244,13 +253,25 @@ where C::Target: chain::Filter, /// calls must not exclude any transactions matching the new outputs nor any in-block /// descendants of such transactions. It is not necessary to re-fetch the block to obtain /// updated `txdata`. - fn process_chain_data(&self, header: &BlockHeader, txdata: &TransactionData, process: FN) + /// + /// Calls which represent a new blockchain tip height should set `best_height`. + fn process_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let mut dependent_txdata = Vec::new(); { let monitor_states = self.monitors.write().unwrap(); + if let Some(height) = best_height { + // If the best block height is being updated, update highest_chain_height under the + // monitors write lock. + let old_height = self.highest_chain_height.load(Ordering::Acquire); + let new_height = height as usize; + if new_height > old_height { + self.highest_chain_height.store(new_height, Ordering::Release); + } + } + for (funding_outpoint, monitor_state) in monitor_states.iter() { let monitor = &monitor_state.monitor; let mut txn_outputs; @@ -260,6 +281,14 @@ where C::Target: chain::Filter, contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), }; let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // TemporaryFailure to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); + } + } log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { @@ -304,7 +333,7 @@ where C::Target: chain::Filter, dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); dependent_txdata.dedup_by_key(|(index, _tx)| *index); let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); - self.process_chain_data(header, &txdata, process); + self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around } } @@ -325,6 +354,7 @@ where C::Target: chain::Filter, fee_estimator: feeest, persister, pending_monitor_events: Mutex::new(Vec::new()), + highest_chain_height: AtomicUsize::new(0), } } @@ -428,9 +458,11 @@ where C::Target: chain::Filter, }); }, MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { - // We've already done everything we need to, the next time - // release_pending_monitor_events is called, any events for this ChannelMonitor - // will be returned if there's no more SyncPersistId events left. + if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { + monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release); + // The next time release_pending_monitor_events is called, any events for this + // ChannelMonitor will be returned. + } }, } Ok(()) @@ -470,7 +502,7 @@ where let header = &block.header; let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); - self.process_chain_data(header, &txdata, |monitor, txdata| { + self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| { monitor.block_connected( header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) }); @@ -497,7 +529,7 @@ where { fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); - self.process_chain_data(header, txdata, |monitor, txdata| { + self.process_chain_data(header, None, txdata, |monitor, txdata| { monitor.transactions_confirmed( header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) }); @@ -513,7 +545,7 @@ where fn best_block_updated(&self, header: &BlockHeader, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); - self.process_chain_data(header, &[], |monitor, txdata| { + self.process_chain_data(header, Some(height), &[], |monitor, txdata| { // While in practice there shouldn't be any recursive calls when given empty txdata, // it's still possible if a chain::Filter implementation returns a transaction. debug_assert!(txdata.is_empty()); @@ -580,6 +612,7 @@ where C::Target: chain::Filter, monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), channel_perm_failed: AtomicBool::new(false), + last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)), }); persist_res } @@ -636,7 +669,10 @@ where C::Target: chain::Filter, let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); - if is_pending_monitor_update { + if is_pending_monitor_update && + monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize + > self.highest_chain_height.load(Ordering::Acquire) + { log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); } else { if monitor_state.channel_perm_failed.load(Ordering::Acquire) { @@ -650,6 +686,11 @@ where C::Target: chain::Filter, // updated. log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!"); } + if is_pending_monitor_update { + log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); + log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); + log_error!(self.logger, " This may cause duplicate payment events to be generated."); + } pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } } From 6fb5bd36a7b83184029b599d7a07838e84cbaa52 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 19 Oct 2021 21:49:03 +0000 Subject: [PATCH 8/8] Clarify the contexts in which persist_new_channel may be called Its somewhat confusing that `persist_new_channel` is called on startup for an existing channel in common deployments, so we call it out explicitly. --- lightning/src/chain/chainmonitor.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 13a4ac378..71b0b3e50 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -92,10 +92,12 @@ impl MonitorUpdateId { /// closed without broadcasting the latest state. See /// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details. pub trait Persist { - /// Persist a new channel's data. The data can be stored any way you want, but the identifier - /// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct - /// mapping between the outpoint and the stored channel data). Note that you **must** persist - /// every new monitor to disk. + /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is + /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. + /// + /// The data can be stored any way you want, but the identifier provided by LDK is the + /// channel's outpoint (and it is up to you to maintain a correct mapping between the outpoint + /// and the stored channel data). Note that you **must** persist every new monitor to disk. /// /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. @@ -103,6 +105,7 @@ pub trait Persist { /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors. /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;