Merge pull request #1433 from arik-so/2022-04-rapid-sync-bg-processor

Allow indication to BackgroundProcessor that graph sync is pending
This commit is contained in:
Jeffrey Czyz 2022-06-02 14:10:00 -05:00 committed by GitHub
commit 9c5008334c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 435 additions and 253 deletions

View file

@ -1,11 +1,13 @@
// Import that needs to be added manually // Imports that need to be added manually
use lightning_rapid_gossip_sync::RapidGossipSync;
use utils::test_logger; use utils::test_logger;
/// Actual fuzz test, method signature and name are fixed /// Actual fuzz test, method signature and name are fixed
fn do_test(data: &[u8]) { fn do_test(data: &[u8]) {
let block_hash = bitcoin::BlockHash::default(); let block_hash = bitcoin::BlockHash::default();
let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash); let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash);
lightning_rapid_gossip_sync::processing::update_network_graph(&network_graph, data); let rapid_sync = RapidGossipSync::new(&network_graph);
let _ = rapid_sync.update_network_graph(data);
} }
/// Method that needs to be added manually, {name}_test /// Method that needs to be added manually, {name}_test

View file

@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies] [dependencies]
bitcoin = "0.28.1" bitcoin = "0.28.1"
lightning = { version = "0.0.106", path = "../lightning", features = ["std"] } lightning = { version = "0.0.106", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.106", path = "../lightning-rapid-gossip-sync" }
[dev-dependencies] [dev-dependencies]
lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] } lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }

View file

