Make ChainMonitor::monitors private and expose monitor via getter

Exposing a `RwLock<HashMap<>>` directly was always a bit strange,
and in upcoming changes we'd like to change the internal
datastructure in `ChainMonitor`.

Further, the use of `RwLock` and `HashMap` meant we weren't able
to expose the ChannelMonitors themselves to users in bindings,
leaving a bindings/rust API gap.

Thus, we take this opportunity go expose ChannelMonitors directly
via a wrapper, hiding the internals of `ChainMonitor` behind
getters. We also update tests to use the new API.
This commit is contained in:
Matt Corallo 2021-10-08 19:07:00 +00:00
parent 6a7c48b60d
commit 79541b11e8
7 changed files with 163 additions and 113 deletions

View file

@ -38,7 +38,7 @@ use util::events::EventHandler;
use ln::channelmanager::ChannelDetails;
use prelude::*;
use sync::RwLock;
use sync::{RwLock, RwLockReadGuard};
use core::ops::Deref;
/// `Persist` defines behavior for persisting channel monitors: this could mean
@ -92,6 +92,26 @@ pub trait Persist<ChannelSigner: Sign> {
fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
}
struct MonitorHolder<ChannelSigner: Sign> {
monitor: ChannelMonitor<ChannelSigner>,
}
/// A read-only reference to a current ChannelMonitor.
///
/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
/// released.
pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> {
lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
funding_txo: OutPoint,
}
impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
type Target = ChannelMonitor<ChannelSigner>;
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
&self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
}
}
/// An implementation of [`chain::Watch`] for monitoring channels.
///
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@ -108,8 +128,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
/// The monitors
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
chain_source: Option<C>,
broadcaster: T,
logger: L,
@ -138,9 +157,9 @@ where C::Target: chain::Filter,
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let mut dependent_txdata = Vec::new();
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
let mut txn_outputs = process(monitor, txdata);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
let mut txn_outputs = process(&monitor_state.monitor, txdata);
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
@ -202,8 +221,8 @@ where C::Target: chain::Filter,
/// inclusion in the return value.
pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
let mut ret = Vec::new();
let monitors = self.monitors.read().unwrap();
for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| {
let monitor_states = self.monitors.read().unwrap();
for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
for chan in ignored_channels {
if chan.funding_txo.as_ref() == Some(funding_outpoint) {
return false;
@ -211,11 +230,38 @@ where C::Target: chain::Filter,
}
true
}) {
ret.append(&mut monitor.get_claimable_balances());
ret.append(&mut monitor_state.monitor.get_claimable_balances());
}
ret
}
/// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
/// such [`ChannelMonitor`] is currently being monitored for.
///
/// Note that the result holds a mutex over our monitor set, and should not be held
/// indefinitely.
pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
let lock = self.monitors.read().unwrap();
if lock.get(&funding_txo).is_some() {
Ok(LockedChannelMonitor { lock, funding_txo })
} else {
Err(())
}
}
/// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
///
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
/// monitoring for on-chain state resolutions.
pub fn list_monitors(&self) -> Vec<OutPoint> {
self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
}
#[cfg(test)]
pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
}
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use util::events::EventsProvider;
@ -246,10 +292,10 @@ where
}
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let monitors = self.monitors.read().unwrap();
let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
for monitor in monitors.values() {
monitor.block_disconnected(
for monitor_state in monitor_states.values() {
monitor_state.monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
}
}
@ -274,9 +320,9 @@ where
fn transaction_unconfirmed(&self, txid: &Txid) {
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
}
}
@ -293,9 +339,9 @@ where
fn get_relevant_txids(&self) -> Vec<Txid> {
let mut txids = Vec::new();
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
txids.append(&mut monitor.get_relevant_txids());
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
txids.append(&mut monitor_state.monitor.get_relevant_txids());
}
txids.sort_unstable();
@ -338,7 +384,7 @@ where C::Target: chain::Filter,
monitor.load_outputs_to_watch(chain_source);
}
}
entry.insert(monitor);
entry.insert(MonitorHolder { monitor });
Ok(())
}
@ -359,7 +405,8 @@ where C::Target: chain::Filter,
#[cfg(not(any(test, feature = "fuzztarget")))]
Err(ChannelMonitorUpdateErr::PermanentFailure)
},
Some(monitor) => {
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
if let Err(e) = &update_res {
@ -382,8 +429,8 @@ where C::Target: chain::Filter,
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
let mut pending_monitor_events = Vec::new();
for monitor in self.monitors.read().unwrap().values() {
pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
for monitor_state in self.monitors.read().unwrap().values() {
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
}
pending_monitor_events
}
@ -404,8 +451,8 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
let mut pending_events = Vec::new();
for monitor in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor.get_and_clear_pending_events());
for monitor_state in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
}
for event in pending_events.drain(..) {
handler.handle_event(&event);

View file

@ -114,8 +114,7 @@ fn test_monitor_and_persister_update_fail() {
blocks: Arc::new(Mutex::new(vec![(genesis_block(Network::Testnet).header, 200); 200])),
};
let chain_mon = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
let monitor = monitors.get(&outpoint).unwrap();
let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap();
let mut w = test_utils::TestVecWriter(Vec::new());
monitor.write(&mut w).unwrap();
let new_monitor = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
@ -2256,7 +2255,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
if reload_a {
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[0].keys_manager;

View file

@ -274,10 +274,9 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
let feeest = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
let mut deserialized_monitors = Vec::new();
{
let old_monitors = self.chain_monitor.chain_monitor.monitors.read().unwrap();
for (_, old_monitor) in old_monitors.iter() {
for outpoint in self.chain_monitor.chain_monitor.list_monitors() {
let mut w = test_utils::TestVecWriter(Vec::new());
old_monitor.write(&mut w).unwrap();
self.chain_monitor.chain_monitor.get_monitor(outpoint).unwrap().write(&mut w).unwrap();
let (_, deserialized_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
&mut io::Cursor::new(&w.0), self.keys_manager).unwrap();
deserialized_monitors.push(deserialized_monitor);
@ -437,20 +436,35 @@ macro_rules! get_feerate {
}
}
/// Returns a channel monitor given a channel id, making some naive assumptions
#[macro_export]
macro_rules! get_monitor {
($node: expr, $channel_id: expr) => {
{
use bitcoin::hashes::Hash;
let mut monitor = None;
// Assume funding vout is either 0 or 1 blindly
for index in 0..2 {
if let Ok(mon) = $node.chain_monitor.chain_monitor.get_monitor(
$crate::chain::transaction::OutPoint {
txid: bitcoin::Txid::from_slice(&$channel_id[..]).unwrap(), index
})
{
monitor = Some(mon);
break;
}
}
monitor.unwrap()
}
}
}
/// Returns any local commitment transactions for the channel.
#[macro_export]
macro_rules! get_local_commitment_txn {
($node: expr, $channel_id: expr) => {
{
let monitors = $node.chain_monitor.chain_monitor.monitors.read().unwrap();
let mut commitment_txn = None;
for (funding_txo, monitor) in monitors.iter() {
if funding_txo.to_channel_id() == $channel_id {
commitment_txn = Some(monitor.unsafe_get_latest_holder_commitment_txn(&$node.logger));
break;
}
}
commitment_txn.unwrap()
$crate::get_monitor!($node, $channel_id).unsafe_get_latest_holder_commitment_txn(&$node.logger)
}
}
}

View file

@ -35,7 +35,7 @@ use util::errors::APIError;
use util::ser::{Writeable, ReadableArgs};
use util::config::UserConfig;
use bitcoin::hash_types::{Txid, BlockHash};
use bitcoin::hash_types::BlockHash;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::script::Builder;
use bitcoin::blockdata::opcodes;
@ -2104,7 +2104,7 @@ fn channel_monitor_network_test() {
// Drop the ChannelMonitor for the previous channel to avoid it broadcasting transactions and
// confusing us in the following tests.
let chan_3_mon = nodes[3].chain_monitor.chain_monitor.monitors.write().unwrap().remove(&OutPoint { txid: chan_3.3.txid(), index: 0 }).unwrap();
let chan_3_mon = nodes[3].chain_monitor.chain_monitor.remove_monitor(&OutPoint { txid: chan_3.3.txid(), index: 0 });
// One pending HTLC to time out:
let payment_preimage_2 = route_payment(&nodes[3], &vec!(&nodes[4])[..], 3000000).0;
@ -2171,7 +2171,7 @@ fn channel_monitor_network_test() {
assert_eq!(nodes[3].node.list_channels().len(), 0);
assert_eq!(nodes[4].node.list_channels().len(), 0);
nodes[3].chain_monitor.chain_monitor.monitors.write().unwrap().insert(OutPoint { txid: chan_3.3.txid(), index: 0 }, chan_3_mon);
nodes[3].chain_monitor.chain_monitor.watch_channel(OutPoint { txid: chan_3.3.txid(), index: 0 }, chan_3_mon).unwrap();
check_closed_event!(nodes[3], 1, ClosureReason::CommitmentTxConfirmed);
check_closed_event!(nodes[4], 1, ClosureReason::CommitmentTxConfirmed);
}
@ -3268,8 +3268,7 @@ fn test_force_close_fail_back() {
// Now check that if we add the preimage to ChannelMonitor it broadcasts our HTLC-Success..
{
let mut monitors = nodes[2].chain_monitor.chain_monitor.monitors.read().unwrap();
monitors.get(&OutPoint{ txid: Txid::from_slice(&payment_event.commitment_msg.channel_id[..]).unwrap(), index: 0 }).unwrap()
get_monitor!(nodes[2], payment_event.commitment_msg.channel_id)
.provide_payment_preimage(&our_payment_hash, &our_payment_preimage, &node_cfgs[2].tx_broadcaster, &node_cfgs[2].fee_estimator, &node_cfgs[2].logger);
}
mine_transaction(&nodes[2], &tx);
@ -3621,10 +3620,12 @@ fn test_funding_peer_disconnect() {
confirm_transaction(&nodes[0], &tx);
let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
let chan_id;
assert_eq!(events_1.len(), 1);
match events_1[0] {
MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => {
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
assert_eq!(*node_id, nodes[1].node.get_our_node_id());
chan_id = msg.channel_id;
},
_ => panic!("Unexpected event"),
}
@ -3696,7 +3697,7 @@ fn test_funding_peer_disconnect() {
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[0].keys_manager;
@ -4040,7 +4041,8 @@ fn test_no_txn_manager_serialize_deserialize() {
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], OutPoint { txid: tx.txid(), index: 0 }.to_channel_id())
.write(&mut chan_0_monitor_serialized).unwrap();
logger = test_utils::TestLogger::new();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
@ -4120,7 +4122,7 @@ fn test_dup_htlc_onchain_fails_on_reload() {
let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
// Route a payment, but force-close the channel before the HTLC fulfill message arrives at
// nodes[0].
@ -4156,7 +4158,7 @@ fn test_dup_htlc_onchain_fails_on_reload() {
// fairly normal behavior as ChannelMonitor(s) are often not re-serialized when on-chain events
// happen, unlike ChannelManager which tends to be re-serialized after any relevant event(s).
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
header.prev_blockhash = nodes[0].best_block_hash();
let claim_block = Block { header, txdata: claim_txn};
@ -4243,7 +4245,8 @@ fn test_manager_serialize_deserialize_events() {
added_monitors.clear();
}
node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id()));
let bs_funding_signed = get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id());
node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &bs_funding_signed);
{
let mut added_monitors = node_a.chain_monitor.added_monitors.lock().unwrap();
assert_eq!(added_monitors.len(), 1);
@ -4258,7 +4261,7 @@ fn test_manager_serialize_deserialize_events() {
// Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], bs_funding_signed.channel_id).write(&mut chan_0_monitor_serialized).unwrap();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
logger = test_utils::TestLogger::new();
@ -4336,7 +4339,7 @@ fn test_simple_manager_serialize_deserialize() {
let new_chain_monitor: test_utils::TestChainMonitor;
let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
let (our_payment_preimage, _, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
let (_, our_payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
@ -4345,7 +4348,7 @@ fn test_simple_manager_serialize_deserialize() {
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
logger = test_utils::TestLogger::new();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
@ -4397,14 +4400,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
let new_chain_monitor: test_utils::TestChainMonitor;
let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(4, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::known(), InitFeatures::known());
let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
let chan_id_2 = create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::known(), InitFeatures::known()).2;
let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3, InitFeatures::known(), InitFeatures::known());
let mut node_0_stale_monitors_serialized = Vec::new();
for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() {
for chan_id_iter in &[chan_id_1, chan_id_2, channel_id] {
let mut writer = test_utils::TestVecWriter(Vec::new());
monitor.1.write(&mut writer).unwrap();
get_monitor!(nodes[0], chan_id_iter).write(&mut writer).unwrap();
node_0_stale_monitors_serialized.push(writer.0);
}
@ -4421,9 +4424,9 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
// Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/
// nodes[3])
let mut node_0_monitors_serialized = Vec::new();
for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() {
for chan_id_iter in &[chan_id_1, chan_id_2, channel_id] {
let mut writer = test_utils::TestVecWriter(Vec::new());
monitor.1.write(&mut writer).unwrap();
get_monitor!(nodes[0], chan_id_iter).write(&mut writer).unwrap();
node_0_monitors_serialized.push(writer.0);
}
@ -7163,7 +7166,7 @@ fn test_data_loss_protect() {
// Cache node A state before any channel update
let previous_node_state = nodes[0].node.encode();
let mut previous_chain_monitor_state = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut previous_chain_monitor_state).unwrap();
get_monitor!(nodes[0], chan.2).write(&mut previous_chain_monitor_state).unwrap();
send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000);
send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000);
@ -7404,7 +7407,7 @@ fn test_priv_forwarding_rejection() {
let nodes_1_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known());
let chan_id_1 = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known()).2;
// Note that the create_*_chan functions in utils requires announcement_signatures, which we do
// not send for private channels.
@ -7419,7 +7422,8 @@ fn test_priv_forwarding_rejection() {
nodes[2].node.handle_funding_created(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingCreated, nodes[2].node.get_our_node_id()));
check_added_monitors!(nodes[2], 1);
nodes[1].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[1].node.get_our_node_id()));
let cs_funding_signed = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[1].node.get_our_node_id());
nodes[1].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &cs_funding_signed);
check_added_monitors!(nodes[1], 1);
let conf_height = core::cmp::max(nodes[1].best_block_info().1 + 1, nodes[2].best_block_info().1 + 1);
@ -7479,12 +7483,8 @@ fn test_priv_forwarding_rejection() {
let nodes_1_serialized = nodes[1].node.encode();
let mut monitor_a_serialized = test_utils::TestVecWriter(Vec::new());
let mut monitor_b_serialized = test_utils::TestVecWriter(Vec::new());
{
let mons = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap();
let mut mon_iter = mons.iter();
mon_iter.next().unwrap().1.write(&mut monitor_a_serialized).unwrap();
mon_iter.next().unwrap().1.write(&mut monitor_b_serialized).unwrap();
}
get_monitor!(nodes[1], chan_id_1).write(&mut monitor_a_serialized).unwrap();
get_monitor!(nodes[1], cs_funding_signed.channel_id).write(&mut monitor_b_serialized).unwrap();
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[1].keys_manager;
@ -8004,11 +8004,9 @@ fn test_bump_txn_sanitize_tracking_maps() {
connect_block(&nodes[0], &Block { header: header_130, txdata: penalty_txn });
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
{
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
if let Some(monitor) = monitors.get(&OutPoint { txid: chan.3.txid(), index: 0 }) {
assert!(monitor.inner.lock().unwrap().onchain_tx_handler.pending_claim_requests.is_empty());
assert!(monitor.inner.lock().unwrap().onchain_tx_handler.claimable_outpoints.is_empty());
}
let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(OutPoint { txid: chan.3.txid(), index: 0 }).unwrap();
assert!(monitor.inner.lock().unwrap().onchain_tx_handler.pending_claim_requests.is_empty());
assert!(monitor.inner.lock().unwrap().onchain_tx_handler.claimable_outpoints.is_empty());
}
}
@ -8273,8 +8271,7 @@ fn test_update_err_monitor_lockdown() {
let logger = test_utils::TestLogger::with_id(format!("node {}", 0));
let persister = test_utils::TestPersister::new();
let watchtower = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
let monitor = monitors.get(&outpoint).unwrap();
let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap();
let mut w = test_utils::TestVecWriter(Vec::new());
monitor.write(&mut w).unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
@ -8335,8 +8332,7 @@ fn test_concurrent_monitor_claim() {
let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice"));
let persister = test_utils::TestPersister::new();
let watchtower_alice = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
let monitor = monitors.get(&outpoint).unwrap();
let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap();
let mut w = test_utils::TestVecWriter(Vec::new());
monitor.write(&mut w).unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
@ -8364,8 +8360,7 @@ fn test_concurrent_monitor_claim() {
let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob"));
let persister = test_utils::TestPersister::new();
let watchtower_bob = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
let monitor = monitors.get(&outpoint).unwrap();
let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap();
let mut w = test_utils::TestVecWriter(Vec::new());
monitor.write(&mut w).unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
@ -9050,8 +9045,8 @@ fn test_forwardable_regen() {
let new_chain_monitor: test_utils::TestChainMonitor;
let nodes_1_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2;
// First send a payment to nodes[1]
let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
@ -9088,12 +9083,8 @@ fn test_forwardable_regen() {
let nodes_1_serialized = nodes[1].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
let mut chan_1_monitor_serialized = test_utils::TestVecWriter(Vec::new());
{
let monitors = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap();
let mut monitor_iter = monitors.iter();
monitor_iter.next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
monitor_iter.next().unwrap().1.write(&mut chan_1_monitor_serialized).unwrap();
}
get_monitor!(nodes[1], chan_id_1).write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[1], chan_id_2).write(&mut chan_1_monitor_serialized).unwrap();
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[1].keys_manager;

View file

@ -107,9 +107,9 @@ fn chanmon_claim_value_coop_close() {
assert_eq!(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 1_000_000 - 1_000 - chan_feerate * channel::COMMITMENT_TX_BASE_WEIGHT / 1000
}],
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
assert_eq!(vec![Balance::ClaimableOnChannelClose { claimable_amount_satoshis: 1_000, }],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
nodes[0].node.close_channel(&chan_id).unwrap();
let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
@ -143,20 +143,20 @@ fn chanmon_claim_value_coop_close() {
claimable_amount_satoshis: 1_000_000 - 1_000 - chan_feerate * channel::COMMITMENT_TX_BASE_WEIGHT / 1000,
confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1,
}],
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
assert_eq!(vec![Balance::ClaimableAwaitingConfirmations {
claimable_amount_satoshis: 1000,
confirmation_height: nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1,
}],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
assert_eq!(Vec::<Balance>::new(),
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
assert_eq!(Vec::<Balance>::new(),
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events();
assert_eq!(node_a_spendable.len(), 1);
@ -230,11 +230,11 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
assert_eq!(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 1_000,
}],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
nodes[1].node.claim_funds(payment_preimage);
check_added_monitors!(nodes[1], 1);
@ -284,11 +284,11 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
});
}
assert_eq!(sorted_vec(a_expected_balances),
sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
assert_eq!(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 1_000 + 3_000 + 4_000,
}],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
// Broadcast the closing transaction (which has both pending HTLCs in it) and get B's
// broadcasted HTLC claim transaction with preimage.
@ -342,7 +342,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
// The main non-HTLC balance is just awaiting confirmations, but the claimable height is the
// CSV delay, not ANTI_REORG_DELAY.
assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
@ -358,7 +358,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
expect_payment_failed!(nodes[0], dust_payment_hash, true);
@ -373,7 +373,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
claimable_amount_satoshis: 1_000,
confirmation_height: node_b_commitment_claimable,
@ -384,7 +384,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events();
assert_eq!(node_a_spendable.len(), 1);
@ -410,13 +410,13 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
assert_eq!(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}],
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
// When the HTLC timeout output is spendable in the next block, A should broadcast it
connect_blocks(&nodes[0], htlc_cltv_timeout - nodes[0].best_block_info().1 - 1);
@ -441,12 +441,12 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1,
}],
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
// After ANTI_REORG_DELAY, A will generate a SpendableOutputs event and drop the claimable
// balance entry.
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
assert_eq!(Vec::<Balance>::new(),
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
expect_payment_failed!(nodes[0], timeout_payment_hash, true);
let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events();
@ -474,7 +474,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
// After reaching the commitment output CSV, we'll get a SpendableOutputs event for it and have
// only the HTLCs claimable on node B.
@ -496,7 +496,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()));
sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
// After reaching the claimed HTLC output CSV, we'll get a SpendableOutptus event for it and
// have only one HTLC output left spendable.
@ -515,7 +515,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
// Finally, mine the HTLC timeout transaction that A broadcasted (even though B should be able
// to claim this HTLC with the preimage it knows!). It will remain listed as a claimable HTLC
@ -525,10 +525,10 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
claimable_amount_satoshis: 4_000,
timeout_height: htlc_cltv_timeout,
}],
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
assert_eq!(Vec::<Balance>::new(),
nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances());
nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
}
#[test]

View file

@ -238,7 +238,7 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_
// it when we go to deserialize, and then use the ChannelManager.
let nodes_0_serialized = nodes[0].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
get_monitor!(nodes[0], chan.2).write(&mut chan_0_monitor_serialized).unwrap();
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[0].keys_manager;

View file

@ -156,8 +156,7 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
let update_res = self.chain_monitor.update_channel(funding_txo, update);
// At every point where we get a monitor update, we should be able to send a useful monitor
// to a watchtower and disk...
let monitors = self.chain_monitor.monitors.read().unwrap();
let monitor = monitors.get(&funding_txo).unwrap();
let monitor = self.chain_monitor.get_monitor(funding_txo).unwrap();
w.0.clear();
monitor.write(&mut w).unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(