Merge pull request #1108 from TheBlueMatt/2021-10-persist-mon-blocks

Persist ChannelMonitors after new blocks are connected
This commit is contained in:
Matt Corallo 2021-10-20 00:53:26 +00:00 committed by GitHub
commit 107c6c7939
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 627 additions and 238 deletions

View file

@ -855,22 +855,26 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
0x08 => {
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
}
},
0x09 => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[1].channel_monitor_updated(&chan_1_funding, *id);
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[1].process_monitor_events();
}
},
0x0a => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[1].channel_monitor_updated(&chan_2_funding, *id);
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[1].process_monitor_events();
}
},
0x0b => {
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[2].channel_monitor_updated(&chan_2_funding, *id);
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[2].process_monitor_events();
}
},
@ -1071,22 +1075,26 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
// Test that no channel is in a stuck state where neither party can send funds even
// after we resolve all pending events.
// First make sure there are no pending monitor updates, resetting the error state
// and calling channel_monitor_updated for each monitor.
// and calling force_channel_monitor_updated for each monitor.
*monitor_a.persister.update_ret.lock().unwrap() = Ok(());
*monitor_b.persister.update_ret.lock().unwrap() = Ok(());
*monitor_c.persister.update_ret.lock().unwrap() = Ok(());
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
}
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[1].channel_monitor_updated(&chan_1_funding, *id);
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[1].process_monitor_events();
}
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[1].channel_monitor_updated(&chan_2_funding, *id);
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[1].process_monitor_events();
}
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
nodes[2].channel_monitor_updated(&chan_2_funding, *id);
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[2].process_monitor_events();
}
// Next, make sure peers are all connected to each other

View file

@ -1,5 +1,6 @@
use lightning::chain;
use lightning::chain::{chainmonitor, channelmonitor};
use lightning::chain::chainmonitor::MonitorUpdateId;
use lightning::chain::transaction::OutPoint;
use lightning::util::enforcing_trait_impls::EnforcingSigner;
@ -9,11 +10,11 @@ pub struct TestPersister {
pub update_ret: Mutex<Result<(), chain::ChannelMonitorUpdateErr>>,
}
impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
self.update_ret.lock().unwrap().clone()
}
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
self.update_ret.lock().unwrap().clone()
}
}

View file

@ -159,13 +159,18 @@ impl FilesystemPersister {
}
impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
// down once these start returning failure.
// A PermanentFailure implies we need to shut down since we're force-closing channels without
// even broadcasting!
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
@ -296,6 +301,8 @@ mod tests {
nodes[1].node.force_close_channel(&chan.2).unwrap();
check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
// Set the persister's directory to read-only, which should result in
// returning a permanent failure when we then attempt to persist a
@ -309,7 +316,7 @@ mod tests {
txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
index: 0
};
match persister.persist_new_channel(test_txo, &added_monitors[0].1) {
match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
Err(ChannelMonitorUpdateErr::PermanentFailure) => {},
_ => panic!("unexpected result from persisting new channel")
}
@ -333,6 +340,8 @@ mod tests {
nodes[1].node.force_close_channel(&chan.2).unwrap();
check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
// Create the persister with an invalid directory name and test that the
// channel fails to open because the directories fail to be created. There
@ -344,7 +353,7 @@ mod tests {
txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
index: 0
};
match persister.persist_new_channel(test_txo, &added_monitors[0].1) {
match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
Err(ChannelMonitorUpdateErr::PermanentFailure) => {},
_ => panic!("unexpected result from persisting new channel")
}

View file

@ -29,48 +29,98 @@ use bitcoin::hash_types::Txid;
use chain;
use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use util::atomic_counter::AtomicCounter;
use util::logger::Logger;
use util::errors::APIError;
use util::events;
use util::events::EventHandler;
use ln::channelmanager::ChannelDetails;
use prelude::*;
use sync::{RwLock, RwLockReadGuard};
use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
/// entirely opaque.
enum UpdateOrigin {
/// An update that was generated by the `ChannelManager` (via our `chain::Watch`
/// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field
/// and [`ChannelMonitor::get_latest_update_id`].
OffChain(u64),
/// An update that was generated during blockchain processing. The ID here is specific to the
/// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs.
ChainSync(u64),
}
/// An opaque identifier describing a specific [`Persist`] method call.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub struct MonitorUpdateId {
contents: UpdateOrigin,
}
impl MonitorUpdateId {
pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self {
Self { contents: UpdateOrigin::OffChain(update.update_id) }
}
pub(crate) fn from_new_monitor<ChannelSigner: Sign>(monitor: &ChannelMonitor<ChannelSigner>) -> Self {
Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) }
}
}
/// `Persist` defines behavior for persisting channel monitors: this could mean
/// writing once to disk, and/or uploading to one or more backup services.
///
/// Note that for every new monitor, you **must** persist the new `ChannelMonitor`
/// to disk/backups. And, on every update, you **must** persist either the
/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk
/// of situations such as revoking a transaction, then crashing before this
/// revocation can be persisted, then unintentionally broadcasting a revoked
/// transaction and losing money. This is a risk because previous channel states
/// are toxic, so it's important that whatever channel state is persisted is
/// kept up-to-date.
/// Each method can return three possible values:
/// * If persistence (including any relevant `fsync()` calls) happens immediately, the
/// implementation should return `Ok(())`, indicating normal channel operation should continue.
/// * If persistence happens asynchronously, implementations should first ensure the
/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
/// called with the corresponding [`MonitorUpdateId`].
///
/// Note that unlike the direct [`chain::Watch`] interface,
/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
///
/// * If persistence fails for some reason, implementations should return
/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
/// closed without broadcasting the latest state. See
/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
pub trait Persist<ChannelSigner: Sign> {
/// Persist a new channel's data. The data can be stored any way you want, but
/// the identifier provided by Rust-Lightning is the channel's outpoint (and
/// it is up to you to maintain a correct mapping between the outpoint and the
/// stored channel data). Note that you **must** persist every new monitor to
/// disk. See the `Persist` trait documentation for more details.
/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
///
/// The data can be stored any way you want, but the identifier provided by LDK is the
/// channel's outpoint (and it is up to you to maintain a correct mapping between the outpoint
/// and the stored channel data). Note that you **must** persist every new monitor to disk.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
/// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
/// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
/// Update one channel's data. The provided `ChannelMonitor` has already
/// applied the given update.
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
/// update.
///
/// Note that on every update, you **must** persist either the
/// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See
/// the `Persist` trait documentation for more details.
/// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the
/// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
/// details.
///
/// During blockchain synchronization operations, this may be called with no
/// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
/// Note that after the full [`ChannelMonitor`] is persisted any previous
/// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
/// applied to the persisted [`ChannelMonitor`] as they were already applied.
///
/// If an implementer chooses to persist the updates only, they need to make
/// sure that all the updates are applied to the `ChannelMonitors` *before*
@ -84,16 +134,61 @@ pub trait Persist<ChannelSigner: Sign> {
/// them in batches. The size of each monitor grows `O(number of state updates)`
/// whereas updates are small and `O(1)`.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
/// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
}
struct MonitorHolder<ChannelSigner: Sign> {
monitor: ChannelMonitor<ChannelSigner>,
/// The full set of pending monitor updates for this Channel.
///
/// Note that this lock must be held during updates to prevent a race where we call
/// update_persisted_channel, the user returns a TemporaryFailure, and then calls
/// channel_monitor_updated immediately, racing our insertion of the pending update into the
/// contained Vec.
///
/// Beyond the synchronization of updates themselves, we cannot handle user events until after
/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
/// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
/// being persisted fully to disk after a chain update.
///
/// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
/// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
/// the pending payment entry, and then reloading before the monitor is persisted, resulting in
/// the ChannelManager re-adding the same payment entry, before the same block is replayed,
/// resulting in a duplicate PaymentSent event.
pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
/// When the user returns a PermanentFailure error from an update_persisted_channel call during
/// block processing, we inform the ChannelManager that the channel should be closed
/// asynchronously. In order to ensure no further changes happen before the ChannelManager has
/// processed the closure event, we set this to true and return PermanentFailure for any other
/// chain::Watch events.
channel_perm_failed: AtomicBool,
/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
/// in `pending_monitor_updates`.
/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
/// forever, risking funds loss.
last_chain_persist_height: AtomicUsize,
}
impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
pending_monitor_updates_lock.iter().any(|update_id|
if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
}
fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
pending_monitor_updates_lock.iter().any(|update_id|
if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
}
}
/// A read-only reference to a current ChannelMonitor.
@ -129,11 +224,20 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
P::Target: Persist<ChannelSigner>,
{
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
/// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
/// unique ID, which we calculate by simply getting the next value from this counter. Note that
/// the ID is never persisted so it's ok that they reset on restart.
sync_persistence_id: AtomicCounter,
chain_source: Option<C>,
broadcaster: T,
logger: L,
fee_estimator: F,
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<MonitorEvent>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@ -152,31 +256,75 @@ where C::Target: chain::Filter,
/// calls must not exclude any transactions matching the new outputs nor any in-block
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
/// updated `txdata`.
fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
///
/// Calls which represent a new blockchain tip height should set `best_height`.
fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let mut dependent_txdata = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
let mut txn_outputs = process(&monitor_state.monitor, txdata);
{
let monitor_states = self.monitors.write().unwrap();
if let Some(height) = best_height {
// If the best block height is being updated, update highest_chain_height under the
// monitors write lock.
let old_height = self.highest_chain_height.load(Ordering::Acquire);
let new_height = height as usize;
if new_height > old_height {
self.highest_chain_height.store(new_height, Ordering::Release);
}
}
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering and recurse
// if it indicates that there are dependent transactions within the block
// that had not been previously included in txdata.
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
if let Some(tx) = chain_source.register_output(output) {
dependent_txdata.push(tx);
for (funding_outpoint, monitor_state) in monitor_states.iter() {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are not ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
// TemporaryFailure to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
Ok(()) =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
}
}
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering and recurse
// if it indicates that there are dependent transactions within the block
// that had not been previously included in txdata.
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
if let Some(tx) = chain_source.register_output(output) {
dependent_txdata.push(tx);
}
}
}
}
@ -188,7 +336,7 @@ where C::Target: chain::Filter,
dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
dependent_txdata.dedup_by_key(|(index, _tx)| *index);
let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
self.process_chain_data(header, &txdata, process);
self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
}
}
@ -202,11 +350,14 @@ where C::Target: chain::Filter,
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
Self {
monitors: RwLock::new(HashMap::new()),
sync_persistence_id: AtomicCounter::new(),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
}
}
@ -262,6 +413,75 @@ where C::Target: chain::Filter,
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
}
/// Indicates the persistence of a [`ChannelMonitor`] has completed after
/// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
///
/// Thus, the anticipated use is, at a high level:
/// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
/// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
/// returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
/// 2) once all remote copies are updated, you call this function with the
/// `completed_update_id` that completed, and once all pending updates have completed the
/// channel will be re-enabled.
// Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't
// care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We
// only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s.
///
/// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently
/// registered [`ChannelMonitor`]s.
pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> {
let monitors = self.monitors.read().unwrap();
let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else {
return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) });
};
let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
match completed_update_id {
MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
// Note that we only check for `UpdateOrigin::OffChain` failures here - if
// we're being told that a `UpdateOrigin::OffChain` monitor update completed,
// we only care about ensuring we don't tell the `ChannelManager` to restore
// the channel to normal operation until all `UpdateOrigin::OffChain` updates
// complete.
// If there's some `UpdateOrigin::ChainSync` update still pending that's okay
// - we can still update our channel state, just as long as we don't return
// `MonitorEvent`s from the monitor back to the `ChannelManager` until they
// complete.
let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
// If there are still monitor updates pending (or an old monitor update
// finished after a later one perm-failed), we cannot yet construct an
// UpdateCompleted event.
return Ok(());
}
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
});
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
// The next time release_pending_monitor_events is called, any events for this
// ChannelMonitor will be returned.
}
},
}
Ok(())
}
/// This wrapper avoids having to update some of our tests for now as they assume the direct
/// chain::Watch API wherein we mark a monitor fully-updated by just calling
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, feature = "fuzztarget"))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
});
}
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use util::events::EventsProvider;
@ -285,7 +505,7 @@ where
let header = &block.header;
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, &txdata, |monitor, txdata| {
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
});
@ -312,7 +532,7 @@ where
{
fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
self.process_chain_data(header, txdata, |monitor, txdata| {
self.process_chain_data(header, None, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
});
@ -328,7 +548,7 @@ where
fn best_block_updated(&self, header: &BlockHeader, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
self.process_chain_data(header, &[], |monitor, txdata| {
self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
// While in practice there shouldn't be any recursive calls when given empty txdata,
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
@ -372,12 +592,16 @@ where C::Target: chain::Filter,
return Err(ChannelMonitorUpdateErr::PermanentFailure)},
hash_map::Entry::Vacant(e) => e,
};
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor);
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
if persist_res.is_err() {
log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res);
}
if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
return persist_res;
} else if persist_res.is_err() {
pending_monitor_updates.push(update_id);
}
{
let funding_txo = monitor.get_funding_txo();
@ -387,7 +611,12 @@ where C::Target: chain::Filter,
monitor.load_outputs_to_watch(chain_source);
}
}
entry.insert(MonitorHolder { monitor });
entry.insert(MonitorHolder {
monitor,
pending_monitor_updates: Mutex::new(pending_monitor_updates),
channel_perm_failed: AtomicBool::new(false),
last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
});
persist_res
}
@ -417,12 +646,21 @@ where C::Target: chain::Filter,
}
// Even if updating the monitor returns an error, the monitor's state will
// still be changed. So, persist the updated monitor despite the error.
let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
if let Err(ref e) = persist_res {
let update_id = MonitorUpdateId::from_monitor_update(&update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
if let Err(e) = persist_res {
if e == ChannelMonitorUpdateErr::TemporaryFailure {
pending_monitor_updates.push(update_id);
} else {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
}
log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
}
if update_res.is_err() {
Err(ChannelMonitorUpdateErr::PermanentFailure)
} else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
Err(ChannelMonitorUpdateErr::PermanentFailure)
} else {
persist_res
}
@ -431,9 +669,33 @@ where C::Target: chain::Filter,
}
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
let mut pending_monitor_events = Vec::new();
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
if is_pending_monitor_update &&
monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
> self.highest_chain_height.load(Ordering::Acquire)
{
log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
} else {
if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
// If a `UpdateOrigin::ChainSync` persistence failed with `PermanantFailure`,
// we don't really know if the latest `ChannelMonitor` state is on disk or not.
// We're supposed to hold monitor updates until the latest state is on disk to
// avoid duplicate events, but the user told us persistence is screw-y and may
// not complete. We can't hold events forever because we may learn some payment
// preimage, so instead we just log and hope the user complied with the
// `PermanentFailure` requirements of having at least the local-disk copy
// updated.
log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
}
if is_pending_monitor_update {
log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
}
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
}
}
pending_monitor_events
}

