Add btxnodemonitor module

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2024-06-10 20:29:57 +07:00
parent 9c8e3ad6bc
commit 0752e07efc
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307
14 changed files with 1255 additions and 0 deletions

25
btcnodemonitor/README.md Normal file
View File

@ -0,0 +1,25 @@
## Bitcoin node monitor
This is a simple headless node with a http server which connects periodically to the Bisq-provided Bitcoin nodes and
disconnect quickly afterwards.
### Run Bitcoin node monitor
Run the Gradle task:
```sh
./gradlew btcnodemonitor:run
```
Or create a run scrip by:
```sh
./gradlew btcnodemonitor:startBisqApp
```
And then run:
```sh
./bisq-btcnodemonitor
```

View File

@ -0,0 +1,37 @@
plugins {
id 'bisq.application'
id 'bisq.gradle.app_start_plugin.AppStartPlugin'
}
mainClassName = 'bisq.btcnodemonitor.BtcNodeMonitorMain'
distTar.enabled = true
dependencies {
implementation enforcedPlatform(project(':platform'))
implementation project(':proto')
implementation project(':common')
implementation project(':core')
implementation project(':p2p')
annotationProcessor libs.lombok
compileOnly libs.javax.annotation
compileOnly libs.lombok
implementation libs.logback.classic
implementation libs.logback.core
implementation libs.google.guava
implementation libs.apache.commons.lang3
implementation libs.jetbrains.annotations
implementation libs.slf4j.api
implementation(libs.netlayer.tor.external) {
exclude(module: 'slf4j-api')
}
implementation(libs.bitcoinj) {
exclude(module: 'bcprov-jdk15on')
exclude(module: 'guava')
exclude(module: 'jsr305')
exclude(module: 'okhttp')
exclude(module: 'okio')
exclude(module: 'slf4j-api')
}
implementation libs.spark.core
}

View File

@ -0,0 +1,66 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor;
import bisq.core.btc.nodes.BtcNodes;
import bisq.common.config.Config;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import bisq.btcnodemonitor.btc.PeerConncetionModel;
import bisq.btcnodemonitor.btc.PeerGroupService;
import bisq.btcnodemonitor.server.SimpleHttpServer;
import bisq.btcnodemonitor.socksProxy.ProxySetup;
@Slf4j
public class BtcNodeMonitor {
private final PeerGroupService peerGroupService;
private final ProxySetup proxySetup;
private final SimpleHttpServer simpleHttpServer;
public BtcNodeMonitor(Config config) {
PeerConncetionModel peerConncetionModel = new PeerConncetionModel(new BtcNodes().getProvidedBtcNodes(), this::onChange);
simpleHttpServer = new SimpleHttpServer(config, peerConncetionModel);
proxySetup = new ProxySetup(config);
peerGroupService = new PeerGroupService(config, peerConncetionModel);
}
public void onChange() {
simpleHttpServer.onChange();
}
public CompletableFuture<Void> start() {
return simpleHttpServer.start()
.thenCompose(nil -> proxySetup.createSocksProxy())
.thenAccept(peerGroupService::applySocks5Proxy)
.thenCompose(nil -> peerGroupService.start())
.thenRun(peerGroupService::connectToAll);
}
public CompletableFuture<Void> shutdown() {
return peerGroupService.shutdown()
.thenCompose(nil -> proxySetup.shutdown())
.thenCompose(nil -> simpleHttpServer.shutdown());
}
}

View File

