diff --git a/core/src/main/java/bisq/core/monitor/DoubleValueItem.java b/core/src/main/java/bisq/core/monitor/DoubleValueItem.java index 28f6374e0d..15530d7cb3 100644 --- a/core/src/main/java/bisq/core/monitor/DoubleValueItem.java +++ b/core/src/main/java/bisq/core/monitor/DoubleValueItem.java @@ -81,6 +81,6 @@ public enum DoubleValueItem implements ReportingItem { @Override public String toString() { - return name() + "= " + value; + return name() + "=" + value; } } diff --git a/core/src/main/java/bisq/core/monitor/IntegerValueItem.java b/core/src/main/java/bisq/core/monitor/IntegerValueItem.java index 8055eb6335..06253f1ee6 100644 --- a/core/src/main/java/bisq/core/monitor/IntegerValueItem.java +++ b/core/src/main/java/bisq/core/monitor/IntegerValueItem.java @@ -102,6 +102,6 @@ public enum IntegerValueItem implements ReportingItem { @Override public String toString() { - return name() + "= " + value; + return name() + "=" + value; } } diff --git a/core/src/main/java/bisq/core/monitor/StringValueItem.java b/core/src/main/java/bisq/core/monitor/StringValueItem.java index 0eae92afe9..f698b30bae 100644 --- a/core/src/main/java/bisq/core/monitor/StringValueItem.java +++ b/core/src/main/java/bisq/core/monitor/StringValueItem.java @@ -82,6 +82,6 @@ public enum StringValueItem implements ReportingItem { @Override public String toString() { - return name() + "= " + value; + return name() + "=" + value; } } diff --git a/seednode/src/main/java/bisq/seednode/SeedNode.java b/seednode/src/main/java/bisq/seednode/SeedNode.java index ff2f2609d3..996acedb9b 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNode.java +++ b/seednode/src/main/java/bisq/seednode/SeedNode.java @@ -21,7 +21,11 @@ import bisq.core.app.misc.AppSetup; import bisq.core.app.misc.AppSetupWithP2PAndDAO; import bisq.core.network.p2p.inventory.GetInventoryRequestHandler; +import bisq.common.config.Config; + import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.name.Names; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -32,6 +36,7 @@ public class SeedNode { private Injector injector; private AppSetup appSetup; private GetInventoryRequestHandler getInventoryRequestHandler; + private SeedNodeReportingService seedNodeReportingService; public SeedNode() { } @@ -41,9 +46,19 @@ public class SeedNode { appSetup.start(); getInventoryRequestHandler = injector.getInstance(GetInventoryRequestHandler.class); + + String seedNodeReportingServerUrl = injector.getInstance(Key.get(String.class, Names.named(Config.SEED_NODE_REPORTING_SERVER_URL))); + if (seedNodeReportingServerUrl != null && !seedNodeReportingServerUrl.trim().isEmpty()) { + seedNodeReportingService = injector.getInstance(SeedNodeReportingService.class); + } } public void shutDown() { - getInventoryRequestHandler.shutDown(); + if (getInventoryRequestHandler != null) { + getInventoryRequestHandler.shutDown(); + } + if (seedNodeReportingService != null) { + seedNodeReportingService.shutDown(); + } } } diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java new file mode 100644 index 0000000000..25c91007c8 --- /dev/null +++ b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java @@ -0,0 +1,281 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.seednode; + +import bisq.core.dao.DaoFacade; +import bisq.core.dao.monitoring.BlindVoteStateMonitoringService; +import bisq.core.dao.monitoring.DaoStateMonitoringService; +import bisq.core.dao.monitoring.ProposalStateMonitoringService; +import bisq.core.dao.monitoring.model.BlindVoteStateBlock; +import bisq.core.dao.monitoring.model.DaoStateBlock; +import bisq.core.dao.monitoring.model.ProposalStateBlock; +import bisq.core.dao.state.DaoStateListener; +import bisq.core.dao.state.DaoStateService; +import bisq.core.monitor.DoubleValueItem; +import bisq.core.monitor.IntegerValueItem; +import bisq.core.monitor.ReportingItems; +import bisq.core.monitor.StringValueItem; + +import bisq.network.p2p.P2PService; +import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.network.Statistic; +import bisq.network.p2p.peers.PeerManager; +import bisq.network.p2p.storage.P2PDataStorage; +import bisq.network.p2p.storage.payload.ProtectedStorageEntry; + +import bisq.common.Timer; +import bisq.common.UserThread; +import bisq.common.app.Version; +import bisq.common.config.Config; +import bisq.common.util.Profiler; +import bisq.common.util.Utilities; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; + +import java.io.IOException; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import java.lang.management.ManagementFactory; + +import lombok.extern.slf4j.Slf4j; + +/** + * Sends reporting data to monitoring server via clear net. + * The seed node is configured with nginx as proxy which takes care of TLS handling and provides the client key. + * + * We send on a regular interval every 60 seconds the used memory metric data which serves as a heartbeat to signal the + * monitor that the seed node is alive. + * We send every 5 minutes the network data, network load data and node specific data. + * At each new block we send the DAO hashes and block height. + */ +@Slf4j +@Singleton +public class SeedNodeReportingService { + private final static long REPORT_DELAY_SEC = TimeUnit.MINUTES.toSeconds(5); + private final static long HEART_BEAT_DELAY_SEC = TimeUnit.MINUTES.toSeconds(1); + + private final P2PService p2PService; + private final NetworkNode networkNode; + private final PeerManager peerManager; + private final P2PDataStorage p2PDataStorage; + private final DaoStateService daoStateService; + private final DaoStateMonitoringService daoStateMonitoringService; + private final ProposalStateMonitoringService proposalStateMonitoringService; + private final BlindVoteStateMonitoringService blindVoteStateMonitoringService; + private final int maxConnections; + private final String seedNodeReportingServerUrl; + private final DaoStateListener daoStateListener; + private final HttpClient httpClient; + + private Timer dataReportTimer; + private final Timer heartBeatTimer; + private final ThreadPoolExecutor executor; + + @Inject + public SeedNodeReportingService(P2PService p2PService, + DaoFacade daoFacade, + NetworkNode networkNode, + PeerManager peerManager, + P2PDataStorage p2PDataStorage, + DaoStateService daoStateService, + DaoStateMonitoringService daoStateMonitoringService, + ProposalStateMonitoringService proposalStateMonitoringService, + BlindVoteStateMonitoringService blindVoteStateMonitoringService, + @Named(Config.MAX_CONNECTIONS) int maxConnections, + @Named(Config.SEED_NODE_REPORTING_SERVER_URL) String seedNodeReportingServerUrl) { + this.p2PService = p2PService; + this.networkNode = networkNode; + this.peerManager = peerManager; + this.p2PDataStorage = p2PDataStorage; + this.daoStateService = daoStateService; + this.daoStateMonitoringService = daoStateMonitoringService; + this.proposalStateMonitoringService = proposalStateMonitoringService; + this.blindVoteStateMonitoringService = blindVoteStateMonitoringService; + this.maxConnections = maxConnections; + this.seedNodeReportingServerUrl = seedNodeReportingServerUrl; + + executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 2, 4, 30); + httpClient = HttpClient.newHttpClient(); + + heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC); + + // We send each time when a new block is received and the DAO hash has been provided (which + // takes a bit after the block arrives). + daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() { + @Override + public void onDaoStateHashesChanged() { + sendBlockRelatedData(); + } + + @Override + public void onCheckpointFail() { + } + }); + + // Independent of the block + daoStateListener = new DaoStateListener() { + @Override + public void onParseBlockChainComplete() { + daoFacade.removeBsqStateListener(daoStateListener); + dataReportTimer = UserThread.runPeriodically(() -> sendDataReport(), REPORT_DELAY_SEC); + sendDataReport(); + + sendBlockRelatedData(); + } + }; + daoFacade.addBsqStateListener(daoStateListener); + } + + public void shutDown() { + if (heartBeatTimer != null) { + heartBeatTimer.stop(); + } + if (dataReportTimer != null) { + dataReportTimer.stop(); + } + + Utilities.shutdownAndAwaitTermination(executor, 2, TimeUnit.SECONDS); + } + + private void sendHeartBeat() { + if (p2PService.getAddress() == null) { + return; + } + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(IntegerValueItem.usedMemoryInMB.withValue((int) Profiler.getUsedMemoryInMB())); + sendReportingItems(reportingItems); + } + + private void sendBlockRelatedData() { + if (p2PService.getAddress() == null) { + return; + } + + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + int daoStateChainHeight = daoStateService.getChainHeight(); + reportingItems.add(IntegerValueItem.daoStateChainHeight.withValue(daoStateChainHeight)); + daoStateService.getLastBlock().map(block -> (int) (block.getTime() / 1000)) + .ifPresent(blockTime -> reportingItems.add(IntegerValueItem.blockTimeIsSec.withValue(blockTime))); + LinkedList daoStateBlockChain = daoStateMonitoringService.getDaoStateBlockChain(); + if (!daoStateBlockChain.isEmpty()) { + String daoStateHash = Utilities.bytesAsHexString(daoStateBlockChain.getLast().getMyStateHash().getHash()); + reportingItems.add(StringValueItem.daoStateHash.withValue(daoStateHash)); + } + + LinkedList proposalStateBlockChain = proposalStateMonitoringService.getProposalStateBlockChain(); + if (!proposalStateBlockChain.isEmpty()) { + String proposalHash = Utilities.bytesAsHexString(proposalStateBlockChain.getLast().getMyStateHash().getHash()); + reportingItems.add(StringValueItem.proposalHash.withValue(proposalHash)); + } + + LinkedList blindVoteStateBlockChain = blindVoteStateMonitoringService.getBlindVoteStateBlockChain(); + if (!blindVoteStateBlockChain.isEmpty()) { + String blindVoteHash = Utilities.bytesAsHexString(blindVoteStateBlockChain.getLast().getMyStateHash().getHash()); + reportingItems.add(StringValueItem.blindVoteHash.withValue(blindVoteHash)); + } + + sendReportingItems(reportingItems); + } + + private void sendDataReport() { + if (p2PService.getAddress() == null) { + return; + } + + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + + // Data + Map numItemsByType = new HashMap<>(); + Stream.concat(p2PDataStorage.getPersistableNetworkPayloadCollection().stream() + .map(payload -> payload.getClass().getSimpleName()), + p2PDataStorage.getMap().values().stream() + .map(ProtectedStorageEntry::getProtectedStoragePayload) + .map(payload -> payload.getClass().getSimpleName())) + .forEach(className -> { + numItemsByType.putIfAbsent(className, 0); + numItemsByType.put(className, numItemsByType.get(className) + 1); + }); + numItemsByType.forEach((key, value) -> reportingItems.add(IntegerValueItem.from(key, value))); + + // Network + reportingItems.add(IntegerValueItem.numConnections.withValue(networkNode.getAllConnections().size())); + reportingItems.add(IntegerValueItem.peakNumConnections.withValue(peerManager.getPeakNumConnections())); + reportingItems.add(IntegerValueItem.numAllConnectionsLostEvents.withValue(peerManager.getNumAllConnectionsLostEvents())); + reportingItems.add(IntegerValueItem.sentBytes.withValue((int) Statistic.getTotalSentBytes())); + reportingItems.add(IntegerValueItem.receivedBytes.withValue((int) Statistic.getTotalReceivedBytes())); + reportingItems.add(DoubleValueItem.sentBytesPerSec.withValue(Statistic.getTotalSentBytesPerSec())); + reportingItems.add(DoubleValueItem.sentMessagesPerSec.withValue(Statistic.getNumTotalSentMessagesPerSec())); + reportingItems.add(DoubleValueItem.receivedBytesPerSec.withValue(Statistic.getTotalReceivedBytesPerSec())); + reportingItems.add(DoubleValueItem.receivedMessagesPerSec.withValue(Statistic.numTotalReceivedMessagesPerSec())); + + // Node + reportingItems.add(IntegerValueItem.usedMemoryInMB.withValue((int) Profiler.getUsedMemoryInMB())); + reportingItems.add(IntegerValueItem.totalMemoryInMB.withValue((int) Profiler.getTotalMemoryInMB())); + reportingItems.add(IntegerValueItem.jvmStartTimeInSec.withValue((int) (ManagementFactory.getRuntimeMXBean().getStartTime() / 1000))); + reportingItems.add(IntegerValueItem.maxConnections.withValue(maxConnections)); + reportingItems.add(StringValueItem.version.withValue(Version.VERSION)); + + // If no commit hash is found we use 0 in hex format + String commitHash = Version.findCommitHash().orElse("00"); + reportingItems.add(StringValueItem.commitHash.withValue(commitHash)); + + sendReportingItems(reportingItems); + } + + private void sendReportingItems(ReportingItems reportingItems) { + CompletableFuture.runAsync(() -> { + log.info("Send report to monitor server: {}", reportingItems.toString()); + // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. + byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes(); + try { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(seedNodeReportingServerUrl)) + .POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes)) + .header("User-Agent", getMyAddress()) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() != 200) { + log.error("Response error message: {}", response); + } + } catch (IOException e) { + log.warn("IOException at sending reporting. {}", e.getMessage()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, executor); + } + + private String getMyAddress() { + return p2PService.getAddress() != null ? p2PService.getAddress().getFullAddress() : "N/A"; + } + +}