View file

@ -131,7 +131,40 @@ pub enum MonitorEvent {
/// A monitor event that the Channel's commitment transaction was confirmed.
CommitmentTxConfirmed(OutPoint),
/// Indicates a [`ChannelMonitor`] update has completed. See
/// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used.
///
/// [`ChannelMonitorUpdateErr::TemporaryFailure`]: super::ChannelMonitorUpdateErr::TemporaryFailure
UpdateCompleted {
/// The funding outpoint of the [`ChannelMonitor`] that was updated
funding_txo: OutPoint,
/// The Update ID from [`ChannelMonitorUpdate::update_id`] which was applied or
/// [`ChannelMonitor::get_latest_update_id`].
///
/// Note that this should only be set to a given update's ID if all previous updates for the
/// same [`ChannelMonitor`] have been applied and persisted.
monitor_update_id: u64,
},
/// Indicates a [`ChannelMonitor`] update has failed. See
/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more information on how this is used.
///
/// [`ChannelMonitorUpdateErr::PermanentFailure`]: super::ChannelMonitorUpdateErr::PermanentFailure
UpdateFailed(OutPoint),
}
impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
// Note that UpdateCompleted and UpdateFailed are currently never serialized to disk as they are
// generated only in ChainMonitor
(0, UpdateCompleted) => {
(0, funding_txo, required),
(2, monitor_update_id, required),
},
;
(2, HTLCEvent),
(4, CommitmentTxConfirmed),
(6, UpdateFailed),
);
/// Simple structure sent back by `chain::Watch` when an HTLC from a forward channel is detected on
/// chain. Used to update the corresponding HTLC in the backward channel. Failing to pass the
@ -624,7 +657,17 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
// Note that `MonitorEvent`s MUST NOT be generated during update processing, only generated
// during chain data processing. This prevents a race in `ChainMonitor::update_channel` (and
// presumably user implementations thereof as well) where we update the in-memory channel
// object, then before the persistence finishes (as it's all under a read-lock), we return
// pending events to the user or to the relevant `ChannelManager`. Then, on reload, we'll have
// the pre-event state here, but have processed the event in the `ChannelManager`.
// Note that because the `event_lock` in `ChainMonitor` is only taken in
// block/transaction-connected events and *not* during block/transaction-disconnected events,
// we further MUST NOT generate events during block/transaction-disconnection.
pending_monitor_events: Vec<MonitorEvent>,
pending_events: Vec<Event>,
// Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
@ -850,14 +893,19 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
writer.write_all(&payment_preimage.0[..])?;
}
writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev {
MonitorEvent::HTLCEvent(_) => true,
MonitorEvent::CommitmentTxConfirmed(_) => true,
_ => false,
}).count() as u64).to_be_bytes())?;
for event in self.pending_monitor_events.iter() {
match event {
MonitorEvent::HTLCEvent(upd) => {
0u8.write(writer)?;
upd.write(writer)?;
},
MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?
MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?,
_ => {}, // Covered in the TLV writes below
}
}
@ -891,6 +939,7 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
write_tlv_fields!(writer, {
(1, self.funding_spend_confirmed, option),
(3, self.htlcs_resolved_on_chain, vec_type),
(5, self.pending_monitor_events, vec_type),
});
Ok(())
@ -3000,14 +3049,15 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
}
let pending_monitor_events_len: u64 = Readable::read(reader)?;
let mut pending_monitor_events = Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
let mut pending_monitor_events = Some(
Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3))));
for _ in 0..pending_monitor_events_len {
let ev = match <u8 as Readable>::read(reader)? {
0 => MonitorEvent::HTLCEvent(Readable::read(reader)?),
1 => MonitorEvent::CommitmentTxConfirmed(funding_info.0),
_ => return Err(DecodeError::InvalidValue)
};
pending_monitor_events.push(ev);
pending_monitor_events.as_mut().unwrap().push(ev);
}
let pending_events_len: u64 = Readable::read(reader)?;
@ -3068,6 +3118,7 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
read_tlv_fields!(reader, {
(1, funding_spend_confirmed, option),
(3, htlcs_resolved_on_chain, vec_type),
(5, pending_monitor_events, vec_type),
});
let mut secp_ctx = Secp256k1::new();
@ -3107,7 +3158,7 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
current_holder_commitment_number,
payment_preimages,
pending_monitor_events,
pending_monitor_events: pending_monitor_events.unwrap(),
pending_events,
onchain_events_awaiting_threshold_conf,

