Increase socket timeout from 10 sec to 2 min.

This commit is contained in:
Manfred Karrer 2017-12-17 20:38:39 +01:00
parent 67fbaa2ecb
commit c947dab7eb
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
15 changed files with 1510 additions and 1 deletions

View file

@ -32,7 +32,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
// Run in UserThread
public abstract class NetworkNode implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
private static final int CREATE_SOCKET_TIMEOUT_MILLIS = 10000;
private static final int CREATE_SOCKET_TIMEOUT_MILLIS = 120000;
final int servicePort;
private final NetworkProtoResolver networkProtoResolver;

15
seednode_monitor/pom.xml Normal file
View file

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>parent</artifactId>
<groupId>io.bisq</groupId>
<version>0.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seednode_monitor</artifactId>
</project>

View file

@ -0,0 +1,31 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Getter
public class Metrics {
List<Long> requestDurations = new ArrayList<>();
List<String> errorMessages = new ArrayList<>();
List<Map<String, Integer>> receivedObjectsList = new ArrayList<>();
}

View file

@ -0,0 +1,42 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import io.bisq.network.p2p.NodeAddress;
import javafx.beans.property.ReadOnlyStringProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import javax.inject.Inject;
import java.util.HashMap;
public class MetricsByNodeAddressMap extends HashMap<NodeAddress, Metrics> {
private StringProperty resultAsStringProperty = new SimpleStringProperty();
@Inject
public MetricsByNodeAddressMap() {
}
public ReadOnlyStringProperty resultAsStringProperty() {
return resultAsStringProperty;
}
public void setResultAsString(String result) {
this.resultAsStringProperty.set(result);
}
}

View file

@ -0,0 +1,82 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import io.bisq.common.app.Version;
import io.bisq.common.crypto.KeyRing;
import io.bisq.common.proto.persistable.PersistedDataHost;
import io.bisq.core.app.BisqEnvironment;
import io.bisq.core.app.SetupUtils;
import io.bisq.network.crypto.EncryptionService;
import lombok.extern.slf4j.Slf4j;
import javax.inject.Inject;
import java.util.ArrayList;
@Slf4j
public class MonitorAppSetup {
private MonitorP2PService seedNodeMonitorP2PService;
private final KeyRing keyRing;
private final EncryptionService encryptionService;
@Inject
public MonitorAppSetup(MonitorP2PService seedNodeMonitorP2PService, KeyRing keyRing, EncryptionService encryptionService) {
this.seedNodeMonitorP2PService = seedNodeMonitorP2PService;
this.keyRing = keyRing;
this.encryptionService = encryptionService;
Version.setBaseCryptoNetworkId(BisqEnvironment.getBaseCurrencyNetwork().ordinal());
Version.printVersion();
}
public void start() {
SetupUtils.checkCryptoSetup(keyRing, encryptionService, () -> {
initPersistedDataHosts();
initBasicServices();
}, throwable -> {
log.error(throwable.getMessage());
throwable.printStackTrace();
System.exit(1);
});
}
public void initPersistedDataHosts() {
ArrayList<PersistedDataHost> persistedDataHosts = new ArrayList<>();
persistedDataHosts.add(seedNodeMonitorP2PService);
// we apply at startup the reading of persisted data but don't want to get it triggered in the constructor
persistedDataHosts.stream().forEach(e -> {
try {
log.info("call readPersisted at " + e.getClass().getSimpleName());
e.readPersisted();
} catch (Throwable e1) {
log.error("readPersisted error", e1);
}
});
}
protected void initBasicServices() {
SetupUtils.readFromResources(seedNodeMonitorP2PService.getP2PDataStorage()).addListener((observable, oldValue, newValue) -> {
if (newValue)
startInitP2PNetwork();
});
}
private void startInitP2PNetwork() {
seedNodeMonitorP2PService.start();
}
}

View file

@ -0,0 +1,28 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
public class MonitorOptionKeys {
public MonitorOptionKeys() {
}
}

View file

