Merge pull request #3302 from freimair/monitor

Monitor fixes
This commit is contained in:
Christoph Atteneder 2019-09-26 13:41:25 +02:00 committed by GitHub
commit 95304ff98c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 52 deletions

View File

@ -29,6 +29,9 @@ import bisq.monitor.metric.TorStartupTime;
import bisq.monitor.reporter.ConsoleReporter;
import bisq.monitor.reporter.GraphiteReporter;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import org.berndpruenster.netlayer.tor.NativeTor;
import org.berndpruenster.netlayer.tor.Tor;
@ -78,6 +81,16 @@ public class Monitor {
// start Tor
Tor.setDefault(new NativeTor(TOR_WORKING_DIR, null, null, false));
Capabilities.app.addAll(Capability.TRADE_STATISTICS,
Capability.TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE,
Capability.BUNDLE_OF_ENVELOPES,
Capability.MEDIATION);
// assemble Metrics
// - create reporters
Reporter graphiteReporter = new GraphiteReporter();

View File

@ -20,8 +20,13 @@ package bisq.monitor.metric;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.core.account.witness.AccountAgeWitnessStore;
import bisq.core.btc.BaseCurrencyNetwork;
import bisq.core.offer.OfferPayload;
import bisq.core.proto.persistable.CorePersistenceProtoResolver;
import bisq.core.trade.statistics.TradeStatistics2Store;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
@ -29,16 +34,24 @@ import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.storage.Storage;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -46,13 +59,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* Demo Stats metric derived from the OfferPayload messages we get from the seed nodes
*
*
* @author Florian Reimair
*/
@Slf4j
public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
private static final String DATABASE_DIR = "run.dbDir";
private final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
final Map<NodeAddress, Statistics> versionBucketsPerHost = new ConcurrentHashMap<>();
/**
* Efficient way to count occurrences.
*/
@ -100,12 +117,65 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
}
}
private class VersionsStatistics implements Statistics<Counter> {
private final Map<String, Counter> buckets = new HashMap<>();
@Override
public Statistics create() {
return new VersionsStatistics();
}
@Override
public void log(Object message) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
String version = "v" + currentMessage.getId().substring(currentMessage.getId().lastIndexOf("-") + 1);
buckets.putIfAbsent(version, new Counter());
buckets.get(version).increment();
}
}
@Override
public Map<String, Counter> values() {
return buckets;
}
@Override
public void reset() {
buckets.clear();
}
}
public P2PMarketStats(Reporter graphiteReporter) {
super(graphiteReporter);
statistics = new MyStatistics();
}
@Override
public void configure(Properties properties) {
super.configure(properties);
if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) {
File dir = new File(configuration.getProperty(DATABASE_DIR));
String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString();
try {
Storage<PersistableEnvelope> storage = new Storage<>(dir, new CorePersistenceProtoResolver(null, null, null, null), null);
TradeStatistics2Store tradeStatistics2Store = (TradeStatistics2Store) storage.initAndGetPersistedWithFileName(TradeStatistics2Store.class.getSimpleName() + networkPostfix, 0);
hashes.addAll(tradeStatistics2Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));
AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) storage.initAndGetPersistedWithFileName(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix, 0);
hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));
} catch (NullPointerException e) {
// in case there is no store file
log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath());
}
}
}
@Override
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();
@ -122,6 +192,11 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((market, numberOfOffers) -> report.put(OnionParser.prettyPrint(host) + "." + market.toString(), String.valueOf(((Counter) numberOfOffers).value()))));
reporter.report(report, getName());
// do version statistics
report.clear();
versionBucketsPerHost.values().stream().findAny().get().values().forEach((version, numberOfOccurrences) -> report.put(version.toString(), String.valueOf(((Counter) numberOfOccurrences).value())));
reporter.report(report, "versions");
}
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
@ -131,6 +206,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
if (networkEnvelope instanceof GetDataResponse) {
Statistics result = this.statistics.create();
VersionsStatistics versions = new VersionsStatistics();
GetDataResponse dataResponse = (GetDataResponse) networkEnvelope;
final Set<ProtectedStorageEntry> dataSet = dataResponse.getDataSet();
@ -142,6 +218,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
}
result.log(protectedStoragePayload);
versions.log(protectedStoragePayload);
});
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
@ -160,6 +237,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
}
bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions);
return true;
}
return false;

