Unify message sending to use PeerManager::enqueue_message

This makes our logging consistent and somewhat simplifies message
sending code in a few places.
This commit is contained in:
Matt Corallo 2021-06-22 01:30:19 +00:00
parent 133e28ffe6
commit 3ea4279d55

View file

@ -549,14 +549,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
}
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
macro_rules! encode_and_send_msg {
($msg: expr) => {
{
log_trace!(self.logger, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..]));
}
}
}
while !peer.awaiting_write_event {
if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
match peer.sync_status {
@ -565,12 +557,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
encode_and_send_msg!(announce);
self.enqueue_message(peer, announce);
if let &Some(ref update_a) = update_a_option {
encode_and_send_msg!(update_a);
self.enqueue_message(peer, update_a);
}
if let &Some(ref update_b) = update_b_option {
encode_and_send_msg!(update_b);
self.enqueue_message(peer, update_b);
}
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
}
@ -582,7 +574,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
for msg in all_messages.iter() {
encode_and_send_msg!(msg);
self.enqueue_message(peer, msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
@ -594,7 +586,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
for msg in all_messages.iter() {
encode_and_send_msg!(msg);
self.enqueue_message(peer, msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
}
if all_messages.is_empty() || all_messages.len() != steps as usize {
@ -1158,15 +1150,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
log_trace!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.temporary_channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@ -1175,29 +1165,25 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_trace!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@ -1208,49 +1194,45 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
log_bytes!(commitment_signed.channel_id));
let peer = get_peer_for_forwarding!(node_id);
for msg in update_add_htlcs {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(peer, msg);
}
for msg in update_fulfill_htlcs {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(peer, msg);
}
for msg in update_fail_htlcs {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(peer, msg);
}
for msg in update_fail_malformed_htlcs {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(peer, msg);
}
if let &Some(ref msg) = update_fee {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(peer, msg);
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed)));
self.enqueue_message(peer, commitment_signed);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
log_bytes!(msg.channel_id));
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@ -1283,7 +1265,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(&mut peer, msg);
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
self.do_attempt_write_data(&mut descriptor, &mut peer);
@ -1300,18 +1282,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@ -1320,8 +1299,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
msg.first_blocknum,
msg.number_of_blocks,
msg.sync_complete);
let peer = get_peer_for_forwarding!(node_id);
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
}
}
}
@ -1420,7 +1398,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
ponglen: 0,
byteslen: 64,
};
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&ping)));
self.enqueue_message(peer, &ping);
let mut descriptor_clone = descriptor.clone();
self.do_attempt_write_data(&mut descriptor_clone, peer);