Renamed classes

Florian Reimair 2019-06-05 19:22:29 +02:00
parent 344e0aba9d
commit afcc7c00f5
No known key found for this signature in database
GPG Key ID: 05634D8D7A7954C8
8 changed files with 436 additions and 456 deletions

View File

@@ -77,10 +77,10 @@ P2PNetworkLoad.run.torProxyPort=9061
P2PNetworkLoad.run.historySize=500
#P2PNetworkMessageSnapshot Metric
P2PSeedNodeWithDaoSnapshot.enabled=true
P2PSeedNodeWithDaoSnapshot.run.interval=24
P2PSeedNodeWithDaoSnapshot.run.hosts=3f3cu2yw7u457ztq.onion:8000, 723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PSeedNodeWithDaoSnapshot.run.torProxyPort=9062
P2PSeedNodeSnapshot.enabled=true
P2PSeedNodeSnapshot.run.interval=24
P2PSeedNodeSnapshot.run.hosts=3f3cu2yw7u457ztq.onion:8000, 723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PSeedNodeSnapshot.run.torProxyPort=9062
#P2PMarketStats Metric
P2PMarketStats.enabled=true
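
The config prefix is derived from the metric's class name, so renaming P2PSeedNodeWithDaoSnapshot to P2PSeedNodeSnapshot silently retargets every enabled and run.* key above. A minimal sketch of that lookup convention, assuming keys resolve as <SimpleClassName>.<key>; the helper below is hypothetical, not Bisq API:

import java.util.Properties;

public class ConfigLookupSketch {
    // Hypothetical helper mirroring the <SimpleClassName>.<key> convention seen above.
    static String get(Properties config, String metricName, String key, String fallback) {
        return config.getProperty(metricName + "." + key, fallback);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("P2PSeedNodeSnapshot.enabled", "true");
        config.setProperty("P2PSeedNodeSnapshot.run.interval", "24");

        // After the rename, keys under the old P2PSeedNodeWithDaoSnapshot.* prefix are simply ignored.
        System.out.println(get(config, "P2PSeedNodeSnapshot", "run.interval", "unset"));        // prints 24
        System.out.println(get(config, "P2PSeedNodeWithDaoSnapshot", "run.interval", "unset")); // prints unset
    }
}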

View File

@@ -20,13 +20,12 @@ package bisq.monitor;
import bisq.monitor.metric.MarketStats;
import bisq.monitor.metric.P2PMarketStats;
import bisq.monitor.metric.P2PNetworkLoad;
import bisq.monitor.metric.P2PSeedNodeSnapshot;
import bisq.monitor.metric.P2PRoundTripTime;
import bisq.monitor.metric.PriceNodeStats;
import bisq.monitor.metric.TorHiddenServiceStartupTime;
import bisq.monitor.metric.TorRoundTripTime;
import bisq.monitor.metric.TorStartupTime;
import bisq.monitor.metric.P2PSeedNodeWithDaoSnapshot;
import bisq.monitor.metric.P2PSeedNodeSnapshot;
import bisq.monitor.reporter.ConsoleReporter;
import bisq.monitor.reporter.GraphiteReporter;
@@ -94,7 +93,7 @@ public class Monitor {
metrics.add(new TorHiddenServiceStartupTime(graphiteReporter));
metrics.add(new P2PRoundTripTime(graphiteReporter));
metrics.add(new P2PNetworkLoad(graphiteReporter));
metrics.add(new P2PSeedNodeWithDaoSnapshot(graphiteReporter));
metrics.add(new P2PSeedNodeSnapshot(graphiteReporter));
metrics.add(new P2PMarketStats(graphiteReporter));
metrics.add(new PriceNodeStats(graphiteReporter));
metrics.add(new MarketStats(graphiteReporter));

View File

@@ -50,7 +50,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
* @author Florian Reimair
*/
@Slf4j
public class P2PMarketStats extends P2PSeedNodeSnapshot {
public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
private final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
/**

View File

@@ -17,39 +17,44 @@
package bisq.monitor.metric;
import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.ThreadGate;
import bisq.core.proto.network.CoreNetworkProtoResolver;
import bisq.core.dao.monitoring.model.StateHash;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetStateHashesResponse;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.proto.network.NetworkEnvelope;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.net.MalformedURLException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Contacts a list of hosts and asks them for all the data excluding persisted messages. The
@@ -57,32 +62,67 @@ import org.jetbrains.annotations.NotNull;
* buckets, the Metric reports (for each host) the message types observed and
* their number.
*
* Furthermore, now that the DAO is live, the consistency of the DAO state held by each host is assessed and reported.
*
* @author Florian Reimair
*
*/
@Slf4j
abstract public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
Statistics statistics;
final Map<NodeAddress, Statistics> bucketsPerHost = new ConcurrentHashMap<>();
private final ThreadGate gate = new ThreadGate();
protected final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
private int daostateheight = 550000;
private int proposalheight = daostateheight;
private int blindvoteheight = daostateheight;
/**
 * Statistics Interface for use with derived classes.
 *
 * @param <T> the value type of the statistics implementation
 */
protected interface Statistics<T> {
    Statistics create();
    void log(Object message);
    Map<String, T> values();
    void reset();
}
/**
 * Efficient way to count message occurrences.
 */
private class Counter {
    private int value = 0;
    int value() {
        return value;
    }
    void increment() {
        value++;
    }
}
/**
 * Use a counter to do statistics.
 */
private class MyStatistics implements Statistics<Counter> {
    private final Map<String, Counter> buckets = new HashMap<>();
    @Override
    public Statistics create() {
        return new MyStatistics();
    }
    @Override
    public synchronized void log(Object message) {
        // For logging different data types
        String className = message.getClass().getSimpleName();
        buckets.putIfAbsent(className, new Counter());
        buckets.get(className).increment();
    }
    @Override
    public Map<String, Counter> values() {
        return buckets;
    }
    @Override
    public synchronized void reset() {
        buckets.clear();
    }
}
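
The Statistics interface doubles as a prototype factory: create() hands each queried host a fresh bucket map, and log() histograms incoming objects keyed by their simple class name. A self-contained sketch of the pattern, decoupled from the Bisq types; the message values are illustrative only:

import java.util.HashMap;
import java.util.Map;

public class StatisticsSketch {

    interface Statistics<T> {
        Statistics<T> create();   // prototype: one instance per queried host
        void log(Object message); // bucket by message type
        Map<String, T> values();
        void reset();
    }

    static class Counter {
        private int value = 0;
        int value() { return value; }
        void increment() { value++; }
    }

    static class MyStatistics implements Statistics<Counter> {
        private final Map<String, Counter> buckets = new HashMap<>();

        @Override
        public Statistics<Counter> create() { return new MyStatistics(); }

        @Override
        public synchronized void log(Object message) {
            // count one occurrence per simple class name
            buckets.computeIfAbsent(message.getClass().getSimpleName(), k -> new Counter()).increment();
        }

        @Override
        public Map<String, Counter> values() { return buckets; }

        @Override
        public synchronized void reset() { buckets.clear(); }
    }

    public static void main(String[] args) {
        Statistics<Counter> perHost = new MyStatistics().create();
        perHost.log("stand-in for an OfferPayload");
        perHost.log("another message of the same type");
        perHost.log(42); // a different message type
        // prints String: 2 and Integer: 1 (iteration order not guaranteed)
        perHost.values().forEach((type, c) -> System.out.println(type + ": " + c.value()));
    }
}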
public P2PSeedNodeSnapshot(Reporter reporter) {
@@ -103,90 +143,197 @@ abstract public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
// .map(e -> e.bytes)
// .collect(Collectors.toSet());
statistics = new MyStatistics();
}
@Override
protected void execute() {
    // start the network node
    final NetworkNode networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9054")),
            new CoreNetworkProtoResolver(), false,
            new AvailableTor(Monitor.TOR_WORKING_DIR, "unused"));
    // we do not need to start the networkNode, as we do not need the HS
    //networkNode.start(this);
    // clear our buckets
    bucketsPerHost.clear();
    getRequests().forEach(getDataRequest -> send(networkNode, getDataRequest));
    report();
}
abstract protected List<NetworkEnvelope> getRequests();
protected void send(NetworkNode networkNode, NetworkEnvelope message) {
    ArrayList<Thread> threadList = new ArrayList<>();
    // for each configured host
    for (String current : configuration.getProperty(HOSTS, "").split(",")) {
        threadList.add(new Thread(() -> {
            try {
                // parse Url
                NodeAddress target = OnionParser.getNodeAddress(current);
                // do the data request
                SettableFuture<Connection> future = networkNode.sendMessage(target, message);
                Futures.addCallback(future, new FutureCallback<>() {
                    @Override
                    public void onSuccess(Connection connection) {
                        connection.addMessageListener(P2PSeedNodeSnapshot.this);
                    }
                    @Override
                    public void onFailure(@NotNull Throwable throwable) {
                        gate.proceed();
                        log.error(
                                "Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
                                        + throwable.getMessage());
                    }
                });
            } catch (Exception e) {
                gate.proceed(); // release the gate on error
                e.printStackTrace();
            }
        }, current));
    }
    gate.engage(threadList.size());
    // start all threads and wait until they have all finished. We do that so we can
    // minimize the time between querying the hosts and therefore the chance of
    // inconsistencies.
    threadList.forEach(Thread::start);
    gate.await();
}
protected List<NetworkEnvelope> getRequests() {
    List<NetworkEnvelope> result = new ArrayList<>();
    Random random = new Random();
    result.add(new PreliminaryGetDataRequest(random.nextInt(), hashes));
    result.add(new GetDaoStateHashesRequest(daostateheight, random.nextInt()));
    result.add(new GetProposalStateHashesRequest(proposalheight, random.nextInt()));
    result.add(new GetBlindVoteStateHashesRequest(blindvoteheight, random.nextInt()));
    return result;
}
/**
 * Report all the stuff. Uses the configured reporter directly.
 */
abstract void report();
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
    if (treatMessage(networkEnvelope, connection)) {
        gate.proceed();
    } else if (networkEnvelope instanceof CloseConnectionMessage) {
        gate.unlock();
    } else {
        log.warn("Got an unexpected message of type <{}>",
                networkEnvelope.getClass().getSimpleName());
    }
}
abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);
void report() {
    // report
    Map<String, String> report = new HashMap<>();
    // - assemble histograms
    bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((type, counter) -> report
            .put(OnionParser.prettyPrint(host) + ".numberOfMessages." + type, String.valueOf(((Counter) counter).value()))));
    // - assemble diffs
    //   - transfer values
    Map<String, Statistics> messagesPerHost = new HashMap<>();
    bucketsPerHost.forEach((host, value) -> messagesPerHost.put(OnionParser.prettyPrint(host), value));
    //   - pick reference seed node and its values
    Optional<String> referenceHost = messagesPerHost.keySet().stream().sorted().findFirst();
    Map<String, Counter> referenceValues = messagesPerHost.get(referenceHost.get()).values();
    //   - calculate diffs
    messagesPerHost.forEach(
            (host, statistics) -> {
                statistics.values().forEach((messageType, count) -> {
                    try {
                        report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
                                String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
                    } catch (MalformedURLException ignore) {
                        log.error("we should never get here");
                    }
                });
                try {
                    report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost.get());
                } catch (MalformedURLException ignore) {
                    log.error("we should never get here");
                }
            });
    // cleanup for next run
    bucketsPerHost.forEach((host, statistics) -> statistics.reset());
    // when our hash cache exceeds a hard limit, we clear the cache and start anew
    if (hashes.size() > 150000)
        hashes.clear();
    // - report
    reporter.report(report, getName());
    // - assemble dao report
    Map<String, String> daoreport = new HashMap<>();
    // - transcode
    Map<String, Map<NodeAddress, Tuple>> perType = new HashMap<>();
    daoData.forEach((nodeAddress, daostatistics) -> daostatistics.values().forEach((type, tuple) -> {
        perType.putIfAbsent((String) type, new HashMap<>());
        perType.get(type).put(nodeAddress, (Tuple) tuple);
    }));
    // - process dao data
    perType.forEach((type, nodeAddressTupleMap) -> {
        // - find head
        int head = (int) nodeAddressTupleMap.values().stream().sorted((o1, o2) -> Long.compare(o1.height, o2.height)).findFirst().get().height;
        // - update queried height
        if (type.contains("DaoState"))
            daostateheight = head - 20;
        else if (type.contains("Proposal"))
            proposalheight = head - 20;
        else
            blindvoteheight = head - 20;
        // - calculate diffs
        nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".head", Long.toString(tuple.height - head)));
        // - memorize hashes
        Set<ByteBuffer> states = new HashSet<>();
        nodeAddressTupleMap.forEach((nodeAddress, tuple) -> states.add(ByteBuffer.wrap(tuple.hash)));
        nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".hash", Integer.toString(Arrays.asList(states.toArray()).indexOf(ByteBuffer.wrap(tuple.hash)))));
    });
    daoData.clear();
    // - report
    reporter.report(daoreport, "DaoStateSnapshot");
}
private class Tuple {
    private final long height;
    private final byte[] hash;
    Tuple(long height, byte[] hash) {
        this.height = height;
        this.hash = hash;
    }
}
private class DaoStatistics implements Statistics<Tuple> {
    Map<String, Tuple> buckets = new ConcurrentHashMap<>();
    @Override
    public Statistics create() {
        return new DaoStatistics();
    }
    @Override
    public void log(Object message) {
        // get last entry
        StateHash last = (StateHash) ((GetStateHashesResponse) message).getStateHashes().get(((GetStateHashesResponse) message).getStateHashes().size() - 1);
        // For logging different data types
        String className = last.getClass().getSimpleName();
        buckets.putIfAbsent(className, new Tuple(last.getHeight(), last.getHash()));
    }
    @Override
    public Map<String, Tuple> values() {
        return buckets;
    }
    @Override
    public void reset() {
        buckets.clear();
    }
}
private Map<NodeAddress, Statistics> daoData = new ConcurrentHashMap<>();
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
    checkNotNull(connection.getPeersNodeAddressProperty(),
            "although the property is nullable, we need it to not be null");
    if (networkEnvelope instanceof GetDataResponse) {
        Statistics result = this.statistics.create();
        GetDataResponse dataResponse = (GetDataResponse) networkEnvelope;
        final Set<ProtectedStorageEntry> dataSet = dataResponse.getDataSet();
        dataSet.forEach(e -> {
            final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload();
            if (protectedStoragePayload == null) {
                log.warn("StoragePayload was null: {}", networkEnvelope.toString());
                return;
            }
            result.log(protectedStoragePayload);
        });
        Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
                .getPersistableNetworkPayloadSet();
        if (persistableNetworkPayloadSet != null) {
            persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
                // memorize message hashes
                //Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
                //Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);
                //hashes.add(bytes);
                hashes.add(persistableNetworkPayload.getHash());
            });
        }
        bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
        return true;
    } else if (networkEnvelope instanceof GetStateHashesResponse) {
        daoData.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new DaoStatistics());
        daoData.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope);
        return true;
    }
    return false;
}
}
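
The hash consistency check above reduces each host's DAO state hash to a small index: all distinct hashes observed for a message type are collected into a set, and each host reports the position of its own hash within that set, so matching indices across hosts signal consensus. A standalone sketch of the indexing idea; host names and hash bytes are invented:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DaoHashIndexSketch {
    public static void main(String[] args) {
        // Hypothetical per-node DAO state hashes for one message type.
        Map<String, byte[]> hashPerNode = Map.of(
                "aaa.onion", new byte[]{1, 2, 3},
                "bbb.onion", new byte[]{1, 2, 3},
                "ccc.onion", new byte[]{9, 9, 9}); // out of sync

        // Collect the distinct hashes; ByteBuffer gives us content-based equality.
        Set<ByteBuffer> states = new LinkedHashSet<>();
        hashPerNode.values().forEach(hash -> states.add(ByteBuffer.wrap(hash)));
        List<ByteBuffer> indexed = new ArrayList<>(states);

        // Two nodes share one index, the divergent node gets another
        // (which index is which depends on iteration order).
        hashPerNode.forEach((node, hash) ->
                System.out.println(node + ".hash=" + indexed.indexOf(ByteBuffer.wrap(hash))));
    }
}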

View File

@@ -0,0 +1,173 @@
/*
* This file is part of Bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.monitor.metric;
import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.ThreadGate;
import bisq.core.proto.network.CoreNetworkProtoResolver;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.common.proto.network.NetworkEnvelope;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
/**
* Contacts a list of hosts and asks them for all the data excluding persisted messages. The
* answers are then compiled into buckets of message types. Based on these
* buckets, the Metric reports (for each host) the message types observed and
* their number.
*
* @author Florian Reimair
*
*/
@Slf4j
abstract public class P2PSeedNodeSnapshotBase extends Metric implements MessageListener {
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
Statistics statistics;
final Map<NodeAddress, Statistics> bucketsPerHost = new ConcurrentHashMap<>();
private final ThreadGate gate = new ThreadGate();
/**
* Statistics Interface for use with derived classes.
*
* @param <T> the value type of the statistics implementation
*/
protected interface Statistics<T> {
Statistics create();
void log(Object message);
Map<String, T> values();
void reset();
}
public P2PSeedNodeSnapshotBase(Reporter reporter) {
super(reporter);
}
@Override
protected void execute() {
// start the network node
final NetworkNode networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9054")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, "unused"));
// we do not need to start the networkNode, as we do not need the HS
//networkNode.start(this);
// clear our buckets
bucketsPerHost.clear();
getRequests().forEach(getDataRequest -> send(networkNode, getDataRequest));
report();
}
abstract protected List<NetworkEnvelope> getRequests();
protected void send(NetworkNode networkNode, NetworkEnvelope message) {
ArrayList<Thread> threadList = new ArrayList<>();
// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
threadList.add(new Thread(() -> {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);
// do the data request
SettableFuture<Connection> future = networkNode.sendMessage(target, message);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PSeedNodeSnapshotBase.this);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
}, current));
}
gate.engage(threadList.size());
// start all threads and wait until they have all finished. We do that so we can
// minimize the time between querying the hosts and therefore the chance of
// inconsistencies.
threadList.forEach(Thread::start);
gate.await();
}
/**
* Report all the stuff. Uses the configured reporter directly.
*/
abstract void report();
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if(treatMessage(networkEnvelope, connection)) {
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
} else {
log.warn("Got an unexpected message of type <{}>",
networkEnvelope.getClass().getSimpleName());
}
}
abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);
}
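
ThreadGate is Bisq's own synchronization helper; judging purely from its use in send() above, engage(n) arms the gate for n expected events, proceed() counts one event down, unlock() opens the gate entirely, and await() blocks until the gate opens. A rough CountDownLatch analogue of that fan-out/fan-in pattern, with the Tor round trip stubbed out; the timeout value is an assumption:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GateSketch {
    public static void main(String[] args) throws InterruptedException {
        List<String> hosts = List.of("aaa.onion:8000", "bbb.onion:8000", "ccc.onion:8000");
        CountDownLatch gate = new CountDownLatch(hosts.size()); // engage(n)

        List<Thread> threadList = new ArrayList<>();
        for (String host : hosts)
            threadList.add(new Thread(() -> {
                try {
                    // stand-in for sendMessage(...) plus the response callback
                    System.out.println("querying " + host);
                } finally {
                    gate.countDown(); // proceed(), on success or failure
                }
            }, host));

        // Start all threads close together to minimize the window between queries.
        threadList.forEach(Thread::start);
        gate.await(90, TimeUnit.SECONDS); // await(), bounded so one dead host cannot stall the run
    }
}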

View File

@@ -1,339 +0,0 @@
/*
* This file is part of Bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.monitor.metric;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.core.dao.monitoring.model.StateHash;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetStateHashesResponse;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.proto.network.NetworkEnvelope;
import java.net.MalformedURLException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Contacts a list of hosts and asks them for all the data excluding persisted messages. The
* answers are then compiled into buckets of message types. Based on these
* buckets, the Metric reports (for each host) the message types observed and
* their number.
*
* Furthermore, now that the DAO is live, the consistency of the DAO state held by each host is assessed and reported.
*
* @author Florian Reimair
*
*/
@Slf4j
public class P2PSeedNodeWithDaoSnapshot extends P2PSeedNodeSnapshot {
Statistics statistics;
final Map<NodeAddress, Statistics> bucketsPerHost = new ConcurrentHashMap<>();
protected final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
private int daostateheight = 550000;
private int proposalheight = daostateheight;
private int blindvoteheight = daostateheight;
/**
* Efficient way to count message occurrences.
*/
private class Counter {
private int value = 0;
int value() {
return value;
}
void increment() {
value++;
}
}
/**
* Use a counter to do statistics.
*/
private class MyStatistics implements Statistics<Counter> {
private final Map<String, Counter> buckets = new HashMap<>();
@Override
public Statistics create() {
return new MyStatistics();
}
@Override
public synchronized void log(Object message) {
// For logging different data types
String className = message.getClass().getSimpleName();
buckets.putIfAbsent(className, new Counter());
buckets.get(className).increment();
}
@Override
public Map<String, Counter> values() {
return buckets;
}
@Override
public synchronized void reset() {
buckets.clear();
}
}
public P2PSeedNodeWithDaoSnapshot(Reporter reporter) {
super(reporter);
// AppendOnlyDataStoreService appendOnlyDataStoreService,
// ProtectedDataStoreService protectedDataStoreService,
// ResourceDataStoreService resourceDataStoreService,
// Storage<SequenceNumberMap> sequenceNumberMapStorage) {
//
// Set<byte[]> excludedKeys = dataStorage.getAppendOnlyDataStoreMap().keySet().stream()
// .map(e -> e.bytes)
// .collect(Collectors.toSet());
//
// Set<byte[]> excludedKeysFromPersistedEntryMap = dataStorage.getProtectedDataStoreMap().keySet()
// .stream()
// .map(e -> e.bytes)
// .collect(Collectors.toSet());
statistics = new MyStatistics();
}
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();
Random random = new Random();
result.add(new PreliminaryGetDataRequest(random.nextInt(), hashes));
result.add(new GetDaoStateHashesRequest(daostateheight, random.nextInt()));
result.add(new GetProposalStateHashesRequest(proposalheight, random.nextInt()));
result.add(new GetBlindVoteStateHashesRequest(blindvoteheight, random.nextInt()));
return result;
}
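
getRequests() starts from the hardcoded height 550000, and after each run report() moves the three heights to 20 blocks below the freshest head observed, so the queried window tracks the chain tip. A tiny sketch of that sliding-window bookkeeping; the method name is ours, not Bisq's:

public class QueryHeightSketch {
    private int daostateheight = 550000; // initial value, as in the class above

    // Re-query from a little below the freshest head we saw last round.
    int nextRequestHeight(long reportedHead) {
        daostateheight = (int) reportedHead - 20;
        return daostateheight;
    }

    public static void main(String[] args) {
        QueryHeightSketch sketch = new QueryHeightSketch();
        System.out.println(sketch.nextRequestHeight(575_000)); // prints 574980
    }
}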
/**
* Report all the stuff. Uses the configured reporter directly.
*/
void report() {
// report
Map<String, String> report = new HashMap<>();
// - assemble histograms
bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((type, counter) -> report
.put(OnionParser.prettyPrint(host) + ".numberOfMessages." + type, String.valueOf(((Counter) counter).value()))));
// - assemble diffs
// - transfer values
Map<String, Statistics> messagesPerHost = new HashMap<>();
bucketsPerHost.forEach((host, value) -> messagesPerHost.put(OnionParser.prettyPrint(host), value));
// - pick reference seed node and its values
Optional<String> referenceHost = messagesPerHost.keySet().stream().sorted().findFirst();
Map<String, Counter> referenceValues = messagesPerHost.get(referenceHost.get()).values();
// - calculate diffs
messagesPerHost.forEach(
(host, statistics) -> {
statistics.values().forEach((messageType, count) -> {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
} catch (MalformedURLException ignore) {
log.error("we should never got here");
}
});
try {
report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost.get());
} catch (MalformedURLException ignore) {
log.error("we should never got here");
}
});
// cleanup for next run
bucketsPerHost.forEach((host, statistics) -> statistics.reset());
// when our hash cache exceeds a hard limit, we clear the cache and start anew
if (hashes.size() > 150000)
hashes.clear();
// - report
reporter.report(report, getName());
// - assemble dao report
Map<String, String> daoreport = new HashMap<>();
// - transcode
Map<String, Map<NodeAddress, Tuple>> perType = new HashMap<>();
daoData.forEach((nodeAddress, daostatistics) -> daostatistics.values().forEach((type, tuple) -> {
perType.putIfAbsent((String) type, new HashMap<>());
perType.get(type).put(nodeAddress, (Tuple) tuple);
}));
// - process dao data
perType.forEach((type, nodeAddressTupleMap) -> {
// - find head
int head = (int) nodeAddressTupleMap.values().stream().sorted((o1, o2) -> Long.compare(o1.height, o2.height)).findFirst().get().height;
// - update queried height
if(type.contains("DaoState"))
daostateheight = head - 20;
else if(type.contains("Proposal"))
proposalheight = head - 20;
else
blindvoteheight = head - 20;
// - calculate diffs
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".head", Long.toString(tuple.height - head)));
// - memorize hashes
Set<ByteBuffer> states = new HashSet<>();
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> states.add(ByteBuffer.wrap(tuple.hash)));
nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".hash", Integer.toString(Arrays.asList(states.toArray()).indexOf(ByteBuffer.wrap(tuple.hash)))));
});
daoData.clear();
// - report
reporter.report(daoreport, "DaoStateSnapshot");
}
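
report() publishes drift rather than absolute counts: the lexicographically first host becomes the reference, and every host reports its per-type count minus the reference count, so a healthy network shows values near zero. A condensed sketch of that diffing step, with plain maps standing in for the Statistics buckets; hosts and numbers are invented:

import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class RelativeCountSketch {
    public static void main(String[] args) {
        // Per-host message-type histograms (hypothetical values).
        Map<String, Map<String, Integer>> messagesPerHost = new TreeMap<>();
        messagesPerHost.put("aaa.onion", Map.of("OfferPayload", 100, "MailboxStoragePayload", 40));
        messagesPerHost.put("bbb.onion", Map.of("OfferPayload", 97, "MailboxStoragePayload", 40));

        // The lexicographically first host serves as the reference.
        Optional<String> referenceHost = messagesPerHost.keySet().stream().sorted().findFirst();
        Map<String, Integer> reference = messagesPerHost.get(referenceHost.get());

        // Prints bbb.onion.relativeNumberOfMessages.OfferPayload=-3, everything else 0.
        messagesPerHost.forEach((host, counts) -> counts.forEach((type, count) ->
                System.out.println(host + ".relativeNumberOfMessages." + type + "="
                        + (count - reference.get(type)))));
    }
}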
private class Tuple {
private final long height;
private final byte[] hash;
Tuple(long height, byte[] hash) {
this.height = height;
this.hash = hash;
}
}
private class DaoStatistics implements Statistics<Tuple> {
Map<String, Tuple> buckets = new ConcurrentHashMap<>();
@Override
public Statistics create() {
return new DaoStatistics();
}
@Override
public void log(Object message) {
// get last entry
StateHash last = (StateHash) ((GetStateHashesResponse) message).getStateHashes().get(((GetStateHashesResponse) message).getStateHashes().size() - 1);
// For logging different data types
String className = last.getClass().getSimpleName();
buckets.putIfAbsent(className, new Tuple(last.getHeight(), last.getHash()));
}
@Override
public Map<String, Tuple> values() {
return buckets;
}
@Override
public void reset() {
buckets.clear();
}
}
private Map<NodeAddress, Statistics> daoData = new ConcurrentHashMap<>();
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
if (networkEnvelope instanceof GetDataResponse) {
Statistics result = this.statistics.create();
GetDataResponse dataResponse = (GetDataResponse) networkEnvelope;
final Set<ProtectedStorageEntry> dataSet = dataResponse.getDataSet();
dataSet.forEach(e -> {
final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload();
if (protectedStoragePayload == null) {
log.warn("StoragePayload was null: {}", networkEnvelope.toString());
return;
}
result.log(protectedStoragePayload);
});
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
// memorize message hashes
//Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
//Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);
//hashes.add(bytes);
hashes.add(persistableNetworkPayload.getHash());
});
}
bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
return true;
} else if (networkEnvelope instanceof GetStateHashesResponse) {
daoData.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new DaoStatistics());
daoData.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope);
return true;
}
return false;
}
}

View File

@@ -16,54 +16,54 @@ System.baseCurrencyNetwork=0
#Edit and uncomment the lines below to your liking
#TorStartupTime Metric
TorStartupTime.enabled=true
TorStartupTime.enabled=false
TorStartupTime.run.interval=100
TorStartupTime.run.socksPort=90500
TorRoundTripTime.enabled=true
TorRoundTripTime.enabled=false
TorRoundTripTime.run.interval=100
TorRoundTripTime.run.sampleSize=3
# torproject.org hidden service
TorRoundTripTime.run.hosts=http://expyuzz4wqqyqhjn.onion:80
#TorHiddenServiceStartupTime Metric
TorHiddenServiceStartupTime.enabled=true
TorHiddenServiceStartupTime.enabled=false
TorHiddenServiceStartupTime.run.interval=100
TorHiddenServiceStartupTime.run.localPort=90501
TorHiddenServiceStartupTime.run.servicePort=90511
#P2PRoundTripTime Metric
P2PRoundTripTime.enabled=true
P2PRoundTripTime.enabled=false
P2PRoundTripTime.run.interval=100
P2PRoundTripTime.run.sampleSize=5
P2PRoundTripTime.run.hosts=723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PRoundTripTime.run.torProxyPort=9060
#P2PNetworkLoad Metric
P2PNetworkLoad.enabled=true
P2PNetworkLoad.enabled=false
P2PNetworkLoad.run.interval=100
P2PNetworkLoad.run.torProxyPort=9061
P2PNetworkLoad.run.historySize=200
#P2PSeedNodeSnapshot Metric
P2PSeedNodeWithDaoSnapshot.enabled=true
P2PSeedNodeWithDaoSnapshot.run.interval=24
P2PSeedNodeWithDaoSnapshot.run.hosts=3f3cu2yw7u457ztq.onion:8000, 723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PSeedNodeWithDaoSnapshot.run.torProxyPort=9062
#P2PSeedNodeSnapshotBase Metric
P2PSeedNodeSnapshot.enabled=true
P2PSeedNodeSnapshot.run.interval=24
P2PSeedNodeSnapshot.run.hosts=3f3cu2yw7u457ztq.onion:8000, 723ljisnynbtdohi.onion:8000, fl3mmribyxgrv63c.onion:8000
P2PSeedNodeSnapshot.run.torProxyPort=9062
#P2PMarketStats Metric
P2PMarketStats.enabled=true
P2PMarketStats.enabled=false
P2PMarketStats.run.interval=37
P2PMarketStats.run.hosts=ef5qnzx6znifo3df.onion:8000
P2PMarketStats.run.torProxyPort=9063
#PriceNodeStats Metric
PriceNodeStats.enabled=true
PriceNodeStats.enabled=false
PriceNodeStats.run.interval=42
PriceNodeStats.run.hosts=http://5bmpx76qllutpcyp.onion, http://xc3nh4juf2hshy7e.onion, http://44mgyoe2b6oqiytt.onion, http://62nvujg5iou3vu3i.onion, http://ceaanhbvluug4we6.onion
#MarketStats Metric
MarketStats.enabled=true
MarketStats.enabled=false
MarketStats.run.interval=191
#Another Metric

View File

@@ -121,7 +121,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 15 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);