View file

@ -182,9 +182,10 @@ pub enum ChannelMonitorUpdateErr {
/// our state failed, but is expected to succeed at some point in the future).
///
/// Such a failure will "freeze" a channel, preventing us from revoking old states or
/// submitting new commitment transactions to the counterparty. Once the update(s) which failed
/// have been successfully applied, ChannelManager::channel_monitor_updated can be used to
/// restore the channel to an operational state.
/// submitting new commitment transactions to the counterparty. Once the update(s) that failed
/// have been successfully applied, a [`MonitorEvent::UpdateCompleted`] event should be returned
/// via [`Watch::release_pending_monitor_events`] which will then restore the channel to an
/// operational state.
///
/// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If
/// you return a TemporaryFailure you must ensure that it is written to disk safely before
@ -198,13 +199,14 @@ pub enum ChannelMonitorUpdateErr {
/// the channel which would invalidate previous ChannelMonitors are not made when a channel has
/// been "frozen".
///
/// Note that even if updates made after TemporaryFailure succeed you must still call
/// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel
/// operation.
/// Note that even if updates made after TemporaryFailure succeed you must still provide a
/// [`MonitorEvent::UpdateCompleted`] to ensure you have the latest monitor and re-enable
/// normal channel operation. Note that this is normally generated through a call to
/// [`ChainMonitor::channel_monitor_updated`].
///
/// Note that the update being processed here will not be replayed for you when you call
/// ChannelManager::channel_monitor_updated, so you must store the update itself along
/// with the persisted ChannelMonitor on your own local disk prior to returning a
/// Note that the update being processed here will not be replayed for you when you return a
/// [`MonitorEvent::UpdateCompleted`] event via [`Watch::release_pending_monitor_events`], so
/// you must store the update itself on your own local disk prior to returning a
/// TemporaryFailure. You may, of course, employ a journaling approach, storing only the
/// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at
/// reload-time.
@ -212,6 +214,8 @@ pub enum ChannelMonitorUpdateErr {
/// For deployments where a copy of ChannelMonitors and other local state are backed up in a
/// remote location (with local copies persisted immediately), it is anticipated that all
/// updates will return TemporaryFailure until the remote copies could be updated.
///
/// [`ChainMonitor::channel_monitor_updated`]: chainmonitor::ChainMonitor::channel_monitor_updated
TemporaryFailure,
/// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a
/// different watchtower and cannot update with all watchtowers that were previously informed
@ -280,6 +284,13 @@ pub trait Watch<ChannelSigner: Sign> {
/// Returns any monitor events since the last call. Subsequent calls must only return new
/// events.
///
/// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no
/// further events may be returned here until the [`ChannelMonitor`] has been fully persisted
/// to disk.
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
}

View file

@ -176,8 +176,8 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
}
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[0], 0);
let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events();
@ -329,8 +329,8 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
// Now fix monitor updating...
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[0], 0);
macro_rules! disconnect_reconnect_peers { () => { {
@ -627,8 +627,8 @@ fn test_monitor_update_fail_cs() {
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let responses = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(responses.len(), 2);
@ -661,8 +661,8 @@ fn test_monitor_update_fail_cs() {
}
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[0], 0);
let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
@ -721,8 +721,8 @@ fn test_monitor_update_fail_no_rebroadcast() {
check_added_monitors!(nodes[1], 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
check_added_monitors!(nodes[1], 0);
expect_pending_htlcs_forwardable!(nodes[1]);
@ -782,8 +782,8 @@ fn test_monitor_update_raa_while_paused() {
check_added_monitors!(nodes[0], 1);
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[0], 0);
let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
@ -908,8 +908,8 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
// Restore monitor updating, ensuring we immediately get a fail-back update and a
// update_add update.
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
expect_pending_htlcs_forwardable!(nodes[1]);
check_added_monitors!(nodes[1], 1);
@ -1147,8 +1147,8 @@ fn test_monitor_update_fail_reestablish() {
.contents.flags & 2, 0); // The "disabled" bit should be unset as we just reconnected
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1230,8 +1230,8 @@ fn raa_no_response_awaiting_raa_state() {
check_added_monitors!(nodes[1], 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
// nodes[1] should be AwaitingRAA here!
check_added_monitors!(nodes[1], 0);
let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1347,8 +1347,8 @@ fn claim_while_disconnected_monitor_update_fail() {
// Now un-fail the monitor, which will result in B sending its original commitment update,
// receiving the commitment update from A, and the resulting commitment dances.
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events();
@ -1456,8 +1456,8 @@ fn monitor_failed_no_reestablish_response() {
let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1549,8 +1549,8 @@ fn first_message_on_recv_ordering() {
nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Previous monitor update failure prevented generation of RAA".to_string(), 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
expect_pending_htlcs_forwardable!(nodes[1]);
@ -1633,8 +1633,8 @@ fn test_monitor_update_fail_claim() {
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
// Now restore monitor updating on the 0<->1 channel and claim the funds on B.
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1728,8 +1728,8 @@ fn test_monitor_update_on_pending_forwards() {
nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@ -1789,8 +1789,8 @@ fn monitor_update_claim_fail_no_response() {
nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Temporary failure claiming HTLC, treating as success: Failed to update ChannelMonitor".to_string(), 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
@ -1848,8 +1848,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:
check_added_monitors!(nodes[0], 1);
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[0], 0);
let events = nodes[0].node.get_and_clear_pending_events();
@ -1880,8 +1880,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:
}
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
@ -1965,8 +1965,8 @@ fn test_path_paused_mpp() {
// And check that, after we successfully update the monitor for chan_2 we can pass the second
// HTLC along to nodes[3] and claim the whole payment back to nodes[0].
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None);
@ -2301,8 +2301,8 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
// If we finish updating the monitor, we should free the holding cell right away (this did
// not occur prior to #756).
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&funding_txo, mon_id);
let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
// New outbound messages should be generated immediately upon a call to
// get_and_clear_pending_msg_events (but not before).
@ -2499,15 +2499,15 @@ fn test_temporary_error_during_shutdown() {
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()));
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id()));
let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
@ -2586,7 +2586,7 @@ fn double_temp_error() {
// `claim_funds` results in a ChannelMonitorUpdate.
assert!(nodes[1].node.claim_funds(payment_preimage_1));
check_added_monitors!(nodes[1], 1);
let (funding_tx, latest_update_1) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
// Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
@ -2595,11 +2595,11 @@ fn double_temp_error() {
check_added_monitors!(nodes[1], 1);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_1);
let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
check_added_monitors!(nodes[1], 0);
nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_2);
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);
// Complete the first HTLC.
let events = nodes[1].node.get_and_clear_pending_msg_events();

View file

@ -3404,27 +3404,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
self.our_network_pubkey.clone()
}
/// Restores a single, given channel to normal operation after a
/// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
/// operation.
///
/// All ChannelMonitor updates up to and including highest_applied_update_id must have been
/// fully committed in every copy of the given channels' ChannelMonitors.
///
/// Note that there is no effect to calling with a highest_applied_update_id other than the
/// current latest ChannelMonitorUpdate and one call to this function after multiple
/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
/// exists largely only to prevent races between this and concurrent update_monitor calls.
///
/// Thus, the anticipated use is, at a high level:
/// 1) You register a chain::Watch with this ChannelManager,
/// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
/// said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures
/// any time it cannot do so instantly,
/// 3) update(s) are applied to each remote copy of a ChannelMonitor,
/// 4) once all remote copies are updated, you call this function with the update_id that
/// completed, and once it is the latest the Channel will be re-enabled.
pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
let chan_restoration_res;
@ -4129,7 +4109,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let by_id = &mut channel_state.by_id;
@ -4145,7 +4126,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
msg: update
});
}
self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed);
let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
} else {
ClosureReason::CommitmentTxConfirmed
};
self.issue_channel_close_events(&chan, reason);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
@ -4154,6 +4140,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
});
}
},
MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
self.channel_monitor_updated(&funding_txo, monitor_update_id);
},
}
}
@ -4164,6 +4153,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
has_pending_monitor_events
}
/// In chanmon_consistency_target, we'd like to be able to restore monitor updating without
/// handling all pending events (i.e. not PendingHTLCsForwardable). Thus, we expose monitor
/// update events as a separate process method here.
#[cfg(feature = "fuzztarget")]
pub fn process_monitor_events(&self) {
self.process_pending_monitor_events();
}
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
/// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
/// update was applied.
@ -5468,20 +5465,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
///
/// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
/// is:
/// 1) Deserialize all stored ChannelMonitors.
/// 2) Deserialize the ChannelManager by filling in this struct and calling:
/// <(BlockHash, ChannelManager)>::read(reader, args)
/// This may result in closing some Channels if the ChannelMonitor is newer than the stored
/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
/// way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
/// ChannelMonitor::get_funding_txo().
/// 4) Reconnect blocks on your ChannelMonitors.
/// 5) Disconnect/connect blocks on the ChannelManager.
/// 6) Move the ChannelMonitors into your local chain::Watch.
/// 1) Deserialize all stored [`ChannelMonitor`]s.
/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling:
/// `<(BlockHash, ChannelManager)>::read(reader, args)`
/// This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored
/// [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted.
/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the
/// same way you would handle a [`chain::Filter`] call using
/// [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`].
/// 4) Reconnect blocks on your [`ChannelMonitor`]s.
/// 5) Disconnect/connect blocks on the [`ChannelManager`].
/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk.
/// Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you
/// will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in
/// the next step.
/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a
/// [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`].
///
/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
/// call any other methods on the newly-deserialized ChannelManager.
/// Note that the ordering of #4-7 is not of importance, however all four must occur before you
/// call any other methods on the newly-deserialized [`ChannelManager`].
///
/// Note that because some channels may be closed during deserialization, it is critical that you
/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
@ -5489,6 +5491,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
/// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
/// not force-close the same channels but consider them live), you may end up revoking a state for
/// which you've already broadcasted the transaction.
///
/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,

View file

@ -12,7 +12,7 @@
//! claim outputs on-chain.
use chain;
use chain::{Confirm, Listen, Watch};
use chain::{Confirm, Listen, Watch, ChannelMonitorUpdateErr};
use chain::channelmonitor;
use chain::channelmonitor::{ChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
use chain::transaction::OutPoint;
@ -4100,21 +4100,15 @@ fn test_no_txn_manager_serialize_deserialize() {
send_payment(&nodes[0], &[&nodes[1]], 1000000);
}
#[test]
fn test_dup_htlc_onchain_fails_on_reload() {
fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool) {
// When a Channel is closed, any outbound HTLCs which were relayed through it are simply
// dropped when the Channel is. From there, the ChannelManager relies on the ChannelMonitor
// having a copy of the relevant fail-/claim-back data and processes the HTLC fail/claim when
// the ChannelMonitor tells it to.
//
// If, due to an on-chain event, an HTLC is failed/claimed, and then we serialize the
// ChannelManager, we generally expect there not to be a duplicate HTLC fail/claim (eg via a
// PaymentPathFailed event appearing). However, because we may not serialize the relevant
// ChannelMonitor at the same time, this isn't strictly guaranteed. In order to provide this
// consistency, the ChannelManager explicitly tracks pending-onchain-resolution outbound HTLCs
// and de-duplicates ChannelMonitor events.
//
// This tests that explicit tracking behavior.
// If, due to an on-chain event, an HTLC is failed/claimed, we should avoid providing the
// ChannelManager the HTLC event until after the monitor is re-persisted. This should prevent a
// duplicate HTLC fail/claim (e.g. via a PaymentPathFailed event).
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
@ -4123,7 +4117,7 @@ fn test_dup_htlc_onchain_fails_on_reload() {
let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
// Route a payment, but force-close the channel before the HTLC fulfill message arrives at
// nodes[0].
@ -4141,35 +4135,59 @@ fn test_dup_htlc_onchain_fails_on_reload() {
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_txn.len(), 3);
assert_eq!(node_txn[0], node_txn[1]);
check_spends!(node_txn[1], funding_tx);
check_spends!(node_txn[2], node_txn[1]);
assert!(nodes[1].node.claim_funds(payment_preimage));
check_added_monitors!(nodes[1], 1);
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[1].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[1].clone(), node_txn[2].clone()]});
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[1].clone()]});
check_closed_broadcast!(nodes[1], true);
check_added_monitors!(nodes[1], 1);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
let claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
header.prev_blockhash = nodes[0].best_block_hash();
connect_block(&nodes[0], &Block { header, txdata: vec![node_txn[1].clone(), node_txn[2].clone()]});
connect_block(&nodes[0], &Block { header, txdata: vec![node_txn[1].clone()]});
// Serialize out the ChannelMonitor before connecting the on-chain claim transactions. This is
// fairly normal behavior as ChannelMonitor(s) are often not re-serialized when on-chain events
// happen, unlike ChannelManager which tends to be re-serialized after any relevant event(s).
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
// Now connect the HTLC claim transaction with the ChainMonitor-generated ChannelMonitor update
// returning TemporaryFailure. This should cause the claim event to never make its way to the
// ChannelManager.
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
header.prev_blockhash = nodes[0].best_block_hash();
let claim_block = Block { header, txdata: claim_txn};
let claim_block = Block { header, txdata: claim_txn };
connect_block(&nodes[0], &claim_block);
let funding_txo = OutPoint { txid: funding_tx.txid(), index: 0 };
let mon_updates: Vec<_> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap()
.get_mut(&funding_txo).unwrap().drain().collect();
assert_eq!(mon_updates.len(), 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
// If we persist the ChannelManager here, we should get the PaymentSent event after
// deserialization.
let mut chan_manager_serialized = test_utils::TestVecWriter(Vec::new());
if !persist_manager_post_event {
nodes[0].node.write(&mut chan_manager_serialized).unwrap();
}
// Now persist the ChannelMonitor and inform the ChainMonitor that we're done, generating the
// payment sent event.
chanmon_cfgs[0].persister.set_update_ret(Ok(()));
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_updates[0]).unwrap();
expect_payment_sent!(nodes[0], payment_preimage);
// ChannelManagers generally get re-serialized after any relevant event(s). Since we just
// connected a highly-relevant block, it likely gets serialized out now.
let mut chan_manager_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].node.write(&mut chan_manager_serialized).unwrap();
// If we persist the ChannelManager after we get the PaymentSent event, we shouldn't get it
// twice.
if persist_manager_post_event {
nodes[0].node.write(&mut chan_manager_serialized).unwrap();
}
// Now reload nodes[0]...
persister = test_utils::TestPersister::new();
@ -4201,6 +4219,12 @@ fn test_dup_htlc_onchain_fails_on_reload() {
check_added_monitors!(nodes[0], 1);
nodes[0].node = &nodes_0_deserialized;
if persist_manager_post_event {
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
} else {
expect_payment_sent!(nodes[0], payment_preimage);
}
// Note that if we re-connect the block which exposed nodes[0] to the payment preimage (but
// which the current ChannelMonitor has not seen), the ChannelManager's de-duplication of
// payment events should kick in, leaving us with no pending events here.
@ -4209,6 +4233,12 @@ fn test_dup_htlc_onchain_fails_on_reload() {
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
}
#[test]
fn test_dup_htlc_onchain_fails_on_reload() {
do_test_dup_htlc_onchain_fails_on_reload(true);
do_test_dup_htlc_onchain_fails_on_reload(false);
}
#[test]
fn test_manager_serialize_deserialize_events() {
// This test makes sure the events field in ChannelManager survives de/serialization

View file

@ -24,7 +24,7 @@ use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use util::ser::{VecWriter, Writeable, Writer};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use ln::wire;
use util::byte_utils;
use util::atomic_counter::AtomicCounter;
use util::events::{MessageSendEvent, MessageSendEventsProvider};
use util::logger::Logger;
use routing::network_graph::NetGraphMsgHandler;
@ -33,7 +33,6 @@ use prelude::*;
use io;
use alloc::collections::LinkedList;
use sync::{Arc, Mutex};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
use core::convert::Infallible;
@ -343,12 +342,6 @@ struct PeerHolder<Descriptor: SocketDescriptor> {
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
fn _check_usize_is_32_or_64() {
// See below, less than 32 bit pointers may be unsafe here!
unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); }
}
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@ -394,10 +387,7 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
// Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64
// bits we will never realistically count into high:
peer_counter_low: AtomicUsize,
peer_counter_high: AtomicUsize,
peer_counter: AtomicCounter,
logger: L,
}
@ -485,8 +475,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
}),
our_node_secret,
ephemeral_key_midstate,
peer_counter_low: AtomicUsize::new(0),
peer_counter_high: AtomicUsize::new(0),
peer_counter: AtomicCounter::new(),
logger,
custom_message_handler,
}
@ -509,14 +498,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
fn get_ephemeral_key(&self) -> SecretKey {
let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
let low = self.peer_counter_low.fetch_add(1, Ordering::AcqRel);
let high = if low == 0 {
self.peer_counter_high.fetch_add(1, Ordering::AcqRel)
} else {
self.peer_counter_high.load(Ordering::Acquire)
};
ephemeral_hash.input(&byte_utils::le64_to_array(low as u64));
ephemeral_hash.input(&byte_utils::le64_to_array(high as u64));
let counter = self.peer_counter.get_increment();
ephemeral_hash.input(&counter.to_le_bytes());
SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
}