@ -0,0 +1,83 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import io.bisq.common.app.AppModule;
import io.bisq.network.NetworkOptionKeys;
import io.bisq.network.Socks5ProxyProvider;
import io.bisq.network.p2p.NetworkNodeProvider;
import io.bisq.network.p2p.P2PService;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.peers.BanList;
import io.bisq.network.p2p.peers.Broadcaster;
import io.bisq.network.p2p.peers.PeerManager;
import io.bisq.network.p2p.peers.getdata.RequestDataManager;
import io.bisq.network.p2p.peers.keepalive.KeepAliveManager;
import io.bisq.network.p2p.peers.peerexchange.PeerExchangeManager;
import io.bisq.network.p2p.storage.P2PDataStorage;
import org.springframework.core.env.Environment;
import java.io.File;
import static com.google.inject.name.Names.named;
public class MonitorP2PModule extends AppModule {
public MonitorP2PModule(Environment environment) {
super(environment);
}
@Override
protected void configure() {
bind(MonitorP2PService.class).in(Singleton.class);
bind(PeerManager.class).in(Singleton.class);
bind(P2PDataStorage.class).in(Singleton.class);
bind(RequestDataManager.class).in(Singleton.class);
bind(PeerExchangeManager.class).in(Singleton.class);
bind(KeepAliveManager.class).in(Singleton.class);
bind(Broadcaster.class).in(Singleton.class);
bind(BanList.class).in(Singleton.class);
bind(NetworkNode.class).toProvider(NetworkNodeProvider.class).in(Singleton.class);
bind(Socks5ProxyProvider.class).in(Singleton.class);
Boolean useLocalhostForP2P = environment.getProperty(NetworkOptionKeys.USE_LOCALHOST_FOR_P2P, boolean.class, false);
bind(boolean.class).annotatedWith(Names.named(NetworkOptionKeys.USE_LOCALHOST_FOR_P2P)).toInstance(useLocalhostForP2P);
File torDir = new File(environment.getRequiredProperty(NetworkOptionKeys.TOR_DIR));
bind(File.class).annotatedWith(named(NetworkOptionKeys.TOR_DIR)).toInstance(torDir);
// use a fixed port as arbitrator use that for his ID
Integer port = environment.getProperty(NetworkOptionKeys.PORT_KEY, int.class, 9999);
bind(int.class).annotatedWith(Names.named(NetworkOptionKeys.PORT_KEY)).toInstance(port);
Integer maxConnections = environment.getProperty(NetworkOptionKeys.MAX_CONNECTIONS, int.class, P2PService.MAX_CONNECTIONS_DEFAULT);
bind(int.class).annotatedWith(Names.named(NetworkOptionKeys.MAX_CONNECTIONS)).toInstance(maxConnections);
Integer networkId = environment.getProperty(NetworkOptionKeys.NETWORK_ID, int.class, 1);
bind(int.class).annotatedWith(Names.named(NetworkOptionKeys.NETWORK_ID)).toInstance(networkId);
bindConstant().annotatedWith(named(NetworkOptionKeys.SEED_NODES_KEY)).to(environment.getRequiredProperty(NetworkOptionKeys.SEED_NODES_KEY));
bindConstant().annotatedWith(named(NetworkOptionKeys.MY_ADDRESS)).to(environment.getRequiredProperty(NetworkOptionKeys.MY_ADDRESS));
bindConstant().annotatedWith(named(NetworkOptionKeys.BAN_LIST)).to(environment.getRequiredProperty(NetworkOptionKeys.BAN_LIST));
bindConstant().annotatedWith(named(NetworkOptionKeys.SOCKS_5_PROXY_BTC_ADDRESS)).to(environment.getRequiredProperty(NetworkOptionKeys.SOCKS_5_PROXY_BTC_ADDRESS));
bindConstant().annotatedWith(named(NetworkOptionKeys.SOCKS_5_PROXY_HTTP_ADDRESS)).to(environment.getRequiredProperty(NetworkOptionKeys.SOCKS_5_PROXY_HTTP_ADDRESS));
}
}

View file

@ -0,0 +1,117 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import io.bisq.common.app.Log;
import io.bisq.common.proto.persistable.PersistedDataHost;
import io.bisq.network.Socks5ProxyProvider;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.network.SetupListener;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.seednode_monitor.request.MonitorRequestManager;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class MonitorP2PService implements SetupListener, PersistedDataHost {
private final NetworkNode networkNode;
@Getter
private final P2PDataStorage p2PDataStorage;
private final MonitorRequestManager requestDataManager;
private final Socks5ProxyProvider socks5ProxyProvider;
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@com.google.inject.Inject
public MonitorP2PService(NetworkNode networkNode,
P2PDataStorage p2PDataStorage,
MonitorRequestManager requestDataManager,
Socks5ProxyProvider socks5ProxyProvider) {
this.networkNode = networkNode;
this.p2PDataStorage = p2PDataStorage;
this.requestDataManager = requestDataManager;
this.socks5ProxyProvider = socks5ProxyProvider;
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void readPersisted() {
p2PDataStorage.readPersisted();
}
public void start() {
networkNode.start(this);
}
public void shutDown(Runnable shutDownCompleteHandler) {
Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownComplete = true;
});
} else {
shutDownComplete = true;
}
} else {
log.debug("shutDown already in progress");
if (shutDownComplete) {
shutDownCompleteHandler.run();
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// SetupListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onTorNodeReady() {
socks5ProxyProvider.setSocks5ProxyInternal(networkNode.getSocksProxy());
}
@Override
public void onHiddenServicePublished() {
checkArgument(networkNode.getNodeAddress() != null, "Address must be set when we have the hidden service ready");
requestDataManager.start();
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
}
}

View file

