diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index eeeab90a8..1df6db7db 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1233,7 +1233,7 @@ enum NotifyOption { /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> { event_persist_notifier: &'a Notifier, needs_persist_flag: &'a AtomicBool, should_persist: F, @@ -1248,12 +1248,12 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in /// other cases where losing the changes on restart may result in a force-close or otherwise /// isn't ideal. - fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) } - fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F) - -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F) + -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); let force_notify = cm.get_cm().process_background_events(); @@ -1292,7 +1292,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w } } -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { match (self.should_persist)() { NotifyOption::DoPersist => { @@ -7778,76 +7778,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. + let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, - } - } - - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; - - peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| - if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { - // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so won't be recovered after a crash), they shouldn't exist here and we would never need to - // worry about closing and removing them. - debug_assert!(false); - None + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, } - ).for_each(|chan| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + } + + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {