Merge pull request #872 from valentinewallace/timer-tick-in-bg-processor

Call timer_tick_occurred in BackgroundProcessor
This commit is contained in:
Matt Corallo 2021-04-13 01:37:52 +00:00 committed by GitHub
commit 3d51b11fe9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 22 deletions

View file

@ -27,7 +27,8 @@ use std::time::{Duration, Instant};
/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so, /// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup. /// writing it to disk/backups by invoking the callback given to it at startup.
/// ChannelManager persistence should be done in the background. /// ChannelManager persistence should be done in the background.
/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the /// * Calling `ChannelManager::timer_tick_occurred()` and
/// `PeerManager::timer_tick_occurred()` every minute (can be done in the
/// background). /// background).
/// ///
/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date, /// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date,
@ -42,9 +43,9 @@ pub struct BackgroundProcessor {
} }
#[cfg(not(test))] #[cfg(not(test))]
const CHAN_FRESHNESS_TIMER: u64 = 60; const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)] #[cfg(test)]
const CHAN_FRESHNESS_TIMER: u64 = 1; const FRESHNESS_TIMER: u64 = 1;
impl BackgroundProcessor { impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the top-level /// Start a background thread that takes care of responsibilities enumerated in the top-level
@ -101,9 +102,10 @@ impl BackgroundProcessor {
log_trace!(logger, "Terminating background processor."); log_trace!(logger, "Terminating background processor.");
return Ok(()); return Ok(());
} }
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { if current_time.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); log_trace!(logger, "Calling ChannelManager's and PeerManager's timer_tick_occurred");
channel_manager.timer_chan_freshness_every_min(); channel_manager.timer_tick_occurred();
peer_manager.timer_tick_occurred();
current_time = Instant::now(); current_time = Instant::now();
} }
} }
@ -294,16 +296,16 @@ mod tests {
} }
#[test] #[test]
fn test_chan_freshness_called() { fn test_timer_tick_called() {
// Test that ChannelManager's `timer_chan_freshness_every_min` is called every // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
// `CHAN_FRESHNESS_TIMER`. // `FRESHNESS_TIMER`.
let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); let nodes = create_nodes(1, "test_timer_tick_called".to_string());
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node); let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
loop { loop {
let log_entries = nodes[0].logger.lines.lock().unwrap(); let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string();
if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() { if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() {
break break
} }

View file

@ -250,7 +250,7 @@ pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
/// Liveness is called to fluctuate given peer disconnecton/monitor failures/closing. /// Liveness is called to fluctuate given peer disconnecton/monitor failures/closing.
/// If channel is public, network should have a liveness view announced by us on a /// If channel is public, network should have a liveness view announced by us on a
/// best-effort, which means we may filter out some status transitions to avoid spam. /// best-effort, which means we may filter out some status transitions to avoid spam.
/// See further timer_chan_freshness_every_min. /// See further timer_tick_occurred.
#[derive(PartialEq)] #[derive(PartialEq)]
enum UpdateStatus { enum UpdateStatus {
/// Status has been gossiped. /// Status has been gossiped.

View file

@ -338,7 +338,7 @@ pub(super) struct ChannelHolder<Signer: Sign> {
} }
/// Events which we process internally but cannot be procsesed immediately at the generation site /// Events which we process internally but cannot be procsesed immediately at the generation site
/// for some reason. They are handled in timer_chan_freshness_every_min, so may be processed with /// for some reason. They are handled in timer_tick_occurred, so may be processed with
/// quite some time lag. /// quite some time lag.
enum BackgroundEvent { enum BackgroundEvent {
/// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder /// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder
@ -403,7 +403,7 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage
/// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid /// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid
/// spam due to quick disconnection/reconnection, updates are not sent until the channel has been /// spam due to quick disconnection/reconnection, updates are not sent until the channel has been
/// offline for a full minute. In order to track this, you must call /// offline for a full minute. In order to track this, you must call
/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfect. /// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect.
/// ///
/// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager /// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
/// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but /// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
@ -1959,10 +1959,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
events.append(&mut new_events); events.append(&mut new_events);
} }
/// Free the background events, generally called from timer_chan_freshness_every_min. /// Free the background events, generally called from timer_tick_occurred.
/// ///
/// Exposed for testing to allow us to process events quickly without generating accidental /// Exposed for testing to allow us to process events quickly without generating accidental
/// BroadcastChannelUpdate events in timer_chan_freshness_every_min. /// BroadcastChannelUpdate events in timer_tick_occurred.
/// ///
/// Expects the caller to have a total_consistency_lock read lock. /// Expects the caller to have a total_consistency_lock read lock.
fn process_background_events(&self) { fn process_background_events(&self) {
@ -1991,7 +1991,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
/// This method handles all the details, and must be called roughly once per minute. /// This method handles all the details, and must be called roughly once per minute.
/// ///
/// Note that in some rare cases this may generate a `chain::Watch::update_channel` call. /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call.
pub fn timer_chan_freshness_every_min(&self) { pub fn timer_tick_occurred(&self) {
let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
self.process_background_events(); self.process_background_events();
@ -3274,7 +3274,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
// We cannot broadcast our latest local state via monitor update (as // We cannot broadcast our latest local state via monitor update (as
// Channel::force_shutdown tries to make us do) as we may still be in initialization, // Channel::force_shutdown tries to make us do) as we may still be in initialization,
// so we track the update internally and handle it when the user next calls // so we track the update internally and handle it when the user next calls
// timer_chan_freshness_every_min, guaranteeing we're running normally. // timer_tick_occurred, guaranteeing we're running normally.
if let Some((funding_txo, update)) = failure.0.take() { if let Some((funding_txo, update)) = failure.0.take() {
assert_eq!(update.updates.len(), 1); assert_eq!(update.updates.len(), 1);
if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {

View file

@ -7557,7 +7557,7 @@ fn test_check_htlc_underpaying() {
#[test] #[test]
fn test_announce_disable_channels() { fn test_announce_disable_channels() {
// Create 2 channels between A and B. Disconnect B. Call timer_chan_freshness_every_min and check for generated // Create 2 channels between A and B. Disconnect B. Call timer_tick_occurred and check for generated
// ChannelUpdate. Reconnect B, reestablish and check there is non-generated ChannelUpdate. // ChannelUpdate. Reconnect B, reestablish and check there is non-generated ChannelUpdate.
let chanmon_cfgs = create_chanmon_cfgs(2); let chanmon_cfgs = create_chanmon_cfgs(2);
@ -7573,8 +7573,8 @@ fn test_announce_disable_channels() {
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
nodes[0].node.timer_chan_freshness_every_min(); // dirty -> stagged nodes[0].node.timer_tick_occurred(); // dirty -> stagged
nodes[0].node.timer_chan_freshness_every_min(); // staged -> fresh nodes[0].node.timer_tick_occurred(); // staged -> fresh
let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 3); assert_eq!(msg_events.len(), 3);
for e in msg_events { for e in msg_events {
@ -7613,7 +7613,7 @@ fn test_announce_disable_channels() {
nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[2]); nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[2]);
handle_chan_reestablish_msgs!(nodes[1], nodes[0]); handle_chan_reestablish_msgs!(nodes[1], nodes[0]);
nodes[0].node.timer_chan_freshness_every_min(); nodes[0].node.timer_tick_occurred();
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
} }