diff --git a/btcnodemonitor/README.md b/btcnodemonitor/README.md new file mode 100644 index 0000000000..af66181540 --- /dev/null +++ b/btcnodemonitor/README.md @@ -0,0 +1,25 @@ +## Bitcoin node monitor + +This is a simple headless node with a http server which connects periodically to the Bisq-provided Bitcoin nodes and +disconnect quickly afterwards. + +### Run Bitcoin node monitor + +Run the Gradle task: + +```sh +./gradlew btcnodemonitor:run +``` + +Or create a run scrip by: + +```sh +./gradlew btcnodemonitor:startBisqApp +``` + +And then run: + +```sh +./bisq-btcnodemonitor +``` + diff --git a/btcnodemonitor/build.gradle b/btcnodemonitor/build.gradle new file mode 100644 index 0000000000..ae58de1f94 --- /dev/null +++ b/btcnodemonitor/build.gradle @@ -0,0 +1,37 @@ +plugins { + id 'bisq.application' + id 'bisq.gradle.app_start_plugin.AppStartPlugin' +} + +mainClassName = 'bisq.btcnodemonitor.BtcNodeMonitorMain' + +distTar.enabled = true + +dependencies { + implementation enforcedPlatform(project(':platform')) + implementation project(':proto') + implementation project(':common') + implementation project(':core') + implementation project(':p2p') + annotationProcessor libs.lombok + compileOnly libs.javax.annotation + compileOnly libs.lombok + implementation libs.logback.classic + implementation libs.logback.core + implementation libs.google.guava + implementation libs.apache.commons.lang3 + implementation libs.jetbrains.annotations + implementation libs.slf4j.api + implementation(libs.netlayer.tor.external) { + exclude(module: 'slf4j-api') + } + implementation(libs.bitcoinj) { + exclude(module: 'bcprov-jdk15on') + exclude(module: 'guava') + exclude(module: 'jsr305') + exclude(module: 'okhttp') + exclude(module: 'okio') + exclude(module: 'slf4j-api') + } + implementation libs.spark.core +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java new file mode 100644 index 0000000000..44fd3dd3cf --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitor.java @@ -0,0 +1,66 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor; + + +import bisq.core.btc.nodes.BtcNodes; + +import bisq.common.config.Config; + +import java.util.concurrent.CompletableFuture; + +import lombok.extern.slf4j.Slf4j; + + + +import bisq.btcnodemonitor.btc.PeerConncetionModel; +import bisq.btcnodemonitor.btc.PeerGroupService; +import bisq.btcnodemonitor.server.SimpleHttpServer; +import bisq.btcnodemonitor.socksProxy.ProxySetup; + +@Slf4j +public class BtcNodeMonitor { + private final PeerGroupService peerGroupService; + private final ProxySetup proxySetup; + private final SimpleHttpServer simpleHttpServer; + + public BtcNodeMonitor(Config config) { + PeerConncetionModel peerConncetionModel = new PeerConncetionModel(new BtcNodes().getProvidedBtcNodes(), this::onChange); + simpleHttpServer = new SimpleHttpServer(config, peerConncetionModel); + proxySetup = new ProxySetup(config); + peerGroupService = new PeerGroupService(config, peerConncetionModel); + } + + public void onChange() { + simpleHttpServer.onChange(); + } + + public CompletableFuture start() { + return simpleHttpServer.start() + .thenCompose(nil -> proxySetup.createSocksProxy()) + .thenAccept(peerGroupService::applySocks5Proxy) + .thenCompose(nil -> peerGroupService.start()) + .thenRun(peerGroupService::connectToAll); + } + + public CompletableFuture shutdown() { + return peerGroupService.shutdown() + .thenCompose(nil -> proxySetup.shutdown()) + .thenCompose(nil -> simpleHttpServer.shutdown()); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java new file mode 100644 index 0000000000..13a14f10fd --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/BtcNodeMonitorMain.java @@ -0,0 +1,73 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor; + + +import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.handlers.ResultHandler; +import bisq.common.setup.CommonSetup; +import bisq.common.setup.GracefulShutDownHandler; +import bisq.common.util.SingleThreadExecutorUtils; +import bisq.common.util.Utilities; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BtcNodeMonitorMain implements GracefulShutDownHandler { + public static void main(String[] args) { + new BtcNodeMonitorMain(args); + } + + private final Config config; + @Getter + private final BtcNodeMonitor btcNodeMonitor; + + public BtcNodeMonitorMain(String[] args) { + config = new Config("bisq_btc_node_monitor", Utilities.getUserDataDir(), args); + CommonSetup.setup(config, this); + configUserThread(); + + btcNodeMonitor = new BtcNodeMonitor(config); + btcNodeMonitor.start().join(); + keepRunning(); + } + + @Override + public void gracefulShutDown(ResultHandler resultHandler) { + btcNodeMonitor.shutdown().join(); + System.exit(0); + resultHandler.handleResult(); + } + + private void keepRunning() { + try { + Thread.currentThread().setName("BtcNodeMonitorMain"); + Thread.currentThread().join(); + } catch (InterruptedException e) { + log.error("BtcNodeMonitorMain Thread interrupted", e); + gracefulShutDown(() -> { + }); + } + } + + private void configUserThread() { + UserThread.setExecutor(SingleThreadExecutorUtils.getSingleThreadExecutor("UserThread")); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java new file mode 100644 index 0000000000..444083ba93 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionInfo.java @@ -0,0 +1,183 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.btc; + +import org.bitcoinj.core.Peer; +import org.bitcoinj.core.PeerAddress; +import org.bitcoinj.core.VersionMessage; + +import com.google.common.base.Joiner; + +import java.net.InetAddress; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import lombok.Getter; +import lombok.Setter; + +@Getter +public class PeerConncetionInfo { + private final List connectionAttempts = new ArrayList<>(); + private final PeerAddress peerAddress; + private final Runnable onChangeHandler; + private Optional currentConnectionAttempt = Optional.empty(); + + public PeerConncetionInfo(PeerAddress peerAddress, Runnable onChangeHandler) { + this.peerAddress = peerAddress; + this.onChangeHandler = onChangeHandler; + } + + public ConnectionAttempt newConnectionAttempt(Peer peer) { + currentConnectionAttempt = Optional.of(new ConnectionAttempt(peer, onChangeHandler)); + connectionAttempts.add(currentConnectionAttempt.get()); + onChangeHandler.run(); + return currentConnectionAttempt.get(); + } + + + public String getAddress() { + InetAddress inetAddress = peerAddress.getAddr(); + if (inetAddress != null) { + return inetAddress.getHostAddress(); + } else { + return peerAddress.getHostname(); + } + } + + public String getShortId() { + return getAddress().substring(0, 12) + "..."; + } + + public int getNumConnectionAttempts() { + return connectionAttempts.size(); + } + + public int getNumConnections() { + return (int) connectionAttempts.stream().filter(e -> e.isConnected).count(); + } + + public int getNumDisconnections() { + return (int) connectionAttempts.stream().filter(e -> !e.isConnected).count(); + } + + public int getNumFailures() { + return (int) connectionAttempts.stream().filter(e -> e.exception.isPresent()).count(); + } + + public int getNumSuccess() { + return (int) connectionAttempts.stream().filter(e -> e.versionMessage.isPresent()).count(); + } + + public List getReverseConnectionAttempts() { + List reverseConnectionAttempts = new ArrayList<>(connectionAttempts); + Collections.reverse(reverseConnectionAttempts); + return reverseConnectionAttempts; + } + + public Optional getLastSuccessfulConnected() { + return getReverseConnectionAttempts().stream().filter(e -> e.versionMessage.isPresent()).findFirst(); + } + + public int getIndex(ConnectionAttempt connectionAttempt) { + return connectionAttempts.indexOf(connectionAttempt); + } + + public long getLastSuccessfulConnectTime() { + return getReverseConnectionAttempts().stream().filter(e -> e.versionMessage.isPresent()).findFirst() + .map(ConnectionAttempt::getTimeToConnect) + .orElse(0L); + } + + public double getAverageTimeToConnect() { + return connectionAttempts.stream().mapToLong(ConnectionAttempt::getTimeToConnect).average().orElse(0d); + } + + public Optional getLastExceptionMessage() { + return getReverseConnectionAttempts().stream() + .filter(e -> e.exception.isPresent()) + .findFirst() + .flatMap(ConnectionAttempt::getException) + .map(Throwable::getMessage); + } + + public String getAllExceptionMessages() { + return Joiner.on(",\n") + .join(getReverseConnectionAttempts().stream() + .filter(e -> e.exception.isPresent()) + .flatMap(e -> e.getException().stream()) + .map(Throwable::getMessage) + .collect(Collectors.toList())); + } + + public double getFailureRate() { + if (getNumConnectionAttempts() == 0) { + return 0; + } + return getNumFailures() / (double) getNumConnectionAttempts(); + } + + @Override + public String toString() { + return getShortId(); + } + + @Getter + public static class ConnectionAttempt { + private final Peer peer; + private final Runnable updateHandler; + private final long connectTs; + private boolean isConnected; + @Setter + private long timeToConnect; + @Setter + private long timeToDisconnect; + @Setter + private long timeToFailure; + private Optional exception = Optional.empty(); + private Optional versionMessage = Optional.empty(); + + public ConnectionAttempt(Peer peer, Runnable updateHandler) { + this.peer = peer; + this.updateHandler = updateHandler; + connectTs = System.currentTimeMillis(); + } + + public void onConnected() { + // We clone to avoid change of fields when disconnect happens + VersionMessage peerVersionMessage = peer.getPeerVersionMessage().duplicate(); + versionMessage = Optional.of(peerVersionMessage); + isConnected = true; + updateHandler.run(); + } + + public void onDisconnected() { + isConnected = false; + updateHandler.run(); + } + + public void onException(Throwable exception) { + this.exception = Optional.of(exception); + isConnected = false; + updateHandler.run(); + } + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionModel.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionModel.java new file mode 100644 index 0000000000..0a69e4ad3f --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConncetionModel.java @@ -0,0 +1,49 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.btc; + +import bisq.core.btc.nodes.BtcNodes; + +import org.bitcoinj.core.PeerAddress; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.Getter; + +@Getter +public class PeerConncetionModel { + private final Map map = new HashMap<>(); + private final List providedBtcNodes; + private final Runnable onChangeHandler; + + public PeerConncetionModel(List providedBtcNodes, Runnable onChangeHandler) { + this.providedBtcNodes = providedBtcNodes; + this.onChangeHandler = onChangeHandler; + } + + public void fill(Set peerAddresses) { + map.clear(); + map.putAll(peerAddresses.stream() + .map(peerAddress -> new PeerConncetionInfo(peerAddress, onChangeHandler)) + .collect(Collectors.toMap(PeerConncetionInfo::getAddress, e -> e))); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java new file mode 100644 index 0000000000..c5fb619844 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerConnection.java @@ -0,0 +1,194 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.btc; + +import bisq.common.Timer; +import bisq.common.UserThread; +import bisq.common.util.SingleThreadExecutorUtils; + +import org.bitcoinj.core.Context; +import org.bitcoinj.core.NetworkParameters; +import org.bitcoinj.core.Peer; +import org.bitcoinj.core.PeerAddress; +import org.bitcoinj.core.Utils; +import org.bitcoinj.core.VersionMessage; +import org.bitcoinj.net.BlockingClientManager; +import org.bitcoinj.net.ClientConnectionManager; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import java.net.SocketAddress; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.extern.slf4j.Slf4j; + +import org.jetbrains.annotations.NotNull; + +@Slf4j +public class PeerConnection { + private final Context context; + private final int disconnectIntervalSec; + private final int reconnectIntervalSec; + private final ClientConnectionManager clientConnectionManager; + private final NetworkParameters params; + private final int connectTimeoutMillis; + private final int vMinRequiredProtocolVersion; + private final PeerConncetionInfo peerConncetionInfo; + private Optional disconnectScheduler = Optional.empty(); + private Optional reconnectScheduler = Optional.empty(); + private final AtomicBoolean shutdownCalled = new AtomicBoolean(); + + public PeerConnection(Context context, + PeerConncetionInfo peerConncetionInfo, + BlockingClientManager blockingClientManager, + int connectTimeoutMillis, + int disconnectIntervalSec, + int reconnectIntervalSec) { + this.context = context; + this.peerConncetionInfo = peerConncetionInfo; + this.clientConnectionManager = blockingClientManager; + this.connectTimeoutMillis = connectTimeoutMillis; + this.disconnectIntervalSec = disconnectIntervalSec; + this.reconnectIntervalSec = reconnectIntervalSec; + + this.params = context.getParams(); + vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER); + } + + public CompletableFuture shutdown() { + shutdownCalled.set(true); + disconnectScheduler.ifPresent(Timer::stop); + reconnectScheduler.ifPresent(Timer::stop); + return CompletableFuture.runAsync(() -> { + log.info("shutdown {}", peerConncetionInfo); + Context.propagate(context); + disconnect(); + }, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId())); + } + + public void connect() { + CompletableFuture.runAsync(() -> { + log.info("connect {}", peerConncetionInfo); + Context.propagate(context); + Peer peer = createPeer(peerConncetionInfo.getPeerAddress()); + PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer); + long ts = System.currentTimeMillis(); + try { + peer.addConnectedEventListener((peer1, peerCount) -> { + connectionAttempt.setTimeToConnect(System.currentTimeMillis() - ts); + connectionAttempt.onConnected(); + startAutoDisconnectAndReconnect(); + }); + peer.addDisconnectedEventListener((peer1, peerCount) -> { + long passed = System.currentTimeMillis() - ts; + if (Math.abs(passed - connectTimeoutMillis) < 100) { + // timeout is not handled as error in bitcoinj, but it simply disconnects + connectionAttempt.setTimeToFailure(passed); + connectionAttempt.onException(new TimeoutException("Connection timeout. Could not connect after " + passed / 1000 + " sec.")); + } else { + connectionAttempt.setTimeToDisconnect(passed); + connectionAttempt.onDisconnected(); + } + startAutoDisconnectAndReconnect(); + }); + openConnection(peer).join(); + } catch (Exception exception) { + log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception); + connectionAttempt.setTimeToFailure(System.currentTimeMillis() - ts); + connectionAttempt.onException(exception); + startAutoDisconnectAndReconnect(); + } + }, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId())); + } + + private CompletableFuture disconnect() { + return peerConncetionInfo.getCurrentConnectionAttempt() + .map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> { + log.info("disconnect {}", peerConncetionInfo); + Context.propagate(context); + currentConnectionAttempt.getPeer().close(); + }, + SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId()))) + .orElse(CompletableFuture.completedFuture(null)); + } + + private void startAutoDisconnectAndReconnect() { + if (shutdownCalled.get()) { + return; + } + disconnectScheduler.ifPresent(Timer::stop); + disconnectScheduler = Optional.of(UserThread.runAfter(() -> { + if (shutdownCalled.get()) { + return; + } + disconnect() + .thenRun(() -> { + if (shutdownCalled.get()) { + return; + } + reconnectScheduler.ifPresent(Timer::stop); + reconnectScheduler = Optional.of(UserThread.runAfter(() -> { + if (shutdownCalled.get()) { + return; + } + connect(); + }, reconnectIntervalSec)); + }); + }, disconnectIntervalSec)); + } + + private Peer createPeer(PeerAddress address) { + Peer peer = new Peer(params, getVersionMessage(address), address, null, 0, 0); + peer.setMinProtocolVersion(vMinRequiredProtocolVersion); + peer.setSocketTimeout(connectTimeoutMillis); + return peer; + } + + private CompletableFuture openConnection(Peer peer) { + CompletableFuture future = new CompletableFuture<>(); + ListenableFuture listenableFuture = clientConnectionManager.openConnection(peer.getAddress().toSocketAddress(), peer); + Futures.addCallback(listenableFuture, new FutureCallback<>() { + @Override + public void onSuccess(SocketAddress result) { + future.complete(result); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + future.completeExceptionally(throwable); + } + }, MoreExecutors.directExecutor()); + return future; + } + + private VersionMessage getVersionMessage(PeerAddress address) { + VersionMessage versionMessage = new VersionMessage(params, 0); + versionMessage.bestHeight = 0; + versionMessage.time = Utils.currentTimeSeconds(); + versionMessage.receivingAddr = address; + versionMessage.receivingAddr.setParent(versionMessage); + return versionMessage; + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java new file mode 100644 index 0000000000..0a277cb143 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/PeerGroupService.java @@ -0,0 +1,150 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.btc; + +import bisq.core.btc.nodes.BtcNodes; +import bisq.core.btc.nodes.BtcNodesRepository; +import bisq.core.btc.nodes.LocalBitcoinNode; +import bisq.core.btc.nodes.ProxySocketFactory; + +import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.util.SingleThreadExecutorUtils; + +import org.bitcoinj.core.Context; +import org.bitcoinj.core.NetworkParameters; +import org.bitcoinj.core.PeerAddress; +import org.bitcoinj.core.PeerGroup; +import org.bitcoinj.net.BlockingClientManager; +import org.bitcoinj.utils.Threading; + +import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; + +import java.time.Duration; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Proxy; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class PeerGroupService { + private final NetworkParameters params; + private final LocalBitcoinNode localBitcoinNode; + private final Context context; + private final PeerConncetionModel peerConncetionModel; + private Set peerConnections; + private BlockingClientManager blockingClientManager; + + public PeerGroupService(Config config, PeerConncetionModel peerConncetionModel) { + this.peerConncetionModel = peerConncetionModel; + + params = Config.baseCurrencyNetworkParameters(); + context = new Context(params); + PeerGroup.setIgnoreHttpSeeds(true); + Threading.USER_THREAD = UserThread.getExecutor(); + localBitcoinNode = new LocalBitcoinNode(config); + } + + public void applySocks5Proxy(Optional socks5Proxy) { + int connectTimeoutMillis; + int disconnectIntervalSec; + int reconnectIntervalSec; + Set peerAddresses; + if (localBitcoinNode.shouldBeUsed()) { + InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), params.getPort()); + peerAddresses = Set.of(new PeerAddress(address)); + connectTimeoutMillis = 1000; + disconnectIntervalSec = 5; + reconnectIntervalSec = 5; + blockingClientManager = new BlockingClientManager(); + } else { + BtcNodes btcNodes = new BtcNodes(); + List peerAddressList = btcNodesToPeerAddress(btcNodes, socks5Proxy); + peerAddresses = new HashSet<>(peerAddressList); + connectTimeoutMillis = socks5Proxy.map(s -> 60_000).orElse(10_000); + disconnectIntervalSec = 2; + reconnectIntervalSec = 120; + if (socks5Proxy.isPresent()) { + InetSocketAddress inetSocketAddress = new InetSocketAddress(socks5Proxy.get().getInetAddress(), socks5Proxy.get().getPort()); + Proxy proxy = new Proxy(Proxy.Type.SOCKS, inetSocketAddress); + ProxySocketFactory proxySocketFactory = new ProxySocketFactory(proxy); + blockingClientManager = new BlockingClientManager(proxySocketFactory); + } else { + blockingClientManager = new BlockingClientManager(); + } + } + log.info("Using peer addresses {}", peerAddresses); + blockingClientManager.setConnectTimeoutMillis(connectTimeoutMillis); + peerConncetionModel.fill(peerAddresses); + Set peerConncetionInfoSet = new HashSet<>(peerConncetionModel.getMap().values()); + peerConnections = peerConncetionInfoSet.stream() + .map(peerConncetionInfo -> new PeerConnection(context, + peerConncetionInfo, + blockingClientManager, + connectTimeoutMillis, + disconnectIntervalSec, + reconnectIntervalSec)) + .collect(Collectors.toSet()); + } + + public CompletableFuture start() { + return CompletableFuture.runAsync(() -> { + log.info("start"); + Context.propagate(context); + blockingClientManager.startAsync(); + blockingClientManager.awaitRunning(); + }, SingleThreadExecutorUtils.getSingleThreadExecutor("start")); + } + + public void connectToAll() { + peerConnections.forEach(PeerConnection::connect); + } + + public CompletableFuture shutdown() { + return CompletableFuture.runAsync(() -> { + log.info("shutdown"); + Context.propagate(context); + CountDownLatch latch = new CountDownLatch(peerConnections.size()); + peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown)); + try { + latch.await(5, TimeUnit.SECONDS); + blockingClientManager.stopAsync(); + blockingClientManager.awaitTerminated(Duration.ofSeconds(2)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown")); + } + + private List btcNodesToPeerAddress(BtcNodes btcNodes, Optional proxy) { + List nodes = btcNodes.getProvidedBtcNodes(); + BtcNodesRepository repository = new BtcNodesRepository(nodes); + return repository.getPeerAddresses(proxy.orElse(null)); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/ServiceBits.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/ServiceBits.java new file mode 100644 index 0000000000..c7b59b8373 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/btc/ServiceBits.java @@ -0,0 +1,63 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.btc; + +import com.google.common.base.Joiner; + +import java.util.LinkedList; +import java.util.List; + +import static org.bitcoinj.core.VersionMessage.*; + +/** + * Borrowed from VersionMessage and added NODE_P2P_V2 + */ +public class ServiceBits { + public static final int NODE_P2P_V2 = 1 << 11; + + public static String toString(long services) { + List strings = new LinkedList<>(); + if ((services & NODE_NETWORK) == NODE_NETWORK) { + strings.add("NETWORK"); + services &= ~NODE_NETWORK; + } + if ((services & NODE_GETUTXOS) == NODE_GETUTXOS) { + strings.add("GETUTXOS"); + services &= ~NODE_GETUTXOS; + } + if ((services & NODE_BLOOM) == NODE_BLOOM) { + strings.add("BLOOM"); + services &= ~NODE_BLOOM; + } + if ((services & NODE_WITNESS) == NODE_WITNESS) { + strings.add("WITNESS"); + services &= ~NODE_WITNESS; + } + if ((services & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) { + strings.add("NETWORK_LIMITED"); + services &= ~NODE_NETWORK_LIMITED; + } + if ((services & NODE_P2P_V2) == NODE_P2P_V2) { + strings.add("NODE_P2P_V2"); + services &= ~NODE_P2P_V2; + } + if (services != 0) + strings.add("Unrecognized service bit: " + Long.toBinaryString(services)); + return Joiner.on(", ").join(strings); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java new file mode 100644 index 0000000000..8d29b2ce03 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/server/SimpleHttpServer.java @@ -0,0 +1,245 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.server; + +import bisq.core.btc.nodes.BtcNodes; +import bisq.core.btc.nodes.LocalBitcoinNode; + +import bisq.common.config.BaseCurrencyNetwork; +import bisq.common.config.Config; +import bisq.common.util.MathUtils; +import bisq.common.util.Profiler; +import bisq.common.util.SingleThreadExecutorUtils; + +import org.bitcoinj.core.VersionMessage; + +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + + + +import bisq.btcnodemonitor.btc.PeerConncetionInfo; +import bisq.btcnodemonitor.btc.PeerConncetionModel; +import bisq.btcnodemonitor.btc.ServiceBits; +import spark.Spark; + +@Slf4j +public class SimpleHttpServer { + private final static String CLOSE_TAG = "
"; + private final static String WARNING_ICON = "⚠ "; + private final static String ALERT_ICON = "☠ "; // ⚡ ⚡ + private final Config config; + @Getter + private final List providedBtcNodes; + private final Map btcNodeByAddress; + private final int port; + private final PeerConncetionModel peerConncetionModel; + private final String started; + + private String html; + private int requestCounter; + private String networkInfo; + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) { + this.config = config; + this.peerConncetionModel = peerConncetionModel; + started = new Date().toString(); + + this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes(); + + BaseCurrencyNetwork network = config.baseCurrencyNetwork; + if (config.useTorForBtcMonitor) { + port = network.isMainnet() ? 8000 : 8001; + networkInfo = network.isMainnet() ? "TOR/MAIN_NET" : "TOR/REG_TEST"; + btcNodeByAddress = providedBtcNodes.stream() + .filter(e -> e.getOnionAddress() != null) + .collect(Collectors.toMap(BtcNodes.BtcNode::getOnionAddress, e -> e)); + } else { + port = network.isMainnet() ? 8080 : 8081; + networkInfo = network.isMainnet() ? "Clearnet/MAIN_NET" : "Clearnet/REG_TEST"; + + if (new LocalBitcoinNode(config).shouldBeUsed()) { + btcNodeByAddress = new HashMap<>(); + btcNodeByAddress.put("127.0.0.1", new BtcNodes.BtcNode("localhost", null, "127.0.0.1", + Config.baseCurrencyNetworkParameters().getPort(), "n/a")); + } else { + if (network.isMainnet()) { + btcNodeByAddress = providedBtcNodes.stream() + .filter(e -> e.getAddress() != null) + .collect(Collectors.toMap(BtcNodes.BtcNode::getAddress, e -> e)); + } else { + btcNodeByAddress = new HashMap<>(); + } + } + } + html = "Monitor for Bitcoin nodes created for " + networkInfo; + } + + public CompletableFuture start() { + html = "Monitor for Bitcoin nodes starting up for " + networkInfo; + return CompletableFuture.runAsync(() -> { + log.info("Server listen on {}", port); + Spark.port(port); + Spark.get("/", (req, res) -> { + log.info("Incoming request from: {}", req.userAgent()); + return html; + }); + }, SingleThreadExecutorUtils.getSingleThreadExecutor("SimpleHttpServer.start")); + } + + public CompletableFuture shutdown() { + return CompletableFuture.runAsync(() -> { + log.info("shutDown"); + Spark.stop(); + log.info("shutDown completed"); + }); + } + + public void onChange() { + StringBuilder sb = new StringBuilder(); + sb.append("" + + "" + + "" + + "

