Improve monitor

This commit is contained in:
Manfred Karrer 2017-12-19 00:18:18 +01:00
parent 852d4c5ba5
commit 13883fe4ec
No known key found for this signature in database
GPG Key ID: 401250966A6B2C46
8 changed files with 65 additions and 74 deletions

View File

@ -32,6 +32,7 @@ import io.bisq.network.p2p.peers.getdata.RequestDataManager;
import io.bisq.network.p2p.peers.keepalive.KeepAliveManager;
import io.bisq.network.p2p.peers.peerexchange.PeerExchangeManager;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.seednode_monitor.metrics.MetricsModel;
import org.springframework.core.env.Environment;
import java.io.File;
@ -47,7 +48,7 @@ public class MonitorP2PModule extends AppModule {
@Override
protected void configure() {
bind(MetricsByNodeAddressMap.class).in(Singleton.class);
bind(MetricsModel.class).in(Singleton.class);
bind(MonitorP2PService.class).in(Singleton.class);
bind(PeerManager.class).in(Singleton.class);

View File

@ -23,7 +23,7 @@ import io.bisq.network.Socks5ProxyProvider;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.network.SetupListener;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.seednode_monitor.request.MonitorRequestManager;
import io.bisq.seednode_monitor.metrics.MonitorRequestManager;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

View File

@ -20,6 +20,7 @@ import io.bisq.core.btc.wallet.BsqWalletService;
import io.bisq.core.btc.wallet.BtcWalletService;
import io.bisq.core.btc.wallet.WalletsSetup;
import io.bisq.core.offer.OpenOfferManager;
import io.bisq.seednode_monitor.metrics.MetricsModel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -36,7 +37,7 @@ public class SeedNodeMonitor {
private static SeedNodeMonitorEnvironment environment;
@Getter
private final MetricsByNodeAddressMap metricsByNodeAddressMap;
private final MetricsModel metricsService;
public static void setEnvironment(SeedNodeMonitorEnvironment environment) {
SeedNodeMonitor.environment = environment;
@ -96,7 +97,7 @@ public class SeedNodeMonitor {
seedNodeModule = new SeedNodeMonitorModule(environment);
injector = Guice.createInjector(seedNodeModule);
metricsByNodeAddressMap = injector.getInstance(MetricsByNodeAddressMap.class);
metricsService = injector.getInstance(MetricsModel.class);
MonitorAppSetup appSetup = injector.getInstance(MonitorAppSetup.class);
appSetup.start();

View File

@ -99,7 +99,7 @@ public class SeedNodeMonitorMain extends BisqExecutable {
port(8080);
get("/", (req, res) -> {
log.info("Incoming request from: " + req.userAgent());
return seedNodeMonitor.getMetricsByNodeAddressMap().getResultAsHtml();
return seedNodeMonitor.getMetricsService().getResultAsHtml();
});
new SeedNodeMonitorMain().execute(args);

View File

@ -15,7 +15,7 @@
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
package io.bisq.seednode_monitor.metrics;
import lombok.Getter;

View File

@ -15,11 +15,12 @@
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
package io.bisq.seednode_monitor.metrics;
import io.bisq.common.util.MathUtils;
import io.bisq.network.p2p.NodeAddress;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import io.bisq.seednode_monitor.MonitorOptionKeys;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -32,7 +33,7 @@ import java.util.*;
import java.util.stream.Collectors;
@Slf4j
public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
public class MetricsModel {
private SlackApi slackApi;
@Getter
private String resultAsString;
@ -42,26 +43,34 @@ public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
@Setter
private long lastCheckTs;
private int totalErrors = 0;
private HashMap<NodeAddress, Metrics> map = new HashMap<>();
/**
 * Creates the model and keeps a reference to the seed node repository.
 * A Slack client is only created when {@code slackUrlSeedChannel} is non-empty;
 * otherwise {@code slackApi} is left unassigned.
 *
 * @param seedNodesRepository repository stored in the {@code seedNodesRepository} field
 * @param slackUrlSeedChannel Slack URL injected under {@code MonitorOptionKeys.SLACK_URL_SEED_CHANNEL};
 *                            may be empty
 */
@Inject
public MetricsByNodeAddressMap(SeedNodesRepository seedNodesRepository,
@Named(MonitorOptionKeys.SLACK_URL_SEED_CHANNEL) String slackUrlSeedChannel) {
public MetricsModel(SeedNodesRepository seedNodesRepository,
@Named(MonitorOptionKeys.SLACK_URL_SEED_CHANNEL) String slackUrlSeedChannel) {
this.seedNodesRepository = seedNodesRepository;
// Slack is optional: without a configured URL no client is created.
if (!slackUrlSeedChannel.isEmpty())
slackApi = new SlackApi(slackUrlSeedChannel);
}
/**
 * Stores the metrics object for the given node address in the internal map.
 * An entry already stored under the same address is replaced.
 *
 * @param nodeAddress key under which the metrics are stored
 * @param metrics     metrics object to associate with {@code nodeAddress}
 */
public void addToMap(NodeAddress nodeAddress, Metrics metrics) {
map.put(nodeAddress, metrics);
}
/**
 * Looks up the metrics object stored for the given node address.
 *
 * @param nodeAddress address to look up in the internal map
 * @return the metrics stored for {@code nodeAddress}, or {@code null} if the map holds no entry for it
 */
public Metrics getMetrics(NodeAddress nodeAddress) {
return map.get(nodeAddress);
}
public void updateReport() {
Map<String, Double> accumulatedValues = new HashMap<>();
final double[] items = {0};
List<Entry<NodeAddress, Metrics>> entryList = entrySet().stream()
List<Map.Entry<NodeAddress, Metrics>> entryList = map.entrySet().stream()
.sorted(Comparator.comparing(entrySet -> seedNodesRepository.getOperator(entrySet.getKey())))
.collect(Collectors.toList());
totalErrors = 0;
entryList.stream().forEach(e -> {
totalErrors += e.getValue().errorMessages.size();
totalErrors += e.getValue().errorMessages.stream().filter(s -> !s.isEmpty()).count();
final List<Map<String, Integer>> receivedObjectsList = e.getValue().getReceivedObjectsList();
if (!receivedObjectsList.isEmpty()) {
items[0] += 1;
@ -109,7 +118,7 @@ public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
StringBuilder sb = new StringBuilder();
sb.append("Seed nodes in error:" + totalErrors);
sb.append("\nLast check started at: " + time);
sb.append("\nLast check started at: " + time + "\n");
entryList.stream().forEach(e -> {
final List<Long> allDurations = e.getValue().getRequestDurations();
@ -182,8 +191,8 @@ public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
"Please check the monitoring status page at http://seedmonitor.0-2-1.net:8080/"));
}
});
sb.append("\nDuration all requests: ").append(allDurationsString)
.append("\nAll data: ").append(allReceivedDataString);
sb.append("Duration all requests: ").append(allDurationsString)
.append("\nAll data: ").append(allReceivedDataString).append("\n");
html.append("</td></tr>");
}
@ -191,12 +200,11 @@ public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
html.append("</table></body></html>");
resultAsString = sb.toString();
resultAsHtml = html.toString();
log();
}
/**
 * Writes the current text report ({@code resultAsString}) to the log at info level,
 * framed by separator lines made of '#' characters.
 */
public void log() {
log.info("\n#################################################################\n" +
log.info("\n\n#################################################################\n" +
resultAsString +
"\n#################################################################\n\n");
"#################################################################\n\n");
}
}

View File

@ -1,4 +1,4 @@
package io.bisq.seednode_monitor.request;
package io.bisq.seednode_monitor.metrics;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -21,7 +21,6 @@ import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import io.bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import io.bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import io.bisq.seednode_monitor.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@ -111,14 +110,14 @@ class MonitorRequestHandler implements MessageListener {
},
TIMEOUT_SEC);
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
log.info("We send a PreliminaryGetDataRequest to peer {}. ", nodeAddress);
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Send " + getDataRequest + " to " + nodeAddress + " has succeeded.");
log.info("Send PreliminaryGetDataRequest to " + nodeAddress + " has succeeded.");
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");

View File

@ -1,6 +1,5 @@
package io.bisq.seednode_monitor.request;
package io.bisq.seednode_monitor.metrics;
import io.bisq.common.Clock;
import io.bisq.common.Timer;
import io.bisq.common.UserThread;
import io.bisq.network.p2p.NodeAddress;
@ -10,8 +9,6 @@ import io.bisq.network.p2p.network.ConnectionListener;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.seednode_monitor.Metrics;
import io.bisq.seednode_monitor.MetricsByNodeAddressMap;
import io.bisq.seednode_monitor.MonitorOptionKeys;
import lombok.extern.slf4j.Slf4j;
import net.gpedro.integrations.slack.SlackApi;
@ -24,10 +21,10 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class MonitorRequestManager implements ConnectionListener {
private static final long RETRY_DELAY_SEC = 30;
private static final long RETRY_DELAY_SEC = 20;
private static final long CLEANUP_TIMER = 60;
private static final long REQUEST_PERIOD_MIN = 10;
private static final int MAX_RETRIES = 4;
private static final int MAX_RETRIES = 6;
///////////////////////////////////////////////////////////////////////////////////////////
@ -35,11 +32,12 @@ public class MonitorRequestManager implements ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final int numNodes;
private SlackApi slackApi;
private P2PDataStorage dataStorage;
private SeedNodesRepository seedNodesRepository;
private MetricsByNodeAddressMap metricsByNodeAddressMap;
private Clock clock;
private MetricsModel metricsModel;
private final Set<NodeAddress> seedNodeAddresses;
private final Map<NodeAddress, MonitorRequestHandler> handlerMap = new HashMap<>();
@ -47,7 +45,7 @@ public class MonitorRequestManager implements ConnectionListener {
private Map<NodeAddress, Integer> retryCounterMap = new HashMap<>();
private boolean stopped;
private Set<NodeAddress> nodesInError = new HashSet<>();
private int completedRequestIndex;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -57,21 +55,20 @@ public class MonitorRequestManager implements ConnectionListener {
public MonitorRequestManager(NetworkNode networkNode,
P2PDataStorage dataStorage,
SeedNodesRepository seedNodesRepository,
MetricsByNodeAddressMap metricsByNodeAddressMap,
Clock clock,
MetricsModel metricsModel,
@Named(MonitorOptionKeys.SLACK_URL_SEED_CHANNEL) String slackUrlSeedChannel) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.seedNodesRepository = seedNodesRepository;
this.metricsByNodeAddressMap = metricsByNodeAddressMap;
this.clock = clock;
this.metricsModel = metricsModel;
// Slack is optional: without a configured URL no client is created.
if (!slackUrlSeedChannel.isEmpty())
slackApi = new SlackApi(slackUrlSeedChannel);
// Register this manager as a connection listener on the network node.
this.networkNode.addConnectionListener(this);
// Work on a private copy of the seed node addresses.
seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
// Every seed node starts out with a fresh Metrics object.
seedNodeAddresses.stream().forEach(nodeAddress -> metricsByNodeAddressMap.put(nodeAddress, new Metrics()));
seedNodeAddresses.stream().forEach(nodeAddress -> metricsModel.addToMap(nodeAddress, new Metrics()));
// Number of seed nodes; completedRequestIndex is compared against it before the report is logged.
numNodes = seedNodeAddresses.size();
}
public void shutDown() {
@ -87,39 +84,21 @@ public class MonitorRequestManager implements ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
/**
 * Starts the monitoring requests to the seed nodes.
 */
public void start() {
// We want to get the logs every 10 minutes
clock.start();
clock.addListener(new Clock.Listener() {
@Override
public void onSecondTick() {
}
@Override
public void onMinuteTick() {
processOnMinuteTick();
}
@Override
public void onMissedSecondTick(long missed) {
}
});
// Request all nodes once right away, then hand requestAllNodes to UserThread.runPeriodically
// with an interval of REQUEST_PERIOD_MIN minutes.
requestAllNodes();
UserThread.runPeriodically(this::requestAllNodes, REQUEST_PERIOD_MIN, TimeUnit.MINUTES);
}
private void processOnMinuteTick() {
long minutes = System.currentTimeMillis() / 1000 / 60;
final long currentTimeMillis = System.currentTimeMillis();
if (minutes % REQUEST_PERIOD_MIN == 0) {
stopAllRetryTimers();
closeAllConnections();
// Starts one request round: stops the retry timers, closes all connections, records the
// check timestamp and schedules one request per seed node.
// NOTE(review): completedRequestIndex is not reset in the lines visible here, so the
// "completedRequestIndex == numNodes" check that triggers metricsModel.log() looks like it
// can only match during the first round -- confirm whether it is reset elsewhere.
private void requestAllNodes() {
stopAllRetryTimers();
closeAllConnections();
// We allow 1 sec. for all connections to shut down
final int[] delay = {1000};
metricsModel.setLastCheckTs(System.currentTimeMillis());
// We allow 1 sec. for all connections to shut down
final int[] delay = {1000};
metricsByNodeAddressMap.setLastCheckTs(currentTimeMillis);
seedNodeAddresses.stream().forEach(nodeAddress -> {
UserThread.runAfter(() -> requestData(nodeAddress), delay[0], TimeUnit.MILLISECONDS);
delay[0] += 100;
});
}
// Each node's request is scheduled 100 ms after the previous one, starting at the initial delay.
seedNodeAddresses.stream().forEach(nodeAddress -> {
UserThread.runAfter(() -> requestFromNode(nodeAddress), delay[0], TimeUnit.MILLISECONDS);
delay[0] += 100;
});
}
@ -145,12 +124,12 @@ public class MonitorRequestManager implements ConnectionListener {
// RequestData
///////////////////////////////////////////////////////////////////////////////////////////
private void requestData(NodeAddress nodeAddress) {
private void requestFromNode(NodeAddress nodeAddress) {
if (!stopped) {
if (!handlerMap.containsKey(nodeAddress)) {
MonitorRequestHandler requestDataHandler = new MonitorRequestHandler(networkNode,
dataStorage,
metricsByNodeAddressMap.get(nodeAddress),
metricsModel.getMetrics(nodeAddress),
new MonitorRequestHandler.Listener() {
@Override
public void onComplete() {
@ -162,8 +141,10 @@ public class MonitorRequestManager implements ConnectionListener {
// need to remove before listeners are notified as they cause the update call
handlerMap.remove(nodeAddress);
metricsByNodeAddressMap.updateReport();
metricsByNodeAddressMap.log();
metricsModel.updateReport();
completedRequestIndex++;
if (completedRequestIndex == numNodes)
metricsModel.log();
if (nodesInError.contains(nodeAddress)) {
nodesInError.remove(nodeAddress);
@ -176,9 +157,6 @@ public class MonitorRequestManager implements ConnectionListener {
@Override
public void onFault(String errorMessage, NodeAddress nodeAddress) {
handlerMap.remove(nodeAddress);
metricsByNodeAddressMap.updateReport();
metricsByNodeAddressMap.log();
int retryCounter;
if (retryCounterMap.containsKey(nodeAddress))
retryCounter = retryCounterMap.get(nodeAddress);
@ -186,10 +164,14 @@ public class MonitorRequestManager implements ConnectionListener {
retryCounter = 0;
if (retryCounter < MAX_RETRIES) {
final Timer timer = UserThread.runAfter(() -> requestData(nodeAddress), RETRY_DELAY_SEC);
final Timer timer = UserThread.runAfter(() -> requestFromNode(nodeAddress), RETRY_DELAY_SEC);
retryTimerMap.put(nodeAddress, timer);
retryCounterMap.put(nodeAddress, ++retryCounter);
} else {
metricsModel.updateReport();
completedRequestIndex++;
if (completedRequestIndex == numNodes)
metricsModel.log();
nodesInError.add(nodeAddress);
if (slackApi != null)
slackApi.call(new SlackMessage("Error: " + nodeAddress.getFullAddress(),