Merge pull request #3045 from freimair/monitor-bsqblocks

Monitor fixes
This commit is contained in:
sqrrm 2019-08-09 17:47:20 +02:00 committed by GitHub
commit b1ebe071ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 134 deletions

View file

@ -119,13 +119,17 @@ public abstract class Metric extends Configurable implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Metric: " + getName());
try {
Thread.currentThread().setName("Metric: " + getName());
// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
}
} catch(Throwable e) {
log.error("A metric misbehaved!", e);
}
}

View file

@ -103,7 +103,6 @@ public class MarketStats extends Metric {
}
amount.find();
timestamp.find();
System.err.println(getName() + ".volume." + market.group(1) + " " + amount.group(1) + " " + timestamp.group(1).substring(0, timestamp.group(1).length() - 3));
reporter.report("volume." + market.group(1), amount.group(1), timestamp.group(1), getName());
});
} catch (IllegalStateException ignore) {

View file

@ -17,166 +17,95 @@
package bisq.monitor.metric;
import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.StatisticsHelper;
import bisq.monitor.ThreadGate;
import bisq.core.proto.network.CoreNetworkProtoResolver;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.SetupListener;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.keepalive.messages.Ping;
import bisq.network.p2p.peers.keepalive.messages.Pong;
import bisq.common.proto.network.NetworkEnvelope;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
import org.jetbrains.annotations.NotNull;
@Slf4j
public class P2PRoundTripTime extends Metric implements MessageListener, SetupListener {
public class P2PRoundTripTime extends P2PSeedNodeSnapshotBase {
private static final String SAMPLE_SIZE = "run.sampleSize";
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("metric_" + getName());
private int nonce;
private long start;
private List<Long> samples;
private final ThreadGate gate = new ThreadGate();
private final ThreadGate hsReady = new ThreadGate();
private Map<Integer, Long> sentAt = new HashMap<>();
private Map<NodeAddress, Statistics> measurements = new HashMap<>();
public P2PRoundTripTime(Reporter reporter) {
super(reporter);
}
@Override
protected void execute() {
/**
* Use a counter to do statistics.
*/
private class Statistics {
if (null == networkNode) {
// close the gate
hsReady.engage();
private final List<Long> samples = new ArrayList<>();
networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9052")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, torHiddenServiceDir.getName()));
networkNode.start(this);
// wait for the gate to be reopened
hsReady.await();
public synchronized void log(Object message) {
Pong pong = (Pong) message;
Long start = sentAt.get(pong.getRequestNonce());
if(start != null)
samples.add(System.currentTimeMillis() - start);
}
// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);
// init sample bucket
samples = new ArrayList<>();
while (samples.size() < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1"))) {
// so we do not get disconnected due to DoS protection mechanisms
Thread.sleep(200);
nonce = new Random().nextInt();
// close the gate
gate.engage();
start = System.currentTimeMillis();
SettableFuture<Connection> future = networkNode.sendMessage(target, new Ping(nonce, 42));
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PRoundTripTime.this);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error("Sending ping failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});
// wait for the gate to open again
gate.await();
// remove the message listener so we do not get messages we are not interested in anymore
// (especially relevant when gate.await() times out)
future.get().removeMessageListener(this);
}
// report
reporter.report(StatisticsHelper.process(samples),
getName() + "." + OnionParser.prettyPrint(target));
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
public List<Long> values() {
return samples;
}
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1")); i++)
result.add(new Ping(random.nextInt(), 42));
return result;
}
@Override
protected void aboutToSend(NetworkEnvelope message) {
sentAt.put(((Ping) message).getNonce(), System.currentTimeMillis());
}
@Override
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof Pong) {
Pong pong = (Pong) networkEnvelope;
if (pong.getRequestNonce() == nonce) {
samples.add(System.currentTimeMillis() - start);
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, pong.getRequestNonce());
}
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
measurements.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new Statistics());
measurements.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope);
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
// open the gate
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
} else {
log.warn("Got a message of type <{}>, expected <Pong>", networkEnvelope.getClass().getSimpleName());
return true;
}
return false;
}
@Override
public void onTorNodeReady() {
}
@Override
public void onHiddenServicePublished() {
hsReady.proceed();
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
void report() {
// report
measurements.forEach(((nodeAddress, samples) ->
reporter.report(StatisticsHelper.process(samples.values()),
getName() + "." + OnionParser.prettyPrint(nodeAddress))
));
// clean up for next round
measurements = new HashMap<>();
}
}

View file

@ -190,8 +190,8 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
} catch (MalformedURLException ignore) {
log.error("we should never got here");
} catch (MalformedURLException | NullPointerException ignore) {
log.error("we should never have gotten here", ignore);
}
});
try {

View file

@ -118,12 +118,12 @@ abstract public class P2PSeedNodeSnapshotBase extends Metric implements MessageL
NodeAddress target = OnionParser.getNodeAddress(current);
// do the data request
aboutToSend(message);
SettableFuture<Connection> future = networkNode.sendMessage(target, message);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.removeMessageListener(P2PSeedNodeSnapshotBase.this);
connection.addMessageListener(P2PSeedNodeSnapshotBase.this);
}
@ -131,8 +131,7 @@ abstract public class P2PSeedNodeSnapshotBase extends Metric implements MessageL
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
"Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage());
}
});
@ -153,6 +152,8 @@ abstract public class P2PSeedNodeSnapshotBase extends Metric implements MessageL
gate.await();
}
protected void aboutToSend(NetworkEnvelope message) { };
/**
* Report all the stuff. Uses the configured reporter directly.
*/
@ -168,6 +169,7 @@ abstract public class P2PSeedNodeSnapshotBase extends Metric implements MessageL
log.warn("Got an unexpected message of type <{}>",
networkEnvelope.getClass().getSimpleName());
}
connection.removeMessageListener(this);
}
abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);