Merge pull request #2845 from wpaulino/decode-htlc-onion-when-committed

Support decoding HTLC onions once fully committed
This commit is contained in:
Matt Corallo 2024-03-28 20:44:36 +00:00 committed by GitHub
commit 9cc0e9816a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 760 additions and 364 deletions

View file

@ -151,7 +151,7 @@ mod tests {
let writeable_len = $obj.serialized_length() as u64 + 16;
let write_adapter = ChaChaPolyWriteAdapter::new(rho, &$obj);
let encrypted_writeable_bytes = write_adapter.encode();
let encrypted_writeable = &encrypted_writeable_bytes[..];
let encrypted_writeable = &mut &encrypted_writeable_bytes[..];
// Now deserialize the object back and make sure it matches the original.
let mut rd = FixedLengthReader::new(encrypted_writeable, writeable_len);

View file

@ -302,6 +302,8 @@ pub enum HTLCDestination {
/// Short channel id we are requesting to forward an HTLC to.
requested_forward_scid: u64
},
/// We couldn't decode the incoming onion to obtain the forwarding details.
InvalidOnion,
/// Failure scenario where an HTLC may have been forwarded to be intended for us,
/// but is invalid for some reason, so we reject it.
///
@ -329,6 +331,7 @@ impl_writeable_tlv_based_enum_upgradable!(HTLCDestination,
(2, UnknownNextHop) => {
(0, requested_forward_scid, required),
},
(3, InvalidOnion) => {},
(4, FailedPayment) => {
(0, payment_hash, required),
},
@ -1294,7 +1297,7 @@ impl MaybeReadable for Event {
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events.
0u8 => Ok(None),
1u8 => {
let f = || {
let mut f = || {
let mut payment_hash = PaymentHash([0; 32]);
let mut payment_preimage = None;
let mut payment_secret = None;
@ -1342,7 +1345,7 @@ impl MaybeReadable for Event {
f()
},
2u8 => {
let f = || {
let mut f = || {
let mut payment_preimage = PaymentPreimage([0; 32]);
let mut payment_hash = None;
let mut payment_id = None;
@ -1366,7 +1369,7 @@ impl MaybeReadable for Event {
f()
},
3u8 => {
let f = || {
let mut f = || {
#[cfg(test)]
let error_code = Readable::read(reader)?;
#[cfg(test)]
@ -1409,7 +1412,7 @@ impl MaybeReadable for Event {
},
4u8 => Ok(None),
5u8 => {
let f = || {
let mut f = || {
let mut outputs = WithoutLength(Vec::new());
let mut channel_id: Option<ChannelId> = None;
read_tlv_fields!(reader, {
@ -1445,7 +1448,7 @@ impl MaybeReadable for Event {
}))
},
7u8 => {
let f = || {
let mut f = || {
let mut prev_channel_id = None;
let mut next_channel_id = None;
let mut prev_user_channel_id = None;
@ -1473,7 +1476,7 @@ impl MaybeReadable for Event {
f()
},
9u8 => {
let f = || {
let mut f = || {
let mut channel_id = ChannelId::new_zero();
let mut reason = UpgradableRequired(None);
let mut user_channel_id_low_opt: Option<u64> = None;
@ -1503,7 +1506,7 @@ impl MaybeReadable for Event {
f()
},
11u8 => {
let f = || {
let mut f = || {
let mut channel_id = ChannelId::new_zero();
let mut transaction = Transaction{ version: 2, lock_time: LockTime::ZERO, input: Vec::new(), output: Vec::new() };
read_tlv_fields!(reader, {
@ -1515,7 +1518,7 @@ impl MaybeReadable for Event {
f()
},
13u8 => {
let f = || {
let mut f = || {
_init_and_read_len_prefixed_tlv_fields!(reader, {
(0, payment_id, required),
(2, payment_hash, option),
@ -1531,7 +1534,7 @@ impl MaybeReadable for Event {
f()
},
15u8 => {
let f = || {
let mut f = || {
let mut payment_hash = PaymentHash([0; 32]);
let mut payment_id = PaymentId([0; 32]);
let mut reason = None;
@ -1553,7 +1556,7 @@ impl MaybeReadable for Event {
Ok(None)
},
19u8 => {
let f = || {
let mut f = || {
let mut payment_hash = PaymentHash([0; 32]);
let mut purpose = UpgradableRequired(None);
let mut amount_msat = 0;
@ -1580,7 +1583,7 @@ impl MaybeReadable for Event {
f()
},
21u8 => {
let f = || {
let mut f = || {
_init_and_read_len_prefixed_tlv_fields!(reader, {
(0, payment_id, required),
(2, payment_hash, required),
@ -1596,7 +1599,7 @@ impl MaybeReadable for Event {
f()
},
23u8 => {
let f = || {
let mut f = || {
_init_and_read_len_prefixed_tlv_fields!(reader, {
(0, payment_id, required),
(2, payment_hash, required),
@ -1614,7 +1617,7 @@ impl MaybeReadable for Event {
f()
},
25u8 => {
let f = || {
let mut f = || {
let mut prev_channel_id = ChannelId::new_zero();
let mut failed_next_destination_opt = UpgradableRequired(None);
read_tlv_fields!(reader, {
@ -1630,7 +1633,7 @@ impl MaybeReadable for Event {
},
27u8 => Ok(None),
29u8 => {
let f = || {
let mut f = || {
let mut channel_id = ChannelId::new_zero();
let mut user_channel_id: u128 = 0;
let mut counterparty_node_id = RequiredWrapper(None);
@ -1652,7 +1655,7 @@ impl MaybeReadable for Event {
f()
},
31u8 => {
let f = || {
let mut f = || {
let mut channel_id = ChannelId::new_zero();
let mut user_channel_id: u128 = 0;
let mut former_temporary_channel_id = None;
@ -1680,7 +1683,7 @@ impl MaybeReadable for Event {
f()
},
33u8 => {
let f = || {
let mut f = || {
_init_and_read_len_prefixed_tlv_fields!(reader, {
(0, payment_id, required),
});

View file

@ -104,10 +104,38 @@ enum InboundHTLCRemovalReason {
Fulfill(PaymentPreimage),
}
/// Represents the resolution status of an inbound HTLC.
#[derive(Clone)]
enum InboundHTLCResolution {
/// Resolved implies the action we must take with the inbound HTLC has already been determined,
/// i.e., we already know whether it must be failed back or forwarded.
//
// TODO: Once this variant is removed, we should also clean up
// [`MonitorRestoreUpdates::accepted_htlcs`] as the path will be unreachable.
Resolved {
pending_htlc_status: PendingHTLCStatus,
},
/// Pending implies we will attempt to resolve the inbound HTLC once it has been fully committed
/// to by both sides of the channel, i.e., once a `revoke_and_ack` has been processed by both
/// nodes for the state update in which it was proposed.
Pending {
update_add_htlc: msgs::UpdateAddHTLC,
},
}
impl_writeable_tlv_based_enum!(InboundHTLCResolution,
(0, Resolved) => {
(0, pending_htlc_status, required),
},
(2, Pending) => {
(0, update_add_htlc, required),
};
);
enum InboundHTLCState {
/// Offered by remote, to be included in next local commitment tx. I.e., the remote sent an
/// update_add_htlc message for this HTLC.
RemoteAnnounced(PendingHTLCStatus),
RemoteAnnounced(InboundHTLCResolution),
/// Included in a received commitment_signed message (implying we've
/// revoke_and_ack'd it), but the remote hasn't yet revoked their previous
/// state (see the example below). We have not yet included this HTLC in a
@ -137,13 +165,13 @@ enum InboundHTLCState {
/// Implies AwaitingRemoteRevoke.
///
/// [BOLT #2]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md
AwaitingRemoteRevokeToAnnounce(PendingHTLCStatus),
AwaitingRemoteRevokeToAnnounce(InboundHTLCResolution),
/// Included in a received commitment_signed message (implying we've revoke_and_ack'd it).
/// We have also included this HTLC in our latest commitment_signed and are now just waiting
/// on the remote's revoke_and_ack to make this HTLC an irrevocable part of the state of the
/// channel (before it can then get forwarded and/or removed).
/// Implies AwaitingRemoteRevoke.
AwaitingAnnouncedRemoteRevoke(PendingHTLCStatus),
AwaitingAnnouncedRemoteRevoke(InboundHTLCResolution),
Committed,
/// Removed by us and a new commitment_signed was sent (if we were AwaitingRemoteRevoke when we
/// created it we would have put it in the holding cell instead). When they next revoke_and_ack
@ -1044,6 +1072,7 @@ pub(super) struct MonitorRestoreUpdates {
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub pending_update_adds: Vec<msgs::UpdateAddHTLC>,
pub funding_broadcastable: Option<Transaction>,
pub channel_ready: Option<msgs::ChannelReady>,
pub announcement_sigs: Option<msgs::AnnouncementSignatures>,
@ -1291,6 +1320,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
monitor_pending_finalized_fulfills: Vec<HTLCSource>,
monitor_pending_update_adds: Vec<msgs::UpdateAddHTLC>,
/// If we went to send a commitment update (ie some messages then [`msgs::CommitmentSigned`])
/// but our signer (initially) refused to give us a signature, we should retry at some point in
@ -1755,6 +1785,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),
monitor_pending_update_adds: Vec::new(),
signer_pending_commitment_update: false,
signer_pending_funding: false,
@ -1976,6 +2007,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),
monitor_pending_update_adds: Vec::new(),
signer_pending_commitment_update: false,
signer_pending_funding: false,
@ -4089,20 +4121,12 @@ impl<SP: Deref> Channel<SP> where
Ok(self.get_announcement_sigs(node_signer, chain_hash, user_config, best_block.height, logger))
}
pub fn update_add_htlc<F, FE: Deref, L: Deref>(
&mut self, msg: &msgs::UpdateAddHTLC, mut pending_forward_status: PendingHTLCStatus,
create_pending_htlc_status: F, fee_estimator: &LowerBoundedFeeEstimator<FE>, logger: &L
) -> Result<(), ChannelError>
where F: for<'a> Fn(&'a Self, PendingHTLCStatus, u16) -> PendingHTLCStatus,
FE::Target: FeeEstimator, L::Target: Logger,
{
pub fn update_add_htlc(
&mut self, msg: &msgs::UpdateAddHTLC, pending_forward_status: PendingHTLCStatus,
) -> Result<(), ChannelError> {
if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
return Err(ChannelError::Close("Got add HTLC message when channel was not in an operational state".to_owned()));
}
// We can't accept HTLCs sent after we've sent a shutdown.
if self.context.channel_state.is_local_shutdown_sent() {
pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x4000|8);
}
// If the remote has sent a shutdown prior to adding this HTLC, then they are in violation of the spec.
if self.context.channel_state.is_remote_shutdown_sent() {
return Err(ChannelError::Close("Got add HTLC message when channel was not in an operational state".to_owned()));
@ -4121,7 +4145,6 @@ impl<SP: Deref> Channel<SP> where
}
let inbound_stats = self.context.get_inbound_pending_htlc_stats(None);
let outbound_stats = self.context.get_outbound_pending_htlc_stats(None);
if inbound_stats.pending_htlcs + 1 > self.context.holder_max_accepted_htlcs as u32 {
return Err(ChannelError::Close(format!("Remote tried to push more than our max accepted HTLCs ({})", self.context.holder_max_accepted_htlcs)));
}
@ -4150,34 +4173,6 @@ impl<SP: Deref> Channel<SP> where
}
}
let max_dust_htlc_exposure_msat = self.context.get_max_dust_htlc_exposure_msat(fee_estimator);
let (htlc_timeout_dust_limit, htlc_success_dust_limit) = if self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
(0, 0)
} else {
let dust_buffer_feerate = self.context.get_dust_buffer_feerate(None) as u64;
(dust_buffer_feerate * htlc_timeout_tx_weight(self.context.get_channel_type()) / 1000,
dust_buffer_feerate * htlc_success_tx_weight(self.context.get_channel_type()) / 1000)
};
let exposure_dust_limit_timeout_sats = htlc_timeout_dust_limit + self.context.counterparty_dust_limit_satoshis;
if msg.amount_msat / 1000 < exposure_dust_limit_timeout_sats {
let on_counterparty_tx_dust_htlc_exposure_msat = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat + msg.amount_msat;
if on_counterparty_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on counterparty commitment tx",
on_counterparty_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x1000|7);
}
}
let exposure_dust_limit_success_sats = htlc_success_dust_limit + self.context.holder_dust_limit_satoshis;
if msg.amount_msat / 1000 < exposure_dust_limit_success_sats {
let on_holder_tx_dust_htlc_exposure_msat = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat + msg.amount_msat;
if on_holder_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on holder commitment tx",
on_holder_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x1000|7);
}
}
let pending_value_to_self_msat =
self.context.value_to_self_msat + inbound_stats.pending_htlcs_value_msat - removed_outbound_total_msat;
let pending_remote_value_msat =
@ -4211,23 +4206,7 @@ impl<SP: Deref> Channel<SP> where
} else {
0
};
if !self.context.is_outbound() {
// `Some(())` is for the fee spike buffer we keep for the remote. This deviates from
// the spec because the fee spike buffer requirement doesn't exist on the receiver's
// side, only on the sender's. Note that with anchor outputs we are no longer as
// sensitive to fee spikes, so we need to account for them.
let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered);
let mut remote_fee_cost_incl_stuck_buffer_msat = self.context.next_remote_commit_tx_fee_msat(htlc_candidate, Some(()));
if !self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
remote_fee_cost_incl_stuck_buffer_msat *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
}
if pending_remote_value_msat.saturating_sub(msg.amount_msat).saturating_sub(self.context.holder_selected_channel_reserve_satoshis * 1000).saturating_sub(anchor_outputs_value_msat) < remote_fee_cost_incl_stuck_buffer_msat {
// Note that if the pending_forward_status is not updated here, then it's because we're already failing
// the HTLC, i.e. its status is already set to failing.
log_info!(logger, "Attempting to fail HTLC due to fee spike buffer violation in channel {}. Rebalancing is required.", &self.context.channel_id());
pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x1000|7);
}
} else {
if self.context.is_outbound() {
// Check that they won't violate our local required channel reserve by adding this HTLC.
let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered);
let local_commit_tx_fee_msat = self.context.next_local_commit_tx_fee_msat(htlc_candidate, None);
@ -4255,7 +4234,9 @@ impl<SP: Deref> Channel<SP> where
amount_msat: msg.amount_msat,
payment_hash: msg.payment_hash,
cltv_expiry: msg.cltv_expiry,
state: InboundHTLCState::RemoteAnnounced(pending_forward_status),
state: InboundHTLCState::RemoteAnnounced(InboundHTLCResolution::Resolved {
pending_htlc_status: pending_forward_status
}),
});
Ok(())
}
@ -4461,13 +4442,13 @@ impl<SP: Deref> Channel<SP> where
}
for htlc in self.context.pending_inbound_htlcs.iter_mut() {
let new_forward = if let &InboundHTLCState::RemoteAnnounced(ref forward_info) = &htlc.state {
Some(forward_info.clone())
let htlc_resolution = if let &InboundHTLCState::RemoteAnnounced(ref resolution) = &htlc.state {
Some(resolution.clone())
} else { None };
if let Some(forward_info) = new_forward {
if let Some(htlc_resolution) = htlc_resolution {
log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToAnnounce due to commitment_signed in channel {}.",
&htlc.payment_hash, &self.context.channel_id);
htlc.state = InboundHTLCState::AwaitingRemoteRevokeToAnnounce(forward_info);
htlc.state = InboundHTLCState::AwaitingRemoteRevokeToAnnounce(htlc_resolution);
need_commitment = true;
}
}
@ -4777,6 +4758,7 @@ impl<SP: Deref> Channel<SP> where
log_trace!(logger, "Updating HTLCs on receipt of RAA in channel {}...", &self.context.channel_id());
let mut to_forward_infos = Vec::new();
let mut pending_update_adds = Vec::new();
let mut revoked_htlcs = Vec::new();
let mut finalized_claimed_htlcs = Vec::new();
let mut update_fail_htlcs = Vec::new();
@ -4824,29 +4806,37 @@ impl<SP: Deref> Channel<SP> where
let mut state = InboundHTLCState::Committed;
mem::swap(&mut state, &mut htlc.state);
if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(forward_info) = state {
if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) = state {
log_trace!(logger, " ...promoting inbound AwaitingRemoteRevokeToAnnounce {} to AwaitingAnnouncedRemoteRevoke", &htlc.payment_hash);
htlc.state = InboundHTLCState::AwaitingAnnouncedRemoteRevoke(forward_info);
htlc.state = InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution);
require_commitment = true;
} else if let InboundHTLCState::AwaitingAnnouncedRemoteRevoke(forward_info) = state {
match forward_info {
PendingHTLCStatus::Fail(fail_msg) => {
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to LocalRemoved due to PendingHTLCStatus indicating failure", &htlc.payment_hash);
require_commitment = true;
match fail_msg {
HTLCFailureMsg::Relay(msg) => {
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(msg.reason.clone()));
update_fail_htlcs.push(msg)
},
HTLCFailureMsg::Malformed(msg) => {
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailMalformed((msg.sha256_of_onion, msg.failure_code)));
update_fail_malformed_htlcs.push(msg)
} else if let InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) = state {
match resolution {
InboundHTLCResolution::Resolved { pending_htlc_status } =>
match pending_htlc_status {
PendingHTLCStatus::Fail(fail_msg) => {
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to LocalRemoved due to PendingHTLCStatus indicating failure", &htlc.payment_hash);
require_commitment = true;
match fail_msg {
HTLCFailureMsg::Relay(msg) => {
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(msg.reason.clone()));
update_fail_htlcs.push(msg)
},
HTLCFailureMsg::Malformed(msg) => {
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailMalformed((msg.sha256_of_onion, msg.failure_code)));
update_fail_malformed_htlcs.push(msg)
},
}
},
PendingHTLCStatus::Forward(forward_info) => {
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed, attempting to forward", &htlc.payment_hash);
to_forward_infos.push((forward_info, htlc.htlc_id));
htlc.state = InboundHTLCState::Committed;
}
}
},
PendingHTLCStatus::Forward(forward_info) => {
InboundHTLCResolution::Pending { update_add_htlc } => {
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed", &htlc.payment_hash);
to_forward_infos.push((forward_info, htlc.htlc_id));
pending_update_adds.push(update_add_htlc);
htlc.state = InboundHTLCState::Committed;
}
}
@ -4907,6 +4897,8 @@ impl<SP: Deref> Channel<SP> where
}
}
self.context.monitor_pending_update_adds.append(&mut pending_update_adds);
if self.context.channel_state.is_monitor_update_in_progress() {
// We can't actually generate a new commitment transaction (incl by freeing holding
// cells) while we can't update the monitor, so we just return what we have.
@ -5207,13 +5199,16 @@ impl<SP: Deref> Channel<SP> where
mem::swap(&mut failed_htlcs, &mut self.context.monitor_pending_failures);
let mut finalized_claimed_htlcs = Vec::new();
mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills);
let mut pending_update_adds = Vec::new();
mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds);
if self.context.channel_state.is_peer_disconnected() {
self.context.monitor_pending_revoke_and_ack = false;
self.context.monitor_pending_commitment_signed = false;
return MonitorRestoreUpdates {
raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds,
funding_broadcastable, channel_ready, announcement_sigs
};
}
@ -5235,7 +5230,8 @@ impl<SP: Deref> Channel<SP> where
if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
MonitorRestoreUpdates {
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs,
pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs
}
}
@ -6099,6 +6095,86 @@ impl<SP: Deref> Channel<SP> where
})
}
pub fn can_accept_incoming_htlc<F: Deref, L: Deref>(
&self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: L
) -> Result<(), (&'static str, u16)>
where
F::Target: FeeEstimator,
L::Target: Logger
{
if self.context.channel_state.is_local_shutdown_sent() {
return Err(("Shutdown was already sent", 0x4000|8))
}
let inbound_stats = self.context.get_inbound_pending_htlc_stats(None);
let outbound_stats = self.context.get_outbound_pending_htlc_stats(None);
let max_dust_htlc_exposure_msat = self.context.get_max_dust_htlc_exposure_msat(fee_estimator);
let (htlc_timeout_dust_limit, htlc_success_dust_limit) = if self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
(0, 0)
} else {
let dust_buffer_feerate = self.context.get_dust_buffer_feerate(None) as u64;
(dust_buffer_feerate * htlc_timeout_tx_weight(self.context.get_channel_type()) / 1000,
dust_buffer_feerate * htlc_success_tx_weight(self.context.get_channel_type()) / 1000)
};
let exposure_dust_limit_timeout_sats = htlc_timeout_dust_limit + self.context.counterparty_dust_limit_satoshis;
if msg.amount_msat / 1000 < exposure_dust_limit_timeout_sats {
let on_counterparty_tx_dust_htlc_exposure_msat = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat + msg.amount_msat;
if on_counterparty_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on counterparty commitment tx",
on_counterparty_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
return Err(("Exceeded our dust exposure limit on counterparty commitment tx", 0x1000|7))
}
}
let exposure_dust_limit_success_sats = htlc_success_dust_limit + self.context.holder_dust_limit_satoshis;
if msg.amount_msat / 1000 < exposure_dust_limit_success_sats {
let on_holder_tx_dust_htlc_exposure_msat = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat + msg.amount_msat;
if on_holder_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on holder commitment tx",
on_holder_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
return Err(("Exceeded our dust exposure limit on holder commitment tx", 0x1000|7))
}
}
let anchor_outputs_value_msat = if self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
ANCHOR_OUTPUT_VALUE_SATOSHI * 2 * 1000
} else {
0
};
let mut removed_outbound_total_msat = 0;
for ref htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::AwaitingRemoteRevokeToRemove(OutboundHTLCOutcome::Success(_)) = htlc.state {
removed_outbound_total_msat += htlc.amount_msat;
} else if let OutboundHTLCState::AwaitingRemovedRemoteRevoke(OutboundHTLCOutcome::Success(_)) = htlc.state {
removed_outbound_total_msat += htlc.amount_msat;
}
}
let pending_value_to_self_msat =
self.context.value_to_self_msat + inbound_stats.pending_htlcs_value_msat - removed_outbound_total_msat;
let pending_remote_value_msat =
self.context.channel_value_satoshis * 1000 - pending_value_to_self_msat;
if !self.context.is_outbound() {
// `Some(())` is for the fee spike buffer we keep for the remote. This deviates from
// the spec because the fee spike buffer requirement doesn't exist on the receiver's
// side, only on the sender's. Note that with anchor outputs we are no longer as
// sensitive to fee spikes, so we need to account for them.
let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered);
let mut remote_fee_cost_incl_stuck_buffer_msat = self.context.next_remote_commit_tx_fee_msat(htlc_candidate, Some(()));
if !self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
remote_fee_cost_incl_stuck_buffer_msat *= FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
}
if pending_remote_value_msat.saturating_sub(msg.amount_msat).saturating_sub(self.context.holder_selected_channel_reserve_satoshis * 1000).saturating_sub(anchor_outputs_value_msat) < remote_fee_cost_incl_stuck_buffer_msat {
log_info!(logger, "Attempting to fail HTLC due to fee spike buffer violation in channel {}. Rebalancing is required.", &self.context.channel_id());
return Err(("Fee spike buffer violation", 0x1000|7));
}
}
Ok(())
}
pub fn get_cur_holder_commitment_transaction_number(&self) -> u64 {
self.context.cur_holder_commitment_transaction_number + 1
}
@ -8232,7 +8308,7 @@ fn get_initial_channel_type(config: &UserConfig, their_features: &InitFeatures)
ret
}
const SERIALIZATION_VERSION: u8 = 3;
const SERIALIZATION_VERSION: u8 = 4;
const MIN_SERIALIZATION_VERSION: u8 = 3;
impl_writeable_tlv_based_enum!(InboundHTLCRemovalReason,;
@ -8294,7 +8370,18 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
// Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
// called.
write_ver_prefix!(writer, MIN_SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
let version_to_write = if self.context.pending_inbound_htlcs.iter().any(|htlc| match htlc.state {
InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_resolution)|
InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_resolution) => {
matches!(htlc_resolution, InboundHTLCResolution::Pending { .. })
},
_ => false,
}) {
SERIALIZATION_VERSION
} else {
MIN_SERIALIZATION_VERSION
};
write_ver_prefix!(writer, version_to_write, MIN_SERIALIZATION_VERSION);
// `user_id` used to be a single u64 value. In order to remain backwards compatible with
// versions prior to 0.0.113, the u128 is serialized as two separate u64 values. We write
@ -8350,13 +8437,29 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
htlc.payment_hash.write(writer)?;
match &htlc.state {
&InboundHTLCState::RemoteAnnounced(_) => unreachable!(),
&InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_state) => {
&InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_resolution) => {
1u8.write(writer)?;
htlc_state.write(writer)?;
if version_to_write <= 3 {
if let InboundHTLCResolution::Resolved { pending_htlc_status } = htlc_resolution {
pending_htlc_status.write(writer)?;
} else {
panic!();
}
} else {
htlc_resolution.write(writer)?;
}
},
&InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_state) => {
&InboundHTLCState::AwaitingAnnouncedRemoteRevoke(ref htlc_resolution) => {
2u8.write(writer)?;
htlc_state.write(writer)?;
if version_to_write <= 3 {
if let InboundHTLCResolution::Resolved { pending_htlc_status } = htlc_resolution {
pending_htlc_status.write(writer)?;
} else {
panic!();
}
} else {
htlc_resolution.write(writer)?;
}
},
&InboundHTLCState::Committed => {
3u8.write(writer)?;
@ -8582,6 +8685,11 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
let holder_max_accepted_htlcs = if self.context.holder_max_accepted_htlcs == DEFAULT_MAX_HTLCS { None } else { Some(self.context.holder_max_accepted_htlcs) };
let mut monitor_pending_update_adds = None;
if !self.context.monitor_pending_update_adds.is_empty() {
monitor_pending_update_adds = Some(&self.context.monitor_pending_update_adds);
}
write_tlv_fields!(writer, {
(0, self.context.announcement_sigs, option),
// minimum_depth and counterparty_selected_channel_reserve_satoshis used to have a
@ -8599,6 +8707,7 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
(7, self.context.shutdown_scriptpubkey, option),
(8, self.context.blocked_monitor_updates, optional_vec),
(9, self.context.target_closing_feerate_sats_per_kw, option),
(10, monitor_pending_update_adds, option), // Added in 0.0.122
(11, self.context.monitor_pending_finalized_fulfills, required_vec),
(13, self.context.channel_creation_height, required),
(15, preimages, required_vec),
@ -8693,8 +8802,22 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
cltv_expiry: Readable::read(reader)?,
payment_hash: Readable::read(reader)?,
state: match <u8 as Readable>::read(reader)? {
1 => InboundHTLCState::AwaitingRemoteRevokeToAnnounce(Readable::read(reader)?),
2 => InboundHTLCState::AwaitingAnnouncedRemoteRevoke(Readable::read(reader)?),
1 => {
let resolution = if ver <= 3 {
InboundHTLCResolution::Resolved { pending_htlc_status: Readable::read(reader)? }
} else {
Readable::read(reader)?
};
InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution)
},
2 => {
let resolution = if ver <= 3 {
InboundHTLCResolution::Resolved { pending_htlc_status: Readable::read(reader)? }
} else {
Readable::read(reader)?
};
InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution)
},
3 => InboundHTLCState::Committed,
4 => InboundHTLCState::LocalRemoved(Readable::read(reader)?),
_ => return Err(DecodeError::InvalidValue),
@ -8911,6 +9034,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
let mut holding_cell_blinding_points_opt: Option<Vec<Option<PublicKey>>> = None;
let mut malformed_htlcs: Option<Vec<(u64, u16, [u8; 32])>> = None;
let mut monitor_pending_update_adds: Option<Vec<msgs::UpdateAddHTLC>> = None;
read_tlv_fields!(reader, {
(0, announcement_sigs, option),
@ -8923,6 +9047,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
(7, shutdown_scriptpubkey, option),
(8, blocked_monitor_updates, optional_vec),
(9, target_closing_feerate_sats_per_kw, option),
(10, monitor_pending_update_adds, option), // Added in 0.0.122
(11, monitor_pending_finalized_fulfills, optional_vec),
(13, channel_creation_height, option),
(15, preimages_opt, optional_vec),
@ -9094,6 +9219,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
monitor_pending_forwards,
monitor_pending_failures,
monitor_pending_finalized_fulfills: monitor_pending_finalized_fulfills.unwrap(),
monitor_pending_update_adds: monitor_pending_update_adds.unwrap_or(Vec::new()),
signer_pending_commitment_update: false,
signer_pending_funding: false,

View file

@ -1181,6 +1181,8 @@ where
// | |
// | |__`pending_intercepted_htlcs`
// |
// |__`decode_update_add_htlcs`
// |
// |__`per_peer_state`
// |
// |__`pending_inbound_payments`
@ -1271,6 +1273,18 @@ where
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,
/// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
///
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
/// though in practice we probably won't be receiving HTLCs for a channel both via the alias
/// and via the classic SCID.
///
/// Note that no consistency guarantees are made about the existence of a channel with the
/// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`!
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
decode_update_add_htlcs: Mutex<HashMap<u64, Vec<msgs::UpdateAddHTLC>>>,
/// The sets of payments which are claimable or currently being claimed. See
/// [`ClaimablePayments`]' individual field docs for more info.
///
@ -2238,9 +2252,9 @@ macro_rules! handle_monitor_update_completion {
let update_actions = $peer_state.monitor_update_blocked_actions
.remove(&$chan.context.channel_id()).unwrap_or(Vec::new());
let htlc_forwards = $self.handle_channel_resumption(
let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption(
&mut $peer_state.pending_msg_events, $chan, updates.raa,
updates.commitment_update, updates.order, updates.accepted_htlcs,
updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds,
updates.funding_broadcastable, updates.channel_ready,
updates.announcement_sigs);
if let Some(upd) = channel_update {
@ -2301,6 +2315,9 @@ macro_rules! handle_monitor_update_completion {
if let Some(forwards) = htlc_forwards {
$self.forward_htlcs(&mut [forwards][..]);
}
if let Some(decode) = decode_update_add_htlcs {
$self.push_decode_update_add_htlcs(decode);
}
$self.finalize_claims(updates.finalized_claimed_htlcs);
for failure in updates.failed_htlcs.drain(..) {
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
@ -2477,6 +2494,7 @@ where
pending_inbound_payments: Mutex::new(new_hash_map()),
pending_outbound_payments: OutboundPayments::new(),
forward_htlcs: Mutex::new(new_hash_map()),
decode_update_add_htlcs: Mutex::new(new_hash_map()),
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
pending_intercepted_htlcs: Mutex::new(new_hash_map()),
outpoint_to_peer: Mutex::new(new_hash_map()),
@ -3076,6 +3094,163 @@ where
}
}
fn can_forward_htlc_to_outgoing_channel(
&self, chan: &mut Channel<SP>, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails
) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
// Note that the behavior here should be identical to the above block - we
// should NOT reveal the existence or non-existence of a private channel if
// we don't allow forwards outbound over them.
return Err(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
}
if chan.context.get_channel_type().supports_scid_privacy() && next_packet.outgoing_scid != chan.context.outbound_scid_alias() {
// `option_scid_alias` (referred to in LDK as `scid_privacy`) means
// "refuse to forward unless the SCID alias was used", so we pretend
// we don't have the channel here.
return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
}
// Note that we could technically not return an error yet here and just hope
// that the connection is reestablished or monitor updated by the time we get
// around to doing the actual forward, but better to fail early if we can and
// hopefully an attacker trying to path-trace payments cannot make this occur
// on a small/per-node/per-channel scale.
if !chan.context.is_live() { // channel_disabled
// If the channel_update we're going to return is disabled (i.e. the
// peer has been disabled for some time), return `channel_disabled`,
// otherwise return `temporary_channel_failure`.
let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
} else {
return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
}
}
if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
}
if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) {
let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
return Err((err, code, chan_update_opt));
}
Ok(())
}
/// Executes a callback `C` that returns some value `X` on the channel found with the given
/// `scid`. `None` is returned when the channel is not found.
fn do_funded_channel_callback<X, C: Fn(&mut Channel<SP>) -> X>(
&self, scid: u64, callback: C,
) -> Option<X> {
let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() {
None => return None,
Some((cp_id, id)) => (cp_id, id),
};
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
if peer_state_mutex_opt.is_none() {
return None;
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.get_mut(&channel_id).and_then(
|chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
) {
None => None,
Some(chan) => Some(callback(chan)),
}
}
fn can_forward_htlc(
&self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails
) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details)
}) {
Some(Ok(())) => {},
Some(Err(e)) => return Err(e),
None => {
// If we couldn't find the channel info for the scid, it may be a phantom or
// intercept forward.
if (self.default_configuration.accept_intercept_htlcs &&
fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) ||
fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)
{} else {
return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
}
}
}
let cur_height = self.best_block.read().unwrap().height + 1;
if let Err((err_msg, err_code)) = check_incoming_htlc_cltv(
cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry
) {
let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok()
}).flatten();
return Err((err_msg, err_code, chan_update_opt));
}
Ok(())
}
fn htlc_failure_from_update_add_err(
&self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, err_msg: &'static str,
mut err_code: u16, chan_update: Option<msgs::ChannelUpdate>, is_intro_node_blinded_forward: bool,
shared_secret: &[u8; 32]
) -> HTLCFailureMsg {
let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
if chan_update.is_some() && err_code & 0x1000 == 0x1000 {
let chan_update = chan_update.unwrap();
if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 {
msg.amount_msat.write(&mut res).expect("Writes cannot fail");
}
else if err_code == 0x1000 | 13 {
msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
}
else if err_code == 0x1000 | 20 {
// TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
0u16.write(&mut res).expect("Writes cannot fail");
}
(chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
chan_update.write(&mut res).expect("Writes cannot fail");
} else if err_code & 0x1000 == 0x1000 {
// If we're trying to return an error that requires a `channel_update` but
// we're forwarding to a phantom or intercept "channel" (i.e. cannot
// generate an update), just use the generic "temporary_node_failure"
// instead.
err_code = 0x2000 | 2;
}
log_info!(
WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
"Failed to accept/forward incoming HTLC: {}", err_msg
);
// If `msg.blinding_point` is set, we must always fail with malformed.
if msg.blinding_point.is_some() {
return HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
sha256_of_onion: [0; 32],
failure_code: INVALID_ONION_BLINDING,
});
}
let (err_code, err_data) = if is_intro_node_blinded_forward {
(INVALID_ONION_BLINDING, &[0; 32][..])
} else {
(err_code, &res.0[..])
};
HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
reason: HTLCFailReason::reason(err_code, err_data.to_vec())
.get_encrypted_failure_packet(shared_secret, &None),
})
}
fn decode_update_add_htlc_onion(
&self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey,
) -> Result<
@ -3085,48 +3260,7 @@ where
msg, &self.node_signer, &self.logger, &self.secp_ctx
)?;
let is_intro_node_forward = match next_hop {
onion_utils::Hop::Forward {
next_hop_data: msgs::InboundOnionPayload::BlindedForward {
intro_node_blinding_point: Some(_), ..
}, ..
} => true,
_ => false,
};
macro_rules! return_err {
($msg: expr, $err_code: expr, $data: expr) => {
{
log_info!(
WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
"Failed to accept/forward incoming HTLC: {}", $msg
);
// If `msg.blinding_point` is set, we must always fail with malformed.
if msg.blinding_point.is_some() {
return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
sha256_of_onion: [0; 32],
failure_code: INVALID_ONION_BLINDING,
}));
}
let (err_code, err_data) = if is_intro_node_forward {
(INVALID_ONION_BLINDING, &[0; 32][..])
} else { ($err_code, $data) };
return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
reason: HTLCFailReason::reason(err_code, err_data.to_vec())
.get_encrypted_failure_packet(&shared_secret, &None),
}));
}
}
}
let NextPacketDetails {
next_packet_pubkey, outgoing_amt_msat, outgoing_scid, outgoing_cltv_value
} = match next_packet_details_opt {
let next_packet_details = match next_packet_details_opt {
Some(next_packet_details) => next_packet_details,
// it is a receive, so no need for outbound checks
None => return Ok((next_hop, shared_secret, None)),
@ -3134,124 +3268,15 @@ where
// Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we
// can't hold the outbound peer state lock at the same time as the inbound peer state lock.
if let Some((err, mut code, chan_update)) = loop {
let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned();
let forwarding_chan_info_opt = match id_option {
None => { // unknown_next_peer
// Note that this is likely a timing oracle for detecting whether an scid is a
// phantom or an intercept.
if (self.default_configuration.accept_intercept_htlcs &&
fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)) ||
fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)
{
None
} else {
break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
}
},
Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
};
let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
if peer_state_mutex_opt.is_none() {
break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map(
|chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
).flatten() {
None => {
// Channel was removed. The short_to_chan_info and channel_by_id maps
// have no consistency guarantees.
break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
},
Some(chan) => chan
};
if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
// Note that the behavior here should be identical to the above block - we
// should NOT reveal the existence or non-existence of a private channel if
// we don't allow forwards outbound over them.
break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
}
if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() {
// `option_scid_alias` (referred to in LDK as `scid_privacy`) means
// "refuse to forward unless the SCID alias was used", so we pretend
// we don't have the channel here.
break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
}
let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok();
self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| {
let (err_msg, err_code, chan_update_opt) = e;
self.htlc_failure_from_update_add_err(
msg, counterparty_node_id, err_msg, err_code, chan_update_opt,
next_hop.is_intro_node_blinded_forward(), &shared_secret
)
})?;
// Note that we could technically not return an error yet here and just hope
// that the connection is reestablished or monitor updated by the time we get
// around to doing the actual forward, but better to fail early if we can and
// hopefully an attacker trying to path-trace payments cannot make this occur
// on a small/per-node/per-channel scale.
if !chan.context.is_live() { // channel_disabled
// If the channel_update we're going to return is disabled (i.e. the
// peer has been disabled for some time), return `channel_disabled`,
// otherwise return `temporary_channel_failure`.
if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
} else {
break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
}
}
if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
}
if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) {
break Some((err, code, chan_update_opt));
}
chan_update_opt
} else {
None
};
let cur_height = self.best_block.read().unwrap().height + 1;
if let Err((err_msg, code)) = check_incoming_htlc_cltv(
cur_height, outgoing_cltv_value, msg.cltv_expiry
) {
if code & 0x1000 != 0 && chan_update_opt.is_none() {
// We really should set `incorrect_cltv_expiry` here but as we're not
// forwarding over a real channel we can't generate a channel_update
// for it. Instead we just return a generic temporary_node_failure.
break Some((err_msg, 0x2000 | 2, None))
}
let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None };
break Some((err_msg, code, chan_update_opt));
}
break None;
}
{
let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
if let Some(chan_update) = chan_update {
if code == 0x1000 | 11 || code == 0x1000 | 12 {
msg.amount_msat.write(&mut res).expect("Writes cannot fail");
}
else if code == 0x1000 | 13 {
msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
}
else if code == 0x1000 | 20 {
// TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
0u16.write(&mut res).expect("Writes cannot fail");
}
(chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
chan_update.write(&mut res).expect("Writes cannot fail");
} else if code & 0x1000 == 0x1000 {
// If we're trying to return an error that requires a `channel_update` but
// we're forwarding to a phantom or intercept "channel" (i.e. cannot
// generate an update), just use the generic "temporary_node_failure"
// instead.
code = 0x2000 | 2;
}
return_err!(err, code, &res.0[..]);
}
Ok((next_hop, shared_secret, Some(next_packet_pubkey)))
Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey)))
}
fn construct_pending_htlc_status<'a>(
@ -4279,6 +4304,145 @@ where
Ok(())
}
fn process_pending_update_add_htlcs(&self) {
let mut decode_update_add_htlcs = new_hash_map();
mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap());
let get_failed_htlc_destination = |outgoing_scid_opt: Option<u64>, payment_hash: PaymentHash| {
if let Some(outgoing_scid) = outgoing_scid_opt {
match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) {
Some((outgoing_counterparty_node_id, outgoing_channel_id)) =>
HTLCDestination::NextHopChannel {
node_id: Some(*outgoing_counterparty_node_id),
channel_id: *outgoing_channel_id,
},
None => HTLCDestination::UnknownNextHop {
requested_forward_scid: outgoing_scid,
},
}
} else {
HTLCDestination::FailedPayment { payment_hash }
}
};
'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs {
let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
let counterparty_node_id = chan.context.get_counterparty_node_id();
let channel_id = chan.context.channel_id();
let funding_txo = chan.context.get_funding_txo().unwrap();
let user_channel_id = chan.context.get_user_id();
let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs;
(counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs)
});
let (
incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo,
incoming_user_channel_id, incoming_accept_underpaying_htlcs
) = if let Some(incoming_channel_details) = incoming_channel_details_opt {
incoming_channel_details
} else {
// The incoming channel no longer exists, HTLCs should be resolved onchain instead.
continue;
};
let mut htlc_forwards = Vec::new();
let mut htlc_fails = Vec::new();
for update_add_htlc in &update_add_htlcs {
let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion(
&update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx
) {
Ok(decoded_onion) => decoded_onion,
Err(htlc_fail) => {
htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion));
continue;
},
};
let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward();
let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid);
// Process the HTLC on the incoming channel.
match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
let logger = WithChannelContext::from(&self.logger, &chan.context);
chan.can_accept_incoming_htlc(
update_add_htlc, &self.fee_estimator, &logger,
)
}) {
Some(Ok(_)) => {},
Some(Err((err, code))) => {
let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() {
self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel<SP>| {
self.get_channel_update_for_onion(*outgoing_scid, chan).ok()
}).flatten()
} else {
None
};
let htlc_fail = self.htlc_failure_from_update_add_err(
&update_add_htlc, &incoming_counterparty_node_id, err, code,
outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
);
let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
htlc_fails.push((htlc_fail, htlc_destination));
continue;
},
// The incoming channel no longer exists, HTLCs should be resolved onchain instead.
None => continue 'outer_loop,
}
// Now process the HTLC on the outgoing channel if it's a forward.
if let Some(next_packet_details) = next_packet_details_opt.as_ref() {
if let Err((err, code, chan_update_opt)) = self.can_forward_htlc(
&update_add_htlc, next_packet_details
) {
let htlc_fail = self.htlc_failure_from_update_add_err(
&update_add_htlc, &incoming_counterparty_node_id, err, code,
chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
);
let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
htlc_fails.push((htlc_fail, htlc_destination));
continue;
}
}
match self.construct_pending_htlc_status(
&update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop,
incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey),
) {
PendingHTLCStatus::Forward(htlc_forward) => {
htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id));
},
PendingHTLCStatus::Fail(htlc_fail) => {
let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
htlc_fails.push((htlc_fail, htlc_destination));
},
}
}
// Process all of the forwards and failures for the channel in which the HTLCs were
// proposed to as a batch.
let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
incoming_user_channel_id, htlc_forwards.drain(..).collect());
self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
let failure = match htlc_fail {
HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
htlc_id: fail_htlc.htlc_id,
err_packet: fail_htlc.reason,
},
HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC {
htlc_id: fail_malformed_htlc.htlc_id,
sha256_of_onion: fail_malformed_htlc.sha256_of_onion,
failure_code: fail_malformed_htlc.failure_code,
},
};
self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure);
self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed {
prev_channel_id: incoming_channel_id,
failed_next_destination: htlc_destination,
}, None));
}
}
}
/// Processes HTLCs which are pending waiting on random forward delay.
///
/// Should only really ever be called in response to a PendingHTLCsForwardable event.
@ -4286,6 +4450,8 @@ where
pub fn process_pending_htlc_forwards(&self) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
self.process_pending_update_add_htlcs();
let mut new_events = VecDeque::new();
let mut failed_forwards = Vec::new();
let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
@ -5330,9 +5496,14 @@ where
}
}
fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination);
if push_forward_event { self.push_pending_forwards_ev(); }
}
/// Fails an HTLC backwards to the sender of it to us.
/// Note that we do not assume that channels corresponding to failed HTLCs are still available.
fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool {
// Ensure that no peer state channel storage lock is held when calling this function.
// This ensures that future code doesn't introduce a lock-order requirement for
// `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
@ -5350,12 +5521,12 @@ where
// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
// from block_connected which may run during initialization prior to the chain_monitor
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
let mut push_forward_event;
match source {
HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => {
if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx,
&self.pending_events, &self.logger)
{ self.push_pending_forwards_ev(); }
&self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(HTLCPreviousHopData {
ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret,
@ -5389,11 +5560,9 @@ where
}
};
let mut push_forward_ev = false;
push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty();
let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
if forward_htlcs.is_empty() {
push_forward_ev = true;
}
push_forward_event &= forward_htlcs.is_empty();
match forward_htlcs.entry(*short_channel_id) {
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().push(failure);
@ -5403,7 +5572,6 @@ where
}
}
mem::drop(forward_htlcs);
if push_forward_ev { self.push_pending_forwards_ev(); }
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push_back((events::Event::HTLCHandlingFailed {
prev_channel_id: *channel_id,
@ -5411,6 +5579,7 @@ where
}, None));
},
}
push_forward_event
}
/// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any
@ -5929,24 +6098,31 @@ where
fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
channel: &mut Channel<SP>, raa: Option<msgs::RevokeAndACK>,
commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
funding_broadcastable: Option<Transaction>,
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> {
-> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
let logger = WithChannelContext::from(&self.logger, &channel.context);
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
&channel.context.channel_id(),
if raa.is_some() { "an" } else { "no" },
if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
if commitment_update.is_some() { "a" } else { "no" },
pending_forwards.len(), pending_update_adds.len(),
if funding_broadcastable.is_some() { "" } else { "not " },
if channel_ready.is_some() { "sending" } else { "without" },
if announcement_sigs.is_some() { "sending" } else { "without" });
let mut htlc_forwards = None;
let counterparty_node_id = channel.context.get_counterparty_node_id();
let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias());
let mut htlc_forwards = None;
if !pending_forwards.is_empty() {
htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()),
channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
}
let mut decode_update_add_htlcs = None;
if !pending_update_adds.is_empty() {
decode_update_add_htlcs = Some((short_channel_id, pending_update_adds));
}
if let Some(msg) = channel_ready {
@ -5997,7 +6173,7 @@ where
emit_channel_ready_event!(pending_events, channel);
}
htlc_forwards
(htlc_forwards, decode_update_add_htlcs)
}
fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
@ -6782,7 +6958,7 @@ where
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let pending_forward_info = match decoded_hop_res {
let mut pending_forward_info = match decoded_hop_res {
Ok((next_hop, shared_secret, next_packet_pk_opt)) =>
self.construct_pending_htlc_status(
msg, counterparty_node_id, shared_secret, next_hop,
@ -6790,44 +6966,45 @@ where
),
Err(e) => PendingHTLCStatus::Fail(e)
};
let create_pending_htlc_status = |chan: &Channel<SP>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
let logger = WithChannelContext::from(&self.logger, &chan.context);
// If the update_add is completely bogus, the call will Err and we will close,
// but if we've sent a shutdown and they haven't acknowledged it yet, we just
// want to reject the new HTLC and fail it backwards instead of forwarding.
if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) {
if msg.blinding_point.is_some() {
return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
msgs::UpdateFailMalformedHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
sha256_of_onion: [0; 32],
failure_code: INVALID_ONION_BLINDING,
}
))
}
// If the update_add is completely bogus, the call will Err and we will close,
// but if we've sent a shutdown and they haven't acknowledged it yet, we just
// want to reject the new HTLC and fail it backwards instead of forwarding.
match pending_forward_info {
PendingHTLCStatus::Forward(PendingHTLCInfo {
ref incoming_shared_secret, ref routing, ..
}) => {
let reason = if routing.blinded_failure().is_some() {
HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
} else if (error_code & 0x1000) != 0 {
let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
HTLCFailReason::reason(real_code, error_data)
} else {
HTLCFailReason::from_failure_code(error_code)
}.get_encrypted_failure_packet(incoming_shared_secret, &None);
let msg = msgs::UpdateFailHTLC {
pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
msgs::UpdateFailMalformedHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
reason
};
PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg))
},
_ => pending_forward_info
sha256_of_onion: [0; 32],
failure_code: INVALID_ONION_BLINDING,
}
))
} else {
match pending_forward_info {
PendingHTLCStatus::Forward(PendingHTLCInfo {
ref incoming_shared_secret, ref routing, ..
}) => {
let reason = if routing.blinded_failure().is_some() {
HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
} else if (error_code & 0x1000) != 0 {
let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
HTLCFailReason::reason(real_code, error_data)
} else {
HTLCFailReason::from_failure_code(error_code)
}.get_encrypted_failure_packet(incoming_shared_secret, &None);
let msg = msgs::UpdateFailHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
reason
};
pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg));
},
_ => {},
}
}
};
let logger = WithChannelContext::from(&self.logger, &chan.context);
try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry);
}
try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry);
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry);
@ -6971,10 +7148,28 @@ where
}
}
fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty();
let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
push_forward_event &= decode_update_add_htlcs.is_empty();
let scid = update_add_htlcs.0;
match decode_update_add_htlcs.entry(scid) {
hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); },
hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); },
}
if push_forward_event { self.push_pending_forwards_ev(); }
}
#[inline]
fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
if push_forward_event { self.push_pending_forwards_ev() }
}
#[inline]
fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
let mut push_forward_event = false;
for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
let mut push_forward_event = false;
let mut new_intercept_events = VecDeque::new();
let mut failed_intercept_forwards = Vec::new();
if !pending_forwards.is_empty() {
@ -6987,6 +7182,7 @@ where
// Pull this now to avoid introducing a lock order with `forward_htlcs`.
let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty();
let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
let forward_htlcs_empty = forward_htlcs.is_empty();
match forward_htlcs.entry(scid) {
@ -7035,9 +7231,7 @@ where
} else {
// We don't want to generate a PendingHTLCsForwardable event if only intercepted
// payments are being processed.
if forward_htlcs_empty {
push_forward_event = true;
}
push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
}
@ -7047,15 +7241,15 @@ where
}
for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) {
self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination);
}
if !new_intercept_events.is_empty() {
let mut events = self.pending_events.lock().unwrap();
events.append(&mut new_intercept_events);
}
if push_forward_event { self.push_pending_forwards_ev() }
}
push_forward_event
}
fn push_pending_forwards_ev(&self) {
@ -7265,7 +7459,6 @@ where
}
fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
let htlc_forwards;
let need_lnd_workaround = {
let per_peer_state = self.per_peer_state.read().unwrap();
@ -7308,9 +7501,11 @@ where
}
}
let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take();
htlc_forwards = self.handle_channel_resumption(
let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
&mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order,
Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
debug_assert!(htlc_forwards.is_none());
debug_assert!(decode_update_add_htlcs.is_none());
if let Some(upd) = channel_update {
peer_state.pending_msg_events.push(upd);
}
@ -7356,16 +7551,10 @@ where
}
};
let mut persist = NotifyOption::SkipPersistHandleEvents;
if let Some(forwards) = htlc_forwards {
self.forward_htlcs(&mut [forwards][..]);
persist = NotifyOption::DoPersist;
}
if let Some(channel_ready_msg) = need_lnd_workaround {
self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
}
Ok(persist)
Ok(NotifyOption::SkipPersistHandleEvents)
}
/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@ -10198,6 +10387,12 @@ where
}
}
let mut decode_update_add_htlcs_opt = None;
let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
if !decode_update_add_htlcs.is_empty() {
decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
}
let per_peer_state = self.per_peer_state.write().unwrap();
let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
@ -10349,6 +10544,7 @@ where
(10, in_flight_monitor_updates, option),
(11, self.probing_cookie_secret, required),
(13, htlc_onion_fields, optional_vec),
(14, decode_update_add_htlcs_opt, option),
});
Ok(())
@ -10814,6 +11010,7 @@ where
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
let mut events_override = None;
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
@ -10827,7 +11024,9 @@ where
(10, in_flight_monitor_updates, option),
(11, probing_cookie_secret, option),
(13, claimable_htlc_onion_fields, optional_vec),
(14, decode_update_add_htlcs, option),
});
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
}
@ -11047,6 +11246,18 @@ where
// still have an entry for this HTLC in `forward_htlcs` or
// `pending_intercepted_htlcs`, we were apparently not persisted after
// the monitor was when forwarding the payment.
decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
update_add_htlcs.retain(|update_add_htlc| {
let matches = *scid == prev_hop_data.short_channel_id &&
update_add_htlc.htlc_id == prev_hop_data.htlc_id;
if matches {
log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
&htlc.payment_hash, &monitor.channel_id());
}
!matches
});
!update_add_htlcs.is_empty()
});
forward_htlcs.retain(|_, forwards| {
forwards.retain(|forward| {
if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
@ -11128,7 +11339,7 @@ where
}
}
if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small
@ -11362,6 +11573,7 @@ where
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
forward_htlcs: Mutex::new(forward_htlcs),
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }),
outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
outpoint_to_peer: Mutex::new(outpoint_to_peer),

