Merge pull request #2521 from TheBlueMatt/2023-08-one-less-write

Avoid persisting ChannelManager in some cases and separate event from persist notifies
This commit is contained in:
Matt Corallo 2023-09-13 15:40:12 +00:00 committed by GitHub
commit 286d1db2cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 346 additions and 190 deletions

View file

@ -125,7 +125,6 @@ struct TestChainMonitor {
// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
// fully-serialized monitor state here, as well as the corresponding update_id.
pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
pub should_update_manager: atomic::AtomicBool,
}
impl TestChainMonitor {
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@ -135,7 +134,6 @@ impl TestChainMonitor {
keys,
persister,
latest_monitors: Mutex::new(HashMap::new()),
should_update_manager: atomic::AtomicBool::new(false),
}
}
}
@ -146,7 +144,6 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
panic!("Already had monitor pre-watch_channel");
}
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
self.chain_monitor.watch_channel(funding_txo, monitor)
}
@ -162,7 +159,6 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
map_entry.insert((update.update_id, ser.0));
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
self.chain_monitor.update_channel(funding_txo, update)
}
@ -1101,11 +1097,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
if !chan_a_disconnected {
nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
chan_a_disconnected = true;
drain_msg_events_on_disconnect!(0);
}
if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(0));
ab_events.clear();
ba_events.clear();
}
let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a);
nodes[0] = new_node_a;
@ -1134,11 +1128,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
if !chan_b_disconnected {
nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
chan_b_disconnected = true;
drain_msg_events_on_disconnect!(2);
}
if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(2));
bc_events.clear();
cb_events.clear();
}
let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c);
nodes[2] = new_node_c;
@ -1304,15 +1296,18 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
_ => test_return!(),
}
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
if nodes[0].get_and_clear_needs_persistence() == true {
node_a_ser.0.clear();
nodes[0].write(&mut node_a_ser).unwrap();
}
if nodes[1].get_and_clear_needs_persistence() == true {
node_b_ser.0.clear();
nodes[1].write(&mut node_b_ser).unwrap();
}
if nodes[2].get_and_clear_needs_persistence() == true {
node_c_ser.0.clear();
nodes[2].write(&mut node_c_ser).unwrap();
}
}
}

View file

@ -315,7 +315,7 @@ macro_rules! define_run_body {
// see `await_start`'s use below.
let mut await_start = None;
if $check_slow_await { await_start = Some($get_timer(1)); }
let updates_available = $await;
$await;
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
// Exit the loop if the background processor was requested to stop.
@ -324,7 +324,7 @@ macro_rules! define_run_body {
break;
}
if updates_available {
if $channel_manager.get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
@ -655,16 +655,14 @@ where
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_persistable_update_future(),
a: channel_manager.get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
match fut.await {
SelectorOutput::A => true,
SelectorOutput::B => false,
SelectorOutput::A|SelectorOutput::B => {},
SelectorOutput::C(exit) => {
should_break = exit;
false
}
}
}, |t| sleeper(Duration::from_secs(t)),
@ -787,10 +785,10 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
Sleeper::from_two_futures(
channel_manager.get_persistable_update_future(),
{ Sleeper::from_two_futures(
channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)),
).wait_timeout(Duration::from_millis(100)); },
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@ -1326,7 +1324,7 @@ mod tests {
check_persisted_data!(nodes[0].node, filepath.clone());
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
if !nodes[0].node.get_event_or_persist_condvar_value() { break }
}
// Force-close the channel.
@ -1335,7 +1333,7 @@ mod tests {
// Check that the force-close updates are persisted.
check_persisted_data!(nodes[0].node, filepath.clone());
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
if !nodes[0].node.get_event_or_persist_condvar_value() { break }
}
// Check network graph is persisted

View file

