Add a background processing function that is async.

Adds a function which operates like BackgroundProcessor::start, but
instead of spawning a background thread it is async.
Matt Corallo 2022-08-09 06:01:10 +00:00
parent c6890cfc33
commit 2a5bac22bf
3 changed files with 206 additions and 123 deletions
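For illustration only (not part of this commit): a sketch of the `sleeper` argument the new async entry point expects, assuming a tokio runtime with its macros and time features enabled; the stop flag and the call described in the comments are placeholders for a real node's wiring.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

#[tokio::main]
async fn main() {
	// Shutdown flag which some other task flips when the node should stop.
	let stop = Arc::new(AtomicBool::new(false));
	let stop_sleeper = Arc::clone(&stop);
	// Sleeper per the documented contract: wait the requested amount of time,
	// then report whether background processing should continue.
	let sleeper = move |d: Duration| {
		let stop = Arc::clone(&stop_sleeper);
		async move {
			tokio::time::sleep(d).await;
			!stop.load(Ordering::Acquire)
		}
	};
	// In a real node this sleeper, together with the persister, event handler,
	// chain monitor, channel manager, gossip sync, peer manager, logger and
	// scorer, would be passed to process_events_async(...).await. Here we just
	// poll it to show the contract.
	assert!(sleeper(Duration::from_millis(10)).await);
	stop.store(true, Ordering::Release);
	assert!(!sleeper(Duration::from_millis(10)).await);
}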

@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
bitcoin = "0.28.1"
lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
futures = { version = "0.3", optional = true }
[dev-dependencies]
lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }

@@ -31,6 +31,9 @@ use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;
#[cfg(feature = "futures")]
use futures::{select, future::FutureExt};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
@@ -219,6 +222,203 @@ where A::Target: chain::Access, L::Target: Logger {
}
}
macro_rules! define_run_body {
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr)
=> { {
let event_handler = DecoratingEventHandler {
event_handler: $event_handler,
gossip_sync: &$gossip_sync,
};
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut have_pruned = false;
loop {
$channel_manager.process_pending_events(&event_handler);
$chain_monitor.process_pending_events(&event_handler);
// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
// we want to ensure we get into `persist_manager` as quickly as we can, especially
// without running the normal event processing above and handing events to users.
//
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
// processing a message effectively at any point during this loop. In order to
// minimize the time between such processing completing and persisting the updated
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
$peer_manager.process_events();
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let updates_available = $await;
let await_time = await_start.elapsed();
if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
}
// Exit the loop if the background processor was requested to stop.
if $loop_exit_check {
log_trace!($logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
}
if await_time > Duration::from_secs(1) {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
}
// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
// The network graph must not be pruned while rapid sync completion is pending
log_trace!($logger, "Assessing prunability of network graph");
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
network_graph.remove_stale_channels();
if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}
last_prune_call = Instant::now();
have_pruned = true;
} else {
log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
}
}
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
}
}
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&*$channel_manager)?;
// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
$persister.persist_scorer(&scorer)?;
}
// Persist NetworkGraph on exit
if let Some(network_graph) = $gossip_sync.network_graph() {
$persister.persist_graph(network_graph)?;
}
Ok(())
} }
}
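The suspend-detection heuristic used in the loop above can be shown in isolation. A minimal std-only sketch (not part of the commit): a wait that was meant to last 100ms but is observed to take over a second implies the process was paused, so peers would be disconnected.

use std::time::{Duration, Instant};

fn main() {
	// Stand-in for the loop's bounded 100ms wait for a persistable update.
	let await_start = Instant::now();
	std::thread::sleep(Duration::from_millis(100));
	let await_time = await_start.elapsed();
	if await_time > Duration::from_secs(1) {
		// A 100ms wait that took over a second implies we were suspended; the
		// background processor reacts by disconnecting all peers so they can
		// reconnect cleanly.
		println!("likely suspended for {:?}; would disconnect peers", await_time);
	}
}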
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
/// future which outputs false, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
K: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
let mut should_continue = true;
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
gossip_sync, peer_manager, logger, scorer, !should_continue, {
select! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
cont = sleeper(Duration::from_millis(100)).fuse() => {
should_continue = cont;
false
}
}
})
}
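Because returning `false` is what ends the loop, a `sleeper` is free to complete early once shutdown has been requested rather than always waiting the full duration. A hedged sketch of such a sleeper built on a tokio `watch` channel (tokio and the helper name are assumptions for illustration, not part of this commit):

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

// Hypothetical helper: builds a sleeper that resolves after `d`, or as soon as
// a shutdown signal arrives, returning false to tell the processing loop to exit.
fn shutdown_aware_sleeper(shutdown: tokio::sync::watch::Receiver<()>)
	-> impl Fn(Duration) -> Pin<Box<dyn Future<Output = bool> + Send>>
{
	move |d: Duration| -> Pin<Box<dyn Future<Output = bool> + Send>> {
		let mut shutdown = shutdown.clone();
		Box::pin(async move {
			tokio::select! {
				_ = tokio::time::sleep(d) => true, // timer elapsed, keep running
				_ = shutdown.changed() => false,   // shutdown requested, exit
			}
		})
	}
}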
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
@@ -310,129 +510,9 @@ impl BackgroundProcessor {
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let event_handler = DecoratingEventHandler {
event_handler,
gossip_sync: &gossip_sync,
};
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.timer_tick_occurred();
let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut have_pruned = false;
loop {
channel_manager.process_pending_events(&event_handler);
chain_monitor.process_pending_events(&event_handler);
// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
// we want to ensure we get into `persist_manager` as quickly as we can, especially
// without running the normal event processing above and handing events to users.
//
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
// processing a message effectively at any point during this loop. In order to
// minimize the time between such processing completing and persisting the updated
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
peer_manager.process_events();
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let updates_available =
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
let await_time = await_start.elapsed();
if updates_available {
log_trace!(logger, "Persisting ChannelManager...");
persister.persist_manager(&*channel_manager)?;
log_trace!(logger, "Done persisting ChannelManager.");
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
log_trace!(logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
}
if await_time > Duration::from_secs(1) {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
}
// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
// The network graph must not be pruned while rapid sync completion is pending
log_trace!(logger, "Assessing prunability of network graph");
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
network_graph.remove_stale_channels();
if let Err(e) = persister.persist_graph(network_graph) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}
last_prune_call = Instant::now();
have_pruned = true;
} else {
log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
}
}
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if let Some(ref scorer) = scorer {
log_trace!(logger, "Persisting scorer");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
}
}
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
persister.persist_manager(&*channel_manager)?;
// Persist Scorer on exit
if let Some(ref scorer) = scorer {
persister.persist_scorer(&scorer)?;
}
// Persist NetworkGraph on exit
if let Some(network_graph) = gossip_sync.network_graph() {
persister.persist_graph(network_graph)?;
}
Ok(())
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}

@@ -18,6 +18,8 @@ use core::mem;
use core::time::Duration;
use sync::{Condvar, Mutex};
use prelude::{Box, Vec};
#[cfg(any(test, feature = "std"))]
use std::time::Instant;