@ -0,0 +1,73 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.handlers.ResultHandler;
import bisq.common.setup.CommonSetup;
import bisq.common.setup.GracefulShutDownHandler;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class BtcNodeMonitorMain implements GracefulShutDownHandler {
public static void main(String[] args) {
new BtcNodeMonitorMain(args);
}
private final Config config;
@Getter
private final BtcNodeMonitor btcNodeMonitor;
public BtcNodeMonitorMain(String[] args) {
config = new Config("bisq_btc_node_monitor", Utilities.getUserDataDir(), args);
CommonSetup.setup(config, this);
configUserThread();
btcNodeMonitor = new BtcNodeMonitor(config);
btcNodeMonitor.start().join();
keepRunning();
}
@Override
public void gracefulShutDown(ResultHandler resultHandler) {
btcNodeMonitor.shutdown().join();
System.exit(0);
resultHandler.handleResult();
}
private void keepRunning() {
try {
Thread.currentThread().setName("BtcNodeMonitorMain");
Thread.currentThread().join();
} catch (InterruptedException e) {
log.error("BtcNodeMonitorMain Thread interrupted", e);
gracefulShutDown(() -> {
});
}
}
private void configUserThread() {
UserThread.setExecutor(SingleThreadExecutorUtils.getSingleThreadExecutor("UserThread"));
}
}

View File

@ -0,0 +1,183 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.btc;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.VersionMessage;
import com.google.common.base.Joiner;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
@Getter
public class PeerConncetionInfo {
private final List<ConnectionAttempt> connectionAttempts = new ArrayList<>();
private final PeerAddress peerAddress;
private final Runnable onChangeHandler;
private Optional<ConnectionAttempt> currentConnectionAttempt = Optional.empty();
public PeerConncetionInfo(PeerAddress peerAddress, Runnable onChangeHandler) {
this.peerAddress = peerAddress;
this.onChangeHandler = onChangeHandler;
}
public ConnectionAttempt newConnectionAttempt(Peer peer) {
currentConnectionAttempt = Optional.of(new ConnectionAttempt(peer, onChangeHandler));
connectionAttempts.add(currentConnectionAttempt.get());
onChangeHandler.run();
return currentConnectionAttempt.get();
}
public String getAddress() {
InetAddress inetAddress = peerAddress.getAddr();
if (inetAddress != null) {
return inetAddress.getHostAddress();
} else {
return peerAddress.getHostname();
}
}
public String getShortId() {
return getAddress().substring(0, 12) + "...";
}
public int getNumConnectionAttempts() {
return connectionAttempts.size();
}
public int getNumConnections() {
return (int) connectionAttempts.stream().filter(e -> e.isConnected).count();
}
public int getNumDisconnections() {
return (int) connectionAttempts.stream().filter(e -> !e.isConnected).count();
}
public int getNumFailures() {
return (int) connectionAttempts.stream().filter(e -> e.exception.isPresent()).count();
}
public int getNumSuccess() {
return (int) connectionAttempts.stream().filter(e -> e.versionMessage.isPresent()).count();
}
public List<ConnectionAttempt> getReverseConnectionAttempts() {
List<ConnectionAttempt> reverseConnectionAttempts = new ArrayList<>(connectionAttempts);
Collections.reverse(reverseConnectionAttempts);
return reverseConnectionAttempts;
}
public Optional<ConnectionAttempt> getLastSuccessfulConnected() {
return getReverseConnectionAttempts().stream().filter(e -> e.versionMessage.isPresent()).findFirst();
}
public int getIndex(ConnectionAttempt connectionAttempt) {
return connectionAttempts.indexOf(connectionAttempt);
}
public long getLastSuccessfulConnectTime() {
return getReverseConnectionAttempts().stream().filter(e -> e.versionMessage.isPresent()).findFirst()
.map(ConnectionAttempt::getTimeToConnect)
.orElse(0L);
}
public double getAverageTimeToConnect() {
return connectionAttempts.stream().mapToLong(ConnectionAttempt::getTimeToConnect).average().orElse(0d);
}
public Optional<String> getLastExceptionMessage() {
return getReverseConnectionAttempts().stream()
.filter(e -> e.exception.isPresent())
.findFirst()
.flatMap(ConnectionAttempt::getException)
.map(Throwable::getMessage);
}
public String getAllExceptionMessages() {
return Joiner.on(",\n")
.join(getReverseConnectionAttempts().stream()
.filter(e -> e.exception.isPresent())
.flatMap(e -> e.getException().stream())
.map(Throwable::getMessage)
.collect(Collectors.toList()));
}
public double getFailureRate() {
if (getNumConnectionAttempts() == 0) {
return 0;
}
return getNumFailures() / (double) getNumConnectionAttempts();
}
@Override
public String toString() {
return getShortId();
}
@Getter
public static class ConnectionAttempt {
private final Peer peer;
private final Runnable updateHandler;
private final long connectTs;
private boolean isConnected;
@Setter
private long timeToConnect;
@Setter
private long timeToDisconnect;
@Setter
private long timeToFailure;
private Optional<Throwable> exception = Optional.empty();
private Optional<VersionMessage> versionMessage = Optional.empty();
public ConnectionAttempt(Peer peer, Runnable updateHandler) {
this.peer = peer;
this.updateHandler = updateHandler;
connectTs = System.currentTimeMillis();
}
public void onConnected() {
// We clone to avoid change of fields when disconnect happens
VersionMessage peerVersionMessage = peer.getPeerVersionMessage().duplicate();
versionMessage = Optional.of(peerVersionMessage);
isConnected = true;
updateHandler.run();
}
public void onDisconnected() {
isConnected = false;
updateHandler.run();
}
public void onException(Throwable exception) {
this.exception = Optional.of(exception);
isConnected = false;
updateHandler.run();
}
}
}

