Stop cleaning monitor updates on new block connect

Previously, we used to cleanup monitor updates at both consolidation
threshold and new block connects. With this change we will only
cleanup when our consolidation criteria is met. Also, we remove
monitor read from cleanup logic, in case of update consolidation.
Note: In case of channel-closing monitor update, we still need to
read the old monitor before persisting the new one in order to
determine the cleanup range.
This commit is contained in:
Gursharan Singh 2023-12-08 11:28:19 -08:00
parent c2bbfffb1e
commit ef0909627d
No known key found for this signature in database
GPG key ID: 36026714CF2F1B3F

View file

@ -346,9 +346,10 @@ where
/// ///
/// # Pruning stale channel updates /// # Pruning stale channel updates
/// ///
/// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
/// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy` /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
/// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions /// are deleted.
/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
/// will complete. However, stale updates are not a problem for data integrity, since updates are /// will complete. However, stale updates are not a problem for data integrity, since updates are
/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`. /// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`.
/// ///
@ -610,24 +611,6 @@ where
) -> chain::ChannelMonitorUpdateStatus { ) -> chain::ChannelMonitorUpdateStatus {
// Determine the proper key for this monitor // Determine the proper key for this monitor
let monitor_name = MonitorName::from(funding_txo); let monitor_name = MonitorName::from(funding_txo);
let maybe_old_monitor = self.read_monitor(&monitor_name);
match maybe_old_monitor {
Ok((_, ref old_monitor)) => {
// Check that this key isn't already storing a monitor with a higher update_id
// (collision)
if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
log_error!(
self.logger,
"Tried to write a monitor at the same outpoint {} with a higher update_id!",
monitor_name.as_str()
);
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
}
}
// This means the channel monitor is new.
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
_ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
}
// Serialize and write the new monitor // Serialize and write the new monitor
let mut monitor_bytes = Vec::with_capacity( let mut monitor_bytes = Vec::with_capacity(
MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
@ -641,65 +624,12 @@ where
&monitor_bytes, &monitor_bytes,
) { ) {
Ok(_) => { Ok(_) => {
// Assess cleanup. Typically, we'll clean up only between the last two known full
// monitors.
if let Ok((_, old_monitor)) = maybe_old_monitor {
let start = old_monitor.get_latest_update_id();
let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
// We don't want to clean the rest of u64, so just do possible pending
// updates. Note that we never write updates at
// `CLOSED_CHANNEL_UPDATE_ID`.
cmp::min(
start.saturating_add(self.maximum_pending_updates),
CLOSED_CHANNEL_UPDATE_ID - 1,
)
} else {
monitor.get_latest_update_id().saturating_sub(1)
};
// We should bother cleaning up only if there's at least one update
// expected.
for update_id in start..=end {
let update_name = UpdateName::from(update_id);
#[cfg(debug_assertions)]
{
if let Ok(update) =
self.read_monitor_update(&monitor_name, &update_name)
{
// Assert that we are reading what we think we are.
debug_assert_eq!(update.update_id, update_name.0);
} else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
{
// We're deleting something we should know doesn't exist.
panic!(
"failed to read monitor update {}",
update_name.as_str()
);
}
// On closed channels, we will unavoidably try to read
// non-existent updates since we have to guess at the range of
// stale updates, so do nothing.
}
if let Err(e) = self.kv_store.remove(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
true,
) {
log_error!(
self.logger,
"error cleaning up channel monitor updates for monitor {}, reason: {}",
monitor_name.as_str(),
e
);
};
}
};
chain::ChannelMonitorUpdateStatus::Completed chain::ChannelMonitorUpdateStatus::Completed
} }
Err(e) => { Err(e) => {
log_error!( log_error!(
self.logger, self.logger,
"error writing channel monitor {}/{}/{} reason: {}", "Failed to write ChannelMonitor {}/{}/{} reason: {}",
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(), monitor_name.as_str(),
@ -741,7 +671,7 @@ where
Err(e) => { Err(e) => {
log_error!( log_error!(
self.logger, self.logger,
"error writing channel monitor update {}/{}/{} reason: {}", "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(), monitor_name.as_str(),
update_name.as_str(), update_name.as_str(),
@ -751,8 +681,41 @@ where
} }
} }
} else { } else {
// We could write this update, but it meets criteria of our design that call for a full monitor write. let monitor_name = MonitorName::from(funding_txo);
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id) // In case of channel-close monitor update, we need to read old monitor before persisting
// the new one in order to determine the cleanup range.
let maybe_old_monitor = match monitor.get_latest_update_id() {
CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
_ => None
};
// We could write this update, but it meets criteria of our design that calls for a full monitor write.
let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
// If there is an error while reading old monitor, we skip clean up.
maybe_old_monitor.map(|(_, ref old_monitor)| {
let start = old_monitor.get_latest_update_id();
// We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
let end = cmp::min(
start.saturating_add(self.maximum_pending_updates),
CLOSED_CHANNEL_UPDATE_ID - 1,
);
(start, end)
})
} else {
let end = monitor.get_latest_update_id();
let start = end.saturating_sub(self.maximum_pending_updates);
Some((start, end))
};
if let Some((start, end)) = cleanup_range {
self.cleanup_in_range(monitor_name, start, end);
}
}
monitor_update_status
} }
} else { } else {
// There is no update given, so we must persist a new monitor. // There is no update given, so we must persist a new monitor.
@ -761,6 +724,34 @@ where
} }
} }
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
where
ES::Target: EntropySource + Sized,
K::Target: KVStore,
L::Target: Logger,
SP::Target: SignerProvider + Sized
{
// Cleans up monitor updates for given monitor in range `start..=end`.
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
for update_id in start..=end {
let update_name = UpdateName::from(update_id);
if let Err(e) = self.kv_store.remove(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
true,
) {
log_error!(
self.logger,
"Failed to clean up channel monitor updates for monitor {}, reason: {}",
monitor_name.as_str(),
e
);
};
}
}
}
/// A struct representing a name for a monitor. /// A struct representing a name for a monitor.
#[derive(Debug)] #[derive(Debug)]
struct MonitorName(String); struct MonitorName(String);
@ -896,20 +887,21 @@ mod tests {
#[test] #[test]
fn persister_with_real_monitors() { fn persister_with_real_monitors() {
// This value is used later to limit how many iterations we perform. // This value is used later to limit how many iterations we perform.
let test_max_pending_updates = 7; let persister_0_max_pending_updates = 7;
// Intentionally set this to a smaller value to test a different alignment.
let persister_1_max_pending_updates = 3;
let chanmon_cfgs = create_chanmon_cfgs(4); let chanmon_cfgs = create_chanmon_cfgs(4);
let persister_0 = MonitorUpdatingPersister { let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false), kv_store: &TestStore::new(false),
logger: &TestLogger::new(), logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates, maximum_pending_updates: persister_0_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager, entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager, signer_provider: &chanmon_cfgs[0].keys_manager,
}; };
let persister_1 = MonitorUpdatingPersister { let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false), kv_store: &TestStore::new(false),
logger: &TestLogger::new(), logger: &TestLogger::new(),
// Intentionally set this to a smaller value to test a different alignment. maximum_pending_updates: persister_1_max_pending_updates,
maximum_pending_updates: 3,
entropy_source: &chanmon_cfgs[1].keys_manager, entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager, signer_provider: &chanmon_cfgs[1].keys_manager,
}; };
@ -934,7 +926,6 @@ mod tests {
node_cfgs[1].chain_monitor = chain_mon_1; node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster; let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
@ -957,10 +948,11 @@ mod tests {
for (_, mon) in persisted_chan_data_0.iter() { for (_, mon) in persisted_chan_data_0.iter() {
// check that when we read it, we got the right update id // check that when we read it, we got the right update id
assert_eq!(mon.get_latest_update_id(), $expected_update_id); assert_eq!(mon.get_latest_update_id(), $expected_update_id);
// if the CM is at the correct update id without updates, ensure no updates are stored
// if the CM is at consolidation threshold, ensure no updates are stored.
let monitor_name = MonitorName::from(mon.get_funding_txo().0); let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap(); if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
if cm_0.get_latest_update_id() == $expected_update_id { || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!( assert_eq!(
persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(), monitor_name.as_str()).unwrap().len(),
@ -975,8 +967,9 @@ mod tests {
for (_, mon) in persisted_chan_data_1.iter() { for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id); assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = MonitorName::from(mon.get_funding_txo().0); let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap(); // if the CM is at consolidation threshold, ensure no updates are stored.
if cm_1.get_latest_update_id() == $expected_update_id { if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!( assert_eq!(
persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(), monitor_name.as_str()).unwrap().len(),
@ -1001,7 +994,7 @@ mod tests {
// Send a few more payments to try all the alignments of max pending updates with // Send a few more payments to try all the alignments of max pending updates with
// updates for a payment sent and received. // updates for a payment sent and received.
let mut sender = 0; let mut sender = 0;
for i in 3..=test_max_pending_updates * 2 { for i in 3..=persister_0_max_pending_updates * 2 {
let receiver; let receiver;
if sender == 0 { if sender == 0 {
sender = 1; sender = 1;