Cleanup and refactoring

This commit is contained in:
Florian Reimair 2020-02-21 13:08:38 +01:00 committed by chimp1984
parent 292f057f22
commit 6c1d7509a1
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -50,27 +50,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
@Slf4j
public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
final Map<NodeAddress, Statistics<Counter>> versionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Aggregator>> versionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Aggregator>> offerVolumeBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<List<Long>>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Map<NodeAddress, Counter>>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Map<NodeAddress, Aggregator>>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Map<NodeAddress, Aggregator>>> volumePerTraderBucketsPerHost = new ConcurrentHashMap<>();
/**
* Efficient way to count occurrences.
*/
private class Counter {
private long value = 0;
synchronized long value() {
return value;
}
synchronized void increment() {
value++;
}
}
/**
* Efficient way to aggregate numbers.
*/
@ -81,85 +66,78 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
return value;
}
synchronized void increment() {
value++;
}
synchronized void add(long amount) {
value += amount;
}
}
private class OfferCountStatistics extends Statistics<Counter> {
private abstract class OfferStatistics<T> extends Statistics<T> {
@Override
public synchronized void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
// For logging different data types
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();
buckets.putIfAbsent(market, new Counter());
buckets.get(market).increment();
process(market, currentMessage);
}
}
abstract void process(String market, OfferPayload currentMessage);
}
private class OfferVolumeStatistics extends Statistics<Aggregator> {
@Override
public synchronized void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();
buckets.putIfAbsent(market, new Aggregator());
buckets.get(market).add(currentMessage.getAmount());
}
}
}
private class OfferVolumeDistributionStatistics extends Statistics<List<Long>> {
private class OfferCountStatistics extends OfferStatistics<Aggregator> {
@Override
public synchronized void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();
buckets.putIfAbsent(market, new ArrayList<>());
buckets.get(market).add(currentMessage.getAmount());
}
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new Aggregator());
buckets.get(market).increment();
}
}
private class OffersPerTraderStatistics extends Statistics<Map<NodeAddress, Counter>> {
private class OfferVolumeStatistics extends OfferStatistics<Aggregator> {
@Override
public synchronized void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Counter());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment();
}
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new Aggregator());
buckets.get(market).add(currentMessage.getAmount());
}
}
private class VolumePerTraderStatistics extends Statistics<Map<NodeAddress, Aggregator>> {
private class OfferVolumeDistributionStatistics extends OfferStatistics<List<Long>> {
@Override
public synchronized void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount());
}
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new ArrayList<>());
buckets.get(market).add(currentMessage.getAmount());
}
}
private class VersionsStatistics extends Statistics<Counter> {
private class OffersPerTraderStatistics extends OfferStatistics<Map<NodeAddress, Aggregator>> {
@Override
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment();
}
}
private class VolumePerTraderStatistics extends OfferStatistics<Map<NodeAddress, Aggregator>> {
@Override
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount());
}
}
private class VersionsStatistics extends Statistics<Aggregator> {
@Override
public void log(Object message) {
@ -169,7 +147,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
String version = "v" + currentMessage.getId().substring(currentMessage.getId().lastIndexOf("-") + 1);
buckets.putIfAbsent(version, new Counter());
buckets.putIfAbsent(version, new Aggregator());
buckets.get(version).increment();
}
}
@ -189,10 +167,25 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
return result;
}
protected void createHistogram(List<Long> input, String market, Map<String, String> report) {
int numberOfBins = 5;
// - get biggest offer
double max = input.stream().max(Long::compareTo).get() * 1.01;
// - create histogram
input.stream().collect(
Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())).
forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2)));
report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins));
report.put(market + ".max", String.valueOf((int) max));
}
@Override
protected void report() {
Map<String, String> report = new HashMap<>();
bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Counter) numberOfOffers).value()))));
bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Aggregator) numberOfOffers).value()))));
reporter.report(report, getName() + ".offerCount");
// do offerbook volume statistics
@ -202,40 +195,21 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
// do the offer vs volume histogram
report.clear();
int numberOfBins = 5;
// - get a data set
offerVolumeDistributionBucketsPerHost.values().stream().findFirst().ifPresent(listStatistics -> listStatistics.values().forEach((market, offers) -> {
// - get biggest offer
Long max = offers.stream().max(Long::compareTo).get();
// - create histogram
offers.stream().collect(
Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())).
forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2)));
report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins));
createHistogram(offers, market, report);
}));
reporter.report(report, getName() + ".volume-per-offer-distribution");
// do offers per trader
report.clear();
// - get a data set
offersPerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> {
List<Long> offerPerTrader = stuff.values().stream().map(Counter::value).collect(Collectors.toList());
List<Long> offerPerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList());
// - get most active trader
double max = offerPerTrader.stream().max(Long::compareTo).get() + 0.01;
// - create histogram
offerPerTrader.stream().collect(
Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())).
forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2)));
report.put(market + ".number_of_traders", String.valueOf(stuff.size()));
report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins));
createHistogram(offerPerTrader, market, report);
}));
reporter.report(report, getName() + ".offersPerTrader");
reporter.report(report, getName() + ".traders_by_number_of_offers");
// do volume per trader
report.clear();
@ -243,18 +217,9 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
volumePerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> {
List<Long> volumePerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList());
// - get most active trader
double max = volumePerTrader.stream().max(Long::compareTo).get() + 0.01;
// - create histogram
volumePerTrader.stream().collect(
Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())).
forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2)));
report.put(market + ".number_of_traders", String.valueOf(stuff.size()));
report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins));
createHistogram(volumePerTrader, market, report);
}));
reporter.report(report, getName() + ".volumePerTrader");
reporter.report(report, getName() + ".traders_by_volume");
// do version statistics
report.clear();