Clean up & optimise (Dao|Price)ChartDataModel

1. Tidy up the stream pipelines which sum over time intervals, by
   summing directly with a grouping collector, instead of wastefully
   collecting to an intermediate map of lists;

2. Move duplicate 'memoize' static method to the base class;

3. Factor out 'getDateFilteredMap' static method, to replace the
   repeated pattern of filtering date keys by a provided predicate and
   collecting into a new map;

4. Use 'Map::replaceAll' instead of the pattern:

      map.entrySet().forEach(e -> e.setValue(updateFn(e.getValue())));

5. Fix a quadratic time bug in 'getBsqMarketCapByInterval' by passing an
   ordered map to 'issuanceAsOfDate', so that it doesn't have to
   repeatedly sort or linearly scan the entire keyset of time intervals,
   to find the latest one before the provided date.
This commit is contained in:
Steven Barclay 2024-02-20 15:46:32 +08:00
parent 2b4fb78d99
commit 712c97826b
No known key found for this signature in database
GPG Key ID: 9FED6BF1176D500B
3 changed files with 77 additions and 136 deletions

View File

@ -23,7 +23,9 @@ import java.time.Instant;
import java.time.temporal.TemporalAdjuster;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
@ -89,13 +91,20 @@ public abstract class ChartDataModel extends ActivatableDataModel {
protected abstract void invalidateCache();
protected Map<Long, Long> getMergedMap(Map<Long, Long> map1,
Map<Long, Long> map2,
BinaryOperator<Long> mergeFunction) {
return Stream.concat(map1.entrySet().stream(),
map2.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue,
mergeFunction));
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
protected static <T, R> Function<T, R> memoize(Function<T, R> fn) {
Map<T, R> map = new ConcurrentHashMap<>();
return x -> map.computeIfAbsent(x, fn);
}
protected static <V> Map<Long, V> getMergedMap(Map<Long, V> map1,
Map<Long, V> map2,
BinaryOperator<V> mergeFunction) {
return Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, mergeFunction));
}
}

View File