@ -0,0 +1,155 @@
package io.bisq.seednode_monitor;
import ch.qos.logback.classic.Level;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import io.bisq.common.CommonOptionKeys;
import io.bisq.common.UserThread;
import io.bisq.common.app.Capabilities;
import io.bisq.common.app.Log;
import io.bisq.common.app.Version;
import io.bisq.common.crypto.LimitedKeyStrengthException;
import io.bisq.common.handlers.ResultHandler;
import io.bisq.common.locale.CurrencyUtil;
import io.bisq.common.locale.Res;
import io.bisq.common.util.Utilities;
import io.bisq.core.arbitration.ArbitratorManager;
import io.bisq.core.btc.BaseCurrencyNetwork;
import io.bisq.core.btc.wallet.BsqWalletService;
import io.bisq.core.btc.wallet.BtcWalletService;
import io.bisq.core.btc.wallet.WalletsSetup;
import io.bisq.core.dao.DaoOptionKeys;
import io.bisq.core.offer.OpenOfferManager;
import io.bisq.network.p2p.P2PService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.bitcoinj.store.BlockStoreException;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
@Slf4j
public class SeedNodeMonitor {
// Bisq v0.6.1 did not change anything relevant for that project so we stick with 0.6.0
public static final String VERSION = "0.6.0";
private static BisqEnvironment bisqEnvironment;
public static void setEnvironment(BisqEnvironment bisqEnvironment) {
SeedNodeMonitor.bisqEnvironment = bisqEnvironment;
}
private final Injector injector;
private final SeedNodeMonitorModule seedNodeModule;
private final AppSetup appSetup;
public SeedNodeMonitor() {
String logPath = Paths.get(bisqEnvironment.getProperty(AppOptionKeys.APP_DATA_DIR_KEY), "bisq").toString();
Log.setup(logPath);
Log.setLevel(Level.toLevel(bisqEnvironment.getRequiredProperty(CommonOptionKeys.LOG_LEVEL_KEY)));
log.info("Log files under: " + logPath);
log.info("SeedNode.VERSION: " + SeedNodeMonitor.VERSION);
log.info("Bisq exchange Version{" +
"VERSION=" + Version.VERSION +
", P2P_NETWORK_VERSION=" + Version.P2P_NETWORK_VERSION +
", LOCAL_DB_VERSION=" + Version.LOCAL_DB_VERSION +
", TRADE_PROTOCOL_VERSION=" + Version.TRADE_PROTOCOL_VERSION +
", BASE_CURRENCY_NETWORK=NOT SET" +
", getP2PNetworkId()=NOT SET" +
'}');
Utilities.printSysInfo();
// setup UncaughtExceptionHandler
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
// Might come from another thread
if (throwable.getCause() != null && throwable.getCause().getCause() != null &&
throwable.getCause().getCause() instanceof BlockStoreException) {
log.error(throwable.getMessage());
} else {
log.error("Uncaught Exception from thread " + Thread.currentThread().getName());
log.error("throwableMessage= " + throwable.getMessage());
log.error("throwableClass= " + throwable.getClass());
log.error("Stack trace:\n" + ExceptionUtils.getStackTrace(throwable));
throwable.printStackTrace();
}
};
Thread.setDefaultUncaughtExceptionHandler(handler);
Thread.currentThread().setUncaughtExceptionHandler(handler);
try {
Utilities.checkCryptoPolicySetup();
} catch (NoSuchAlgorithmException | LimitedKeyStrengthException e) {
e.printStackTrace();
UserThread.execute(this::shutDown);
}
Security.addProvider(new BouncyCastleProvider());
final BaseCurrencyNetwork baseCurrencyNetwork = BisqEnvironment.getBaseCurrencyNetwork();
final String currencyCode = baseCurrencyNetwork.getCurrencyCode();
Res.setBaseCurrencyCode(currencyCode);
Res.setBaseCurrencyName(baseCurrencyNetwork.getCurrencyName());
CurrencyUtil.setBaseCurrencyCode(currencyCode);
seedNodeModule = new SeedNodeMonitorModule(bisqEnvironment);
injector = Guice.createInjector(seedNodeModule);
Boolean fullDaoNode = injector.getInstance(Key.get(Boolean.class, Names.named(DaoOptionKeys.FULL_DAO_NODE)));
appSetup = fullDaoNode ? injector.getInstance(AppSetupWithP2PAndDAO.class) : injector.getInstance(AppSetupWithP2P.class);
if (fullDaoNode)
Capabilities.setSupportedCapabilities(new ArrayList<>(Arrays.asList(
Capabilities.Capability.TRADE_STATISTICS.ordinal(),
Capabilities.Capability.TRADE_STATISTICS_2.ordinal(),
Capabilities.Capability.ACCOUNT_AGE_WITNESS.ordinal(),
Capabilities.Capability.SEED_NODE.ordinal(),
Capabilities.Capability.DAO_FULL_NODE.ordinal()
)));
else
Capabilities.setSupportedCapabilities(new ArrayList<>(Arrays.asList(
Capabilities.Capability.TRADE_STATISTICS.ordinal(),
Capabilities.Capability.TRADE_STATISTICS_2.ordinal(),
Capabilities.Capability.ACCOUNT_AGE_WITNESS.ordinal(),
Capabilities.Capability.SEED_NODE.ordinal()
)));
appSetup.start();
}
private void shutDown() {
gracefulShutDown(() -> {
log.debug("Shutdown complete");
System.exit(0);
});
}
public void gracefulShutDown(ResultHandler resultHandler) {
log.debug("gracefulShutDown");
try {
if (injector != null) {
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(OpenOfferManager.class).shutDown(() -> injector.getInstance(P2PService.class).shutDown(() -> {
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
seedNodeModule.close(injector);
log.debug("Graceful shutdown completed");
resultHandler.handleResult();
});
injector.getInstance(WalletsSetup.class).shutDown();
injector.getInstance(BtcWalletService.class).shutDown();
injector.getInstance(BsqWalletService.class).shutDown();
}));
// we wait max 5 sec.
UserThread.runAfter(resultHandler::handleResult, 5);
} else {
UserThread.runAfter(resultHandler::handleResult, 1);
}
} catch (Throwable t) {
log.debug("App shutdown failed with exception");
t.printStackTrace();
System.exit(1);
}
}
}

