rustfmt: Run on util/persist.rs

This commit is contained in:
Elias Rohrer 2024-09-18 09:40:36 +02:00
parent 873b35a328
commit 7e276f2c32
No known key found for this signature in database
GPG key ID: 36153082BDF676FD

View file

@ -10,28 +10,31 @@
//! //!
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
use bitcoin::{BlockHash, Txid};
use core::cmp; use core::cmp;
use core::ops::Deref; use core::ops::Deref;
use core::str::FromStr; use core::str::FromStr;
use bitcoin::{BlockHash, Txid};
use crate::{io, log_error};
use crate::prelude::*; use crate::prelude::*;
use crate::{io, log_error};
use crate::chain; use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::chainmonitor::Persist; use crate::chain::chainmonitor::Persist;
use crate::sign::{EntropySource, ecdsa::EcdsaChannelSigner, SignerProvider}; use crate::chain::channelmonitor::{
ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID,
};
use crate::chain::transaction::OutPoint; use crate::chain::transaction::OutPoint;
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
use crate::ln::channelmanager::AChannelManager; use crate::ln::channelmanager::AChannelManager;
use crate::routing::gossip::NetworkGraph; use crate::routing::gossip::NetworkGraph;
use crate::routing::scoring::WriteableScore; use crate::routing::scoring::WriteableScore;
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
use crate::util::logger::Logger; use crate::util::logger::Logger;
use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::ser::{Readable, ReadableArgs, Writeable};
/// The alphabet of characters allowed for namespaces and keys. /// The alphabet of characters allowed for namespaces and keys.
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"; pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
/// The maximum number of characters namespaces and keys may have. /// The maximum number of characters namespaces and keys may have.
pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120; pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
@ -124,12 +127,16 @@ pub trait KVStore {
/// `primary_namespace` and `secondary_namespace`. /// `primary_namespace` and `secondary_namespace`.
/// ///
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result<Vec<u8>, io::Error>; fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Result<Vec<u8>, io::Error>;
/// Persists the given data under the given `key`. /// Persists the given data under the given `key`.
/// ///
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present /// Will create the given `primary_namespace` and `secondary_namespace` if not already present
/// in the store. /// in the store.
fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error>; fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> Result<(), io::Error>;
/// Removes any data that had previously been persisted under the given `key`. /// Removes any data that had previously been persisted under the given `key`.
/// ///
/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
@ -145,13 +152,17 @@ pub trait KVStore {
/// Returns successfully if no data will be stored for the given `primary_namespace`, /// Returns successfully if no data will be stored for the given `primary_namespace`,
/// `secondary_namespace`, and `key`, independently of whether it was present before its /// `secondary_namespace`, and `key`, independently of whether it was present before its
/// invokation or not. /// invokation or not.
fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), io::Error>; fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Result<(), io::Error>;
/// Returns a list of keys that are stored under the given `secondary_namespace` in /// Returns a list of keys that are stored under the given `secondary_namespace` in
/// `primary_namespace`. /// `primary_namespace`.
/// ///
/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the /// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown. /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result<Vec<String>, io::Error>; fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> Result<Vec<String>, io::Error>;
} }
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
@ -175,7 +186,6 @@ where
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
} }
impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
where where
CM::Target: 'static + AChannelManager, CM::Target: 'static + AChannelManager,
@ -183,24 +193,30 @@ where
S::Target: WriteableScore<'a>, S::Target: WriteableScore<'a>,
{ {
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, self.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_KEY,
&channel_manager.get_cm().encode()) &channel_manager.get_cm().encode(),
)
} }
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> { fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, self.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_KEY,
&network_graph.encode()) &network_graph.encode(),
)
} }
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE, self.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_KEY,
&scorer.encode()) &scorer.encode(),
)
} }
} }
@ -210,27 +226,34 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
// Then we should return InProgress rather than UnrecoverableError, implying we should probably // Then we should return InProgress rather than UnrecoverableError, implying we should probably
// just shut down the node since we're not retrying persistence! // just shut down the node since we're not retrying persistence!
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus { fn persist_new_channel(
&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index); let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
match self.write( match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&key, &monitor.encode()) &key,
{ &monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
} }
} }
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus { fn update_persisted_channel(
&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index); let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
match self.write( match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&key, &monitor.encode()) &key,
{ &monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
} }
} }
@ -242,7 +265,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
monitor_name.as_str(), monitor_name.as_str(),
) { ) {
Ok(monitor) => monitor, Ok(monitor) => monitor,
Err(_) => return Err(_) => return,
}; };
match self.write( match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@ -250,8 +273,8 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
monitor_name.as_str(), monitor_name.as_str(),
&monitor, &monitor,
) { ) {
Ok(()) => {} Ok(()) => {},
Err(_e) => return Err(_e) => return,
}; };
let _ = self.remove( let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@ -274,12 +297,14 @@ where
let mut res = Vec::new(); let mut res = Vec::new();
for stored_key in kv_store.list( for stored_key in kv_store.list(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)? CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
{ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
if stored_key.len() < 66 { if stored_key.len() < 66 {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
"Stored key has invalid length")); "Stored key has invalid length",
));
} }
let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| { let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| {
@ -291,8 +316,11 @@ where
})?; })?;
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read( match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
&mut io::Cursor::new( &mut io::Cursor::new(kv_store.read(
kv_store.read(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key)?), CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?),
(&*entropy_source, &*signer_provider), (&*entropy_source, &*signer_provider),
) { ) {
Ok((block_hash, channel_monitor)) => { Ok((block_hash, channel_monitor)) => {
@ -305,13 +333,13 @@ where
)); ));
} }
res.push((block_hash, channel_monitor)); res.push((block_hash, channel_monitor));
} },
Err(_) => { Err(_) => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
"Failed to read ChannelMonitor" "Failed to read ChannelMonitor",
)) ))
} },
} }
} }
Ok(res) Ok(res)
@ -407,7 +435,7 @@ where
ES::Target: EntropySource + Sized, ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized, SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface, BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator FE::Target: FeeEstimator,
{ {
kv_store: K, kv_store: K,
logger: L, logger: L,
@ -415,7 +443,7 @@ where
entropy_source: ES, entropy_source: ES,
signer_provider: SP, signer_provider: SP,
broadcaster: BI, broadcaster: BI,
fee_estimator: FE fee_estimator: FE,
} }
#[allow(dead_code)] #[allow(dead_code)]
@ -427,7 +455,7 @@ where
ES::Target: EntropySource + Sized, ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized, SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface, BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator FE::Target: FeeEstimator,
{ {
/// Constructs a new [`MonitorUpdatingPersister`]. /// Constructs a new [`MonitorUpdatingPersister`].
/// ///
@ -447,7 +475,7 @@ where
/// [`MonitorUpdatingPersister::cleanup_stale_updates`]. /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
pub fn new( pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self { ) -> Self {
MonitorUpdatingPersister { MonitorUpdatingPersister {
kv_store, kv_store,
@ -456,7 +484,7 @@ where
entropy_source, entropy_source,
signer_provider, signer_provider,
broadcaster, broadcaster,
fee_estimator fee_estimator,
} }
} }
@ -465,7 +493,12 @@ where
/// It is extremely important that your [`KVStore::read`] implementation uses the /// It is extremely important that your [`KVStore::read`] implementation uses the
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
/// documentation for [`MonitorUpdatingPersister`]. /// documentation for [`MonitorUpdatingPersister`].
pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> { pub fn read_all_channel_monitors_with_updates(
&self,
) -> Result<
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
io::Error,
> {
let monitor_list = self.kv_store.list( let monitor_list = self.kv_store.list(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
@ -496,7 +529,8 @@ where
/// function to accomplish this. Take care to limit the number of parallel readers. /// function to accomplish this. Take care to limit the number of parallel readers.
pub fn read_channel_monitor_with_updates( pub fn read_channel_monitor_with_updates(
&self, monitor_key: String, &self, monitor_key: String,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> { ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
{
let monitor_name = MonitorName::new(monitor_key)?; let monitor_name = MonitorName::new(monitor_key)?;
let (block_hash, monitor) = self.read_monitor(&monitor_name)?; let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
let mut current_update_id = monitor.get_latest_update_id(); let mut current_update_id = monitor.get_latest_update_id();
@ -511,21 +545,22 @@ where
Err(err) if err.kind() == io::ErrorKind::NotFound => { Err(err) if err.kind() == io::ErrorKind::NotFound => {
// We can't find any more updates, so we are done. // We can't find any more updates, so we are done.
break; break;
} },
Err(err) => return Err(err), Err(err) => return Err(err),
}; };
monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger) monitor
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
.map_err(|e| { .map_err(|e| {
log_error!( log_error!(
self.logger, self.logger,
"Monitor update failed. monitor: {} update: {} reason: {:?}", "Monitor update failed. monitor: {} update: {} reason: {:?}",
monitor_name.as_str(), monitor_name.as_str(),
update_name.as_str(), update_name.as_str(),
e e
); );
io::Error::new(io::ErrorKind::Other, "Monitor update failed") io::Error::new(io::ErrorKind::Other, "Monitor update failed")
})?; })?;
} }
Ok((block_hash, monitor)) Ok((block_hash, monitor))
} }
@ -533,7 +568,8 @@ where
/// Read a channel monitor. /// Read a channel monitor.
fn read_monitor( fn read_monitor(
&self, monitor_name: &MonitorName, &self, monitor_name: &MonitorName,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> { ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
{
let outpoint: OutPoint = monitor_name.try_into()?; let outpoint: OutPoint = monitor_name.try_into()?;
let mut monitor_cursor = io::Cursor::new(self.kv_store.read( let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@ -564,7 +600,7 @@ where
} else { } else {
Ok((blockhash, channel_monitor)) Ok((blockhash, channel_monitor))
} }
} },
Err(e) => { Err(e) => {
log_error!( log_error!(
self.logger, self.logger,
@ -573,7 +609,7 @@ where
e, e,
); );
Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor")) Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
} },
} }
} }
@ -613,9 +649,10 @@ where
for monitor_key in monitor_keys { for monitor_key in monitor_keys {
let monitor_name = MonitorName::new(monitor_key)?; let monitor_name = MonitorName::new(monitor_key)?;
let (_, current_monitor) = self.read_monitor(&monitor_name)?; let (_, current_monitor) = self.read_monitor(&monitor_name)?;
let updates = self let updates = self.kv_store.list(
.kv_store CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())?; monitor_name.as_str(),
)?;
for update in updates { for update in updates {
let update_name = UpdateName::new(update)?; let update_name = UpdateName::new(update)?;
// if the update_id is lower than the stored monitor, delete // if the update_id is lower than the stored monitor, delete
@ -633,20 +670,27 @@ where
} }
} }
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> impl<
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE> ChannelSigner: EcdsaChannelSigner,
K: Deref,
L: Deref,
ES: Deref,
SP: Deref,
BI: Deref,
FE: Deref,
> Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where where
K::Target: KVStore, K::Target: KVStore,
L::Target: Logger, L::Target: Logger,
ES::Target: EntropySource + Sized, ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized, SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface, BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator FE::Target: FeeEstimator,
{ {
/// Persists a new channel. This means writing the entire monitor to the /// Persists a new channel. This means writing the entire monitor to the
/// parametrized [`KVStore`]. /// parametrized [`KVStore`].
fn persist_new_channel( fn persist_new_channel(
&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner> &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus { ) -> chain::ChannelMonitorUpdateStatus {
// Determine the proper key for this monitor // Determine the proper key for this monitor
let monitor_name = MonitorName::from(funding_txo); let monitor_name = MonitorName::from(funding_txo);
@ -662,9 +706,7 @@ where
monitor_name.as_str(), monitor_name.as_str(),
&monitor_bytes, &monitor_bytes,
) { ) {
Ok(_) => { Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
chain::ChannelMonitorUpdateStatus::Completed
}
Err(e) => { Err(e) => {
log_error!( log_error!(
self.logger, self.logger,
@ -675,7 +717,7 @@ where
e e
); );
chain::ChannelMonitorUpdateStatus::UnrecoverableError chain::ChannelMonitorUpdateStatus::UnrecoverableError
} },
} }
} }
@ -690,7 +732,7 @@ where
/// - The update is at [`CLOSED_CHANNEL_UPDATE_ID`] /// - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
fn update_persisted_channel( fn update_persisted_channel(
&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>, &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<ChannelSigner> monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus { ) -> chain::ChannelMonitorUpdateStatus {
if let Some(update) = update { if let Some(update) = update {
if update.update_id != CLOSED_CHANNEL_UPDATE_ID if update.update_id != CLOSED_CHANNEL_UPDATE_ID
@ -715,7 +757,7 @@ where
e e
); );
chain::ChannelMonitorUpdateStatus::UnrecoverableError chain::ChannelMonitorUpdateStatus::UnrecoverableError
} },
} }
} else { } else {
let monitor_name = MonitorName::from(funding_txo); let monitor_name = MonitorName::from(funding_txo);
@ -723,29 +765,30 @@ where
// the new one in order to determine the cleanup range. // the new one in order to determine the cleanup range.
let maybe_old_monitor = match monitor.get_latest_update_id() { let maybe_old_monitor = match monitor.get_latest_update_id() {
CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(), CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
_ => None _ => None,
}; };
// We could write this update, but it meets criteria of our design that calls for a full monitor write. // We could write this update, but it meets criteria of our design that calls for a full monitor write.
let monitor_update_status = self.persist_new_channel(funding_txo, monitor); let monitor_update_status = self.persist_new_channel(funding_txo, monitor);
if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status { if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { let cleanup_range =
// If there is an error while reading old monitor, we skip clean up. if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
maybe_old_monitor.map(|(_, ref old_monitor)| { // If there is an error while reading old monitor, we skip clean up.
let start = old_monitor.get_latest_update_id(); maybe_old_monitor.map(|(_, ref old_monitor)| {
// We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID let start = old_monitor.get_latest_update_id();
let end = cmp::min( // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
start.saturating_add(self.maximum_pending_updates), let end = cmp::min(
CLOSED_CHANNEL_UPDATE_ID - 1, start.saturating_add(self.maximum_pending_updates),
); CLOSED_CHANNEL_UPDATE_ID - 1,
(start, end) );
}) (start, end)
} else { })
let end = monitor.get_latest_update_id(); } else {
let start = end.saturating_sub(self.maximum_pending_updates); let end = monitor.get_latest_update_id();
Some((start, end)) let start = end.saturating_sub(self.maximum_pending_updates);
}; Some((start, end))
};
if let Some((start, end)) = cleanup_range { if let Some((start, end)) = cleanup_range {
self.cleanup_in_range(monitor_name, start, end); self.cleanup_in_range(monitor_name, start, end);
@ -765,7 +808,7 @@ where
let monitor_key = monitor_name.as_str().to_string(); let monitor_key = monitor_name.as_str().to_string();
let monitor = match self.read_channel_monitor_with_updates(monitor_key) { let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
Ok((_block_hash, monitor)) => monitor, Ok((_block_hash, monitor)) => monitor,
Err(_) => return Err(_) => return,
}; };
match self.kv_store.write( match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@ -773,7 +816,7 @@ where
monitor_name.as_str(), monitor_name.as_str(),
&monitor.encode(), &monitor.encode(),
) { ) {
Ok(()) => {} Ok(()) => {},
Err(_e) => return, Err(_e) => return,
}; };
let _ = self.kv_store.remove( let _ = self.kv_store.remove(
@ -785,14 +828,15 @@ where
} }
} }
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE> impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where where
ES::Target: EntropySource + Sized, ES::Target: EntropySource + Sized,
K::Target: KVStore, K::Target: KVStore,
L::Target: Logger, L::Target: Logger,
SP::Target: SignerProvider + Sized, SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface, BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator FE::Target: FeeEstimator,
{ {
// Cleans up monitor updates for given monitor in range `start..=end`. // Cleans up monitor updates for given monitor in range `start..=end`.
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@ -883,7 +927,7 @@ impl UpdateName {
Ok(u) => Ok(u.into()), Ok(u) => Ok(u.into()),
Err(_) => { Err(_) => {
Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name")) Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
} },
} }
} }
@ -905,10 +949,10 @@ mod tests {
use crate::chain::ChannelMonitorUpdateStatus; use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::{ClosureReason, MessageSendEventsProvider}; use crate::events::{ClosureReason, MessageSendEventsProvider};
use crate::ln::functional_test_utils::*; use crate::ln::functional_test_utils::*;
use crate::util::test_utils::{self, TestLogger, TestStore};
use crate::{check_added_monitors, check_closed_broadcast};
use crate::sync::Arc; use crate::sync::Arc;
use crate::util::test_channel_signer::TestChannelSigner; use crate::util::test_channel_signer::TestChannelSigner;
use crate::util::test_utils::{self, TestLogger, TestStore};
use crate::{check_added_monitors, check_closed_broadcast};
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5; const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
@ -928,23 +972,44 @@ mod tests {
#[test] #[test]
fn monitor_from_outpoint_works() { fn monitor_from_outpoint_works() {
let monitor_name1 = MonitorName::from(OutPoint { let monitor_name1 = MonitorName::from(OutPoint {
txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(), txid: Txid::from_str(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
)
.unwrap(),
index: 1, index: 1,
}); });
assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"); assert_eq!(
monitor_name1.as_str(),
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
);
let monitor_name2 = MonitorName::from(OutPoint { let monitor_name2 = MonitorName::from(OutPoint {
txid: Txid::from_str("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(), txid: Txid::from_str(
"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
)
.unwrap(),
index: u16::MAX, index: u16::MAX,
}); });
assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"); assert_eq!(
monitor_name2.as_str(),
"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
);
} }
#[test] #[test]
fn bad_monitor_string_fails() { fn bad_monitor_string_fails() {
assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err()); assert!(MonitorName::new(
assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err()); "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()
assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err()); )
.is_err());
assert!(MonitorName::new(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()
)
.is_err());
assert!(MonitorName::new(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()
)
.is_err());
} }
// Exercise the `MonitorUpdatingPersister` with real channels and payments. // Exercise the `MonitorUpdatingPersister` with real channels and payments.
@ -997,15 +1062,18 @@ mod tests {
// Check that the persisted channel data is empty before any channels are // Check that the persisted channel data is empty before any channels are
// open. // open.
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); let mut persisted_chan_data_0 =
persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0); assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); let mut persisted_chan_data_1 =
persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0); assert_eq!(persisted_chan_data_1.len(), 0);
// Helper to make sure the channel is on the expected update ID. // Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data { macro_rules! check_persisted_data {
($expected_update_id: expr) => { ($expected_update_id: expr) => {
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); persisted_chan_data_0 =
persister_0.read_all_channel_monitors_with_updates().unwrap();
// check that we stored only one monitor // check that we stored only one monitor
assert_eq!(persisted_chan_data_0.len(), 1); assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() { for (_, mon) in persisted_chan_data_0.iter() {
@ -1015,26 +1083,41 @@ mod tests {
// if the CM is at consolidation threshold, ensure no updates are stored. // if the CM is at consolidation threshold, ensure no updates are stored.
let monitor_name = MonitorName::from(mon.get_funding_txo().0); let monitor_name = MonitorName::from(mon.get_funding_txo().0);
if mon.get_latest_update_id() % persister_0_max_pending_updates == 0 if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
{
assert_eq!( assert_eq!(
persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, persister_0
monitor_name.as_str()).unwrap().len(), .kv_store
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()
)
.unwrap()
.len(),
0, 0,
"updates stored when they shouldn't be in persister 0" "updates stored when they shouldn't be in persister 0"
); );
} }
} }
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); persisted_chan_data_1 =
persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1); assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() { for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id); assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = MonitorName::from(mon.get_funding_txo().0); let monitor_name = MonitorName::from(mon.get_funding_txo().0);
// if the CM is at consolidation threshold, ensure no updates are stored. // if the CM is at consolidation threshold, ensure no updates are stored.
if mon.get_latest_update_id() % persister_1_max_pending_updates == 0 if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
{
assert_eq!( assert_eq!(
persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, persister_1
monitor_name.as_str()).unwrap().len(), .kv_store
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()
)
.unwrap()
.len(),
0, 0,
"updates stored when they shouldn't be in persister 1" "updates stored when they shouldn't be in persister 1"
); );
@ -1102,8 +1185,22 @@ mod tests {
let (_, monitor) = &persisted_chan_data[0]; let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0); let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
// The channel should have 0 updates, as it wrote a full monitor and consolidated. // The channel should have 0 updates, as it wrote a full monitor and consolidated.
assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0); assert_eq!(
assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0); persister_0
.kv_store
.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
.unwrap()
.len(),
0
);
assert_eq!(
persister_1
.kv_store
.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
.unwrap()
.len(),
0
);
} }
// Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a
@ -1126,7 +1223,9 @@ mod tests {
let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap(); let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0]; let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
let txid = Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(); let txid =
Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be")
.unwrap();
let test_txo = OutPoint { txid, index: 0 }; let test_txo = OutPoint { txid, index: 0 };
let ro_persister = MonitorUpdatingPersister { let ro_persister = MonitorUpdatingPersister {
@ -1141,24 +1240,24 @@ mod tests {
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) { match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => { ChannelMonitorUpdateStatus::UnrecoverableError => {
// correct result // correct result
} },
ChannelMonitorUpdateStatus::Completed => { ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have") panic!("Completed persisting new channel when shouldn't have")
} },
ChannelMonitorUpdateStatus::InProgress => { ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have") panic!("Returned InProgress when shouldn't have")
} },
} }
match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) { match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => { ChannelMonitorUpdateStatus::UnrecoverableError => {
// correct result // correct result
} },
ChannelMonitorUpdateStatus::Completed => { ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have") panic!("Completed persisting new channel when shouldn't have")
} },
ChannelMonitorUpdateStatus::InProgress => { ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have") panic!("Returned InProgress when shouldn't have")
} },
} }
added_monitors.clear(); added_monitors.clear();
} }
@ -1228,7 +1327,12 @@ mod tests {
let monitor_name = MonitorName::from(monitor.get_funding_txo().0); let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
persister_0 persister_0
.kv_store .kv_store
.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1]) .write(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
UpdateName::from(1).as_str(),
&[0u8; 1],
)
.unwrap(); .unwrap();
// Do the stale update cleanup // Do the stale update cleanup
@ -1237,7 +1341,11 @@ mod tests {
// Confirm the stale update is unreadable/gone // Confirm the stale update is unreadable/gone
assert!(persister_0 assert!(persister_0
.kv_store .kv_store
.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str()) .read(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
UpdateName::from(1).as_str()
)
.is_err()); .is_err());
// Force close. // Force close.
@ -1253,7 +1361,12 @@ mod tests {
// Write an update near u64::MAX // Write an update near u64::MAX
persister_0 persister_0
.kv_store .kv_store
.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1]) .write(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
UpdateName::from(u64::MAX - 1).as_str(),
&[0u8; 1],
)
.unwrap(); .unwrap();
// Do the stale update cleanup // Do the stale update cleanup
@ -1262,11 +1375,18 @@ mod tests {
// Confirm the stale update is unreadable/gone // Confirm the stale update is unreadable/gone
assert!(persister_0 assert!(persister_0
.kv_store .kv_store
.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str()) .read(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
UpdateName::from(u64::MAX - 1).as_str()
)
.is_err()); .is_err());
} }
fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool where P::Target: Persist<ChannelSigner> { fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
where
P::Target: Persist<ChannelSigner>,
{
true true
} }