Merge pull request #2280 from TheBlueMatt/2023-05-event-deadlock

Never block a thread on the `PeerManager` event handling lock
Wilmer Paulino 2023-05-24 10:51:16 -07:00 committed by GitHub
commit 64c58a565b
2 changed files with 383 additions and 334 deletions


@@ -36,7 +36,7 @@ use crate::prelude::*;
use crate::io;
use alloc::collections::LinkedList;
use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
use core::convert::Infallible;
@@ -696,15 +696,18 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
/// lock held. Entries may be added with only the `peers` read lock held (though the
/// `Descriptor` value must already exist in `peers`).
node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
/// We can only have one thread processing events at once, but we don't usually need the full
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
/// `process_events`.
event_processing_lock: Mutex<()>,
/// Because event processing is global and always does all available work before returning,
/// there is no reason for us to have many event processors waiting on the lock at once.
/// Instead, we limit the total blocked event processors to always exactly one by setting this
/// when an event process call is waiting.
blocked_event_processors: AtomicBool,
/// We can only have one thread processing events at once, but if a second call to
/// `process_events` happens while a first call is in progress, one of the two calls needs to
/// start from the top to ensure any new messages are also handled.
///
/// Because the event handler calls into user code which may block, we don't want to block a
/// second thread waiting for another thread to handle events which is then blocked on user
/// code, so we store an atomic counter here:
/// * 0 indicates no event processor is running
/// * 1 indicates an event processor is running
/// * > 1 indicates an event processor is running but needs to start again from the top once
/// it finishes as another thread tried to start processing events but returned early.
event_processing_state: AtomicI32,
/// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
/// value increases strictly since we don't assume access to a time source.
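
The `event_processing_state` doc comment above describes a small lock-free state machine (0 = idle, 1 = running, >1 = running with a queued re-run). As a rough illustration of the same pattern outside of `PeerManager`, here is a minimal, self-contained sketch built around a bare `AtomicI32`; the `EventGate` and `run_events` names are hypothetical, and this is only the shape of the gate the new field enables, not the actual LDK implementation.

use std::sync::atomic::{AtomicI32, Ordering};

/// Hypothetical stand-in for the `event_processing_state` field:
/// 0 = idle, 1 = a processor is running, >1 = running and at least one
/// other caller has asked for another pass.
struct EventGate {
	state: AtomicI32,
}

impl EventGate {
	fn new() -> Self { EventGate { state: AtomicI32::new(0) } }

	/// Runs `do_work` exclusively. Concurrent callers never block: they bump
	/// the counter and return, and the thread already running does one more pass.
	fn run_events<F: FnMut()>(&self, mut do_work: F) {
		if self.state.fetch_add(1, Ordering::AcqRel) > 0 {
			// Someone else is processing; the increment we just did tells
			// them to go around again once they finish.
			return;
		}
		loop {
			do_work();
			if self.state.fetch_sub(1, Ordering::AcqRel) != 1 {
				// New requests arrived while we were running: collapse any
				// number of them into exactly one extra pass.
				self.state.store(1, Ordering::Release);
				continue;
			}
			break;
		}
	}
}

fn main() {
	let gate = EventGate::new();
	// A second call from another thread returns immediately instead of
	// blocking on a mutex that may be held across user event-handler code.
	gate.run_events(|| println!("processed pending events"));
}
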
@@ -874,8 +877,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
message_handler,
peers: FairRwLock::new(HashMap::new()),
node_id_to_descriptor: Mutex::new(HashMap::new()),
event_processing_lock: Mutex::new(()),
blocked_event_processors: AtomicBool::new(false),
event_processing_state: AtomicI32::new(0),
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
gossip_processing_backlogged: AtomicBool::new(false),
@@ -1814,356 +1816,351 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
pub fn process_events(&self) {
let mut _single_processor_lock = self.event_processing_lock.try_lock();
if _single_processor_lock.is_err() {
// While we could wake the older sleeper here with a CV and make more even waiting
// times, that would be a lot of overengineering for a simple "reduce total waiter
// count" goal.
match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
Err(val) => {
debug_assert!(val, "compare_exchange failed spuriously?");
return;
},
Ok(val) => {
debug_assert!(!val, "compare_exchange succeeded spuriously?");
// We're the only waiter, as the running process_events may have emptied the
// pending events "long" ago and there are new events for us to process, wait until
// it's done and process any leftover events before returning.
_single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
self.blocked_event_processors.store(false, Ordering::Release);
}
}
if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return;
}
self.update_gossip_backlogged();
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
loop {
self.update_gossip_backlogged();
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
let peers_lock = self.peers.read().unwrap();
let peers = &*peers_lock;
macro_rules! get_peer_for_forwarding {
($node_id: expr) => {
{
if peers_to_disconnect.get($node_id).is_some() {
// If we've "disconnected" this peer, do not send to it.
continue;
}
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
match descriptor_opt {
Some(descriptor) => match peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
if !peer_lock.handshake_complete() {
let peers_lock = self.peers.read().unwrap();
let peers = &*peers_lock;
macro_rules! get_peer_for_forwarding {
($node_id: expr) => {
{
if peers_to_disconnect.get($node_id).is_some() {
// If we've "disconnected" this peer, do not send to it.
continue;
}
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
match descriptor_opt {
Some(descriptor) => match peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
if !peer_lock.handshake_complete() {
continue;
}
peer_lock
},
None => {
debug_assert!(false, "Inconsistent peers set state!");
continue;
}
peer_lock
},
None => {
debug_assert!(false, "Inconsistent peers set state!");
continue;
}
},
None => {
continue;
},
},
}
}
}
}
}
for event in events_generated.drain(..) {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id),
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_pubkey!(node_id),
update_add_htlcs.len(),
update_fulfill_htlcs.len(),
update_fail_htlcs.len(),
log_bytes!(commitment_signed.channel_id));
let mut peer = get_peer_for_forwarding!(node_id);
for msg in update_add_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fulfill_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fail_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fail_malformed_htlcs {
self.enqueue_message(&mut *peer, msg);
}
if let &Some(ref msg) = update_fee {
self.enqueue_message(&mut *peer, msg);
}
self.enqueue_message(&mut *peer, commitment_signed);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
match self.message_handler.route_handler.handle_channel_announcement(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
_ => {},
}
if let Some(msg) = update_msg {
for event in events_generated.drain(..) {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id),
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_pubkey!(node_id),
update_add_htlcs.len(),
update_fulfill_htlcs.len(),
update_fail_htlcs.len(),
log_bytes!(commitment_signed.channel_id));
let mut peer = get_peer_for_forwarding!(node_id);
for msg in update_add_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fulfill_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fail_htlcs {
self.enqueue_message(&mut *peer, msg);
}
for msg in update_fail_malformed_htlcs {
self.enqueue_message(&mut *peer, msg);
}
if let &Some(ref msg) = update_fee {
self.enqueue_message(&mut *peer, msg);
}
self.enqueue_message(&mut *peer, commitment_signed);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
match self.message_handler.route_handler.handle_channel_announcement(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
_ => {},
}
if let Some(msg) = update_msg {
match self.message_handler.route_handler.handle_channel_update(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
_ => {},
}
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
match self.message_handler.route_handler.handle_channel_update(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
_ => {},
}
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
match self.message_handler.route_handler.handle_node_announcement(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
_ => {},
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::HandleError { ref node_id, ref action } => {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
// processing most messages.
peers_to_disconnect.insert(*node_id, msg.clone());
},
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::IgnoreDuplicateGossip => {},
msgs::ErrorAction::IgnoreError => {
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::SendErrorMessage { ref msg } => {
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
match self.message_handler.route_handler.handle_channel_update(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
_ => {},
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_pubkey!(node_id),
msg.short_channel_ids.len(),
msg.first_blocknum,
msg.number_of_blocks,
msg.sync_complete);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
match self.message_handler.route_handler.handle_node_announcement(&msg) {
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
_ => {},
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::HandleError { ref node_id, ref action } => {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
// processing most messages.
peers_to_disconnect.insert(*node_id, msg.clone());
},
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::IgnoreDuplicateGossip => {},
msgs::ErrorAction::IgnoreError => {
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::SendErrorMessage { ref msg } => {
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
}
}
for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
if peers_to_disconnect.get(&node_id).is_some() { continue; }
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
}
for (descriptor, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
}
}
if !peers_to_disconnect.is_empty() {
let mut peers_lock = self.peers.write().unwrap();
let peers = &mut *peers_lock;
for (node_id, msg) in peers_to_disconnect.drain() {
// Note that since we are holding the peers *write* lock we can
// remove from node_id_to_descriptor immediately (as no other
// thread can be holding the peer lock if we have the global write
// lock).
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
if let Some(mut descriptor) = descriptor_opt {
if let Some(peer_mutex) = peers.remove(&descriptor) {
let mut peer = peer_mutex.lock().unwrap();
if let Some(msg) = msg {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_pubkey!(node_id),
msg.short_channel_ids.len(),
msg.first_blocknum,
msg.number_of_blocks,
msg.sync_complete);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *peer, &msg);
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
}
self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
} else { debug_assert!(false, "Missing connection for peer"); }
}
}
}
for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
if peers_to_disconnect.get(&node_id).is_some() { continue; }
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
}
for (descriptor, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
}
}
if !peers_to_disconnect.is_empty() {
let mut peers_lock = self.peers.write().unwrap();
let peers = &mut *peers_lock;
for (node_id, msg) in peers_to_disconnect.drain() {
// Note that since we are holding the peers *write* lock we can
// remove from node_id_to_descriptor immediately (as no other
// thread can be holding the peer lock if we have the global write
// lock).
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
if let Some(mut descriptor) = descriptor_opt {
if let Some(peer_mutex) = peers.remove(&descriptor) {
let mut peer = peer_mutex.lock().unwrap();
if let Some(msg) = msg {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *peer, &msg);
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
}
self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
} else { debug_assert!(false, "Missing connection for peer"); }
}
if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 {
// If another thread incremented the state while we were running we should go
// around again, but only once.
self.event_processing_state.store(1, Ordering::Release);
continue;
}
break;
}
}
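
A note on the exit path just above: `fetch_sub` returning anything other than 1 means at least one other thread called `process_events` while this pass was running. Writing 1 back (rather than keeping the decremented value) collapses any number of such calls into exactly one extra iteration of the loop, and returning early at the top never leaves a caller blocked behind user code in the event handlers, which is the deadlock concern in the PR title. This single-extra-pass bound is what the multithreaded test below checks.
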
@@ -3004,4 +3001,53 @@ mod tests {
// For (None)
assert_eq!(filter_addresses(None), None);
}
#[test]
#[cfg(feature = "std")]
fn test_process_events_multithreaded() {
use std::time::{Duration, Instant};
// Test that `process_events` getting called on multiple threads doesn't generate too many
// loop iterations.
// Each time `process_events` goes around the loop we call
// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
// Because the loop should go around once more after a call which fails to take the
// single-threaded lock, if we write zero to the counter before calling `process_events` we
// should never observe there having been more than 2 loop iterations.
// Further, because the last thread to exit will call `process_events` before returning, we
// should always have at least one count at the end.
let cfg = Arc::new(create_peermgr_cfgs(1));
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap());
let exit_flag = Arc::new(AtomicBool::new(false));
macro_rules! spawn_thread { () => { {
let thread_cfg = Arc::clone(&cfg);
let thread_peer = Arc::clone(&peer);
let thread_exit = Arc::clone(&exit_flag);
std::thread::spawn(move || {
while !thread_exit.load(Ordering::Acquire) {
thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release);
thread_peer.process_events();
std::thread::sleep(Duration::from_micros(1));
}
})
} } }
let thread_a = spawn_thread!();
let thread_b = spawn_thread!();
let thread_c = spawn_thread!();
let start_time = Instant::now();
while start_time.elapsed() < Duration::from_millis(100) {
let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire);
assert!(val <= 2);
std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?!
}
exit_flag.store(true, Ordering::Release);
thread_a.join().unwrap();
thread_b.join().unwrap();
thread_c.join().unwrap();
assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1);
}
}


@@ -362,6 +362,7 @@ pub struct TestChannelMessageHandler {
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
connected_peers: Mutex<HashSet<PublicKey>>,
pub message_fetch_counter: AtomicUsize,
}
impl TestChannelMessageHandler {
@@ -370,6 +371,7 @@ impl TestChannelMessageHandler {
pending_events: Mutex::new(Vec::new()),
expected_recv_msgs: Mutex::new(None),
connected_peers: Mutex::new(HashSet::new()),
message_fetch_counter: AtomicUsize::new(0),
}
}
@@ -520,6 +522,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
impl events::MessageSendEventsProvider for TestChannelMessageHandler {
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
self.message_fetch_counter.fetch_add(1, Ordering::AcqRel);
let mut pending_events = self.pending_events.lock().unwrap();
let mut ret = Vec::new();
mem::swap(&mut ret, &mut *pending_events);
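
The counter bumped in `get_and_clear_pending_msg_events` above is what the new multithreaded test reads to bound loop iterations. As a minimal, self-contained sketch of the same instrumentation idea (the `MockEventSource` type and method names here are hypothetical, not LDK's `TestChannelMessageHandler`):

use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical mock event source that counts how often it is drained.
struct MockEventSource {
	fetch_counter: AtomicUsize,
}

impl MockEventSource {
	fn new() -> Self { MockEventSource { fetch_counter: AtomicUsize::new(0) } }

	/// Analogue of `get_and_clear_pending_msg_events`: bump the counter on every call.
	fn get_and_clear_pending(&self) -> Vec<&'static str> {
		self.fetch_counter.fetch_add(1, Ordering::AcqRel);
		Vec::new() // nothing pending in this sketch
	}
}

fn main() {
	let source = MockEventSource::new();
	// A test resets the counter, triggers one round of processing, and then
	// asserts an upper bound: with the counter-based gate, one call should
	// drain the source at most twice (one pass plus at most one re-run).
	source.fetch_counter.store(0, Ordering::Release);
	let _events = source.get_and_clear_pending();
	assert!(source.fetch_counter.load(Ordering::Acquire) <= 2);
}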