Merge pull request #3449 from TheBlueMatt/2024-12-event-processing-logging

Log before and after `Event` processing calls
This commit is contained in:
Jeffrey Czyz 2024-12-08 21:59:41 -06:00 committed by GitHub
commit fe1cf69b58
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 40 additions and 15 deletions

View file

@ -573,7 +573,7 @@ where C::Target: chain::Filter,
for funding_txo in mons_to_process {
let mut ev;
match super::channelmonitor::process_events_body!(
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), self.logger, ev, handler(ev).await) {
Ok(()) => {},
Err(ReplayEvent ()) => {
self.event_notifier.notify();
@ -914,7 +914,7 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
/// [`BumpTransaction`]: events::Event::BumpTransaction
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
for monitor_state in self.monitors.read().unwrap().values() {
match monitor_state.monitor.process_pending_events(&handler) {
match monitor_state.monitor.process_pending_events(&handler, &self.logger) {
Ok(()) => {},
Err(ReplayEvent ()) => {
self.event_notifier.notify();

View file

@ -1236,7 +1236,7 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
}
macro_rules! _process_events_body {
($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
($self_opt: expr, $logger: expr, $event_to_handle: expr, $handle_event: expr) => {
loop {
let mut handling_res = Ok(());
let (pending_events, repeated_events);
@ -1253,8 +1253,11 @@ macro_rules! _process_events_body {
let mut num_handled_events = 0;
for event in pending_events {
log_trace!($logger, "Handling event {:?}...", event);
$event_to_handle = event;
match $handle_event {
let event_handling_result = $handle_event;
log_trace!($logger, "Done handling event, result: {:?}", event_handling_result);
match event_handling_result {
Ok(()) => num_handled_events += 1,
Err(e) => {
// If we encounter an error we stop handling events and make sure to replay
@ -1614,19 +1617,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
///
/// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
/// [`BumpTransaction`]: crate::events::Event::BumpTransaction
pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
pub fn process_pending_events<H: Deref, L: Deref>(&self, handler: &H, logger: &L)
-> Result<(), ReplayEvent> where H::Target: EventHandler, L::Target: Logger {
let mut ev;
process_events_body!(Some(self), ev, handler.handle_event(ev))
process_events_body!(Some(self), logger, ev, handler.handle_event(ev))
}
/// Processes any events asynchronously.
///
/// See [`Self::process_pending_events`] for more information.
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
&self, handler: &H
) -> Result<(), ReplayEvent> {
pub async fn process_pending_events_async<
Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future,
L: Deref,
>(
&self, handler: &H, logger: &L,
) -> Result<(), ReplayEvent> where L::Target: Logger {
let mut ev;
process_events_body!(Some(self), ev, { handler(ev).await })
process_events_body!(Some(self), logger, ev, { handler(ev).await })
}
#[cfg(test)]

View file

@ -3390,8 +3390,11 @@ macro_rules! process_events_body {
let mut num_handled_events = 0;
for (event, action_opt) in pending_events {
log_trace!($self.logger, "Handling event {:?}...", event);
$event_to_handle = event;
match $handle_event {
let event_handling_result = $handle_event;
log_trace!($self.logger, "Done handling event, result: {:?}", event_handling_result);
match event_handling_result {
Ok(()) => {
if let Some(action) = action_opt {
post_event_actions.push(action);

View file

@ -1427,7 +1427,9 @@ where
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
log_trace!(self.logger, "Handling event {:?} async...", event);
let future = ResultFuture::Pending(handler(event));
futures.push(future);
}
}
@ -1439,11 +1441,13 @@ where
for ev in intercepted_msgs {
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
log_trace!(self.logger, "Handling event {:?} async...", ev);
let future = ResultFuture::Pending(handler(ev));
futures.push(future);
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
let res = MultiResultFuturePoller::new(futures).await;
log_trace!(self.logger, "Done handling events async, results: {:?}", res);
let mut res_iter = res.iter().skip(intercepted_msgs_offset);
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
}
@ -1464,10 +1468,12 @@ where
} else {
let mut futures = Vec::new();
for event in peer_connecteds {
log_trace!(self.logger, "Handling event {:?} async...", event);
let future = ResultFuture::Pending(handler(event));
futures.push(future);
}
let res = MultiResultFuturePoller::new(futures).await;
log_trace!(self.logger, "Done handling events async, results: {:?}", res);
let mut res_iter = res.iter();
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
}
@ -1520,7 +1526,10 @@ where
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
log_trace!(self.logger, "Handling event {:?}...", event);
let res = handler.handle_event(event);
log_trace!(self.logger, "Done handling event, ignoring result: {:?}", res);
}
}
}
@ -1544,7 +1553,10 @@ where
let mut handling_intercepted_msgs_failed = false;
let mut num_handled_intercepted_events = 0;
for ev in intercepted_msgs {
match handler.handle_event(ev) {
log_trace!(self.logger, "Handling event {:?}...", ev);
let res = handler.handle_event(ev);
log_trace!(self.logger, "Done handling event, result: {:?}", res);
match res {
Ok(()) => num_handled_intercepted_events += 1,
Err(ReplayEvent ()) => {
handling_intercepted_msgs_failed = true;
@ -1566,7 +1578,10 @@ where
let mut num_handled_peer_connecteds = 0;
for ev in peer_connecteds {
match handler.handle_event(ev) {
log_trace!(self.logger, "Handling event {:?}...", ev);
let res = handler.handle_event(ev);
log_trace!(self.logger, "Done handling event, result: {:?}", res);
match res {
Ok(()) => num_handled_peer_connecteds += 1,
Err(ReplayEvent ()) => {
self.event_notifier.notify();