Regenerate PendingHTLCsForwardable on reload instead of serializing

When we are prepared to forward HTLCs, we generate a
PendingHTLCsForwardable event with a time in the future when the
user should tell us to forward. This provides some basic batching
of forward events, improving privacy slightly.

After we generate the event, we expect users to spawn a timer in
the background and let us know when it finishes. However, if the
user shuts down before the timer fires, the user will restart and
have no idea that HTLCs are waiting to be forwarded/received.

To fix this, instead of serializing PendingHTLCsForwardable events
to disk while they're pending (before the user starts the timer),
we simply regenerate them when a ChannelManager is deserialized
with HTLCs pending.

Fixes #1042
This commit is contained in:
Matt Corallo 2021-09-15 19:20:44 +00:00
parent 24065c89a9
commit 0fcc34b9b5
3 changed files with 132 additions and 12 deletions

View file

@ -5276,6 +5276,16 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
None => continue,
}
}
if forward_htlcs_count > 0 {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small
// constant as enough time has likely passed that we should simply handle the forwards
// now, or at least after the user gets a chance to reconnect to our peers.
pending_events_read.push(events::Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(2),
});
}
let background_event_count: u64 = Readable::read(reader)?;
let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));

View file

@ -9014,6 +9014,125 @@ fn test_tx_confirmed_skipping_blocks_immediate_broadcast() {
do_test_tx_confirmed_skipping_blocks_immediate_broadcast(true);
}
#[test]
fn test_forwardable_regen() {
// Tests that if we reload a ChannelManager while forwards are pending we will regenerate the
// PendingHTLCsForwardable event automatically, ensuring we don't forget to forward/receive
// HTLCs.
// We test it for both payment receipt and payment forwarding.
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let persister: test_utils::TestPersister;
let new_chain_monitor: test_utils::TestChainMonitor;
let nodes_1_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
// First send a payment to nodes[1]
let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap();
check_added_monitors!(nodes[0], 1);
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
let payment_event = SendEvent::from_event(events.pop().unwrap());
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
expect_pending_htlcs_forwardable_ignore!(nodes[1]);
// Next send a payment which is forwarded by nodes[1]
let (route_2, payment_hash_2, payment_preimage_2, payment_secret_2) = get_route_and_payment_hash!(nodes[0], nodes[2], 200_000);
nodes[0].node.send_payment(&route_2, payment_hash_2, &Some(payment_secret_2)).unwrap();
check_added_monitors!(nodes[0], 1);
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
let payment_event = SendEvent::from_event(events.pop().unwrap());
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
// There is already a PendingHTLCsForwardable event "pending" so another one will not be
// generated
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
// Now restart nodes[1] and make sure it regenerates a single PendingHTLCsForwardable
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
let nodes_1_serialized = nodes[1].node.encode();
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
let mut chan_1_monitor_serialized = test_utils::TestVecWriter(Vec::new());
{
let monitors = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap();
let mut monitor_iter = monitors.iter();
monitor_iter.next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
monitor_iter.next().unwrap().1.write(&mut chan_1_monitor_serialized).unwrap();
}
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[1].keys_manager;
new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[1].chain_source), nodes[1].tx_broadcaster.clone(), nodes[1].logger, node_cfgs[1].fee_estimator, &persister, keys_manager);
nodes[1].chain_monitor = &new_chain_monitor;
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
&mut chan_0_monitor_read, keys_manager).unwrap();
assert!(chan_0_monitor_read.is_empty());
let mut chan_1_monitor_read = &chan_1_monitor_serialized.0[..];
let (_, mut chan_1_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
&mut chan_1_monitor_read, keys_manager).unwrap();
assert!(chan_1_monitor_read.is_empty());
let mut nodes_1_read = &nodes_1_serialized[..];
let (_, nodes_1_deserialized_tmp) = {
let mut channel_monitors = HashMap::new();
channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor);
channel_monitors.insert(chan_1_monitor.get_funding_txo().0, &mut chan_1_monitor);
<(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut nodes_1_read, ChannelManagerReadArgs {
default_config: UserConfig::default(),
keys_manager,
fee_estimator: node_cfgs[1].fee_estimator,
chain_monitor: nodes[1].chain_monitor,
tx_broadcaster: nodes[1].tx_broadcaster.clone(),
logger: nodes[1].logger,
channel_monitors,
}).unwrap()
};
nodes_1_deserialized = nodes_1_deserialized_tmp;
assert!(nodes_1_read.is_empty());
assert!(nodes[1].chain_monitor.watch_channel(chan_0_monitor.get_funding_txo().0, chan_0_monitor).is_ok());
assert!(nodes[1].chain_monitor.watch_channel(chan_1_monitor.get_funding_txo().0, chan_1_monitor).is_ok());
nodes[1].node = &nodes_1_deserialized;
check_added_monitors!(nodes[1], 2);
reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
// Note that nodes[1] and nodes[2] resend their funding_locked here since they haven't updated
// the commitment state.
reconnect_nodes(&nodes[1], &nodes[2], (true, true), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
expect_pending_htlcs_forwardable!(nodes[1]);
expect_payment_received!(nodes[1], payment_hash, payment_secret, 100_000);
check_added_monitors!(nodes[1], 1);
let mut events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
let payment_event = SendEvent::from_event(events.pop().unwrap());
nodes[2].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &payment_event.msgs[0]);
commitment_signed_dance!(nodes[2], nodes[1], payment_event.commitment_msg, false);
expect_pending_htlcs_forwardable!(nodes[2]);
expect_payment_received!(nodes[2], payment_hash_2, payment_secret_2, 200_000);
claim_payment(&nodes[0], &[&nodes[1]], payment_preimage);
claim_payment(&nodes[0], &[&nodes[1], &nodes[2]], payment_preimage_2);
}
#[test]
fn test_keysend_payments_to_public_node() {
let chanmon_cfgs = create_chanmon_cfgs(2);

View file

@ -240,9 +240,8 @@ impl Writeable for Event {
},
&Event::PendingHTLCsForwardable { time_forwardable: _ } => {
4u8.write(writer)?;
write_tlv_fields!(writer, {});
// We don't write the time_fordwardable out at all, as we presume when the user
// deserializes us at least that much time has elapsed.
// Note that we now ignore these on the read end as we'll re-generate them in
// ChannelManager, we write them here only for backwards compatibility.
},
&Event::SpendableOutputs { ref outputs } => {
5u8.write(writer)?;
@ -336,15 +335,7 @@ impl MaybeReadable for Event {
};
f()
},
4u8 => {
let f = || {
read_tlv_fields!(reader, {});
Ok(Some(Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_secs(0)
}))
};
f()
},
4u8 => Ok(None),
5u8 => {
let f = || {
let mut outputs = VecReadWrapper(Vec::new());