View File

@ -0,0 +1,49 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.btc;
import bisq.core.btc.nodes.BtcNodes;
import org.bitcoinj.core.PeerAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
@Getter
public class PeerConncetionModel {
private final Map<String, PeerConncetionInfo> map = new HashMap<>();
private final List<BtcNodes.BtcNode> providedBtcNodes;
private final Runnable onChangeHandler;
public PeerConncetionModel(List<BtcNodes.BtcNode> providedBtcNodes, Runnable onChangeHandler) {
this.providedBtcNodes = providedBtcNodes;
this.onChangeHandler = onChangeHandler;
}
public void fill(Set<PeerAddress> peerAddresses) {
map.clear();
map.putAll(peerAddresses.stream()
.map(peerAddress -> new PeerConncetionInfo(peerAddress, onChangeHandler))
.collect(Collectors.toMap(PeerConncetionInfo::getAddress, e -> e)));
}
}

View File

@ -0,0 +1,194 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.btc;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.Utils;
import org.bitcoinj.core.VersionMessage;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.ClientConnectionManager;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@Slf4j
public class PeerConnection {
private final Context context;
private final int disconnectIntervalSec;
private final int reconnectIntervalSec;
private final ClientConnectionManager clientConnectionManager;
private final NetworkParameters params;
private final int connectTimeoutMillis;
private final int vMinRequiredProtocolVersion;
private final PeerConncetionInfo peerConncetionInfo;
private Optional<Timer> disconnectScheduler = Optional.empty();
private Optional<Timer> reconnectScheduler = Optional.empty();
private final AtomicBoolean shutdownCalled = new AtomicBoolean();
public PeerConnection(Context context,
PeerConncetionInfo peerConncetionInfo,
BlockingClientManager blockingClientManager,
int connectTimeoutMillis,
int disconnectIntervalSec,
int reconnectIntervalSec) {
this.context = context;
this.peerConncetionInfo = peerConncetionInfo;
this.clientConnectionManager = blockingClientManager;
this.connectTimeoutMillis = connectTimeoutMillis;
this.disconnectIntervalSec = disconnectIntervalSec;
this.reconnectIntervalSec = reconnectIntervalSec;
this.params = context.getParams();
vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER);
}
public CompletableFuture<Void> shutdown() {
shutdownCalled.set(true);
disconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler.ifPresent(Timer::stop);
return CompletableFuture.runAsync(() -> {
log.info("shutdown {}", peerConncetionInfo);
Context.propagate(context);
disconnect();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId()));
}
public void connect() {
CompletableFuture.runAsync(() -> {
log.info("connect {}", peerConncetionInfo);
Context.propagate(context);
Peer peer = createPeer(peerConncetionInfo.getPeerAddress());
PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer);
long ts = System.currentTimeMillis();
try {
peer.addConnectedEventListener((peer1, peerCount) -> {
connectionAttempt.setTimeToConnect(System.currentTimeMillis() - ts);
connectionAttempt.onConnected();
startAutoDisconnectAndReconnect();
});
peer.addDisconnectedEventListener((peer1, peerCount) -> {
long passed = System.currentTimeMillis() - ts;
if (Math.abs(passed - connectTimeoutMillis) < 100) {
// timeout is not handled as error in bitcoinj, but it simply disconnects
connectionAttempt.setTimeToFailure(passed);
connectionAttempt.onException(new TimeoutException("Connection timeout. Could not connect after " + passed / 1000 + " sec."));
} else {
connectionAttempt.setTimeToDisconnect(passed);
connectionAttempt.onDisconnected();
}
startAutoDisconnectAndReconnect();
});
openConnection(peer).join();
} catch (Exception exception) {
log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception);
connectionAttempt.setTimeToFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(exception);
startAutoDisconnectAndReconnect();
}
}, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId()));
}
private CompletableFuture<Void> disconnect() {
return peerConncetionInfo.getCurrentConnectionAttempt()
.map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> {
log.info("disconnect {}", peerConncetionInfo);
Context.propagate(context);
currentConnectionAttempt.getPeer().close();
},
SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId())))
.orElse(CompletableFuture.completedFuture(null));
}
private void startAutoDisconnectAndReconnect() {
if (shutdownCalled.get()) {
return;
}
disconnectScheduler.ifPresent(Timer::stop);
disconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
disconnect()
.thenRun(() -> {
if (shutdownCalled.get()) {
return;
}
reconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
connect();
}, reconnectIntervalSec));
});
}, disconnectIntervalSec));
}
private Peer createPeer(PeerAddress address) {
Peer peer = new Peer(params, getVersionMessage(address), address, null, 0, 0);
peer.setMinProtocolVersion(vMinRequiredProtocolVersion);
peer.setSocketTimeout(connectTimeoutMillis);
return peer;
}
private CompletableFuture<SocketAddress> openConnection(Peer peer) {
CompletableFuture<SocketAddress> future = new CompletableFuture<>();
ListenableFuture<SocketAddress> listenableFuture = clientConnectionManager.openConnection(peer.getAddress().toSocketAddress(), peer);
Futures.addCallback(listenableFuture, new FutureCallback<>() {
@Override
public void onSuccess(SocketAddress result) {
future.complete(result);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
future.completeExceptionally(throwable);
}
}, MoreExecutors.directExecutor());
return future;
}
private VersionMessage getVersionMessage(PeerAddress address) {
VersionMessage versionMessage = new VersionMessage(params, 0);
versionMessage.bestHeight = 0;
versionMessage.time = Utils.currentTimeSeconds();
versionMessage.receivingAddr = address;
versionMessage.receivingAddr.setParent(versionMessage);
return versionMessage;
}
}

