Mirror of https://github.com/lightningdevkit/rust-lightning.git
Synced 2025-02-25 15:20:24 +01:00

Merge pull request #2995 from tnull/2024-04-fallible-event-handler

Make event handling fallible

Commit 2bfddea062
9 changed files with 595 additions and 259 deletions
@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::events::EventHandler;
 #[cfg(feature = "std")]
 use lightning::events::EventsProvider;
+#[cfg(feature = "futures")]
+use lightning::events::ReplayEvent;
 use lightning::events::{Event, PathFailure};

 use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
+/// # use lightning::events::ReplayEvent;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
 /// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// # }
 /// # struct EventHandler {}
 /// # impl EventHandler {
-/// #     async fn handle_event(&self, _: lightning::events::Event) {}
+/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
 /// # }
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
 /// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
     G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
     L: 'static + Deref + Send + Sync,
     P: 'static + Deref + Send + Sync,
-    EventHandlerFuture: core::future::Future<Output = ()>,
+    EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
     EventHandler: Fn(Event) -> EventHandlerFuture,
     PS: 'static + Deref + Send,
     M: 'static
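For context on the signature change above: any async event handler passed to `process_events_async` now has to resolve to `Result<(), ReplayEvent>`. A minimal sketch of a conforming handler; `persist_event_side_effects` is hypothetical, standing in for the user's own storage layer:

use lightning::events::{Event, ReplayEvent};

// Hypothetical stand-in for the user's own persistence layer.
async fn persist_event_side_effects(_event: &Event) -> Result<(), std::io::Error> {
    Ok(())
}

// Matches the new bound `EventHandlerFuture: Future<Output = Result<(), ReplayEvent>>`
// when passed as `|event| handle_event(event)`. Returning `Err(ReplayEvent())` tells
// LDK the event was not handled and should be replayed.
async fn handle_event(event: Event) -> Result<(), ReplayEvent> {
    persist_event_side_effects(&event).await.map_err(|_| ReplayEvent())
}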
@@ -751,12 +754,16 @@ where
                 if update_scorer(scorer, &event, duration_since_epoch) {
                     log_trace!(logger, "Persisting scorer after update");
                     if let Err(e) = persister.persist_scorer(&scorer) {
-                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
+                        // We opt not to abort early on persistence failure here as persisting
+                        // the scorer is non-critical and we still hope that it will have
+                        // resolved itself when it is potentially critical in event handling
+                        // below.
                     }
                 }
             }
         }
-        event_handler(event).await;
+        event_handler(event).await
     })
 };
 define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
                 }
             }
         }
-        event_handler.handle_event(event);
+        event_handler.handle_event(event)
     };
     define_run_body!(
         persister,
@@ -1012,10 +1019,13 @@ mod tests {
     use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
     use bitcoin::transaction::Version;
     use bitcoin::{Amount, ScriptBuf, Txid};
+    use core::sync::atomic::{AtomicBool, Ordering};
     use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
     use lightning::chain::transaction::OutPoint;
     use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
-    use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
+    use lightning::events::{
+        Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
+    };
     use lightning::ln::channelmanager;
     use lightning::ln::channelmanager::{
         ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
@@ -1757,7 +1767,7 @@ mod tests {
         // Initiate the background processors to watch each node.
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister = Arc::new(Persister::new(data_dir));
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -1847,7 +1857,7 @@ mod tests {
         let (_, nodes) = create_nodes(1, "test_timer_tick_called");
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister = Arc::new(Persister::new(data_dir));
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -1889,7 +1899,7 @@ mod tests {
         let persister = Arc::new(
             Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
         );
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -1924,7 +1934,7 @@ mod tests {

         let bp_future = super::process_events_async(
             persister,
-            |_: _| async {},
+            |_: _| async { Ok(()) },
             nodes[0].chain_monitor.clone(),
             nodes[0].node.clone(),
             Some(nodes[0].messenger.clone()),
@@ -1957,7 +1967,7 @@ mod tests {
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister =
             Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -1986,7 +1996,7 @@ mod tests {
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister =
             Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -2021,13 +2031,16 @@ mod tests {
         // Set up a background event handler for FundingGenerationReady events.
         let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
         let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
-        let event_handler = move |event: Event| match event {
-            Event::FundingGenerationReady { .. } => funding_generation_send
-                .send(handle_funding_generation_ready!(event, channel_value))
-                .unwrap(),
-            Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
-            Event::ChannelReady { .. } => {},
-            _ => panic!("Unexpected event: {:?}", event),
+        let event_handler = move |event: Event| {
+            match event {
+                Event::FundingGenerationReady { .. } => funding_generation_send
+                    .send(handle_funding_generation_ready!(event, channel_value))
+                    .unwrap(),
+                Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
+                Event::ChannelReady { .. } => {},
+                _ => panic!("Unexpected event: {:?}", event),
+            }
+            Ok(())
         };

         let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2095,14 @@ mod tests {

         // Set up a background event handler for SpendableOutputs events.
         let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-        let event_handler = move |event: Event| match event {
-            Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
-            Event::ChannelReady { .. } => {},
-            Event::ChannelClosed { .. } => {},
-            _ => panic!("Unexpected event: {:?}", event),
+        let event_handler = move |event: Event| {
+            match event {
+                Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
+                Event::ChannelReady { .. } => {},
+                Event::ChannelClosed { .. } => {},
+                _ => panic!("Unexpected event: {:?}", event),
+            }
+            Ok(())
         };
         let persister = Arc::new(Persister::new(data_dir));
         let bg_processor = BackgroundProcessor::start(
@@ -2215,12 +2231,60 @@ mod tests {
         }
     }

+    #[test]
+    fn test_event_handling_failures_are_replayed() {
+        let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
+        let channel_value = 100000;
+        let data_dir = nodes[0].kv_store.get_data_dir();
+        let persister = Arc::new(Persister::new(data_dir.clone()));
+
+        let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
+        let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
+        let should_fail_event_handling = Arc::new(AtomicBool::new(true));
+        let event_handler = move |event: Event| {
+            if let Ok(true) = should_fail_event_handling.compare_exchange(
+                true,
+                false,
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                first_event_send.send(event).unwrap();
+                return Err(ReplayEvent());
+            }
+
+            second_event_send.send(event).unwrap();
+            Ok(())
+        };
+
+        let bg_processor = BackgroundProcessor::start(
+            persister,
+            event_handler,
+            nodes[0].chain_monitor.clone(),
+            nodes[0].node.clone(),
+            Some(nodes[0].messenger.clone()),
+            nodes[0].no_gossip_sync(),
+            nodes[0].peer_manager.clone(),
+            nodes[0].logger.clone(),
+            Some(nodes[0].scorer.clone()),
+        );
+
+        begin_open_channel!(nodes[0], nodes[1], channel_value);
+        assert_eq!(
+            first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
+            second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+        );
+
+        if !std::thread::panicking() {
+            bg_processor.stop().unwrap();
+        }
+    }
+
     #[test]
     fn test_scorer_persistence() {
         let (_, nodes) = create_nodes(2, "test_scorer_persistence");
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister = Arc::new(Persister::new(data_dir));
-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let bg_processor = BackgroundProcessor::start(
             persister,
             event_handler,
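The new `test_event_handling_failures_are_replayed` test above exercises the core guarantee of this change: the handler rejects the first delivery with `Err(ReplayEvent())`, and the test then asserts that the background processor delivers the very same event again on a later processing pass.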
@@ -2315,7 +2379,7 @@ mod tests {
         let data_dir = nodes[0].kv_store.get_data_dir();
         let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

-        let event_handler = |_: _| {};
+        let event_handler = |_: _| Ok(());
         let background_processor = BackgroundProcessor::start(
             persister,
             event_handler,
@@ -2350,7 +2414,7 @@ mod tests {
         let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
         let bp_future = super::process_events_async(
             persister,
-            |_: _| async {},
+            |_: _| async { Ok(()) },
             nodes[0].chain_monitor.clone(),
             nodes[0].node.clone(),
             Some(nodes[0].messenger.clone()),
@@ -2492,12 +2556,15 @@ mod tests {
     #[test]
     fn test_payment_path_scoring() {
         let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-        let event_handler = move |event: Event| match event {
-            Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
-            Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
-            Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
-            Event::ProbeFailed { .. } => sender.send(event).unwrap(),
-            _ => panic!("Unexpected event: {:?}", event),
+        let event_handler = move |event: Event| {
+            match event {
+                Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+                Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+                Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+                Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+                _ => panic!("Unexpected event: {:?}", event),
+            }
+            Ok(())
         };

         let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2610,7 @@ mod tests {
                 Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
                 _ => panic!("Unexpected event: {:?}", event),
             }
+            Ok(())
         }
     };

@@ -1391,6 +1391,7 @@ mod test {
             } else {
                 other_events.borrow_mut().push(event);
             }
+            Ok(())
         };
         nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
         nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::ln::types::ChannelId;
 use crate::sign::ecdsa::EcdsaChannelSigner;
-use crate::events;
-use crate::events::{Event, EventHandler};
+use crate::events::{self, Event, EventHandler, ReplayEvent};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::errors::APIError;
 use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
     pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
         use crate::events::EventsProvider;
         let events = core::cell::RefCell::new(Vec::new());
-        let event_handler = |event: events::Event| events.borrow_mut().push(event);
+        let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
         self.process_pending_events(&event_handler);
         events.into_inner()
     }
@@ -544,7 +543,7 @@ where C::Target: chain::Filter,
     /// See the trait-level documentation of [`EventsProvider`] for requirements.
     ///
     /// [`EventsProvider`]: crate::events::EventsProvider
-    pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+    pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
         &self, handler: H
     ) {
         // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
@@ -552,8 +551,13 @@ where C::Target: chain::Filter,
         let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
         for funding_txo in mons_to_process {
             let mut ev;
-            super::channelmonitor::process_events_body!(
-                self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
+            match super::channelmonitor::process_events_body!(
+                self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
+                Ok(()) => {},
+                Err(ReplayEvent ()) => {
+                    self.event_notifier.notify();
+                }
+            }
         }
     }

@@ -880,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
     /// [`BumpTransaction`]: events::Event::BumpTransaction
     fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
         for monitor_state in self.monitors.read().unwrap().values() {
-            monitor_state.monitor.process_pending_events(&handler);
+            match monitor_state.monitor.process_pending_events(&handler) {
+                Ok(()) => {},
+                Err(ReplayEvent ()) => {
+                    self.event_notifier.notify();
+                }
+            }
         }
     }
 }
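In both the async path earlier (`process_pending_events_async`) and the sync path above, the `ChainMonitor` reacts to a returned `ReplayEvent` by waking its `event_notifier`, so that the next event-processing pass picks the failed events up again.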
@@ -51,7 +51,7 @@ use crate::chain::Filter;
 use crate::util::logger::{Logger, Record};
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::{ClosureReason, Event, EventHandler};
+use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};

 #[allow(unused_imports)]
@@ -1159,34 +1159,53 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
 macro_rules! _process_events_body {
     ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
         loop {
+            let mut handling_res = Ok(());
             let (pending_events, repeated_events);
             if let Some(us) = $self_opt {
                 let mut inner = us.inner.lock().unwrap();
                 if inner.is_processing_pending_events {
-                    break;
+                    break handling_res;
                 }
                 inner.is_processing_pending_events = true;

                 pending_events = inner.pending_events.clone();
                 repeated_events = inner.get_repeated_events();
-            } else { break; }
-            let num_events = pending_events.len();
+            } else { break handling_res; }

-            for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+            let mut num_handled_events = 0;
+            for event in pending_events {
                 $event_to_handle = event;
-                $handle_event;
+                match $handle_event {
+                    Ok(()) => num_handled_events += 1,
+                    Err(e) => {
+                        // If we encounter an error we stop handling events and make sure to replay
+                        // any unhandled events on the next invocation.
+                        handling_res = Err(e);
+                        break;
+                    }
+                }
+            }
+
+            if handling_res.is_ok() {
+                for event in repeated_events {
+                    // For repeated events we ignore any errors as they will be replayed eventually
+                    // anyways.
+                    $event_to_handle = event;
+                    let _ = $handle_event;
+                }
             }

             if let Some(us) = $self_opt {
                 let mut inner = us.inner.lock().unwrap();
-                inner.pending_events.drain(..num_events);
+                inner.pending_events.drain(..num_handled_events);
                 inner.is_processing_pending_events = false;
-                if !inner.pending_events.is_empty() {
-                    // If there's more events to process, go ahead and do so.
+                if handling_res.is_ok() && !inner.pending_events.is_empty() {
+                    // If there's more events to process and we didn't fail so far, go ahead and do
+                    // so.
                     continue;
                 }
             }
-            break;
+            break handling_res;
         }
     }
 }
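The reworked macro drains only the prefix of pending events that were actually handled, so a failure mid-queue leaves the failed event and everything after it in place for the next invocation. A standalone sketch of that control flow (a simplified model, not LDK code):

#[derive(Clone, Debug, PartialEq)]
struct Ev(u32);

#[derive(Debug)]
struct Replay;

// Handle pending events in order, stop at the first failure, and drain only the
// successfully handled prefix so the remainder is replayed on the next call.
fn process(pending: &mut Vec<Ev>, handler: impl Fn(&Ev) -> Result<(), Replay>) -> Result<(), Replay> {
    let mut handling_res = Ok(());
    let mut num_handled = 0;
    for ev in pending.iter() {
        match handler(ev) {
            Ok(()) => num_handled += 1,
            Err(e) => {
                handling_res = Err(e);
                break;
            },
        }
    }
    pending.drain(..num_handled);
    handling_res
}

fn main() {
    let mut pending = vec![Ev(1), Ev(2), Ev(3)];
    // Fail on Ev(2): only Ev(1) is drained; Ev(2) and Ev(3) stay queued for replay.
    let res = process(&mut pending, |ev| if ev.0 == 2 { Err(Replay) } else { Ok(()) });
    assert!(res.is_err());
    assert_eq!(pending, vec![Ev(2), Ev(3)]);
}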
@@ -1498,21 +1517,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
     /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
     /// order to handle these events.
     ///
+    /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
+    ///
     /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
     /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
-    pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+    pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
         let mut ev;
-        process_events_body!(Some(self), ev, handler.handle_event(ev));
+        process_events_body!(Some(self), ev, handler.handle_event(ev))
     }

     /// Processes any events asynchronously.
     ///
     /// See [`Self::process_pending_events`] for more information.
-    pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+    pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
         &self, handler: &H
-    ) {
+    ) -> Result<(), ReplayEvent> {
         let mut ev;
-        process_events_body!(Some(self), ev, { handler(ev).await });
+        process_events_body!(Some(self), ev, { handler(ev).await })
     }

 #[cfg(test)]
@@ -551,6 +551,10 @@ pub enum Event {
     /// Note that *all inputs* in the funding transaction must spend SegWit outputs or your
     /// counterparty can steal your funds!
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+    ///
     /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
     /// [`ChannelManager::funding_transaction_generated`]: crate::ln::channelmanager::ChannelManager::funding_transaction_generated
     FundingGenerationReady {
@@ -608,6 +612,10 @@ pub enum Event {
     /// # Note
     /// This event used to be called `PaymentReceived` in LDK versions 0.0.112 and earlier.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds
     /// [`ChannelManager::claim_funds_with_known_custom_tlvs`]: crate::ln::channelmanager::ChannelManager::claim_funds_with_known_custom_tlvs
     /// [`FailureCode::InvalidOnionPayload`]: crate::ln::channelmanager::FailureCode::InvalidOnionPayload
@@ -677,6 +685,10 @@ pub enum Event {
     /// [`ChannelManager::claim_funds`] twice for the same [`Event::PaymentClaimable`] you may get
     /// multiple `PaymentClaimed` events.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds
     PaymentClaimed {
         /// The node that received the payment.
@@ -716,6 +728,10 @@ pub enum Event {
     /// This event will not be generated for onion message forwards; only for sends including
     /// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+    ///
     /// [`OnionMessage`]: msgs::OnionMessage
     /// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter
     /// [`Destination`]: crate::onion_message::messenger::Destination
@@ -730,6 +746,10 @@ pub enum Event {
     /// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an
     /// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
     /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
     /// [`Offer`]: crate::offers::offer::Offer
@@ -746,6 +766,10 @@ pub enum Event {
     /// [`ChannelManager::abandon_payment`] to abandon the associated payment. See those docs for
     /// further details.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
     /// [`Refund`]: crate::offers::refund::Refund
     /// [`UserConfig::manually_handle_bolt12_invoices`]: crate::util::config::UserConfig::manually_handle_bolt12_invoices
@@ -768,6 +792,10 @@ pub enum Event {
     ///
     /// Note for MPP payments: in rare cases, this event may be preceded by a `PaymentPathFailed`
     /// event. In this situation, you SHOULD treat this payment as having succeeded.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     PaymentSent {
         /// The `payment_id` passed to [`ChannelManager::send_payment`].
         ///
@@ -806,6 +834,10 @@ pub enum Event {
     /// received and processed. In this case, the [`Event::PaymentFailed`] event MUST be ignored,
     /// and the payment MUST be treated as having succeeded.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`Retry`]: crate::ln::channelmanager::Retry
     /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
     PaymentFailed {
@@ -825,6 +857,10 @@ pub enum Event {
     ///
     /// Always generated after [`Event::PaymentSent`] and thus useful for scoring channels. See
     /// [`Event::PaymentSent`] for obtaining the payment preimage.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     PaymentPathSuccessful {
         /// The `payment_id` passed to [`ChannelManager::send_payment`].
         ///
@@ -850,6 +886,10 @@ pub enum Event {
     /// See [`ChannelManager::abandon_payment`] for giving up on this payment before its retries have
     /// been exhausted.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
     PaymentPathFailed {
         /// The `payment_id` passed to [`ChannelManager::send_payment`].
@@ -889,6 +929,10 @@ pub enum Event {
         error_data: Option<Vec<u8>>,
     },
     /// Indicates that a probe payment we sent returned successful, i.e., only failed at the destination.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     ProbeSuccessful {
         /// The id returned by [`ChannelManager::send_probe`].
         ///
@@ -902,6 +946,10 @@ pub enum Event {
         path: Path,
     },
     /// Indicates that a probe payment we sent failed at an intermediary node on the path.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     ProbeFailed {
         /// The id returned by [`ChannelManager::send_probe`].
         ///
@@ -923,6 +971,10 @@ pub enum Event {
     /// Used to indicate that [`ChannelManager::process_pending_htlc_forwards`] should be called at
     /// a time in the future.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be regenerated after restarts.
+    ///
     /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
     PendingHTLCsForwardable {
         /// The minimum amount of time that should be waited prior to calling
@@ -939,6 +991,10 @@ pub enum Event {
     /// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to this event. See
     /// their docs for more information.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::get_intercept_scid`]: crate::ln::channelmanager::ChannelManager::get_intercept_scid
     /// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs
     /// [`ChannelManager::forward_intercepted_htlc`]: crate::ln::channelmanager::ChannelManager::forward_intercepted_htlc
@@ -974,6 +1030,10 @@ pub enum Event {
     /// You may hand them to the [`OutputSweeper`] utility which will store and (re-)generate spending
     /// transactions for you.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`OutputSweeper`]: crate::util::sweep::OutputSweeper
     SpendableOutputs {
         /// The outputs which you should store as spendable by you.
@@ -985,6 +1045,10 @@ pub enum Event {
     },
     /// This event is generated when a payment has been successfully forwarded through us and a
     /// forwarding fee earned.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     PaymentForwarded {
         /// The channel id of the incoming channel between the previous node and us.
         ///
@@ -1046,6 +1110,10 @@ pub enum Event {
     /// This event is emitted when the funding transaction has been signed and is broadcast to the
     /// network. For 0conf channels it will be immediately followed by the corresponding
     /// [`Event::ChannelReady`] event.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     ChannelPending {
         /// The `channel_id` of the channel that is pending confirmation.
         channel_id: ChannelId,
@@ -1075,6 +1143,10 @@ pub enum Event {
     /// be used. This event is emitted either when the funding transaction has been confirmed
     /// on-chain, or, in case of a 0conf channel, when both parties have confirmed the channel
     /// establishment.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     ChannelReady {
         /// The `channel_id` of the channel that is ready.
         channel_id: ChannelId,
@@ -1101,6 +1173,10 @@ pub enum Event {
     ///
     /// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel
     /// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     ChannelClosed {
         /// The `channel_id` of the channel which has been closed. Note that on-chain transactions
         /// resolving the channel are likely still awaiting confirmation.
@@ -1135,6 +1211,10 @@ pub enum Event {
     /// inputs for another purpose.
     ///
     /// This event is not guaranteed to be generated for channels that are closed due to a restart.
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     DiscardFunding {
         /// The channel_id of the channel which has been closed.
         channel_id: ChannelId,
@@ -1150,6 +1230,10 @@ pub enum Event {
     /// The event is only triggered when a new open channel request is received and the
     /// [`UserConfig::manually_accept_inbound_channels`] config flag is set to true.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+    ///
     /// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel
     /// [`ChannelManager::force_close_without_broadcasting_txn`]: crate::ln::channelmanager::ChannelManager::force_close_without_broadcasting_txn
     /// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels
@@ -1206,6 +1290,10 @@ pub enum Event {
     ///
     /// This event, however, does not get generated if an HTLC fails to meet the forwarding
     /// requirements (i.e. insufficient fees paid, or a CLTV that is too soon).
+    ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
     HTLCHandlingFailed {
         /// The channel over which the HTLC was received.
         prev_channel_id: ChannelId,
@@ -1219,6 +1307,10 @@ pub enum Event {
     /// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`] config flag is set to true.
     /// It is limited to the scope of channels with anchor outputs.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`), but will only be regenerated as needed after restarts.
+    ///
     /// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`]: crate::util::config::ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx
     BumpTransaction(BumpTransactionEvent),
     /// We received an onion message that is intended to be forwarded to a peer
@@ -1226,6 +1318,10 @@ pub enum Event {
     /// `OnionMessenger` was initialized with
     /// [`OnionMessenger::new_with_offline_peer_interception`], see its docs.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+    ///
     /// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception
     OnionMessageIntercepted {
         /// The node id of the offline peer.
@@ -1239,6 +1335,10 @@ pub enum Event {
     /// initialized with
     /// [`OnionMessenger::new_with_offline_peer_interception`], see its docs.
     ///
+    /// # Failure Behavior and Persistence
+    /// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+    /// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+    ///
     /// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception
     OnionMessagePeerConnected {
         /// The node id of the peer we just connected to, who advertises support for
@@ -2300,8 +2400,12 @@ pub trait MessageSendEventsProvider {
 ///
 /// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
 /// and replay any unhandled events on startup. An [`Event`] is considered handled when
-/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
-/// relevant changes to disk *before* returning.
+/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
+/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g.,
+/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the
+/// [`EventsProvider`] to replay unhandled events on the next invocation (generally immediately).
+/// Note that some events might not be replayed, please refer to the documentation for
+/// the individual [`Event`] variants for more detail.
 ///
 /// Further, because an application may crash between an [`Event`] being handled and the
 /// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2328,26 +2432,34 @@ pub trait EventsProvider {
     fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
 }
 
+/// An error type that may be returned to LDK in order to safely abort event handling if it can't
+/// currently succeed (e.g., due to a persistence failure).
+///
+/// Depending on the type, LDK may ensure the event is persisted and will eventually be replayed.
+/// Please refer to the documentation of each [`Event`] variant for more details.
+#[derive(Clone, Copy, Debug)]
+pub struct ReplayEvent();
+
 /// A trait implemented for objects handling events from [`EventsProvider`].
 ///
 /// An async variation also exists for implementations of [`EventsProvider`] that support async
 /// event handling. The async event handler should satisfy the generic bounds: `F:
-/// core::future::Future, H: Fn(Event) -> F`.
+/// core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> F`.
 pub trait EventHandler {
     /// Handles the given [`Event`].
     ///
     /// See [`EventsProvider`] for details that must be considered when implementing this method.
-    fn handle_event(&self, event: Event);
+    fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>;
 }
 
-impl<F> EventHandler for F where F: Fn(Event) {
-    fn handle_event(&self, event: Event) {
+impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> {
+    fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
         self(event)
     }
 }
 
 impl<T: EventHandler> EventHandler for Arc<T> {
-    fn handle_event(&self, event: Event) {
+    fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
         self.deref().handle_event(event)
     }
 }
 
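Given the blanket impl above, any `Fn(Event) -> Result<(), ReplayEvent>` closure is now directly usable as an `EventHandler`, and `Arc`-wrapped handlers delegate through to the inner one. A small usage sketch under that assumption:

```rust
use std::sync::Arc;
use lightning::events::{Event, EventHandler, ReplayEvent};

// Generic caller accepting anything that implements `EventHandler`.
fn dispatch<H: EventHandler>(handler: &H, event: Event) -> Result<(), ReplayEvent> {
    handler.handle_event(event)
}

fn demo(event: Event) {
    // A plain closure works via the `impl<F> EventHandler for F` blanket impl.
    let by_closure = |ev: Event| -> Result<(), ReplayEvent> {
        println!("handling {:?}", ev);
        Ok(())
    };
    let _ = dispatch(&by_closure, event.clone());

    // `Arc<T>` forwards to the inner handler via the `Arc` impl above.
    let shared = Arc::new(by_closure);
    let _ = shared.handle_event(event);
}
```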
@@ -41,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::events;
-use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::inbound_payment;
@@ -1395,35 +1395,38 @@ where
 /// }
 ///
 /// // On the event processing thread once the peer has responded
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::FundingGenerationReady {
-///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
-///         user_channel_id, ..
-///     } => {
-///         assert_eq!(user_channel_id, 42);
-///         let funding_transaction = wallet.create_funding_transaction(
-///             channel_value_satoshis, output_script
-///         );
-///         match channel_manager.funding_transaction_generated(
-///             &temporary_channel_id, &counterparty_node_id, funding_transaction
-///         ) {
-///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
-///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!(
-///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
-///             former_temporary_channel_id.unwrap()
-///         );
-///     },
-///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} ready", channel_id);
-///     },
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::FundingGenerationReady {
+///             temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///             user_channel_id, ..
+///         } => {
+///             assert_eq!(user_channel_id, 42);
+///             let funding_transaction = wallet.create_funding_transaction(
+///                 channel_value_satoshis, output_script
+///             );
+///             match channel_manager.funding_transaction_generated(
+///                 &temporary_channel_id, &counterparty_node_id, funding_transaction
+///             ) {
+///                 Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!(
+///                 "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///                 former_temporary_channel_id.unwrap()
+///             );
+///         },
+///         Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} ready", channel_id);
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1447,28 +1450,31 @@ where
 /// # fn example<T: AChannelManager>(channel_manager: T) {
 /// # let channel_manager = channel_manager.get_cm();
 /// # let error_message = "Channel force-closed";
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
-///         if !is_trusted(counterparty_node_id) {
-///             match channel_manager.force_close_without_broadcasting_txn(
-///                 &temporary_channel_id, &counterparty_node_id, error_message.to_string()
-///             ) {
-///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
-///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
-///             }
-///             return;
-///         }
-///
-///         let user_channel_id = 43;
-///         match channel_manager.accept_inbound_channel(
-///             &temporary_channel_id, &counterparty_node_id, user_channel_id
-///         ) {
-///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
-///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
+///             if !is_trusted(counterparty_node_id) {
+///                 match channel_manager.force_close_without_broadcasting_txn(
+///                     &temporary_channel_id, &counterparty_node_id, error_message.to_string()
+///                 ) {
+///                     Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                     Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///                 }
+///                 return Ok(());
+///             }
+///
+///             let user_channel_id = 43;
+///             match channel_manager.accept_inbound_channel(
+///                 &temporary_channel_id, &counterparty_node_id, user_channel_id
+///             ) {
+///                 Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1497,13 +1503,16 @@ where
 /// }
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::ChannelClosed { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} closed", channel_id);
-///     },
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::ChannelClosed { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} closed", channel_id);
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1553,30 +1562,33 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///         },
-///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
-///             assert_ne!(payment_hash, known_payment_hash);
-///             println!("Claiming spontaneous payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         // ...
-/// # _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         assert_eq!(payment_hash, known_payment_hash);
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///                 assert_ne!(payment_hash, known_payment_hash);
+///                 println!("Claiming spontaneous payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             // ...
+/// # _ => {},
+///         },
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claimed {} msats", amount_msat);
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1619,11 +1631,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
-///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///         Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1657,23 +1672,25 @@ where
 /// let bech32_offer = offer.to_string();
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///         },
-///         // ...
-/// # _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             }
+/// # _ => {},
+///         },
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             println!("Claimed {} msats", amount_msat);
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1719,12 +1736,15 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1779,11 +1799,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1809,18 +1832,19 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///         },
-///         // ...
-/// # _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         assert_eq!(payment_hash, known_payment_hash);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             // ...
+/// # _ => {},
+///         },
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
@@ -1828,6 +1852,8 @@ where
-///     },
-///     // ...
-/// # _ => {},
+///         },
+///         // ...
+/// # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -2831,8 +2857,9 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
     ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+        let mut handling_failed = false;
         let mut processed_all_events = false;
-        while !processed_all_events {
+        while !handling_failed && !processed_all_events {
             if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
                 return;
             }
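The `compare_exchange` guard in this hunk is a plain atomic test-and-set: the first caller flips `pending_events_processor` and proceeds, while any concurrent caller returns instead of processing the same queue twice. A standalone model of just that guard:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// First caller flips the flag and may process; concurrent callers back off.
fn try_begin_processing(flag: &AtomicBool) -> bool {
    flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok()
}

fn main() {
    let processing = AtomicBool::new(false);
    assert!(try_begin_processing(&processing)); // first caller wins
    assert!(!try_begin_processing(&processing)); // reentrant caller backs off
    processing.store(false, Ordering::Release); // released once processing ends
}
```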
@@ -2856,24 +2883,34 @@ macro_rules! process_events_body {
             }
 
             let pending_events = $self.pending_events.lock().unwrap().clone();
-            let num_events = pending_events.len();
             if !pending_events.is_empty() {
                 result = NotifyOption::DoPersist;
             }
 
             let mut post_event_actions = Vec::new();
 
+            let mut num_handled_events = 0;
             for (event, action_opt) in pending_events {
                 $event_to_handle = event;
-                $handle_event;
-                if let Some(action) = action_opt {
-                    post_event_actions.push(action);
+                match $handle_event {
+                    Ok(()) => {
+                        if let Some(action) = action_opt {
+                            post_event_actions.push(action);
+                        }
+                        num_handled_events += 1;
+                    }
+                    Err(_e) => {
+                        // If we encounter an error we stop handling events and make sure to replay
+                        // any unhandled events on the next invocation.
+                        handling_failed = true;
+                        break;
+                    }
                 }
             }
 
             {
                 let mut pending_events = $self.pending_events.lock().unwrap();
-                pending_events.drain(..num_events);
+                pending_events.drain(..num_handled_events);
                 processed_all_events = pending_events.is_empty();
                 // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
                 // updated here with the `pending_events` lock acquired.
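The net effect of this hunk: events are handled in order, `num_handled_events` counts only successes, and the final `drain(..num_handled_events)` removes exactly those, so the failed event and everything queued after it survive for the next pass. A standalone model of that drain-and-replay logic, with plain integers standing in for LDK events:

```rust
// Handle events in order; stop at the first failure and drop only the
// successfully handled prefix, leaving the rest queued for replay.
fn process<E: Clone>(queue: &mut Vec<E>, mut handle: impl FnMut(&E) -> Result<(), ()>) {
    let pending: Vec<E> = queue.clone();
    let mut num_handled = 0;
    for ev in &pending {
        match handle(ev) {
            Ok(()) => num_handled += 1,
            Err(()) => break, // remaining events will be replayed
        }
    }
    queue.drain(..num_handled);
}

fn main() {
    let mut queue = vec![1, 2, 3];
    // Event 2 fails transiently: only event 1 is drained.
    process(&mut queue, |ev| if *ev == 2 { Err(()) } else { Ok(()) });
    assert_eq!(queue, vec![2, 3]);
    // On the next invocation handling succeeds and the queue empties.
    process(&mut queue, |_| Ok(()));
    assert!(queue.is_empty());
}
```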
@@ -9240,7 +9277,7 @@ where
     #[cfg(any(test, feature = "_test_utils"))]
     pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
         let events = core::cell::RefCell::new(Vec::new());
-        let event_handler = |event: events::Event| events.borrow_mut().push(event);
+        let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
         self.process_pending_events(&event_handler);
         events.into_inner()
     }
@@ -9347,7 +9384,7 @@ where
     /// using the given event handler.
     ///
     /// See the trait-level documentation of [`EventsProvider`] for requirements.
-    pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+    pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
         &self, handler: H
     ) {
         let mut ev;
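With the tightened bound, the async handler's future must now resolve to `Result<(), ReplayEvent>`. A sketch of a conforming handler; `persist_async` is a hypothetical application store call, not an LDK API:

```rust
use lightning::events::{Event, ReplayEvent};

// Hypothetical async persistence call provided by the application.
async fn persist_async(_event: &Event) -> std::io::Result<()> {
    Ok(())
}

// Satisfies `H: Fn(Event) -> Future` with
// `Future: core::future::Future<Output = Result<(), ReplayEvent>>`.
async fn handle_event_async(event: Event) -> Result<(), ReplayEvent> {
    persist_async(&event).await.map_err(|_| ReplayEvent())
}
```

Such a handler can then be passed as `channel_manager.process_pending_events_async(handle_event_async).await`.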
@@ -307,7 +307,7 @@ fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) {
 
 fn release_events(node: &MessengerNode) -> Vec<Event> {
     let events = core::cell::RefCell::new(Vec::new());
-    node.messenger.process_pending_events(&|e| events.borrow_mut().push(e));
+    node.messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
     events.into_inner()
 }
 
@@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
 use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs};
 use crate::blinded_path::utils;
-use crate::events::{Event, EventHandler, EventsProvider};
+use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
 use crate::sign::{EntropySource, NodeSigner, Recipient};
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
@@ -31,11 +31,13 @@ use super::packet::OnionMessageContents;
 use super::packet::ParsedOnionMessageContents;
 use super::offers::OffersMessageHandler;
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
+use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::ser::Writeable;
 
 use core::fmt;
 use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
 use crate::io;
 use crate::sync::Mutex;
 use crate::prelude::*;
@@ -261,12 +263,9 @@ pub struct OnionMessenger<
     async_payments_handler: APH,
     custom_handler: CMH,
     intercept_messages_for_offline_peers: bool,
-    pending_events: Mutex<PendingEvents>,
-}
-
-struct PendingEvents {
-    intercepted_msgs: Vec<Event>,
-    peer_connecteds: Vec<Event>,
+    pending_intercepted_msgs_events: Mutex<Vec<Event>>,
+    pending_peer_connected_events: Mutex<Vec<Event>>,
+    pending_events_processor: AtomicBool,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -1021,6 +1020,28 @@ where
     }
 }
 
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+    // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
+    // successfully handled events from the given queue, reset the events processing flag, and
+    // return, to have the events eventually replayed upon next invocation.
+    {
+        let mut queue_lock = $event_queue.lock().unwrap();
+
+        // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
+        let mut res_iter = $res.iter().skip($offset);
+
+        // Keep all events which previously error'd *or* any that have been added since we dropped
+        // the Mutex before.
+        queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+
+        if $res.iter().any(|r| r.is_err()) {
+            // We failed handling some events. Return to have them eventually replayed.
+            $self.pending_events_processor.store(false, Ordering::Release);
+            return;
+        }
+    }
+}}
+
 impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
 OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
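The `retain` call is the subtle part of this macro: it walks the handling results in lock-step with the queue, keeping entries whose handler errored, and, once `res_iter` is exhausted, unconditionally keeping events that were enqueued after the snapshot was taken (the `map_or(true, ...)` default). A standalone model:

```rust
fn main() {
    let mut queue = vec!["a", "b", "c", "d"];
    // Results for the snapshot ["a", "b", "c"]; "d" arrived afterwards.
    let res: Vec<Result<(), ()>> = vec![Ok(()), Err(()), Ok(())];
    let mut res_iter = res.iter();
    // Keep failed entries and anything beyond the snapshotted results.
    queue.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
    assert_eq!(queue, vec!["b", "d"]); // failed "b" and late "d" survive
}
```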
@@ -1095,10 +1116,9 @@ where
             async_payments_handler,
             custom_handler,
             intercept_messages_for_offline_peers,
-            pending_events: Mutex::new(PendingEvents {
-                intercepted_msgs: Vec::new(),
-                peer_connecteds: Vec::new(),
-            }),
+            pending_intercepted_msgs_events: Mutex::new(Vec::new()),
+            pending_peer_connected_events: Mutex::new(Vec::new()),
+            pending_events_processor: AtomicBool::new(false),
         }
     }
 
@@ -1316,14 +1336,15 @@ where
 
     fn enqueue_intercepted_event(&self, event: Event) {
         const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
-        let mut pending_events = self.pending_events.lock().unwrap();
-        let total_buffered_bytes: usize =
-            pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
+        let mut pending_intercepted_msgs_events =
+            self.pending_intercepted_msgs_events.lock().unwrap();
+        let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
+            .map(|ev| ev.serialized_length()).sum();
         if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
             log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
             return
         }
-        pending_events.intercepted_msgs.push(event);
+        pending_intercepted_msgs_events.push(event);
     }
 
     /// Processes any events asynchronously using the given handler.
@@ -1333,42 +1354,63 @@ where
     /// have an ordering requirement.
     ///
     /// See the trait-level documentation of [`EventsProvider`] for requirements.
-    pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+    pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
         &self, handler: H
     ) {
-        let mut intercepted_msgs = Vec::new();
-        let mut peer_connecteds = Vec::new();
-        {
-            let mut pending_events = self.pending_events.lock().unwrap();
-            core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
-            core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+        if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+            return;
         }
 
-        let mut futures = Vec::with_capacity(intercepted_msgs.len());
-        for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
-            if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
-                if let Some(addresses) = addresses.take() {
-                    futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+        {
+            let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
+            let mut futures = Vec::with_capacity(intercepted_msgs.len());
+            for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+                if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+                    if let Some(addresses) = addresses.take() {
+                        let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+                        futures.push(future);
+                    }
                 }
             }
-        }
 
-        for ev in intercepted_msgs {
-            if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
-            futures.push(Some(handler(ev)));
-        }
-        // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
-        crate::util::async_poll::MultiFuturePoller(futures).await;
+            // The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
+            // replaying `ConnectionNeeded` events.
+            let intercepted_msgs_offset = futures.len();
 
-        if peer_connecteds.len() <= 1 {
-            for event in peer_connecteds { handler(event).await; }
-        } else {
-            let mut futures = Vec::new();
-            for event in peer_connecteds {
-                futures.push(Some(handler(event)));
+            for ev in intercepted_msgs {
+                if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+                let future = ResultFuture::Pending(handler(ev));
+                futures.push(future);
             }
-            crate::util::async_poll::MultiFuturePoller(futures).await;
+            // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+            let res = MultiResultFuturePoller::new(futures).await;
+            drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
         }
+
+        {
+            let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
+            let num_peer_connecteds = peer_connecteds.len();
+            if num_peer_connecteds <= 1 {
+                for event in peer_connecteds {
+                    if handler(event).await.is_ok() {
+                        self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
+                    } else {
+                        // We failed handling the event. Return to have it eventually replayed.
+                        self.pending_events_processor.store(false, Ordering::Release);
+                        return;
+                    }
+                }
+            } else {
+                let mut futures = Vec::new();
+                for event in peer_connecteds {
+                    let future = ResultFuture::Pending(handler(event));
+                    futures.push(future);
+                }
+                let res = MultiResultFuturePoller::new(futures).await;
+                drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+            }
+        }
+        self.pending_events_processor.store(false, Ordering::Release);
     }
 }
@@ -1408,31 +1450,42 @@ where
     CMH::Target: CustomOnionMessageHandler,
 {
     fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+        if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+            return;
+        }
+
         for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
             if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
                 if let Some(addresses) = addresses.take() {
-                    handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+                    let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
                 }
             }
         }
-        let mut events = Vec::new();
+        let intercepted_msgs;
+        let peer_connecteds;
         {
-            let mut pending_events = self.pending_events.lock().unwrap();
+            let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+            intercepted_msgs = pending_intercepted_msgs_events.clone();
+            let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+            peer_connecteds = pending_peer_connected_events.clone();
             #[cfg(debug_assertions)] {
-                for ev in pending_events.intercepted_msgs.iter() {
+                for ev in pending_intercepted_msgs_events.iter() {
                     if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
                 }
-                for ev in pending_events.peer_connecteds.iter() {
+                for ev in pending_peer_connected_events.iter() {
                     if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
                 }
             }
-            core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
-            events.append(&mut pending_events.peer_connecteds);
-            pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+            pending_peer_connected_events.shrink_to(10); // Limit total heap usage
         }
-        for ev in events {
-            handler.handle_event(ev);
-        }
+
+        let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+        drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+
+        let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+        drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+
+        self.pending_events_processor.store(false, Ordering::Release);
     }
 }
@@ -1558,7 +1611,9 @@ where
             .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
             .mark_connected();
         if self.intercept_messages_for_offline_peers {
-            self.pending_events.lock().unwrap().peer_connecteds.push(
+            let mut pending_peer_connected_events =
+                self.pending_peer_connected_events.lock().unwrap();
+            pending_peer_connected_events.push(
                 Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
             );
         }
@@ -15,29 +15,62 @@ use core::marker::Unpin;
 use core::pin::Pin;
 use core::task::{Context, Poll};
 
-pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
+pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
+    Pending(F),
+    Ready(Result<(), E>),
+}
 
-impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
-    type Output = ();
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+pub(crate) struct MultiResultFuturePoller<
+    F: Future<Output = Result<(), E>> + Unpin,
+    E: Copy + Unpin,
+> {
+    futures_state: Vec<ResultFuture<F, E>>,
+}
+
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
+    pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
+        Self { futures_state }
+    }
+}
+
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
+    for MultiResultFuturePoller<F, E>
+{
+    type Output = Vec<Result<(), E>>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
         let mut have_pending_futures = false;
-        for fut_option in self.get_mut().0.iter_mut() {
-            let mut fut = match fut_option.take() {
-                None => continue,
-                Some(fut) => fut,
-            };
-            match Pin::new(&mut fut).poll(cx) {
-                Poll::Ready(()) => {},
-                Poll::Pending => {
-                    have_pending_futures = true;
-                    *fut_option = Some(fut);
-                },
+        let futures_state = &mut self.get_mut().futures_state;
+        for state in futures_state.iter_mut() {
+            match state {
+                ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
+                    Poll::Ready(res) => {
+                        *state = ResultFuture::Ready(res);
+                    },
+                    Poll::Pending => {
+                        have_pending_futures = true;
+                    },
+                },
+                ResultFuture::Ready(_) => continue,
             }
         }
 
         if have_pending_futures {
             Poll::Pending
         } else {
-            Poll::Ready(())
+            let results = futures_state
+                .drain(..)
+                .filter_map(|e| match e {
+                    ResultFuture::Ready(res) => Some(res),
+                    ResultFuture::Pending(_) => {
+                        debug_assert!(
+                            false,
+                            "All futures are expected to be ready if none are pending"
+                        );
+                        None
+                    },
+                })
+                .collect();
+            Poll::Ready(results)
         }
     }
 }
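`MultiResultFuturePoller` is `pub(crate)`, so it is only reachable from inside the crate; the calling pattern, mirroring `process_pending_events_async` above, wraps each handler future in `ResultFuture::Pending`, awaits them jointly, and receives one `Result` per future in input order. A sketch of that in-crate pattern, assuming `ResultFuture` and `MultiResultFuturePoller` are in scope:

```rust
// Await a batch of fallible futures together and collect their results.
async fn run_all<F, E>(futs: Vec<F>) -> Vec<Result<(), E>>
where
    F: core::future::Future<Output = Result<(), E>> + Unpin,
    E: Copy + Unpin,
{
    let futures: Vec<ResultFuture<F, E>> =
        futs.into_iter().map(ResultFuture::Pending).collect();
    MultiResultFuturePoller::new(futures).await
}
```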