@ -9,6 +9,7 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_auto_cfg))]
#[macro_use] extern crate lightning; #[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;
use lightning::chain; use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@ -22,6 +23,7 @@ use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger; use lightning::util::logger::Logger;
use lightning::util::persist::Persister; use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::thread; use std::thread;
@ -77,6 +79,11 @@ const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly. /// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60; const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
#[cfg(all(not(test), debug_assertions))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
#[cfg(not(test))] #[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)] #[cfg(test)]
@ -142,6 +149,12 @@ impl BackgroundProcessor {
/// functionality implemented by other handlers. /// functionality implemented by other handlers.
/// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures. /// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
/// ///
/// # Rapid Gossip Sync
///
/// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
/// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
/// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
///
/// [top-level documentation]: BackgroundProcessor /// [top-level documentation]: BackgroundProcessor
/// [`join`]: Self::join /// [`join`]: Self::join
/// [`stop`]: Self::stop /// [`stop`]: Self::stop
@ -175,9 +188,11 @@ impl BackgroundProcessor {
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync, PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync, S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>, SC: WriteableScore<'a>,
RGS: 'static + Deref<Target = RapidGossipSync<G>> + Send
>( >(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S> net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>,
rapid_gossip_sync: Option<RGS>
) -> Self ) -> Self
where where
CA::Target: 'static + chain::Access, CA::Target: 'static + chain::Access,
@ -204,6 +219,7 @@ impl BackgroundProcessor {
let mut last_freshness_call = Instant::now(); let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now(); let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now(); let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut have_pruned = false; let mut have_pruned = false;
loop { loop {
@ -272,22 +288,41 @@ impl BackgroundProcessor {
// pruning their network graph. We run once 60 seconds after startup before // pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence. // continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
if let Some(ref handler) = net_graph_msg_handler { // The network graph must not be pruned while rapid sync completion is pending
log_trace!(logger, "Pruning network graph of stale entries"); log_trace!(logger, "Assessing prunability of network graph");
handler.network_graph().remove_stale_channels(); let graph_to_prune = match rapid_gossip_sync.as_ref() {
if let Err(e) = persister.persist_graph(handler.network_graph()) { Some(rapid_sync) => {
if rapid_sync.is_initial_sync_complete() {
Some(rapid_sync.network_graph())
} else {
None
}
},
None => net_graph_msg_handler.as_ref().map(|handler| handler.network_graph())
};
if let Some(network_graph_reference) = graph_to_prune {
network_graph_reference.remove_stale_channels();
if let Err(e) = persister.persist_graph(network_graph_reference) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
} }
last_prune_call = Instant::now();
have_pruned = true;
} else {
log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
} }
}
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if let Some(ref scorer) = scorer { if let Some(ref scorer) = scorer {
log_trace!(logger, "Persisting scorer"); log_trace!(logger, "Persisting scorer");
if let Err(e) = persister.persist_scorer(&scorer) { if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
} }
} }
last_scorer_persist_call = Instant::now();
last_prune_call = Instant::now();
have_pruned = true;
} }
} }
@ -370,7 +405,7 @@ mod tests {
use lightning::chain::transaction::OutPoint; use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg; use lightning::get_event_msg;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::InitFeatures; use lightning::ln::features::{ChannelFeatures, InitFeatures};
use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
@ -385,8 +420,10 @@ mod tests {
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::mpsc::SyncSender;
use std::time::Duration; use std::time::Duration;
use lightning::routing::scoring::{FixedPenaltyScorer}; use lightning::routing::scoring::{FixedPenaltyScorer};
use lightning_rapid_gossip_sync::RapidGossipSync;
use super::{BackgroundProcessor, FRESHNESS_TIMER}; use super::{BackgroundProcessor, FRESHNESS_TIMER};
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
@ -414,6 +451,7 @@ mod tests {
logger: Arc<test_utils::TestLogger>, logger: Arc<test_utils::TestLogger>,
best_block: BestBlock, best_block: BestBlock,
scorer: Arc<Mutex<FixedPenaltyScorer>>, scorer: Arc<Mutex<FixedPenaltyScorer>>,
rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>>
} }
impl Drop for Node { impl Drop for Node {
@ -428,6 +466,7 @@ mod tests {
struct Persister { struct Persister {
graph_error: Option<(std::io::ErrorKind, &'static str)>, graph_error: Option<(std::io::ErrorKind, &'static str)>,
graph_persistence_notifier: Option<SyncSender<()>>,
manager_error: Option<(std::io::ErrorKind, &'static str)>, manager_error: Option<(std::io::ErrorKind, &'static str)>,
scorer_error: Option<(std::io::ErrorKind, &'static str)>, scorer_error: Option<(std::io::ErrorKind, &'static str)>,
filesystem_persister: FilesystemPersister, filesystem_persister: FilesystemPersister,
@ -436,13 +475,17 @@ mod tests {
impl Persister { impl Persister {
fn new(data_dir: String) -> Self { fn new(data_dir: String) -> Self {
let filesystem_persister = FilesystemPersister::new(data_dir.clone()); let filesystem_persister = FilesystemPersister::new(data_dir.clone());
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister } Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
} }
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { graph_error: Some((error, message)), ..self } Self { graph_error: Some((error, message)), ..self }
} }
fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
Self { graph_persistence_notifier: Some(sender), ..self }
}
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { manager_error: Some((error, message)), ..self } Self { manager_error: Some((error, message)), ..self }
} }
@ -461,6 +504,10 @@ mod tests {
} }
if key == "network_graph" { if key == "network_graph" {
if let Some(sender) = &self.graph_persistence_notifier {
sender.send(()).unwrap();
};
if let Some((error, message)) = self.graph_error { if let Some((error, message)) = self.graph_error {
return Err(std::io::Error::new(error, message)) return Err(std::io::Error::new(error, message))
} }
@ -504,7 +551,8 @@ mod tests {
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; let rapid_gossip_sync = None;
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
nodes.push(node); nodes.push(node);
} }
@ -602,7 +650,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir)); let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {}; let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
macro_rules! check_persisted_data { macro_rules! check_persisted_data {
($node: expr, $filepath: expr) => { ($node: expr, $filepath: expr) => {
@ -667,7 +715,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir)); let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {}; let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
loop { loop {
let log_entries = nodes[0].logger.lines.lock().unwrap(); let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@ -690,7 +738,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {}; let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
match bg_processor.join() { match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"), Ok(_) => panic!("Expected error persisting manager"),
Err(e) => { Err(e) => {
@ -707,7 +755,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {}; let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
match bg_processor.stop() { match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting network graph"), Ok(_) => panic!("Expected error persisting network graph"),
@ -725,7 +773,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir(); let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {}; let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
match bg_processor.stop() { match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting scorer"), Ok(_) => panic!("Expected error persisting scorer"),
@ -748,7 +796,7 @@ mod tests {
let event_handler = move |event: &Event| { let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
}; };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// Open a channel and check that the FundingGenerationReady event was handled. // Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value); begin_open_channel!(nodes[0], nodes[1], channel_value);
@ -773,7 +821,7 @@ mod tests {
let (sender, receiver) = std::sync::mpsc::sync_channel(1); let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
let persister = Arc::new(Persister::new(data_dir)); let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// Force close the channel and check that the SpendableOutputs event was handled. // Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap(); nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
@ -791,6 +839,83 @@ mod tests {
assert!(bg_processor.stop().is_ok()); assert!(bg_processor.stop().is_ok());
} }
#[test]
fn test_scorer_persistence() {
let nodes = create_nodes(2, "test_scorer_persistence".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let expected_log = "Persisting scorer".to_string();
if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
break
}
}
assert!(bg_processor.stop().is_ok());
}
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
let network_graph = nodes[0].network_graph.clone();
let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
let features = ChannelFeatures::empty();
network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
.expect("Failed to update channel from partial announcement");
let original_graph_description = network_graph.to_string();
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
assert_eq!(network_graph.read_only().channels().len(), 1);
let event_handler = |_: &_| {};
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let expected_log_a = "Assessing prunability of network graph".to_string();
let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
break
}
}
let initialization_input = vec![
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
];
rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
// this should have added two channels
assert_eq!(network_graph.read_only().channels().len(), 3);
let _ = receiver
.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
.expect("Network graph not pruned within deadline");
background_processor.stop().unwrap();
// all channels should now be pruned
assert_eq!(network_graph.read_only().channels().len(), 0);
}
#[test] #[test]
fn test_invoice_payer() { fn test_invoice_payer() {
let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet); let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
@ -803,7 +928,7 @@ mod tests {
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes); let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2))); let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
let event_handler = Arc::clone(&invoice_payer); let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
assert!(bg_processor.stop().is_ok()); assert!(bg_processor.stop().is_ok());
} }
} }

