Use wrapper to add context to logging

Using a decorator pattern, add peer_id and channel_id to the Record
passed to the Logger.
henghonglee 2023-09-06 10:37:54 +08:00 committed by Jeffrey Czyz
parent a727ccab94
commit df3ab2ee27
5 changed files with 314 additions and 227 deletions
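The wrapper types this commit relies on, WithContext (from crate::util::logger) and WithChannelMonitor (from crate::chain::channelmonitor), are imported but not defined in the hunks shown below. As a minimal, self-contained sketch of the decorator idea only -- with a deliberately simplified Logger trait and Record struct, and String placeholders where LDK uses PublicKey and ChannelId:

    // Toy stand-ins: the real Record in lightning::util::logger carries
    // more fields (level, module path, file, line, ...).
    pub struct Record {
        pub peer_id: Option<String>,
        pub channel_id: Option<String>,
        pub args: String,
    }

    pub trait Logger {
        fn log(&self, record: Record);
    }

    // Decorator: wraps another logger and stamps peer/channel context
    // onto every Record before delegating.
    pub struct WithContext<'a, L: Logger> {
        logger: &'a L,
        peer_id: Option<String>,
        channel_id: Option<String>,
    }

    impl<'a, L: Logger> WithContext<'a, L> {
        pub fn from(logger: &'a L, peer_id: Option<String>, channel_id: Option<String>) -> Self {
            WithContext { logger, peer_id, channel_id }
        }
    }

    impl<'a, L: Logger> Logger for WithContext<'a, L> {
        fn log(&self, mut record: Record) {
            if record.peer_id.is_none() { record.peer_id = self.peer_id.clone(); }
            if record.channel_id.is_none() { record.channel_id = self.channel_id.clone(); }
            self.logger.log(record);
        }
    }

    struct StdoutLogger;
    impl Logger for StdoutLogger {
        fn log(&self, r: Record) {
            println!("[peer:{:?}][chan:{:?}] {}", r.peer_id, r.channel_id, r.args);
        }
    }

    fn main() {
        let base = StdoutLogger;
        let logger = WithContext::from(&base, Some("02abc".into()), None);
        logger.log(Record { peer_id: None, channel_id: None, args: "hello".into() });
    }

Because the wrapper implements the same trait as the logger it wraps, existing log_*! call sites keep working once they are handed the wrapper, which is why most of this diff is a mechanical self.logger -> logger substitution.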

@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
use crate::chain;
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
use crate::events;
@@ -359,6 +359,7 @@ where C::Target: chain::Filter,
process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
@@ -375,12 +376,12 @@ where C::Target: chain::Filter,
}
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -619,8 +620,9 @@ where C::Target: chain::Filter,
pub fn rebroadcast_pending_claims(&self) {
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
monitor_holder.monitor.rebroadcast_pending_claims(
&*self.broadcaster, &*self.fee_estimator, &*self.logger
&*self.broadcaster, &*self.fee_estimator, &logger
)
}
}
@@ -638,8 +640,9 @@ where
fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}
@@ -647,8 +650,9 @@ where
let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
for monitor_state in monitor_states.values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
monitor_state.monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
header, height, &*self.broadcaster, &*self.fee_estimator, &logger);
}
}
}
@@ -665,8 +669,9 @@ where
fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
self.process_chain_data(header, None, txdata, |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}
@@ -674,18 +679,20 @@ where
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &logger);
}
}
fn best_block_updated(&self, header: &Header, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
// While in practice there shouldn't be any recursive calls when given empty txdata,
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}
@@ -711,29 +718,30 @@ where C::Target: chain::Filter,
P::Target: Persist<ChannelSigner>,
{
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
return Err(());
},
hash_map::Entry::Vacant(e) => e,
};
log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::Completed => {
log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(self.logger, "{}", err_str);
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
@@ -765,7 +773,8 @@ where C::Target: chain::Filter,
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let logger = WithChannelMonitor::from(&self.logger, &monitor);
log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);
let update_id = MonitorUpdateId::from_monitor_update(update);
@@ -776,7 +785,7 @@ where C::Target: chain::Filter,
// We don't want to persist a `monitor_update` which results in a failure to apply later
// while reading `channel_monitor` with updates from storage. Instead, we should persist
// the entire `channel_monitor` here.
log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
} else {
self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
@@ -784,10 +793,10 @@ where C::Target: chain::Filter,
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::Completed => {
log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ },
}
@@ -799,12 +808,16 @@ where C::Target: chain::Filter,
}
};
if let ChannelMonitorUpdateStatus::UnrecoverableError = ret {
let logger = WithChannelMonitor::from(
&self.logger, &monitors.get(&funding_txo).unwrap().monitor
);
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitors);
let _poison = self.monitors.write().unwrap();
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(self.logger, "{}", err_str);
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
}
ret
@@ -813,12 +826,13 @@ where C::Target: chain::Filter,
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
if is_pending_monitor_update {
log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(logger, " This may cause duplicate payment events to be generated.");
}
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
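WithChannelMonitor, used throughout the hunks above, is the same kind of decorator but derives its context from the monitor rather than taking it piecewise; its definition lives in crate::chain::channelmonitor and is not part of the hunks shown here. A plausible sketch in terms of the toy Logger/Record above, with MonitorInfo standing in for whatever accessors the real ChannelMonitor exposes (the accessor names here are assumptions, not LDK API):

    // Stand-in for the parts of ChannelMonitor the wrapper needs.
    pub trait MonitorInfo {
        fn counterparty_node_id(&self) -> Option<String>;
        fn channel_id(&self) -> Option<String>;
    }

    pub struct WithChannelMonitor<'a, L: Logger> {
        logger: &'a L,
        peer_id: Option<String>,
        channel_id: Option<String>,
    }

    impl<'a, L: Logger> WithChannelMonitor<'a, L> {
        pub fn from<M: MonitorInfo>(logger: &'a L, monitor: &M) -> Self {
            WithChannelMonitor {
                logger,
                peer_id: monitor.counterparty_node_id(),
                channel_id: monitor.channel_id(),
            }
        }
    }

    impl<'a, L: Logger> Logger for WithChannelMonitor<'a, L> {
        fn log(&self, mut record: Record) {
            if record.peer_id.is_none() { record.peer_id = self.peer_id.clone(); }
            if record.channel_id.is_none() { record.channel_id = self.channel_id.clone(); }
            self.logger.log(record);
        }
    }

The context is captured at construction time, which is why the loops above rebuild the wrapper once per monitor instead of caching a single logger on the ChainMonitor, and why the Listen/Confirm tuple impls in the next file construct it inside each trait method.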


@@ -4168,11 +4168,11 @@ where
L::Target: Logger,
{
fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
self.0.block_connected(header, txdata, height, &*self.1, &*self.2, &*self.3);
self.0.block_connected(header, txdata, height, &*self.1, &*self.2, &WithChannelMonitor::from(&self.3, &self.0));
}
fn block_disconnected(&self, header: &Header, height: u32) {
self.0.block_disconnected(header, height, &*self.1, &*self.2, &*self.3);
self.0.block_disconnected(header, height, &*self.1, &*self.2, &WithChannelMonitor::from(&self.3, &self.0));
}
}
@@ -4184,15 +4184,15 @@ where
L::Target: Logger,
{
fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
self.0.transactions_confirmed(header, txdata, height, &*self.1, &*self.2, &*self.3);
self.0.transactions_confirmed(header, txdata, height, &*self.1, &*self.2, &WithChannelMonitor::from(&self.3, &self.0));
}
fn transaction_unconfirmed(&self, txid: &Txid) {
self.0.transaction_unconfirmed(txid, &*self.1, &*self.2, &*self.3);
self.0.transaction_unconfirmed(txid, &*self.1, &*self.2, &WithChannelMonitor::from(&self.3, &self.0));
}
fn best_block_updated(&self, header: &Header, height: u32) {
self.0.best_block_updated(header, height, &*self.1, &*self.2, &*self.3);
self.0.best_block_updated(header, height, &*self.1, &*self.2, &WithChannelMonitor::from(&self.3, &self.0));
}
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {

@@ -35,7 +35,7 @@ use crate::ln::chan_utils;
use crate::ln::onion_utils::HTLCFailReason;
use crate::chain::BestBlock;
use crate::chain::chaininterface::{FeeEstimator, ConfirmationTarget, LowerBoundedFeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, LATENCY_GRACE_PERIOD_BLOCKS, CLOSED_CHANNEL_UPDATE_ID};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS, CLOSED_CHANNEL_UPDATE_ID};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::sign::ecdsa::{EcdsaChannelSigner, WriteableEcdsaChannelSigner};
use crate::sign::{EntropySource, ChannelSigner, SignerProvider, NodeSigner, Recipient};
@@ -2773,14 +2773,14 @@ impl<SP: Deref> Channel<SP> where
funding_redeemscript.clone(), self.context.channel_value_satoshis,
obscure_factor,
holder_commitment_tx, best_block, self.context.counterparty_node_id);
let logger_with_chan_monitor = WithChannelMonitor::from(logger, &channel_monitor);
channel_monitor.provide_initial_counterparty_commitment_tx(
counterparty_initial_bitcoin_tx.txid, Vec::new(),
self.context.cur_counterparty_commitment_transaction_number,
self.context.counterparty_cur_commitment_point.unwrap(),
counterparty_initial_commitment_tx.feerate_per_kw(),
counterparty_initial_commitment_tx.to_broadcaster_value_sat(),
counterparty_initial_commitment_tx.to_countersignatory_value_sat(), logger);
counterparty_initial_commitment_tx.to_countersignatory_value_sat(), &&logger_with_chan_monitor);
assert_eq!(self.context.channel_state & (ChannelState::MonitorUpdateInProgress as u32), 0); // We have not had any monitor(s) yet to fail update!
if self.context.is_batch_funding() {
@@ -6976,13 +6976,13 @@ impl<SP: Deref> InboundV1Channel<SP> where SP::Target: SignerProvider {
funding_redeemscript.clone(), self.context.channel_value_satoshis,
obscure_factor,
holder_commitment_tx, best_block, self.context.counterparty_node_id);
let logger_with_chan_monitor = WithChannelMonitor::from(logger, &channel_monitor);
channel_monitor.provide_initial_counterparty_commitment_tx(
counterparty_initial_commitment_tx.trust().txid(), Vec::new(),
self.context.cur_counterparty_commitment_transaction_number + 1,
self.context.counterparty_cur_commitment_point.unwrap(), self.context.feerate_per_kw,
counterparty_initial_commitment_tx.to_broadcaster_value_sat(),
counterparty_initial_commitment_tx.to_countersignatory_value_sat(), logger);
counterparty_initial_commitment_tx.to_countersignatory_value_sat(), &&logger_with_chan_monitor);
log_info!(logger, "{} funding_signed for peer for channel {}",
if funding_signed.is_some() { "Generated" } else { "Waiting for signature on" }, &self.context.channel_id());
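The double reference in &&logger_with_chan_monitor above is deliberate. Judging from the call sites, provide_initial_counterparty_commitment_tx is generic over L: Deref where L::Target: Logger; WithChannelMonitor is itself a Logger but not a smart pointer, so the call leans on the standard library's blanket Deref impl for &T. A minimal reproduction of the shape (toy types, not the LDK signatures):

    use core::ops::Deref;

    trait Logger { fn log(&self, msg: &str); }

    struct PrintLogger;
    impl Logger for PrintLogger {
        fn log(&self, msg: &str) { println!("{}", msg); }
    }

    struct Wrapper<'a, L: Logger>(&'a L);
    impl<'a, L: Logger> Logger for Wrapper<'a, L> {
        fn log(&self, msg: &str) { self.0.log(msg); }
    }

    // Shaped like the callee: generic over something that derefs to a Logger.
    fn takes_deref_logger<L: Deref>(logger: &L) where L::Target: Logger {
        logger.log("context-tagged line");
    }

    fn main() {
        let base = PrintLogger;
        let wrapper = Wrapper(&base);
        // Wrapper is a Logger but not a Deref, so pass a reference to a
        // reference: L = &Wrapper<_>, and &Wrapper derefs to Wrapper.
        takes_deref_logger(&&wrapper);
    }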

File diff suppressed because it is too large.

@@ -35,7 +35,7 @@ use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger};
use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage};
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
use crate::util::atomic_counter::AtomicCounter;
use crate::util::logger::Logger;
use crate::util::logger::{Logger, WithContext};
use crate::util::string::PrintableString;
use crate::prelude::*;
@@ -1253,10 +1253,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
if is_gossip_msg(message.type_id()) {
log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
} else {
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
log_trace!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
}
peer.msgs_sent_since_pong += 1;
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
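Both context fields on WithContext are Options, so one constructor covers every stage of a connection's life: enqueue_message above knows the peer but not a channel, while the event handlers later in this file pass both. In terms of the toy types from the first sketch:

    fn demo(base: &StdoutLogger, node_id: String, channel_id: String) {
        // Peer known, no channel yet -- the enqueue_message case above.
        let peer_only = WithContext::from(base, Some(node_id.clone()), None);
        peer_only.log(Record { peer_id: None, channel_id: None, args: "Enqueueing message".into() });

        // Peer and channel both known -- the SendFundingSigned and
        // SendChannelReady arms further down.
        let both = WithContext::from(base, Some(node_id), Some(channel_id));
        both.log(Record { peer_id: None, channel_id: None, args: "Handling SendChannelReady".into() });
    }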
@@ -1355,9 +1356,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
macro_rules! insert_node_id {
() => {
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
hash_map::Entry::Occupied(e) => {
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
// Check that the peers map is consistent with the
// node_id_to_descriptor map, as this has been broken
@@ -1366,7 +1368,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
return Err(PeerHandleError { })
},
hash_map::Entry::Vacant(entry) => {
log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
log_debug!(logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
entry.insert(peer_descriptor.clone())
},
};
@@ -1434,6 +1436,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peer.pending_read_buffer.resize(18, 0);
peer.pending_read_is_header = true;
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
let message = match message_result {
Ok(x) => x,
Err(e) => {
@@ -1443,16 +1446,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
// the messages enqueued here to not actually
// be sent before the peer is disconnected.
(msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => {
log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
continue;
}
(msgs::DecodeError::UnsupportedCompression, _) => {
log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
self.enqueue_message(peer, &msgs::WarningMessage { channel_id: ChannelId::new_zero(), data: "Unsupported message compression: zlib".to_owned() });
continue;
}
(_, Some(ty)) if is_gossip_msg(ty) => {
log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message");
log_gossip!(logger, "Got an invalid value while deserializing a gossip message");
self.enqueue_message(peer, &msgs::WarningMessage {
channel_id: ChannelId::new_zero(),
data: format!("Unreadable/bogus gossip message of type {}", ty),
@@ -1460,16 +1463,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
continue;
}
(msgs::DecodeError::UnknownRequiredFeature, _) => {
log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
return Err(PeerHandleError { });
}
(msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
(msgs::DecodeError::InvalidValue, _) => {
log_debug!(self.logger, "Got an invalid value while deserializing message");
log_debug!(logger, "Got an invalid value while deserializing message");
return Err(PeerHandleError { });
}
(msgs::DecodeError::ShortRead, _) => {
log_debug!(self.logger, "Deserialization failed due to shortness of message");
log_debug!(logger, "Deserialization failed due to shortness of message");
return Err(PeerHandleError { });
}
(msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }),
@@ -1519,6 +1522,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
let logger = WithContext::from(&self.logger, Some(their_node_id), None);
peer_lock.received_message_since_timer_tick = true;
// Need an Init as first message
@@ -1536,7 +1540,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
}
if !have_compatible_chains {
log_debug!(self.logger, "Peer does not support any of our supported chains");
log_debug!(logger, "Peer does not support any of our supported chains");
return Err(PeerHandleError { }.into());
}
}
@@ -1544,12 +1548,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
let our_features = self.init_features(&their_node_id);
if msg.features.requires_unknown_bits_from(&our_features) {
log_debug!(self.logger, "Peer requires features unknown to us");
log_debug!(logger, "Peer requires features unknown to us");
return Err(PeerHandleError { }.into());
}
if our_features.requires_unknown_bits_from(&msg.features) {
log_debug!(self.logger, "We require features unknown to our peer");
log_debug!(logger, "We require features unknown to our peer");
return Err(PeerHandleError { }.into());
}
@@ -1557,7 +1561,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
return Err(PeerHandleError { }.into());
}
log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features);
log_info!(logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features);
// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
@@ -1565,22 +1569,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
if let Err(()) = self.message_handler.route_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(self.logger, "Route Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
log_debug!(logger, "Route Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
if let Err(()) = self.message_handler.chan_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(self.logger, "Channel Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
log_debug!(logger, "Channel Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
if let Err(()) = self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
log_debug!(self.logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
log_debug!(logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
peer_lock.their_features = Some(msg.features);
return Ok(None);
} else if peer_lock.their_features.is_none() {
log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id));
log_debug!(logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id));
return Err(PeerHandleError { }.into());
}
@@ -1602,9 +1606,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
mem::drop(peer_lock);
if is_gossip_msg(message.type_id()) {
log_gossip!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
} else {
log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
log_trace!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
}
let mut should_forward = None;
@@ -1618,14 +1622,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
// Handled above
},
wire::Message::Error(msg) => {
log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
log_debug!(logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
self.message_handler.chan_handler.handle_error(&their_node_id, &msg);
if msg.channel_id.is_zero() {
return Err(PeerHandleError { }.into());
}
},
wire::Message::Warning(msg) => {
log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
log_debug!(logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
},
wire::Message::Ping(msg) => {
@@ -1789,11 +1793,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
// Unknown messages:
wire::Message::Unknown(type_id) if message.is_even() => {
log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
log_debug!(logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
return Err(PeerHandleError { }.into());
},
wire::Message::Unknown(type_id) => {
log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id);
},
wire::Message::Custom(custom) => {
self.message_handler.custom_message_handler.handle_custom_message(custom, &their_node_id)?;
@@ -1810,6 +1814,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
if !peer.handshake_complete() ||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
@@ -1817,7 +1822,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if let Some((_, their_node_id)) = peer.their_node_id {
@@ -1837,6 +1842,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
if !peer.handshake_complete() ||
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
@@ -1844,7 +1850,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if let Some((_, their_node_id)) = peer.their_node_id {
@@ -1864,6 +1870,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None);
if !peer.handshake_complete() ||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
@@ -1871,7 +1878,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
@@ -1953,31 +1960,31 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
for event in events_generated.drain(..) {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_pubkey!(node_id),
&msg.temporary_channel_id,
log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
@@ -1986,13 +1993,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
@@ -2022,67 +2029,67 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_pubkey!(node_id),
update_add_htlcs.len(),
update_fulfill_htlcs.len(),
@@ -2107,31 +2114,31 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.enqueue_message(&mut *peer, commitment_signed);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
@@ -2169,18 +2176,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::HandleError { node_id, action } => {
let logger = WithContext::from(&self.logger, Some(node_id), None);
match action {
msgs::ErrorAction::DisconnectPeer { msg } => {
if let Some(msg) = msg.as_ref() {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id), msg.data);
} else {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
log_pubkey!(node_id));
}
// We do not have the peers write lock, so we just store that we're
@@ -2190,7 +2198,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peers_to_disconnect.insert(node_id, msg);
},
msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id), msg.data);
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
@@ -2198,20 +2206,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg)));
},
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
log_given_level!(logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::IgnoreDuplicateGossip => {},
msgs::ErrorAction::IgnoreError => {
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
log_debug!(logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
msgs::ErrorAction::SendErrorMessage { ref msg } => {
log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
@@ -2225,7 +2233,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_pubkey!(node_id),
msg.short_channel_ids.len(),
msg.first_blocknum,
@@ -2299,7 +2307,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
debug_assert!(peer.their_node_id.is_some());
if let Some((node_id, _)) = peer.their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason);
log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Disconnecting peer with id {} due to {}", node_id, reason);
self.message_handler.chan_handler.peer_disconnected(&node_id);
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
}
@@ -2318,7 +2326,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
Some(peer_lock) => {
let peer = peer_lock.lock().unwrap();
if let Some((node_id, _)) = peer.their_node_id {
log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id));
let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
debug_assert!(removed.is_some(), "descriptor maps should be consistent");
if !peer.handshake_complete() { return; }
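End to end, the payoff lands on the consumer side: a Logger implementation now receives peer and channel context on the Record itself and can render it uniformly, instead of every call site interpolating ids into the message string. A sketch using the toy Record from the first example:

    struct PrefixLogger;

    impl Logger for PrefixLogger {
        fn log(&self, r: Record) {
            let peer = r.peer_id.as_deref().unwrap_or("-");
            let chan = r.channel_id.as_deref().unwrap_or("-");
            println!("[peer:{}][chan:{}] {}", peer, chan, r.args);
        }
    }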