View file

@ -0,0 +1,31 @@
//! A simple atomic counter that uses AtomicUsize to give a u64 counter.
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
compile_error!("We need at least 32-bit pointers for atomic counter (and to have enough memory to run LDK)");
use core::sync::atomic::{AtomicUsize, Ordering};
pub(crate) struct AtomicCounter {
// Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64
// bits we will never realistically count into high:
counter_low: AtomicUsize,
counter_high: AtomicUsize,
}
impl AtomicCounter {
pub(crate) fn new() -> Self {
Self {
counter_low: AtomicUsize::new(0),
counter_high: AtomicUsize::new(0),
}
}
pub(crate) fn get_increment(&self) -> u64 {
let low = self.counter_low.fetch_add(1, Ordering::AcqRel) as u64;
let high = if low == 0 {
self.counter_high.fetch_add(1, Ordering::AcqRel) as u64
} else {
self.counter_high.load(Ordering::Acquire) as u64
};
(high << 32) | low
}
}

View file

@ -70,20 +70,6 @@ pub fn be64_to_array(u: u64) -> [u8; 8] {
v
}
#[inline]
pub fn le64_to_array(u: u64) -> [u8; 8] {
let mut v = [0; 8];
v[0] = ((u >> 8*0) & 0xff) as u8;
v[1] = ((u >> 8*1) & 0xff) as u8;
v[2] = ((u >> 8*2) & 0xff) as u8;
v[3] = ((u >> 8*3) & 0xff) as u8;
v[4] = ((u >> 8*4) & 0xff) as u8;
v[5] = ((u >> 8*5) & 0xff) as u8;
v[6] = ((u >> 8*6) & 0xff) as u8;
v[7] = ((u >> 8*7) & 0xff) as u8;
v
}
#[cfg(test)]
mod tests {
use super::*;
@ -96,6 +82,5 @@ mod tests {
assert_eq!(be32_to_array(0xdeadbeef), [0xde, 0xad, 0xbe, 0xef]);
assert_eq!(be48_to_array(0xdeadbeef1bad), [0xde, 0xad, 0xbe, 0xef, 0x1b, 0xad]);
assert_eq!(be64_to_array(0xdeadbeef1bad1dea), [0xde, 0xad, 0xbe, 0xef, 0x1b, 0xad, 0x1d, 0xea]);
assert_eq!(le64_to_array(0xdeadbeef1bad1dea), [0xea, 0x1d, 0xad, 0x1b, 0xef, 0xbe, 0xad, 0xde]);
}
}