View file

@ -30,10 +30,12 @@
//! use bitcoin::blockdata::constants::genesis_block; //! use bitcoin::blockdata::constants::genesis_block;
//! use bitcoin::Network; //! use bitcoin::Network;
//! use lightning::routing::network_graph::NetworkGraph; //! use lightning::routing::network_graph::NetworkGraph;
//! use lightning_rapid_gossip_sync::RapidGossipSync;
//! //!
//! let block_hash = genesis_block(Network::Bitcoin).header.block_hash(); //! let block_hash = genesis_block(Network::Bitcoin).header.block_hash();
//! let network_graph = NetworkGraph::new(block_hash); //! let network_graph = NetworkGraph::new(block_hash);
//! let new_last_sync_timestamp_result = lightning_rapid_gossip_sync::sync_network_graph_with_file_path(&network_graph, "./rapid_sync.lngossip"); //! let rapid_sync = RapidGossipSync::new(&network_graph);
//! let new_last_sync_timestamp_result = rapid_sync.sync_network_graph_with_file_path("./rapid_sync.lngossip");
//! ``` //! ```
//! //!
//! The primary benefit this syncing mechanism provides is that given a trusted server, a //! The primary benefit this syncing mechanism provides is that given a trusted server, a
@ -57,8 +59,10 @@
extern crate test; extern crate test;
use std::fs::File; use std::fs::File;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use lightning::routing::network_graph; use lightning::routing::network_graph::NetworkGraph;
use crate::error::GraphSyncError; use crate::error::GraphSyncError;
@ -68,19 +72,51 @@ pub mod error;
/// Core functionality of this crate /// Core functionality of this crate
pub mod processing; pub mod processing;
/// Sync gossip data from a file /// Rapid Gossip Sync struct
/// Returns the last sync timestamp to be used the next time rapid sync data is queried. /// See [crate-level documentation] for usage.
/// ///
/// `network_graph`: The network graph to apply the updates to /// [crate-level documentation]: crate
/// pub struct RapidGossipSync<NG: Deref<Target=NetworkGraph>> {
/// `sync_path`: Path to the file where the gossip update data is located network_graph: NG,
/// is_initial_sync_complete: AtomicBool
pub fn sync_network_graph_with_file_path( }
network_graph: &network_graph::NetworkGraph,
sync_path: &str, impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
) -> Result<u32, GraphSyncError> { /// Instantiate a new [`RapidGossipSync`] instance
let mut file = File::open(sync_path)?; pub fn new(network_graph: NG) -> Self {
processing::update_network_graph_from_byte_stream(&network_graph, &mut file) Self {
network_graph,
is_initial_sync_complete: AtomicBool::new(false)
}
}
/// Sync gossip data from a file
/// Returns the last sync timestamp to be used the next time rapid sync data is queried.
///
/// `network_graph`: The network graph to apply the updates to
///
/// `sync_path`: Path to the file where the gossip update data is located
///
pub fn sync_network_graph_with_file_path(
&self,
sync_path: &str,
) -> Result<u32, GraphSyncError> {
let mut file = File::open(sync_path)?;
self.update_network_graph_from_byte_stream(&mut file)
}
/// Gets a reference to the underlying [`NetworkGraph`] which was provided in
/// [`RapidGossipSync::new`].
///
/// (C-not exported) as bindings don't support a reference-to-a-reference yet
pub fn network_graph(&self) -> &NG {
&self.network_graph
}
/// Returns whether a rapid gossip sync has completed at least once
pub fn is_initial_sync_complete(&self) -> bool {
self.is_initial_sync_complete.load(Ordering::Acquire)
}
} }
#[cfg(test)] #[cfg(test)]
@ -92,8 +128,7 @@ mod tests {
use lightning::ln::msgs::DecodeError; use lightning::ln::msgs::DecodeError;
use lightning::routing::network_graph::NetworkGraph; use lightning::routing::network_graph::NetworkGraph;
use crate::RapidGossipSync;
use crate::sync_network_graph_with_file_path;
#[test] #[test]
fn test_sync_from_file() { fn test_sync_from_file() {
@ -156,7 +191,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let sync_result = sync_network_graph_with_file_path(&network_graph, &graph_sync_test_file); let rapid_sync = RapidGossipSync::new(&network_graph);
let sync_result = rapid_sync.sync_network_graph_with_file_path(&graph_sync_test_file);
if sync_result.is_err() { if sync_result.is_err() {
panic!("Unexpected sync result: {:?}", sync_result) panic!("Unexpected sync result: {:?}", sync_result)
@ -187,11 +223,12 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let rapid_sync = RapidGossipSync::new(&network_graph);
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let sync_result = let sync_result = rapid_sync
sync_network_graph_with_file_path(&network_graph, "./res/full_graph.lngossip"); .sync_network_graph_with_file_path("./res/full_graph.lngossip");
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result { if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error); let error_string = format!("Input file lightning-rapid-gossip-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
#[cfg(not(require_route_graph_test))] #[cfg(not(require_route_graph_test))]
{ {
println!("{}", error_string); println!("{}", error_string);
@ -218,19 +255,17 @@ pub mod bench {
use lightning::ln::msgs::DecodeError; use lightning::ln::msgs::DecodeError;
use lightning::routing::network_graph::NetworkGraph; use lightning::routing::network_graph::NetworkGraph;
use crate::sync_network_graph_with_file_path; use crate::RapidGossipSync;
#[bench] #[bench]
fn bench_reading_full_graph_from_file(b: &mut Bencher) { fn bench_reading_full_graph_from_file(b: &mut Bencher) {
let block_hash = genesis_block(Network::Bitcoin).block_hash(); let block_hash = genesis_block(Network::Bitcoin).block_hash();
b.iter(|| { b.iter(|| {
let network_graph = NetworkGraph::new(block_hash); let network_graph = NetworkGraph::new(block_hash);
let sync_result = sync_network_graph_with_file_path( let rapid_sync = RapidGossipSync::new(&network_graph);
&network_graph, let sync_result = rapid_sync.sync_network_graph_with_file_path("./res/full_graph.lngossip");
"./res/full_graph.lngossip",
);
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result { if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error); let error_string = format!("Input file lightning-rapid-gossip-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
#[cfg(not(require_route_graph_test))] #[cfg(not(require_route_graph_test))]
{ {
println!("{}", error_string); println!("{}", error_string);

View file

@ -1,6 +1,8 @@
use std::cmp::max; use std::cmp::max;
use std::io; use std::io;
use std::io::Read; use std::io::Read;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use bitcoin::BlockHash; use bitcoin::BlockHash;
use bitcoin::secp256k1::PublicKey; use bitcoin::secp256k1::PublicKey;
@ -8,10 +10,11 @@ use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::{ use lightning::ln::msgs::{
DecodeError, ErrorAction, LightningError, OptionalField, UnsignedChannelUpdate, DecodeError, ErrorAction, LightningError, OptionalField, UnsignedChannelUpdate,
}; };
use lightning::routing::network_graph; use lightning::routing::network_graph::NetworkGraph;
use lightning::util::ser::{BigSize, Readable}; use lightning::util::ser::{BigSize, Readable};
use crate::error::GraphSyncError; use crate::error::GraphSyncError;
use crate::RapidGossipSync;
/// The purpose of this prefix is to identify the serialization format, should other rapid gossip /// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future. /// sync formats arise in the future.
@ -23,203 +26,207 @@ const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
/// avoid malicious updates being able to trigger excessive memory allocation. /// avoid malicious updates being able to trigger excessive memory allocation.
const MAX_INITIAL_NODE_ID_VECTOR_CAPACITY: u32 = 50_000; const MAX_INITIAL_NODE_ID_VECTOR_CAPACITY: u32 = 50_000;
/// Update network graph from binary data. impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
/// Returns the last sync timestamp to be used the next time rapid sync data is queried. /// Update network graph from binary data.
/// /// Returns the last sync timestamp to be used the next time rapid sync data is queried.
/// `network_graph`: network graph to be updated ///
/// /// `network_graph`: network graph to be updated
/// `update_data`: `&[u8]` binary stream that comprises the update data ///
pub fn update_network_graph( /// `update_data`: `&[u8]` binary stream that comprises the update data
network_graph: &network_graph::NetworkGraph, pub fn update_network_graph(&self, update_data: &[u8]) -> Result<u32, GraphSyncError> {
update_data: &[u8], let mut read_cursor = io::Cursor::new(update_data);
) -> Result<u32, GraphSyncError> { self.update_network_graph_from_byte_stream(&mut read_cursor)
let mut read_cursor = io::Cursor::new(update_data);
update_network_graph_from_byte_stream(&network_graph, &mut read_cursor)
}
pub(crate) fn update_network_graph_from_byte_stream<R: Read>(
network_graph: &network_graph::NetworkGraph,
mut read_cursor: &mut R,
) -> Result<u32, GraphSyncError> {
let mut prefix = [0u8; 4];
read_cursor.read_exact(&mut prefix)?;
match prefix {
GOSSIP_PREFIX => {},
_ => {
return Err(DecodeError::UnknownVersion.into());
}
};
let chain_hash: BlockHash = Readable::read(read_cursor)?;
let latest_seen_timestamp: u32 = Readable::read(read_cursor)?;
// backdate the applied timestamp by a week
let backdated_timestamp = latest_seen_timestamp.saturating_sub(24 * 3600 * 7);
let node_id_count: u32 = Readable::read(read_cursor)?;
let mut node_ids: Vec<PublicKey> = Vec::with_capacity(std::cmp::min(
node_id_count,
MAX_INITIAL_NODE_ID_VECTOR_CAPACITY,
) as usize);
for _ in 0..node_id_count {
let current_node_id = Readable::read(read_cursor)?;
node_ids.push(current_node_id);
} }
let mut previous_scid: u64 = 0;
let announcement_count: u32 = Readable::read(read_cursor)?;
for _ in 0..announcement_count {
let features = Readable::read(read_cursor)?;
// handle SCID pub(crate) fn update_network_graph_from_byte_stream<R: Read>(
let scid_delta: BigSize = Readable::read(read_cursor)?; &self,
let short_channel_id = previous_scid mut read_cursor: &mut R,
.checked_add(scid_delta.0) ) -> Result<u32, GraphSyncError> {
.ok_or(DecodeError::InvalidValue)?; let mut prefix = [0u8; 4];
previous_scid = short_channel_id; read_cursor.read_exact(&mut prefix)?;
let node_id_1_index: BigSize = Readable::read(read_cursor)?; match prefix {
let node_id_2_index: BigSize = Readable::read(read_cursor)?; GOSSIP_PREFIX => {}
if max(node_id_1_index.0, node_id_2_index.0) >= node_id_count as u64 { _ => {
return Err(DecodeError::InvalidValue.into()); return Err(DecodeError::UnknownVersion.into());
};
let node_id_1 = node_ids[node_id_1_index.0 as usize];
let node_id_2 = node_ids[node_id_2_index.0 as usize];
let announcement_result = network_graph.add_channel_from_partial_announcement(
short_channel_id,
backdated_timestamp as u64,
features,
node_id_1,
node_id_2,
);
if let Err(lightning_error) = announcement_result {
if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action {
// everything is fine, just a duplicate channel announcement
} else {
return Err(lightning_error.into());
}
}
}
previous_scid = 0; // updates start at a new scid
let update_count: u32 = Readable::read(read_cursor)?;
if update_count == 0 {
return Ok(latest_seen_timestamp);
}
// obtain default values for non-incremental updates
let default_cltv_expiry_delta: u16 = Readable::read(&mut read_cursor)?;
let default_htlc_minimum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_fee_base_msat: u32 = Readable::read(&mut read_cursor)?;
let default_fee_proportional_millionths: u32 = Readable::read(&mut read_cursor)?;
let tentative_default_htlc_maximum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_htlc_maximum_msat = if tentative_default_htlc_maximum_msat == u64::max_value() {
OptionalField::Absent
} else {
OptionalField::Present(tentative_default_htlc_maximum_msat)
};
for _ in 0..update_count {
let scid_delta: BigSize = Readable::read(read_cursor)?;
let short_channel_id = previous_scid
.checked_add(scid_delta.0)
.ok_or(DecodeError::InvalidValue)?;
previous_scid = short_channel_id;
let channel_flags: u8 = Readable::read(read_cursor)?;
// flags are always sent in full, and hence always need updating
let standard_channel_flags = channel_flags & 0b_0000_0011;
let mut synthetic_update = if channel_flags & 0b_1000_0000 == 0 {
// full update, field flags will indicate deviations from the default
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: default_cltv_expiry_delta,
htlc_minimum_msat: default_htlc_minimum_msat,
htlc_maximum_msat: default_htlc_maximum_msat.clone(),
fee_base_msat: default_fee_base_msat,
fee_proportional_millionths: default_fee_proportional_millionths,
excess_data: vec![],
}
} else {
// incremental update, field flags will indicate mutated values
let read_only_network_graph = network_graph.read_only();
let channel = read_only_network_graph
.channels()
.get(&short_channel_id)
.ok_or(LightningError {
err: "Couldn't find channel for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let directional_info = channel
.get_directional_info(channel_flags)
.ok_or(LightningError {
err: "Couldn't find previous directional data for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let htlc_maximum_msat =
if let Some(htlc_maximum_msat) = directional_info.htlc_maximum_msat {
OptionalField::Present(htlc_maximum_msat)
} else {
OptionalField::Absent
};
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: directional_info.cltv_expiry_delta,
htlc_minimum_msat: directional_info.htlc_minimum_msat,
htlc_maximum_msat,
fee_base_msat: directional_info.fees.base_msat,
fee_proportional_millionths: directional_info.fees.proportional_millionths,
excess_data: vec![],
} }
}; };
if channel_flags & 0b_0100_0000 > 0 { let chain_hash: BlockHash = Readable::read(read_cursor)?;
let cltv_expiry_delta: u16 = Readable::read(read_cursor)?; let latest_seen_timestamp: u32 = Readable::read(read_cursor)?;
synthetic_update.cltv_expiry_delta = cltv_expiry_delta; // backdate the applied timestamp by a week
let backdated_timestamp = latest_seen_timestamp.saturating_sub(24 * 3600 * 7);
let node_id_count: u32 = Readable::read(read_cursor)?;
let mut node_ids: Vec<PublicKey> = Vec::with_capacity(std::cmp::min(
node_id_count,
MAX_INITIAL_NODE_ID_VECTOR_CAPACITY,
) as usize);
for _ in 0..node_id_count {
let current_node_id = Readable::read(read_cursor)?;
node_ids.push(current_node_id);
} }
if channel_flags & 0b_0010_0000 > 0 { let network_graph = &self.network_graph;
let htlc_minimum_msat: u64 = Readable::read(read_cursor)?;
synthetic_update.htlc_minimum_msat = htlc_minimum_msat;
}
if channel_flags & 0b_0001_0000 > 0 { let mut previous_scid: u64 = 0;
let fee_base_msat: u32 = Readable::read(read_cursor)?; let announcement_count: u32 = Readable::read(read_cursor)?;
synthetic_update.fee_base_msat = fee_base_msat; for _ in 0..announcement_count {
} let features = Readable::read(read_cursor)?;
if channel_flags & 0b_0000_1000 > 0 { // handle SCID
let fee_proportional_millionths: u32 = Readable::read(read_cursor)?; let scid_delta: BigSize = Readable::read(read_cursor)?;
synthetic_update.fee_proportional_millionths = fee_proportional_millionths; let short_channel_id = previous_scid
} .checked_add(scid_delta.0)
.ok_or(DecodeError::InvalidValue)?;
previous_scid = short_channel_id;
if channel_flags & 0b_0000_0100 > 0 { let node_id_1_index: BigSize = Readable::read(read_cursor)?;
let tentative_htlc_maximum_msat: u64 = Readable::read(read_cursor)?; let node_id_2_index: BigSize = Readable::read(read_cursor)?;
synthetic_update.htlc_maximum_msat = if tentative_htlc_maximum_msat == u64::max_value() if max(node_id_1_index.0, node_id_2_index.0) >= node_id_count as u64 {
{ return Err(DecodeError::InvalidValue.into());
OptionalField::Absent
} else {
OptionalField::Present(tentative_htlc_maximum_msat)
}; };
let node_id_1 = node_ids[node_id_1_index.0 as usize];
let node_id_2 = node_ids[node_id_2_index.0 as usize];
let announcement_result = network_graph.add_channel_from_partial_announcement(
short_channel_id,
backdated_timestamp as u64,
features,
node_id_1,
node_id_2,
);
if let Err(lightning_error) = announcement_result {
if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action {
// everything is fine, just a duplicate channel announcement
} else {
return Err(lightning_error.into());
}
}
} }
network_graph.update_channel_unsigned(&synthetic_update)?; previous_scid = 0; // updates start at a new scid
}
Ok(latest_seen_timestamp) let update_count: u32 = Readable::read(read_cursor)?;
if update_count == 0 {
return Ok(latest_seen_timestamp);
}
// obtain default values for non-incremental updates
let default_cltv_expiry_delta: u16 = Readable::read(&mut read_cursor)?;
let default_htlc_minimum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_fee_base_msat: u32 = Readable::read(&mut read_cursor)?;
let default_fee_proportional_millionths: u32 = Readable::read(&mut read_cursor)?;
let tentative_default_htlc_maximum_msat: u64 = Readable::read(&mut read_cursor)?;
let default_htlc_maximum_msat = if tentative_default_htlc_maximum_msat == u64::max_value() {
OptionalField::Absent
} else {
OptionalField::Present(tentative_default_htlc_maximum_msat)
};
for _ in 0..update_count {
let scid_delta: BigSize = Readable::read(read_cursor)?;
let short_channel_id = previous_scid
.checked_add(scid_delta.0)
.ok_or(DecodeError::InvalidValue)?;
previous_scid = short_channel_id;
let channel_flags: u8 = Readable::read(read_cursor)?;
// flags are always sent in full, and hence always need updating
let standard_channel_flags = channel_flags & 0b_0000_0011;
let mut synthetic_update = if channel_flags & 0b_1000_0000 == 0 {
// full update, field flags will indicate deviations from the default
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: default_cltv_expiry_delta,
htlc_minimum_msat: default_htlc_minimum_msat,
htlc_maximum_msat: default_htlc_maximum_msat.clone(),
fee_base_msat: default_fee_base_msat,
fee_proportional_millionths: default_fee_proportional_millionths,
excess_data: vec![],
}
} else {
// incremental update, field flags will indicate mutated values
let read_only_network_graph = network_graph.read_only();
let channel = read_only_network_graph
.channels()
.get(&short_channel_id)
.ok_or(LightningError {
err: "Couldn't find channel for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let directional_info = channel
.get_directional_info(channel_flags)
.ok_or(LightningError {
err: "Couldn't find previous directional data for update".to_owned(),
action: ErrorAction::IgnoreError,
})?;
let htlc_maximum_msat =
if let Some(htlc_maximum_msat) = directional_info.htlc_maximum_msat {
OptionalField::Present(htlc_maximum_msat)
} else {
OptionalField::Absent
};
UnsignedChannelUpdate {
chain_hash,
short_channel_id,
timestamp: backdated_timestamp,
flags: standard_channel_flags,
cltv_expiry_delta: directional_info.cltv_expiry_delta,
htlc_minimum_msat: directional_info.htlc_minimum_msat,
htlc_maximum_msat,
fee_base_msat: directional_info.fees.base_msat,
fee_proportional_millionths: directional_info.fees.proportional_millionths,
excess_data: vec![],
}
};
if channel_flags & 0b_0100_0000 > 0 {
let cltv_expiry_delta: u16 = Readable::read(read_cursor)?;
synthetic_update.cltv_expiry_delta = cltv_expiry_delta;
}
if channel_flags & 0b_0010_0000 > 0 {
let htlc_minimum_msat: u64 = Readable::read(read_cursor)?;
synthetic_update.htlc_minimum_msat = htlc_minimum_msat;
}
if channel_flags & 0b_0001_0000 > 0 {
let fee_base_msat: u32 = Readable::read(read_cursor)?;
synthetic_update.fee_base_msat = fee_base_msat;
}
if channel_flags & 0b_0000_1000 > 0 {
let fee_proportional_millionths: u32 = Readable::read(read_cursor)?;
synthetic_update.fee_proportional_millionths = fee_proportional_millionths;
}
if channel_flags & 0b_0000_0100 > 0 {
let tentative_htlc_maximum_msat: u64 = Readable::read(read_cursor)?;
synthetic_update.htlc_maximum_msat = if tentative_htlc_maximum_msat == u64::max_value()
{
OptionalField::Absent
} else {
OptionalField::Present(tentative_htlc_maximum_msat)
};
}
network_graph.update_channel_unsigned(&synthetic_update)?;
}
self.network_graph.set_last_rapid_gossip_sync_timestamp(latest_seen_timestamp);
self.is_initial_sync_complete.store(true, Ordering::Release);
Ok(latest_seen_timestamp)
}
} }
#[cfg(test)] #[cfg(test)]
@ -231,7 +238,7 @@ mod tests {
use lightning::routing::network_graph::NetworkGraph; use lightning::routing::network_graph::NetworkGraph;
use crate::error::GraphSyncError; use crate::error::GraphSyncError;
use crate::processing::update_network_graph; use crate::RapidGossipSync;
#[test] #[test]
fn network_graph_fails_to_update_from_clipped_input() { fn network_graph_fails_to_update_from_clipped_input() {
@ -254,7 +261,8 @@ mod tests {
0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0,
0, 3, 232, 0, 0, 0, 0, 3, 232, 0, 0, 0,
]; ];
let update_result = update_network_graph(&network_graph, &example_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let update_result = rapid_sync.update_network_graph(&example_input[..]);
assert!(update_result.is_err()); assert!(update_result.is_err());
if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result { if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result {
// this is the expected error type // this is the expected error type
@ -278,7 +286,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &incremental_update_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let update_result = rapid_sync.update_network_graph(&incremental_update_input[..]);
assert!(update_result.is_err()); assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!(lightning_error.err, "Couldn't find channel for update"); assert_eq!(lightning_error.err, "Couldn't find channel for update");
@ -310,7 +319,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &announced_update_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let update_result = rapid_sync.update_network_graph(&announced_update_input[..]);
assert!(update_result.is_err()); assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!( assert_eq!(
@ -345,7 +355,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let initialization_result = update_network_graph(&network_graph, &initialization_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let initialization_result = rapid_sync.update_network_graph(&initialization_input[..]);
if initialization_result.is_err() { if initialization_result.is_err() {
panic!( panic!(
"Unexpected initialization result: {:?}", "Unexpected initialization result: {:?}",
@ -373,10 +384,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
68, 226, 0, 6, 11, 0, 1, 128, 68, 226, 0, 6, 11, 0, 1, 128,
]; ];
let update_result = update_network_graph( let update_result = rapid_sync.update_network_graph(&opposite_direction_incremental_update_input[..]);
&network_graph,
&opposite_direction_incremental_update_input[..],
);
assert!(update_result.is_err()); assert!(update_result.is_err());
if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
assert_eq!( assert_eq!(
@ -413,7 +421,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let initialization_result = update_network_graph(&network_graph, &initialization_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let initialization_result = rapid_sync.update_network_graph(&initialization_input[..]);
assert!(initialization_result.is_ok()); assert!(initialization_result.is_ok());
let single_direction_incremental_update_input = vec![ let single_direction_incremental_update_input = vec![
@ -423,10 +432,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
68, 226, 0, 6, 11, 0, 1, 128, 68, 226, 0, 6, 11, 0, 1, 128,
]; ];
let update_result = update_network_graph( let update_result = rapid_sync.update_network_graph(&single_direction_incremental_update_input[..]);
&network_graph,
&single_direction_incremental_update_input[..],
);
if update_result.is_err() { if update_result.is_err() {
panic!("Unexpected update result: {:?}", update_result) panic!("Unexpected update result: {:?}", update_result)
} }
@ -474,7 +480,8 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0); assert_eq!(network_graph.read_only().channels().len(), 0);
let update_result = update_network_graph(&network_graph, &valid_input[..]); let rapid_sync = RapidGossipSync::new(&network_graph);
let update_result = rapid_sync.update_network_graph(&valid_input[..]);
if update_result.is_err() { if update_result.is_err() {
panic!("Unexpected update result: {:?}", update_result) panic!("Unexpected update result: {:?}", update_result)
} }

View file

@ -123,9 +123,7 @@ impl Readable for NodeId {
/// Represents the network as nodes and channels between them /// Represents the network as nodes and channels between them
pub struct NetworkGraph { pub struct NetworkGraph {
/// The unix timestamp in UTC provided by the most recent rapid gossip sync last_rapid_gossip_sync_timestamp: Mutex<Option<u32>>,
/// It will be set by the rapid sync process after every sync completion
pub last_rapid_gossip_sync_timestamp: Option<u32>,
genesis_hash: BlockHash, genesis_hash: BlockHash,
// Lock order: channels -> nodes // Lock order: channels -> nodes
channels: RwLock<BTreeMap<u64, ChannelInfo>>, channels: RwLock<BTreeMap<u64, ChannelInfo>>,
@ -136,11 +134,12 @@ impl Clone for NetworkGraph {
fn clone(&self) -> Self { fn clone(&self) -> Self {
let channels = self.channels.read().unwrap(); let channels = self.channels.read().unwrap();
let nodes = self.nodes.read().unwrap(); let nodes = self.nodes.read().unwrap();
let last_rapid_gossip_sync_timestamp = self.get_last_rapid_gossip_sync_timestamp();
Self { Self {
genesis_hash: self.genesis_hash.clone(), genesis_hash: self.genesis_hash.clone(),
channels: RwLock::new(channels.clone()), channels: RwLock::new(channels.clone()),
nodes: RwLock::new(nodes.clone()), nodes: RwLock::new(nodes.clone()),
last_rapid_gossip_sync_timestamp: self.last_rapid_gossip_sync_timestamp.clone(), last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp)
} }
} }
} }
@ -994,8 +993,9 @@ impl Writeable for NetworkGraph {
node_info.write(writer)?; node_info.write(writer)?;
} }
let last_rapid_gossip_sync_timestamp = self.get_last_rapid_gossip_sync_timestamp();
write_tlv_fields!(writer, { write_tlv_fields!(writer, {
(1, self.last_rapid_gossip_sync_timestamp, option), (1, last_rapid_gossip_sync_timestamp, option),
}); });
Ok(()) Ok(())
} }
@ -1030,7 +1030,7 @@ impl Readable for NetworkGraph {
genesis_hash, genesis_hash,
channels: RwLock::new(channels), channels: RwLock::new(channels),
nodes: RwLock::new(nodes), nodes: RwLock::new(nodes),
last_rapid_gossip_sync_timestamp, last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
}) })
} }
} }
@ -1064,7 +1064,7 @@ impl NetworkGraph {
genesis_hash, genesis_hash,
channels: RwLock::new(BTreeMap::new()), channels: RwLock::new(BTreeMap::new()),
nodes: RwLock::new(BTreeMap::new()), nodes: RwLock::new(BTreeMap::new()),
last_rapid_gossip_sync_timestamp: None, last_rapid_gossip_sync_timestamp: Mutex::new(None),
} }
} }
@ -1078,6 +1078,18 @@ impl NetworkGraph {
} }
} }
/// The unix timestamp provided by the most recent rapid gossip sync.
/// It will be set by the rapid sync process after every sync completion.
pub fn get_last_rapid_gossip_sync_timestamp(&self) -> Option<u32> {
self.last_rapid_gossip_sync_timestamp.lock().unwrap().clone()
}
/// Update the unix timestamp provided by the most recent rapid gossip sync.
/// This should be done automatically by the rapid sync process after every sync completion.
pub fn set_last_rapid_gossip_sync_timestamp(&self, last_rapid_gossip_sync_timestamp: u32) {
self.last_rapid_gossip_sync_timestamp.lock().unwrap().replace(last_rapid_gossip_sync_timestamp);
}
/// Clears the `NodeAnnouncementInfo` field for all nodes in the `NetworkGraph` for testing /// Clears the `NodeAnnouncementInfo` field for all nodes in the `NetworkGraph` for testing
/// purposes. /// purposes.
#[cfg(test)] #[cfg(test)]
@ -2374,13 +2386,13 @@ mod tests {
#[test] #[test]
fn network_graph_tlv_serialization() { fn network_graph_tlv_serialization() {
let mut network_graph = create_network_graph(); let mut network_graph = create_network_graph();
network_graph.last_rapid_gossip_sync_timestamp.replace(42); network_graph.set_last_rapid_gossip_sync_timestamp(42);
let mut w = test_utils::TestVecWriter(Vec::new()); let mut w = test_utils::TestVecWriter(Vec::new());
network_graph.write(&mut w).unwrap(); network_graph.write(&mut w).unwrap();
let reassembled_network_graph: NetworkGraph = Readable::read(&mut io::Cursor::new(&w.0)).unwrap(); let reassembled_network_graph: NetworkGraph = Readable::read(&mut io::Cursor::new(&w.0)).unwrap();
assert!(reassembled_network_graph == network_graph); assert!(reassembled_network_graph == network_graph);
assert_eq!(reassembled_network_graph.last_rapid_gossip_sync_timestamp.unwrap(), 42); assert_eq!(reassembled_network_graph.get_last_rapid_gossip_sync_timestamp().unwrap(), 42);
} }
#[test] #[test]