mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-01-18 21:34:48 +01:00
Move the pub wait
methods from ChannelManager
to Future
Rather than having three ways to await a `ChannelManager` being persistable, this moves to just exposing the awaitable `Future` and having sleep functions on that.
This commit is contained in:
parent
b455fb5e77
commit
efcb5e02dc
@ -643,7 +643,7 @@ impl BackgroundProcessor {
|
||||
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
|
||||
channel_manager, channel_manager.process_pending_events(&event_handler),
|
||||
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
|
||||
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|
||||
channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)),
|
||||
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
|
||||
});
|
||||
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
|
||||
|
@ -6147,34 +6147,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
|
||||
/// indicating whether persistence is necessary. Only one listener on
|
||||
/// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
|
||||
/// [`get_persistable_update_future`] is guaranteed to be woken up.
|
||||
/// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
|
||||
///
|
||||
/// Note that this method is not available with the `no-std` feature.
|
||||
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
|
||||
/// [`ChannelManager`] and should instead register actions to be taken later.
|
||||
///
|
||||
/// [`await_persistable_update`]: Self::await_persistable_update
|
||||
/// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
|
||||
/// [`get_persistable_update_future`]: Self::get_persistable_update_future
|
||||
#[cfg(any(test, feature = "std"))]
|
||||
pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
|
||||
self.persistence_notifier.wait_timeout(max_wait)
|
||||
}
|
||||
|
||||
/// Blocks until ChannelManager needs to be persisted. Only one listener on
|
||||
/// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
|
||||
/// [`get_persistable_update_future`] is guaranteed to be woken up.
|
||||
///
|
||||
/// [`await_persistable_update`]: Self::await_persistable_update
|
||||
/// [`get_persistable_update_future`]: Self::get_persistable_update_future
|
||||
pub fn await_persistable_update(&self) {
|
||||
self.persistence_notifier.wait()
|
||||
}
|
||||
|
||||
/// Gets a [`Future`] that completes when a persistable update is available. Note that
|
||||
/// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
|
||||
/// should instead register actions to be taken later.
|
||||
pub fn get_persistable_update_future(&self) -> Future {
|
||||
self.persistence_notifier.get_future()
|
||||
}
|
||||
@ -7954,9 +7931,9 @@ mod tests {
|
||||
|
||||
// All nodes start with a persistable update pending as `create_network` connects each node
|
||||
// with all other nodes to make most tests simpler.
|
||||
assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
|
||||
|
||||
@ -7970,19 +7947,19 @@ mod tests {
|
||||
&nodes[0].node.get_our_node_id()).pop().unwrap();
|
||||
|
||||
// The first two nodes (which opened a channel) should now require fresh persistence
|
||||
assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
// ... but the last node should not.
|
||||
assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
// After persisting the first two nodes they should no longer need fresh persistence.
|
||||
assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
// Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
|
||||
// about the channel.
|
||||
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
|
||||
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
|
||||
assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
// The nodes which are a party to the channel should also ignore messages from unrelated
|
||||
// parties.
|
||||
@ -7990,8 +7967,8 @@ mod tests {
|
||||
nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
|
||||
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
|
||||
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
|
||||
assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
// At this point the channel info given by peers should still be the same.
|
||||
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
|
||||
@ -8008,8 +7985,8 @@ mod tests {
|
||||
// persisted and that its channel info remains the same.
|
||||
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
|
||||
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
|
||||
assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
|
||||
assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
|
||||
|
||||
@ -8017,8 +7994,8 @@ mod tests {
|
||||
// the channel info has updated.
|
||||
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
|
||||
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
|
||||
assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
|
||||
assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
|
||||
}
|
||||
|
@ -39,15 +39,6 @@ impl Notifier {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn wait(&self) {
|
||||
Sleeper::from_single_future(self.get_future()).wait();
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "std"))]
|
||||
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
|
||||
Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
|
||||
}
|
||||
|
||||
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
|
||||
pub(crate) fn notify(&self) {
|
||||
let mut lock = self.notify_pending.lock().unwrap();
|
||||
@ -167,6 +158,19 @@ impl Future {
|
||||
pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
|
||||
self.register_callback(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Waits until this [`Future`] completes.
|
||||
pub fn wait(self) {
|
||||
Sleeper::from_single_future(self).wait();
|
||||
}
|
||||
|
||||
/// Waits until this [`Future`] completes or the given amount of time has elapsed.
|
||||
///
|
||||
/// Returns true if the [`Future`] completed, false if the time elapsed.
|
||||
#[cfg(any(test, feature = "std"))]
|
||||
pub fn wait_timeout(self, max_wait: Duration) -> bool {
|
||||
Sleeper::from_single_future(self).wait_timeout(max_wait)
|
||||
}
|
||||
}
|
||||
|
||||
use core::task::Waker;
|
||||
@ -369,12 +373,12 @@ mod tests {
|
||||
});
|
||||
|
||||
// Check that we can block indefinitely until updates are available.
|
||||
let _ = persistence_notifier.wait();
|
||||
let _ = persistence_notifier.get_future().wait();
|
||||
|
||||
// Check that the Notifier will return after the given duration if updates are
|
||||
// available.
|
||||
loop {
|
||||
if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
|
||||
if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -384,7 +388,7 @@ mod tests {
|
||||
// Check that the Notifier will return after the given duration even if no updates
|
||||
// are available.
|
||||
loop {
|
||||
if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
|
||||
if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -482,8 +486,8 @@ mod tests {
|
||||
|
||||
// If we get a future and don't touch it we're definitely still notify-required.
|
||||
notifier.get_future();
|
||||
assert!(notifier.wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!notifier.wait_timeout(Duration::from_millis(1)));
|
||||
assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
// Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
|
||||
let mut future = notifier.get_future();
|
||||
@ -492,7 +496,7 @@ mod tests {
|
||||
|
||||
notifier.notify();
|
||||
assert!(woken.load(Ordering::SeqCst));
|
||||
assert!(notifier.wait_timeout(Duration::from_millis(1)));
|
||||
assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
|
||||
|
||||
// However, once we do poll `Ready` it should wipe the notify-required flag.
|
||||
let mut future = notifier.get_future();
|
||||
@ -502,7 +506,7 @@ mod tests {
|
||||
notifier.notify();
|
||||
assert!(woken.load(Ordering::SeqCst));
|
||||
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
|
||||
assert!(!notifier.wait_timeout(Duration::from_millis(1)));
|
||||
assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Loading…
Reference in New Issue
Block a user