Track an EventCompletionAction for after an Event is processed

This will allow us to block `ChannelMonitorUpdate`s on `Event`
processing in the next commit.

Note that this gets dangerously close to breaking forwards
compatibility - if we have an `Event` with an
`EventCompletionAction` tied to it, we persist a new, even, TLV in
the `ChannelManager`. Hopefully this should be uncommon, as it
implies an `Event` was delayed until after a full round-trip to a
peer.
This commit is contained in:
Matt Corallo 2023-04-28 04:24:25 +00:00
parent ac5efa2755
commit 9ede794e8e
2 changed files with 200 additions and 112 deletions

View file

@ -521,6 +521,20 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
(2, EmitEvent) => { (0, event, upgradable_required) },
);
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum EventCompletionAction {
ReleaseRAAChannelMonitorUpdate {
counterparty_node_id: PublicKey,
channel_funding_outpoint: OutPoint,
},
}
impl_writeable_tlv_based_enum!(EventCompletionAction,
(0, ReleaseRAAChannelMonitorUpdate) => {
(0, channel_funding_outpoint, required),
(2, counterparty_node_id, required),
};
);
/// State we hold per-peer.
pub(super) struct PeerState<Signer: ChannelSigner> {
/// `temporary_channel_id` or `channel_id` -> `channel`.
@ -932,8 +946,17 @@ where
#[cfg(any(test, feature = "_test_utils"))]
pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
/// The set of events which we need to give to the user to handle. In some cases an event may
/// require some further action after the user handles it (currently only blocking a monitor
/// update from being handed to the user to ensure the included changes to the channel state
/// are handled by the user before they're persisted durably to disk). In that case, the second
/// element in the tuple is set to `Some` with further details of the action.
///
/// Note that events MUST NOT be removed from pending_events after deserialization, as they
/// could be in the middle of being processed without the direct mutex held.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_events: Mutex<Vec<events::Event>>,
pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
pending_events_processor: AtomicBool,
/// See `ChannelManager` struct-level documentation for lock order requirements.
@ -1446,10 +1469,10 @@ macro_rules! handle_error {
});
}
if let Some((channel_id, user_channel_id)) = chan_id {
$self.pending_events.lock().unwrap().push(events::Event::ChannelClosed {
$self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed {
channel_id, user_channel_id,
reason: ClosureReason::ProcessingError { err: err.err.clone() }
});
}, None));
}
}
@ -1581,13 +1604,13 @@ macro_rules! send_channel_ready {
macro_rules! emit_channel_pending_event {
($locked_events: expr, $channel: expr) => {
if $channel.should_emit_channel_pending_event() {
$locked_events.push(events::Event::ChannelPending {
$locked_events.push_back((events::Event::ChannelPending {
channel_id: $channel.channel_id(),
former_temporary_channel_id: $channel.temporary_channel_id(),
counterparty_node_id: $channel.get_counterparty_node_id(),
user_channel_id: $channel.get_user_id(),
funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(),
});
}, None));
$channel.set_channel_pending_event_emitted();
}
}
@ -1597,12 +1620,12 @@ macro_rules! emit_channel_ready_event {
($locked_events: expr, $channel: expr) => {
if $channel.should_emit_channel_ready_event() {
debug_assert!($channel.channel_pending_event_emitted());
$locked_events.push(events::Event::ChannelReady {
$locked_events.push_back((events::Event::ChannelReady {
channel_id: $channel.channel_id(),
user_channel_id: $channel.get_user_id(),
counterparty_node_id: $channel.get_counterparty_node_id(),
channel_type: $channel.get_channel_type().clone(),
});
}, None));
$channel.set_channel_ready_event_emitted();
}
}
@ -1721,7 +1744,7 @@ macro_rules! process_events_body {
result = NotifyOption::DoPersist;
}
for event in pending_events {
for (event, _action) in pending_events {
$event_to_handle = event;
$handle_event;
}
@ -1802,7 +1825,7 @@ where
per_peer_state: FairRwLock::new(HashMap::new()),
pending_events: Mutex::new(Vec::new()),
pending_events: Mutex::new(VecDeque::new()),
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
@ -2010,15 +2033,17 @@ where
let mut pending_events_lock = self.pending_events.lock().unwrap();
match channel.unbroadcasted_funding() {
Some(transaction) => {
pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
pending_events_lock.push_back((events::Event::DiscardFunding {
channel_id: channel.channel_id(), transaction
}, None));
},
None => {},
}
pending_events_lock.push(events::Event::ChannelClosed {
pending_events_lock.push_back((events::Event::ChannelClosed {
channel_id: channel.channel_id(),
user_channel_id: channel.get_user_id(),
reason: closure_reason
});
}, None));
}
fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
@ -3233,7 +3258,7 @@ where
pub fn process_pending_htlc_forwards(&self) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
let mut new_events = Vec::new();
let mut new_events = VecDeque::new();
let mut failed_forwards = Vec::new();
let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
{
@ -3559,7 +3584,7 @@ where
htlcs.push(claimable_htlc);
let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
new_events.push(events::Event::PaymentClaimable {
new_events.push_back((events::Event::PaymentClaimable {
receiver_node_id: Some(receiver_node_id),
payment_hash,
purpose: purpose(),
@ -3568,7 +3593,7 @@ where
via_user_channel_id: Some(prev_user_channel_id),
claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER),
onion_fields: claimable_payment.onion_fields.clone(),
});
}, None));
payment_claimable_generated = true;
} else {
// Nothing to do - we haven't reached the total
@ -3629,7 +3654,7 @@ where
htlcs: vec![claimable_htlc],
});
let prev_channel_id = prev_funding_outpoint.to_channel_id();
new_events.push(events::Event::PaymentClaimable {
new_events.push_back((events::Event::PaymentClaimable {
receiver_node_id: Some(receiver_node_id),
payment_hash,
amount_msat,
@ -3638,7 +3663,7 @@ where
via_user_channel_id: Some(prev_user_channel_id),
claim_deadline,
onion_fields: Some(onion_fields),
});
}, None));
},
hash_map::Entry::Occupied(_) => {
log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0));
@ -4116,10 +4141,10 @@ where
mem::drop(forward_htlcs);
if push_forward_ev { self.push_pending_forwards_ev(); }
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::HTLCHandlingFailed {
pending_events.push_back((events::Event::HTLCHandlingFailed {
prev_channel_id: outpoint.to_channel_id(),
failed_next_destination: destination,
});
}, None));
},
}
}
@ -4382,13 +4407,13 @@ where
MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment {
self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed {
self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id),
});
}, None));
}
},
MonitorUpdateCompletionAction::EmitEvent { event } => {
self.pending_events.lock().unwrap().push(event);
self.pending_events.lock().unwrap().push_back((event, None));
},
}
}
@ -4712,15 +4737,13 @@ where
});
} else {
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(
events::Event::OpenChannelRequest {
temporary_channel_id: msg.temporary_channel_id.clone(),
counterparty_node_id: counterparty_node_id.clone(),
funding_satoshis: msg.funding_satoshis,
push_msat: msg.push_msat,
channel_type: channel.get_channel_type().clone(),
}
);
pending_events.push_back((events::Event::OpenChannelRequest {
temporary_channel_id: msg.temporary_channel_id.clone(),
counterparty_node_id: counterparty_node_id.clone(),
funding_satoshis: msg.funding_satoshis,
push_msat: msg.push_msat,
channel_type: channel.get_channel_type().clone(),
}, None));
}
entry.insert(channel);
@ -4748,13 +4771,13 @@ where
}
};
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::FundingGenerationReady {
pending_events.push_back((events::Event::FundingGenerationReady {
temporary_channel_id: msg.temporary_channel_id,
counterparty_node_id: *counterparty_node_id,
channel_value_satoshis: value,
output_script,
user_channel_id: user_id,
});
}, None));
Ok(())
}
@ -5144,7 +5167,7 @@ where
fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
let mut push_forward_event = false;
let mut new_intercept_events = Vec::new();
let mut new_intercept_events = VecDeque::new();
let mut failed_intercept_forwards = Vec::new();
if !pending_forwards.is_empty() {
for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
@ -5171,13 +5194,13 @@ where
let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
match pending_intercepts.entry(intercept_id) {
hash_map::Entry::Vacant(entry) => {
new_intercept_events.push(events::Event::HTLCIntercepted {
new_intercept_events.push_back((events::Event::HTLCIntercepted {
requested_next_hop_scid: scid,
payment_hash: forward_info.payment_hash,
inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(),
expected_outbound_amount_msat: forward_info.outgoing_amt_msat,
intercept_id
});
}, None));
entry.insert(PendingAddHTLCInfo {
prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info });
},
@ -5227,13 +5250,13 @@ where
fn push_pending_forwards_ev(&self) {
let mut pending_events = self.pending_events.lock().unwrap();
let forward_ev_exists = pending_events.iter()
.find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
.find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
.is_some();
if !forward_ev_exists {
pending_events.push(events::Event::PendingHTLCsForwardable {
pending_events.push_back((events::Event::PendingHTLCsForwardable {
time_forwardable:
Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
});
}, None));
}
}
@ -5884,13 +5907,13 @@ where
#[cfg(feature = "_test_utils")]
pub fn push_pending_event(&self, event: events::Event) {
let mut events = self.pending_events.lock().unwrap();
events.push(event);
events.push_back((event, None));
}
#[cfg(test)]
pub fn pop_pending_event(&self) -> Option<events::Event> {
let mut events = self.pending_events.lock().unwrap();
if events.is_empty() { None } else { Some(events.remove(0)) }
events.pop_front().map(|(e, _)| e)
}
#[cfg(test)]
@ -7227,9 +7250,19 @@ where
}
let events = self.pending_events.lock().unwrap();
(events.len() as u64).write(writer)?;
for event in events.iter() {
event.write(writer)?;
// LDK versions prior to 0.0.115 don't support post-event actions, thus if there's no
// actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will
// refuse to read the new ChannelManager.
let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some());
if events_not_backwards_compatible {
// If we're gonna write a even TLV that will overwrite our events anyway we might as
// well save the space and not write any events here.
0u64.write(writer)?;
} else {
(events.len() as u64).write(writer)?;
for (event, _) in events.iter() {
event.write(writer)?;
}
}
let background_events = self.pending_background_events.lock().unwrap();
@ -7310,6 +7343,7 @@ where
(5, self.our_network_pubkey, required),
(6, monitor_update_blocked_actions_per_peer, option),
(7, self.fake_scid_rand_bytes, required),
(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
(9, htlc_purposes, vec_type),
(11, self.probing_cookie_secret, required),
(13, htlc_onion_fields, optional_vec),
@ -7319,6 +7353,47 @@ where
}
}
impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
(self.len() as u64).write(w)?;
for (event, action) in self.iter() {
event.write(w)?;
action.write(w)?;
#[cfg(debug_assertions)] {
// Events are MaybeReadable, in some cases indicating that they shouldn't actually
// be persisted and are regenerated on restart. However, if such an event has a
// post-event-handling action we'll write nothing for the event and would have to
// either forget the action or fail on deserialization (which we do below). Thus,
// check that the event is sane here.
let event_encoded = event.encode();
let event_read: Option<Event> =
MaybeReadable::read(&mut &event_encoded[..]).unwrap();
if action.is_some() { assert!(event_read.is_some()); }
}
}
Ok(())
}
}
impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
let len: u64 = Readable::read(reader)?;
const MAX_ALLOC_SIZE: u64 = 1024 * 16;
let mut events: Self = VecDeque::with_capacity(cmp::min(
MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>() as u64,
len) as usize);
for _ in 0..len {
let ev_opt = MaybeReadable::read(reader)?;
let action = Readable::read(reader)?;
if let Some(ev) = ev_opt {
events.push_back((ev, action));
} else if action.is_some() {
return Err(DecodeError::InvalidValue);
}
}
Ok(events)
}
}
/// Arguments for the creation of a ChannelManager that are not deserialized.
///
/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@ -7485,7 +7560,7 @@ where
let mut peer_channels: HashMap<PublicKey, HashMap<[u8; 32], Channel<<SP::Target as SignerProvider>::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut channel_closures = Vec::new();
let mut channel_closures = VecDeque::new();
let mut pending_background_events = Vec::new();
for _ in 0..channel_count {
let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
@ -7521,11 +7596,11 @@ where
pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
}
failed_htlcs.append(&mut new_failed_htlcs);
channel_closures.push(events::Event::ChannelClosed {
channel_closures.push_back((events::Event::ChannelClosed {
channel_id: channel.channel_id(),
user_channel_id: channel.get_user_id(),
reason: ClosureReason::OutdatedChannelManager
});
}, None));
for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
let mut found_htlc = false;
for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
@ -7570,11 +7645,11 @@ where
// was in-progress, we never broadcasted the funding transaction and can still
// safely discard the channel.
let _ = channel.force_shutdown(false);
channel_closures.push(events::Event::ChannelClosed {
channel_closures.push_back((events::Event::ChannelClosed {
channel_id: channel.channel_id(),
user_channel_id: channel.get_user_id(),
reason: ClosureReason::DisconnectedPeer,
});
}, None));
} else {
log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id()));
log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
@ -7635,10 +7710,11 @@ where
}
let event_count: u64 = Readable::read(reader)?;
let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>()));
for _ in 0..event_count {
match MaybeReadable::read(reader)? {
Some(event) => pending_events_read.push(event),
Some(event) => pending_events_read.push_back((event, None)),
None => continue,
}
}
@ -7694,6 +7770,7 @@ where
let mut claimable_htlc_onion_fields = None;
let mut pending_claiming_payments = Some(HashMap::new());
let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
let mut events_override = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
@ -7702,6 +7779,7 @@ where
(5, received_network_pubkey, option),
(6, monitor_update_blocked_actions_per_peer, option),
(7, fake_scid_rand_bytes, option),
(8, events_override, option),
(9, claimable_htlc_purposes, vec_type),
(11, probing_cookie_secret, option),
(13, claimable_htlc_onion_fields, optional_vec),
@ -7714,6 +7792,10 @@ where
probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
}
if let Some(events) = events_override {
pending_events_read = events;
}
if !channel_closures.is_empty() {
pending_events_read.append(&mut channel_closures);
}
@ -7809,7 +7891,7 @@ where
if pending_forward_matches_htlc(&htlc_info) {
log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
pending_events_read.retain(|event| {
pending_events_read.retain(|(event, _)| {
if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
intercepted_id != ev_id
} else { true }
@ -7845,9 +7927,9 @@ where
// shut down before the timer hit. Either way, set the time_forwardable to a small
// constant as enough time has likely passed that we should simply handle the forwards
// now, or at least after the user gets a chance to reconnect to our peers.
pending_events_read.push(events::Event::PendingHTLCsForwardable {
pending_events_read.push_back((events::Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(2),
});
}, None));
}
let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
@ -8001,12 +8083,12 @@ where
previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger);
}
}
pending_events_read.push(events::Event::PaymentClaimed {
pending_events_read.push_back((events::Event::PaymentClaimed {
receiver_node_id,
payment_hash,
purpose: payment.purpose,
amount_msat: claimable_amt_msat,
});
}, None));
}
}
}