View file

@ -0,0 +1,26 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import io.bisq.core.app.BisqEnvironment;
public class SeedNodeMonitorEnvironment extends BisqEnvironment {
public SeedNodeMonitorEnvironment() {
}
}

View file

@ -0,0 +1,184 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.bisq.common.UserThread;
import io.bisq.common.util.Profiler;
import io.bisq.common.util.RestartUtil;
import io.bisq.common.util.Utilities;
import io.bisq.core.app.AppOptionKeys;
import io.bisq.core.app.BisqEnvironment;
import io.bisq.core.app.BisqExecutable;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.bitcoinj.store.BlockStoreException;
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static io.bisq.core.app.BisqEnvironment.DEFAULT_APP_NAME;
import static io.bisq.core.app.BisqEnvironment.DEFAULT_USER_DATA_DIR;
@Slf4j
public class SeedNodeMonitorMain extends BisqExecutable {
private static final long MAX_MEMORY_MB_DEFAULT = 500;
private static final long CHECK_MEMORY_PERIOD_SEC = 5 * 60;
private SeedNode seedNode;
private volatile boolean stopped;
private static long maxMemory = MAX_MEMORY_MB_DEFAULT;
static {
// Need to set default locale initially otherwise we get problems at non-english OS
Locale.setDefault(new Locale("en", Locale.getDefault().getCountry()));
Utilities.removeCryptographyRestrictions();
}
public static void main(String[] args) throws Exception {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("SeedNodeMain")
.setDaemon(true)
.build();
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));
// We don't want to do the full argument parsing here as that might easily change in update versions
// So we only handle the absolute minimum which is APP_NAME, APP_DATA_DIR_KEY and USER_DATA_DIR
BisqEnvironment.setDefaultAppName("bisq_seednode");
OptionParser parser = new OptionParser();
parser.allowsUnrecognizedOptions();
parser.accepts(AppOptionKeys.USER_DATA_DIR_KEY, description("User data directory", DEFAULT_USER_DATA_DIR))
.withRequiredArg();
parser.accepts(AppOptionKeys.APP_NAME_KEY, description("Application name", DEFAULT_APP_NAME))
.withRequiredArg();
OptionSet options;
try {
options = parser.parse(args);
} catch (OptionException ex) {
System.out.println("error: " + ex.getMessage());
System.out.println();
parser.printHelpOn(System.out);
System.exit(EXIT_FAILURE);
return;
}
BisqEnvironment bisqEnvironment = getBisqEnvironment(options);
// need to call that before BisqAppMain().execute(args)
BisqExecutable.initAppDir(bisqEnvironment.getProperty(AppOptionKeys.APP_DATA_DIR_KEY));
// For some reason the JavaFX launch process results in us losing the thread context class loader: reset it.
// In order to work around a bug in JavaFX 8u25 and below, you must include the following code as the first line of your realMain method:
Thread.currentThread().setContextClassLoader(SeedNodeMonitorMain.class.getClassLoader());
new SeedNodeMonitorMain().execute(args);
}
@SuppressWarnings("InfiniteLoopStatement")
@Override
protected void doExecute(OptionSet options) {
final BisqEnvironment bisqEnvironment = getBisqEnvironment(options);
SeedNode.setEnvironment(bisqEnvironment);
UserThread.execute(() -> {
try {
seedNode = new SeedNode();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
if (throwable.getCause() != null && throwable.getCause().getCause() != null &&
throwable.getCause().getCause() instanceof BlockStoreException) {
log.error(throwable.getMessage());
} else {
log.error("Uncaught Exception from thread " + Thread.currentThread().getName());
log.error("throwableMessage= " + throwable.getMessage());
log.error("throwableClass= " + throwable.getClass());
log.error("Stack trace:\n" + ExceptionUtils.getStackTrace(throwable));
throwable.printStackTrace();
log.error("We shut down the app because an unhandled error occurred");
// We don't use the restart as in case of OutOfMemory errors the restart might fail as well
// The run loop will restart the node anyway...
System.exit(EXIT_FAILURE);
}
};
Thread.setDefaultUncaughtExceptionHandler(handler);
Thread.currentThread().setUncaughtExceptionHandler(handler);
String maxMemoryOption = bisqEnvironment.getProperty(AppOptionKeys.MAX_MEMORY);
if (maxMemoryOption != null && !maxMemoryOption.isEmpty()) {
try {
maxMemory = Integer.parseInt(maxMemoryOption);
} catch (Throwable t) {
log.error(t.getMessage());
}
}
UserThread.runPeriodically(() -> {
Profiler.printSystemLoad(log);
if (!stopped) {
long usedMemoryInMB = Profiler.getUsedMemoryInMB();
if (usedMemoryInMB > (maxMemory * 0.8)) {
log.warn("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"We are over our memory warn limit and call the GC. usedMemoryInMB: {}" +
"\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n\n",
usedMemoryInMB);
System.gc();
Profiler.printSystemLoad(log);
}
UserThread.runAfter(() -> {
if (Profiler.getUsedMemoryInMB() > maxMemory)
restart(bisqEnvironment);
}, 5);
}
}, CHECK_MEMORY_PERIOD_SEC);
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException ignore) {
}
}
}
private void restart(BisqEnvironment bisqEnvironment) {
stopped = true;
seedNode.gracefulShutDown(() -> {
//noinspection finally
try {
final String[] tokens = bisqEnvironment.getAppDataDir().split("_");
String logPath = "error_" + (tokens.length > 1 ? tokens[tokens.length - 2] : "") + ".log";
RestartUtil.restartApplication(logPath);
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
} finally {
log.warn("Shutdown complete");
System.exit(0);
}
});
}
}

