Replay MPP claims via background events using new CM metadata

When we claim an MPP payment and then crash before persisting all the
relevant `ChannelMonitor`s, we rely on the payment data being
available in the `ChannelManager` on restart to re-claim any parts
that haven't yet been claimed. This is fine as long as the
`ChannelManager` was persisted before the `PaymentClaimable` event
was processed, which is generally the case in our
`lightning-background-processor`, but may not hold in other setups or
in a somewhat rare race.

In order to fix this, we need to track where all the MPP parts of
a payment are in the `ChannelMonitor`, allowing us to re-claim any
missing pieces without reference to any `ChannelManager` data.
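
Concretely, that means recording, for every part, enough data to
locate the HTLC on its (possibly force-closed) channel. A minimal
sketch of that shape, using stand-in types rather than the real
secp256k1/LDK ones (the fields mirror the `MPPClaimHTLCSource`
added below):

    // Sketch only: stand-ins for the real PublicKey/OutPoint/ChannelId types.
    type NodeId = [u8; 33];
    type Txid = [u8; 32];

    struct MppPartSketch {
        counterparty_node_id: NodeId, // peer the HTLC was received from
        funding_txo: (Txid, u16),     // funding outpoint of the backward channel
        channel_id: [u8; 32],         // channel the HTLC sits on
        htlc_id: u64,                 // index of the HTLC on that channel
    }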

Further, in order to properly generate a `PaymentClaimed` event
against the re-started claim, we have to store various payment
metadata with the HTLC list as well.
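
A rough sketch of that per-payment metadata, again with illustrative
stand-in types rather than LDK's actual ones:

    // Enough payment-level data to rebuild a PaymentClaimed event on restart,
    // stored in the monitor alongside the list of MPP parts above.
    struct ClaimMetadataSketch {
        payment_hash: [u8; 32],
        amount_msat: u64,
        sender_intended_total_msat: Option<u64>,
        // In practice this also carries the payment purpose/onion fields and a
        // payment_id, which is what lets us de-duplicate replayed events.
        payment_id: [u8; 32],
    }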

Here we finally implement claiming using the new MPP part list and
metadata stored in `ChannelMonitor`s. In doing so, we use much more
of the existing HTLC-claiming pipeline in `ChannelManager`,
utilizing the on-startup background events flow as well as properly
re-applying the RAA-blockers to ensure preimages cannot be lost.
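
As a rough illustration of the replay flow, here is a toy model with
hypothetical stand-in types; only `get_stored_preimages` echoes a name
from the diff below, everything else is illustrative rather than the
actual LDK API:

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct PaymentHash([u8; 32]);
    #[derive(Clone, Copy)]
    struct Preimage([u8; 32]);
    struct MppPart { channel_id: [u8; 32], htlc_id: u64 }

    struct Monitor {
        // payment_hash -> (preimage, MPP parts recorded when the claim began)
        stored_preimages: HashMap<PaymentHash, (Preimage, Vec<MppPart>)>,
    }

    impl Monitor {
        fn get_stored_preimages(&self) -> &HashMap<PaymentHash, (Preimage, Vec<MppPart>)> {
            &self.stored_preimages
        }
    }

    enum BackgroundEvent {
        // Monitor update regenerated on startup: replay the preimage for this HTLC.
        ReplayPreimage { channel_id: [u8; 32], htlc_id: u64, preimage: Preimage },
    }

    fn replay_claims(monitors: &[Monitor]) -> Vec<BackgroundEvent> {
        let mut background_events = Vec::new();
        for monitor in monitors {
            for (_, (preimage, parts)) in monitor.get_stored_preimages() {
                for part in parts {
                    // During init we cannot apply monitor updates directly, so the
                    // work is queued as background events (the real code also
                    // registers RAA-blockers so the preimage cannot be lost before
                    // the update is persisted).
                    background_events.push(BackgroundEvent::ReplayPreimage {
                        channel_id: part.channel_id,
                        htlc_id: part.htlc_id,
                        preimage: *preimage,
                    });
                }
            }
        }
        background_events
    }
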
Matt Corallo 2024-09-30 20:09:01 +00:00
parent 254b78fd35
commit 4896e20086
3 changed files with 202 additions and 75 deletions


@@ -1290,7 +1290,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
     // further `send_update_fee` calls, dropping the previous holding cell update entirely.
     holding_cell_update_fee: Option<u32>,
     next_holder_htlc_id: u64,
-    next_counterparty_htlc_id: u64,
+    pub(super) next_counterparty_htlc_id: u64,
     feerate_per_kw: u32,
     /// The timestamp set on our latest `channel_update` message for this channel. It is updated


@@ -1130,9 +1130,34 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
     }
 );

+/// The source argument which is passed to [`ChannelManager::claim_mpp_part`].
+///
+/// This is identical to [`MPPClaimHTLCSource`] except that [`Self::counterparty_node_id`] is an
+/// `Option`, whereas it is required in [`MPPClaimHTLCSource`]. In the future, we should ideally
+/// drop this and merge the two, however doing so may break upgrades for nodes which have pending
+/// forwarded payments.
+struct HTLCClaimSource {
+    counterparty_node_id: Option<PublicKey>,
+    funding_txo: OutPoint,
+    channel_id: ChannelId,
+    htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+    fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+        HTLCClaimSource {
+            counterparty_node_id: Some(o.counterparty_node_id),
+            funding_txo: o.funding_txo,
+            channel_id: o.channel_id,
+            htlc_id: o.htlc_id,
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 /// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
-/// tracked in [`PendingMPPClaim`].
+/// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted
+/// to an [`HTLCClaimSource`] for claim replays on startup.
 struct MPPClaimHTLCSource {
     counterparty_node_id: PublicKey,
     funding_txo: OutPoint,
@@ -6896,6 +6921,27 @@ where
     >(
         &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
         payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+    ) {
+        let counterparty_node_id =
+            match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+                Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+                None => None
+            };
+
+        let htlc_source = HTLCClaimSource {
+            counterparty_node_id,
+            funding_txo: prev_hop.outpoint,
+            channel_id: prev_hop.channel_id,
+            htlc_id: prev_hop.htlc_id,
+        };
+        self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+    }
+
+    fn claim_mpp_part<
+        ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+    >(
+        &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+        payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
     ) {
         //TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -6912,12 +6958,8 @@ where
     {
         let per_peer_state = self.per_peer_state.read().unwrap();
         let chan_id = prev_hop.channel_id;
-        let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
-            Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
-            None => None
-        };
-
-        let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+        let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
             |counterparty_node_id| per_peer_state.get(counterparty_node_id)
                 .map(|peer_mutex| peer_mutex.lock().unwrap())
         ).unwrap_or(None);
@@ -6944,7 +6986,7 @@ where
                     peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
                 }

                 if !during_init {
-                    handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                    handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
                         peer_state, per_peer_state, chan);
                 } else {
                     // If we're running during init we cannot update a monitor directly -
@@ -6953,7 +6995,7 @@ where
                     self.pending_background_events.lock().unwrap().push(
                         BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                             counterparty_node_id,
-                            funding_txo: prev_hop.outpoint,
+                            funding_txo: prev_hop.funding_txo,
                             channel_id: prev_hop.channel_id,
                             update: monitor_update.clone(),
                         });
@@ -7027,7 +7069,7 @@ where
         }
         let preimage_update = ChannelMonitorUpdate {
             update_id: CLOSED_CHANNEL_UPDATE_ID,
-            counterparty_node_id: None,
+            counterparty_node_id: prev_hop.counterparty_node_id,
             updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
                 payment_preimage,
                 payment_info,
@@ -7038,7 +7080,7 @@ where
         if !during_init {
             // We update the ChannelMonitor on the backward link, after
             // receiving an `update_fulfill_htlc` from the forward link.
-            let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+            let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
             if update_res != ChannelMonitorUpdateStatus::Completed {
                 // TODO: This needs to be handled somehow - if we receive a monitor update
                 // with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7061,7 +7103,7 @@ where
             // complete the monitor update completion action from `completion_action`.
             self.pending_background_events.lock().unwrap().push(
                 BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-                    prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+                    prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
                 )));
         }
         // Note that we do process the completion action here. This totally could be a
@@ -7312,7 +7354,7 @@ where
                         onion_fields,
                         payment_id,
                     }) = payment {
-                        self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+                        let event = events::Event::PaymentClaimed {
                             payment_hash,
                             purpose,
                             amount_msat,
@@ -7321,7 +7363,16 @@ where
                             sender_intended_total_msat,
                             onion_fields,
                             payment_id,
-                        }, None));
+                        };
+                        let event_action = (event, None);
+                        let mut pending_events = self.pending_events.lock().unwrap();
+                        // If we're replaying a claim on startup we may end up duplicating an event
+                        // that's already in our queue, so check before we push another one. The
+                        // `payment_id` should suffice to ensure we never spuriously drop a second
+                        // event for a duplicate payment.
+                        if !pending_events.contains(&event_action) {
+                            pending_events.push_back(event_action);
+                        }
                     }
                 },
                 MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13130,7 +13181,65 @@ where
     };

     for (_, monitor) in args.channel_monitors.iter() {
-        for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
+        for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+            if !payment_claims.is_empty() {
+                for payment_claim in payment_claims {
+                    if payment_claim.mpp_parts.is_empty() {
+                        return Err(DecodeError::InvalidValue);
+                    }
+                    let pending_claims = PendingMPPClaim {
+                        channels_without_preimage: payment_claim.mpp_parts.clone(),
+                        channels_with_preimage: Vec::new(),
+                    };
+                    let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+                    // While it may be duplicative to generate a PaymentClaimed here, trying to
+                    // figure out if the user definitely saw it before shutdown would require some
+                    // nontrivial logic and may break as we move away from regularly persisting
+                    // ChannelManager. Instead, we rely on the users' event handler being
+                    // idempotent and just blindly generate one no matter what, letting the
+                    // preimages eventually time out from ChannelMonitors to prevent us from
+                    // doing so forever.
+                    let claim_found =
+                        channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+                            payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+                            &channel_manager.inbound_payment_id_secret, true,
+                        );
+                    if claim_found.is_err() {
+                        let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+                        match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+                            hash_map::Entry::Occupied(_) => {
+                                debug_assert!(false, "Entry was added in begin_claiming_payment");
+                                return Err(DecodeError::InvalidValue);
+                            },
+                            hash_map::Entry::Vacant(entry) => {
+                                entry.insert(payment_claim.claiming_payment);
+                            },
+                        }
+                    }
+
+                    for part in payment_claim.mpp_parts.iter() {
+                        let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+                            part.counterparty_node_id, part.channel_id, part.htlc_id,
+                            PendingMPPClaimPointer(Arc::clone(&ptr))
+                        ));
+                        let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+                            RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+                                pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+                            }
+                        );
+                        // Note that we don't need to pass the `payment_info` here - it's
+                        // already (clearly) durably on disk in the `ChannelMonitor` so there's
+                        // no need to worry about getting it into others.
+                        channel_manager.claim_mpp_part(
+                            part.into(), payment_preimage, None,
+                            |_, _|
+                                (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+                        );
+                    }
+                }
+            } else {
                 let per_peer_state = channel_manager.per_peer_state.read().unwrap();
                 let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
                 let payment = claimable_payments.claimable_payments.remove(&payment_hash);
@@ -13194,6 +13303,7 @@ where
                 }
             }
         }
+    }

     for htlc_source in failed_htlcs.drain(..) {
         let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;


@@ -878,27 +878,39 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
     // Now restart nodes[3].
     reload_node!(nodes[3], original_manager, &[&updated_monitor.0, &original_monitor.0], persister, new_chain_monitor, nodes_3_deserialized);

-    // On startup the preimage should have been copied into the non-persisted monitor:
+    // Until the startup background events are processed (in `get_and_clear_pending_events`,
+    // below), the preimage is not copied to the non-persisted monitor...
     assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
-    assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+    assert_eq!(
+        get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash),
+        persist_both_monitors,
+    );

     nodes[1].node.peer_disconnected(nodes[3].node.get_our_node_id());
     nodes[2].node.peer_disconnected(nodes[3].node.get_our_node_id());

     // During deserialization, we should have closed one channel and broadcast its latest
     // commitment transaction. We should also still have the original PaymentClaimable event we
-    // never finished processing.
+    // never finished processing as well as a PaymentClaimed event regenerated when we replayed the
+    // preimage onto the non-persisted monitor.
     let events = nodes[3].node.get_and_clear_pending_events();
     assert_eq!(events.len(), if persist_both_monitors { 4 } else { 3 });
     if let Event::PaymentClaimable { amount_msat: 15_000_000, .. } = events[0] { } else { panic!(); }
     if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); }
     if persist_both_monitors {
         if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); }
-        check_added_monitors(&nodes[3], 2);
+        if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); }
+        check_added_monitors(&nodes[3], 6);
     } else {
-        check_added_monitors(&nodes[3], 1);
+        if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); }
+        check_added_monitors(&nodes[3], 3);
     }

+    // Now that we've processed background events, the preimage should have been copied into the
+    // non-persisted monitor:
+    assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
+    assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+
     // On restart, we should also get a duplicate PaymentClaimed event as we persisted the
     // ChannelManager prior to handling the original one.
     if let Event::PaymentClaimed { payment_hash: our_payment_hash, amount_msat: 15_000_000, .. } =
@@ -948,6 +960,11 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
         nodes[0].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
         commitment_signed_dance!(nodes[0], nodes[2], cs_updates.commitment_signed, false, true);
         expect_payment_sent!(nodes[0], payment_preimage);
+
+        // Ensure that the remaining channel is fully operational and not blocked (and that after a
+        // cycle of commitment updates the payment preimage is ultimately pruned).
+        send_payment(&nodes[0], &[&nodes[2], &nodes[3]], 100_000);
+        assert!(!get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
     }
 }