Mirror of https://github.com/lightningdevkit/rust-lightning.git
Use channel ID over funding outpoint to track monitors in ChannelManager
As motivated by the previous commit, we do some of the same work here at the `ChannelManager` level instead. Unfortunately, we still need to track the funding outpoint to support downgrades: the in-flight monitor updates are written as two separate TLVs, one keyed by channel ID and the other keyed by funding outpoint. Once we are willing to stop supporting downgrades past this version, we can drop the outpoint-keyed TLV entirely.
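The dual-TLV scheme is easiest to see in miniature. Below is a hedged sketch (stand-in types, not LDK's actual serialization macros) of how the single channel-ID-keyed map introduced by this commit, which carries each channel's funding outpoint alongside its updates, can back both the legacy outpoint-keyed TLV (type 10) and the new channel-ID-keyed TLV (type 17) at write time:

```rust
use std::collections::BTreeMap;

// Stand-ins for the real LDK types; illustrative only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct OutPoint([u8; 36]);
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChannelId([u8; 32]);
struct ChannelMonitorUpdate;

// Derive both serialization views from the one channel-ID-keyed map: the
// outpoint-keyed view backfills the legacy TLV for downgrades, while the
// channel-ID-keyed view is what current versions read back.
fn serialization_views(
    in_flight: &BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
) -> (
    BTreeMap<OutPoint, &Vec<ChannelMonitorUpdate>>,  // legacy TLV (type 10)
    BTreeMap<ChannelId, &Vec<ChannelMonitorUpdate>>, // new TLV (type 17)
) {
    let mut legacy = BTreeMap::new();
    let mut current = BTreeMap::new();
    for (channel_id, (funding_txo, updates)) in in_flight {
        // Empty update lists are skipped, mirroring the write path in the
        // diff below.
        if !updates.is_empty() {
            legacy.insert(*funding_txo, updates);
            current.insert(*channel_id, updates);
        }
    }
    (legacy, current)
}
```

Once downgrade support past this version is dropped, the outpoint-keyed view (and the stored `OutPoint`) can go away, leaving a plain `BTreeMap<ChannelId, Vec<ChannelMonitorUpdate>>`.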
commit e8854f9a07
parent d205d8d515

4 changed files with 88 additions and 66 deletions
@@ -173,7 +173,6 @@ struct LatestMonitorState {
     /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
     /// from LDK's perspective.
     pending_monitors: Vec<(u64, Vec<u8>)>,
-    funding_txo: OutPoint,
 }
 
 struct TestChainMonitor {
@@ -219,14 +218,12 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
         let mut ser = VecWriter(Vec::new());
         monitor.write(&mut ser).unwrap();
         let monitor_id = monitor.get_latest_update_id();
-        let funding_txo = monitor.get_funding_txo().0;
         let res = self.chain_monitor.watch_channel(channel_id, monitor);
         let state = match res {
             Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
                 persisted_monitor_id: monitor_id,
                 persisted_monitor: ser.0,
                 pending_monitors: Vec::new(),
-                funding_txo,
             },
             Ok(chain::ChannelMonitorUpdateStatus::InProgress) => {
                 panic!("The test currently doesn't test initial-persistence via the async pipeline")
@@ -716,7 +713,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
             let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
             for (channel_id, mut prev_state) in old_monitors.drain() {
                 monitors.insert(
-                    prev_state.funding_txo,
+                    channel_id,
                     <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
                         &mut Cursor::new(&prev_state.persisted_monitor),
                         (&*$keys_manager, &*$keys_manager),
@@ -731,8 +728,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                 chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
             }
             let mut monitor_refs = new_hash_map();
-            for (outpoint, monitor) in monitors.iter() {
-                monitor_refs.insert(*outpoint, monitor);
+            for (channel_id, monitor) in monitors.iter() {
+                monitor_refs.insert(*channel_id, monitor);
             }
 
             let read_args = ChannelManagerReadArgs {
@@ -755,9 +752,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                     .1,
                 chain_monitor.clone(),
             );
-            for (_, mon) in monitors.drain() {
+            for (channel_id, mon) in monitors.drain() {
                 assert_eq!(
-                    chain_monitor.chain_monitor.watch_channel(mon.channel_id(), mon),
+                    chain_monitor.chain_monitor.watch_channel(channel_id, mon),
                     Ok(ChannelMonitorUpdateStatus::Completed)
                 );
             }

@@ -1334,7 +1334,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
     /// for broadcast messages, where ordering isn't as strict).
     pub(super) pending_msg_events: Vec<MessageSendEvent>,
     /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the
-    /// user but which have not yet completed.
+    /// user but which have not yet completed. We still keep the funding outpoint around to backfill
+    /// the legacy TLV field to support downgrading.
     ///
     /// Note that the channel may no longer exist. For example if the channel was closed but we
     /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
@@ -1346,7 +1347,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
     /// where we complete one [`ChannelMonitorUpdate`] (but there are more pending as background
     /// events) but we conclude all pending [`ChannelMonitorUpdate`]s have completed and its safe
     /// to run post-completion actions.
-    in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>>,
+    in_flight_monitor_updates: BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
     /// Map from a specific channel to some action(s) that should be taken when all pending
     /// [`ChannelMonitorUpdate`]s for the channel complete updating.
     ///
@@ -1389,7 +1390,7 @@ impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
         if require_disconnected && self.is_connected {
             return false
         }
-        for (_, updates) in self.in_flight_monitor_updates.iter() {
+        for (_, updates) in self.in_flight_monitor_updates.values() {
             if !updates.is_empty() {
                 return false;
             }
@@ -3309,8 +3310,8 @@ macro_rules! handle_new_monitor_update {
         $chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
         _internal_outer, $completed: expr
     ) => { {
-        $in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
-            .or_insert_with(Vec::new);
+        $in_flight_updates = &mut $peer_state.in_flight_monitor_updates.entry($chan_id)
+            .or_insert_with(|| ($funding_txo, Vec::new())).1;
         // During startup, we push monitor updates as background events through to here in
         // order to replay updates that were in-flight when we shut down. Thus, we have to
         // filter for uniqueness here.
@@ -4031,11 +4032,11 @@ where
         // `MonitorUpdateCompletionAction`s.
         // TODO: If we do the `in_flight_monitor_updates.is_empty()` check in
         // `locked_close_channel` we can skip the locks here.
-        if let Some(funding_txo) = shutdown_res.channel_funding_txo {
+        if shutdown_res.channel_funding_txo.is_some() {
             let per_peer_state = self.per_peer_state.read().unwrap();
             if let Some(peer_state_mtx) = per_peer_state.get(&shutdown_res.counterparty_node_id) {
                 let mut peer_state = peer_state_mtx.lock().unwrap();
-                if peer_state.in_flight_monitor_updates.get(&funding_txo).map(|l| l.is_empty()).unwrap_or(true) {
+                if peer_state.in_flight_monitor_updates.get(&shutdown_res.channel_id).map(|(_, updates)| updates.is_empty()).unwrap_or(true) {
                     let update_actions = peer_state.monitor_update_blocked_actions
                         .remove(&shutdown_res.channel_id).unwrap_or(Vec::new());
 
@@ -7593,7 +7594,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
             let peer_state = &mut *peer_state_lock;
 
             let remaining_in_flight =
-                if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
+                if let Some((_, pending)) = peer_state.in_flight_monitor_updates.get_mut(channel_id) {
                     pending.retain(|upd| upd.update_id > highest_applied_update_id);
                     pending.len()
                 } else { 0 };
@@ -12931,12 +12932,15 @@ where
             pending_claiming_payments = None;
         }
 
-        let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+        let mut legacy_in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+        let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &ChannelId), &Vec<ChannelMonitorUpdate>>> = None;
         for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
-            for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
+            for (channel_id, (funding_txo, updates)) in peer_state.in_flight_monitor_updates.iter() {
                 if !updates.is_empty() {
-                    if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(new_hash_map()); }
-                    in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
+                    legacy_in_flight_monitor_updates.get_or_insert_with(|| new_hash_map())
+                        .insert((counterparty_id, funding_txo), updates);
+                    in_flight_monitor_updates.get_or_insert_with(|| new_hash_map())
+                        .insert((counterparty_id, channel_id), updates);
                 }
             }
         }
@@ -12951,11 +12955,12 @@ where
             (7, self.fake_scid_rand_bytes, required),
             (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
             (9, htlc_purposes, required_vec),
-            (10, in_flight_monitor_updates, option),
+            (10, legacy_in_flight_monitor_updates, option),
             (11, self.probing_cookie_secret, required),
             (13, htlc_onion_fields, optional_vec),
             (14, decode_update_add_htlcs_opt, option),
             (15, self.inbound_payment_id_secret, required),
+            (17, in_flight_monitor_updates, required),
         });
 
         Ok(())
@@ -13091,8 +13096,7 @@ where
     /// runtime settings which were stored when the ChannelManager was serialized.
     pub default_config: UserConfig,
 
-    /// A map from channel funding outpoints to ChannelMonitors for those channels (ie
-    /// value.context.get_funding_txo() should be the key).
+    /// A map from channel IDs to ChannelMonitors for those channels.
     ///
     /// If a monitor is inconsistent with the channel state during deserialization the channel will
     /// be force-closed using the data in the ChannelMonitor and the channel will be dropped. This
@@ -13103,7 +13107,7 @@ where
     /// this struct.
     ///
    /// This is not exported to bindings users because we have no HashMap bindings
-    pub channel_monitors: HashMap<OutPoint, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
+    pub channel_monitors: HashMap<ChannelId, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
 }
 
 impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref>
@@ -13132,7 +13136,7 @@ where
             entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor,
             tx_broadcaster, router, message_router, logger, default_config,
             channel_monitors: hash_map_from_iter(
-                channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) })
+                channel_monitors.drain(..).map(|monitor| { (monitor.channel_id(), monitor) })
             ),
         }
     }
@@ -13195,22 +13199,21 @@ where
 
         let mut failed_htlcs = Vec::new();
         let channel_count: u64 = Readable::read(reader)?;
-        let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
+        let mut channel_id_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
         let mut per_peer_state = hash_map_with_capacity(cmp::min(channel_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>()));
         let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
         let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
         let mut channel_closures = VecDeque::new();
         let mut close_background_events = Vec::new();
-        let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
         for _ in 0..channel_count {
             let mut channel: FundedChannel<SP> = FundedChannel::read(reader, (
                 &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
             ))?;
             let logger = WithChannelContext::from(&args.logger, &channel.context, None);
+            let channel_id = channel.context.channel_id();
+            channel_id_set.insert(channel_id);
             let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
-            funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
-            funding_txo_set.insert(funding_txo.clone());
-            if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
+            if let Some(ref mut monitor) = args.channel_monitors.get_mut(&channel_id) {
                 if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
                     channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() ||
                     channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
@@ -13293,9 +13296,7 @@ where
             if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                 short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
             }
-            if let Some(funding_txo) = channel.context.get_funding_txo() {
-                outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
-            }
+            outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
             per_peer_state.entry(channel.context.get_counterparty_node_id())
                 .or_insert_with(|| Mutex::new(empty_peer_state()))
                 .get_mut().unwrap()
@@ -13325,8 +13326,8 @@ where
             }
         }
 
-        for (funding_txo, monitor) in args.channel_monitors.iter() {
-            if !funding_txo_set.contains(funding_txo) {
+        for (channel_id, monitor) in args.channel_monitors.iter() {
+            if !channel_id_set.contains(channel_id) {
                 let mut should_queue_fc_update = false;
                 if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
                     // If the ChannelMonitor had any updates, we may need to update it further and
@@ -13364,10 +13365,11 @@ where
                     updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
                     channel_id: Some(monitor.channel_id()),
                 };
+                let funding_txo = monitor.get_funding_txo().0;
                 if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
                     let update = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                         counterparty_node_id,
-                        funding_txo: *funding_txo,
+                        funding_txo,
                         channel_id,
                         update: monitor_update,
                     };
@@ -13380,7 +13382,7 @@ where
                     // generate a `ChannelMonitorUpdate` for it aside from this
                     // `ChannelForceClosed` one.
                     monitor_update.update_id = u64::MAX;
-                    close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, channel_id, monitor_update)));
+                    close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, channel_id, monitor_update)));
                 }
             }
         }
@@ -13480,7 +13482,10 @@ where
         let mut pending_claiming_payments = Some(new_hash_map());
         let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
         let mut events_override = None;
-        let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+        let mut legacy_in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+        // We use this one over the legacy since they represent the same data, just with a different
+        // key. We still need to read the legacy one as it's an even TLV.
+        let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
         let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
         let mut inbound_payment_id_secret = None;
         read_tlv_fields!(reader, {
@@ -13493,11 +13498,12 @@ where
             (7, fake_scid_rand_bytes, option),
             (8, events_override, option),
             (9, claimable_htlc_purposes, optional_vec),
-            (10, in_flight_monitor_updates, option),
+            (10, legacy_in_flight_monitor_updates, option),
             (11, probing_cookie_secret, option),
             (13, claimable_htlc_onion_fields, optional_vec),
             (14, decode_update_add_htlcs, option),
             (15, inbound_payment_id_secret, option),
+            (17, in_flight_monitor_updates, required),
         });
         let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
         if fake_scid_rand_bytes.is_none() {
@@ -13531,6 +13537,27 @@ where
         }
         let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
 
+        // Handle transitioning from the legacy TLV to the new one on upgrades.
+        if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
+            // We should never serialize an empty map.
+            if legacy_in_flight_upds.is_empty() {
+                return Err(DecodeError::InvalidValue);
+            }
+            if in_flight_monitor_updates.is_none() {
+                let in_flight_upds = in_flight_monitor_updates.get_or_insert_with(|| new_hash_map());
+                for ((counterparty_node_id, funding_txo), updates) in legacy_in_flight_upds {
+                    // All channels with legacy in flight monitor updates are v1 channels.
+                    let channel_id = ChannelId::v1_from_funding_outpoint(funding_txo);
+                    in_flight_upds.insert((counterparty_node_id, channel_id), updates);
+                }
+            } else {
+                // We should never serialize an empty map.
+                if in_flight_monitor_updates.as_ref().unwrap().is_empty() {
+                    return Err(DecodeError::InvalidValue);
+                }
+            }
+        }
+
         // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`)
         // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to
         // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we
@@ -13544,11 +13571,12 @@ where
         // Because the actual handling of the in-flight updates is the same, it's macro'ized here:
         let mut pending_background_events = Vec::new();
         macro_rules! handle_in_flight_updates {
-            ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
-                $monitor: expr, $peer_state: expr, $logger: expr, $channel_info_log: expr
+            ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr,
+                $peer_state: expr, $logger: expr, $channel_info_log: expr
             ) => { {
                 let mut max_in_flight_update_id = 0;
                 $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
+                let funding_txo = $monitor.get_funding_txo().0;
                 for update in $chan_in_flight_upds.iter() {
                     log_trace!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
                         update.update_id, $channel_info_log, &$monitor.channel_id());
@@ -13556,7 +13584,7 @@ where
                     pending_background_events.push(
                         BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                             counterparty_node_id: $counterparty_node_id,
-                            funding_txo: $funding_txo,
+                            funding_txo: funding_txo,
                             channel_id: $monitor.channel_id(),
                             update: update.clone(),
                         });
@@ -13575,7 +13603,7 @@ where
                         .and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v))
                         .or_insert(max_in_flight_update_id);
                 }
-                if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
+                if $peer_state.in_flight_monitor_updates.insert($monitor.channel_id(), (funding_txo, $chan_in_flight_upds)).is_some() {
                     log_error!($logger, "Duplicate in-flight monitor update set for the same channel!");
                     return Err(DecodeError::InvalidValue);
                 }
@@ -13586,28 +13614,27 @@ where
         for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
             let mut peer_state_lock = peer_state_mtx.lock().unwrap();
             let peer_state = &mut *peer_state_lock;
-            for chan in peer_state.channel_by_id.values() {
+            for (chan_id, chan) in peer_state.channel_by_id.iter() {
                 if let Some(funded_chan) = chan.as_funded() {
                     let logger = WithChannelContext::from(&args.logger, &funded_chan.context, None);
 
                     // Channels that were persisted have to be funded, otherwise they should have been
                     // discarded.
-                    let funding_txo = funded_chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
-                    let monitor = args.channel_monitors.get(&funding_txo)
+                    let monitor = args.channel_monitors.get(chan_id)
                         .expect("We already checked for monitor presence when loading channels");
                     let mut max_in_flight_update_id = monitor.get_latest_update_id();
                     if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
-                        if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) {
+                        if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, *chan_id)) {
                             max_in_flight_update_id = cmp::max(max_in_flight_update_id,
                                 handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds,
-                                    funding_txo, monitor, peer_state, logger, ""));
+                                    monitor, peer_state, logger, ""));
                         }
                     }
                     if funded_chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
                         // If the channel is ahead of the monitor, return DangerousValue:
                         log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
                         log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
-                            funded_chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
+                            chan_id, monitor.get_latest_update_id(), max_in_flight_update_id);
                         log_error!(logger, " but the ChannelManager is at update_id {}.", funded_chan.get_latest_unblocked_monitor_update_id());
                         log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
                         log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
@@ -13625,10 +13652,9 @@ where
             }
         }
 
         if let Some(in_flight_upds) = in_flight_monitor_updates {
-            for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
-                let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
-                let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id, None);
-                if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
+            for ((counterparty_id, channel_id), mut chan_in_flight_updates) in in_flight_upds {
+                let logger = WithContext::from(&args.logger, Some(counterparty_id), Some(channel_id), None);
+                if let Some(monitor) = args.channel_monitors.get(&channel_id) {
                     // Now that we've removed all the in-flight monitor updates for channels that are
                     // still open, we need to replay any monitor updates that are for closed channels,
                     // creating the neccessary peer_state entries as we go.
@@ -13636,12 +13662,11 @@ where
                         Mutex::new(empty_peer_state())
                     });
                     let mut peer_state = peer_state_mutex.lock().unwrap();
-                    handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
-                        funding_txo, monitor, peer_state, logger, "closed ");
+                    handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, monitor,
+                        peer_state, logger, "closed ");
                 } else {
                     log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
-                    log_error!(logger, " The ChannelMonitor for channel {} is missing.", if let Some(channel_id) =
-                        channel_id { channel_id.to_string() } else { format!("with outpoint {}", funding_txo) } );
+                    log_error!(logger, " The ChannelMonitor for channel {} is missing.", channel_id);
                     log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
                     log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
                     log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
@@ -13692,9 +13717,9 @@ where
                     .and_modify(|v| *v = cmp::max(update.update_id, *v))
                     .or_insert(update.update_id);
             }
-            let in_flight_updates = per_peer_state.in_flight_monitor_updates
-                .entry(*funding_txo)
-                .or_insert_with(Vec::new);
+            let in_flight_updates = &mut per_peer_state.in_flight_monitor_updates
+                .entry(*channel_id)
+                .or_insert_with(|| (*funding_txo, Vec::new())).1;
             debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
             in_flight_updates.push(update.clone());
         }
@@ -13818,7 +13843,7 @@ where
             .filter_map(|(htlc_source, (htlc, preimage_opt))| {
                 if let HTLCSource::PreviousHopData(prev_hop) = &htlc_source {
                     if let Some(payment_preimage) = preimage_opt {
-                        let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.outpoint);
+                        let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.channel_id);
                         // Note that for channels which have gone to chain,
                         // `get_all_current_outbound_htlcs` is never pruned and always returns
                         // a constant set until the monitor is removed/archived. Thus, we
@@ -14306,7 +14331,7 @@ where
                 );
             }
         }
-        if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+        if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.channel_id) {
             // Note that this is unsafe as we no longer require the
             // `ChannelMonitor`s to be re-persisted prior to this
             // `ChannelManager` being persisted after we get started running.

@@ -712,7 +712,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
         {
             let mut channel_monitors = new_hash_map();
             for monitor in deserialized_monitors.iter() {
-                channel_monitors.insert(monitor.get_funding_txo().0, monitor);
+                channel_monitors.insert(monitor.channel_id(), monitor);
             }
 
             let scorer = RwLock::new(test_utils::TestScorer::new());
@@ -1138,7 +1138,7 @@ pub fn _reload_node<'a, 'b, 'c>(node: &'a Node<'a, 'b, 'c>, default_config: User
     let (_, node_deserialized) = {
         let mut channel_monitors = new_hash_map();
         for monitor in monitors_read.iter() {
-            assert!(channel_monitors.insert(monitor.get_funding_txo().0, monitor).is_none());
+            assert!(channel_monitors.insert(monitor.channel_id(), monitor).is_none());
         }
         <(BlockHash, TestChannelManager<'b, 'c>)>::read(&mut node_read, ChannelManagerReadArgs {
             default_config,

@@ -432,7 +432,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
         chain_monitor: nodes[0].chain_monitor,
         tx_broadcaster: nodes[0].tx_broadcaster,
         logger: &logger,
-        channel_monitors: node_0_stale_monitors.iter().map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect(),
+        channel_monitors: node_0_stale_monitors.iter().map(|monitor| { (monitor.channel_id(), monitor) }).collect(),
     }) { } else {
         panic!("If the monitor(s) are stale, this indicates a bug and we should get an Err return");
     };
@@ -450,7 +450,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
         chain_monitor: nodes[0].chain_monitor,
         tx_broadcaster: nodes[0].tx_broadcaster,
         logger: &logger,
-        channel_monitors: node_0_monitors.iter().map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect(),
+        channel_monitors: node_0_monitors.iter().map(|monitor| { (monitor.channel_id(), monitor) }).collect(),
     }).unwrap();
     nodes_0_deserialized = nodes_0_deserialized_tmp;
     assert!(nodes_0_read.is_empty());
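A closing note on the read-time migration above: legacy outpoint-keyed entries can be re-keyed because a v1 channel ID is a pure function of the funding outpoint, which is what `ChannelId::v1_from_funding_outpoint` computes. A rough sketch of that derivation, using raw byte arrays rather than LDK's types and assuming the BOLT 2 rule that the funding output index is XORed big-endian into the last two bytes of the funding txid:

```rust
// Hedged sketch of v1 channel-ID derivation (BOLT 2): the funding txid
// with the big-endian funding output index XORed into its last two bytes.
// The txid byte order is assumed to match how LDK lays it out internally.
fn v1_channel_id(funding_txid: [u8; 32], funding_output_index: u16) -> [u8; 32] {
    let mut id = funding_txid;
    id[30] ^= (funding_output_index >> 8) as u8;
    id[31] ^= (funding_output_index & 0xff) as u8;
    id
}
```

This is also why the migration only handles v1 channels, as the in-code comment notes: only for v1 channels is the channel ID recoverable from the funding outpoint alone.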