Score payment paths in BackgroundProcessor

This commit is contained in:
Valentine Wallace 2023-02-03 11:25:20 -05:00
parent 5aea2cf02b
commit 2f9c3e5ea1
No known key found for this signature in database
GPG Key ID: FD3E106A2CE099B4
2 changed files with 176 additions and 5 deletions

View File

@ -16,6 +16,9 @@
#[cfg(any(test, feature = "std"))]
extern crate core;
#[cfg(not(feature = "std"))]
extern crate alloc;
#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;
@ -28,7 +31,7 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::router::Router;
use lightning::routing::scoring::WriteableScore;
use lightning::routing::scoring::{Score, WriteableScore};
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
@ -49,6 +52,8 @@ use std::time::Instant;
#[cfg(feature = "futures")]
use futures_util::{select_biased, future::FutureExt, task};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@ -216,6 +221,37 @@ fn handle_network_graph_update<L: Deref>(
}
}
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
scorer: &'a S, event: &Event
) {
let mut score = scorer.lock();
match event {
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
let path = path.iter().collect::<Vec<_>>();
score.payment_path_failed(&path, *scid);
},
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
// Reached if the destination explicitly failed it back. We treat this as a successful probe
// because the payment made it all the way to the destination with sufficient liquidity.
let path = path.iter().collect::<Vec<_>>();
score.probe_successful(&path);
},
Event::PaymentPathSuccessful { path, .. } => {
let path = path.iter().collect::<Vec<_>>();
score.payment_path_successful(&path);
},
Event::ProbeSuccessful { path, .. } => {
let path = path.iter().collect::<Vec<_>>();
score.probe_successful(&path);
},
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
let path = path.iter().collect::<Vec<_>>();
score.probe_failed(&path, *scid);
},
_ => {},
}
}
macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
@ -387,7 +423,7 @@ pub async fn process_events_async<
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture
>(
@ -417,10 +453,14 @@ where
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
let event_handler = &event_handler;
let scorer = &scorer;
async move {
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
update_scorer(scorer, &event);
}
event_handler(event).await;
}
};
@ -516,7 +556,7 @@ impl BackgroundProcessor {
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SC: for <'b> WriteableScore<'b>,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
@ -547,6 +587,9 @@ impl BackgroundProcessor {
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
update_scorer(scorer, &event);
}
event_handler.handle_event(event);
};
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@ -613,14 +656,16 @@ mod tests {
use bitcoin::blockdata::locktime::PackedLockTime;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
use lightning::chain::{BestBlock, Confirm, chainmonitor};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
use lightning::ln::PaymentHash;
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters};
use lightning::ln::features::ChannelFeatures;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
use lightning::ln::features::{ChannelFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
@ -1296,4 +1341,124 @@ mod tests {
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
assert!(bg_processor.stop().is_ok());
}
#[test]
fn test_payment_path_scoring() {
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
// that we update the scorer upon a payment path succeeding (note that the channel must be
// public or else we won't score it).
// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: Event| match event {
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
_ => panic!("Unexpected event: {:?}", event),
};
let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir.clone()));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
let scored_scid = 4242;
let secp_ctx = Secp256k1::new();
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
let path = vec![RouteHop {
pubkey: node_1_id,
node_features: NodeFeatures::empty(),
short_channel_id: scored_scid,
channel_features: ChannelFeatures::empty(),
fee_msat: 0,
cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
}];
nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: false,
network_update: None,
all_paths_failed: true,
path: path.clone(),
short_channel_id: Some(scored_scid),
retry: None,
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
// Ensure we'll score payments that were explicitly failed back by the destination as
// ProbeSuccess.
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: true,
network_update: None,
all_paths_failed: true,
path: path.clone(),
short_channel_id: None,
retry: None,
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: None,
path: path.clone(),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathSuccessful not handled within deadline");
match event {
Event::PaymentPathSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::ProbeSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path: path.clone(),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("ProbeSuccessful not handled within deadline");
match event {
Event::ProbeSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
nodes[0].node.push_pending_event(Event::ProbeFailed {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path: path.clone(),
short_channel_id: Some(scored_scid),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("ProbeFailure not handled within deadline");
match event {
Event::ProbeFailed { .. } => {},
_ => panic!("Unexpected event"),
}
assert!(bg_processor.stop().is_ok());
}
}

View File

@ -5520,6 +5520,12 @@ where
events.into_inner()
}
#[cfg(feature = "_test_utils")]
pub fn push_pending_event(&self, event: events::Event) {
let mut events = self.pending_events.lock().unwrap();
events.push(event);
}
#[cfg(test)]
pub fn pop_pending_event(&self) -> Option<events::Event> {
let mut events = self.pending_events.lock().unwrap();