Add BackgroundProcessor for ChannelManager persistence and other

"Other" includes calling timer_chan_freshness_every_min() and, in the
future, possibly persisting channel graph data.

This struct is suitable for tasks that need to happen periodically and
can run in the background.
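
As a rough usage sketch (not part of this commit: `channel_manager`, `logger`, and the data
directory below are placeholder names, and in practice the closure parameter may need its full
ChannelManager type spelled out, as the tests in this commit do), a node might wire the
processor up like so:

    use background_processor::BackgroundProcessor;
    use lightning_persister::FilesystemPersister;

    let data_dir = "/path/to/ldk-data".to_string();
    // Re-persist the ChannelManager to <data_dir>/manager whenever it signals fresh updates.
    let persist = move |manager: &_| FilesystemPersister::persist_manager(data_dir.clone(), manager);
    let bg_processor = BackgroundProcessor::start(persist, channel_manager.clone(), logger.clone());
    // ... run the node; on shutdown, stop the thread and surface any persistence error:
    bg_processor.stop().unwrap();
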
Valentine Wallace 2021-01-11 18:03:32 -05:00
parent 12c735ab3a
commit a368093803
5 changed files with 350 additions and 3 deletions

Cargo.toml

@@ -5,6 +5,7 @@ members = [
"lightning-block-sync",
"lightning-net-tokio",
"lightning-persister",
"background-processor",
]
# Our tests do actual crypto and lots of work, the tradeoff for -O1 is well worth it.

background-processor/Cargo.toml

@@ -0,0 +1,15 @@
[package]
name = "background-processor"
version = "0.1.0"
authors = ["Valentine Wallace <vwallace@protonmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bitcoin = "0.24"
lightning = { version = "0.0.12", path = "../lightning", features = ["allow_wallclock_use"] }
lightning-persister = { version = "0.0.1", path = "../lightning-persister" }
[dev-dependencies]
lightning = { version = "0.0.12", path = "../lightning", features = ["_test_utils"] }

background-processor/src/lib.rs

@@ -0,0 +1,294 @@
#[macro_use] extern crate lightning;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// ChannelManager persistence should be done in the background.
/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the
/// background).
///
/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date,
/// then there is a risk of channels force-closing on startup when the manager realizes it's
/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used
/// for unilateral chain closure fees are at risk.
pub struct BackgroundProcessor {
stop_thread: Arc<AtomicBool>,
/// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread
/// exits due to an error while persisting.
pub thread_handle: JoinHandle<Result<(), std::io::Error>>,
}
#[cfg(not(test))]
const CHAN_FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const CHAN_FRESHNESS_TIMER: u64 = 1;
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the top-level
/// documentation.
///
/// If `persist_manager` returns an error, then this thread will return said error (and `start()`
/// will need to be called again to restart the `BackgroundProcessor`). Users should wait on
/// [`thread_handle`]'s `join()` method to tell if and when an error is returned, or implement
/// `persist_manager` such that an error is never returned to the `BackgroundProcessor`.
///
/// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading
/// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`.
/// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation.
///
/// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle
/// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write
/// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl
pub fn start<PM, ChanSigner, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
where ChanSigner: 'static + ChannelKeys + Writeable,
M: 'static + chain::Watch<Keys=ChanSigner>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
F: 'static + FeeEstimator,
L: 'static + Logger,
PM: 'static + Send + Fn(&ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let mut current_time = Instant::now();
loop {
let updates_available = manager.wait_timeout(Duration::from_millis(100));
if updates_available {
persist_manager(&*manager)?;
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) {
log_trace!(logger, "Terminating background processor.");
return Ok(())
}
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER {
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
manager.timer_chan_freshness_every_min();
current_time = Instant::now();
}
}
});
Self {
stop_thread: stop_thread_clone,
thread_handle: handle,
}
}
/// Stop `BackgroundProcessor`'s thread.
pub fn stop(self) -> Result<(), std::io::Error> {
self.stop_thread.store(true, Ordering::Release);
self.thread_handle.join().unwrap()
}
}
#[cfg(test)]
mod tests {
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::network::constants::Network;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor;
use lightning::chain::keysinterface::{ChannelKeys, InMemoryChannelKeys, KeysInterface, KeysManager};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::ChannelMessageHandler;
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning_persister::FilesystemPersister;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::BackgroundProcessor;
type ChainMonitor = chainmonitor::ChainMonitor<InMemoryChannelKeys, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
struct Node {
node: SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>,
persister: Arc<FilesystemPersister>,
logger: Arc<test_utils::TestLogger>,
}
impl Drop for Node {
fn drop(&mut self) {
let data_dir = self.persister.get_data_dir();
match fs::remove_dir_all(data_dir.clone()) {
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
_ => {}
}
}
}
fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
path.to_str().unwrap().to_string()
}
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
let mut nodes = Vec::new();
for i in 0..num_nodes {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let seed = [i as u8; 32];
let network = Network::Testnet;
let now = Duration::from_secs(genesis_block(network).header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i));
let node = Node { node: manager, persister, logger };
nodes.push(node);
}
nodes
}
macro_rules! open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
let events = $node_a.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
let (temporary_channel_id, tx, funding_output) = match events[0] {
Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
assert_eq!(*channel_value_satoshis, $channel_value);
assert_eq!(user_channel_id, 42);
let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
value: *channel_value_satoshis, script_pubkey: output_script.clone(),
}]};
let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
(*temporary_channel_id, tx, funding_outpoint)
},
_ => panic!("Unexpected event"),
};
$node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
tx
}}
}
#[test]
fn test_background_processor() {
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
// updates. Also test that when new updates are available, the manager signals that it needs
// re-persistence and is successfully re-persisted.
let nodes = create_nodes(2, "test_background_processor".to_string());
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemoryChannelKeys, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
// Go through the channel creation process until each node should have something persisted.
let tx = open_channel!(nodes[0], nodes[1], 100000);
macro_rules! check_persisted_data {
($node: expr, $filepath: expr, $expected_bytes: expr) => {
match $node.write(&mut $expected_bytes) {
Ok(()) => {
loop {
match std::fs::read($filepath) {
Ok(bytes) => {
if bytes == $expected_bytes {
break
} else {
continue
}
},
Err(_) => continue
}
}
},
Err(e) => panic!("Unexpected error: {}", e)
}
}
}
// Check that the initial channel manager data is persisted as expected.
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
let mut expected_bytes = Vec::new();
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}
// Force-close the channel.
nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap();
// Check that the force-close updates are persisted.
let mut expected_bytes = Vec::new();
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}
assert!(bg_processor.stop().is_ok());
}
#[test]
fn test_chan_freshness_called() {
// Test that ChannelManager's `timer_chan_freshness_every_min` is called every
// `CHAN_FRESHNESS_TIMER`.
let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemoryChannelKeys, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string();
if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() {
break
}
}
assert!(bg_processor.stop().is_ok());
}
#[test]
fn test_persist_error() {
// Test that if we encounter an error during manager persistence, the background thread exits
// and returns that error.
fn persist_manager<ChanSigner, M, T, K, F, L>(_data: &ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>
where ChanSigner: 'static + ChannelKeys + Writeable,
M: 'static + chain::Watch<Keys=ChanSigner>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
F: 'static + FeeEstimator,
L: 'static + Logger,
{
Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
}
let nodes = create_nodes(2, "test_persist_error".to_string());
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
open_channel!(nodes[0], nodes[1], 100000);
let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");
}
}
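
For comparison with the stub persister in the tests above, here is a hedged sketch of a custom
`persist_manager` callback; the temp-file-then-rename strategy, the file names, and the name
`persist_with_tmp_file` are illustrative assumptions rather than anything this commit provides.
The generic bounds mirror those on `BackgroundProcessor::start`:

    use lightning::chain;
    use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
    use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
    use lightning::ln::channelmanager::ChannelManager;
    use lightning::util::logger::Logger;
    use lightning::util::ser::Writeable;
    use std::sync::Arc;

    fn persist_with_tmp_file<ChanSigner, M, T, K, F, L>(
        manager: &ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>
    ) -> Result<(), std::io::Error>
    where ChanSigner: 'static + ChannelKeys + Writeable,
        M: 'static + chain::Watch<Keys=ChanSigner>,
        T: 'static + BroadcasterInterface,
        K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
        F: 'static + FeeEstimator,
        L: 'static + Logger,
    {
        // Serialize to a temporary file first so a crash mid-write cannot clobber the last good copy.
        let mut tmp_file = std::fs::File::create("manager.tmp")?;
        manager.write(&mut tmp_file)?;
        tmp_file.sync_all()?;
        // Replace the previous snapshot. Any Err returned here (or above) propagates to the
        // BackgroundProcessor thread, which exits and surfaces it through `thread_handle`.
        std::fs::rename("manager.tmp", "manager")
    }

Such a function satisfies the `Send + Fn(...)` bound and can be handed to `BackgroundProcessor::start`
in place of a `FilesystemPersister::persist_manager` closure.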

lightning-persister/src/lib.rs

@@ -6,17 +6,21 @@ extern crate libc;
use bitcoin::hashes::hex::ToHex;
use crate::util::DiskWriteable;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
use lightning::chain::channelmonitor;
use lightning::chain::keysinterface::ChannelKeys;
use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
use lightning::chain::transaction::OutPoint;
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use std::fs;
use std::io::Error;
use std::sync::Arc;
#[cfg(test)]
use {
lightning::chain::keysinterface::KeysInterface,
lightning::util::ser::ReadableArgs,
bitcoin::{BlockHash, Txid},
bitcoin::hashes::hex::FromHex,
@@ -46,6 +50,19 @@ impl<ChanSigner: ChannelKeys> DiskWriteable for ChannelMonitor<ChanSigner> {
}
}
impl<ChanSigner, M, T, K, F, L> DiskWriteable for ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>
where ChanSigner: ChannelKeys + Writeable,
M: chain::Watch<Keys=ChanSigner>,
T: BroadcasterInterface,
K: KeysInterface<ChanKeySigner=ChanSigner>,
F: FeeEstimator,
L: Logger,
{
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}
impl FilesystemPersister {
/// Initialize a new FilesystemPersister and set the path to the individual channels'
/// files.
@@ -55,6 +72,26 @@ impl FilesystemPersister {
}
}
pub fn get_data_dir(&self) -> String {
self.path_to_channel_data.clone()
}
/// Writes the provided `ChannelManager` to a file called "manager" within the given
/// `data_dir`.
pub fn persist_manager<ChanSigner, M, T, K, F, L>(
data_dir: String,
manager: &ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>
) -> Result<(), std::io::Error>
where ChanSigner: ChannelKeys + Writeable,
M: chain::Watch<Keys=ChanSigner>,
T: BroadcasterInterface,
K: KeysInterface<ChanKeySigner=ChanSigner>,
F: FeeEstimator,
L: Logger
{
util::write_to_file(data_dir, "manager".to_string(), manager)
}
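// Illustration only (not part of this commit): given a hypothetical `channel_manager` of type
// Arc<ChannelManager<...>>, persisting it to "<data_dir>/manager" looks like:
//   FilesystemPersister::persist_manager("/path/to/ldk-data".to_string(), &*channel_manager)?;
// The background-processor tests in this commit hand a closure over this function to
// BackgroundProcessor::start.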
#[cfg(test)]
fn load_channel_data<Keys: KeysInterface>(&self, keys: &Keys) ->
Result<HashMap<OutPoint, ChannelMonitor<Keys::ChanKeySigner>>, ChannelMonitorUpdateErr> {

lightning-persister/src/util.rs

@@ -17,7 +17,7 @@ pub(crate) trait DiskWriteable {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error>;
}
pub fn get_full_filepath(filepath: String, filename: String) -> String {
pub(crate) fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
path.to_str().unwrap().to_string()