@ -32,14 +32,12 @@ import javax.inject.Inject;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -47,8 +45,6 @@ import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@Slf4j
public class PriceChartDataModel extends ChartDataModel {
private final TradeStatisticsManager tradeStatisticsManager;
@ -266,7 +262,7 @@ public class PriceChartDataModel extends ChartDataModel {
}
private Map<Long, Double> getBsqMarketCapByInterval(Predicate<TradeStatistics3> collectionFilter,
Function<List<TradeStatistics3>, Double> getAveragePriceFunction) {
Function<List<TradeStatistics3>, Double> getAveragePriceFunction) {
var toTimeIntervalFn = toCachedTimeIntervalFn();
return getBsqMarketCapByInterval(tradeStatisticsManager.getObservableTradeStatisticsSet(),
collectionFilter,
@ -276,92 +272,58 @@ public class PriceChartDataModel extends ChartDataModel {
}
private Map<Long, Double> getBsqMarketCapByInterval(Collection<TradeStatistics3> tradeStatistics3s,
Predicate<TradeStatistics3> collectionFilter,
Function<TradeStatistics3, Long> groupByDateFunction,
Predicate<Long> dateFilter,
Function<List<TradeStatistics3>, Double> getAveragePriceFunction) {
Predicate<TradeStatistics3> collectionFilter,
Function<TradeStatistics3, Long> groupByDateFunction,
Predicate<Long> dateFilter,
Function<List<TradeStatistics3>, Double> getAveragePriceFunction) {
Map<Long, List<TradeStatistics3>> pricesGroupedByDate = tradeStatistics3s.stream()
.filter(collectionFilter)
.collect(Collectors.groupingBy(groupByDateFunction));
Stream<Map.Entry<Long,List<TradeStatistics3>>> filteredByDate =
pricesGroupedByDate.entrySet().stream()
.filter(entry -> dateFilter.test(entry.getKey()));
Stream<Map.Entry<Long, List<TradeStatistics3>>> filteredByDate = pricesGroupedByDate.entrySet().stream()
.filter(entry -> dateFilter.test(entry.getKey()));
Map<Long, Double> resultsByDateBucket = filteredByDate
.map(entry -> new AbstractMap.SimpleEntry<>(
entry.getKey(),
getAveragePriceFunction.apply(entry.getValue())))
.filter(e -> e.getValue() > 0d)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (u, v) -> v, HashMap::new));
// apply the available BSQ to the data set
Map<Long, Double> totalSupplyByInterval = getOutstandingBsqByInterval();
resultsByDateBucket.keySet().forEach(dateKey -> {
Double availableBsq = issuanceAsOfDate(totalSupplyByInterval, dateKey)/100;
resultsByDateBucket.put(dateKey, resultsByDateBucket.get(dateKey) * availableBsq); // market cap (price * available BSQ)
NavigableMap<Long, Double> totalSupplyByInterval = getOutstandingBsqByInterval();
resultsByDateBucket.replaceAll((dateKey, result) -> {
double availableBsq = issuanceAsOfDate(totalSupplyByInterval, dateKey) / 100d;
return result * availableBsq; // market cap (price * available BSQ)
});
return resultsByDateBucket;
}
private Double issuanceAsOfDate(@NotNull Map<Long, Double> totalSupplyByInterval, Long dateKey) {
ArrayList<Long> list = new ArrayList<>(totalSupplyByInterval.keySet());
list.sort(Collections.reverseOrder());
Optional<Long> foundKey = list.stream()
.filter(d -> dateKey >= d)
.findFirst();
if (foundKey.isPresent()) {
return totalSupplyByInterval.get(foundKey.get());
}
return 0.0;
private double issuanceAsOfDate(NavigableMap<Long, Double> totalSupplyByInterval, long dateKey) {
var entry = totalSupplyByInterval.floorEntry(dateKey);
return entry != null ? entry.getValue() : 0d;
}
private Map<Long, Double> getOutstandingBsqByInterval() {
private NavigableMap<Long, Double> getOutstandingBsqByInterval() {
Stream<Tx> txStream = daoStateService.getBlocks().stream()
.flatMap(b -> b.getTxs().stream())
.filter(tx -> tx.getBurntFee() > 0);
Map<Long, Double> simpleBurns = txStream
.collect(Collectors.groupingBy(tx ->
toTimeInterval(Instant.ofEpochMilli(tx.getTime()))))
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.mapToDouble(Tx::getBurntBsq)
.sum()));
simpleBurns.forEach((k,v) -> simpleBurns.put(k, -v));
.collect(Collectors.groupingBy(
tx -> toTimeInterval(Instant.ofEpochMilli(tx.getTime())),
Collectors.summingDouble(Tx::getBurntBsq)));
simpleBurns.replaceAll((k, v) -> -v);
Collection<Issuance> issuanceSet = daoStateService.getIssuanceItems();
Map<Long, Double> simpleIssuance = issuanceSet.stream()
.collect(Collectors.groupingBy(issuance ->
toTimeInterval(Instant.ofEpochMilli(blockTimeOfIssuanceFunction.apply(issuance)))))
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.mapToDouble(Issuance::getAmount)
.sum()));
.collect(Collectors.groupingBy(
issuance -> toTimeInterval(Instant.ofEpochMilli(blockTimeOfIssuanceFunction.apply(issuance))),
Collectors.summingDouble(Issuance::getAmount)));
Map<Long, Double> supplyByInterval = Stream.concat(simpleIssuance.entrySet().stream(),
simpleBurns.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue,
Double::sum));
NavigableMap<Long, Double> supplyByInterval = new TreeMap<>(getMergedMap(simpleIssuance, simpleBurns, Double::sum));
ArrayList<Long> listCombined = new ArrayList<>(supplyByInterval.keySet());
Collections.sort(listCombined);
AtomicReference<Double> atomicSum = new AtomicReference<>((double) (daoStateService.getGenesisTotalSupply().value));
listCombined.forEach(k -> supplyByInterval.put(k, atomicSum.accumulateAndGet(supplyByInterval.get(k), Double::sum)));
final double[] partialSum = {daoStateService.getGenesisTotalSupply().value};
supplyByInterval.replaceAll((k, v) -> partialSum[0] += v);
return supplyByInterval;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private static <T, R> Function<T, R> memoize(Function<T, R> fn) {
Map<T, R> map = new ConcurrentHashMap<>();
return x -> map.computeIfAbsent(x, fn);
}
}

View File

