Merge pull request #7194 from HenrikJannsen/improve-btc-monitor

Improve btc monitor
This commit is contained in:
Alejandro García 2024-07-19 00:16:49 +00:00 committed by GitHub
commit 73fc00026a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 179 additions and 87 deletions

View File

@ -54,8 +54,7 @@ public class BtcNodeMonitor {
return simpleHttpServer.start()
.thenCompose(nil -> proxySetup.createSocksProxy())
.thenAccept(peerGroupService::applySocks5Proxy)
.thenCompose(nil -> peerGroupService.start())
.thenRun(peerGroupService::connectToAll);
.thenCompose(nil -> peerGroupService.start());
}
public CompletableFuture<Void> shutdown() {

View File

@ -51,6 +51,7 @@ public class BtcNodeMonitorMain implements GracefulShutDownHandler {
@Override
public void gracefulShutDown(ResultHandler resultHandler) {
log.info("gracefulShutDown");
btcNodeMonitor.shutdown().join();
System.exit(0);
resultHandler.handleResult();

View File

@ -64,7 +64,9 @@ public class PeerConncetionInfo {
}
public String getShortId() {
return getAddress().substring(0, 12) + "...";
String address = getAddress();
int endIndex = Math.min(address.length(), 12);
return address.substring(0, endIndex) + "...";
}
public int getNumConnectionAttempts() {

View File

@ -17,8 +17,6 @@
package bisq.btcnodemonitor.btc;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Context;
@ -27,6 +25,8 @@ import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.Utils;
import org.bitcoinj.core.VersionMessage;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.ClientConnectionManager;
@ -39,6 +39,8 @@ import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@ -55,9 +57,15 @@ public class PeerConnection {
private final int connectTimeoutMillis;
private final int vMinRequiredProtocolVersion;
private final PeerConncetionInfo peerConncetionInfo;
private Optional<Timer> disconnectScheduler = Optional.empty();
private Optional<Timer> reconnectScheduler = Optional.empty();
private final AtomicBoolean shutdownCalled = new AtomicBoolean();
private final ExecutorService connectionExecutor;
private final ExecutorService onConnectionExecutor;
private final ExecutorService onDisConnectionExecutor;
private Optional<PeerConnectedEventListener> peerConnectedEventListener = Optional.empty();
private Optional<PeerDisconnectedEventListener> peerDisconnectedEventListener = Optional.empty();
private Optional<CompletableFuture<Void>> connectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<Void>> innerConnectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<SocketAddress>> openConnectionFuture = Optional.empty();
public PeerConnection(Context context,
PeerConncetionInfo peerConncetionInfo,
@ -72,37 +80,110 @@ public class PeerConnection {
this.disconnectIntervalSec = disconnectIntervalSec;
this.reconnectIntervalSec = reconnectIntervalSec;
connectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("connection-" + peerConncetionInfo.getShortId());
onConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onConnection-" + peerConncetionInfo.getShortId());
onDisConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onDisConnection-" + peerConncetionInfo.getShortId());
this.params = context.getParams();
vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER);
}
public void start() {
CompletableFuture.runAsync(() -> {
do {
connectAndDisconnectFuture = Optional.of(connectAndDisconnect());
try {
connectAndDisconnectFuture.get().join();
} catch (Exception ignore) {
}
}
while (!shutdownCalled.get() && !Thread.currentThread().isInterrupted());
log.info("Exiting startConnectAndDisconnectLoop loop. Expected at shutdown");
}, connectionExecutor);
}
public CompletableFuture<Void> shutdown() {
log.info("Shutdown");
shutdownCalled.set(true);
disconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler.ifPresent(Timer::stop);
peerConncetionInfo.getCurrentConnectionAttempt().ifPresent(connectionAttempt -> {
Peer peer = connectionAttempt.getPeer();
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
});
return CompletableFuture.runAsync(() -> {
log.info("shutdown {}", peerConncetionInfo);
Context.propagate(context);
disconnect();
peerConncetionInfo.getCurrentConnectionAttempt()
.ifPresent(currentConnectionAttempt -> currentConnectionAttempt.getPeer().close());
connectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));
innerConnectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));
connectionExecutor.shutdownNow();
onConnectionExecutor.shutdownNow();
onDisConnectionExecutor.shutdownNow();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId()));
}
public void connect() {
CompletableFuture.runAsync(() -> {
log.info("connect {}", peerConncetionInfo);
private CompletableFuture<Void> connectAndDisconnect() {
CompletableFuture<Void> future = new CompletableFuture<>();
innerConnectAndDisconnectFuture = Optional.of(CompletableFuture.runAsync(() -> {
log.info("\n>> Connect to {}", peerConncetionInfo.getAddress());
Context.propagate(context);
Peer peer = createPeer(peerConncetionInfo.getPeerAddress());
PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer);
long ts = System.currentTimeMillis();
connectionAttempt.setConnectionStartedTs(ts);
try {
peer.addConnectedEventListener((peer1, peerCount) -> {
connectionAttempt.setDurationUntilConnection(System.currentTimeMillis() - ts);
connectionAttempt.setConnectionSuccessTs(System.currentTimeMillis());
peerConnectedEventListener = Optional.of((p, peerCount) -> {
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
try {
log.info("\n## Successfully connected to {}", peer.getAddress());
long now = System.currentTimeMillis();
connectionAttempt.setDurationUntilConnection(now - ts);
connectionAttempt.setConnectionSuccessTs(now);
connectionAttempt.onConnected();
startAutoDisconnectAndReconnect();
});
peer.addDisconnectedEventListener((peer1, peerCount) -> {
try {
Thread.sleep(disconnectIntervalSec * 1000L); // 2 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
log.info("Close peer {}", peer.getAddress());
peer.close();
} catch (Exception exception) {
log.warn("Exception at onConnection handler. {}", exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
});
peer.addConnectedEventListener(onConnectionExecutor, peerConnectedEventListener.get());
peerDisconnectedEventListener = Optional.of((p, peerCount) -> {
// At socket timeouts we get called twice from Bitcoinj
if (peerDisconnectedEventListener.isEmpty()) {
log.error("We got called twice at socketimeout from BitcoinJ and ignore the 2nd call.");
return;
}
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
if (openConnectionFuture.isPresent() && !openConnectionFuture.get().isDone()) {
// BitcoinJ calls onDisconnect at sockettimeout without throwing an error on the open connection future.
openConnectionFuture.get().completeExceptionally(new TimeoutException("Open connection failed due timeout at PeerSocketHandler"));
return;
}
try {
log.info("\n<< Disconnected from {}", peer.getAddress());
long passed = System.currentTimeMillis() - ts;
// Timeout is not handled as error in bitcoinj, but it simply disconnects
// If we had a successful connect before we got versionMessage set, otherwise its from an error.
@ -113,52 +194,62 @@ public class PeerConnection {
connectionAttempt.setDurationUntilDisConnection(passed);
connectionAttempt.onDisconnected();
}
startAutoDisconnectAndReconnect();
});
openConnection(peer).join();
try {
Thread.sleep(reconnectIntervalSec * 2000L); // 120 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.complete(null);
} catch (Exception exception) {
log.warn("Exception at onDisconnection handler. {}", exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
});
peer.addDisconnectedEventListener(onDisConnectionExecutor, peerDisconnectedEventListener.get());
try {
openConnectionFuture = Optional.of(openConnection(peer));
openConnectionFuture.get().join();
} catch (Exception exception) {
log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception);
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(exception);
startAutoDisconnectAndReconnect();
log.warn("Error at opening connection to peer {}. {}", peerConncetionInfo, exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
}, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId()));
}, MoreExecutors.directExecutor()));
return future;
}
private CompletableFuture<Void> disconnect() {
return peerConncetionInfo.getCurrentConnectionAttempt()
.map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> {
log.info("disconnect {}", peerConncetionInfo);
Context.propagate(context);
currentConnectionAttempt.getPeer().close();
},
SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId())))
.orElse(CompletableFuture.completedFuture(null));
}
private void startAutoDisconnectAndReconnect() {
private void handleException(Throwable throwable,
Peer peer,
PeerConncetionInfo.ConnectionAttempt connectionAttempt,
long ts,
CompletableFuture<Void> future) {
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
disconnectScheduler.ifPresent(Timer::stop);
disconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
disconnect()
.thenRun(() -> {
if (shutdownCalled.get()) {
return;
}
reconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
connect();
}, reconnectIntervalSec));
});
}, disconnectIntervalSec));
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(throwable);
try {
// Try disconnect
log.info("Try close peer {}", peer.getAddress());
peer.close();
} catch (Exception ignore) {
}
try {
Thread.sleep(reconnectIntervalSec * 1000L); // 120 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.completeExceptionally(throwable);
}
private Peer createPeer(PeerAddress address) {

View File

@ -119,21 +119,24 @@ public class PeerGroupService {
Context.propagate(context);
blockingClientManager.startAsync();
blockingClientManager.awaitRunning();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}
public void connectToAll() {
peerConnections.forEach(PeerConnection::connect);
peerConnections.forEach(PeerConnection::start);
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}
public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutdown");
log.info("Shutdown all peerConnections");
Context.propagate(context);
CountDownLatch latch = new CountDownLatch(peerConnections.size());
peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown));
peerConnections.forEach(peerConnection -> peerConnection.shutdown()
.thenRun(latch::countDown));
try {
latch.await(5, TimeUnit.SECONDS);
if (latch.await(3, TimeUnit.SECONDS)) {
log.info("All peerConnections shut down.");
} else {
log.info("Shutdown of peerConnections not completed in time.");
}
blockingClientManager.stopAsync();
blockingClientManager.awaitTerminated(Duration.ofSeconds(2));
} catch (Exception e) {

View File

@ -56,28 +56,24 @@ public class SimpleHttpServer {
private final static String CLOSE_TAG = "</font><br/>";
private final static String WARNING_ICON = "&#9888; ";
private final static String ALERT_ICON = "&#9760; "; // &#9889; &#9889;
private final Config config;
@Getter
private final List<BtcNodes.BtcNode> providedBtcNodes;
private final Map<String, BtcNodes.BtcNode> btcNodeByAddress;
private final int port;
private final PeerConncetionModel peerConncetionModel;
private final String started;
private final String networkInfo;
private String html;
private int requestCounter;
private String networkInfo;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) {
this.config = config;
this.peerConncetionModel = peerConncetionModel;
started = new Date().toString();
this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
BaseCurrencyNetwork network = config.baseCurrencyNetwork;
if (config.useTorForBtcMonitor) {
@ -121,9 +117,9 @@ public class SimpleHttpServer {
public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> {
log.info("shutDown");
log.info("stop Spark server");
Spark.stop();
log.info("shutDown completed");
log.info("Spark server stopped");
});
}

View File

@ -81,12 +81,12 @@ public class ProxySetup {
}
public CompletableFuture<Void> shutdown() {
log.warn("start shutdown");
log.info("Shutdown tor");
return CompletableFuture.runAsync(() -> {
if (tor != null) {
tor.shutdown();
log.info("Tor shutdown completed");
}
log.warn("shutdown completed");
})
.orTimeout(2, TimeUnit.SECONDS);
}

View File

@ -63,7 +63,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -144,9 +143,9 @@ public class Utilities {
return executor;
}
public static void shutdownAndAwaitTermination(ExecutorService executor, long timeout, TimeUnit unit) {
public static boolean shutdownAndAwaitTermination(ExecutorService executor, long timeout, TimeUnit unit) {
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(executor, timeout, unit);
return MoreExecutors.shutdownAndAwaitTermination(executor, timeout, unit);
}
public static <V> FutureCallback<V> failureCallback(Consumer<Throwable> errorHandler) {

View File

@ -17,20 +17,21 @@
package bisq.network.p2p.network;
import org.berndpruenster.netlayer.tor.NativeTor;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import org.berndpruenster.netlayer.tor.Torrc;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;
import org.berndpruenster.netlayer.tor.NativeTor;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import org.berndpruenster.netlayer.tor.Torrc;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@ -65,7 +66,7 @@ public class NewTor extends TorMode {
long ts1 = new Date().getTime();
Collection<String> bridgeEntries = bridgeAddressProvider.getBridgeAddresses();
if (bridgeEntries != null)
if (bridgeEntries != null && !bridgeEntries.isEmpty())
log.info("Using bridges: {}", bridgeEntries.stream().collect(Collectors.joining(",")));
Torrc override = null;