@ -495,6 +495,10 @@ impl MsgHandleErrInternal {
channel_capacity: None,
}
}
fn closes_channel(&self) -> bool {
self.chan_id.is_some()
}
}
/// We hold back HTLCs we intend to relay for a random interval greater than this (see
@ -1185,7 +1189,8 @@ where
background_events_processed_since_startup: AtomicBool,
persistence_notifier: Notifier,
event_persist_notifier: Notifier,
needs_persist_flag: AtomicBool,
entropy_source: ES,
node_signer: NS,
@ -1214,7 +1219,8 @@ pub struct ChainParameters {
#[must_use]
enum NotifyOption {
DoPersist,
SkipPersist,
SkipPersistHandleEvents,
SkipPersistNoEvents,
}
/// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
@ -1227,43 +1233,75 @@ enum NotifyOption {
/// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
/// notify or not based on whether relevant changes have been made, providing a closure to
/// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
persistence_notifier: &'a Notifier,
struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
event_persist_notifier: &'a Notifier,
needs_persist_flag: &'a AtomicBool,
should_persist: F,
// We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>,
}
impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
/// Notifies any waiters and indicates that we need to persist, in addition to possibly having
/// events to handle.
///
/// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
/// other cases where losing the changes on restart may result in a force-close or otherwise
/// isn't ideal.
fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
}
fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
-> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
let _ = cm.get_cm().process_background_events(); // We always persist
let force_notify = cm.get_cm().process_background_events();
PersistenceNotifierGuard {
persistence_notifier: &cm.get_cm().persistence_notifier,
should_persist: || -> NotifyOption { NotifyOption::DoPersist },
event_persist_notifier: &cm.get_cm().event_persist_notifier,
needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: move || {
// Pick the "most" action between `persist_check` and the background events
// processing and return that.
let notify = persist_check();
match (notify, force_notify) {
(NotifyOption::DoPersist, _) => NotifyOption::DoPersist,
(_, NotifyOption::DoPersist) => NotifyOption::DoPersist,
(NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents,
(_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents,
_ => NotifyOption::SkipPersistNoEvents,
}
},
_read_guard: read_guard,
}
}
/// Note that if any [`ChannelMonitorUpdate`]s are possibly generated,
/// [`ChannelManager::process_background_events`] MUST be called first.
fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
let read_guard = lock.read().unwrap();
/// [`ChannelManager::process_background_events`] MUST be called first (or
/// [`Self::optionally_notify`] used).
fn optionally_notify_skipping_background_events<F: Fn() -> NotifyOption, C: AChannelManager>
(cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
PersistenceNotifierGuard {
persistence_notifier: notifier,
event_persist_notifier: &cm.get_cm().event_persist_notifier,
needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: persist_check,
_read_guard: read_guard,
}
}
}
impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
fn drop(&mut self) {
if (self.should_persist)() == NotifyOption::DoPersist {
self.persistence_notifier.notify();
match (self.should_persist)() {
NotifyOption::DoPersist => {
self.needs_persist_flag.store(true, Ordering::Release);
self.event_persist_notifier.notify()
},
NotifyOption::SkipPersistHandleEvents =>
self.event_persist_notifier.notify(),
NotifyOption::SkipPersistNoEvents => {},
}
}
}
@ -2085,7 +2123,7 @@ macro_rules! process_events_body {
return;
}
let mut result = NotifyOption::SkipPersist;
let mut result;
{
// We'll acquire our total consistency lock so that we can be sure no other
@ -2094,7 +2132,7 @@ macro_rules! process_events_body {
// Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must
// ensure any startup-generated background events are handled first.
if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; }
result = $self.process_background_events();
// TODO: This behavior should be documented. It's unintuitive that we query
// ChannelMonitors when clearing other events.
@ -2134,8 +2172,14 @@ macro_rules! process_events_body {
processed_all_events = false;
}
if result == NotifyOption::DoPersist {
$self.persistence_notifier.notify();
match result {
NotifyOption::DoPersist => {
$self.needs_persist_flag.store(true, Ordering::Release);
$self.event_persist_notifier.notify();
},
NotifyOption::SkipPersistHandleEvents =>
$self.event_persist_notifier.notify(),
NotifyOption::SkipPersistNoEvents => {},
}
}
}
@ -2214,7 +2258,9 @@ where
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
entropy_source,
node_signer,
@ -4339,7 +4385,7 @@ where
let mut background_events = Vec::new();
mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
if background_events.is_empty() {
return NotifyOption::SkipPersist;
return NotifyOption::SkipPersistNoEvents;
}
for event in background_events.drain(..) {
@ -4408,17 +4454,17 @@ where
}
fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel<SP>, new_feerate: u32) -> NotifyOption {
if !chan.context.is_outbound() { return NotifyOption::SkipPersist; }
if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; }
// If the feerate has decreased by less than half, don't bother
if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
return NotifyOption::SkipPersist;
chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
return NotifyOption::SkipPersistNoEvents;
}
if !chan.context.is_live() {
log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
return NotifyOption::SkipPersist;
chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
return NotifyOption::SkipPersistNoEvents;
}
log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
@ -4433,8 +4479,8 @@ where
/// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
/// it wants to detect). Thus, we have a variant exposed here for its benefit.
pub fn maybe_update_chan_fees(&self) {
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut should_persist = self.process_background_events();
PersistenceNotifierGuard::optionally_notify(self, || {
let mut should_persist = NotifyOption::SkipPersistNoEvents;
let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@ -4478,8 +4524,8 @@ where
/// [`ChannelUpdate`]: msgs::ChannelUpdate
/// [`ChannelConfig`]: crate::util::config::ChannelConfig
pub fn timer_tick_occurred(&self) {
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut should_persist = self.process_background_events();
PersistenceNotifierGuard::optionally_notify(self, || {
let mut should_persist = NotifyOption::SkipPersistNoEvents;
let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@ -5557,6 +5603,8 @@ where
}
fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
// Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
// likely to be lost on restart!
if msg.chain_hash != self.genesis_hash {
return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
}
@ -5656,6 +5704,8 @@ where
}
fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
// Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
// likely to be lost on restart!
let (value, output_script, user_id) = {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@ -5816,6 +5866,8 @@ where
}
fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
// Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
// closing a channel), so any changes are likely to be lost on restart!
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
@ -5994,6 +6046,9 @@ where
//encrypted with the same key. It's not immediately obvious how to usefully exploit that,
//but we should prevent it anyway.
// Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
// closing a channel), so any changes are likely to be lost on restart!
let decoded_hop_res = self.decode_update_add_htlc_onion(msg);
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@ -6086,6 +6141,8 @@ where
}
fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> {
// Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
// closing a channel), so any changes are likely to be lost on restart!
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
@ -6109,6 +6166,8 @@ where
}
fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> {
// Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
// closing a channel), so any changes are likely to be lost on restart!
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
@ -6397,19 +6456,19 @@ where
Ok(())
}
/// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err.
/// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err.
fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result<NotifyOption, MsgHandleErrInternal> {
let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) {
Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
None => {
// It's not a local channel
return Ok(NotifyOption::SkipPersist)
return Ok(NotifyOption::SkipPersistNoEvents)
}
};
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id);
if peer_state_mutex_opt.is_none() {
return Ok(NotifyOption::SkipPersist)
return Ok(NotifyOption::SkipPersistNoEvents)
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
@ -6421,14 +6480,14 @@ where
// If the announcement is about a channel of ours which is public, some
// other peer may simply be forwarding all its gossip to us. Don't provide
// a scary-looking error message and return Ok instead.
return Ok(NotifyOption::SkipPersist);
return Ok(NotifyOption::SkipPersistNoEvents);
}
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
}
let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
let msg_from_node_one = msg.contents.flags & 1 == 0;
if were_node_one == msg_from_node_one {
return Ok(NotifyOption::SkipPersist);
return Ok(NotifyOption::SkipPersistNoEvents);
} else {
log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
@ -6438,12 +6497,12 @@ where
"Got a channel_update for an unfunded channel!".into())), chan_phase_entry);
}
},
hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents)
}
Ok(NotifyOption::DoPersist)
}
fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
let htlc_forwards;
let need_lnd_workaround = {
let per_peer_state = self.per_peer_state.read().unwrap();
@ -6499,14 +6558,16 @@ where
}
};
let mut persist = NotifyOption::SkipPersistHandleEvents;
if let Some(forwards) = htlc_forwards {
self.forward_htlcs(&mut [forwards][..]);
persist = NotifyOption::DoPersist;
}
if let Some(channel_ready_msg) = need_lnd_workaround {
self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
}
Ok(())
Ok(persist)
}
/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@ -7056,8 +7117,8 @@ where
/// the `MessageSendEvent`s to the specific peer they were generated under.
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let events = RefCell::new(Vec::new());
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut result = self.process_background_events();
PersistenceNotifierGuard::optionally_notify(self, || {
let mut result = NotifyOption::SkipPersistNoEvents;
// TODO: This behavior should be documented. It's unintuitive that we query
// ChannelMonitors when clearing other events.
@ -7138,8 +7199,9 @@ where
}
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
&self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
let _persistence_guard =
PersistenceNotifierGuard::optionally_notify_skipping_background_events(
self, || -> NotifyOption { NotifyOption::DoPersist });
let new_height = height - 1;
{
let mut best_block = self.best_block.write().unwrap();
@ -7173,8 +7235,9 @@ where
let block_hash = header.block_hash();
log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
&self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
let _persistence_guard =
PersistenceNotifierGuard::optionally_notify_skipping_background_events(
self, || -> NotifyOption { NotifyOption::DoPersist });
self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)
.map(|(a, b)| (a, Vec::new(), b)));
@ -7193,8 +7256,9 @@ where
let block_hash = header.block_hash();
log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
&self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
let _persistence_guard =
PersistenceNotifierGuard::optionally_notify_skipping_background_events(
self, || -> NotifyOption { NotifyOption::DoPersist });
*self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger));
@ -7237,8 +7301,9 @@ where
}
fn transaction_unconfirmed(&self, txid: &Txid) {
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
&self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
let _persistence_guard =
PersistenceNotifierGuard::optionally_notify_skipping_background_events(
self, || -> NotifyOption { NotifyOption::DoPersist });
self.do_chain_event(None, |channel| {
if let Some(funding_txo) = channel.context.get_funding_txo() {
if funding_txo.txid == *txid {
@ -7421,18 +7486,26 @@ where
}
}
/// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
/// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
/// may have events that need processing.
///
/// In order to check if this [`ChannelManager`] needs persisting, call
/// [`Self::get_and_clear_needs_persistence`].
///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`ChannelManager`] and should instead register actions to be taken later.
///
pub fn get_persistable_update_future(&self) -> Future {
self.persistence_notifier.get_future()
pub fn get_event_or_persistence_needed_future(&self) -> Future {
self.event_persist_notifier.get_future()
}
/// Returns true if this [`ChannelManager`] needs to be persisted.
pub fn get_and_clear_needs_persistence(&self) -> bool {
self.needs_persist_flag.swap(false, Ordering::AcqRel)
}
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_persistence_condvar_value(&self) -> bool {
self.persistence_notifier.notify_pending()
pub fn get_event_or_persist_condvar_value(&self) -> bool {
self.event_persist_notifier.notify_pending()
}
/// Gets the latest best block which was connected either via the [`chain::Listen`] or
@ -7489,8 +7562,21 @@ where
L::Target: Logger,
{
fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// open_channel message - pre-funded channels are never written so there should be no
// change to the contents.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_open_channel(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => {
debug_assert!(false, "We shouldn't close a new channel");
NotifyOption::DoPersist
},
_ => NotifyOption::SkipPersistHandleEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
@ -7500,8 +7586,13 @@ where
}
fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// accept_channel message - pre-funded channels are never written so there should be no
// change to the contents.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
NotifyOption::SkipPersistHandleEvents
});
}
fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
@ -7521,8 +7612,19 @@ where
}
fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// channel_ready message - while the channel's state will change, any channel_ready message
// will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we
// will not force-close the channel on startup.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_channel_ready(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
_ => NotifyOption::SkipPersistHandleEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) {
@ -7536,8 +7638,19 @@ where
}
fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// update_add_htlc message - the message itself doesn't change our channel state only the
// `commitment_signed` message afterwards will.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_update_add_htlc(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
Err(_) => NotifyOption::SkipPersistHandleEvents,
Ok(()) => NotifyOption::SkipPersistNoEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
@ -7546,13 +7659,35 @@ where
}
fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// update_fail_htlc message - the message itself doesn't change our channel state only the
// `commitment_signed` message afterwards will.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_update_fail_htlc(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
Err(_) => NotifyOption::SkipPersistHandleEvents,
Ok(()) => NotifyOption::SkipPersistNoEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// update_fail_malformed_htlc message - the message itself doesn't change our channel state
// only the `commitment_signed` message afterwards will.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
Err(_) => NotifyOption::SkipPersistHandleEvents,
Ok(()) => NotifyOption::SkipPersistNoEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
@ -7566,8 +7701,19 @@ where
}
fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id);
// Note that we never need to persist the updated ChannelManager for an inbound
// update_fee message - the message itself doesn't change our channel state only the
// `commitment_signed` message afterwards will.
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_update_fee(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
Err(_) => NotifyOption::SkipPersistHandleEvents,
Ok(()) => NotifyOption::SkipPersistNoEvents,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
@ -7576,23 +7722,32 @@ where
}
fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) {
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let force_persist = self.process_background_events();
PersistenceNotifierGuard::optionally_notify(self, || {
if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) {
if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist }
persist
} else {
NotifyOption::SkipPersist
NotifyOption::DoPersist
}
});
}
fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
let res = self.internal_channel_reestablish(counterparty_node_id, msg);
let persist = match &res {
Err(e) if e.closes_channel() => NotifyOption::DoPersist,
Err(_) => NotifyOption::SkipPersistHandleEvents,
Ok(persist) => *persist,
};
let _ = handle_error!(self, res, *counterparty_node_id);
persist
});
}
fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
self, || NotifyOption::SkipPersistHandleEvents);
let mut failed_channels = Vec::new();
let mut per_peer_state = self.per_peer_state.write().unwrap();
let remove_peer = {
@ -7691,76 +7846,82 @@ where
return Err(());
}
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let mut res = Ok(());
// If we have too many peers connected which don't have funded channels, disconnect the
// peer immediately (as long as it doesn't have funded channels). If we have a bunch of
// unfunded channels taking up space in memory for disconnected peers, we still let new
// peers connect, but we'll reject new channels from them.
let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
PersistenceNotifierGuard::optionally_notify(self, || {
// If we have too many peers connected which don't have funded channels, disconnect the
// peer immediately (as long as it doesn't have funded channels). If we have a bunch of
// unfunded channels taking up space in memory for disconnected peers, we still let new
// peers connect, but we'll reject new channels from them.
let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
{
let mut peer_state_lock = self.per_peer_state.write().unwrap();
match peer_state_lock.entry(counterparty_node_id.clone()) {
hash_map::Entry::Vacant(e) => {
if inbound_peer_limited {
return Err(());
}
e.insert(Mutex::new(PeerState {
channel_by_id: HashMap::new(),
inbound_channel_request_by_id: HashMap::new(),
latest_features: init_msg.features.clone(),
pending_msg_events: Vec::new(),
in_flight_monitor_updates: BTreeMap::new(),
monitor_update_blocked_actions: BTreeMap::new(),
actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: true,
}));
},
hash_map::Entry::Occupied(e) => {
let mut peer_state = e.get().lock().unwrap();
peer_state.latest_features = init_msg.features.clone();
{
let mut peer_state_lock = self.per_peer_state.write().unwrap();
match peer_state_lock.entry(counterparty_node_id.clone()) {
hash_map::Entry::Vacant(e) => {
if inbound_peer_limited {
res = Err(());
return NotifyOption::SkipPersistNoEvents;
}
e.insert(Mutex::new(PeerState {
channel_by_id: HashMap::new(),
inbound_channel_request_by_id: HashMap::new(),
latest_features: init_msg.features.clone(),
pending_msg_events: Vec::new(),
in_flight_monitor_updates: BTreeMap::new(),
monitor_update_blocked_actions: BTreeMap::new(),
actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: true,
}));
},
hash_map::Entry::Occupied(e) => {
let mut peer_state = e.get().lock().unwrap();
peer_state.latest_features = init_msg.features.clone();
let best_block_height = self.best_block.read().unwrap().height();
if inbound_peer_limited &&
Self::unfunded_channel_count(&*peer_state, best_block_height) ==
peer_state.channel_by_id.len()
{
return Err(());
}
let best_block_height = self.best_block.read().unwrap().height();
if inbound_peer_limited &&
Self::unfunded_channel_count(&*peer_state, best_block_height) ==
peer_state.channel_by_id.len()
{
res = Err(());
return NotifyOption::SkipPersistNoEvents;
}
debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
peer_state.is_connected = true;
},
}
}
log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
// Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
// (so won't be recovered after a crash), they shouldn't exist here and we would never need to
// worry about closing and removing them.
debug_assert!(false);
None
debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
peer_state.is_connected = true;
},
}
).for_each(|chan| {
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: chan.context.get_counterparty_node_id(),
msg: chan.get_channel_reestablish(&self.logger),
}
log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
// Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
// (so won't be recovered after a crash), they shouldn't exist here and we would never need to
// worry about closing and removing them.
debug_assert!(false);
None
}
).for_each(|chan| {
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: chan.context.get_counterparty_node_id(),
msg: chan.get_channel_reestablish(&self.logger),
});
});
});
}
//TODO: Also re-broadcast announcement_signatures
Ok(())
}
return NotifyOption::SkipPersistHandleEvents;
//TODO: Also re-broadcast announcement_signatures
});
res
}
fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@ -9604,7 +9765,9 @@ where
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
entropy_source: args.entropy_source,
node_signer: args.node_signer,
@ -9666,9 +9829,9 @@ mod tests {
// All nodes start with a persistable update pending as `create_network` connects each node
// with all other nodes to make most tests simpler.
assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[2].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
@ -9682,19 +9845,19 @@ mod tests {
&nodes[0].node.get_our_node_id()).pop().unwrap();
// The first two nodes (which opened a channel) should now require fresh persistence
assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
// ... but the last node should not.
assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
// After persisting the first two nodes they should no longer need fresh persistence.
assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
// Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
// about the channel.
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
// The nodes which are a party to the channel should also ignore messages from unrelated
// parties.
@ -9702,8 +9865,8 @@ mod tests {
nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
// At this point the channel info given by peers should still be the same.
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
@ -9720,8 +9883,8 @@ mod tests {
// persisted and that its channel info remains the same.
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
@ -9729,8 +9892,8 @@ mod tests {
// the channel info has updated.
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
}