@ -40,8 +40,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -146,9 +144,7 @@ public class DaoChartDataModel extends ChartDataModel {
Map<Long, Long> reimbursementMap = getReimbursementByInterval();
Map<Long, Long> burnFromArbitrationMap = getProofOfBurnFromArbitrationByInterval();
Map<Long, Long> mergedMap = getMergedMap(reimbursementMap, burnFromArbitrationMap, (a, b) -> a - b);
arbitrationDiffByInterval = mergedMap.entrySet().stream()
.filter(e -> getPostTagDateFilter().test(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
arbitrationDiffByInterval = getDateFilteredMap(mergedMap, getPostTagDateFilter());
return arbitrationDiffByInterval;
}
@ -162,9 +158,7 @@ public class DaoChartDataModel extends ChartDataModel {
Map<Long, Long> tradeFee = getBsqTradeFeeByInterval();
Map<Long, Long> proofOfBurn = getProofOfBurnFromBtcFeesByInterval();
Map<Long, Long> merged = getMergedMap(tradeFee, proofOfBurn, Long::sum);
totalTradeFeesByInterval = merged.entrySet().stream()
.filter(entry -> entry.getKey() * 1000 >= TAG_DATE.getTimeInMillis())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
totalTradeFeesByInterval = getDateFilteredMap(merged, e -> e * 1000 >= TAG_DATE.getTimeInMillis());
return totalTradeFeesByInterval;
}
@ -207,9 +201,7 @@ public class DaoChartDataModel extends ChartDataModel {
return reimbursementByIntervalAfterTagging;
}
reimbursementByIntervalAfterTagging = getReimbursementByInterval().entrySet().stream()
.filter(e -> getPostTagDateFilter().test(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
reimbursementByIntervalAfterTagging = getDateFilteredMap(getReimbursementByInterval(), getPostTagDateFilter());
return reimbursementByIntervalAfterTagging;
}
@ -236,9 +228,7 @@ public class DaoChartDataModel extends ChartDataModel {
return bsqTradeFeeByIntervalAfterTagging;
}
bsqTradeFeeByIntervalAfterTagging = getBsqTradeFeeByInterval().entrySet().stream()
.filter(e -> getPostTagDateFilter().test(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
bsqTradeFeeByIntervalAfterTagging = getDateFilteredMap(getBsqTradeFeeByInterval(), getPostTagDateFilter());
return bsqTradeFeeByIntervalAfterTagging;
}
@ -256,17 +246,14 @@ public class DaoChartDataModel extends ChartDataModel {
return miscBurnByInterval;
}
miscBurnByInterval = daoStateService.getBurntFeeTxs().stream()
Map<Long, Long> allMiscBurnByInterval = getBurntFeeTxStream()
.filter(e -> e.getTxType() != TxType.PAY_TRADE_FEE)
.filter(e -> e.getTxType() != TxType.PROOF_OF_BURN)
.collect(Collectors.groupingBy(tx -> toTimeInterval(Instant.ofEpochMilli(tx.getTime()))))
.entrySet()
.stream()
.filter(entry -> dateFilter.test(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.mapToLong(Tx::getBurntBsq)
.sum()));
.collect(Collectors.groupingBy(
tx -> toTimeInterval(Instant.ofEpochMilli(tx.getTime())),
Collectors.summingLong(Tx::getBurntBsq)));
miscBurnByInterval = getDateFilteredMap(allMiscBurnByInterval, dateFilter);
return miscBurnByInterval;
}
@ -308,21 +295,14 @@ public class DaoChartDataModel extends ChartDataModel {
Collection<Issuance> issuanceSetForType = daoStateService.getIssuanceItems();
// get all issued and burnt BSQ, not just the filtered date range
Map<Long, Long> tmpIssuedByInterval = getIssuedBsqByInterval(issuanceSetForType, e -> true);
Map<Long, Long> tmpBurnedByInterval = new TreeMap<>(getBurntBsqByInterval(getBurntFeeTxStream(), e -> true)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> -e.getValue())));
Map<Long, Long> tmpSupplyByInterval = getMergedMap(tmpIssuedByInterval, tmpBurnedByInterval, Long::sum);
Map<Long, Long> tmpBurnedByInterval = getBurntBsqByInterval(getBurntFeeTxStream(), e -> true);
tmpBurnedByInterval.replaceAll((k, v) -> -v);
totalSupplyByInterval = new TreeMap<>(tmpSupplyByInterval.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
AtomicReference<Long> atomicSum = new AtomicReference<>(genesisValue);
totalSupplyByInterval.entrySet().forEach(e -> e.setValue(
atomicSum.accumulateAndGet(e.getValue(), Long::sum)
));
Map<Long, Long> tmpSupplyByInterval = new TreeMap<>(getMergedMap(tmpIssuedByInterval, tmpBurnedByInterval, Long::sum));
final long[] partialSum = {genesisValue};
tmpSupplyByInterval.replaceAll((k, v) -> partialSum[0] += v);
// now apply the requested date filter
totalSupplyByInterval = totalSupplyByInterval.entrySet().stream()
.filter(e -> dateFilter.test(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
totalSupplyByInterval = getDateFilteredMap(tmpSupplyByInterval, dateFilter);
return totalSupplyByInterval;
}
@ -332,8 +312,8 @@ public class DaoChartDataModel extends ChartDataModel {
}
Map<Long, Long> issued = getTotalIssuedByInterval();
Map<Long, Long> burned = new TreeMap<>(getTotalBurnedByInterval().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> -e.getValue())));
Map<Long, Long> burned = getTotalBurnedByInterval();
burned.replaceAll((k, v) -> -v);
supplyChangeByInterval = getMergedMap(issued, burned, Long::sum);
return supplyChangeByInterval;
}
@ -343,21 +323,15 @@ public class DaoChartDataModel extends ChartDataModel {
///////////////////////////////////////////////////////////////////////////////////////////
private Map<Long, Long> getIssuedBsqByInterval(Collection<Issuance> issuanceSet, Predicate<Long> dateFilter) {
return issuanceSet.stream()
.collect(Collectors.groupingBy(issuance ->
toTimeInterval(Instant.ofEpochMilli(blockTimeOfIssuanceFunction.apply(issuance)))))
.entrySet()
.stream()
.filter(entry -> dateFilter.test(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.mapToLong(Issuance::getAmount)
.sum()));
var allIssuedBsq = issuanceSet.stream()
.collect(Collectors.groupingBy(
issuance -> toTimeInterval(Instant.ofEpochMilli(blockTimeOfIssuanceFunction.apply(issuance))),
Collectors.summingLong(Issuance::getAmount)));
return getDateFilteredMap(allIssuedBsq, dateFilter);
}
private Map<Long, Long> getHistoricalIssuedBsqByInterval(Map<Long, Long> historicalData,
Predicate<Long> dateFilter) {
return historicalData.entrySet().stream()
.filter(e -> dateFilter.test(e.getKey()))
.collect(Collectors.toMap(e -> toTimeInterval(Instant.ofEpochSecond(e.getKey())),
@ -367,15 +341,10 @@ public class DaoChartDataModel extends ChartDataModel {
private Map<Long, Long> getBurntBsqByInterval(Stream<Tx> txStream, Predicate<Long> dateFilter) {
var toTimeIntervalFn = toCachedTimeIntervalFn();
return txStream
.collect(Collectors.groupingBy(tx -> toTimeIntervalFn.applyAsLong(Instant.ofEpochMilli(tx.getTime()))))
.entrySet()
.stream()
.filter(entry -> dateFilter.test(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.mapToLong(Tx::getBurntBsq)
.sum()));
var allBurntBsq = txStream.collect(Collectors.groupingBy(
tx -> toTimeIntervalFn.applyAsLong(Instant.ofEpochMilli(tx.getTime())),
Collectors.summingLong(Tx::getBurntBsq)));
return getDateFilteredMap(allBurntBsq, dateFilter);
}
private Predicate<Long> getPostTagDateFilter() {
@ -402,9 +371,10 @@ public class DaoChartDataModel extends ChartDataModel {
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private static <T, R> Function<T, R> memoize(Function<T, R> fn) {
Map<T, R> map = new ConcurrentHashMap<>();
return x -> map.computeIfAbsent(x, fn);
private static <V> Map<Long, V> getDateFilteredMap(Map<Long, V> map, Predicate<Long> dateFilter) {
return map.entrySet().stream()
.filter(e -> dateFilter.test(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (u, v) -> v, HashMap::new));
}