View file

@ -16,8 +16,6 @@ mod real_chachapoly {
use util::poly1305::Poly1305;
use bitcoin::hashes::cmp::fixed_time_eq;
use util::byte_utils;
#[derive(Clone, Copy)]
pub struct ChaCha20Poly1305RFC {
cipher: ChaCha20,
@ -67,8 +65,8 @@ mod real_chachapoly {
self.mac.input(output);
ChaCha20Poly1305RFC::pad_mac_16(&mut self.mac, self.data_len);
self.finished = true;
self.mac.input(&byte_utils::le64_to_array(self.aad_len));
self.mac.input(&byte_utils::le64_to_array(self.data_len as u64));
self.mac.input(&self.aad_len.to_le_bytes());
self.mac.input(&(self.data_len as u64).to_le_bytes());
self.mac.raw_result(out_tag);
}
@ -82,8 +80,8 @@ mod real_chachapoly {
self.data_len += input.len();
ChaCha20Poly1305RFC::pad_mac_16(&mut self.mac, self.data_len);
self.mac.input(&byte_utils::le64_to_array(self.aad_len));
self.mac.input(&byte_utils::le64_to_array(self.data_len as u64));
self.mac.input(&self.aad_len.to_le_bytes());
self.mac.input(&(self.data_len as u64).to_le_bytes());
let mut calc_tag = [0u8; 16];
self.mac.raw_result(&mut calc_tag);

View file

@ -20,6 +20,7 @@ pub mod errors;
pub mod ser;
pub mod message_signing;
pub(crate) mod atomic_counter;
pub(crate) mod byte_utils;
pub(crate) mod chacha20;
#[cfg(feature = "fuzztarget")]

View file

@ -455,9 +455,12 @@ macro_rules! _impl_writeable_tlv_based_enum_common {
macro_rules! impl_writeable_tlv_based_enum_upgradable {
($st: ident, $(($variant_id: expr, $variant_name: ident) =>
{$(($type: expr, $field: ident, $fieldty: tt)),* $(,)*}
),* $(,)*) => {
),* $(,)*
$(;
$(($tuple_variant_id: expr, $tuple_variant_name: ident)),* $(,)*)*) => {
_impl_writeable_tlv_based_enum_common!($st,
$(($variant_id, $variant_name) => {$(($type, $field, $fieldty)),*}),*; );
$(($variant_id, $variant_name) => {$(($type, $field, $fieldty)),*}),*;
$($(($tuple_variant_id, $tuple_variant_name)),*)*);
impl ::util::ser::MaybeReadable for $st {
fn read<R: $crate::io::Read>(reader: &mut R) -> Result<Option<Self>, ::ln::msgs::DecodeError> {
@ -481,6 +484,9 @@ macro_rules! impl_writeable_tlv_based_enum_upgradable {
};
f()
}),*
$($($tuple_variant_id => {
Ok(Some($st::$tuple_variant_name(Readable::read(reader)?)))
}),*)*
_ if id % 2 == 1 => Ok(None),
_ => Err(DecodeError::UnknownRequiredFeature),
}

View file

@ -12,6 +12,7 @@ use chain::WatchedOutput;
use chain::chaininterface;
use chain::chaininterface::ConfirmationTarget;
use chain::chainmonitor;
use chain::chainmonitor::MonitorUpdateId;
use chain::channelmonitor;
use chain::channelmonitor::MonitorEvent;
use chain::transaction::OutPoint;
@ -88,7 +89,7 @@ impl keysinterface::KeysInterface for OnlyReadsKeysInterface {
pub struct TestChainMonitor<'a> {
pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<EnforcingSigner>)>>,
pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64)>>,
pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64, MonitorUpdateId)>>,
pub chain_monitor: chainmonitor::ChainMonitor<EnforcingSigner, &'a TestChainSource, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger, &'a chainmonitor::Persist<EnforcingSigner>>,
pub keys_manager: &'a TestKeysInterface,
/// If this is set to Some(), the next update_channel call (not watch_channel) must be a
@ -116,7 +117,8 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
&mut io::Cursor::new(&w.0), self.keys_manager).unwrap().1;
assert!(new_monitor == monitor);
self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, monitor.get_latest_update_id()));
self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(),
(funding_txo, monitor.get_latest_update_id(), MonitorUpdateId::from_new_monitor(&monitor)));
self.added_monitors.lock().unwrap().push((funding_txo, monitor));
self.chain_monitor.watch_channel(funding_txo, new_monitor)
}
@ -136,7 +138,8 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
} else { panic!(); }
}
self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, update.update_id));
self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(),
(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(&update)));
let update_res = self.chain_monitor.update_channel(funding_txo, update);
// At every point where we get a monitor update, we should be able to send a useful monitor
// to a watchtower and disk...
@ -160,13 +163,16 @@ pub struct TestPersister {
/// If this is set to Some(), after the next return, we'll always return this until update_ret
/// is changed:
pub next_update_ret: Mutex<Option<Result<(), chain::ChannelMonitorUpdateErr>>>,
/// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the
/// MonitorUpdateId here.
pub chain_sync_monitor_persistences: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
}
impl TestPersister {
pub fn new() -> Self {
Self {
update_ret: Mutex::new(Ok(())),
next_update_ret: Mutex::new(None),
chain_sync_monitor_persistences: Mutex::new(HashMap::new()),
}
}
@ -179,7 +185,7 @@ impl TestPersister {
}
}
impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>) -> Result<(), chain::ChannelMonitorUpdateErr> {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let ret = self.update_ret.lock().unwrap().clone();
if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() {
*self.update_ret.lock().unwrap() = next_ret;
@ -187,11 +193,14 @@ impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersiste
ret
}
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<Signer>) -> Result<(), chain::ChannelMonitorUpdateErr> {
fn update_persisted_channel(&self, funding_txo: OutPoint, update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let ret = self.update_ret.lock().unwrap().clone();
if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() {
*self.update_ret.lock().unwrap() = next_ret;
}
if update.is_none() {
self.chain_sync_monitor_persistences.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id);
}
ret
}
}