Pass monitor updates by reference, not owned

In the next commit(s) we'll start holding `ChannelMonitorUpdate`s
that are being persisted in `Channel`s until they're done
persisting. In order to do that, switch to applying the updates by
reference instead of value.
This commit is contained in:
Matt Corallo 2022-11-12 18:26:38 +00:00
parent ce6bcf68a1
commit 7e23afe1dc
9 changed files with 34 additions and 34 deletions

View file

@ -152,7 +152,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
self.chain_monitor.watch_channel(funding_txo, monitor)
}
fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let mut map_entry = match map_lock.entry(funding_txo) {
hash_map::Entry::Occupied(entry) => entry,
@ -160,7 +160,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
};
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
read(&mut Cursor::new(&map_entry.get().1), (&*self.keys, &*self.keys)).unwrap().1;
deserialized_monitor.update_monitor(&update, &&TestBroadcaster{}, &FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
map_entry.insert((update.update_id, ser.0));

View file

@ -14,7 +14,7 @@ impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
self.update_ret.lock().unwrap().clone()
}
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}
}

View file

@ -144,7 +144,7 @@ pub trait Persist<ChannelSigner: Sign> {
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}
struct MonitorHolder<ChannelSigner: Sign> {
@ -294,7 +294,7 @@ where C::Target: chain::Filter,
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
@ -646,7 +646,7 @@ where C::Target: chain::Filter,
/// Note that we persist the given `ChannelMonitor` update while holding the
/// `ChainMonitor` monitors lock.
fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
@ -664,15 +664,15 @@ where C::Target: chain::Filter,
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger);
if update_res.is_err() {
log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
}
// Even if updating the monitor returns an error, the monitor's state will
// still be changed. So, persist the updated monitor despite the error.
let update_id = MonitorUpdateId::from_monitor_update(&update);
let update_id = MonitorUpdateId::from_monitor_update(update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);

View file

@ -312,7 +312,7 @@ pub trait Watch<ChannelSigner: Sign> {
/// [`ChannelMonitorUpdateStatus`] for invariants around returning an error.
///
/// [`update_monitor`]: channelmonitor::ChannelMonitor::update_monitor
fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;
/// Returns any monitor events since the last call. Subsequent calls must only return new
/// events.

View file

@ -147,9 +147,9 @@ fn test_monitor_and_persister_update_fail() {
// Check that even though the persister is returning a InProgress,
// because the update is bogus, ultimately the error that's returned
// should be a PermanentFailure.
if let ChannelMonitorUpdateStatus::PermanentFailure = chain_mon.chain_monitor.update_channel(outpoint, update.clone()) {} else { panic!("Expected monitor error to be permanent"); }
if let ChannelMonitorUpdateStatus::PermanentFailure = chain_mon.chain_monitor.update_channel(outpoint, &update) {} else { panic!("Expected monitor error to be permanent"); }
logger.assert_log_regex("lightning::chain::chainmonitor".to_string(), regex::Regex::new("Persistence of ChannelMonitorUpdate for channel [0-9a-f]* in progress").unwrap(), 1);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}

View file

@ -1709,7 +1709,7 @@ where
// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update {
let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update);
let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
let (result, is_permanent) =
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
if is_permanent {
@ -1807,7 +1807,7 @@ where
// force-closing. The monitor update on the required in-memory copy should broadcast
// the latest local state, which is the best we can do anyway. Thus, it is safe to
// ignore the result here.
let _ = self.chain_monitor.update_channel(funding_txo, monitor_update);
let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
}
}
@ -2336,7 +2336,7 @@ where
chan)
} {
Some((update_add, commitment_signed, monitor_update)) => {
let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update);
let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
let chan_id = chan.get().channel_id();
match (update_err,
handle_monitor_update_res!(self, update_err, chan,
@ -3284,7 +3284,7 @@ where
BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
// The channel has already been closed, so no use bothering to care about the
// monitor updating completing.
let _ = self.chain_monitor.update_channel(funding_txo, update);
let _ = self.chain_monitor.update_channel(funding_txo, &update);
},
}
}
@ -3807,7 +3807,7 @@ where
match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
Ok(msgs_monitor_option) => {
if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {},
e => {
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
@ -3844,7 +3844,7 @@ where
}
},
Err((e, monitor_update)) => {
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {},
e => {
// TODO: This needs to be handled somehow - if we receive a monitor update
@ -3880,7 +3880,7 @@ where
};
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, preimage_update);
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
@ -4449,7 +4449,7 @@ where
// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update {
let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update);
let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
let (result, is_permanent) =
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
if is_permanent {
@ -4650,13 +4650,13 @@ where
Err((None, e)) => try_chan_entry!(self, Err(e), chan),
Err((Some(update), e)) => {
assert!(chan.get().is_awaiting_monitor_update());
let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), update);
let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
try_chan_entry!(self, Err(e), chan);
unreachable!();
},
Ok(res) => res
};
let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update);
let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
return Err(e);
}
@ -4792,7 +4792,7 @@ where
let raa_updates = break_chan_entry!(self,
chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update);
let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
if was_paused_for_mon_update {
assert!(update_res != ChannelMonitorUpdateStatus::Completed);
assert!(raa_updates.commitment_update.is_none());
@ -5097,7 +5097,7 @@ where
));
}
if let Some((commitment_update, monitor_update)) = commitment_opt {
match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get_counterparty_node_id(),

View file

@ -8135,8 +8135,8 @@ fn test_update_err_monitor_lockdown() {
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
assert_eq!(watchtower.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}
// Our local monitor is in-sync and hasn't processed yet timeout
@ -8230,9 +8230,9 @@ fn test_concurrent_monitor_claim() {
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Watchtower Alice should already have seen the block and reject the update
assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::Completed);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}
// Our local monitor is in-sync and hasn't processed yet timeout

View file

@ -94,7 +94,7 @@ impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {
}
}
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
match self.persist(&key, monitor) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,

View file

@ -184,12 +184,12 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
self.chain_monitor.watch_channel(funding_txo, new_monitor)
}
fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
// Every monitor update should survive roundtrip
let mut w = TestVecWriter(Vec::new());
update.write(&mut w).unwrap();
assert!(channelmonitor::ChannelMonitorUpdate::read(
&mut io::Cursor::new(&w.0)).unwrap() == update);
&mut io::Cursor::new(&w.0)).unwrap() == *update);
self.monitor_updates.lock().unwrap().entry(funding_txo.to_channel_id()).or_insert(Vec::new()).push(update.clone());
@ -202,7 +202,7 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
}
self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(),
(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(&update)));
(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(update)));
let update_res = self.chain_monitor.update_channel(funding_txo, update);
// At every point where we get a monitor update, we should be able to send a useful monitor
// to a watchtower and disk...
@ -254,7 +254,7 @@ impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersiste
chain::ChannelMonitorUpdateStatus::Completed
}
fn update_persisted_channel(&self, funding_txo: OutPoint, update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
let mut ret = chain::ChannelMonitorUpdateStatus::Completed;
if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
ret = update_ret;