diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index cbff019a4..11d4d7af2 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -38,6 +38,8 @@ use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; +#[cfg(feature = "std")] +use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; use core::ops::Deref; @@ -388,15 +390,20 @@ pub(crate) mod futures_util { use core::task::{Poll, Waker, RawWaker, RawWakerVTable}; use core::pin::Pin; use core::marker::Unpin; - pub(crate) struct Selector + Unpin, B: Future + Unpin> { + pub(crate) struct Selector< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > { pub a: A, pub b: B, + pub c: C, } pub(crate) enum SelectorOutput { - A, B(bool), + A, B, C(bool), } - impl + Unpin, B: Future + Unpin> Future for Selector { + impl< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > Future for Selector { type Output = SelectorOutput; fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { match Pin::new(&mut self.a).poll(ctx) { @@ -404,7 +411,11 @@ pub(crate) mod futures_util { Poll::Pending => {}, } match Pin::new(&mut self.b).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.c).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); }, Poll::Pending => {}, } Poll::Pending @@ -514,11 +525,13 @@ where gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_persistable_update_future(), - b: sleeper(Duration::from_millis(100)), + b: chain_monitor.get_update_future(), + c: sleeper(Duration::from_millis(100)), }; match fut.await { SelectorOutput::A => true, - SelectorOutput::B(exit) => { + SelectorOutput::B => false, + SelectorOutput::C(exit) => { should_break = exit; false } @@ -643,7 +656,10 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)), + Sleeper::from_two_futures( + channel_manager.get_persistable_update_future(), + chain_monitor.get_update_future() + ).wait_timeout(Duration::from_millis(100)), |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 4bfb47de4..f4109ac17 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -37,6 +37,7 @@ use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; +use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -240,6 +241,8 @@ pub struct ChainMonitor, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, + + event_notifier: Notifier, } impl ChainMonitor @@ -300,6 +303,7 @@ where C::Target: chain::Filter, ChannelMonitorUpdateStatus::PermanentFailure => { monitor_state.channel_perm_failed.store(true, Ordering::Release); self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + self.event_notifier.notify(); }, ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); @@ -345,6 +349,7 @@ where C::Target: chain::Filter, persister, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), + event_notifier: Notifier::new(), } } @@ -472,6 +477,7 @@ where C::Target: chain::Filter, } }, } + self.event_notifier.notify(); Ok(()) } @@ -486,6 +492,7 @@ where C::Target: chain::Filter, funding_txo, monitor_update_id, }], counterparty_node_id)); + self.event_notifier.notify(); } #[cfg(any(test, fuzzing, feature = "_test_utils"))] @@ -514,6 +521,18 @@ where C::Target: chain::Filter, handler(event).await; } } + + /// Gets a [`Future`] that completes when an event is available either via + /// [`chain::Watch::release_pending_monitor_events`] or + /// [`EventsProvider::process_pending_events`]. + /// + /// Note that callbacks registered on the [`Future`] MUST NOT call back into this + /// [`ChainMonitor`] and should instead register actions to be taken later. + /// + /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events + pub fn get_update_future(&self) -> Future { + self.event_notifier.get_future() + } } impl