View File

@ -0,0 +1,150 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.btc;
import bisq.core.btc.nodes.BtcNodes;
import bisq.core.btc.nodes.BtcNodesRepository;
import bisq.core.btc.nodes.LocalBitcoinNode;
import bisq.core.btc.nodes.ProxySocketFactory;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.PeerGroup;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.utils.Threading;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import java.time.Duration;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PeerGroupService {
private final NetworkParameters params;
private final LocalBitcoinNode localBitcoinNode;
private final Context context;
private final PeerConncetionModel peerConncetionModel;
private Set<PeerConnection> peerConnections;
private BlockingClientManager blockingClientManager;
public PeerGroupService(Config config, PeerConncetionModel peerConncetionModel) {
this.peerConncetionModel = peerConncetionModel;
params = Config.baseCurrencyNetworkParameters();
context = new Context(params);
PeerGroup.setIgnoreHttpSeeds(true);
Threading.USER_THREAD = UserThread.getExecutor();
localBitcoinNode = new LocalBitcoinNode(config);
}
public void applySocks5Proxy(Optional<Socks5Proxy> socks5Proxy) {
int connectTimeoutMillis;
int disconnectIntervalSec;
int reconnectIntervalSec;
Set<PeerAddress> peerAddresses;
if (localBitcoinNode.shouldBeUsed()) {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), params.getPort());
peerAddresses = Set.of(new PeerAddress(address));
connectTimeoutMillis = 1000;
disconnectIntervalSec = 5;
reconnectIntervalSec = 5;
blockingClientManager = new BlockingClientManager();
} else {
BtcNodes btcNodes = new BtcNodes();
List<PeerAddress> peerAddressList = btcNodesToPeerAddress(btcNodes, socks5Proxy);
peerAddresses = new HashSet<>(peerAddressList);
connectTimeoutMillis = socks5Proxy.map(s -> 60_000).orElse(10_000);
disconnectIntervalSec = 2;
reconnectIntervalSec = 120;
if (socks5Proxy.isPresent()) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(socks5Proxy.get().getInetAddress(), socks5Proxy.get().getPort());
Proxy proxy = new Proxy(Proxy.Type.SOCKS, inetSocketAddress);
ProxySocketFactory proxySocketFactory = new ProxySocketFactory(proxy);
blockingClientManager = new BlockingClientManager(proxySocketFactory);
} else {
blockingClientManager = new BlockingClientManager();
}
}
log.info("Using peer addresses {}", peerAddresses);
blockingClientManager.setConnectTimeoutMillis(connectTimeoutMillis);
peerConncetionModel.fill(peerAddresses);
Set<PeerConncetionInfo> peerConncetionInfoSet = new HashSet<>(peerConncetionModel.getMap().values());
peerConnections = peerConncetionInfoSet.stream()
.map(peerConncetionInfo -> new PeerConnection(context,
peerConncetionInfo,
blockingClientManager,
connectTimeoutMillis,
disconnectIntervalSec,
reconnectIntervalSec))
.collect(Collectors.toSet());
}
public CompletableFuture<Void> start() {
return CompletableFuture.runAsync(() -> {
log.info("start");
Context.propagate(context);
blockingClientManager.startAsync();
blockingClientManager.awaitRunning();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}
public void connectToAll() {
peerConnections.forEach(PeerConnection::connect);
}
public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutdown");
Context.propagate(context);
CountDownLatch latch = new CountDownLatch(peerConnections.size());
peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown));
try {
latch.await(5, TimeUnit.SECONDS);
blockingClientManager.stopAsync();
blockingClientManager.awaitTerminated(Duration.ofSeconds(2));
} catch (Exception e) {
throw new RuntimeException(e);
}
}, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown"));
}
private List<PeerAddress> btcNodesToPeerAddress(BtcNodes btcNodes, Optional<Socks5Proxy> proxy) {
List<BtcNodes.BtcNode> nodes = btcNodes.getProvidedBtcNodes();
BtcNodesRepository repository = new BtcNodesRepository(nodes);
return repository.getPeerAddresses(proxy.orElse(null));
}
}

View File

@ -0,0 +1,63 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.btc;
import com.google.common.base.Joiner;
import java.util.LinkedList;
import java.util.List;
import static org.bitcoinj.core.VersionMessage.*;
/**
* Borrowed from VersionMessage and added NODE_P2P_V2
*/
public class ServiceBits {
public static final int NODE_P2P_V2 = 1 << 11;
public static String toString(long services) {
List<String> strings = new LinkedList<>();
if ((services & NODE_NETWORK) == NODE_NETWORK) {
strings.add("NETWORK");
services &= ~NODE_NETWORK;
}
if ((services & NODE_GETUTXOS) == NODE_GETUTXOS) {
strings.add("GETUTXOS");
services &= ~NODE_GETUTXOS;
}
if ((services & NODE_BLOOM) == NODE_BLOOM) {
strings.add("BLOOM");
services &= ~NODE_BLOOM;
}
if ((services & NODE_WITNESS) == NODE_WITNESS) {
strings.add("WITNESS");
services &= ~NODE_WITNESS;
}
if ((services & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) {
strings.add("NETWORK_LIMITED");
services &= ~NODE_NETWORK_LIMITED;
}
if ((services & NODE_P2P_V2) == NODE_P2P_V2) {
strings.add("NODE_P2P_V2");
services &= ~NODE_P2P_V2;
}
if (services != 0)
strings.add("Unrecognized service bit: " + Long.toBinaryString(services));
return Joiner.on(", ").join(strings);
}
}

View File

@ -0,0 +1,245 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.server;
import bisq.core.btc.nodes.BtcNodes;
import bisq.core.btc.nodes.LocalBitcoinNode;
import bisq.common.config.BaseCurrencyNetwork;
import bisq.common.config.Config;
import bisq.common.util.MathUtils;
import bisq.common.util.Profiler;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.VersionMessage;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import bisq.btcnodemonitor.btc.PeerConncetionInfo;
import bisq.btcnodemonitor.btc.PeerConncetionModel;
import bisq.btcnodemonitor.btc.ServiceBits;
import spark.Spark;
@Slf4j
public class SimpleHttpServer {
private final static String CLOSE_TAG = "</font><br/>";
private final static String WARNING_ICON = "&#9888; ";
private final static String ALERT_ICON = "&#9760; "; // &#9889; &#9889;
private final Config config;
@Getter
private final List<BtcNodes.BtcNode> providedBtcNodes;
private final Map<String, BtcNodes.BtcNode> btcNodeByAddress;
private final int port;
private final PeerConncetionModel peerConncetionModel;
private final String started;
private String html;
private int requestCounter;
private String networkInfo;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) {
this.config = config;
this.peerConncetionModel = peerConncetionModel;
started = new Date().toString();
this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
BaseCurrencyNetwork network = config.baseCurrencyNetwork;
if (config.useTorForBtcMonitor) {
port = network.isMainnet() ? 8000 : 8001;
networkInfo = network.isMainnet() ? "TOR/MAIN_NET" : "TOR/REG_TEST";
btcNodeByAddress = providedBtcNodes.stream()
.filter(e -> e.getOnionAddress() != null)
.collect(Collectors.toMap(BtcNodes.BtcNode::getOnionAddress, e -> e));
} else {
port = network.isMainnet() ? 8080 : 8081;
networkInfo = network.isMainnet() ? "Clearnet/MAIN_NET" : "Clearnet/REG_TEST";
if (new LocalBitcoinNode(config).shouldBeUsed()) {
btcNodeByAddress = new HashMap<>();
btcNodeByAddress.put("127.0.0.1", new BtcNodes.BtcNode("localhost", null, "127.0.0.1",
Config.baseCurrencyNetworkParameters().getPort(), "n/a"));
} else {
if (network.isMainnet()) {
btcNodeByAddress = providedBtcNodes.stream()
.filter(e -> e.getAddress() != null)
.collect(Collectors.toMap(BtcNodes.BtcNode::getAddress, e -> e));
} else {
btcNodeByAddress = new HashMap<>();
}
}
}
html = "Monitor for Bitcoin nodes created for " + networkInfo;
}
public CompletableFuture<Void> start() {
html = "Monitor for Bitcoin nodes starting up for " + networkInfo;
return CompletableFuture.runAsync(() -> {
log.info("Server listen on {}", port);
Spark.port(port);
Spark.get("/", (req, res) -> {
log.info("Incoming request from: {}", req.userAgent());
return html;
});
}, SingleThreadExecutorUtils.getSingleThreadExecutor("SimpleHttpServer.start"));
}
public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutDown");
Spark.stop();
log.info("shutDown completed");
});
}
public void onChange() {
StringBuilder sb = new StringBuilder();
sb.append("<result>" +
"<head>" +
"<style type=\"text/css\">" +
" a {" +
" text-decoration:none; color: black;" +
" }" +
" #warn { color: #ff7700; } " +
" #error { color: #ff0000; } " +
"table, th, td {border: 1px solid black;}" +
"</style></head>" +
"<body><h3>")
.append("Monitor for Bitcoin nodes using ").append(networkInfo)
.append(", started at: ").append(started).append("<br/>")
.append("System load: ").append(Profiler.getSystemLoad()).append("<br/>")
.append("<br/>").append("<table style=\"width:100%\">")
.append("<tr>")
.append("<th align=\"left\">Node operator</th>")
.append("<th align=\"left\">Connection attempts</th>")
.append("<th align=\"left\">Node info</th>").append("</tr>");
Map<String, PeerConncetionInfo> peersMap = peerConncetionModel.getMap();
btcNodeByAddress.values().stream()
.sorted(Comparator.comparing(BtcNodes.BtcNode::getId))
.forEach(btcNode -> {
sb.append("<tr valign=\"top\">");
String address = btcNode.getAddress();
PeerConncetionInfo peerConncetionInfo = peersMap.get(address);
if (peersMap.containsKey(address)) {
sb.append("<td>").append(getOperatorInfo(btcNode, address)).append("</td>")
.append("<td>").append(getConnectionInfo(peerConncetionInfo)).append("</td>")
.append("<td>").append(getNodeInfo(peerConncetionInfo)).append("</td>");
sb.append("</tr>");
return;
}
address = btcNode.getOnionAddress();
peerConncetionInfo = peersMap.get(address);
if (peersMap.containsKey(address)) {
sb.append("<td>").append(getOperatorInfo(btcNode, address)).append("</td>")
.append("<td>").append(getConnectionInfo(peerConncetionInfo)).append("</td>")
.append("<td>").append(getNodeInfo(peerConncetionInfo)).append("</td>");
} else {
/* sb.append("<td>").append(getOperatorInfo(btcNode, null)).append("</td>")
.append("<td>").append("n/a").append("</td>");*/
}
sb.append("</tr>");
});
sb.append("</table></body></result>");
html = sb.toString();
}
private String getOperatorInfo(BtcNodes.BtcNode btcNode, @Nullable String address) {
StringBuilder sb = new StringBuilder();
sb.append(btcNode.getId()).append("<br/>");
if (address != null) {
sb.append("Address: ").append(address).append("<br/>");
}
return sb.toString();
}
private String getConnectionInfo(PeerConncetionInfo info) {
StringBuilder sb = new StringBuilder();
sb.append("Num connection attempts: ").append(info.getNumConnectionAttempts()).append("<br/>");
double failureRate = info.getFailureRate();
String failureRateString = MathUtils.roundDouble(failureRate * 100, 2) + "%";
if (failureRate > 0.9) {
failureRateString = asError(failureRateString, failureRateString);
} else if (failureRate > 0.3) {
failureRateString = asWarn(failureRateString, failureRateString);
}
sb.append("FailureRate (success/failures): ").append(failureRateString)
.append("(").append(info.getNumFailures()).append(" / ")
.append(info.getNumFailures()).append(")").append("<br/>");
info.getLastExceptionMessage().ifPresent(errorMessage ->
sb.append(asError("LastExceptionMessage: " + errorMessage, info.getAllExceptionMessages()))
.append("<br/>"));
sb.append("Duration to connect: ").append(MathUtils.roundDouble(info.getLastSuccessfulConnectTime() / 1000d, 2)).append(" sec").append("<br/>");
return sb.toString();
}
private String getNodeInfo(PeerConncetionInfo info) {
if (info.getLastSuccessfulConnected().isEmpty()) {
return "";
}
PeerConncetionInfo.ConnectionAttempt attempt = info.getLastSuccessfulConnected().get();
if (attempt.getVersionMessage().isEmpty()) {
return "";
}
int index = info.getIndex(attempt);
StringBuilder sb = new StringBuilder();
VersionMessage versionMessage = attempt.getVersionMessage().get();
sb.append("Result from connection attempt: ").append(index).append("<br/>");
sb.append("Height: ").append(versionMessage.bestHeight).append("<br/>");
sb.append("Version: ").append(versionMessage.subVer).append(" (").append(versionMessage.clientVersion).append(")").append("<br/>");
String serviceBits = ServiceBits.toString(versionMessage.localServices);
sb.append("Services: ").append(serviceBits)
.append(" (").append(versionMessage.localServices).append(")").append("<br/>");
long peerTime = versionMessage.time * 1000;
sb.append("Time: ").append(String.format(Locale.US, "%tF %tT", peerTime, peerTime));
return sb.toString();
}
private static String decorate(String style, String value, String tooltip) {
return "<b><a id=\"" + style + "\" href=\"#\" title=\"" + tooltip + "\">" + value + "</a></b>";
}
private static String asWarn(String value, String tooltip) {
return decorate("warn", value, tooltip);
}
private static String asError(String value, String tooltip) {
return decorate("error", value, tooltip);
}
}

View File

@ -0,0 +1,93 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.socksProxy;
import bisq.network.Socks5ProxyProvider;
import bisq.network.p2p.network.NewTor;
import bisq.common.config.Config;
import bisq.common.util.SingleThreadExecutorUtils;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import java.nio.file.Paths;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class ProxySetup {
@Getter
private final File torDir;
private final SocksProxyFactory socksProxyFactory;
private final Config config;
private NewTor torMode;
private Tor tor;
public ProxySetup(Config config) {
this.config = config;
socksProxyFactory = new SocksProxyFactory("127.0.0.1");
Socks5ProxyProvider socks5ProxyProvider = new Socks5ProxyProvider("", "");
socks5ProxyProvider.setSocks5ProxyInternal(socksProxyFactory);
String networkDirName = config.baseCurrencyNetwork.name().toLowerCase();
torDir = Paths.get(config.appDataDir.getPath(), networkDirName, "tor").toFile();
}
public CompletableFuture<Optional<Socks5Proxy>> createSocksProxy() {
log.info("createSocksProxy");
if (!config.useTorForBtcMonitor) {
return CompletableFuture.completedFuture(Optional.empty());
}
checkArgument(tor == null);
return CompletableFuture.supplyAsync(() -> {
torMode = new NewTor(torDir, null, "", ArrayList::new);
try {
// blocking
tor = torMode.getTor();
socksProxyFactory.setTor(tor);
return Optional.of(socksProxyFactory.getSocksProxy());
} catch (IOException | TorCtlException e) {
throw new RuntimeException(e);
}
}, SingleThreadExecutorUtils.getSingleThreadExecutor("ProxySetup.start"));
}
public CompletableFuture<Void> shutdown() {
log.warn("start shutdown");
return CompletableFuture.runAsync(() -> {
if (tor != null) {
tor.shutdown();
}
log.warn("shutdown completed");
})
.orTimeout(2, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,59 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.btcnodemonitor.socksProxy;
import bisq.network.p2p.network.Socks5ProxyInternalFactory;
import org.berndpruenster.netlayer.tor.Tor;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j
public class SocksProxyFactory implements Socks5ProxyInternalFactory {
private final String torControlHost;
@Setter
@Nullable
private Tor tor;
private Socks5Proxy socksProxy;
public SocksProxyFactory(String torControlHost) {
this.torControlHost = torControlHost;
}
@Override
public Socks5Proxy getSocksProxy() {
if (tor == null) {
return null;
} else {
try {
if (socksProxy == null) {
socksProxy = tor.getProxy(torControlHost, null);
}
return socksProxy;
} catch (Throwable t) {
log.error("Error at getSocksProxy", t);
return null;
}
}
}
}

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40}: %msg %xEx%n)</pattern>
</encoder>
</appender>
<root level="TRACE">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<logger name="org.bitcoinj" level="INFO"/>
<logger name="spark.embeddedserver" level="WARN"/>
<logger name="org.eclipse.jetty.server" level="WARN"/>
<logger name="org.berndpruenster.netlayer.tor.Tor" level="WARN"/>
</configuration>

View File

@ -21,6 +21,7 @@ toolchainManagement {
include 'proto'
include 'assets'
include 'btcnodemonitor'
include 'common'
include 'p2p'
include 'core'