mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-01-19 05:43:55 +01:00
Handle events asynchronously in the BackgroundProcessor's async variant
This commit is contained in:
parent
55b714c01d
commit
8d20ebc376
@ -236,15 +236,11 @@ where A::Target: chain::Access, L::Target: Logger {
|
||||
}
|
||||
|
||||
macro_rules! define_run_body {
|
||||
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
|
||||
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
|
||||
$channel_manager: ident, $process_channel_manager_events: expr,
|
||||
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
|
||||
$loop_exit_check: expr, $await: expr)
|
||||
=> { {
|
||||
let event_handler = DecoratingEventHandler {
|
||||
event_handler: $event_handler,
|
||||
gossip_sync: &$gossip_sync,
|
||||
};
|
||||
|
||||
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
|
||||
$channel_manager.timer_tick_occurred();
|
||||
|
||||
@ -255,8 +251,8 @@ macro_rules! define_run_body {
|
||||
let mut have_pruned = false;
|
||||
|
||||
loop {
|
||||
$channel_manager.process_pending_events(&event_handler);
|
||||
$chain_monitor.process_pending_events(&event_handler);
|
||||
$process_channel_manager_events;
|
||||
$process_chain_monitor_events;
|
||||
|
||||
// Note that the PeerManager::process_events may block on ChannelManager's locks,
|
||||
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
|
||||
@ -389,7 +385,8 @@ pub async fn process_events_async<
|
||||
CMH: 'static + Deref + Send + Sync,
|
||||
RMH: 'static + Deref + Send + Sync,
|
||||
OMH: 'static + Deref + Send + Sync,
|
||||
EH: 'static + EventHandler + Send,
|
||||
EventHandlerFuture: core::future::Future<Output = ()>,
|
||||
EventHandler: Fn(Event) -> EventHandlerFuture,
|
||||
PS: 'static + Deref + Send,
|
||||
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
|
||||
CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
|
||||
@ -402,7 +399,7 @@ pub async fn process_events_async<
|
||||
SleepFuture: core::future::Future<Output = bool>,
|
||||
Sleeper: Fn(Duration) -> SleepFuture
|
||||
>(
|
||||
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
|
||||
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
|
||||
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
|
||||
sleeper: Sleeper,
|
||||
) -> Result<(), std::io::Error>
|
||||
@ -422,7 +419,19 @@ where
|
||||
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
|
||||
{
|
||||
let mut should_break = true;
|
||||
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
|
||||
let async_event_handler = |event| {
|
||||
let network_graph = gossip_sync.network_graph();
|
||||
let event_handler = &event_handler;
|
||||
async move {
|
||||
if let Some(network_graph) = network_graph {
|
||||
handle_network_graph_update(network_graph, &event)
|
||||
}
|
||||
event_handler(event).await;
|
||||
}
|
||||
};
|
||||
define_run_body!(persister,
|
||||
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
|
||||
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
|
||||
gossip_sync, peer_manager, logger, scorer, should_break, {
|
||||
select_biased! {
|
||||
_ = channel_manager.get_persistable_update_future().fuse() => true,
|
||||
@ -527,7 +536,12 @@ impl BackgroundProcessor {
|
||||
let stop_thread = Arc::new(AtomicBool::new(false));
|
||||
let stop_thread_clone = stop_thread.clone();
|
||||
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
|
||||
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
|
||||
let event_handler = DecoratingEventHandler {
|
||||
event_handler,
|
||||
gossip_sync: &gossip_sync,
|
||||
};
|
||||
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
|
||||
channel_manager, channel_manager.process_pending_events(&event_handler),
|
||||
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
|
||||
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user