Merge pull request #1863 from TheBlueMatt/2022-11-holding-cell-batch-update

Lean on the holding cell when batch-forwarding/failing HTLCs
Matt Corallo 2022-12-07 17:52:04 +00:00 committed by GitHub
commit d9d4611e65
2 changed files with 134 additions and 233 deletions
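
The heart of the change is the holding-cell pattern: instead of building an update_add_htlc/update_fail_htlc message per HTLC and then explicitly calling send_commitment, the manager queues each update into the channel's holding cell and frees the cell once at the end, so a single commitment_signed covers the whole batch. A minimal self-contained sketch of the idea, with toy types invented for illustration rather than LDK's actual API:

// Toy sketch of the holding-cell batching pattern (invented types, not
// LDK's actual API).
#[derive(Debug)]
enum HoldingCellUpdate {
    AddHTLC { amount_msat: u64 },
    FailHTLC { htlc_id: u64 },
}

#[derive(Default)]
struct ToyChannel {
    holding_cell: Vec<HoldingCellUpdate>,
    awaiting_remote_revoke: bool,
}

impl ToyChannel {
    // Queueing never emits a wire message; it only records the update.
    fn queue_update(&mut self, upd: HoldingCellUpdate) {
        self.holding_cell.push(upd);
    }

    // Freeing drains every queued update at once, so one commitment_signed
    // can cover the whole batch.
    fn free_holding_cell(&mut self) -> Option<Vec<HoldingCellUpdate>> {
        if self.awaiting_remote_revoke || self.holding_cell.is_empty() {
            return None; // must wait for revoke_and_ack, or nothing to do
        }
        Some(std::mem::take(&mut self.holding_cell))
    }
}

fn main() {
    let mut chan = ToyChannel::default();
    chan.queue_update(HoldingCellUpdate::AddHTLC { amount_msat: 10_000 });
    chan.queue_update(HoldingCellUpdate::FailHTLC { htlc_id: 7 });
    // One batched commitment update instead of one per HTLC:
    assert_eq!(chan.free_holding_cell().unwrap().len(), 2);
}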

lightning/src/ln/channel.rs

@@ -1955,9 +1955,26 @@ impl<Signer: Sign> Channel<Signer> {
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
/// before we fail backwards.
/// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
/// Ok(_) if debug assertions are turned on or preconditions are met.
pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
///
/// If we do fail twice, we `debug_assert!(false)` and return `Ok(None)`. Thus, this will always
/// return `Ok(_)` if preconditions are met. In any case, `Err`s will only be
/// [`ChannelError::Ignore`].
pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
-> Result<(), ChannelError> where L::Target: Logger {
self.fail_htlc(htlc_id_arg, err_packet, true, logger)
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
}
/// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
/// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
/// however, fail more than once as we wait for an upstream failure to be irrevocably committed
/// before we fail backwards.
///
/// If we do fail twice, we `debug_assert!(false)` and return `Ok(None)`. Thus, this will always
/// return `Ok(_)` if preconditions are met. In any case, `Err`s will only be
/// [`ChannelError::Ignore`].
fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
-> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
panic!("Was asked to fail an HTLC when channel was not in an operational state");
}
@@ -1995,8 +2012,13 @@ impl<Signer: Sign> Channel<Signer> {
return Ok(None);
}
// Now update local state:
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
debug_assert!(force_holding_cell, "!force_holding_cell is only called when emptying the holding cell, so we shouldn't end up back in it!");
force_holding_cell = true;
}
// Now update local state:
if force_holding_cell {
for pending_update in self.holding_cell_htlc_updates.iter() {
match pending_update {
&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3171,8 +3193,8 @@ impl<Signer: Sign> Channel<Signer> {
} else { Ok((None, Vec::new())) }
}
/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
/// fulfilling or failing the last pending HTLC)
/// Frees any pending commitment updates in the holding cell, generating the relevant messages
/// for our counterparty.
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
@@ -3198,7 +3220,7 @@ impl<Signer: Sign> Channel<Signer> {
// to rebalance channels.
match &htlc_update {
&HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
Err(e) => {
match e {
@@ -3234,13 +3256,13 @@ impl<Signer: Sign> Channel<Signer> {
monitor_update.updates.append(&mut additional_monitor_update.updates);
},
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
Ok(update_fail_msg_option) => {
// If an HTLC failure was previously added to the holding cell (via
// `get_update_fail_htlc`) then generating the fail message itself
// must not fail - we should never end up in a state where we
// double-fail an HTLC or fail-then-claim an HTLC as it indicates
// we didn't wait for a full revocation before failing.
// `queue_fail_htlc`) then generating the fail message itself must
// not fail - we should never end up in a state where we double-fail
// an HTLC or fail-then-claim an HTLC as it indicates we didn't wait
// for a full revocation before failing.
update_fail_htlcs.push(update_fail_msg_option.unwrap())
},
Err(e) => {
@@ -3257,7 +3279,7 @@ impl<Signer: Sign> Channel<Signer> {
return Ok((None, htlcs_to_fail));
}
let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
self.send_update_fee(feerate, logger)
self.send_update_fee(feerate, false, logger)
} else {
None
};
@@ -3557,12 +3579,22 @@ impl<Signer: Sign> Channel<Signer> {
}
}
/// Queues up an outbound update fee by placing it in the holding cell. You should call
/// [`Self::maybe_free_holding_cell_htlcs`] in order to actually generate and send the
/// commitment update.
pub fn queue_update_fee<L: Deref>(&mut self, feerate_per_kw: u32, logger: &L) where L::Target: Logger {
let msg_opt = self.send_update_fee(feerate_per_kw, true, logger);
assert!(msg_opt.is_none(), "We forced holding cell?");
}
/// Adds a pending update to this channel. See the doc for send_htlc for
/// further details on the optionness of the return value.
/// If our balance is too low to cover the cost of the next commitment transaction at the
/// new feerate, the update is cancelled.
/// You MUST call send_commitment prior to any other calls on this Channel
fn send_update_fee<L: Deref>(&mut self, feerate_per_kw: u32, logger: &L) -> Option<msgs::UpdateFee> where L::Target: Logger {
///
/// You MUST call [`Self::send_commitment_no_state_update`] prior to any other calls on this
/// [`Channel`] if `force_holding_cell` is false.
fn send_update_fee<L: Deref>(&mut self, feerate_per_kw: u32, mut force_holding_cell: bool, logger: &L) -> Option<msgs::UpdateFee> where L::Target: Logger {
if !self.is_outbound() {
panic!("Cannot send fee from inbound channel");
}
@@ -3599,6 +3631,10 @@ impl<Signer: Sign> Channel<Signer> {
}
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
force_holding_cell = true;
}
if force_holding_cell {
self.holding_cell_update_fee = Some(feerate_per_kw);
return None;
}
@@ -3612,16 +3648,6 @@ impl<Signer: Sign> Channel<Signer> {
})
}
pub fn send_update_fee_and_commit<L: Deref>(&mut self, feerate_per_kw: u32, logger: &L) -> Result<Option<(msgs::UpdateFee, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
match self.send_update_fee(feerate_per_kw, logger) {
Some(update_fee) => {
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
Ok(Some((update_fee, commitment_signed, monitor_update)))
},
None => Ok(None)
}
}
/// Removes any uncommitted inbound HTLCs and resets the state of uncommitted outbound HTLC
/// updates, to be used on peer disconnection. After this, update_*_htlc messages need to be
/// resent.
@@ -5495,8 +5521,26 @@ impl<Signer: Sign> Channel<Signer> {
// Send stuff to our remote peers:
/// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
/// [`Self::maybe_free_holding_cell_htlcs`] in order to actually generate and send the
/// commitment update.
///
/// `Err`s will only be [`ChannelError::Ignore`].
pub fn queue_add_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
onion_routing_packet: msgs::OnionPacket, logger: &L)
-> Result<(), ChannelError> where L::Target: Logger {
self
.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
.map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
.map_err(|err| {
if let ChannelError::Ignore(_) = err { /* fine */ }
else { debug_assert!(false, "Queueing cannot trigger channel failure"); }
err
})
}
/// Adds a pending outbound HTLC to this channel, note that you probably want
/// send_htlc_and_commit instead cause you'll want both messages at once.
/// [`Self::send_htlc_and_commit`] instead cause you'll want both messages at once.
///
/// This returns an optional UpdateAddHTLC as we may be in a state where we cannot add HTLCs on
/// the wire:
@@ -5507,10 +5551,13 @@ impl<Signer: Sign> Channel<Signer> {
/// we may not yet have sent the previous commitment update messages and will need to
/// regenerate them.
///
/// You MUST call send_commitment prior to calling any other methods on this Channel!
/// You MUST call [`Self::send_commitment_no_state_update`] prior to calling any other methods
/// on this [`Channel`] if `force_holding_cell` is false.
///
/// If an Err is returned, it's a ChannelError::Ignore!
pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
/// `Err`s will only be [`ChannelError::Ignore`].
fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
-> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
}
@@ -5605,8 +5652,12 @@ impl<Signer: Sign> Channel<Signer> {
return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
}
// Now update local state:
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
force_holding_cell = true;
}
// Now update local state:
if force_holding_cell {
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
amount_msat,
payment_hash,
@@ -5639,41 +5690,6 @@ impl<Signer: Sign> Channel<Signer> {
Ok(Some(res))
}
/// Creates a signed commitment transaction to send to the remote peer.
/// Always returns a ChannelError::Close if an immediately-preceding (read: the
/// last call to this Channel) send_htlc returned Ok(Some(_)) and there is an Err.
/// May panic if called except immediately after a successful, Ok(Some(_))-returning send_htlc.
pub fn send_commitment<L: Deref>(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger {
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
panic!("Cannot create commitment tx until channel is fully established");
}
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) {
panic!("Cannot create commitment tx until remote revokes their previous commitment");
}
if (self.channel_state & (ChannelState::PeerDisconnected as u32)) == (ChannelState::PeerDisconnected as u32) {
panic!("Cannot create commitment tx while disconnected, as send_htlc will have returned an Err so a send_commitment precondition has been violated");
}
if (self.channel_state & (ChannelState::MonitorUpdateInProgress as u32)) == (ChannelState::MonitorUpdateInProgress as u32) {
panic!("Cannot create commitment tx while awaiting monitor update unfreeze, as send_htlc will have returned an Err so a send_commitment precondition has been violated");
}
let mut have_updates = self.is_outbound() && self.pending_update_fee.is_some();
for htlc in self.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
have_updates = true;
}
if have_updates { break; }
}
for htlc in self.pending_inbound_htlcs.iter() {
if let InboundHTLCState::LocalRemoved(_) = htlc.state {
have_updates = true;
}
if have_updates { break; }
}
if !have_updates {
panic!("Cannot create commitment tx until we have some updates to send");
}
self.send_commitment_no_status_check(logger)
}
/// Only fails in case of bad keys
fn send_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger {
log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed...");
@@ -5796,10 +5812,11 @@ impl<Signer: Sign> Channel<Signer> {
/// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction
/// to send to the remote peer in one go.
/// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
/// more info.
///
/// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
/// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info.
pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
Some(update_add_htlc) => {
let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
Ok(Some((update_add_htlc, commitment_signed, monitor_update)))
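
The Channel-side refactor above hinges on one contract: the private workers (send_htlc, fail_htlc, send_update_fee) take a force_holding_cell flag, and whenever it is set, or the channel state demands it, they stash the update and return no message. That is what lets the public queue_* wrappers assert msg_opt.is_none(). A compact standalone sketch of that contract, with invented names:

// Standalone sketch of the force_holding_cell contract (invented names,
// not LDK's actual API).
struct Msg;

#[derive(Default)]
struct ToyCell {
    queued: Vec<u64>,
    awaiting_remote_revoke: bool,
}

impl ToyCell {
    // Worker: may emit a message now, or stash the update and return None.
    fn fail_htlc(&mut self, htlc_id: u64, mut force_holding_cell: bool)
        -> Result<Option<Msg>, ()>
    {
        // Mirrors the diff: some channel states also force the cell.
        if self.awaiting_remote_revoke {
            force_holding_cell = true;
        }
        if force_holding_cell {
            self.queued.push(htlc_id);
            return Ok(None);
        }
        Ok(Some(Msg))
    }

    // Public wrapper: forcing the cell guarantees no message comes back.
    fn queue_fail_htlc(&mut self, htlc_id: u64) -> Result<(), ()> {
        self.fail_htlc(htlc_id, true)
            .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
    }
}

fn main() {
    let mut chan = ToyCell::default();
    chan.queue_fail_htlc(42).unwrap();
    assert_eq!(chan.queued, vec![42]);
}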

lightning/src/ln/channelmanager.rs

@@ -3163,7 +3163,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
let mut new_events = Vec::new();
let mut failed_forwards = Vec::new();
let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
let mut handle_errors = Vec::new();
{
let mut forward_htlcs = HashMap::new();
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3279,8 +3278,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
continue;
},
hash_map::Entry::Occupied(mut chan) => {
let mut add_htlc_msgs = Vec::new();
let mut fail_htlc_msgs = Vec::new();
for forward_info in pending_forwards.drain(..) {
match forward_info {
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3299,34 +3296,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
// Phantom payments are only PendingHTLCRouting::Receive.
phantom_shared_secret: None,
});
match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
} else {
panic!("Stated return value requirements in send_htlc() were not met");
}
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
failed_forwards.push((htlc_source, payment_hash,
HTLCFailReason::reason(failure_code, data),
HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
));
continue;
},
Ok(update_add) => {
match update_add {
Some(msg) => { add_htlc_msgs.push(msg); },
None => {
// Nothing to do here...we're waiting on a remote
// revoke_and_ack before we can add anymore HTLCs. The Channel
// will automatically handle building the update_add_htlc and
// commitment_signed messages when we can.
// TODO: Do some kind of timer to set the channel as !is_live()
// as we don't really want others relying on us relaying through
// this channel currently :/.
}
}
if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
payment_hash, outgoing_cltv_value, htlc_source.clone(),
onion_packet, &self.logger)
{
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
} else {
panic!("Stated return value requirements in send_htlc() were not met");
}
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
failed_forwards.push((htlc_source, payment_hash,
HTLCFailReason::reason(failure_code, data),
HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
));
continue;
}
},
HTLCForwardInfo::AddHTLC { .. } => {
@@ -3334,77 +3318,22 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
},
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
} else {
panic!("Stated return value requirements in get_update_fail_htlc() were not met");
}
// fail-backs are best-effort, we probably already have one
// pending, and if not that's OK, if not, the channel is on
// the chain and sending the HTLC-Timeout is their problem.
continue;
},
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
Ok(None) => {
// Nothing to do here...we're waiting on a remote
// revoke_and_ack before we can update the commitment
// transaction. The Channel will automatically handle
// building the update_fail_htlc and commitment_signed
// messages when we can.
// We don't need any kind of timer here as they should fail
// the channel onto the chain if they can't get our
// update_fail_htlc in time, it's not our problem.
if let Err(e) = chan.get_mut().queue_fail_htlc(
htlc_id, err_packet, &self.logger
) {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
} else {
panic!("Stated return value requirements in queue_fail_htlc() were not met");
}
// fail-backs are best-effort, we probably already have one
// pending, and if not that's OK, if not, the channel is on
// the chain and sending the HTLC-Timeout is their problem.
continue;
}
},
}
}
if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
Ok(res) => res,
Err(e) => {
// We surely failed send_commitment due to bad keys, in that case
// close channel and then send error message to peer.
let counterparty_node_id = chan.get().get_counterparty_node_id();
let err: Result<(), _> = match e {
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
panic!("Stated return value requirements in send_commitment() were not met");
}
ChannelError::Close(msg) => {
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
let mut channel = remove_channel!(self, chan);
// ChannelClosed event is generated by handle_error for us.
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
},
};
handle_errors.push((counterparty_node_id, err));
continue;
}
};
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
ChannelMonitorUpdateStatus::Completed => {},
e => {
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
continue;
}
}
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get().get_counterparty_node_id(),
updates: msgs::CommitmentUpdate {
update_add_htlcs: add_htlc_msgs,
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs: fail_htlc_msgs,
update_fail_malformed_htlcs: Vec::new(),
update_fee: None,
commitment_signed: commitment_msg,
},
});
}
}
}
} else {
@@ -3608,9 +3537,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
}
self.forward_htlcs(&mut phantom_receives);
for (counterparty_node_id, err) in handle_errors.drain(..) {
let _ = handle_error!(self, err, counterparty_node_id);
}
// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
// nice to do the work now if we can rather than while we're trying to get messages in the
// network stack.
self.check_free_holding_cells();
if new_events.is_empty() { return }
let mut events = self.pending_events.lock().unwrap();
@@ -3648,59 +3579,24 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
self.process_background_events();
}
fn update_channel_fee(&self, pending_msg_events: &mut Vec<events::MessageSendEvent>, chan_id: &[u8; 32], chan: &mut Channel<<K::Target as KeysInterface>::Signer>, new_feerate: u32) -> (bool, NotifyOption, Result<(), MsgHandleErrInternal>) {
if !chan.is_outbound() { return (true, NotifyOption::SkipPersist, Ok(())); }
fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<<K::Target as KeysInterface>::Signer>, new_feerate: u32) -> NotifyOption {
if !chan.is_outbound() { return NotifyOption::SkipPersist; }
// If the feerate has decreased by less than half, don't bother
if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() {
log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
return (true, NotifyOption::SkipPersist, Ok(()));
return NotifyOption::SkipPersist;
}
if !chan.is_live() {
log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
return (true, NotifyOption::SkipPersist, Ok(()));
return NotifyOption::SkipPersist;
}
log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
let mut retain_channel = true;
let res = match chan.send_update_fee_and_commit(new_feerate, &self.logger) {
Ok(res) => Ok(res),
Err(e) => {
let (drop, res) = convert_chan_err!(self, e, chan, chan_id);
if drop { retain_channel = false; }
Err(res)
}
};
let ret_err = match res {
Ok(Some((update_fee, commitment_signed, monitor_update))) => {
match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
ChannelMonitorUpdateStatus::Completed => {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get_counterparty_node_id(),
updates: msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs: Vec::new(),
update_fail_malformed_htlcs: Vec::new(),
update_fee: Some(update_fee),
commitment_signed,
},
});
Ok(())
},
e => {
let (res, drop) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, chan_id, COMMITMENT_UPDATE_ONLY);
if drop { retain_channel = false; }
res
}
}
},
Ok(None) => Ok(()),
Err(e) => Err(e),
};
(retain_channel, NotifyOption::DoPersist, ret_err)
chan.queue_update_fee(new_feerate, &self.logger);
NotifyOption::DoPersist
}
#[cfg(fuzzing)]
@@ -3714,19 +3610,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
let mut handle_errors = Vec::new();
{
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
let pending_msg_events = &mut channel_state.pending_msg_events;
channel_state.by_id.retain(|chan_id, chan| {
let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate);
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
if err.is_err() {
handle_errors.push(err);
}
retain_channel
});
let mut channel_state = self.channel_state.lock().unwrap();
for (chan_id, chan) in channel_state.by_id.iter_mut() {
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
}
should_persist
@@ -3791,20 +3678,15 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
let mut handle_errors = Vec::new();
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
let mut timed_out_mpp_htlcs = Vec::new();
{
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
let pending_msg_events = &mut channel_state.pending_msg_events;
channel_state.by_id.retain(|chan_id, chan| {
let counterparty_node_id = chan.get_counterparty_node_id();
let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate);
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
if err.is_err() {
handle_errors.push((err, counterparty_node_id));
}
if !retain_channel { return false; }
if let Err(e) = chan.timer_check_closing_negotiation_progress() {
let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
@@ -3879,6 +3761,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
self.remove_stale_resolved_payments();
// Technically we don't need to do this here, but if we have holding cell entries in a
// channel that need freeing, it's better to do that here and block a background task
// than block the message queueing pipeline.
if self.check_free_holding_cells() {
should_persist = NotifyOption::DoPersist;
}
should_persist
});
}
@@ -5502,11 +5391,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
/// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
/// update was applied.
///
/// This should only apply to HTLCs which were added to the holding cell because we were
/// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
/// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
/// code to inform them of a channel monitor update.
fn check_free_holding_cells(&self) -> bool {
let mut has_monitor_update = false;
let mut failed_htlcs = Vec::new();
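
On the ChannelManager side, both the forwarding path and the fee updater now only queue work; check_free_holding_cells then sweeps every channel, generating at most one batched commitment update per channel and reporting whether anything happened so the caller knows to persist. A toy sketch of such a sweep, again with invented types rather than LDK's:

// Toy sketch of a check_free_holding_cells-style sweep (invented types,
// not LDK's actual API).
use std::collections::HashMap;

#[derive(Default)]
struct ToyChan {
    holding_cell: Vec<u64>,
}

impl ToyChan {
    fn free_holding_cell(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.holding_cell)
    }
}

// Returns true if any channel produced updates, which the real code maps
// to NotifyOption::DoPersist.
fn check_free_holding_cells(by_id: &mut HashMap<[u8; 32], ToyChan>) -> bool {
    let mut has_update = false;
    for chan in by_id.values_mut() {
        let freed = chan.free_holding_cell();
        if !freed.is_empty() {
            // This is where the batched CommitmentUpdate and its
            // accompanying ChannelMonitorUpdate would be built and sent.
            has_update = true;
        }
    }
    has_update
}

fn main() {
    let mut by_id = HashMap::new();
    by_id.insert([0u8; 32], ToyChan { holding_cell: vec![1, 2, 3] });
    assert!(check_free_holding_cells(&mut by_id));
    assert!(!check_free_holding_cells(&mut by_id)); // cell now empty
}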