Move the bucketed history tracking logic into a scoring submodule

Matt Corallo 2023-05-20 23:31:57 +00:00
parent 534d7317cf
commit 2a1dff4c10


@@ -649,161 +649,6 @@ impl ProbabilisticScoringDecayParameters {
}
}
/// Tracks the historical state of a distribution as a weighted average of how much time was spent
/// in each of 8 buckets.
#[derive(Clone, Copy)]
struct HistoricalBucketRangeTracker {
buckets: [u16; 8],
}
impl HistoricalBucketRangeTracker {
fn new() -> Self { Self { buckets: [0; 8] } }
fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
//
// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
// the buckets for the current min and max liquidity offset positions.
//
// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
//
// In total, this allows us to track data for the last 8,000 or so payments across a given
// channel.
//
// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
// and need to balance having more bits in the decimal part (to ensure decay isn't too
// non-linear) with having too few bits in the mantissa, causing us to not store very many
// datapoints.
//
// The constants were picked experimentally, selecting a decay amount that restricts us
// from overflowing buckets without having to cap them manually.
// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
// the channel's capacity, though the second should generally never happen.
debug_assert!(liquidity_offset_msat <= capacity_msat);
let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
debug_assert!(bucket_idx < 8);
if bucket_idx < 8 {
for e in self.buckets.iter_mut() {
*e = ((*e as u32) * 2047 / 2048) as u16;
}
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
}
}
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
/// datapoints as we receive newer information.
fn time_decay_data(&mut self, half_lives: u32) {
for e in self.buckets.iter_mut() {
*e = e.checked_shr(half_lives).unwrap_or(0);
}
}
}
impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
struct HistoricalMinMaxBuckets<'a> {
min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
}
impl HistoricalMinMaxBuckets<'_> {
#[inline]
fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-> ([u16; 8], [u16; 8], u32) {
let required_decays = now.duration_since(last_updated).as_secs()
.checked_div(half_life.as_secs())
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
let mut min_buckets = *self.min_liquidity_offset_history;
min_buckets.time_decay_data(required_decays);
let mut max_buckets = *self.max_liquidity_offset_history;
max_buckets.time_decay_data(required_decays);
(min_buckets.buckets, max_buckets.buckets, required_decays)
}
#[inline]
fn calculate_success_probability_times_billion<T: Time>(
&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
-> Option<u64> {
// If historical penalties are enabled, calculate the penalty by walking the set of
// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
// each, calculate the probability of success given our payment amount, then total the
// weighted average probability of success.
//
// We use a sliding scale to decide which point within a given bucket will be compared to
// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
// penalties to channels at the edges.
//
// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
// for a 1 BTC channel!).
//
// If we used the middle of each bucket we'd never assign any penalty at all when sending
// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
let mut total_valid_points_tracked = 0;
let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
(amount_msat * 64 / capacity_msat.saturating_add(1))
.try_into().unwrap_or(65)
} else {
// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
// division. This branch should only be hit in fuzz testing since the amount would
// need to be over 2.88 million BTC in practice.
((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
.try_into().unwrap_or(65)
};
#[cfg(not(fuzzing))]
debug_assert!(payment_amt_64th_bucket <= 64);
if payment_amt_64th_bucket >= 64 { return None; }
// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
// don't actually use the decayed buckets, though, as that would lose precision.
let (decayed_min_buckets, decayed_max_buckets, required_decays) =
self.get_decayed_buckets(now, last_updated, half_life);
if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
return None;
}
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
}
}
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
// it as if we were fully decayed.
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
return None;
}
let mut cumulative_success_prob_times_billion = 0;
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
* 1024 * 1024 / total_valid_points_tracked;
let min_64th_bucket = min_idx as u8 * 9;
let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
if payment_amt_64th_bucket > max_64th_bucket {
// Success probability 0, the payment amount is above the max liquidity
} else if payment_amt_64th_bucket <= min_64th_bucket {
cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
} else {
cumulative_success_prob_times_billion += bucket_prob_times_million *
((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
((max_64th_bucket - min_64th_bucket) as u64);
}
}
}
Some(cumulative_success_prob_times_billion)
}
}
/// Accounting for channel liquidity balance uncertainty.
///
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -1704,6 +1549,166 @@ mod approx {
}
}
mod bucketed_history {
use super::*;
/// Tracks the historical state of a distribution as a weighted average of how much time was spent
/// in each of 8 buckets.
#[derive(Clone, Copy)]
pub(super) struct HistoricalBucketRangeTracker {
buckets: [u16; 8],
}
impl HistoricalBucketRangeTracker {
pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
//
// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
// the buckets for the current min and max liquidity offset positions.
//
// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
//
// In total, this allows us to track data for the last 8,000 or so payments across a given
// channel.
//
// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
// and need to balance having more bits in the decimal part (to ensure decay isn't too
// non-linear) with having too few bits in the mantissa, causing us to not store very many
// datapoints.
//
// The constants were picked experimentally, selecting a decay amount that restricts us
// from overflowing buckets without having to cap them manually.
// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
// the channel's capacity, though the second should generally never happen.
debug_assert!(liquidity_offset_msat <= capacity_msat);
let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
debug_assert!(bucket_idx < 8);
if bucket_idx < 8 {
for e in self.buckets.iter_mut() {
*e = ((*e as u32) * 2047 / 2048) as u16;
}
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
}
}
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
/// datapoints as we receive newer information.
pub(super) fn time_decay_data(&mut self, half_lives: u32) {
for e in self.buckets.iter_mut() {
*e = e.checked_shr(half_lives).unwrap_or(0);
}
}
}
impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
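// Illustrative sketch (not part of this change): the update rule in `track_datapoint` above
// decays every bucket by 2047/2048 and then adds 32 (1.0 in the 5-bit-fractional fixed-point
// scheme) to the bucket that was hit. Repeatedly hitting one bucket shows the value converging
// to a ceiling well inside u16, which is why no explicit cap is needed; the second assertion
// shows the bucket-index arithmetic keeping the index in [0, 7]. All concrete numbers here are
// hypothetical.
fn fixed_point_decay_sketch() {
	let mut bucket: u16 = 0;
	for _ in 0..100_000 {
		bucket = ((bucket as u32) * 2047 / 2048) as u16;
		bucket = bucket.saturating_add(32);
	}
	// 63,489 is the fixed point of decay-then-add; decaying it once more gives the 63,457
	// quoted in the comment above.
	assert_eq!(bucket, 63_489);

	// A single datapoint's weight of 32 drops below 1 (the smallest nonzero value in this
	// fixed-point scheme) after about 2048 * ln(32), roughly 7,100 further updates, which is
	// consistent with the "last 8,000 or so payments" figure above.

	// The +1 on capacity keeps an offset equal to the full capacity in bucket 7 rather than
	// producing an out-of-range index of 8.
	let (liquidity_offset_msat, capacity_msat): (u64, u64) = (999_999, 1_000_000);
	assert_eq!(liquidity_offset_msat * 8 / capacity_msat.saturating_add(1), 7);
}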
pub(super) struct HistoricalMinMaxBuckets<'a> {
pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
}
impl HistoricalMinMaxBuckets<'_> {
#[inline]
pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-> ([u16; 8], [u16; 8], u32) {
let required_decays = now.duration_since(last_updated).as_secs()
.checked_div(half_life.as_secs())
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
let mut min_buckets = *self.min_liquidity_offset_history;
min_buckets.time_decay_data(required_decays);
let mut max_buckets = *self.max_liquidity_offset_history;
max_buckets.time_decay_data(required_decays);
(min_buckets.buckets, max_buckets.buckets, required_decays)
}
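// Illustrative sketch (not part of this change): `get_decayed_buckets` above counts how many
// whole half-lives have elapsed since the last update, and `time_decay_data` then halves every
// bucket once per half-life via a right shift. The durations and bucket values below are
// hypothetical.
fn half_life_decay_sketch() {
	use core::time::Duration;
	let elapsed = Duration::from_secs(25_000);
	let half_life = Duration::from_secs(10_000);
	let required_decays = elapsed.as_secs()
		.checked_div(half_life.as_secs())
		.map_or(u32::max_value(), |d| core::cmp::min(d, u32::max_value() as u64) as u32);
	assert_eq!(required_decays, 2); // 2.5 half-lives elapsed, rounded down

	let mut buckets: [u16; 8] = [32, 64, 0, 0, 0, 0, 0, 128];
	for e in buckets.iter_mut() {
		// checked_shr yields None once the shift width reaches 16 bits, i.e. the data is
		// fully decayed to zero.
		*e = e.checked_shr(required_decays).unwrap_or(0);
	}
	assert_eq!(buckets, [8, 16, 0, 0, 0, 0, 0, 32]);
}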
#[inline]
pub(super) fn calculate_success_probability_times_billion<T: Time>(
&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
-> Option<u64> {
// If historical penalties are enabled, calculate the penalty by walking the set of
// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
// each, calculate the probability of success given our payment amount, then total the
// weighted average probability of success.
//
// We use a sliding scale to decide which point within a given bucket will be compared to
// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
// penalties to channels at the edges.
//
// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
// for a 1 BTC channel!).
//
// If we used the middle of each bucket we'd never assign any penalty at all when sending
// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
let mut total_valid_points_tracked = 0;
let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
(amount_msat * 64 / capacity_msat.saturating_add(1))
.try_into().unwrap_or(65)
} else {
// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
// division. This branch should only be hit in fuzz testing since the amount would
// need to be over 2.88 million BTC in practice.
((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
.try_into().unwrap_or(65)
};
#[cfg(not(fuzzing))]
debug_assert!(payment_amt_64th_bucket <= 64);
if payment_amt_64th_bucket >= 64 { return None; }
// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
// don't actually use the decayed buckets, though, as that would lose precision.
let (decayed_min_buckets, decayed_max_buckets, required_decays) =
self.get_decayed_buckets(now, last_updated, half_life);
if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
return None;
}
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
}
}
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
// it as if we were fully decayed.
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
return None;
}
let mut cumulative_success_prob_times_billion = 0;
for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
* 1024 * 1024 / total_valid_points_tracked;
let min_64th_bucket = min_idx as u8 * 9;
let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
if payment_amt_64th_bucket > max_64th_bucket {
// Success probability 0, the payment amount is above the max liquidity
} else if payment_amt_64th_bucket <= min_64th_bucket {
cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
} else {
cumulative_success_prob_times_billion += bucket_prob_times_million *
((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
((max_64th_bucket - min_64th_bucket) as u64);
}
}
}
Some(cumulative_success_prob_times_billion)
}
}
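// Illustrative sketch (not part of this change): how one (min, max) bucket pair maps onto the
// 64ths-of-capacity scale used in `calculate_success_probability_times_billion` above, and how
// the linear interpolation turns it into a per-pair success probability. The concrete numbers
// are hypothetical.
fn sixty_fourths_scale_sketch() {
	// A payment of one quarter of the channel's capacity lands in the 16th 64th-bucket.
	let payment_amt_64th_bucket: u8 = 16;
	// Pair (min_idx = 0, max_idx = 0): the minimum liquidity is compared against the bottom of
	// the range (0/64) and the maximum against the top (64/64).
	let (min_idx, max_idx): (u8, u8) = (0, 0);
	let min_64th_bucket = min_idx * 9;            // 0, 9, 18, ... 63 across the 8 buckets
	let max_64th_bucket = (7 - max_idx) * 9 + 1;  // 64, 55, 46, ... 1 across the 8 buckets
	// The payment is above the min comparison point and below the max one, so the success
	// probability is the fraction of the [min, max] span lying above the amount:
	// (64 - 16) / (64 - 0) = 75%.
	let success_prob_times_1024 =
		(max_64th_bucket - payment_amt_64th_bucket) as u64 * 1024
		/ (max_64th_bucket - min_64th_bucket) as u64;
	assert_eq!(success_prob_times_1024, 768);

	// The "times_billion" scaling is really 1024^3: each pair's weight is expressed in
	// 1024*1024ths of total_valid_points_tracked and the interpolated probability in 1024ths,
	// so certain success over all pairs sums to 1024^3 = 1,073,741,824.
	let total_valid_points_tracked: u64 = 32 * 32; // a single fully-weighted datapoint
	let bucket_prob_times_million = 32u64 * 32u64 * 1024 * 1024 / total_valid_points_tracked;
	assert_eq!(bucket_prob_times_million * 1024, 1_073_741_824);
}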
}
use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
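// Illustrative sketch (not part of this change): from the parent module, the re-exported types
// are used roughly like this -- the trackers live in the per-channel liquidity state and the
// borrow-holding view is built on the fly when a historical score is needed. The channel and
// liquidity figures are hypothetical, and the eventual call into
// calculate_success_probability_times_billion (which additionally needs the scorer's Time
// parameter and configured half-life) is omitted.
fn module_boundary_sketch() {
	let mut min_history = HistoricalBucketRangeTracker::new();
	let mut max_history = HistoricalBucketRangeTracker::new();
	// Hypothetical observation on a 1_000_000 msat channel: at least 250_000 msat and at most
	// 750_000 msat appear available, i.e. offsets of 250_000 from each end.
	min_history.track_datapoint(250_000, 1_000_000);
	max_history.track_datapoint(250_000, 1_000_000);
	let _buckets = HistoricalMinMaxBuckets {
		min_liquidity_offset_history: &min_history,
		max_liquidity_offset_history: &max_history,
	};
}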
impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
#[inline]
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {