From 86976e0003cb4a5de01bdadbcde9d9628c243468 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 10 Apr 2023 07:05:31 +0000
Subject: [PATCH 1/6] Don't rely on `calculate_success_probability*` to handle
 amt > cap

Currently we let an `htlc_amount >= channel_capacity` pass through
from `penalty_msat` to `calculate_success_probability_times_billion`,
but only if it is marginally bigger (less than 65/64ths of the
capacity). This works today because
`calculate_success_probability_times_billion` handles bogus values
gracefully (it will always return a zero probability in such cases).

However, relying on that is risky, and in fact it breaks in the
coming commits, so instead we check the amount against the capacity
before ever calling through to the historical bucket probability
calculations.
---
 lightning/src/routing/scoring.rs | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 235b1a148..40715c846 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1035,6 +1035,7 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 	/// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
 	/// this direction.
 	fn penalty_msat(&self, amount_msat: u64, score_params: &ProbabilisticScoringFeeParameters) -> u64 {
+		let available_capacity = self.available_capacity();
 		let max_liquidity_msat = self.max_liquidity_msat();
 		let min_liquidity_msat = core::cmp::min(self.min_liquidity_msat(), max_liquidity_msat);
 
@@ -1066,6 +1067,15 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 			}
 		};
 
+		if amount_msat >= available_capacity {
+			// We're trying to send more than the capacity, use a max penalty.
+			res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
+				NEGATIVE_LOG10_UPPER_BOUND * 2048,
+				score_params.historical_liquidity_penalty_multiplier_msat,
+				score_params.historical_liquidity_penalty_amount_multiplier_msat));
+			return res;
+		}
+
 		if score_params.historical_liquidity_penalty_multiplier_msat != 0 ||
 		   score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
 			let payment_amt_64th_bucket = if amount_msat < u64::max_value() / 64 {
@@ -1130,17 +1140,20 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 	}
 
 	/// Returns the lower bound of the channel liquidity balance in this direction.
+	#[inline(always)]
 	fn min_liquidity_msat(&self) -> u64 {
 		self.decayed_offset_msat(*self.min_liquidity_offset_msat)
 	}
 
 	/// Returns the upper bound of the channel liquidity balance in this direction.
+	#[inline(always)]
 	fn max_liquidity_msat(&self) -> u64 {
 		self.available_capacity()
 			.saturating_sub(self.decayed_offset_msat(*self.max_liquidity_offset_msat))
 	}
 
 	/// Returns the capacity minus the in-flight HTLCs in this direction.
+	#[inline(always)]
 	fn available_capacity(&self) -> u64 {
 		self.capacity_msat.saturating_sub(self.inflight_htlc_msat)
 	}
@@ -2858,12 +2871,14 @@ mod tests {
 		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 			Some(([0; 8], [0; 8])));
 
-		let usage = ChannelUsage {
+		let mut usage = ChannelUsage {
 			amount_msat: 100,
 			inflight_htlc_msat: 1024,
 			effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_024 },
 		};
 		scorer.payment_path_failed(&payment_path_for_amount(1), 42);
+		assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 2048);
+		usage.inflight_htlc_msat = 0;
 		assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 409);
 
 		let usage = ChannelUsage {

From c4947acaec808ff3568e3c168ac942d9cfcdb9b0 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 10 Apr 2023 22:54:48 +0000
Subject: [PATCH 2/6] Correctly apply penalty bounds on the per-amount penalties

When we attempt to score a channel with a very low success
probability, we may compute a log well above our cut-off of two. For
the liquidity penalties this works fine: we bound the result by
`NEGATIVE_LOG10_UPPER_BOUND` and take the `min` of the two scores.
For the amount liquidity penalty, however, we did no `min`ing at all.

The fix is to bound the log itself first and then reuse the bounded
log in both penalty calculations.
---
 lightning/src/routing/scoring.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 40715c846..29650d371 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1123,15 +1123,15 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 
 	/// Computes the liquidity penalty from the penalty multipliers.
 	#[inline(always)]
-	fn combined_penalty_msat(amount_msat: u64, negative_log10_times_2048: u64,
+	fn combined_penalty_msat(amount_msat: u64, mut negative_log10_times_2048: u64,
 		liquidity_penalty_multiplier_msat: u64, liquidity_penalty_amount_multiplier_msat: u64,
 	) -> u64 {
-		let liquidity_penalty_msat = {
-			// Upper bound the liquidity penalty to ensure some channel is selected.
-			let multiplier_msat = liquidity_penalty_multiplier_msat;
-			let max_penalty_msat = multiplier_msat.saturating_mul(NEGATIVE_LOG10_UPPER_BOUND);
-			(negative_log10_times_2048.saturating_mul(multiplier_msat) / 2048).min(max_penalty_msat)
-		};
+		negative_log10_times_2048 =
+			negative_log10_times_2048.min(NEGATIVE_LOG10_UPPER_BOUND * 2048);
+
+		// Upper bound the liquidity penalty to ensure some channel is selected.
+		let liquidity_penalty_msat = negative_log10_times_2048
+			.saturating_mul(liquidity_penalty_multiplier_msat) / 2048;
 		let amount_penalty_msat = negative_log10_times_2048
 			.saturating_mul(liquidity_penalty_amount_multiplier_msat)
 			.saturating_mul(amount_msat) / 2048 / AMOUNT_PENALTY_DIVISOR;

From 568731008ec858dc8865b4e72f2911d3e52400c8 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Sun, 9 Apr 2023 04:43:23 +0000
Subject: [PATCH 3/6] Find payment bucket in
 calculate_success_probability_times_billion

This simply moves code, which will simplify the next commit somewhat.
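For illustration, the bucketing computation being moved here can be
sketched standalone (a minimal sketch only; the helper name
`amount_to_64th_bucket` is hypothetical and appears nowhere in this
series): the payment amount is mapped into one of 64 equal slices of
the channel capacity, and anything above the capacity is rejected
outright.

	use core::convert::TryInto;

	/// Maps `amount_msat` into one of 64 equal slices of `capacity_msat`.
	/// Returns `None` when the amount lies above the capacity, mirroring
	/// the `payment_amt_64th_bucket >= 64` early-return in the diff below.
	fn amount_to_64th_bucket(amount_msat: u64, capacity_msat: u64) -> Option<u8> {
		let bucket: u8 = if amount_msat < u64::max_value() / 64 {
			// The u64 multiplication cannot overflow on this path.
			(amount_msat * 64 / capacity_msat.saturating_add(1))
				.try_into().unwrap_or(65)
		} else {
			// Use 128-bit math only when the u64 product would overflow.
			((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
				.try_into().unwrap_or(65)
		};
		if bucket >= 64 { None } else { Some(bucket) }
	}

For example, an amount of 500 msat against a capacity of 1_024 msat
lands in bucket 31 (500 * 64 / 1025).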
---
 lightning/src/routing/scoring.rs | 31 ++++++++++++++++---------------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 29650d371..5b8c28cab 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -725,7 +725,7 @@ impl HistoricalMinMaxBuckets<'_> {
 
 	#[inline]
 	fn calculate_success_probability_times_billion<T: Time>(
-		&self, now: T, last_updated: T, half_life: Duration, payment_amt_64th_bucket: u8)
+		&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
 	-> Option<u64> {
 		// If historical penalties are enabled, calculate the penalty by walking the set of
 		// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
@@ -748,6 +748,20 @@ impl HistoricalMinMaxBuckets<'_> {
 		// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
 		let mut total_valid_points_tracked = 0;
 
+		let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
+			(amount_msat * 64 / capacity_msat.saturating_add(1))
+				.try_into().unwrap_or(65)
+		} else {
+			// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
+			// division. This branch should only be hit in fuzz testing since the amount would
+			// need to be over 2.88 million BTC in practice.
+			((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
+				.try_into().unwrap_or(65)
+		};
+		#[cfg(not(fuzzing))]
+		debug_assert!(payment_amt_64th_bucket <= 64);
+		if payment_amt_64th_bucket >= 64 { return None; }
+
 		// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
 		// don't actually use the decayed buckets, though, as that would lose precision.
 		let (decayed_min_buckets, decayed_max_buckets, required_decays) =
@@ -1078,26 +1092,13 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 		if score_params.historical_liquidity_penalty_multiplier_msat != 0 ||
 		   score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
-			let payment_amt_64th_bucket = if amount_msat < u64::max_value() / 64 {
-				amount_msat * 64 / self.capacity_msat.saturating_add(1)
-			} else {
-				// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
-				// division. This branch should only be hit in fuzz testing since the amount would
-				// need to be over 2.88 million BTC in practice.
-				((amount_msat as u128) * 64 / (self.capacity_msat as u128).saturating_add(1))
-					.try_into().unwrap_or(65)
-			};
-			#[cfg(not(fuzzing))]
-			debug_assert!(payment_amt_64th_bucket <= 64);
-			if payment_amt_64th_bucket > 64 { return res; }
-
 			let buckets = HistoricalMinMaxBuckets {
 				min_liquidity_offset_history: &self.min_liquidity_offset_history,
 				max_liquidity_offset_history: &self.max_liquidity_offset_history,
 			};
 			if let Some(cumulative_success_prob_times_billion) = buckets
 				.calculate_success_probability_times_billion(self.now, *self.last_updated,
-					self.decay_params.historical_no_updates_half_life, payment_amt_64th_bucket as u8)
+					self.decay_params.historical_no_updates_half_life, amount_msat, self.capacity_msat)
 			{
 				let historical_negative_log10_times_2048 =
 					approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
 				res = res.saturating_add(Self::combined_penalty_msat(amount_msat,

From 534d7317cf7ee4dd9294b6c028eaa470fb9257db Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Sat, 20 May 2023 23:28:18 +0000
Subject: [PATCH 4/6] Expose the historical success probability calculation
 itself

In 3f32f60ae7e75a4be96d3d5adc8d18b53445e5e5 we exposed the historical
success probability buckets directly, with a long method doc
explaining how to use them. While this is great for logging exactly
what the internal model thinks, it's also helpful to let users query
the success probability the model computes directly, allowing them to
compare the success probabilities of different routes. Here we do so,
but only for the historical tracking buckets.
---
 lightning/src/routing/scoring.rs | 56 +++++++++++++++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 5b8c28cab..804e08f27 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -939,6 +939,9 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 	///
 	/// Because the datapoints are decayed slowly over time, values will eventually return to
 	/// `Some(([0; 8], [0; 8]))`.
+	///
+	/// In order to fetch a single success probability from the buckets provided here, as used in
+	/// the scoring model, see [`Self::historical_estimated_payment_success_probability`].
 	pub fn historical_estimated_channel_liquidity_probabilities(&self, scid: u64, target: &NodeId)
 	-> Option<([u16; 8], [u16; 8])> {
 		let graph = self.network_graph.read_only();
@@ -953,7 +956,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 					min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
 					max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
 				};
-				let (min_buckets, mut max_buckets, _) = buckets.get_decayed_buckets(T::now(),
+				let (min_buckets, mut max_buckets, _) = buckets.get_decayed_buckets(dir_liq.now,
 					*dir_liq.last_updated, self.decay_params.historical_no_updates_half_life);
 				// Note that the liquidity buckets are an offset from the edge, so we inverse
 				// the max order to get the probabilities from zero.
@@ -964,6 +967,39 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 		}
 		None
 	}
+
+	/// Query the probability of payment success sending the given `amount_msat` over the channel
+	/// with `scid` towards the given `target` node, based on the historical estimated liquidity
+	/// bounds.
+	///
+	/// These are the same bounds as returned by
+	/// [`Self::historical_estimated_channel_liquidity_probabilities`] (but not those returned by
+	/// [`Self::estimated_channel_liquidity_range`]).
+	pub fn historical_estimated_payment_success_probability(
+		&self, scid: u64, target: &NodeId, amount_msat: u64)
+	-> Option<f64> {
+		let graph = self.network_graph.read_only();
+
+		if let Some(chan) = graph.channels().get(&scid) {
+			if let Some(liq) = self.channel_liquidities.get(&scid) {
+				if let Some((directed_info, source)) = chan.as_directed_to(target) {
+					let capacity_msat = directed_info.effective_capacity().as_msat();
+					let dir_liq = liq.as_directed(source, target, 0, capacity_msat, self.decay_params);
+
+					let buckets = HistoricalMinMaxBuckets {
+						min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
+						max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
+					};
+
+					return buckets.calculate_success_probability_times_billion(dir_liq.now,
+						*dir_liq.last_updated, self.decay_params.historical_no_updates_half_life,
+						amount_msat, capacity_msat
+					).map(|p| p as f64 / (1024 * 1024 * 1024) as f64);
+				}
+			}
+		}
+		None
+	}
 }
 
 impl<T: Time> ChannelLiquidity<T> {
@@ -2847,6 +2883,8 @@ mod tests {
 		assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 47);
 		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 			None);
+		assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 42),
+			None);
 
 		scorer.payment_path_failed(&payment_path_for_amount(1), 42);
 		assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 2048);
@@ -2854,6 +2892,10 @@ mod tests {
 		// The "it failed" increment is 32, which we should apply to the first octile.
 		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 			Some(([32, 0, 0, 0, 0, 0, 0, 0], [32, 0, 0, 0, 0, 0, 0, 0])));
+		assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1),
+			Some(1.0));
+		assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 500),
+			Some(0.0));
 
 		// Even after we tell the scorer we definitely have enough available liquidity, it will
 		// still remember that there was some failure in the past, and assign a non-0 penalty.
@@ -2863,6 +2905,17 @@ mod tests {
 		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 			Some(([31, 0, 0, 0, 0, 0, 0, 32], [31, 0, 0, 0, 0, 0, 0, 32])));
 
+		// The exact success probability is a bit complicated and involves integer rounding, so we
+		// simply check bounds here.
+		let five_hundred_prob =
+			scorer.historical_estimated_payment_success_probability(42, &target, 500).unwrap();
+		assert!(five_hundred_prob > 0.5);
+		assert!(five_hundred_prob < 0.52);
+		let one_prob =
+			scorer.historical_estimated_payment_success_probability(42, &target, 1).unwrap();
+		assert!(one_prob < 1.0);
+		assert!(one_prob > 0.99);
+
 		// Advance the time forward 16 half-lives (which the docs claim will ensure all data is
 		// gone), and check that we're back to where we started.
 		SinceEpoch::advance(Duration::from_secs(10 * 16));
@@ -2871,6 +2924,7 @@ mod tests {
 		// data entirely instead.
 		assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 			Some(([0; 8], [0; 8])));
+		assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1), None);
 
 		let mut usage = ChannelUsage {
 			amount_msat: 100,

From 2a1dff4c1007ba777f446dd0b232cc20b900956b Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Sat, 20 May 2023 23:31:57 +0000
Subject: [PATCH 5/6] Move the bucketed history tracking logic into a scoring
 submodule

---
 lightning/src/routing/scoring.rs | 315 ++++++++++++++++---------------
 1 file changed, 160 insertions(+), 155 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 804e08f27..9c337f9b9 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -649,161 +649,6 @@ impl ProbabilisticScoringDecayParameters {
 	}
 }
 
-/// Tracks the historical state of a distribution as a weighted average of how much time was spent
-/// in each of 8 buckets.
-#[derive(Clone, Copy)]
-struct HistoricalBucketRangeTracker {
-	buckets: [u16; 8],
-}
-
-impl HistoricalBucketRangeTracker {
-	fn new() -> Self { Self { buckets: [0; 8] } }
-	fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
-		// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
-		// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
-		//
-		// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
-		// the buckets for the current min and max liquidity offset positions.
-		//
-		// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
-		// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
-		// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
-		//
-		// In total, this allows us to track data for the last 8,000 or so payments across a given
-		// channel.
-		//
-		// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
-		// and need to balance having more bits in the decimal part (to ensure decay isn't too
-		// non-linear) with having too few bits in the mantissa, causing us to not store very many
-		// datapoints.
-		//
-		// The constants were picked experimentally, selecting a decay amount that restricts us
-		// from overflowing buckets without having to cap them manually.
-
-		// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
-		// the channel's capacity, though the second should generally never happen.
-		debug_assert!(liquidity_offset_msat <= capacity_msat);
-		let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
-			.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
-		debug_assert!(bucket_idx < 8);
-		if bucket_idx < 8 {
-			for e in self.buckets.iter_mut() {
-				*e = ((*e as u32) * 2047 / 2048) as u16;
-			}
-			self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
-		}
-	}
-	/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
-	/// datapoints as we receive newer information.
-	fn time_decay_data(&mut self, half_lives: u32) {
-		for e in self.buckets.iter_mut() {
-			*e = e.checked_shr(half_lives).unwrap_or(0);
-		}
-	}
-}
-
-impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
-
-struct HistoricalMinMaxBuckets<'a> {
-	min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-	max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-}
-
-impl HistoricalMinMaxBuckets<'_> {
-	#[inline]
-	fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-	-> ([u16; 8], [u16; 8], u32) {
-		let required_decays = now.duration_since(last_updated).as_secs()
-			.checked_div(half_life.as_secs())
-			.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
-		let mut min_buckets = *self.min_liquidity_offset_history;
-		min_buckets.time_decay_data(required_decays);
-		let mut max_buckets = *self.max_liquidity_offset_history;
-		max_buckets.time_decay_data(required_decays);
-		(min_buckets.buckets, max_buckets.buckets, required_decays)
-	}
-
-	#[inline]
-	fn calculate_success_probability_times_billion<T: Time>(
-		&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
-	-> Option<u64> {
-		// If historical penalties are enabled, calculate the penalty by walking the set of
-		// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
-		// each, calculate the probability of success given our payment amount, then total the
-		// weighted average probability of success.
-		//
-		// We use a sliding scale to decide which point within a given bucket will be compared to
-		// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
-		// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
-		// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
-		// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
-		// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
-		// penalties to channels at the edges.
-		//
-		// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
-		// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
-		// for a 1 BTC channel!).
-		//
-		// If we used the middle of each bucket we'd never assign any penalty at all when sending
-		// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
-		let mut total_valid_points_tracked = 0;
-
-		let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
-			(amount_msat * 64 / capacity_msat.saturating_add(1))
-				.try_into().unwrap_or(65)
-		} else {
-			// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
-			// division. This branch should only be hit in fuzz testing since the amount would
-			// need to be over 2.88 million BTC in practice.
-			((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
-				.try_into().unwrap_or(65)
-		};
-		#[cfg(not(fuzzing))]
-		debug_assert!(payment_amt_64th_bucket <= 64);
-		if payment_amt_64th_bucket >= 64 { return None; }
-
-		// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
-		// don't actually use the decayed buckets, though, as that would lose precision.
-		let (decayed_min_buckets, decayed_max_buckets, required_decays) =
-			self.get_decayed_buckets(now, last_updated, half_life);
-		if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
-			return None;
-		}
-
-		for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-			for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
-				total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
-			}
-		}
-		// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
-		// it as if we were fully decayed.
-		if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
-			return None;
-		}
-
-		let mut cumulative_success_prob_times_billion = 0;
-		for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-			for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
-				let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
-					* 1024 * 1024 / total_valid_points_tracked;
-				let min_64th_bucket = min_idx as u8 * 9;
-				let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
-				if payment_amt_64th_bucket > max_64th_bucket {
-					// Success probability 0, the payment amount is above the max liquidity
-				} else if payment_amt_64th_bucket <= min_64th_bucket {
-					cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
-				} else {
-					cumulative_success_prob_times_billion += bucket_prob_times_million *
-						((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
-						((max_64th_bucket - min_64th_bucket) as u64);
-				}
-			}
-		}
-
-		Some(cumulative_success_prob_times_billion)
-	}
-}
-
 /// Accounting for channel liquidity balance uncertainty.
 ///
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -1704,6 +1549,166 @@ mod approx {
 	}
 }
 
+mod bucketed_history {
+	use super::*;
+
+	/// Tracks the historical state of a distribution as a weighted average of how much time was spent
+	/// in each of 8 buckets.
+	#[derive(Clone, Copy)]
+	pub(super) struct HistoricalBucketRangeTracker {
+		buckets: [u16; 8],
+	}
+
+	impl HistoricalBucketRangeTracker {
+		pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
+		pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
+			// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
+			// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
+			//
+			// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
+			// the buckets for the current min and max liquidity offset positions.
+			//
+			// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
+			// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
+			// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
+			//
+			// In total, this allows us to track data for the last 8,000 or so payments across a given
+			// channel.
+			//
+			// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
+			// and need to balance having more bits in the decimal part (to ensure decay isn't too
+			// non-linear) with having too few bits in the mantissa, causing us to not store very many
+			// datapoints.
+			//
+			// The constants were picked experimentally, selecting a decay amount that restricts us
+			// from overflowing buckets without having to cap them manually.
+
+			// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
+			// the channel's capacity, though the second should generally never happen.
+			debug_assert!(liquidity_offset_msat <= capacity_msat);
+			let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
+				.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
+			debug_assert!(bucket_idx < 8);
+			if bucket_idx < 8 {
+				for e in self.buckets.iter_mut() {
+					*e = ((*e as u32) * 2047 / 2048) as u16;
+				}
+				self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
+			}
+		}
+		/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
+		/// datapoints as we receive newer information.
+		pub(super) fn time_decay_data(&mut self, half_lives: u32) {
+			for e in self.buckets.iter_mut() {
+				*e = e.checked_shr(half_lives).unwrap_or(0);
+			}
+		}
+	}
+
+	impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
+
+	pub(super) struct HistoricalMinMaxBuckets<'a> {
+		pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+		pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+	}
+
+	impl HistoricalMinMaxBuckets<'_> {
+		#[inline]
+		pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
+		-> ([u16; 8], [u16; 8], u32) {
+			let required_decays = now.duration_since(last_updated).as_secs()
+				.checked_div(half_life.as_secs())
+				.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
+			let mut min_buckets = *self.min_liquidity_offset_history;
+			min_buckets.time_decay_data(required_decays);
+			let mut max_buckets = *self.max_liquidity_offset_history;
+			max_buckets.time_decay_data(required_decays);
+			(min_buckets.buckets, max_buckets.buckets, required_decays)
+		}
+
+		#[inline]
+		pub(super) fn calculate_success_probability_times_billion<T: Time>(
+			&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
+		-> Option<u64> {
+			// If historical penalties are enabled, calculate the penalty by walking the set of
+			// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
+			// each, calculate the probability of success given our payment amount, then total the
+			// weighted average probability of success.
+			//
+			// We use a sliding scale to decide which point within a given bucket will be compared to
+			// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
+			// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
+			// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
+			// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
+			// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
+			// penalties to channels at the edges.
+			//
+			// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
+			// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
+			// for a 1 BTC channel!).
+			//
+			// If we used the middle of each bucket we'd never assign any penalty at all when sending
+			// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
+			let mut total_valid_points_tracked = 0;
+
+			let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
+				(amount_msat * 64 / capacity_msat.saturating_add(1))
+					.try_into().unwrap_or(65)
+			} else {
+				// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
+				// division. This branch should only be hit in fuzz testing since the amount would
+				// need to be over 2.88 million BTC in practice.
+				((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
+					.try_into().unwrap_or(65)
+			};
+			#[cfg(not(fuzzing))]
+			debug_assert!(payment_amt_64th_bucket <= 64);
+			if payment_amt_64th_bucket >= 64 { return None; }
+
+			// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
+			// don't actually use the decayed buckets, though, as that would lose precision.
+			let (decayed_min_buckets, decayed_max_buckets, required_decays) =
+				self.get_decayed_buckets(now, last_updated, half_life);
+			if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
+				return None;
+			}
+
+			for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+				for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
+					total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
+				}
+			}
+			// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
+			// it as if we were fully decayed.
+			if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
+				return None;
+			}
+
+			let mut cumulative_success_prob_times_billion = 0;
+			for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+				for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
+					let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
+						* 1024 * 1024 / total_valid_points_tracked;
+					let min_64th_bucket = min_idx as u8 * 9;
+					let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
+					if payment_amt_64th_bucket > max_64th_bucket {
+						// Success probability 0, the payment amount is above the max liquidity
+					} else if payment_amt_64th_bucket <= min_64th_bucket {
+						cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
+					} else {
+						cumulative_success_prob_times_billion += bucket_prob_times_million *
+							((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
+							((max_64th_bucket - min_64th_bucket) as u64);
+					}
+				}
+			}
+
+			Some(cumulative_success_prob_times_billion)
+		}
+	}
+}
+use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
 
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
 	#[inline]
 	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {

From 2bd2637b7e0fe338fa7841ba5473991b0c5150d1 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 23 Aug 2023 00:46:18 +0000
Subject: [PATCH 6/6] Store a `HistoricalMinMaxBuckets` in `DirectedChannelLiquidity`

This removes the need to reconstruct the struct in a number of places
by simply creating it up front.
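Beyond moving the construction, the diff below makes
`HistoricalMinMaxBuckets` generic over
`Deref<Target = HistoricalBucketRangeTracker>` so a single pre-built
struct can hold either the shared (`&`) or exclusive (`&mut`) history
references of the two directed views. Reduced to a minimal standalone
sketch (the types here are simplified stand-ins, not the definitions
in `scoring.rs`):

	use core::ops::Deref;

	struct Tracker { buckets: [u16; 8] }

	// One definition serves both `&Tracker` (read-only view) and
	// `&mut Tracker` (mutable view), since both implement Deref.
	struct MinMax<D: Deref<Target = Tracker>> {
		min: D,
		max: D,
	}

	impl<D: Deref<Target = Tracker>> MinMax<D> {
		// Field access auto-derefs through `D`, whichever reference it is.
		fn total_min_points(&self) -> u32 {
			self.min.buckets.iter().map(|b| *b as u32).sum()
		}
	}

	fn main() {
		let a = Tracker { buckets: [32; 8] };
		let mut b = Tracker { buckets: [0; 8] };
		let mut c = Tracker { buckets: [1; 8] };
		let shared = MinMax { min: &a, max: &a };       // D = &Tracker
		let excl = MinMax { min: &mut b, max: &mut c }; // D = &mut Tracker
		assert_eq!(shared.total_min_points(), 256);
		assert_eq!(excl.total_min_points(), 0);
	}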
---
 lightning/src/routing/scoring.rs | 66 +++++++++++++------------------
 1 file changed, 27 insertions(+), 39 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 9c337f9b9..cae22f769 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -673,8 +673,7 @@ struct ChannelLiquidity<T: Time> {
 struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
 	min_liquidity_offset_msat: L,
 	max_liquidity_offset_msat: L,
-	min_liquidity_offset_history: BRT,
-	max_liquidity_offset_history: BRT,
+	liquidity_history: HistoricalMinMaxBuckets<BRT>,
 	inflight_htlc_msat: u64,
 	capacity_msat: u64,
 	last_updated: U,
@@ -715,12 +714,9 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 					let amt = directed_info.effective_capacity().as_msat();
 					let dir_liq = liq.as_directed(source, target, 0, amt, self.decay_params);
 
-					let buckets = HistoricalMinMaxBuckets {
-						min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
-						max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
-					};
-					let (min_buckets, max_buckets, _) = buckets.get_decayed_buckets(now,
-						*dir_liq.last_updated, self.decay_params.historical_no_updates_half_life);
+					let (min_buckets, max_buckets, _) = dir_liq.liquidity_history
+						.get_decayed_buckets(now, *dir_liq.last_updated,
+							self.decay_params.historical_no_updates_half_life);
 
 					log_debug!(self.logger, core::concat!(
 						"Liquidity from {} to {} via {} is in the range ({}, {}).\n",
@@ -797,12 +793,9 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 				let amt = directed_info.effective_capacity().as_msat();
 				let dir_liq = liq.as_directed(source, target, 0, amt, self.decay_params);
 
-				let buckets = HistoricalMinMaxBuckets {
-					min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
-					max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
-				};
-				let (min_buckets, mut max_buckets, _) = buckets.get_decayed_buckets(dir_liq.now,
-					*dir_liq.last_updated, self.decay_params.historical_no_updates_half_life);
+				let (min_buckets, mut max_buckets, _) = dir_liq.liquidity_history
+					.get_decayed_buckets(dir_liq.now, *dir_liq.last_updated,
+						self.decay_params.historical_no_updates_half_life);
 				// Note that the liquidity buckets are an offset from the edge, so we inverse
 				// the max order to get the probabilities from zero.
 				max_buckets.reverse();
@@ -831,14 +824,9 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
 					let capacity_msat = directed_info.effective_capacity().as_msat();
 					let dir_liq = liq.as_directed(source, target, 0, capacity_msat, self.decay_params);
 
-					let buckets = HistoricalMinMaxBuckets {
-						min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
-						max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
-					};
-
-					return buckets.calculate_success_probability_times_billion(dir_liq.now,
-						*dir_liq.last_updated, self.decay_params.historical_no_updates_half_life,
-						amount_msat, capacity_msat
+					return dir_liq.liquidity_history.calculate_success_probability_times_billion(
+						dir_liq.now, *dir_liq.last_updated,
+						self.decay_params.historical_no_updates_half_life, amount_msat, capacity_msat
 					).map(|p| p as f64 / (1024 * 1024 * 1024) as f64);
 				}
 			}
@@ -876,8 +864,10 @@ impl<T: Time> ChannelLiquidity<T> {
 		DirectedChannelLiquidity {
 			min_liquidity_offset_msat,
 			max_liquidity_offset_msat,
-			min_liquidity_offset_history,
-			max_liquidity_offset_history,
+			liquidity_history: HistoricalMinMaxBuckets {
+				min_liquidity_offset_history,
+				max_liquidity_offset_history,
+			},
 			inflight_htlc_msat,
 			capacity_msat,
 			last_updated: &self.last_updated,
@@ -903,8 +893,10 @@ impl<T: Time> ChannelLiquidity<T> {
 		DirectedChannelLiquidity {
 			min_liquidity_offset_msat,
 			max_liquidity_offset_msat,
-			min_liquidity_offset_history,
-			max_liquidity_offset_history,
+			liquidity_history: HistoricalMinMaxBuckets {
+				min_liquidity_offset_history,
+				max_liquidity_offset_history,
+			},
 			inflight_htlc_msat,
 			capacity_msat,
 			last_updated: &mut self.last_updated,
@@ -973,11 +965,7 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 
 		if score_params.historical_liquidity_penalty_multiplier_msat != 0 ||
 		   score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
-			let buckets = HistoricalMinMaxBuckets {
-				min_liquidity_offset_history: &self.min_liquidity_offset_history,
-				max_liquidity_offset_history: &self.max_liquidity_offset_history,
-			};
-			if let Some(cumulative_success_prob_times_billion) = buckets
+			if let Some(cumulative_success_prob_times_billion) = self.liquidity_history
 				.calculate_success_probability_times_billion(self.now, *self.last_updated,
 					self.decay_params.historical_no_updates_half_life, amount_msat, self.capacity_msat)
 			{
@@ -1087,15 +1075,15 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTra
 
-	pub(super) struct HistoricalMinMaxBuckets<'a> {
-		pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-		pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+	pub(super) struct HistoricalMinMaxBuckets<D: Deref<Target = HistoricalBucketRangeTracker>> {
+		pub(super) min_liquidity_offset_history: D,
+		pub(super) max_liquidity_offset_history: D,
 	}
 
-	impl HistoricalMinMaxBuckets<'_> {
+	impl<D: Deref<Target = HistoricalBucketRangeTracker>> HistoricalMinMaxBuckets<D> {
 		#[inline]
 		pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
 		-> ([u16; 8], [u16; 8], u32) {
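With the series applied, the historical model can be queried directly
rather than only logged. A minimal usage sketch (assuming an
already-constructed `ProbabilisticScorer` named `scorer`, a `NodeId`
named `target`, and a channel's `scid`; none of these are shown here):

	// `scorer`, `target`, and `scid` are assumed to exist in surrounding code.
	let amount_msat = 250_000;
	match scorer.historical_estimated_payment_success_probability(scid, &target, amount_msat) {
		// The billionths value is scaled by 1/2^30 into a float in [0.0, 1.0].
		Some(p) => println!("historical success probability: {:.3}", p),
		// `None` means there is no un-decayed historical data for the channel,
		// or the amount is above the channel's capacity.
		None => println!("no usable historical data for channel {}", scid),
	}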