Merge pull request #1962 from TheBlueMatt/2023-01-bp-no-std

Use the user-provided `SleepFuture` for interval checks in `background-processor`
This commit is contained in:
Matt Corallo 2023-01-17 23:48:48 +00:00 committed by GitHub
commit c86950d510
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 32 deletions

View file

@ -15,11 +15,14 @@ rustdoc-args = ["--cfg", "docsrs"]
[features] [features]
futures = [ "futures-util" ] futures = [ "futures-util" ]
std = ["lightning/std", "lightning-rapid-gossip-sync/std"]
default = ["std"]
[dependencies] [dependencies]
bitcoin = "0.29.0" bitcoin = { version = "0.29.0", default-features = false }
lightning = { version = "0.0.113", path = "../lightning", features = ["std"] } lightning = { version = "0.0.113", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync" } lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true } futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true }
[dev-dependencies] [dev-dependencies]

View file

@ -11,6 +11,11 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
#[cfg(any(test, feature = "std"))]
extern crate core;
#[macro_use] extern crate lightning; #[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync; extern crate lightning_rapid_gossip_sync;
@ -28,15 +33,22 @@ use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger; use lightning::util::logger::Logger;
use lightning::util::persist::Persister; use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync; use lightning_rapid_gossip_sync::RapidGossipSync;
use lightning::io;
use core::ops::Deref;
use core::time::Duration;
#[cfg(feature = "std")]
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; #[cfg(feature = "std")]
use std::thread; use core::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle; #[cfg(feature = "std")]
use std::time::{Duration, Instant}; use std::thread::{self, JoinHandle};
use std::ops::Deref; #[cfg(feature = "std")]
use std::time::Instant;
#[cfg(feature = "futures")] #[cfg(feature = "futures")]
use futures_util::{select_biased, future::FutureExt}; use futures_util::{select_biased, future::FutureExt, task};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@ -62,6 +74,7 @@ use futures_util::{select_biased, future::FutureExt};
/// ///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event /// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor { pub struct BackgroundProcessor {
stop_thread: Arc<AtomicBool>, stop_thread: Arc<AtomicBool>,
@ -207,15 +220,15 @@ macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr, $channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr) $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
=> { { => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred(); $channel_manager.timer_tick_occurred();
let mut last_freshness_call = Instant::now(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_ping_call = Instant::now(); let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = Instant::now(); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = Instant::now(); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut have_pruned = false; let mut have_pruned = false;
loop { loop {
@ -237,9 +250,9 @@ macro_rules! define_run_body {
// We wait up to 100ms, but track how long it takes to detect being put to sleep, // We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below. // see `await_start`'s use below.
let await_start = Instant::now(); let mut await_start = $get_timer(1);
let updates_available = $await; let updates_available = $await;
let await_time = await_start.elapsed(); let await_slow = $timer_elapsed(&mut await_start, 1);
if updates_available { if updates_available {
log_trace!($logger, "Persisting ChannelManager..."); log_trace!($logger, "Persisting ChannelManager...");
@ -251,12 +264,12 @@ macro_rules! define_run_body {
log_trace!($logger, "Terminating background processor."); log_trace!($logger, "Terminating background processor.");
break; break;
} }
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred(); $channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now(); last_freshness_call = $get_timer(FRESHNESS_TIMER);
} }
if await_time > Duration::from_secs(1) { if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons. // On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused. // E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we // Similarly, if we're on a desktop platform and the device has been asleep, we
@ -271,40 +284,46 @@ macro_rules! define_run_body {
// peers. // peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers(); $peer_manager.disconnect_all_peers();
last_ping_call = Instant::now(); last_ping_call = $get_timer(PING_TIMER);
} else if last_ping_call.elapsed().as_secs() > PING_TIMER { } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred(); $peer_manager.timer_tick_occurred();
last_ping_call = Instant::now(); last_ping_call = $get_timer(PING_TIMER);
} }
// Note that we want to run a graph prune once not long after startup before // Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never // falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before // pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence. // continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
// The network graph must not be pruned while rapid sync completion is pending // The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() { if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
#[cfg(feature = "std")] {
log_trace!($logger, "Pruning and persisting network graph."); log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking(); network_graph.remove_stale_channels_and_tracking();
}
#[cfg(not(feature = "std"))] {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
}
if let Err(e) = $persister.persist_graph(network_graph) { if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
} }
last_prune_call = Instant::now(); last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
have_pruned = true; have_pruned = true;
} }
} }
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer { if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer"); log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) { if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
} }
} }
last_scorer_persist_call = Instant::now(); last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
} }
} }
@ -334,6 +353,11 @@ macro_rules! define_run_body {
/// future which outputs true, the loop will exit and this function's future will complete. /// future which outputs true, the loop will exit and this function's future will complete.
/// ///
/// See [`BackgroundProcessor::start`] for information on which actions this handles. /// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")] #[cfg(feature = "futures")]
pub async fn process_events_async< pub async fn process_events_async<
'a, 'a,
@ -364,13 +388,13 @@ pub async fn process_events_async<
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync, PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync, S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>, SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>, SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture Sleeper: Fn(Duration) -> SleepFuture
>( >(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>, gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper, sleeper: Sleeper,
) -> Result<(), std::io::Error> ) -> Result<(), io::Error>
where where
CA::Target: 'static + chain::Access, CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter, CF::Target: 'static + chain::Filter,
@ -411,9 +435,15 @@ where
false false
} }
} }
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
let mut waker = task::noop_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
}) })
} }
#[cfg(feature = "std")]
impl BackgroundProcessor { impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level /// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation]. /// documentation].
@ -522,7 +552,8 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
}); });
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
} }
@ -568,13 +599,14 @@ impl BackgroundProcessor {
} }
} }
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor { impl Drop for BackgroundProcessor {
fn drop(&mut self) { fn drop(&mut self) {
self.stop_and_join_thread().unwrap(); self.stop_and_join_thread().unwrap();
} }
} }
#[cfg(test)] #[cfg(all(feature = "std", test))]
mod tests { mod tests {
use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::constants::genesis_block;

View file

@ -10,3 +10,4 @@ default = ["lightning/no-std", "lightning-invoice/no-std", "lightning-rapid-goss
lightning = { path = "../lightning", default-features = false } lightning = { path = "../lightning", default-features = false }
lightning-invoice = { path = "../lightning-invoice", default-features = false } lightning-invoice = { path = "../lightning-invoice", default-features = false }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", default-features = false } lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-background-processor = { path = "../lightning-background-processor", features = ["futures"], default-features = false }