View file

@ -1827,14 +1827,9 @@ macro_rules! expect_htlc_handling_failed_destinations {
/// there are any [`Event::HTLCHandlingFailed`] events their [`HTLCDestination`] is included in the
/// `expected_failures` set.
pub fn expect_pending_htlcs_forwardable_conditions(events: Vec<Event>, expected_failures: &[HTLCDestination]) {
match events[0] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event {:?}", events),
};
let count = expected_failures.len() + 1;
assert_eq!(events.len(), count);
assert!(events.iter().find(|event| matches!(event, Event::PendingHTLCsForwardable { .. })).is_some());
if expected_failures.len() > 0 {
expect_htlc_handling_failed_destinations!(events, expected_failures)
}

View file

@ -2750,7 +2750,7 @@ fn claim_htlc_outputs_single_tx() {
check_added_monitors!(nodes[1], 1);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 100000);
let mut events = nodes[0].node.get_and_clear_pending_events();
expect_pending_htlcs_forwardable_from_events!(nodes[0], events[0..1], true);
expect_pending_htlcs_forwardable_conditions(events[0..2].to_vec(), &[HTLCDestination::FailedPayment { payment_hash: payment_hash_2 }]);
match events.last().unwrap() {
Event::ChannelClosed { reason: ClosureReason::CommitmentTxConfirmed, .. } => {}
_ => panic!("Unexpected event"),
@ -3312,13 +3312,13 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use
let events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(events.len(), 2);
match events[0] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
};
match events[1] {
Event::HTLCHandlingFailed { .. } => { },
_ => panic!("Unexpected event"),
}
match events[1] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
};
// Deliberately don't process the pending fail-back so they all fail back at once after
// block connection just like the !deliver_bs_raa case
}
@ -5351,7 +5351,7 @@ fn do_test_fail_backwards_unrevoked_remote_announce(deliver_last_raa: bool, anno
connect_blocks(&nodes[2], ANTI_REORG_DELAY - 1);
check_closed_broadcast!(nodes[2], true);
if deliver_last_raa {
expect_pending_htlcs_forwardable_from_events!(nodes[2], events[0..1], true);
expect_pending_htlcs_forwardable_from_events!(nodes[2], events[1..2], true);
let expected_destinations: Vec<HTLCDestination> = repeat(HTLCDestination::NextHopChannel { node_id: Some(nodes[3].node.get_our_node_id()), channel_id: chan_2_3.2 }).take(3).collect();
expect_htlc_handling_failed_destinations!(nodes[2].node.get_and_clear_pending_events(), expected_destinations);
@ -6182,7 +6182,7 @@ fn test_fail_holding_cell_htlc_upon_free_multihop() {
// nodes[1]'s ChannelManager will now signal that we have HTLC forwards to process.
let process_htlc_forwards_event = nodes[1].node.get_and_clear_pending_events();
assert_eq!(process_htlc_forwards_event.len(), 2);
match &process_htlc_forwards_event[0] {
match &process_htlc_forwards_event[1] {
&Event::PendingHTLCsForwardable { .. } => {},
_ => panic!("Unexpected event"),
}
@ -7543,7 +7543,7 @@ fn test_bump_penalty_txn_on_revoked_htlcs() {
let route_params = RouteParameters::from_payment_params_and_value(payment_params, 3_000_000);
let route = get_route(&nodes[1].node.get_our_node_id(), &route_params, &nodes[1].network_graph.read_only(), None,
nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes).unwrap();
send_along_route(&nodes[1], route, &[&nodes[0]], 3_000_000);
let failed_payment_hash = send_along_route(&nodes[1], route, &[&nodes[0]], 3_000_000).1;
let revoked_local_txn = get_local_commitment_txn!(nodes[1], chan.2);
assert_eq!(revoked_local_txn[0].input.len(), 1);
@ -7582,7 +7582,7 @@ fn test_bump_penalty_txn_on_revoked_htlcs() {
let block_129 = create_dummy_block(block_11.block_hash(), 42, vec![revoked_htlc_txn[0].clone(), revoked_htlc_txn[1].clone()]);
connect_block(&nodes[0], &block_129);
let events = nodes[0].node.get_and_clear_pending_events();
expect_pending_htlcs_forwardable_from_events!(nodes[0], events[0..1], true);
expect_pending_htlcs_forwardable_conditions(events[0..2].to_vec(), &[HTLCDestination::FailedPayment { payment_hash: failed_payment_hash }]);
match events.last().unwrap() {
Event::ChannelClosed { reason: ClosureReason::CommitmentTxConfirmed, .. } => {}
_ => panic!("Unexpected event"),

View file

@ -1072,6 +1072,21 @@ pub(crate) enum Hop {
},
}
impl Hop {
pub(crate) fn is_intro_node_blinded_forward(&self) -> bool {
match self {
Self::Forward {
next_hop_data:
msgs::InboundOnionPayload::BlindedForward {
intro_node_blinding_point: Some(_), ..
},
..
} => true,
_ => false,
}
}
}
/// Error returned when we fail to decode the onion packet.
#[derive(Debug)]
pub(crate) enum OnionDecodeErr {

View file

@ -108,14 +108,14 @@ impl Writer for LengthCalculatingWriter {
/// forward to ensure we always consume exactly the fixed length specified.
///
/// This is not exported to bindings users as manual TLV building is not currently supported in bindings
pub struct FixedLengthReader<R: Read> {
read: R,
pub struct FixedLengthReader<'a, R: Read> {
read: &'a mut R,
bytes_read: u64,
total_bytes: u64,
}
impl<R: Read> FixedLengthReader<R> {
impl<'a, R: Read> FixedLengthReader<'a, R> {
/// Returns a new [`FixedLengthReader`].
pub fn new(read: R, total_bytes: u64) -> Self {
pub fn new(read: &'a mut R, total_bytes: u64) -> Self {
Self { read, bytes_read: 0, total_bytes }
}
@ -136,7 +136,7 @@ impl<R: Read> FixedLengthReader<R> {
}
}
}
impl<R: Read> Read for FixedLengthReader<R> {
impl<'a, R: Read> Read for FixedLengthReader<'a, R> {
#[inline]
fn read(&mut self, dest: &mut [u8]) -> Result<usize, io::Error> {
if self.total_bytes == self.bytes_read {
@ -154,7 +154,7 @@ impl<R: Read> Read for FixedLengthReader<R> {
}
}
impl<R: Read> LengthRead for FixedLengthReader<R> {
impl<'a, R: Read> LengthRead for FixedLengthReader<'a, R> {
#[inline]
fn total_bytes(&self) -> u64 {
self.total_bytes
@ -820,6 +820,49 @@ macro_rules! impl_for_vec {
}
}
// Alternatives to impl_writeable_for_vec/impl_readable_for_vec that add a length prefix to each
// element in the Vec. Intended to be used when elements have variable lengths.
macro_rules! impl_writeable_for_vec_with_element_length_prefix {
($ty: ty $(, $name: ident)*) => {
impl<$($name : Writeable),*> Writeable for Vec<$ty> {
#[inline]
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
CollectionLength(self.len() as u64).write(w)?;
for elem in self.iter() {
CollectionLength(elem.serialized_length() as u64).write(w)?;
elem.write(w)?;
}
Ok(())
}
}
}
}
macro_rules! impl_readable_for_vec_with_element_length_prefix {
($ty: ty $(, $name: ident)*) => {
impl<$($name : Readable),*> Readable for Vec<$ty> {
#[inline]
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
let len: CollectionLength = Readable::read(r)?;
let mut ret = Vec::with_capacity(cmp::min(len.0 as usize, MAX_BUF_SIZE / core::mem::size_of::<$ty>()));
for _ in 0..len.0 {
let elem_len: CollectionLength = Readable::read(r)?;
let mut elem_reader = FixedLengthReader::new(r, elem_len.0);
if let Some(val) = MaybeReadable::read(&mut elem_reader)? {
ret.push(val);
}
}
Ok(ret)
}
}
}
}
macro_rules! impl_for_vec_with_element_length_prefix {
($ty: ty $(, $name: ident)*) => {
impl_writeable_for_vec_with_element_length_prefix!($ty $(, $name)*);
impl_readable_for_vec_with_element_length_prefix!($ty $(, $name)*);
}
}
impl Writeable for Vec<u8> {
#[inline]
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
@ -851,6 +894,8 @@ impl_for_vec!(crate::ln::msgs::SocketAddress);
impl_for_vec!((A, B), A, B);
impl_writeable_for_vec!(&crate::routing::router::BlindedTail);
impl_readable_for_vec!(crate::routing::router::BlindedTail);
impl_for_vec_with_element_length_prefix!(crate::ln::msgs::UpdateAddHTLC);
impl_writeable_for_vec_with_element_length_prefix!(&crate::ln::msgs::UpdateAddHTLC);
impl Writeable for Vec<Witness> {
#[inline]

View file

@ -1033,7 +1033,7 @@ macro_rules! impl_writeable_tlv_based_enum {
$($variant_id => {
// Because read_tlv_fields creates a labeled loop, we cannot call it twice
// in the same function body. Instead, we define a closure and call it.
let f = || {
let mut f = || {
$crate::_init_and_read_len_prefixed_tlv_fields!(reader, {
$(($type, $field, $fieldty)),*
});
@ -1087,7 +1087,7 @@ macro_rules! impl_writeable_tlv_based_enum_upgradable {
$($variant_id => {
// Because read_tlv_fields creates a labeled loop, we cannot call it twice
// in the same function body. Instead, we define a closure and call it.
let f = || {
let mut f = || {
$crate::_init_and_read_len_prefixed_tlv_fields!(reader, {
$(($type, $field, $fieldty)),*
});