View file

@ -0,0 +1,129 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.seednode_monitor;
import com.google.inject.Singleton;
import io.bisq.common.Clock;
import io.bisq.common.app.AppModule;
import io.bisq.common.crypto.KeyRing;
import io.bisq.common.crypto.KeyStorage;
import io.bisq.common.proto.network.NetworkProtoResolver;
import io.bisq.common.proto.persistable.PersistenceProtoResolver;
import io.bisq.common.storage.Storage;
import io.bisq.core.alert.AlertModule;
import io.bisq.core.app.BisqEnvironment;
import io.bisq.core.arbitration.ArbitratorModule;
import io.bisq.core.btc.BitcoinModule;
import io.bisq.core.dao.DaoModule;
import io.bisq.core.filter.FilterModule;
import io.bisq.core.network.CoreSeedNodesRepository;
import io.bisq.core.offer.OfferModule;
import io.bisq.core.proto.network.CoreNetworkProtoResolver;
import io.bisq.core.proto.persistable.CorePersistenceProtoResolver;
import io.bisq.core.trade.TradeModule;
import io.bisq.core.user.Preferences;
import io.bisq.core.user.User;
import io.bisq.network.crypto.EncryptionServiceModule;
import io.bisq.network.p2p.network.BridgeAddressProvider;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import org.springframework.core.env.Environment;
import java.io.File;
import static com.google.inject.name.Names.named;
class SeedNodeMonitorModule extends AppModule {
public SeedNodeMonitorModule(Environment environment) {
super(environment);
}
@Override
protected void configure() {
bind(BisqEnvironment.class).toInstance((BisqEnvironment) environment);
// bind(CachingViewLoader.class).in(Singleton.class);
bind(KeyStorage.class).in(Singleton.class);
bind(KeyRing.class).in(Singleton.class);
bind(User.class).in(Singleton.class);
// bind(NotificationCenter.class).in(Singleton.class);
bind(Clock.class).in(Singleton.class);
bind(Preferences.class).in(Singleton.class);
bind(BridgeAddressProvider.class).to(Preferences.class).in(Singleton.class);
bind(SeedNodesRepository.class).to(CoreSeedNodesRepository.class).in(Singleton.class);
File storageDir = new File(environment.getRequiredProperty(Storage.STORAGE_DIR));
bind(File.class).annotatedWith(named(Storage.STORAGE_DIR)).toInstance(storageDir);
File keyStorageDir = new File(environment.getRequiredProperty(KeyStorage.KEY_STORAGE_DIR));
bind(File.class).annotatedWith(named(KeyStorage.KEY_STORAGE_DIR)).toInstance(keyStorageDir);
bind(NetworkProtoResolver.class).to(CoreNetworkProtoResolver.class).in(Singleton.class);
bind(PersistenceProtoResolver.class).to(CorePersistenceProtoResolver.class).in(Singleton.class);
// ordering is used for shut down sequence
install(tradeModule());
install(encryptionServiceModule());
install(arbitratorModule());
install(offerModule());
install(p2pModule());
install(bitcoinModule());
install(daoModule());
// install(guiModule());
install(alertModule());
install(filterModule());
}
private TradeModule tradeModule() {
return new TradeModule(environment);
}
private EncryptionServiceModule encryptionServiceModule() {
return new EncryptionServiceModule(environment);
}
private ArbitratorModule arbitratorModule() {
return new ArbitratorModule(environment);
}
private AlertModule alertModule() {
return new AlertModule(environment);
}
private FilterModule filterModule() {
return new FilterModule(environment);
}
private OfferModule offerModule() {
return new OfferModule(environment);
}
private MonitorP2PModule p2pModule() {
return new MonitorP2PModule(environment);
}
private BitcoinModule bitcoinModule() {
return new BitcoinModule(environment);
}
private DaoModule daoModule() {
return new DaoModule(environment);
}
}

