mirror of
https://github.com/lightningdevkit/rust-lightning.git
synced 2025-02-24 15:02:20 +01:00
Merge pull request #3193 from tnull/2024-07-2995-followups
#2995 followups
This commit is contained in:
commit
0d7ae8616b
3 changed files with 58 additions and 18 deletions
|
@ -2321,8 +2321,8 @@ mod tests {
|
|||
|
||||
begin_open_channel!(nodes[0], nodes[1], channel_value);
|
||||
assert_eq!(
|
||||
first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
|
||||
second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
|
||||
first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap(),
|
||||
second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap()
|
||||
);
|
||||
|
||||
if !std::thread::panicking() {
|
||||
|
|
|
@ -776,8 +776,9 @@ pub enum Event {
|
|||
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
|
||||
///
|
||||
/// # Failure Behavior and Persistence
|
||||
/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
|
||||
/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
|
||||
/// This event won't be replayed after failures-to-handle
|
||||
/// (i.e., the event handler returning `Err(ReplayEvent ())`), and also won't be persisted
|
||||
/// across restarts.
|
||||
///
|
||||
/// [`OnionMessage`]: msgs::OnionMessage
|
||||
/// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter
|
||||
|
|
|
@ -1047,21 +1047,25 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
|
||||
macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
|
||||
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
|
||||
// successfully handled events from the given queue, reset the events processing flag, and
|
||||
// return, to have the events eventually replayed upon next invocation.
|
||||
{
|
||||
let mut queue_lock = $event_queue.lock().unwrap();
|
||||
|
||||
// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
|
||||
let mut res_iter = $res.iter().skip($offset);
|
||||
|
||||
// Keep all events which previously error'd *or* any that have been added since we dropped
|
||||
// the Mutex before.
|
||||
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
|
||||
let mut any_error = false;
|
||||
queue_lock.retain(|_| {
|
||||
$res_iter.next().map_or(true, |r| {
|
||||
let is_err = r.is_err();
|
||||
any_error |= is_err;
|
||||
is_err
|
||||
})
|
||||
});
|
||||
|
||||
if $res.iter().any(|r| r.is_err()) {
|
||||
if any_error {
|
||||
// We failed handling some events. Return to have them eventually replayed.
|
||||
$self.pending_events_processor.store(false, Ordering::Release);
|
||||
$self.event_notifier.notify();
|
||||
|
@ -1426,7 +1430,8 @@ where
|
|||
}
|
||||
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
|
||||
let res = MultiResultFuturePoller::new(futures).await;
|
||||
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
|
||||
let mut res_iter = res.iter().skip(intercepted_msgs_offset);
|
||||
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -1449,7 +1454,8 @@ where
|
|||
futures.push(future);
|
||||
}
|
||||
let res = MultiResultFuturePoller::new(futures).await;
|
||||
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
|
||||
let mut res_iter = res.iter();
|
||||
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
|
||||
}
|
||||
}
|
||||
self.pending_events_processor.store(false, Ordering::Release);
|
||||
|
@ -1508,7 +1514,7 @@ where
|
|||
{
|
||||
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
|
||||
intercepted_msgs = pending_intercepted_msgs_events.clone();
|
||||
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
|
||||
let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
|
||||
peer_connecteds = pending_peer_connected_events.clone();
|
||||
#[cfg(debug_assertions)] {
|
||||
for ev in pending_intercepted_msgs_events.iter() {
|
||||
|
@ -1518,14 +1524,47 @@ where
|
|||
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
|
||||
}
|
||||
}
|
||||
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
|
||||
}
|
||||
|
||||
let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
|
||||
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
|
||||
let mut handling_intercepted_msgs_failed = false;
|
||||
let mut num_handled_intercepted_events = 0;
|
||||
for ev in intercepted_msgs {
|
||||
match handler.handle_event(ev) {
|
||||
Ok(()) => num_handled_intercepted_events += 1,
|
||||
Err(ReplayEvent ()) => {
|
||||
handling_intercepted_msgs_failed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
|
||||
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
|
||||
{
|
||||
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
|
||||
pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
|
||||
}
|
||||
|
||||
if handling_intercepted_msgs_failed {
|
||||
self.pending_events_processor.store(false, Ordering::Release);
|
||||
self.event_notifier.notify();
|
||||
return;
|
||||
}
|
||||
|
||||
let mut num_handled_peer_connecteds = 0;
|
||||
for ev in peer_connecteds {
|
||||
match handler.handle_event(ev) {
|
||||
Ok(()) => num_handled_peer_connecteds += 1,
|
||||
Err(ReplayEvent ()) => {
|
||||
self.event_notifier.notify();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
|
||||
pending_peer_connected_events.drain(..num_handled_peer_connecteds);
|
||||
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
|
||||
}
|
||||
|
||||
self.pending_events_processor.store(false, Ordering::Release);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue