Update NetworkGraph in BackgroundProcessor

Decorate the user-supplied EventHandler with NetGraphMsgHandler in
the BackgroundProcessor. The resulting handler will intercept
PaymentFailed events in order to update the NetworkGraph in the
background before delegating to the user's event handler.
This commit is contained in:
Jeffrey Czyz 2021-09-14 21:38:00 -05:00
parent e2f088c371
commit 992df51001
No known key found for this signature in database
GPG Key ID: 3A4E08275D5E96D2

View File

@ -17,7 +17,8 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::util::events::{EventHandler, EventsProvider};
use lightning::routing::network_graph::NetGraphMsgHandler;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@ -99,6 +100,33 @@ ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
}
}
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
E: EventHandler,
N: Deref<Target = NetGraphMsgHandler<A, L>>,
A: Deref,
L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
event_handler: E,
net_graph_msg_handler: Option<N>,
}
impl<
E: EventHandler,
N: Deref<Target = NetGraphMsgHandler<A, L>>,
A: Deref,
L: Deref,
> EventHandler for DecoratingEventHandler<E, N, A, L>
where A::Target: chain::Access, L::Target: Logger {
fn handle_event(&self, event: &Event) {
if let Some(event_handler) = &self.net_graph_msg_handler {
event_handler.handle_event(event);
}
self.event_handler.handle_event(event);
}
}
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
@ -121,8 +149,9 @@ impl BackgroundProcessor {
/// # Event Handling
///
/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
/// payment failed). A user's [`EventHandler`] may be decorated with other handlers to implement
/// common functionality. See individual [`Event`]s for further details.
/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
/// functionality implemented by other handlers.
/// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
///
/// [top-level documentation]: Self
/// [`join`]: Self::join
@ -130,9 +159,10 @@ impl BackgroundProcessor {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
/// [`Event`]: lightning::util::events::Event
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
pub fn start<
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
@ -147,11 +177,15 @@ impl BackgroundProcessor {
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
NG: 'static + Deref<Target = NetGraphMsgHandler<CA, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
>
(persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self
>(
persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
) -> Self
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
@ -166,6 +200,8 @@ impl BackgroundProcessor {
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler };
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.timer_tick_occurred();
@ -274,6 +310,7 @@ mod tests {
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::ser::Writeable;
@ -301,6 +338,7 @@ mod tests {
struct Node {
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
net_graph_msg_handler: Option<Arc<NetGraphMsgHandler<Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
chain_monitor: Arc<ChainMonitor>,
persister: Arc<FilesystemPersister>,
@ -335,15 +373,18 @@ mod tests {
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let seed = [i as u8; 32];
let network = Network::Testnet;
let now = Duration::from_secs(genesis_block(network).header.time as u64);
let genesis_block = genesis_block(network);
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let best_block = BestBlock::from_genesis(network);
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
let network_graph = NetworkGraph::new(genesis_block.header.block_hash());
let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph, Some(chain_source.clone()), logger.clone())));
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringMessageHandler{}));
let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block };
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block };
nodes.push(node);
}
@ -441,7 +482,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
macro_rules! check_persisted_data {
($node: expr, $filepath: expr, $expected_bytes: expr) => {
@ -494,7 +535,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@ -516,7 +557,7 @@ mod tests {
let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
Err(e) => {
@ -538,7 +579,7 @@ mod tests {
let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
};
let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
@ -562,7 +603,7 @@ mod tests {
// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
// Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();