Mirror of https://github.com/lightningdevkit/rust-lightning.git, synced 2025-02-24 15:02:20 +01:00

Merge pull request #2964 from jbesraa/prune-stale-chanmonitor

Add `archive_fully_resolved_monitors` to `ChainMonitor`

Commit 195e666953. 6 changed files with 226 additions and 0 deletions.
@@ -17,4 +17,7 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
		self.update_ret.lock().unwrap().clone()
	}

	fn archive_persisted_channel(&self, _: OutPoint) {
	}
}

@@ -194,6 +194,11 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
	///
	/// [`Writeable::write`]: crate::util::ser::Writeable::write
	fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;

	/// Prevents the channel monitor from being loaded on startup.
	///
	/// Archiving the data in a backup location (rather than deleting it fully) is useful for
	/// hedging against data loss in case of unexpected failure.
	fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
}

struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
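An implementor of the new trait method does not have to delete anything; per the docs above, moving the blob aside is the safer choice. A minimal sketch of that pattern, assuming a one-file-per-monitor filesystem layout (the paths and the helper function are illustrative, not LDK API):

	use std::fs;
	use std::io;
	use std::path::Path;

	/// Illustrative helper: move a persisted monitor file into an `archive/`
	/// subdirectory instead of deleting it, so it can be recovered manually
	/// if archiving turns out to have been premature.
	fn archive_monitor_file(data_dir: &Path, monitor_key: &str) -> io::Result<()> {
		let live = data_dir.join("monitors").join(monitor_key);
		let archive_dir = data_dir.join("monitors").join("archive");
		fs::create_dir_all(&archive_dir)?;
		// Rename is atomic on the same filesystem, so the monitor is never
		// visible in both places at once on POSIX systems.
		fs::rename(&live, archive_dir.join(monitor_key))?;
		Ok(())
	}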
@@ -656,6 +661,41 @@ where C::Target: chain::Filter,
			}
		}
	}

	/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
	///
	/// This is useful for pruning fully resolved monitors from the monitor set and primary
	/// storage so they are not kept in memory and reloaded on restart.
	///
	/// Should be called occasionally (once every handful of blocks or on startup).
	///
	/// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor
	/// data could be moved to an archive location or removed entirely.
	pub fn archive_fully_resolved_channel_monitors(&self) {
		let mut have_monitors_to_prune = false;
		for (_, monitor_holder) in self.monitors.read().unwrap().iter() {
			let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
			if monitor_holder.monitor.is_fully_resolved(&logger) {
				have_monitors_to_prune = true;
			}
		}
		if have_monitors_to_prune {
			let mut monitors = self.monitors.write().unwrap();
			monitors.retain(|funding_txo, monitor_holder| {
				let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
				if monitor_holder.monitor.is_fully_resolved(&logger) {
					log_info!(logger,
						"Archiving fully resolved ChannelMonitor for funding txo {}",
						funding_txo
					);
					self.persister.archive_persisted_channel(*funding_txo);
					false
				} else {
					true
				}
			});
		}
	}
}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

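Since the docs above only say "occasionally", the call site is left to the application. One hedged sketch of a call site (the `ArchivesMonitors` trait below is a stand-in for a concrete `ChainMonitor` handle, not LDK API) is a background thread on a multi-hour timer:

	use std::sync::Arc;
	use std::thread;
	use std::time::Duration;

	// Hypothetical stand-in for the ChainMonitor handle; in real code you would
	// capture the concrete ChainMonitor from your node setup instead.
	trait ArchivesMonitors: Send + Sync + 'static {
		fn archive_fully_resolved_channel_monitors(&self);
	}

	// Spawn a background thread that prunes resolved monitors every six hours,
	// roughly the "once every handful of blocks" cadence the docs suggest.
	fn spawn_monitor_archiver<M: ArchivesMonitors>(chain_monitor: Arc<M>) -> thread::JoinHandle<()> {
		thread::spawn(move || loop {
			thread::sleep(Duration::from_secs(6 * 60 * 60));
			chain_monitor.archive_fully_resolved_channel_monitors();
		})
	}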
@@ -935,6 +935,9 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
	/// Ordering of tuple data: (their_per_commitment_point, feerate_per_kw, to_broadcaster_sats,
	/// to_countersignatory_sats)
	initial_counterparty_commitment_info: Option<(PublicKey, u32, u64, u64)>,

	/// The first block height at which we had no remaining claimable balances.
	balances_empty_height: Option<u32>,
}

/// Transaction outputs to watch for on-chain spends.

@@ -1145,6 +1148,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
			(15, self.counterparty_fulfilled_htlcs, required),
			(17, self.initial_counterparty_commitment_info, option),
			(19, self.channel_id, required),
			(21, self.balances_empty_height, option),
		});

		Ok(())

@@ -1328,6 +1332,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
			best_block,
			counterparty_node_id: Some(counterparty_node_id),
			initial_counterparty_commitment_info: None,
			balances_empty_height: None,
		})
	}

@@ -1856,6 +1861,52 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
		spendable_outputs
	}

	/// Checks if the monitor is fully resolved. Resolved monitor is one that has claimed all of
	/// its outputs and balances (i.e. [`Self::get_claimable_balances`] returns an empty set).
	///
	/// This function returns true only if [`Self::get_claimable_balances`] has been empty for at least
	/// 4032 blocks as an additional protection against any bugs resulting in spuriously empty balance sets.
	pub fn is_fully_resolved<L: Logger>(&self, logger: &L) -> bool {
		let mut is_all_funds_claimed = self.get_claimable_balances().is_empty();
		let current_height = self.current_best_block().height;
		let mut inner = self.inner.lock().unwrap();

		if is_all_funds_claimed {
			if !inner.funding_spend_seen {
				debug_assert!(false, "We should see funding spend by the time a monitor clears out");
				is_all_funds_claimed = false;
			}
		}

		match (inner.balances_empty_height, is_all_funds_claimed) {
			(Some(balances_empty_height), true) => {
				// Claimed all funds, check if we have reached the blocks threshold.
				const BLOCKS_THRESHOLD: u32 = 4032; // ~four weeks
				return current_height >= balances_empty_height + BLOCKS_THRESHOLD;
			},
			(Some(_), false) => {
				// We previously assumed we had claimed all funds, but new funds to claim
				// have appeared. This should not happen in practice.
				debug_assert!(false, "Thought we were done claiming funds, but claimable_balances now has entries");
				log_error!(logger,
					"WARNING: LDK thought it was done claiming all the available funds in the ChannelMonitor for channel {}, but later decided it had more to claim. This is potentially an important bug in LDK, please report it at https://github.com/lightningdevkit/rust-lightning/issues/new",
					inner.get_funding_txo().0);
				inner.balances_empty_height = None;
				false
			},
			(None, true) => {
				// All funds are claimed but `balances_empty_height` is unset; record the
				// current block height and wait for the threshold to pass.
				inner.balances_empty_height = Some(current_height);
				false
			},
			(None, false) => {
				// We still have funds to claim.
				false
			},
		}
	}

	#[cfg(test)]
	pub fn get_counterparty_payment_script(&self) -> ScriptBuf {
		self.inner.lock().unwrap().counterparty_payment_script.clone()

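The four-way match above is the heart of the archival gating: the first empty-balances observation only records a height, and a monitor reports itself fully resolved only once a further threshold of blocks has passed. Distilled into a standalone sketch for readability (plain Rust, not LDK API; `BLOCKS_THRESHOLD` mirrors the constant above):

	/// Standalone sketch of the gating logic in `is_fully_resolved`: the first
	/// empty-balances observation starts a timer; archival is allowed once a
	/// further BLOCKS_THRESHOLD blocks have been connected.
	fn ready_to_archive(
		balances_empty_height: &mut Option<u32>, all_funds_claimed: bool, current_height: u32,
	) -> bool {
		const BLOCKS_THRESHOLD: u32 = 4032; // ~four weeks, matching the constant above
		match (*balances_empty_height, all_funds_claimed) {
			// Balances have been empty since `height`; archive once the threshold passes.
			(Some(height), true) => current_height >= height + BLOCKS_THRESHOLD,
			// Balances became non-empty again; reset the timer.
			(Some(_), false) => { *balances_empty_height = None; false },
			// First time we see empty balances; start the timer.
			(None, true) => { *balances_empty_height = Some(current_height); false },
			// Still funds to claim.
			(None, false) => false,
		}
	}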
@@ -4632,6 +4683,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
		let mut spendable_txids_confirmed = Some(Vec::new());
		let mut counterparty_fulfilled_htlcs = Some(new_hash_map());
		let mut initial_counterparty_commitment_info = None;
		let mut balances_empty_height = None;
		let mut channel_id = None;
		read_tlv_fields!(reader, {
			(1, funding_spend_confirmed, option),

@@ -4644,6 +4696,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
			(15, counterparty_fulfilled_htlcs, option),
			(17, initial_counterparty_commitment_info, option),
			(19, channel_id, option),
			(21, balances_empty_height, option),
		});

		// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both

@@ -4722,6 +4775,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
			best_block,
			counterparty_node_id,
			initial_counterparty_commitment_info,
			balances_empty_height,
		})))
	}
}

@@ -158,6 +158,60 @@ fn revoked_output_htlc_resolution_timing() {
	expect_payment_failed!(nodes[1], payment_hash_1, false);
}

#[test]
fn archive_fully_resolved_monitors() {
	// Test that we can archive a fully resolved channel monitor.
	let chanmon_cfgs = create_chanmon_cfgs(2);
	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
	let mut user_config = test_default_channel_config();
	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

	let (_, _, chan_id, funding_tx) =
		create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);

	nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()).unwrap();
	let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_shutdown);
	let node_1_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
	nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_shutdown);

	let node_0_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed);
	let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id());
	nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed);
	let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap());
	let (_, _) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());

	let shutdown_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);

	mine_transaction(&nodes[0], &shutdown_tx[0]);
	mine_transaction(&nodes[1], &shutdown_tx[0]);

	connect_blocks(&nodes[0], 6);
	connect_blocks(&nodes[1], 6);

	check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
	check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);

	assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
	// The first archive call should only set `balances_empty_height` to the current block height.
	nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
	assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
	connect_blocks(&nodes[0], 4032);
	// A second call 4032 blocks later should archive the monitor.
	nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
	// No monitors should be left.
	assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
	// Remove the corresponding outputs and transactions the chain source is
	// watching. This is to make sure the `Drop` function assertions pass.
	nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(
		OutPoint { txid: funding_tx.txid(), index: 0 },
		funding_tx.output[0].script_pubkey.clone()
	);
}

fn do_chanmon_claim_value_coop_close(anchors: bool) {
	// Tests `get_claimable_balances` returns the correct values across a simple cooperative claim.
	// Specifically, this tests that the channel non-HTLC balances show up in

@@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the [`NetworkGraph`] will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.

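For a filesystem-backed store these namespaces typically map to directories, so archiving moves a monitor from something like `monitors/<name>` to `archived_monitors/<name>` under the same key. A small sketch of the resulting key layout, assuming the `<funding_txid>_<output_index>` monitor-name format that `MonitorName` uses elsewhere in this file (the helper itself is illustrative, not LDK API):

	/// Illustrative only: render the live and archived storage locations for a
	/// monitor, assuming namespaces map to path segments and the monitor name
	/// is `<funding_txid>_<output_index>`.
	fn monitor_locations(funding_txid: &str, output_index: u16) -> (String, String) {
		let monitor_name = format!("{}_{}", funding_txid, output_index);
		let live = format!("monitors/{}", monitor_name);
		let archived = format!("archived_monitors/{}", monitor_name);
		(live, archived)
	}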
@@ -226,6 +231,33 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
		}
	}

	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
		let monitor_name = MonitorName::from(funding_txo);
		let monitor = match self.read(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name.as_str(),
		) {
			Ok(monitor) => monitor,
			Err(_) => return
		};
		match self.write(
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name.as_str(),
			&monitor,
		) {
			Ok(()) => {}
			Err(_e) => return
		};
		let _ = self.remove(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name.as_str(),
			true,
		);
	}
}

/// Read previously persisted [`ChannelMonitor`]s from the store.

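The reverse operation is not part of this change: nothing reloads an archived monitor automatically. If one were ever archived prematurely, a hypothetical recovery helper could copy it back over the same `KVStore` interface (the namespace constants and `KVStore` methods are the real API from this file; `restore_archived_monitor` is not LDK API):

	use lightning::util::persist::{
		KVStore,
		ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
		ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
		CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
	};
	use std::io;

	/// Hypothetical recovery helper: copy an archived monitor back into the live
	/// namespace so it is loaded again on the next startup.
	fn restore_archived_monitor<K: KVStore>(kv_store: &K, monitor_name: &str) -> io::Result<()> {
		let bytes = kv_store.read(
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name,
		)?;
		kv_store.write(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name,
			&bytes,
		)
	}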
@@ -732,6 +764,29 @@ where
			self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
		}
	}

	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
		let monitor_name = MonitorName::from(funding_txo);
		let monitor = match self.read_monitor(&monitor_name) {
			Ok((_block_hash, monitor)) => monitor,
			Err(_) => return
		};
		match self.kv_store.write(
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name.as_str(),
			&monitor.encode()
		) {
			Ok(()) => {},
			Err(_e) => return,
		};
		let _ = self.kv_store.remove(
			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
			monitor_name.as_str(),
			true,
		);
	}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>

@@ -504,6 +504,10 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
	}
	res
	}

	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
		<TestPersister as chainmonitor::Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
	}
}

pub struct TestPersister {

@@ -552,6 +556,18 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
		}
		ret
	}

	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
		// Remove the channel from the offchain_monitor_updates map.
		match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
			Some(_) => {},
			None => {
				// If the channel was not in the offchain_monitor_updates map, it should be in the
				// chain_sync_monitor_persistences map.
				assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
			}
		};
	}
}

pub struct TestStore {

@@ -1366,6 +1382,10 @@ impl TestChainSource {
			watched_outputs: Mutex::new(new_hash_set()),
		}
	}

	pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, script_pubkey: ScriptBuf) {
		self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
		self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey));
	}
}

impl UtxoLookup for TestChainSource {