Init proper NetworkLoad Metric

This commit is contained in:
Florian Reimair 2019-02-03 13:40:36 +01:00
parent bf66ddfeef
commit 7c860e005b

View File

@ -17,46 +17,40 @@
package bisq.monitor.metric;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.jetbrains.annotations.NotNull;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.springframework.core.env.PropertySource;
import bisq.common.Clock;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.core.app.BisqEnvironment;
import bisq.core.btc.BaseCurrencyNetwork;
import bisq.core.btc.BtcOptionKeys;
import bisq.core.network.p2p.seed.DefaultSeedNodeRepository;
import bisq.core.network.p2p.seed.SeedNodeAddressLookup;
import bisq.core.proto.network.CoreNetworkProtoResolver;
import bisq.core.proto.persistable.CorePersistenceProtoResolver;
import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.ThreadGate;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.SetupListener;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.keepalive.KeepAliveManager;
import bisq.network.p2p.peers.peerexchange.PeerExchangeManager;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import lombok.extern.slf4j.Slf4j;
/**
@ -71,27 +65,34 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class P2PNetworkLoad extends Metric implements MessageListener, SetupListener {
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
private static final String MAX_CONNECTIONS = "run.maxConnections";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("metric_p2pNetworkLoad");
private int nonce;
private Map<NodeAddress, Map<String, Counter>> bucketsPerHost = new ConcurrentHashMap<>();
private Set<byte[]> hashes = new HashSet<>();
private final ThreadGate hsReady = new ThreadGate();
private final ThreadGate gate = new ThreadGate();
private Map<String, Counter> buckets = new ConcurrentHashMap<>();
private KeepAliveManager keepAliveManager;
/**
* Efficient way to count message occurrences.
*/
private class Counter {
private int value = 0;
private int value = 1;
int value() {
return value;
/**
* atomic get and reset
*
* @return the current value
*/
synchronized int getAndReset() {
try {
return value;
} finally {
value = 0;
}
}
void increment() {
synchronized void increment() {
value++;
}
}
@ -104,6 +105,7 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
@Override
protected void execute() {
// in case we do not have a NetworkNode up and running, we create one
if (null == networkNode) {
// prepare the gate
@ -117,145 +119,68 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
// wait for the HS to be published
hsReady.await();
// boot up P2P node
File storageDir = new File("/tmp/bisq-metric-storage");
String seedNodes = "";
try {
BisqEnvironment environment = new BisqEnvironment(new PropertySource<String>("name") {
@Override
public String getProperty(String name) {
if(BtcOptionKeys.BASE_CURRENCY_NETWORK.equals(name))
return BaseCurrencyNetwork.BTC_MAINNET.name();
return "";
}
});
int maxConnections = Integer.parseInt(configuration.getProperty(MAX_CONNECTIONS, "12"));
NetworkProtoResolver networkProtoResolver = new CoreNetworkProtoResolver();
CorePersistenceProtoResolver persistenceProtoResolver = new CorePersistenceProtoResolver(null,
networkProtoResolver, storageDir);
DefaultSeedNodeRepository seedNodeRepository = new DefaultSeedNodeRepository(
new SeedNodeAddressLookup(environment, false, 0, null, seedNodes));
PeerManager peerManager = new PeerManager(networkNode, seedNodeRepository, new Clock(),
persistenceProtoResolver, maxConnections, storageDir);
PeerExchangeManager peerExchangeManager = new PeerExchangeManager(networkNode, seedNodeRepository,
peerManager);
// updates the peer list every now and then as well
peerExchangeManager
.requestReportedPeersFromSeedNodes(seedNodeRepository.getSeedNodeAddresses().iterator().next()); // irgendeine
// seednode
// nehmen
keepAliveManager = new KeepAliveManager(networkNode, peerManager);
keepAliveManager.start();
networkNode.addMessageListener(this);
} catch (Throwable e) {
e.printStackTrace();
}
}
// clear our buckets
bucketsPerHost.clear();
ArrayList<Thread> threadList = new ArrayList<>();
// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
threadList.add(new Thread(() -> {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);
// do the data request
nonce = new Random().nextInt();
SettableFuture<Connection> future = networkNode.sendMessage(target,
new PreliminaryGetDataRequest(nonce, hashes));
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PNetworkLoad.this);
log.debug("Send PreliminaryDataRequest to " + connection + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
}, current));
}
gate.engage(threadList.size());
// start all threads and wait until they all finished. We do that so we can
// minimize the time between querying the hosts and therefore the chance of
// inconsistencies.
threadList.forEach(Thread::start);
gate.await();
// report
Map<String, String> report = new HashMap<>();
// - assemble histograms
bucketsPerHost.forEach((host, buckets) -> buckets.forEach((type, counter) -> report
.put(OnionParser.prettyPrint(host) + "." + type, String.valueOf(counter.value()))));
// - assemble diffs
Map<String, Integer> messagesPerHost = new HashMap<>();
bucketsPerHost.forEach((host, buckets) -> messagesPerHost.put(OnionParser.prettyPrint(host),
buckets.values().stream().mapToInt(Counter::value).sum()));
Optional<String> referenceHost = messagesPerHost.keySet().stream().sorted().findFirst();
Integer referenceValue = messagesPerHost.get(referenceHost.get());
// - get snapshot so we do not loose data
Set<String> keys = new HashSet<>(buckets.keySet());
messagesPerHost.forEach(
(host, numberOfMessages) -> {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages",
String.valueOf(numberOfMessages - referenceValue));
report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost.get());
report.put(OnionParser.prettyPrint(host) + ".referenceValue", String.valueOf(referenceValue));
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
// - transfer values to report
keys.forEach(key -> report.put(key, String.valueOf(buckets.get(key).getAndReset())));
// when our hash cache exceeds a hard limit, we clear the cache and start anew
if (hashes.size() > 150000)
hashes.clear();
// in case we just started anew, do not report our findings as they contain not
// only the changes since our last run, but a whole lot more data dating back even
// to the beginning of bisq.
if (!hashes.isEmpty())
reporter.report(report, "bisq." + getName());
// - report
reporter.report(report, "bisq." + getName());
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetDataResponse) {
GetDataResponse dataResponse = (GetDataResponse) networkEnvelope;
Map<String, Counter> buckets = new HashMap<>();
final Set<ProtectedStorageEntry> dataSet = dataResponse.getDataSet();
dataSet.forEach(e -> {
final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload();
if (protectedStoragePayload == null) {
log.warn("StoragePayload was null: {}", networkEnvelope.toString());
return;
}
// memorize message hashes
hashes.add(P2PDataStorage.get32ByteHash(protectedStoragePayload));
// For logging different data types
String className = protectedStoragePayload.getClass().getSimpleName();
try {
buckets.get(className).increment();
} catch (NullPointerException nullPointerException) {
buckets.put(className, new Counter());
}
});
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
// memorize message hashes
hashes.add(persistableNetworkPayload.getHash());
// For logging different data types
String className = persistableNetworkPayload.getClass().getSimpleName();
buckets.putIfAbsent(className, new Counter());
buckets.get(className).increment();
});
// TODO check if we already have this very message
if (networkEnvelope instanceof BroadcastMessage) {
try {
buckets.get(networkEnvelope.getClass().getSimpleName()).increment();
} catch (NullPointerException e) {
// use exception handling because we hardly ever need to add a fresh bucket
buckets.put(networkEnvelope.getClass().getSimpleName(), new Counter());
}
checkNotNull(connection.peersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
bucketsPerHost.put(connection.peersNodeAddressProperty().getValue(), buckets);
connection.removeMessageListener(this);
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
} else {
log.warn("Got a message of type <{}>, expected <GetDataResponse>",
networkEnvelope.getClass().getSimpleName());
}
}
@ -276,4 +201,11 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
@Override
public void onRequestCustomBridges() {
}
@Override
public void shutdown() {
keepAliveManager.shutDown();
super.shutdown();
}
}