Improve btc monitor

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2024-07-05 16:00:02 +07:00
parent 52d0e151df
commit d99e251615
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307
7 changed files with 170 additions and 78 deletions

View File

@ -54,8 +54,7 @@ public class BtcNodeMonitor {
return simpleHttpServer.start() return simpleHttpServer.start()
.thenCompose(nil -> proxySetup.createSocksProxy()) .thenCompose(nil -> proxySetup.createSocksProxy())
.thenAccept(peerGroupService::applySocks5Proxy) .thenAccept(peerGroupService::applySocks5Proxy)
.thenCompose(nil -> peerGroupService.start()) .thenCompose(nil -> peerGroupService.start());
.thenRun(peerGroupService::connectToAll);
} }
public CompletableFuture<Void> shutdown() { public CompletableFuture<Void> shutdown() {

View File

@ -51,6 +51,7 @@ public class BtcNodeMonitorMain implements GracefulShutDownHandler {
@Override @Override
public void gracefulShutDown(ResultHandler resultHandler) { public void gracefulShutDown(ResultHandler resultHandler) {
log.info("gracefulShutDown");
btcNodeMonitor.shutdown().join(); btcNodeMonitor.shutdown().join();
System.exit(0); System.exit(0);
resultHandler.handleResult(); resultHandler.handleResult();

View File

@ -64,7 +64,9 @@ public class PeerConncetionInfo {
} }
public String getShortId() { public String getShortId() {
return getAddress().substring(0, 12) + "..."; String address = getAddress();
int endIndex = Math.min(address.length(), 12);
return address.substring(0, endIndex) + "...";
} }
public int getNumConnectionAttempts() { public int getNumConnectionAttempts() {

View File

@ -17,8 +17,6 @@
package bisq.btcnodemonitor.btc; package bisq.btcnodemonitor.btc;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.SingleThreadExecutorUtils; import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Context; import org.bitcoinj.core.Context;
@ -27,6 +25,8 @@ import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress; import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.Utils; import org.bitcoinj.core.Utils;
import org.bitcoinj.core.VersionMessage; import org.bitcoinj.core.VersionMessage;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.net.BlockingClientManager; import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.ClientConnectionManager; import org.bitcoinj.net.ClientConnectionManager;
@ -39,6 +39,8 @@ import java.net.SocketAddress;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -55,9 +57,15 @@ public class PeerConnection {
private final int connectTimeoutMillis; private final int connectTimeoutMillis;
private final int vMinRequiredProtocolVersion; private final int vMinRequiredProtocolVersion;
private final PeerConncetionInfo peerConncetionInfo; private final PeerConncetionInfo peerConncetionInfo;
private Optional<Timer> disconnectScheduler = Optional.empty();
private Optional<Timer> reconnectScheduler = Optional.empty();
private final AtomicBoolean shutdownCalled = new AtomicBoolean(); private final AtomicBoolean shutdownCalled = new AtomicBoolean();
private final ExecutorService connectionExecutor;
private final ExecutorService onConnectionExecutor;
private final ExecutorService onDisConnectionExecutor;
private Optional<PeerConnectedEventListener> peerConnectedEventListener = Optional.empty();
private Optional<PeerDisconnectedEventListener> peerDisconnectedEventListener = Optional.empty();
private Optional<CompletableFuture<Void>> connectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<Void>> innerConnectAndDisconnectFuture = Optional.empty();
private Optional<CompletableFuture<SocketAddress>> openConnectionFuture = Optional.empty();
public PeerConnection(Context context, public PeerConnection(Context context,
PeerConncetionInfo peerConncetionInfo, PeerConncetionInfo peerConncetionInfo,
@ -72,37 +80,110 @@ public class PeerConnection {
this.disconnectIntervalSec = disconnectIntervalSec; this.disconnectIntervalSec = disconnectIntervalSec;
this.reconnectIntervalSec = reconnectIntervalSec; this.reconnectIntervalSec = reconnectIntervalSec;
connectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("connection-" + peerConncetionInfo.getShortId());
onConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onConnection-" + peerConncetionInfo.getShortId());
onDisConnectionExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("onDisConnection-" + peerConncetionInfo.getShortId());
this.params = context.getParams(); this.params = context.getParams();
vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER); vMinRequiredProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.BLOOM_FILTER);
} }
public void start() {
CompletableFuture.runAsync(() -> {
do {
connectAndDisconnectFuture = Optional.of(connectAndDisconnect());
try {
connectAndDisconnectFuture.get().join();
} catch (Exception ignore) {
}
}
while (!shutdownCalled.get() && !Thread.currentThread().isInterrupted());
log.info("Exiting startConnectAndDisconnectLoop loop. Expected at shutdown");
}, connectionExecutor);
}
public CompletableFuture<Void> shutdown() { public CompletableFuture<Void> shutdown() {
log.info("Shutdown");
shutdownCalled.set(true); shutdownCalled.set(true);
disconnectScheduler.ifPresent(Timer::stop); peerConncetionInfo.getCurrentConnectionAttempt().ifPresent(connectionAttempt -> {
reconnectScheduler.ifPresent(Timer::stop); Peer peer = connectionAttempt.getPeer();
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
});
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
log.info("shutdown {}", peerConncetionInfo);
Context.propagate(context); Context.propagate(context);
disconnect(); peerConncetionInfo.getCurrentConnectionAttempt()
.ifPresent(currentConnectionAttempt -> currentConnectionAttempt.getPeer().close());
connectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));
innerConnectAndDisconnectFuture.ifPresent(connectFuture -> connectFuture.complete(null));
connectionExecutor.shutdownNow();
onConnectionExecutor.shutdownNow();
onDisConnectionExecutor.shutdownNow();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId())); }, SingleThreadExecutorUtils.getSingleThreadExecutor("shutdown-" + peerConncetionInfo.getShortId()));
} }
public void connect() { private CompletableFuture<Void> connectAndDisconnect() {
CompletableFuture.runAsync(() -> { CompletableFuture<Void> future = new CompletableFuture<>();
log.info("connect {}", peerConncetionInfo); innerConnectAndDisconnectFuture = Optional.of(CompletableFuture.runAsync(() -> {
log.info("\n>> Connect to {}", peerConncetionInfo.getAddress());
Context.propagate(context); Context.propagate(context);
Peer peer = createPeer(peerConncetionInfo.getPeerAddress()); Peer peer = createPeer(peerConncetionInfo.getPeerAddress());
PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer); PeerConncetionInfo.ConnectionAttempt connectionAttempt = peerConncetionInfo.newConnectionAttempt(peer);
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
connectionAttempt.setConnectionStartedTs(ts); connectionAttempt.setConnectionStartedTs(ts);
peerConnectedEventListener = Optional.of((p, peerCount) -> {
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
try { try {
peer.addConnectedEventListener((peer1, peerCount) -> { log.info("\n## Successfully connected to {}", peer.getAddress());
connectionAttempt.setDurationUntilConnection(System.currentTimeMillis() - ts); long now = System.currentTimeMillis();
connectionAttempt.setConnectionSuccessTs(System.currentTimeMillis()); connectionAttempt.setDurationUntilConnection(now - ts);
connectionAttempt.setConnectionSuccessTs(now);
connectionAttempt.onConnected(); connectionAttempt.onConnected();
startAutoDisconnectAndReconnect();
try {
Thread.sleep(disconnectIntervalSec * 1000L); // 2 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
log.info("Close peer {}", peer.getAddress());
peer.close();
} catch (Exception exception) {
log.warn("Exception at onConnection handler. {}", exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
}); });
peer.addDisconnectedEventListener((peer1, peerCount) -> { peer.addConnectedEventListener(onConnectionExecutor, peerConnectedEventListener.get());
peerDisconnectedEventListener = Optional.of((p, peerCount) -> {
// At socket timeouts we get called twice from Bitcoinj
if (peerDisconnectedEventListener.isEmpty()) {
log.error("We got called twice at socketimeout from BitcoinJ and ignore the 2nd call.");
return;
}
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
if (openConnectionFuture.isPresent() && !openConnectionFuture.get().isDone()) {
// BitcoinJ calls onDisconnect at sockettimeout without throwing an error on the open connection future.
openConnectionFuture.get().completeExceptionally(new TimeoutException("Open connection failed due timeout at PeerSocketHandler"));
return;
}
try {
log.info("\n<< Disconnected from {}", peer.getAddress());
long passed = System.currentTimeMillis() - ts; long passed = System.currentTimeMillis() - ts;
// Timeout is not handled as error in bitcoinj, but it simply disconnects // Timeout is not handled as error in bitcoinj, but it simply disconnects
// If we had a successful connect before we got versionMessage set, otherwise its from an error. // If we had a successful connect before we got versionMessage set, otherwise its from an error.
@ -113,52 +194,62 @@ public class PeerConnection {
connectionAttempt.setDurationUntilDisConnection(passed); connectionAttempt.setDurationUntilDisConnection(passed);
connectionAttempt.onDisconnected(); connectionAttempt.onDisconnected();
} }
startAutoDisconnectAndReconnect(); try {
}); Thread.sleep(reconnectIntervalSec * 2000L); // 120 sec
openConnection(peer).join(); } catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.complete(null);
} catch (Exception exception) { } catch (Exception exception) {
log.warn("Error at opening connection to peer {}", peerConncetionInfo, exception); log.warn("Exception at onDisconnection handler. {}", exception.toString());
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts); handleException(exception, peer, connectionAttempt, ts, future);
connectionAttempt.onException(exception);
startAutoDisconnectAndReconnect();
} }
}, SingleThreadExecutorUtils.getSingleThreadExecutor("connect-" + peerConncetionInfo.getShortId()));
}
private CompletableFuture<Void> disconnect() {
return peerConncetionInfo.getCurrentConnectionAttempt()
.map(currentConnectionAttempt -> CompletableFuture.runAsync(() -> {
log.info("disconnect {}", peerConncetionInfo);
Context.propagate(context);
currentConnectionAttempt.getPeer().close();
},
SingleThreadExecutorUtils.getSingleThreadExecutor("disconnect-" + peerConncetionInfo.getShortId())))
.orElse(CompletableFuture.completedFuture(null));
}
private void startAutoDisconnectAndReconnect() {
if (shutdownCalled.get()) {
return;
}
disconnectScheduler.ifPresent(Timer::stop);
disconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
disconnect()
.thenRun(() -> {
if (shutdownCalled.get()) {
return;
}
reconnectScheduler.ifPresent(Timer::stop);
reconnectScheduler = Optional.of(UserThread.runAfter(() -> {
if (shutdownCalled.get()) {
return;
}
connect();
}, reconnectIntervalSec));
}); });
}, disconnectIntervalSec)); peer.addDisconnectedEventListener(onDisConnectionExecutor, peerDisconnectedEventListener.get());
try {
openConnectionFuture = Optional.of(openConnection(peer));
openConnectionFuture.get().join();
} catch (Exception exception) {
log.warn("Error at opening connection to peer {}. {}", peerConncetionInfo, exception.toString());
handleException(exception, peer, connectionAttempt, ts, future);
}
}, MoreExecutors.directExecutor()));
return future;
}
private void handleException(Throwable throwable,
Peer peer,
PeerConncetionInfo.ConnectionAttempt connectionAttempt,
long ts,
CompletableFuture<Void> future) {
peerDisconnectedEventListener.ifPresent(peer::removeDisconnectedEventListener);
peerDisconnectedEventListener = Optional.empty();
peerConnectedEventListener.ifPresent(peer::removeConnectedEventListener);
peerConnectedEventListener = Optional.empty();
if (shutdownCalled.get()) {
return;
}
connectionAttempt.setDurationUntilFailure(System.currentTimeMillis() - ts);
connectionAttempt.onException(throwable);
try {
// Try disconnect
log.info("Try close peer {}", peer.getAddress());
peer.close();
} catch (Exception ignore) {
}
try {
Thread.sleep(reconnectIntervalSec * 1000L); // 120 sec
} catch (InterruptedException ignore) {
}
if (shutdownCalled.get()) {
return;
}
future.completeExceptionally(throwable);
} }
private Peer createPeer(PeerAddress address) { private Peer createPeer(PeerAddress address) {

View File

@ -119,21 +119,24 @@ public class PeerGroupService {
Context.propagate(context); Context.propagate(context);
blockingClientManager.startAsync(); blockingClientManager.startAsync();
blockingClientManager.awaitRunning(); blockingClientManager.awaitRunning();
}, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
}
public void connectToAll() { peerConnections.forEach(PeerConnection::start);
peerConnections.forEach(PeerConnection::connect); }, SingleThreadExecutorUtils.getSingleThreadExecutor("start"));
} }
public CompletableFuture<Void> shutdown() { public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
log.info("shutdown"); log.info("Shutdown all peerConnections");
Context.propagate(context); Context.propagate(context);
CountDownLatch latch = new CountDownLatch(peerConnections.size()); CountDownLatch latch = new CountDownLatch(peerConnections.size());
peerConnections.forEach(e -> e.shutdown().thenRun(latch::countDown)); peerConnections.forEach(peerConnection -> peerConnection.shutdown()
.thenRun(latch::countDown));
try { try {
latch.await(5, TimeUnit.SECONDS); if (latch.await(3, TimeUnit.SECONDS)) {
log.info("All peerConnections shut down.");
} else {
log.info("Shutdown of peerConnections not completed in time.");
}
blockingClientManager.stopAsync(); blockingClientManager.stopAsync();
blockingClientManager.awaitTerminated(Duration.ofSeconds(2)); blockingClientManager.awaitTerminated(Duration.ofSeconds(2));
} catch (Exception e) { } catch (Exception e) {

View File

@ -56,28 +56,24 @@ public class SimpleHttpServer {
private final static String CLOSE_TAG = "</font><br/>"; private final static String CLOSE_TAG = "</font><br/>";
private final static String WARNING_ICON = "&#9888; "; private final static String WARNING_ICON = "&#9888; ";
private final static String ALERT_ICON = "&#9760; "; // &#9889; &#9889; private final static String ALERT_ICON = "&#9760; "; // &#9889; &#9889;
private final Config config;
@Getter @Getter
private final List<BtcNodes.BtcNode> providedBtcNodes; private final List<BtcNodes.BtcNode> providedBtcNodes;
private final Map<String, BtcNodes.BtcNode> btcNodeByAddress; private final Map<String, BtcNodes.BtcNode> btcNodeByAddress;
private final int port; private final int port;
private final PeerConncetionModel peerConncetionModel; private final PeerConncetionModel peerConncetionModel;
private final String started; private final String started;
private final String networkInfo;
private String html; private String html;
private int requestCounter;
private String networkInfo;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) { public SimpleHttpServer(Config config, PeerConncetionModel peerConncetionModel) {
this.config = config;
this.peerConncetionModel = peerConncetionModel; this.peerConncetionModel = peerConncetionModel;
started = new Date().toString(); started = new Date().toString();
providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
this.providedBtcNodes = peerConncetionModel.getProvidedBtcNodes();
BaseCurrencyNetwork network = config.baseCurrencyNetwork; BaseCurrencyNetwork network = config.baseCurrencyNetwork;
if (config.useTorForBtcMonitor) { if (config.useTorForBtcMonitor) {
@ -121,9 +117,9 @@ public class SimpleHttpServer {
public CompletableFuture<Void> shutdown() { public CompletableFuture<Void> shutdown() {
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
log.info("shutDown"); log.info("stop Spark server");
Spark.stop(); Spark.stop();
log.info("shutDown completed"); log.info("Spark server stopped");
}); });
} }

View File

@ -81,12 +81,12 @@ public class ProxySetup {
} }
public CompletableFuture<Void> shutdown() { public CompletableFuture<Void> shutdown() {
log.warn("start shutdown"); log.info("Shutdown tor");
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
if (tor != null) { if (tor != null) {
tor.shutdown(); tor.shutdown();
log.info("Tor shutdown completed");
} }
log.warn("shutdown completed");
}) })
.orTimeout(2, TimeUnit.SECONDS); .orTimeout(2, TimeUnit.SECONDS);
} }