Make BP's test_payment_path_scoring dual sync/async.

This finally gives us a bit of test coverage of the async BP, which
was embarrassingly missing until now.
This commit is contained in:
Matt Corallo 2023-04-03 20:19:49 +00:00
parent ca367f5d08
commit f0b928b06d
2 changed files with 140 additions and 96 deletions

View file

@ -25,6 +25,7 @@ lightning = { version = "0.0.114", path = "../lightning", default-features = fal
lightning-rapid-gossip-sync = { version = "0.0.114", path = "../lightning-rapid-gossip-sync", default-features = false }
[dev-dependencies]
tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
lightning = { version = "0.0.114", path = "../lightning", features = ["_test_utils"] }
lightning-invoice = { version = "0.22.0", path = "../lightning-invoice" }
lightning-persister = { version = "0.0.114", path = "../lightning-persister" }

View file

@ -1419,12 +1419,100 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0);
}
macro_rules! do_test_payment_path_scoring {
($nodes: expr, $receive: expr) => {
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
// that we update the scorer upon a payment path succeeding (note that the channel must be
// public or else we won't score it).
// A background event handler for FundingGenerationReady events must be hooked up to a
// running background processor.
let scored_scid = 4242;
let secp_ctx = Secp256k1::new();
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
let path = vec![RouteHop {
pubkey: node_1_id,
node_features: NodeFeatures::empty(),
short_channel_id: scored_scid,
channel_features: ChannelFeatures::empty(),
fee_msat: 0,
cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
}];
$nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: false,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: Some(scored_scid),
});
let event = $receive.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
// Ensure we'll score payments that were explicitly failed back by the destination as
// ProbeSuccess.
$nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: true,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: None,
});
let event = $receive.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
$nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: None,
path: path.clone(),
});
let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
match event {
Event::PaymentPathSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
$nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path: path.clone(),
});
let event = $receive.expect("ProbeSuccessful not handled within deadline");
match event {
Event::ProbeSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
$nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
$nodes[0].node.push_pending_event(Event::ProbeFailed {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path,
short_channel_id: Some(scored_scid),
});
let event = $receive.expect("ProbeFailure not handled within deadline");
match event {
Event::ProbeFailed { .. } => {},
_ => panic!("Unexpected event"),
}
}
}
#[test]
fn test_payment_path_scoring() {
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
// that we update the scorer upon a payment path succeeding (note that the channel must be
// public or else we won't score it).
// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: Event| match event {
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
@ -1439,101 +1527,56 @@ mod tests {
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
let scored_scid = 4242;
let secp_ctx = Secp256k1::new();
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
let path = vec![RouteHop {
pubkey: node_1_id,
node_features: NodeFeatures::empty(),
short_channel_id: scored_scid,
channel_features: ChannelFeatures::empty(),
fee_msat: 0,
cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
}];
nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: false,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: Some(scored_scid),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
// Ensure we'll score payments that were explicitly failed back by the destination as
// ProbeSuccess.
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: true,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: None,
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: None,
path: path.clone(),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("PaymentPathSuccessful not handled within deadline");
match event {
Event::PaymentPathSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
nodes[0].node.push_pending_event(Event::ProbeSuccessful {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path: path.clone(),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("ProbeSuccessful not handled within deadline");
match event {
Event::ProbeSuccessful { .. } => {},
_ => panic!("Unexpected event"),
}
nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
nodes[0].node.push_pending_event(Event::ProbeFailed {
payment_id: PaymentId([42; 32]),
payment_hash: PaymentHash([42; 32]),
path,
short_channel_id: Some(scored_scid),
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("ProbeFailure not handled within deadline");
match event {
Event::ProbeFailed { .. } => {},
_ => panic!("Unexpected event"),
}
do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
if !std::thread::panicking() {
bg_processor.stop().unwrap();
}
}
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_payment_path_scoring_async() {
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
let event_handler = move |event: Event| {
let sender_ref = sender.clone();
async move {
match event {
Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
_ => panic!("Unexpected event: {:?}", event),
}
}
};
let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()), move |dur: Duration| {
let mut exit_receiver = exit_receiver.clone();
Box::pin(async move {
tokio::select! {
_ = tokio::time::sleep(dur) => false,
_ = exit_receiver.changed() => true,
}
})
}, false,
);
// TODO: Drop _local and simply spawn after #2003
let local_set = tokio::task::LocalSet::new();
local_set.spawn_local(bp_future);
local_set.spawn_local(async move {
do_test_payment_path_scoring!(nodes, receiver.recv().await);
exit_sender.send(()).unwrap();
});
local_set.await;
}
}