Move events into ChannelMonitor from ManyChannelMonitor

This is the next step after "Move pending-HTLC-updated ChannelMonitor
from ManyChannelMonitor", moving our events into ChannelMonitor as
well and leaving only new-outputs-to-watch in the return value for
ChannelMonitor::block_connected (which is fine as those are
duplicatively tracked in the ChannelMonitor directly, so
losing/replaying them is acceptable).
This commit is contained in:
Matt Corallo 2020-02-12 16:55:04 -05:00
parent bfd4ac4995
commit 26008bbc0b

View file

@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter
use chain::transaction::OutPoint;
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
use util::logger::Logger;
use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48};
use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48};
use util::{byte_utils, events};
use std::collections::{HashMap, hash_map, HashSet};
@ -222,7 +222,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: T,
pending_events: Mutex<Vec<events::Event>>,
logger: Arc<Logger>,
fee_estimator: F
}
@ -234,16 +233,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
{
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let block_hash = header.bitcoin_hash();
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
{
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
if spendable_outputs.len() > 0 {
new_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
});
}
let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
for (ref txid, ref outputs) in txn_outputs {
for (idx, output) in outputs.iter().enumerate() {
@ -252,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref +
}
}
}
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.append(&mut new_events);
}
fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
@ -276,7 +267,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
monitors: Mutex::new(HashMap::new()),
chain_monitor,
broadcaster,
pending_events: Mutex::new(Vec::new()),
logger,
fee_estimator: feeest,
};
@ -362,10 +352,11 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
F::Target: FeeEstimator
{
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
let mut pending_events = self.pending_events.lock().unwrap();
let mut ret = Vec::new();
mem::swap(&mut ret, &mut *pending_events);
ret
let mut pending_events = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
pending_events.append(&mut chan.get_and_clear_pending_events());
}
pending_events
}
}
@ -835,6 +826,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
pending_htlcs_updated: Vec<HTLCUpdate>,
pending_events: Vec<events::Event>,
destination_script: Script,
// Thanks to data loss protection, we may be able to claim our non-htlc funds
@ -948,6 +940,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
self.payment_preimages != other.payment_preimages ||
self.pending_htlcs_updated != other.pending_htlcs_updated ||
self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
self.destination_script != other.destination_script ||
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
@ -1135,6 +1128,11 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
data.write(writer)?;
}
writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
for event in self.pending_events.iter() {
event.write(writer)?;
}
self.last_block_hash.write(writer)?;
self.destination_script.write(writer)?;
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@ -1267,6 +1265,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
payment_preimages: HashMap::new(),
pending_htlcs_updated: Vec::new(),
pending_events: Vec::new(),
destination_script: destination_script.clone(),
to_remote_rescue: None,
@ -1560,6 +1559,18 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
ret
}
/// Gets the list of pending events which were generated by previous actions, clearing the list
/// in the process.
///
/// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to
/// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do
/// no internal locking in ChannelMonitors.
pub fn get_and_clear_pending_events(&mut self) -> Vec<events::Event> {
let mut ret = Vec::new();
mem::swap(&mut ret, &mut self.pending_events);
ret
}
/// Can only fail if idx is < get_min_seen_secret
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
self.commitment_secrets.get_secret(idx)
@ -2534,7 +2545,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
/// on-chain.
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>)
fn block_connected<B: Deref, F: Deref>(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec<TxOut>)>
where B::Target: BroadcasterInterface,
F::Target: FeeEstimator
{
@ -2767,7 +2778,14 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
}
(watch_outputs, spendable_outputs)
if spendable_outputs.len() > 0 {
self.pending_events.push(events::Event::SpendableOutputs {
outputs: spendable_outputs,
});
}
watch_outputs
}
fn block_disconnected<B: Deref, F: Deref>(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)
@ -3369,6 +3387,14 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
pending_htlcs_updated.push(Readable::read(reader)?);
}
let pending_events_len: u64 = Readable::read(reader)?;
let mut pending_events = Vec::with_capacity(cmp::min(pending_events_len as usize, MAX_ALLOC_SIZE / mem::size_of::<events::Event>()));
for _ in 0..pending_events_len {
if let Some(event) = MaybeReadable::read(reader)? {
pending_events.push(event);
}
}
let last_block_hash: Sha256dHash = Readable::read(reader)?;
let destination_script = Readable::read(reader)?;
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@ -3471,6 +3497,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
payment_preimages,
pending_htlcs_updated,
pending_events,
destination_script,
to_remote_rescue,