Mirror of https://github.com/lightningdevkit/rust-lightning.git
Store Broadcaster and FeeEstimator on MonitorUpdatingPersister.
`MonitorUpdatingPersister` does not currently archive monitors correctly because it neglects any unapplied updates. In order to start applying these updates, the archiving methods will require access to instances of `BroadcasterInterface` and `FeeEstimator`. This commit requires that the `MonitorUpdatingPersister` be instantiated with those instances, obviating the need to pass them around and laying the foundation for the following commit.
parent 0d7ae8616b
commit 61197390d6
1 changed file with 43 additions and 43 deletions
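
For illustration only (not part of this commit): a sketch of how a caller might construct the persister once this change lands. The helper function below is hypothetical; its generic bounds simply mirror the bounds on the struct in the diff below, and 100 is an arbitrary choice of `maximum_pending_updates`.

use core::ops::Deref;

use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::sign::{EntropySource, SignerProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::{KVStore, MonitorUpdatingPersister};

// Hypothetical helper: hand the broadcaster and fee estimator to the
// persister once, at construction, instead of at every read call.
fn build_persister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
	kv_store: K, logger: L, entropy_source: ES, signer_provider: SP,
	broadcaster: BI, fee_estimator: FE,
) -> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	// The stored instances let reads (and, in the follow-up commit, the
	// archiving methods) apply pending updates without extra arguments.
	MonitorUpdatingPersister::new(
		kv_store, logger, 100, entropy_source, signer_provider,
		broadcaster, fee_estimator,
	)
}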
@@ -400,28 +400,34 @@ where
 /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
 /// would like to get rid of them, consider using the
 /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
-pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
+pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
 where
 	K::Target: KVStore,
 	L::Target: Logger,
 	ES::Target: EntropySource + Sized,
 	SP::Target: SignerProvider + Sized,
+	BI::Target: BroadcasterInterface,
+	FE::Target: FeeEstimator
 {
 	kv_store: K,
 	logger: L,
 	maximum_pending_updates: u64,
 	entropy_source: ES,
 	signer_provider: SP,
+	broadcaster: BI,
+	fee_estimator: FE
 }

 #[allow(dead_code)]
-impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
-	MonitorUpdatingPersister<K, L, ES, SP>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+	MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
 	K::Target: KVStore,
 	L::Target: Logger,
 	ES::Target: EntropySource + Sized,
 	SP::Target: SignerProvider + Sized,
+	BI::Target: BroadcasterInterface,
+	FE::Target: FeeEstimator
 {
 	/// Constructs a new [`MonitorUpdatingPersister`].
 	///
@@ -441,7 +447,7 @@ where
 	/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
 	pub fn new(
 		kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
-		signer_provider: SP,
+		signer_provider: SP, broadcaster: BI, fee_estimator: FE
 	) -> Self {
 		MonitorUpdatingPersister {
 			kv_store,
@@ -449,6 +455,8 @@ where
 			maximum_pending_updates,
 			entropy_source,
 			signer_provider,
+			broadcaster,
+			fee_estimator
 		}
 	}

@@ -457,24 +465,14 @@ where
 	/// It is extremely important that your [`KVStore::read`] implementation uses the
 	/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
 	/// documentation for [`MonitorUpdatingPersister`].
-	pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>(
-		&self, broadcaster: &B, fee_estimator: &F,
-	) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
-	where
-		B::Target: BroadcasterInterface,
-		F::Target: FeeEstimator,
-	{
+	pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
 		let monitor_list = self.kv_store.list(
 			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
 			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
 		)?;
 		let mut res = Vec::with_capacity(monitor_list.len());
 		for monitor_key in monitor_list {
-			res.push(self.read_channel_monitor_with_updates(
-				broadcaster,
-				fee_estimator,
-				monitor_key,
-			)?)
+			res.push(self.read_channel_monitor_with_updates(monitor_key)?)
 		}
 		Ok(res)
 	}
@@ -496,13 +494,9 @@ where
 	///
 	/// Loading a large number of monitors will be faster if done in parallel. You can use this
 	/// function to accomplish this. Take care to limit the number of parallel readers.
-	pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>(
-		&self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
-	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
-	where
-		B::Target: BroadcasterInterface,
-		F::Target: FeeEstimator,
-	{
+	pub fn read_channel_monitor_with_updates(
+		&self, monitor_key: String,
+	) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
 		let monitor_name = MonitorName::new(monitor_key)?;
 		let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
 		let mut current_update_id = monitor.get_latest_update_id();
@@ -521,7 +515,7 @@ where
 				Err(err) => return Err(err),
 			};

-			monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger)
+			monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
 				.map_err(|e| {
 					log_error!(
 						self.logger,
@@ -639,13 +633,15 @@ where
 	}
 }

-impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
-	Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
+impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+	Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
 	K::Target: KVStore,
 	L::Target: Logger,
 	ES::Target: EntropySource + Sized,
 	SP::Target: SignerProvider + Sized,
+	BI::Target: BroadcasterInterface,
+	FE::Target: FeeEstimator
 {
 	/// Persists a new channel. This means writing the entire monitor to the
 	/// parametrized [`KVStore`].
@@ -788,12 +784,14 @@ where
 	}
 }

-impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
 	ES::Target: EntropySource + Sized,
 	K::Target: KVStore,
 	L::Target: Logger,
-	SP::Target: SignerProvider + Sized
+	SP::Target: SignerProvider + Sized,
+	BI::Target: BroadcasterInterface,
+	FE::Target: FeeEstimator
 {
 	// Cleans up monitor updates for given monitor in range `start..=end`.
 	fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -962,6 +960,8 @@ mod tests {
 			maximum_pending_updates: persister_0_max_pending_updates,
 			entropy_source: &chanmon_cfgs[0].keys_manager,
 			signer_provider: &chanmon_cfgs[0].keys_manager,
+			broadcaster: &chanmon_cfgs[0].tx_broadcaster,
+			fee_estimator: &chanmon_cfgs[0].fee_estimator,
 		};
 		let persister_1 = MonitorUpdatingPersister {
 			kv_store: &TestStore::new(false),
@@ -969,6 +969,8 @@ mod tests {
 			maximum_pending_updates: persister_1_max_pending_updates,
 			entropy_source: &chanmon_cfgs[1].keys_manager,
 			signer_provider: &chanmon_cfgs[1].keys_manager,
+			broadcaster: &chanmon_cfgs[1].tx_broadcaster,
+			fee_estimator: &chanmon_cfgs[1].fee_estimator,
 		};
 		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 		let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -991,23 +993,18 @@ mod tests {
 		node_cfgs[1].chain_monitor = chain_mon_1;
 		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-		let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
-		let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;

 		// Check that the persisted channel data is empty before any channels are
 		// open.
-		let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
-			&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+		let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
 		assert_eq!(persisted_chan_data_0.len(), 0);
-		let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
-			&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
+		let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
 		assert_eq!(persisted_chan_data_1.len(), 0);

 		// Helper to make sure the channel is on the expected update ID.
 		macro_rules! check_persisted_data {
 			($expected_update_id: expr) => {
-				persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
-					&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+				persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
 				// check that we stored only one monitor
 				assert_eq!(persisted_chan_data_0.len(), 1);
 				for (_, mon) in persisted_chan_data_0.iter() {
@@ -1026,8 +1023,7 @@ mod tests {
 					);
 				}
 			}
-			persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
-				&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
+			persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
 			assert_eq!(persisted_chan_data_1.len(), 1);
 			for (_, mon) in persisted_chan_data_1.iter() {
 				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -1095,7 +1091,7 @@ mod tests {
 		check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);

 		// Make sure the expected number of stale updates is present.
-		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
 		let (_, monitor) = &persisted_chan_data[0];
 		let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
 		// The channel should have 0 updates, as it wrote a full monitor and consolidated.
@@ -1129,6 +1125,8 @@ mod tests {
 			maximum_pending_updates: 11,
 			entropy_source: node_cfgs[0].keys_manager,
 			signer_provider: node_cfgs[0].keys_manager,
+			broadcaster: node_cfgs[0].tx_broadcaster,
+			fee_estimator: node_cfgs[0].fee_estimator,
 		};
 		match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
 			ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -1168,6 +1166,8 @@ mod tests {
 			maximum_pending_updates: test_max_pending_updates,
 			entropy_source: &chanmon_cfgs[0].keys_manager,
 			signer_provider: &chanmon_cfgs[0].keys_manager,
+			broadcaster: &chanmon_cfgs[0].tx_broadcaster,
+			fee_estimator: &chanmon_cfgs[0].fee_estimator,
 		};
 		let persister_1 = MonitorUpdatingPersister {
 			kv_store: &TestStore::new(false),
@@ -1175,6 +1175,8 @@ mod tests {
 			maximum_pending_updates: test_max_pending_updates,
 			entropy_source: &chanmon_cfgs[1].keys_manager,
 			signer_provider: &chanmon_cfgs[1].keys_manager,
+			broadcaster: &chanmon_cfgs[1].tx_broadcaster,
+			fee_estimator: &chanmon_cfgs[1].fee_estimator,
 		};
 		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 		let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -1198,11 +1200,9 @@ mod tests {
 		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

-		let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
-
 		// Check that the persisted channel data is empty before any channels are
 		// open.
-		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
 		assert_eq!(persisted_chan_data.len(), 0);

 		// Create some initial channel
@@ -1213,7 +1213,7 @@ mod tests {
 		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);

 		// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
-		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
 		let (_, monitor) = &persisted_chan_data[0];
 		let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
 		persister_0
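
Usage sketch (again hypothetical, not part of the diff): with the broadcaster and fee estimator stored on the persister, the startup read path no longer threads them through each call. The wrapper below only exists to show the new call shape; the bounds repeat those from the impl blocks above.

use core::ops::Deref;
use std::io;

use bitcoin::BlockHash;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::sign::{EntropySource, SignerProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::{KVStore, MonitorUpdatingPersister};

// Hypothetical startup helper: each returned monitor already has its pending
// updates applied, using the persister's own broadcaster and fee estimator.
fn load_monitors<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
	persister: &MonitorUpdatingPersister<K, L, ES, SP, BI, FE>,
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	// Before this commit the caller had to supply `&broadcaster, &fee_estimator` here.
	persister.read_all_channel_monitors_with_updates()
}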