View file

@ -0,0 +1,265 @@
package io.bisq.seednode_monitor.request;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bisq.common.Timer;
import io.bisq.common.UserThread;
import io.bisq.common.app.DevEnv;
import io.bisq.common.app.Log;
import io.bisq.common.proto.network.NetworkEnvelope;
import io.bisq.common.proto.network.NetworkPayload;
import io.bisq.network.p2p.NodeAddress;
import io.bisq.network.p2p.network.CloseConnectionReason;
import io.bisq.network.p2p.network.Connection;
import io.bisq.network.p2p.network.MessageListener;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.peers.getdata.messages.GetDataRequest;
import io.bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import io.bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import io.bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import io.bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import io.bisq.seednode_monitor.Metrics;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
class MonitorRequestHandler implements MessageListener {
private static final long TIME_OUT_SEC = 120;
private NodeAddress peersNodeAddress;
private long requestTs;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
@SuppressWarnings("UnusedParameters")
void onFault(String errorMessage);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private P2PDataStorage dataStorage;
private Metrics metric;
private final Listener listener;
private Timer timeoutTimer;
private final int nonce = new Random().nextInt();
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public MonitorRequestHandler(NetworkNode networkNode, P2PDataStorage dataStorage, Metrics metric, Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.metric = metric;
this.listener = listener;
}
public void cancel() {
cleanup();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
peersNodeAddress = nodeAddress;
requestTs = new Date().getTime();
if (!stopped) {
Set<byte[]> excludedKeys = dataStorage.getPersistableNetworkPayloadCollection().getMap().entrySet().stream()
.map(e -> e.getKey().bytes)
.collect(Collectors.toSet());
GetDataRequest getDataRequest = new PreliminaryGetDataRequest(nonce, excludedKeys);
if (timeoutTimer != null) {
log.warn("timeoutTimer was already set. That must not happen.");
timeoutTimer.stop();
if (DevEnv.DEV_MODE)
throw new RuntimeException("timeoutTimer was already set. That must not happen.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.warn(errorMessage + " / RequestDataHandler=" + MonitorRequestHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIME_OUT_SEC);
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Send " + getDataRequest + " to " + nodeAddress + " has succeeded.");
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getDataRequest to " + nodeAddress +
" failed.\n\t" +
"getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage();
log.warn(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
}
}
});
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) {
if (networkEnvelop instanceof GetDataResponse &&
connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress)) {
Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection);
if (!stopped) {
GetDataResponse getDataResponse = (GetDataResponse) networkEnvelop;
if (getDataResponse.getRequestNonce() == nonce) {
stopTimeoutTimer();
Map<String, Set<NetworkPayload>> payloadByClassName = new HashMap<>();
final Set<ProtectedStorageEntry> dataSet = getDataResponse.getDataSet();
dataSet.stream().forEach(e -> {
final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload();
if (protectedStoragePayload == null) {
log.warn("StoragePayload was null: {}", networkEnvelop.toString());
return;
}
// For logging different data types
String className = protectedStoragePayload.getClass().getSimpleName();
if (!payloadByClassName.containsKey(className))
payloadByClassName.put(className, new HashSet<>());
payloadByClassName.get(className).add(protectedStoragePayload);
});
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.stream().forEach(persistableNetworkPayload -> {
// For logging different data types
String className = persistableNetworkPayload.getClass().getSimpleName();
if (!payloadByClassName.containsKey(className))
payloadByClassName.put(className, new HashSet<>());
payloadByClassName.get(className).add(persistableNetworkPayload);
});
}
// Log different data types
StringBuilder sb = new StringBuilder();
sb.append("\n#################################################################\n");
sb.append("Connected to node: ").append(peersNodeAddress.getFullAddress()).append("\n");
final int items = dataSet.size() +
(persistableNetworkPayloadSet != null ? persistableNetworkPayloadSet.size() : 0);
sb.append("Received ").append(items).append(" instances\n");
Map<String, Integer> receivedObjects = new HashMap<>();
payloadByClassName.entrySet().stream().forEach(e -> {
sb.append(e.getKey())
.append(": ")
.append(e.getValue().size())
.append("\n");
receivedObjects.put(e.getKey(), e.getValue().size());
});
sb.append("#################################################################");
log.info(sb.toString());
metric.getReceivedObjectsList().add(receivedObjects);
final long duration = new Date().getTime() - requestTs;
log.info("Requesting data took {} ms", duration);
metric.getRequestDurations().add(duration);
cleanup();
connection.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER, listener::onComplete);
} else {
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getDataResponse.getRequestNonce());
}
} else {
log.warn("We have stopped already. We ignore that onDataRequest call.");
}
}
}
public void stop() {
cleanup();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
metric.getErrorMessages().add(errorMessage + " (" + new Date().toString() + ")");
// In case we would have already a connection we close it
networkNode.getAllConnections().stream()
.filter(connection -> connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(nodeAddress))
.forEach(c -> c.shutDown(closeConnectionReason));
listener.onFault(errorMessage);
}
private void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View file