View file

@ -16,7 +16,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey};
use crate::chain::keysinterface::{EntropySource, NodeSigner, Recipient};
use crate::events::{self, PaymentFailureReason};
use crate::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
use crate::ln::channelmanager::{ChannelDetails, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
use crate::ln::channelmanager::{ChannelDetails, EventCompletionAction, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
use crate::ln::onion_utils::HTLCFailReason;
use crate::routing::router::{InFlightHtlcs, Path, PaymentParameters, Route, RouteParameters, Router};
use crate::util::errors::APIError;
@ -487,7 +487,7 @@ impl OutboundPayments {
retry_strategy: Retry, route_params: RouteParameters, router: &R,
first_hops: Vec<ChannelDetails>, compute_inflight_htlcs: IH, entropy_source: &ES,
node_signer: &NS, best_block_height: u32, logger: &L,
pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
) -> Result<(), RetryableSendFailure>
where
R::Target: Router,
@ -525,7 +525,7 @@ impl OutboundPayments {
payment_id: PaymentId, retry_strategy: Retry, route_params: RouteParameters, router: &R,
first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
node_signer: &NS, best_block_height: u32, logger: &L,
pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP
) -> Result<PaymentHash, RetryableSendFailure>
where
R::Target: Router,
@ -575,7 +575,8 @@ impl OutboundPayments {
pub(super) fn check_retry_payments<R: Deref, ES: Deref, NS: Deref, SP, IH, FH, L: Deref>(
&self, router: &R, first_hops: FH, inflight_htlcs: IH, entropy_source: &ES, node_signer: &NS,
best_block_height: u32, pending_events: &Mutex<Vec<events::Event>>, logger: &L,
best_block_height: u32,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
send_payment_along_path: SP,
)
where
@ -617,11 +618,11 @@ impl OutboundPayments {
if !pmt.is_auto_retryable_now() && pmt.remaining_parts() == 0 {
pmt.mark_abandoned(PaymentFailureReason::RetriesExhausted);
if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = pmt {
pending_events.lock().unwrap().push(events::Event::PaymentFailed {
pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
payment_id: *pmt_id,
payment_hash: *payment_hash,
reason: *reason,
});
}, None));
retain = false;
}
}
@ -645,7 +646,7 @@ impl OutboundPayments {
keysend_preimage: Option<PaymentPreimage>, retry_strategy: Retry, route_params: RouteParameters,
router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
node_signer: &NS, best_block_height: u32, logger: &L,
pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
) -> Result<(), RetryableSendFailure>
where
R::Target: Router,
@ -686,7 +687,7 @@ impl OutboundPayments {
&self, payment_hash: PaymentHash, payment_id: PaymentId, route_params: RouteParameters,
router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: &IH, entropy_source: &ES,
node_signer: &NS, best_block_height: u32, logger: &L,
pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
)
where
R::Target: Router,
@ -736,11 +737,11 @@ impl OutboundPayments {
$payment.get_mut().mark_abandoned($reason);
if let PendingOutboundPayment::Abandoned { reason, .. } = $payment.get() {
if $payment.get().remaining_parts() == 0 {
pending_events.lock().unwrap().push(events::Event::PaymentFailed {
pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
payment_id,
payment_hash,
reason: *reason,
});
}, None));
$payment.remove();
}
}
@ -808,7 +809,7 @@ impl OutboundPayments {
&self, err: PaymentSendFailure, payment_id: PaymentId, payment_hash: PaymentHash, route: Route,
mut route_params: RouteParameters, router: &R, first_hops: Vec<ChannelDetails>,
inflight_htlcs: &IH, entropy_source: &ES, node_signer: &NS, best_block_height: u32, logger: &L,
pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
)
where
R::Target: Router,
@ -851,7 +852,8 @@ impl OutboundPayments {
fn push_path_failed_evs_and_scids<I: ExactSizeIterator + Iterator<Item = Result<(), APIError>>, L: Deref>(
payment_id: PaymentId, payment_hash: PaymentHash, route_params: &mut RouteParameters,
paths: Vec<Path>, path_results: I, logger: &L, pending_events: &Mutex<Vec<events::Event>>
paths: Vec<Path>, path_results: I, logger: &L,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
) where L::Target: Logger {
let mut events = pending_events.lock().unwrap();
debug_assert_eq!(paths.len(), path_results.len());
@ -865,7 +867,7 @@ impl OutboundPayments {
failed_scid = Some(scid);
route_params.payment_params.previously_failed_channels.push(scid);
}
events.push(events::Event::PaymentPathFailed {
events.push_back((events::Event::PaymentPathFailed {
payment_id: Some(payment_id),
payment_hash,
payment_failed_permanently: false,
@ -876,7 +878,7 @@ impl OutboundPayments {
error_code: None,
#[cfg(test)]
error_data: None,
});
}, None));
}
}
}
@ -1112,7 +1114,9 @@ impl OutboundPayments {
pub(super) fn claim_htlc<L: Deref>(
&self, payment_id: PaymentId, payment_preimage: PaymentPreimage, session_priv: SecretKey,
path: Path, from_onchain: bool, pending_events: &Mutex<Vec<events::Event>>, logger: &L
path: Path, from_onchain: bool,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
logger: &L,
) where L::Target: Logger {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
@ -1122,14 +1126,12 @@ impl OutboundPayments {
if !payment.get().is_fulfilled() {
let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
let fee_paid_msat = payment.get().get_pending_fee_msat();
pending_events.push(
events::Event::PaymentSent {
payment_id: Some(payment_id),
payment_preimage,
payment_hash,
fee_paid_msat,
}
);
pending_events.push_back((events::Event::PaymentSent {
payment_id: Some(payment_id),
payment_preimage,
payment_hash,
fee_paid_msat,
}, None));
payment.get_mut().mark_fulfilled();
}
@ -1142,13 +1144,11 @@ impl OutboundPayments {
// irrevocably fulfilled.
if payment.get_mut().remove(&session_priv_bytes, Some(&path)) {
let payment_hash = Some(PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()));
pending_events.push(
events::Event::PaymentPathSuccessful {
payment_id,
payment_hash,
path,
}
);
pending_events.push_back((events::Event::PaymentPathSuccessful {
payment_id,
payment_hash,
path,
}, None));
}
}
} else {
@ -1156,7 +1156,9 @@ impl OutboundPayments {
}
}
pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>, pending_events: &Mutex<Vec<events::Event>>) {
pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
{
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
let mut pending_events = pending_events.lock().unwrap();
for source in sources {
@ -1166,20 +1168,20 @@ impl OutboundPayments {
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
assert!(payment.get().is_fulfilled());
if payment.get_mut().remove(&session_priv_bytes, None) {
pending_events.push(
events::Event::PaymentPathSuccessful {
payment_id,
payment_hash: payment.get().payment_hash(),
path,
}
);
pending_events.push_back((events::Event::PaymentPathSuccessful {
payment_id,
payment_hash: payment.get().payment_hash(),
path,
}, None));
}
}
}
}
}
pub(super) fn remove_stale_resolved_payments(&self, pending_events: &Mutex<Vec<events::Event>>) {
pub(super) fn remove_stale_resolved_payments(&self,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
{
// If an outbound payment was completed, and no pending HTLCs remain, we should remove it
// from the map. However, if we did that immediately when the last payment HTLC is claimed,
// this could race the user making a duplicate send_payment call and our idempotency
@ -1193,7 +1195,7 @@ impl OutboundPayments {
if let PendingOutboundPayment::Fulfilled { session_privs, timer_ticks_without_htlcs, .. } = payment {
let mut no_remaining_entries = session_privs.is_empty();
if no_remaining_entries {
for ev in pending_events.iter() {
for (ev, _) in pending_events.iter() {
match ev {
events::Event::PaymentSent { payment_id: Some(ev_payment_id), .. } |
events::Event::PaymentPathSuccessful { payment_id: ev_payment_id, .. } |
@ -1221,8 +1223,9 @@ impl OutboundPayments {
// Returns a bool indicating whether a PendingHTLCsForwardable event should be generated.
pub(super) fn fail_htlc<L: Deref>(
&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason,
path: &Path, session_priv: &SecretKey, payment_id: &PaymentId, probing_cookie_secret: [u8; 32],
secp_ctx: &Secp256k1<secp256k1::All>, pending_events: &Mutex<Vec<events::Event>>, logger: &L
path: &Path, session_priv: &SecretKey, payment_id: &PaymentId,
probing_cookie_secret: [u8; 32], secp_ctx: &Secp256k1<secp256k1::All>,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
) -> bool where L::Target: Logger {
#[cfg(test)]
let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_error.decode_onion_failure(secp_ctx, logger, &source);
@ -1334,24 +1337,25 @@ impl OutboundPayments {
}
};
let mut pending_events = pending_events.lock().unwrap();
pending_events.push(path_failure);
if let Some(ev) = full_failure_ev { pending_events.push(ev); }
pending_events.push_back((path_failure, None));
if let Some(ev) = full_failure_ev { pending_events.push_back((ev, None)); }
pending_retry_ev
}
pub(super) fn abandon_payment(
&self, payment_id: PaymentId, reason: PaymentFailureReason, pending_events: &Mutex<Vec<events::Event>>
&self, payment_id: PaymentId, reason: PaymentFailureReason,
pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>
) {
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
payment.get_mut().mark_abandoned(reason);
if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = payment.get() {
if payment.get().remaining_parts() == 0 {
pending_events.lock().unwrap().push(events::Event::PaymentFailed {
pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
payment_id,
payment_hash: *payment_hash,
reason: *reason,
});
}, None));
payment.remove();
}
}
@ -1435,6 +1439,8 @@ mod tests {
use crate::util::errors::APIError;
use crate::util::test_utils;
use alloc::collections::VecDeque;
#[test]
#[cfg(feature = "std")]
fn fails_paying_after_expiration() {
@ -1460,7 +1466,7 @@ mod tests {
payment_params,
final_value_msat: 0,
};
let pending_events = Mutex::new(Vec::new());
let pending_events = Mutex::new(VecDeque::new());
if on_retry {
outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@ -1472,7 +1478,7 @@ mod tests {
&pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
let events = pending_events.lock().unwrap();
assert_eq!(events.len(), 1);
if let Event::PaymentFailed { ref reason, .. } = events[0] {
if let Event::PaymentFailed { ref reason, .. } = events[0].0 {
assert_eq!(reason.unwrap(), PaymentFailureReason::PaymentExpired);
} else { panic!("Unexpected event"); }
} else {
@ -1508,7 +1514,7 @@ mod tests {
router.expect_find_route(route_params.clone(),
Err(LightningError { err: String::new(), action: ErrorAction::IgnoreError }));
let pending_events = Mutex::new(Vec::new());
let pending_events = Mutex::new(VecDeque::new());
if on_retry {
outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@ -1520,7 +1526,7 @@ mod tests {
&pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
let events = pending_events.lock().unwrap();
assert_eq!(events.len(), 1);
if let Event::PaymentFailed { .. } = events[0] { } else { panic!("Unexpected event"); }
if let Event::PaymentFailed { .. } = events[0].0 { } else { panic!("Unexpected event"); }
} else {
let err = outbound_payments.send_payment(
PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
@ -1570,7 +1576,7 @@ mod tests {
// Ensure that a ChannelUnavailable error will result in blaming an scid in the
// PaymentPathFailed event.
let pending_events = Mutex::new(Vec::new());
let pending_events = Mutex::new(VecDeque::new());
outbound_payments.send_payment(
PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
Retry::Attempts(0), route_params.clone(), &&router, vec![], || InFlightHtlcs::new(),
@ -1581,11 +1587,11 @@ mod tests {
assert_eq!(events.len(), 2);
if let Event::PaymentPathFailed {
short_channel_id,
failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0]
failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0].0
{
assert_eq!(short_channel_id, Some(failed_scid));
} else { panic!("Unexpected event"); }
if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
events.clear();
core::mem::drop(events);
@ -1608,10 +1614,10 @@ mod tests {
assert_eq!(events.len(), 2);
if let Event::PaymentPathFailed {
short_channel_id,
failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0]
failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0].0
{
assert_eq!(short_channel_id, None);
} else { panic!("Unexpected event"); }
if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
}
}