From bcf956530fbde8e49dbcce88fb59c3b06da5d3a5 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 9 Dec 2020 18:14:50 -0500 Subject: [PATCH 1/3] Limit delay for remove offer result handler to 3 sec --- core/src/main/java/bisq/core/offer/OpenOfferManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/bisq/core/offer/OpenOfferManager.java b/core/src/main/java/bisq/core/offer/OpenOfferManager.java index 7db7011456..8c8ec4855d 100644 --- a/core/src/main/java/bisq/core/offer/OpenOfferManager.java +++ b/core/src/main/java/bisq/core/offer/OpenOfferManager.java @@ -225,8 +225,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe UserThread.execute(() -> openOffers.forEach( openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()) )); - if (completeHandler != null) - UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS); + if (completeHandler != null) { + // For typical number of offers we are tolerant with delay to give enough time to broadcast. + // If number of offers is very high we limit to 3 sec. to not delay other shutdown routines. + int delay = Math.min(3000, size * 200 + 500); + UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS); + } } else { if (completeHandler != null) completeHandler.run(); From de2755d2d7f4c9a5784a4e44f791e768453ba916 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 9 Dec 2020 18:32:24 -0500 Subject: [PATCH 2/3] Remove debug log --- core/src/main/java/bisq/core/offer/OfferBookService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/bisq/core/offer/OfferBookService.java b/core/src/main/java/bisq/core/offer/OfferBookService.java index 66ed7ca2b3..64b3900463 100644 --- a/core/src/main/java/bisq/core/offer/OfferBookService.java +++ b/core/src/main/java/bisq/core/offer/OfferBookService.java @@ -203,7 +203,6 @@ public class OfferBookService { } public void removeOfferAtShutDown(OfferPayload offerPayload) { - log.debug("removeOfferAtShutDown " + offerPayload); removeOffer(offerPayload, null, null); } From a2d2e6c4fba580e74a1c8ef06f2ef1ec1c84eff4 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 9 Dec 2020 18:33:15 -0500 Subject: [PATCH 3/3] Shutdown PriceFeedService and related services at shutdown routine --- .../java/bisq/core/app/BisqExecutable.java | 5 ++- .../core/provider/price/PriceFeedService.java | 28 +++++++++---- .../core/provider/price/PriceProvider.java | 17 ++++++-- .../core/provider/price/PriceRequest.java | 39 ++++++++++++++++--- .../java/bisq/network/http/HttpClient.java | 2 + .../bisq/network/http/HttpClientImpl.java | 27 +++++++++++-- 6 files changed, 97 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/bisq/core/app/BisqExecutable.java b/core/src/main/java/bisq/core/app/BisqExecutable.java index 5e4863c818..dc1265ef94 100644 --- a/core/src/main/java/bisq/core/app/BisqExecutable.java +++ b/core/src/main/java/bisq/core/app/BisqExecutable.java @@ -23,6 +23,7 @@ import bisq.core.btc.wallet.BtcWalletService; import bisq.core.dao.DaoSetup; import bisq.core.dao.node.full.RpcService; import bisq.core.offer.OpenOfferManager; +import bisq.core.provider.price.PriceFeedService; import bisq.core.setup.CorePersistedDataHost; import bisq.core.setup.CoreSetup; import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager; @@ -227,12 +228,14 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet } try { + injector.getInstance(PriceFeedService.class).shutDown(); injector.getInstance(ArbitratorManager.class).shutDown(); injector.getInstance(TradeStatisticsManager.class).shutDown(); injector.getInstance(XmrTxProofService.class).shutDown(); injector.getInstance(RpcService.class).shutDown(); injector.getInstance(DaoSetup.class).shutDown(); injector.getInstance(AvoidStandbyModeService.class).shutDown(); + log.info("OpenOfferManager shutdown started"); injector.getInstance(OpenOfferManager.class).shutDown(() -> { log.info("OpenOfferManager shutdown completed"); @@ -265,7 +268,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet // Wait max 20 sec. UserThread.runAfter(() -> { - log.warn("Timeout triggered resultHandler"); + log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler."); if (!hasDowngraded) { // If user tried to downgrade we do not write the persistable data to avoid data corruption PersistenceManager.flushAllDataToDisk(() -> { diff --git a/core/src/main/java/bisq/core/provider/price/PriceFeedService.java b/core/src/main/java/bisq/core/provider/price/PriceFeedService.java index 44c9635b86..24a50430f9 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceFeedService.java +++ b/core/src/main/java/bisq/core/provider/price/PriceFeedService.java @@ -92,6 +92,8 @@ public class PriceFeedService { private String baseUrlOfRespondingProvider; @Nullable private Timer requestTimer; + @Nullable + private PriceRequest priceRequest; /////////////////////////////////////////////////////////////////////////////////////////// @@ -115,10 +117,20 @@ public class PriceFeedService { // API /////////////////////////////////////////////////////////////////////////////////////////// + public void shutDown() { + if (requestTimer != null) { + requestTimer.stop(); + requestTimer = null; + } + if (priceRequest != null) { + priceRequest.shutDown(); + } + } + public void setCurrencyCodeOnInit() { if (getCurrencyCode() == null) { - final TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency(); - final String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD"; + TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency(); + String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD"; setCurrencyCode(code); } } @@ -180,8 +192,8 @@ public class PriceFeedService { } }, (errorMessage, throwable) -> { if (throwable instanceof PriceRequestException) { - final String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl; - final String baseUrlOfCurrentRequest = priceProvider.getBaseUrl(); + String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl; + String baseUrlOfCurrentRequest = priceProvider.getBaseUrl(); if (baseUrlOfFaultyRequest != null && baseUrlOfCurrentRequest.equals(baseUrlOfFaultyRequest)) { log.warn("We received an error: baseUrlOfCurrentRequest={}, baseUrlOfFaultyRequest={}", baseUrlOfCurrentRequest, baseUrlOfFaultyRequest); @@ -223,7 +235,7 @@ public class PriceFeedService { UserThread.runAfter(() -> { retryDelay = Math.min(retryDelay + 5, PERIOD_SEC); - final String oldBaseUrl = priceProvider.getBaseUrl(); + String oldBaseUrl = priceProvider.getBaseUrl(); setNewPriceProvider(); log.warn("We received an error at the request from provider {}. " + "We select the new provider {} and use that for a new request. retryDelay was {} sec.", oldBaseUrl, priceProvider.getBaseUrl(), retryDelay); @@ -381,15 +393,15 @@ public class PriceFeedService { } private void requestAllPrices(PriceProvider provider, Runnable resultHandler, FaultHandler faultHandler) { - PriceRequest priceRequest = new PriceRequest(); + priceRequest = new PriceRequest(); SettableFuture, Map>> future = priceRequest.requestAllPrices(provider); - Futures.addCallback(future, new FutureCallback, Map>>() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable Tuple2, Map> result) { UserThread.execute(() -> { checkNotNull(result, "Result must not be null at requestAllPrices"); // Each currency rate has a different timestamp, depending on when - // the pricenode aggregate rate was calculated + // the priceNode aggregate rate was calculated // However, the request timestamp is when the pricenode was queried epochInMillisAtLastRequest = System.currentTimeMillis(); diff --git a/core/src/main/java/bisq/core/provider/price/PriceProvider.java b/core/src/main/java/bisq/core/provider/price/PriceProvider.java index d0ac27c2d7..3d733fa145 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceProvider.java +++ b/core/src/main/java/bisq/core/provider/price/PriceProvider.java @@ -41,12 +41,18 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class PriceProvider extends HttpClientProvider { + private boolean shutDownRequested; + // Do not use Guice here as we might create multiple instances public PriceProvider(HttpClient httpClient, String baseUrl) { super(httpClient, baseUrl, false); } public Tuple2, Map> getAll() throws IOException { + if (shutDownRequested) { + return new Tuple2<>(new HashMap<>(), new HashMap<>()); + } + Map marketPriceMap = new HashMap<>(); String hsVersion = ""; if (P2PService.getMyNodeAddress() != null) @@ -66,10 +72,10 @@ public class PriceProvider extends HttpClientProvider { list.forEach(obj -> { try { LinkedTreeMap treeMap = (LinkedTreeMap) obj; - final String currencyCode = (String) treeMap.get("currencyCode"); - final double price = (Double) treeMap.get("price"); + String currencyCode = (String) treeMap.get("currencyCode"); + double price = (Double) treeMap.get("price"); // json uses double for our timestampSec long value... - final long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec")); + long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec")); marketPriceMap.put(currencyCode, new MarketPrice(currencyCode, price, timestampSec, true)); } catch (Throwable t) { log.error(t.toString()); @@ -83,4 +89,9 @@ public class PriceProvider extends HttpClientProvider { public String getBaseUrl() { return httpClient.getBaseUrl(); } + + public void shutDown() { + shutDownRequested = true; + httpClient.shutDown(); + } } diff --git a/core/src/main/java/bisq/core/provider/price/PriceRequest.java b/core/src/main/java/bisq/core/provider/price/PriceRequest.java index fbeda54a25..80f276e796 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceRequest.java +++ b/core/src/main/java/bisq/core/provider/price/PriceRequest.java @@ -28,37 +28,64 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; @Slf4j public class PriceRequest { private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60); + @Nullable + private PriceProvider provider; + private boolean shutDownRequested; public PriceRequest() { } public SettableFuture, Map>> requestAllPrices(PriceProvider provider) { - final String baseUrl = provider.getBaseUrl(); - final SettableFuture, Map>> resultFuture = SettableFuture.create(); + this.provider = provider; + String baseUrl = provider.getBaseUrl(); + SettableFuture, Map>> resultFuture = SettableFuture.create(); ListenableFuture, Map>> future = executorService.submit(() -> { - Thread.currentThread().setName("PriceRequest-" + provider.getBaseUrl()); + Thread.currentThread().setName("PriceRequest-" + baseUrl); return provider.getAll(); }); - Futures.addCallback(future, new FutureCallback, Map>>() { + Futures.addCallback(future, new FutureCallback<>() { public void onSuccess(Tuple2, Map> marketPriceTuple) { log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider); - resultFuture.set(marketPriceTuple); + if (!shutDownRequested) { + resultFuture.set(marketPriceTuple); + } + } public void onFailure(@NotNull Throwable throwable) { - resultFuture.setException(new PriceRequestException(throwable, baseUrl)); + if (!shutDownRequested) { + resultFuture.setException(new PriceRequestException(throwable, baseUrl)); + } } }, MoreExecutors.directExecutor()); return resultFuture; } + + public void shutDown() { + shutDownRequested = true; + if (provider != null) { + provider.shutDown(); + } + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + } } diff --git a/p2p/src/main/java/bisq/network/http/HttpClient.java b/p2p/src/main/java/bisq/network/http/HttpClient.java index 5f33db2a23..fff78049b6 100644 --- a/p2p/src/main/java/bisq/network/http/HttpClient.java +++ b/p2p/src/main/java/bisq/network/http/HttpClient.java @@ -37,4 +37,6 @@ public interface HttpClient { String getUid(); String getBaseUrl(); + + void shutDown(); } diff --git a/p2p/src/main/java/bisq/network/http/HttpClientImpl.java b/p2p/src/main/java/bisq/network/http/HttpClientImpl.java index 8c298b83c4..57b2ac86ea 100644 --- a/p2p/src/main/java/bisq/network/http/HttpClientImpl.java +++ b/p2p/src/main/java/bisq/network/http/HttpClientImpl.java @@ -64,6 +64,10 @@ public class HttpClientImpl implements HttpClient { private String baseUrl; private boolean ignoreSocks5Proxy; private final String uid; + @Nullable + private HttpURLConnection connection; + @Nullable + private CloseableHttpClient httpclient; @Inject public HttpClientImpl(@Nullable Socks5ProxyProvider socks5ProxyProvider) { @@ -76,6 +80,19 @@ public class HttpClientImpl implements HttpClient { uid = UUID.randomUUID().toString(); } + @Override + public void shutDown() { + if (connection != null) { + connection.disconnect(); + } + if (httpclient != null) { + try { + httpclient.close(); + } catch (IOException ignore) { + } + } + } + @Override public void setBaseUrl(String baseUrl) { this.baseUrl = baseUrl; @@ -117,7 +134,6 @@ public class HttpClientImpl implements HttpClient { public String requestWithGETNoProxy(String param, @Nullable String headerKey, @Nullable String headerValue) throws IOException { - HttpURLConnection connection = null; log.debug("Executing HTTP request " + baseUrl + param + " proxy: none."); URL url = new URL(baseUrl + param); try { @@ -177,7 +193,8 @@ public class HttpClientImpl implements HttpClient { PoolingHttpClientConnectionManager cm = socks5Proxy.resolveAddrLocally() ? new PoolingHttpClientConnectionManager(reg) : new PoolingHttpClientConnectionManager(reg, new FakeDnsResolver()); - try (CloseableHttpClient httpclient = HttpClients.custom().setConnectionManager(cm).build()) { + try { + httpclient = HttpClients.custom().setConnectionManager(cm).build(); InetSocketAddress socksAddress = new InetSocketAddress(socks5Proxy.getInetAddress(), socks5Proxy.getPort()); // remove me: Use this to test with system-wide Tor proxy, or change port for another proxy. @@ -191,11 +208,15 @@ public class HttpClientImpl implements HttpClient { request.setHeader(headerKey, headerValue); log.debug("Executing request " + request + " proxy: " + socksAddress); - try (CloseableHttpResponse response = httpclient.execute(request, context)) { + try (CloseableHttpResponse response = checkNotNull(httpclient).execute(request, context)) { return convertInputStreamToString(response.getEntity().getContent()); } } catch (Throwable t) { throw new IOException("Error at requestWithGETProxy with URL: " + (baseUrl + param) + ". Throwable=" + t.getMessage()); + } finally { + if (httpclient != null) { + httpclient.close(); + } } }