@ -0,0 +1,316 @@
package io.bisq.seednode_monitor.request;
import io.bisq.common.Clock;
import io.bisq.common.Timer;
import io.bisq.common.UserThread;
import io.bisq.common.app.Log;
import io.bisq.common.util.MathUtils;
import io.bisq.network.p2p.NodeAddress;
import io.bisq.network.p2p.network.CloseConnectionReason;
import io.bisq.network.p2p.network.Connection;
import io.bisq.network.p2p.network.ConnectionListener;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import io.bisq.network.p2p.storage.P2PDataStorage;
import io.bisq.seednode_monitor.Metrics;
import io.bisq.seednode_monitor.SeedNodeMonitorMain;
import lombok.extern.slf4j.Slf4j;
import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MonitorRequestManager implements ConnectionListener {
private static final long RETRY_DELAY_SEC = 30;
private static final long CLEANUP_TIMER = 60;
private static final long REQUEST_PERIOD_SEC = 60 * 10;
private static final long REQUEST_PERIOD_MIN = 30;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onDataReceived();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private P2PDataStorage dataStorage;
private SeedNodesRepository seedNodesRepository;
private Clock clock;
private final Set<NodeAddress> seedNodeAddresses;
//TODO
private Listener listener = new Listener() {
@Override
public void onDataReceived() {
}
};
private final Map<NodeAddress, MonitorRequestHandler> handlerMap = new HashMap<>();
private final Map<NodeAddress, Metrics> metricsMap = new HashMap<>();
private Map<NodeAddress, Timer> retryTimerMap = new HashMap<>();
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public MonitorRequestManager(NetworkNode networkNode,
P2PDataStorage dataStorage,
SeedNodesRepository seedNodesRepository,
Clock clock) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.seedNodesRepository = seedNodesRepository;
this.clock = clock;
this.networkNode.addConnectionListener(this);
seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
seedNodeAddresses.addAll(seedNodesRepository.getSeedNodeAddressesOldVersions());
seedNodeAddresses.stream().forEach(nodeAddress -> metricsMap.put(nodeAddress, new Metrics()));
}
public void shutDown() {
Log.traceCall();
stopped = true;
stopAllRetryTimers();
networkNode.removeConnectionListener(this);
closeAllHandlers();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void addListener(Listener listener) {
this.listener = listener;
}
public void start() {
// We want get the logs each 10 minutes
clock.start();
clock.addListener(new Clock.Listener() {
@Override
public void onSecondTick() {
//TODO test
processOnMinuteTick();
}
@Override
public void onMinuteTick() {
// processOnMinuteTick();
}
@Override
public void onMissedSecondTick(long missed) {
}
});
}
private void processOnMinuteTick() {
// long minutes = System.currentTimeMillis() / 1000 / 60;
long minutes = System.currentTimeMillis() / 1000 ;
if (minutes % REQUEST_PERIOD_MIN == 0) {
stopAllRetryTimers();
closeAllConnections();
// we give 1 sec. for all connection shutdown
final int[] delay = {1000};
seedNodeAddresses.stream().forEach(nodeAddress -> {
UserThread.runAfter(() -> requestData(nodeAddress), delay[0], TimeUnit.MILLISECONDS);
delay[0] += 100;
});
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
Log.traceCall();
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
closeHandler(connection);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// RequestData
///////////////////////////////////////////////////////////////////////////////////////////
private void requestData(NodeAddress nodeAddress) {
if (!stopped) {
if (!handlerMap.containsKey(nodeAddress)) {
MonitorRequestHandler requestDataHandler = new MonitorRequestHandler(networkNode,
dataStorage,
metricsMap.get(nodeAddress),
new MonitorRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
nodeAddress);
stopRetryTimer(nodeAddress);
// need to remove before listeners are notified as they cause the update call
handlerMap.remove(nodeAddress);
listener.onDataReceived();
onMetricsUpdated();
}
@Override
public void onFault(String errorMessage) {
handlerMap.remove(nodeAddress);
onMetricsUpdated();
final Timer timer = UserThread.runAfter(() -> requestData(nodeAddress), RETRY_DELAY_SEC);
retryTimerMap.put(nodeAddress, timer);
}
});
handlerMap.put(nodeAddress, requestDataHandler);
requestDataHandler.requestData(nodeAddress);
} else {
log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress + "\n" +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (handlerMap.containsKey(nodeAddress)) {
MonitorRequestHandler handler = handlerMap.get(nodeAddress);
handler.stop();
handlerMap.remove(nodeAddress);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}
}
private void onMetricsUpdated() {
Map<String, Double> accumulatedValues = new HashMap<>();
final double[] items = {0};
metricsMap.entrySet().stream().forEach(e -> {
final List<Map<String, Integer>> receivedObjectsList = e.getValue().getReceivedObjectsList();
if (!receivedObjectsList.isEmpty()) {
items[0] += 1;
Map<String, Integer> last = receivedObjectsList.get(receivedObjectsList.size() - 1);
last.entrySet().stream().forEach(e2 -> {
int accuValue = e2.getValue();
if (accumulatedValues.containsKey(e2.getKey()))
accuValue += accumulatedValues.get(e2.getKey());
accumulatedValues.put(e2.getKey(), (double) accuValue);
});
}
});
Map<String, Double> averageValues = new HashMap<>();
accumulatedValues.entrySet().stream().forEach(e -> {
averageValues.put(e.getKey(), e.getValue() / items[0]);
});
StringBuilder sb = new StringBuilder("\n#################################################################\n");
metricsMap.entrySet().stream().forEach(e -> {
final OptionalDouble averageOptional = e.getValue().getRequestDurations().stream().mapToLong(value -> value).average();
int average = 0;
if (averageOptional.isPresent())
average = (int) averageOptional.getAsDouble();
sb.append("\nNode: ")
.append(e.getKey())
.append(" (")
.append(seedNodesRepository.getOperator(e.getKey()))
.append(")\n")
.append("Durations: ")
.append(e.getValue().getRequestDurations())
.append("\n")
.append("Duration average: ")
.append(average)
.append("\n")
.append("Errors: ")
.append(e.getValue().getErrorMessages())
.append("\n")
.append("All data: ")
.append(e.getValue().getReceivedObjectsList())
.append("\n");
final List<Map<String, Integer>> receivedObjectsList = e.getValue().getReceivedObjectsList();
if (!receivedObjectsList.isEmpty()) {
Map<String, Integer> last = receivedObjectsList.get(receivedObjectsList.size() - 1);
sb.append("Last data: ").append(last).append("\nAverage of last:\n");
last.entrySet().stream().forEach(e2 -> {
double deviation = MathUtils.roundDouble((double) e2.getValue() / averageValues.get(e2.getKey()) * 100, 2);
sb.append(e2.getKey()).append(": ")
.append(deviation).append(" % compared to average")
.append("\n");
});
}
});
sb.append("\n#################################################################\n\n");
log.info(sb.toString());
SeedNodeMonitorMain.metricsLog = sb.toString();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private void closeAllConnections() {
networkNode.getAllConnections().stream().forEach(connection -> connection.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER));
}
private void stopAllRetryTimers() {
retryTimerMap.values().stream().forEach(Timer::stop);
retryTimerMap.clear();
}
private void stopRetryTimer(NodeAddress nodeAddress) {
retryTimerMap.entrySet().stream()
.filter(e -> e.getKey().equals(nodeAddress))
.forEach(e -> e.getValue().stop());
retryTimerMap.remove(nodeAddress);
}
private void closeHandler(Connection connection) {
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
NodeAddress nodeAddress = peersNodeAddressOptional.get();
if (handlerMap.containsKey(nodeAddress)) {
handlerMap.get(nodeAddress).cancel();
handlerMap.remove(nodeAddress);
}
} else {
log.trace("closeRequestDataHandler: nodeAddress not set in connection " + connection);
}
}
private void closeAllHandlers() {
handlerMap.values().stream().forEach(MonitorRequestHandler::cancel);
handlerMap.clear();
}
}

View file

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ This file is part of Bisq.
~
~ Bisq is free software: you can redistribute it and/or modify it
~ under the terms of the GNU Affero General Public License as published by
~ the Free Software Foundation, either version 3 of the License, or (at
~ your option) any later version.
~
~ Bisq is distributed in the hope that it will be useful, but WITHOUT
~ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
~ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
~ License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with Bisq. If not, see <http://www.gnu.org/licenses/>.
-->
<configuration>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{15}: %msg %xEx%n)</pattern>
</encoder>
</appender>
<root level="TRACE">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<logger name="io.bisq.common.storage.Storage" level="WARN"/>
<logger name="io.bisq.common.storage.FileManager" level="WARN"/>
<logger name="com.neemre.btcdcli4j" level="WARN"/>
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="INFO"/>
</configuration>