Mirror of https://github.com/lightningdevkit/rust-lightning.git
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is stored within the PeerState to simplify management, ensuring we do not need to remove it when there are no active channels with the peer.

Key changes include:
- Add peer_storage to PeerState for persistent storage.
- Implement internal_peer_storage to manage PeerStorage messages and their updates.
- Add resend logic in peer_connected() to resend PeerStorage before sending the channel reestablish message upon reconnection.
- Update PeerState's write() and read() methods to support PeerStorage persistence.
Parent: 81e89d82fd
Commit: 068549def9

1 changed file with 76 additions and 3 deletions
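For orientation before the diff: a minimal sketch of the two message shapes involved, with the field layout inferred from how `msg.data` is used below. These simplified definitions are illustrative only; the real types live in LDK's `msgs` module and may carry more than shown here.

```rust
/// Sketch of the messages this commit handles; layout inferred from the
/// diff below, not copied from the real `msgs` module.
pub struct PeerStorage {
	/// Opaque (typically encrypted) backup blob a peer asks us to hold.
	pub data: Vec<u8>,
}

pub struct PeerStorageRetrieval {
	/// A previously stored blob, echoed back to its owner on reconnection.
	pub data: Vec<u8>,
}
```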
```diff
@@ -1404,6 +1404,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the peer storage data for the channel partner on a per-peer basis.
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
```
```diff
@@ -2872,6 +2874,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
 /// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
 const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
 
+/// The maximum allowed size for peer storage, in bytes.
+///
+/// This constant defines the upper limit for the size of data
+/// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte)
+/// to prevent excessive resource consumption.
+const MAX_PEER_STORAGE_SIZE: usize = 1024;
+
 /// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
 /// many peers we reject new (inbound) connections.
 const MAX_NO_CHANNEL_PEERS: usize = 250;
```
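As a quick illustration of the cap introduced above, here is a standalone check with the same boundary behavior. This is a sketch; `validate_peer_storage_len` is a hypothetical helper, and in the diff below the real check lives inline in `internal_peer_storage` (and is compiled out under `#[cfg(test)]`).

```rust
const MAX_PEER_STORAGE_SIZE: usize = 1024;

/// Reject blobs larger than the cap; mirrors the policy in the diff.
fn validate_peer_storage_len(data: &[u8]) -> Result<(), String> {
	if data.len() > MAX_PEER_STORAGE_SIZE {
		return Err(format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE));
	}
	Ok(())
}

fn main() {
	assert!(validate_peer_storage_len(&[0u8; 1024]).is_ok());  // exactly 1 KiB: accepted
	assert!(validate_peer_storage_len(&[0u8; 1025]).is_err()); // one byte over: rejected
}
```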
```diff
@@ -8245,11 +8254,50 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}
 
-	fn internal_peer_storage_retrieval(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
-		Ok(())
+	fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
+		// TODO: Decrypt and check if we have any stale or missing ChannelMonitor.
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+
+		Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+			"Invalid peer_storage_retrieval message received.".into(),
+		), ChannelId([0; 32])))
 	}
 
-	fn internal_peer_storage(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+	fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
+			.ok_or_else(|| {
+				debug_assert!(false);
+				MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
+			})?;
+
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		// Check if we have any channels with the peer (currently we only provide the service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+			log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				"Ignoring peer_storage message, as peer storage is currently supported only for \
+				peers with an active funded channel.".into(),
+			), ChannelId([0; 32])));
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > MAX_PEER_STORAGE_SIZE {
+			log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as it's over 1KiB", log_pubkey!(counterparty_node_id));
+
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
+			), ChannelId([0; 32])));
+		}
+
+		log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data;
+
 		Ok(())
 	}
 
```
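Distilled, `internal_peer_storage` applies three gates before overwriting the stored blob: the peer must be known, we must have a funded channel with it, and the blob must fit the cap. The following self-contained sketch uses toy types (`ToyPeerState` and `accept_peer_storage` are illustrative, not LDK APIs) to capture that acceptance policy:

```rust
use std::collections::HashMap;

const MAX_PEER_STORAGE_SIZE: usize = 1024;

/// Toy stand-in for the relevant bits of PeerState.
struct ToyPeerState {
	has_funded_channel: bool,
	peer_storage: Vec<u8>,
}

/// Gate 1: the peer must be known. Gate 2: we must have a funded channel
/// with it. Gate 3: the blob must fit the cap. Then latest-wins overwrite.
fn accept_peer_storage(
	peers: &mut HashMap<[u8; 33], ToyPeerState>, peer: [u8; 33], data: Vec<u8>,
) -> Result<(), &'static str> {
	let state = peers.get_mut(&peer).ok_or("unknown peer")?;
	if !state.has_funded_channel {
		return Err("peer storage is only offered over funded channels");
	}
	if data.len() > MAX_PEER_STORAGE_SIZE {
		return Err("blob exceeds the 1 KiB cap");
	}
	state.peer_storage = data; // no history: each message replaces the last
	Ok(())
}

fn main() {
	let mut peers = HashMap::new();
	peers.insert([2u8; 33], ToyPeerState { has_funded_channel: true, peer_storage: Vec::new() });
	assert!(accept_peer_storage(&mut peers, [2u8; 33], b"backup".to_vec()).is_ok());
	assert!(accept_peer_storage(&mut peers, [9u8; 33], b"backup".to_vec()).is_err());
}
```

Note the asymmetry in the diff: `internal_peer_storage_retrieval` remains a stub that only sends a warning, since, per its log message, this node does not yet distribute its own peer storage.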
```diff
@@ -11758,6 +11806,7 @@ where
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					closed_channel_monitor_update_ids: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
```
```diff
@@ -11787,6 +11836,15 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::PeerStorageRetrieval {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, chan) in peer_state.channel_by_id.iter_mut() {
 				let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
 				match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
```
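The ordering here matters: the retrieval message is queued before the per-channel reestablish handshakes that the loop below produces, so a peer that lost data gets its blob back before any channel state negotiation. A toy model of that reconnect path (a sketch with illustrative types, not LDK's `MessageSendEvent`):

```rust
/// Toy events standing in for the relevant MessageSendEvent variants.
#[derive(Debug)]
enum ToyEvent {
	SendPeerStorageRetrieval { data: Vec<u8> },
	SendChannelReestablish { channel: usize },
}

/// On reconnection, echo the stored blob (if any) first, then queue the
/// channel reestablish messages, matching the order in the diff above.
fn on_peer_reconnected(peer_storage: &[u8], num_channels: usize) -> Vec<ToyEvent> {
	let mut pending_msg_events = Vec::new();
	if !peer_storage.is_empty() {
		pending_msg_events.push(ToyEvent::SendPeerStorageRetrieval { data: peer_storage.to_vec() });
	}
	for channel in 0..num_channels {
		pending_msg_events.push(ToyEvent::SendChannelReestablish { channel });
	}
	pending_msg_events
}

fn main() {
	let events = on_peer_reconnected(b"blob", 2);
	assert!(matches!(events[0], ToyEvent::SendPeerStorageRetrieval { .. }));
	assert_eq!(events.len(), 3);
}
```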
```diff
@@ -12995,6 +13053,8 @@ where
 			peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
 		}
 
+		let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
 		(serializable_peer_count).write(writer)?;
 		for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
 			// Peers which we have no channels to should be dropped once disconnected. As we
```
```diff
@@ -13004,6 +13064,8 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+				peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
```
```diff
@@ -13125,6 +13187,7 @@ where
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 
 		Ok(())
```
```diff
@@ -13357,6 +13420,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				closed_channel_monitor_update_ids: BTreeMap::new(),
+				peer_storage: Vec::new(),
 				is_connected: false,
 			}
 		};
```
```diff
@@ -13652,6 +13716,7 @@ where
 		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 		let mut inbound_payment_id_secret = None;
+		let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
```
```diff
@@ -13668,8 +13733,10 @@ where
 			(14, decode_update_add_htlcs, option),
 			(15, inbound_payment_id_secret, option),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+		let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
 		if fake_scid_rand_bytes.is_none() {
 			fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 		}
```
|
@ -13701,6 +13768,12 @@ where
|
|||
}
|
||||
let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
|
||||
|
||||
for (peer_pubkey, peer_storage) in peer_storage_dir {
|
||||
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
|
||||
peer_state.get_mut().unwrap().peer_storage = peer_storage;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle transitioning from the legacy TLV to the new one on upgrades.
|
||||
if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
|
||||
// We should never serialize an empty map.
|
||||
|
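On deserialization the `(19, ...)` TLV defaults to an empty list, and each persisted pair is folded back into its peer's state; blobs for peers that no longer appear in `per_peer_state` are silently dropped. A minimal sketch of that restore step using plain maps (`restore_peer_storage` is an illustrative helper, not an LDK function):

```rust
use std::collections::HashMap;

/// Fold persisted (pubkey, blob) pairs back into per-peer state. Entries
/// whose peer is gone (e.g. all channels closed) are skipped, mirroring the
/// `if let Some(..)` in the diff above.
fn restore_peer_storage(
	per_peer_storage: &mut HashMap<[u8; 33], Vec<u8>>,
	peer_storage_dir: Vec<([u8; 33], Vec<u8>)>,
) {
	for (peer_pubkey, peer_storage) in peer_storage_dir {
		if let Some(slot) = per_peer_storage.get_mut(&peer_pubkey) {
			*slot = peer_storage;
		}
	}
}

fn main() {
	let mut state = HashMap::from([([2u8; 33], Vec::new())]);
	restore_peer_storage(&mut state, vec![
		([2u8; 33], b"blob".to_vec()),   // known peer: restored
		([9u8; 33], b"orphan".to_vec()), // unknown peer: dropped
	]);
	assert_eq!(state[&[2u8; 33]], b"blob");
	assert!(!state.contains_key(&[9u8; 33]));
}
```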