View File

@ -42,8 +42,6 @@ import bisq.network.p2p.peers.peerexchange.PeerList;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.common.ClockWatcher;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.storage.CorruptedDatabaseFilesHandler;
@ -202,8 +200,6 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
super.configure(properties);
history = Collections.synchronizedMap(new FixedSizeHistoryTracker<>(Integer.parseInt(configuration.getProperty(HISTORY_SIZE, "200"))));
Capabilities.app.addAll(Capability.DAO_FULL_NODE);
}
/**

View File

@ -20,11 +20,15 @@ package bisq.monitor.metric;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.core.account.witness.AccountAgeWitnessStore;
import bisq.core.btc.BaseCurrencyNetwork;
import bisq.core.dao.monitoring.model.StateHash;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetStateHashesResponse;
import bisq.core.proto.persistable.CorePersistenceProtoResolver;
import bisq.core.trade.statistics.TradeStatistics2Store;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
@ -34,12 +38,17 @@ import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.storage.Storage;
import java.net.MalformedURLException;
import java.nio.ByteBuffer;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@ -47,11 +56,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -71,35 +81,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
@Slf4j
public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
private static final String DATABASE_DIR = "run.dbDir";
Statistics statistics;
final Map<NodeAddress, Statistics> bucketsPerHost = new ConcurrentHashMap<>();
protected final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
private int daostateheight = 550000;
private int daostateheight = 594000;
private int proposalheight = daostateheight;
private int blindvoteheight = daostateheight;
/**
* Efficient way to count message occurrences.
*/
private class Counter {
private int value = 0;
int value() {
return value;
}
void increment() {
value++;
}
}
/**
* Use a counter to do statistics.
*/
private class MyStatistics implements Statistics<Counter> {
private class MyStatistics implements Statistics<Set<Integer>> {
private final Map<String, Counter> buckets = new HashMap<>();
private final Map<String, Set<Integer>> buckets = new HashMap<>();
@Override
public Statistics create() {
@ -112,12 +108,12 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
// For logging different data types
String className = message.getClass().getSimpleName();
buckets.putIfAbsent(className, new Counter());
buckets.get(className).increment();
buckets.putIfAbsent(className, new HashSet<>());
buckets.get(className).add(message.hashCode());
}
@Override
public Map<String, Counter> values() {
public Map<String, Set<Integer>> values() {
return buckets;
}
@ -130,24 +126,30 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
public P2PSeedNodeSnapshot(Reporter reporter) {
super(reporter);
// AppendOnlyDataStoreService appendOnlyDataStoreService,
// ProtectedDataStoreService protectedDataStoreService,
// ResourceDataStoreService resourceDataStoreService,
// Storage<SequenceNumberMap> sequenceNumberMapStorage) {
//
// Set<byte[]> excludedKeys = dataStorage.getAppendOnlyDataStoreMap().keySet().stream()
// .map(e -> e.bytes)
// .collect(Collectors.toSet());
//
// Set<byte[]> excludedKeysFromPersistedEntryMap = dataStorage.getProtectedDataStoreMap().keySet()
// .stream()
// .map(e -> e.bytes)
// .collect(Collectors.toSet());
statistics = new MyStatistics();
}
@Override
public void configure(Properties properties) {
super.configure(properties);
if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) {
File dir = new File(configuration.getProperty(DATABASE_DIR));
String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString();
try {
Storage<PersistableEnvelope> storage = new Storage<>(dir, new CorePersistenceProtoResolver(null, null, null, null), null);
TradeStatistics2Store tradeStatistics2Store = (TradeStatistics2Store) storage.initAndGetPersistedWithFileName(TradeStatistics2Store.class.getSimpleName() + networkPostfix, 0);
hashes.addAll(tradeStatistics2Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));
AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) storage.initAndGetPersistedWithFileName(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix, 0);
hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));
} catch (NullPointerException e) {
// in case there is no store file
log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath());
}
}
}
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();
@ -171,8 +173,8 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
// report
Map<String, String> report = new HashMap<>();
// - assemble histograms
bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((type, counter) -> report
.put(OnionParser.prettyPrint(host) + ".numberOfMessages." + type, String.valueOf(((Counter) counter).value()))));
bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((type, set) -> report
.put(OnionParser.prettyPrint(host) + ".numberOfMessages." + type, Integer.toString(((Set) set).size()))));
// - assemble diffs
// - transfer values
@ -180,22 +182,26 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
bucketsPerHost.forEach((host, value) -> messagesPerHost.put(OnionParser.prettyPrint(host), value));
// - pick reference seed node and its values
Optional<String> referenceHost = messagesPerHost.keySet().stream().sorted().findFirst();
Map<String, Counter> referenceValues = messagesPerHost.get(referenceHost.get()).values();
String referenceHost = "overall_number_of_unique_messages";
Map<String, Set<Object>> referenceValues = new HashMap<>();
messagesPerHost.forEach((host, statistics) -> statistics.values().forEach((type, set) -> {
referenceValues.putIfAbsent((String) type, new HashSet<>());
referenceValues.get(type).addAll((Set) set);
}));
// - calculate diffs
messagesPerHost.forEach(
(host, statistics) -> {
statistics.values().forEach((messageType, count) -> {
statistics.values().forEach((messageType, set) -> {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
String.valueOf(((Set) set).size() - referenceValues.get(messageType).size()));
} catch (MalformedURLException | NullPointerException ignore) {
log.error("we should never have gotten here", ignore);
}
});
try {
report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost.get());
report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost);
} catch (MalformedURLException ignore) {
log.error("we should never got here");
}
@ -239,8 +245,18 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".head", Long.toString(tuple.height - head)));
// - memorize hashes
Set<ByteBuffer> states = new HashSet<>();
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> states.add(ByteBuffer.wrap(tuple.hash)));
Map<ByteBuffer, Integer> hitcount = new HashMap<>();
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> {
ByteBuffer hash = ByteBuffer.wrap(tuple.hash);
if (hitcount.containsKey(hash)) {
hitcount.put(hash, hitcount.get(hash) + 1);
} else
hitcount.put(hash, 1);
});
List<ByteBuffer> states = hitcount.entrySet().stream().sorted((o1, o2) -> o2.getValue().compareTo(o1.getValue())).map(byteBufferIntegerEntry -> byteBufferIntegerEntry.getKey()).collect(Collectors.toList());
hitcount.clear();
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".hash", Integer.toString(Arrays.asList(states.toArray()).indexOf(ByteBuffer.wrap(tuple.hash)))));
// - report reference head
@ -300,7 +316,7 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
if (networkEnvelope instanceof GetDataResponse) {
Statistics result = this.statistics.create();

View File

@ -47,6 +47,7 @@ P2PNetworkLoad.run.historySize=200
#P2PSeedNodeSnapshotBase Metric
P2PSeedNodeSnapshot.enabled=true
P2PSeedNodeSnapshot.run.dbDir=bisq/p2p/build/resources/main/
P2PSeedNodeSnapshot.run.interval=24
P2PSeedNodeSnapshot.run.hosts=3f3cu2yw7u457ztq.onion:8000, 723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PSeedNodeSnapshot.run.torProxyPort=9062
@ -54,6 +55,7 @@ P2PSeedNodeSnapshot.run.torProxyPort=9062
#P2PMarketStats Metric
P2PMarketStats.enabled=false
P2PMarketStats.run.interval=37
P2PMarketStats.run.dbDir=bisq/p2p/build/resources/main/
P2PMarketStats.run.hosts=ef5qnzx6znifo3df.onion:8000
P2PMarketStats.run.torProxyPort=9063