") + .append("Monitor for Bitcoin nodes using ").append(networkInfo) + .append(", started at: ").append(started).append("
") + .append("System load: ").append(Profiler.getSystemLoad()).append("
") + .append("
").append("") + .append("") + .append("") + .append("") + .append("").append(""); + + Map peersMap = peerConncetionModel.getMap(); + btcNodeByAddress.values().stream() + .sorted(Comparator.comparing(BtcNodes.BtcNode::getId)) + .forEach(btcNode -> { + sb.append(""); + String address = btcNode.getAddress(); + PeerConncetionInfo peerConncetionInfo = peersMap.get(address); + if (peersMap.containsKey(address)) { + sb.append("") + .append("") + .append(""); + sb.append(""); + return; + } + + address = btcNode.getOnionAddress(); + peerConncetionInfo = peersMap.get(address); + if (peersMap.containsKey(address)) { + sb.append("") + .append("") + .append(""); + } else { + /* sb.append("") + .append("");*/ + } + sb.append(""); + }); + + sb.append("
Node operatorConnection attemptsNode info
").append(getOperatorInfo(btcNode, address)).append("").append(getConnectionInfo(peerConncetionInfo)).append("").append(getNodeInfo(peerConncetionInfo)).append("
").append(getOperatorInfo(btcNode, address)).append("").append(getConnectionInfo(peerConncetionInfo)).append("").append(getNodeInfo(peerConncetionInfo)).append("").append(getOperatorInfo(btcNode, null)).append("").append("n/a").append("
"); + html = sb.toString(); + } + + private String getOperatorInfo(BtcNodes.BtcNode btcNode, @Nullable String address) { + StringBuilder sb = new StringBuilder(); + sb.append(btcNode.getId()).append("
"); + if (address != null) { + sb.append("Address: ").append(address).append("
"); + } + return sb.toString(); + } + + private String getConnectionInfo(PeerConncetionInfo info) { + StringBuilder sb = new StringBuilder(); + sb.append("Num connection attempts: ").append(info.getNumConnectionAttempts()).append("
"); + double failureRate = info.getFailureRate(); + String failureRateString = MathUtils.roundDouble(failureRate * 100, 2) + "%"; + if (failureRate > 0.9) { + failureRateString = asError(failureRateString, failureRateString); + } else if (failureRate > 0.3) { + failureRateString = asWarn(failureRateString, failureRateString); + } + sb.append("FailureRate (success/failures): ").append(failureRateString) + .append("(").append(info.getNumFailures()).append(" / ") + .append(info.getNumFailures()).append(")").append("
"); + info.getLastExceptionMessage().ifPresent(errorMessage -> + sb.append(asError("LastExceptionMessage: " + errorMessage, info.getAllExceptionMessages())) + .append("
")); + sb.append("Duration to connect: ").append(MathUtils.roundDouble(info.getLastSuccessfulConnectTime() / 1000d, 2)).append(" sec").append("
"); + return sb.toString(); + } + + private String getNodeInfo(PeerConncetionInfo info) { + if (info.getLastSuccessfulConnected().isEmpty()) { + return ""; + } + PeerConncetionInfo.ConnectionAttempt attempt = info.getLastSuccessfulConnected().get(); + if (attempt.getVersionMessage().isEmpty()) { + return ""; + } + int index = info.getIndex(attempt); + + StringBuilder sb = new StringBuilder(); + VersionMessage versionMessage = attempt.getVersionMessage().get(); + sb.append("Result from connection attempt: ").append(index).append("
"); + sb.append("Height: ").append(versionMessage.bestHeight).append("
"); + sb.append("Version: ").append(versionMessage.subVer).append(" (").append(versionMessage.clientVersion).append(")").append("
"); + String serviceBits = ServiceBits.toString(versionMessage.localServices); + sb.append("Services: ").append(serviceBits) + .append(" (").append(versionMessage.localServices).append(")").append("
"); + long peerTime = versionMessage.time * 1000; + sb.append("Time: ").append(String.format(Locale.US, "%tF %tT", peerTime, peerTime)); + return sb.toString(); + } + + private static String decorate(String style, String value, String tooltip) { + return "" + value + ""; + } + + private static String asWarn(String value, String tooltip) { + return decorate("warn", value, tooltip); + } + + private static String asError(String value, String tooltip) { + return decorate("error", value, tooltip); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java new file mode 100644 index 0000000000..44cfa144cf --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/ProxySetup.java @@ -0,0 +1,93 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.socksProxy; + +import bisq.network.Socks5ProxyProvider; +import bisq.network.p2p.network.NewTor; + +import bisq.common.config.Config; +import bisq.common.util.SingleThreadExecutorUtils; + +import org.berndpruenster.netlayer.tor.Tor; +import org.berndpruenster.netlayer.tor.TorCtlException; + +import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; + +import java.nio.file.Paths; + +import java.io.File; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class ProxySetup { + @Getter + private final File torDir; + private final SocksProxyFactory socksProxyFactory; + private final Config config; + private NewTor torMode; + private Tor tor; + + public ProxySetup(Config config) { + this.config = config; + socksProxyFactory = new SocksProxyFactory("127.0.0.1"); + Socks5ProxyProvider socks5ProxyProvider = new Socks5ProxyProvider("", ""); + socks5ProxyProvider.setSocks5ProxyInternal(socksProxyFactory); + String networkDirName = config.baseCurrencyNetwork.name().toLowerCase(); + torDir = Paths.get(config.appDataDir.getPath(), networkDirName, "tor").toFile(); + } + + public CompletableFuture> createSocksProxy() { + log.info("createSocksProxy"); + if (!config.useTorForBtcMonitor) { + return CompletableFuture.completedFuture(Optional.empty()); + } + checkArgument(tor == null); + return CompletableFuture.supplyAsync(() -> { + torMode = new NewTor(torDir, null, "", ArrayList::new); + try { + // blocking + tor = torMode.getTor(); + socksProxyFactory.setTor(tor); + return Optional.of(socksProxyFactory.getSocksProxy()); + } catch (IOException | TorCtlException e) { + throw new RuntimeException(e); + } + }, SingleThreadExecutorUtils.getSingleThreadExecutor("ProxySetup.start")); + } + + public CompletableFuture shutdown() { + log.warn("start shutdown"); + return CompletableFuture.runAsync(() -> { + if (tor != null) { + tor.shutdown(); + } + log.warn("shutdown completed"); + }) + .orTimeout(2, TimeUnit.SECONDS); + } +} diff --git a/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/SocksProxyFactory.java b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/SocksProxyFactory.java new file mode 100644 index 0000000000..e454030500 --- /dev/null +++ b/btcnodemonitor/src/main/java/bisq/btcnodemonitor/socksProxy/SocksProxyFactory.java @@ -0,0 +1,59 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.btcnodemonitor.socksProxy; + +import bisq.network.p2p.network.Socks5ProxyInternalFactory; + +import org.berndpruenster.netlayer.tor.Tor; + +import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + +@Slf4j +public class SocksProxyFactory implements Socks5ProxyInternalFactory { + private final String torControlHost; + @Setter + @Nullable + private Tor tor; + private Socks5Proxy socksProxy; + + public SocksProxyFactory(String torControlHost) { + this.torControlHost = torControlHost; + } + + @Override + public Socks5Proxy getSocksProxy() { + if (tor == null) { + return null; + } else { + try { + if (socksProxy == null) { + socksProxy = tor.getProxy(torControlHost, null); + } + return socksProxy; + } catch (Throwable t) { + log.error("Error at getSocksProxy", t); + return null; + } + } + } +} diff --git a/btcnodemonitor/src/main/resources/logback.xml b/btcnodemonitor/src/main/resources/logback.xml new file mode 100644 index 0000000000..5becc221f5 --- /dev/null +++ b/btcnodemonitor/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + + %highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}: %msg %xEx%n) + + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index 35128e1f04..8220109914 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,7 @@ toolchainManagement { include 'proto' include 'assets' +include 'btcnodemonitor' include 'common' include 'p2p' include 'core'