Shutdown PriceFeedService and related services

at shutdown routine
This commit is contained in:
chimp1984 2020-12-09 18:33:15 -05:00
parent de2755d2d7
commit a2d2e6c4fb
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
6 changed files with 97 additions and 21 deletions

View File

@ -23,6 +23,7 @@ import bisq.core.btc.wallet.BtcWalletService;
import bisq.core.dao.DaoSetup;
import bisq.core.dao.node.full.RpcService;
import bisq.core.offer.OpenOfferManager;
import bisq.core.provider.price.PriceFeedService;
import bisq.core.setup.CorePersistedDataHost;
import bisq.core.setup.CoreSetup;
import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
@ -227,12 +228,14 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
}
try {
injector.getInstance(PriceFeedService.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(TradeStatisticsManager.class).shutDown();
injector.getInstance(XmrTxProofService.class).shutDown();
injector.getInstance(RpcService.class).shutDown();
injector.getInstance(DaoSetup.class).shutDown();
injector.getInstance(AvoidStandbyModeService.class).shutDown();
log.info("OpenOfferManager shutdown started");
injector.getInstance(OpenOfferManager.class).shutDown(() -> {
log.info("OpenOfferManager shutdown completed");
@ -265,7 +268,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
// Wait max 20 sec.
UserThread.runAfter(() -> {
log.warn("Timeout triggered resultHandler");
log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler.");
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDisk(() -> {

View File

@ -92,6 +92,8 @@ public class PriceFeedService {
private String baseUrlOfRespondingProvider;
@Nullable
private Timer requestTimer;
@Nullable
private PriceRequest priceRequest;
///////////////////////////////////////////////////////////////////////////////////////////
@ -115,10 +117,20 @@ public class PriceFeedService {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
if (requestTimer != null) {
requestTimer.stop();
requestTimer = null;
}
if (priceRequest != null) {
priceRequest.shutDown();
}
}
public void setCurrencyCodeOnInit() {
if (getCurrencyCode() == null) {
final TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
final String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
setCurrencyCode(code);
}
}
@ -180,8 +192,8 @@ public class PriceFeedService {
}
}, (errorMessage, throwable) -> {
if (throwable instanceof PriceRequestException) {
final String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
final String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
if (baseUrlOfFaultyRequest != null && baseUrlOfCurrentRequest.equals(baseUrlOfFaultyRequest)) {
log.warn("We received an error: baseUrlOfCurrentRequest={}, baseUrlOfFaultyRequest={}",
baseUrlOfCurrentRequest, baseUrlOfFaultyRequest);
@ -223,7 +235,7 @@ public class PriceFeedService {
UserThread.runAfter(() -> {
retryDelay = Math.min(retryDelay + 5, PERIOD_SEC);
final String oldBaseUrl = priceProvider.getBaseUrl();
String oldBaseUrl = priceProvider.getBaseUrl();
setNewPriceProvider();
log.warn("We received an error at the request from provider {}. " +
"We select the new provider {} and use that for a new request. retryDelay was {} sec.", oldBaseUrl, priceProvider.getBaseUrl(), retryDelay);
@ -381,15 +393,15 @@ public class PriceFeedService {
}
private void requestAllPrices(PriceProvider provider, Runnable resultHandler, FaultHandler faultHandler) {
PriceRequest priceRequest = new PriceRequest();
priceRequest = new PriceRequest();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = priceRequest.requestAllPrices(provider);
Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Tuple2<Map<String, Long>, Map<String, MarketPrice>> result) {
UserThread.execute(() -> {
checkNotNull(result, "Result must not be null at requestAllPrices");
// Each currency rate has a different timestamp, depending on when
// the pricenode aggregate rate was calculated
// the priceNode aggregate rate was calculated
// However, the request timestamp is when the pricenode was queried
epochInMillisAtLastRequest = System.currentTimeMillis();

View File

@ -41,12 +41,18 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PriceProvider extends HttpClientProvider {
private boolean shutDownRequested;
// Do not use Guice here as we might create multiple instances
public PriceProvider(HttpClient httpClient, String baseUrl) {
super(httpClient, baseUrl, false);
}
public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOException {
if (shutDownRequested) {
return new Tuple2<>(new HashMap<>(), new HashMap<>());
}
Map<String, MarketPrice> marketPriceMap = new HashMap<>();
String hsVersion = "";
if (P2PService.getMyNodeAddress() != null)
@ -66,10 +72,10 @@ public class PriceProvider extends HttpClientProvider {
list.forEach(obj -> {
try {
LinkedTreeMap<?, ?> treeMap = (LinkedTreeMap<?, ?>) obj;
final String currencyCode = (String) treeMap.get("currencyCode");
final double price = (Double) treeMap.get("price");
String currencyCode = (String) treeMap.get("currencyCode");
double price = (Double) treeMap.get("price");
// json uses double for our timestampSec long value...
final long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
marketPriceMap.put(currencyCode, new MarketPrice(currencyCode, price, timestampSec, true));
} catch (Throwable t) {
log.error(t.toString());
@ -83,4 +89,9 @@ public class PriceProvider extends HttpClientProvider {
public String getBaseUrl() {
return httpClient.getBaseUrl();
}
public void shutDown() {
shutDownRequested = true;
httpClient.shutDown();
}
}

View File

@ -28,37 +28,64 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Slf4j
public class PriceRequest {
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60);
@Nullable
private PriceProvider provider;
private boolean shutDownRequested;
public PriceRequest() {
}
public SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> requestAllPrices(PriceProvider provider) {
final String baseUrl = provider.getBaseUrl();
final SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
this.provider = provider;
String baseUrl = provider.getBaseUrl();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
ListenableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = executorService.submit(() -> {
Thread.currentThread().setName("PriceRequest-" + provider.getBaseUrl());
Thread.currentThread().setName("PriceRequest-" + baseUrl);
return provider.getAll();
});
Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> marketPriceTuple) {
log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider);
resultFuture.set(marketPriceTuple);
if (!shutDownRequested) {
resultFuture.set(marketPriceTuple);
}
}
public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
if (!shutDownRequested) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
}
}
}, MoreExecutors.directExecutor());
return resultFuture;
}
public void shutDown() {
shutDownRequested = true;
if (provider != null) {
provider.shutDown();
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}

View File

@ -37,4 +37,6 @@ public interface HttpClient {
String getUid();
String getBaseUrl();
void shutDown();
}

View File

@ -64,6 +64,10 @@ public class HttpClientImpl implements HttpClient {
private String baseUrl;
private boolean ignoreSocks5Proxy;
private final String uid;
@Nullable
private HttpURLConnection connection;
@Nullable
private CloseableHttpClient httpclient;
@Inject
public HttpClientImpl(@Nullable Socks5ProxyProvider socks5ProxyProvider) {
@ -76,6 +80,19 @@ public class HttpClientImpl implements HttpClient {
uid = UUID.randomUUID().toString();
}
@Override
public void shutDown() {
if (connection != null) {
connection.disconnect();
}
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException ignore) {
}
}
}
@Override
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
@ -117,7 +134,6 @@ public class HttpClientImpl implements HttpClient {
public String requestWithGETNoProxy(String param,
@Nullable String headerKey,
@Nullable String headerValue) throws IOException {
HttpURLConnection connection = null;
log.debug("Executing HTTP request " + baseUrl + param + " proxy: none.");
URL url = new URL(baseUrl + param);
try {
@ -177,7 +193,8 @@ public class HttpClientImpl implements HttpClient {
PoolingHttpClientConnectionManager cm = socks5Proxy.resolveAddrLocally() ?
new PoolingHttpClientConnectionManager(reg) :
new PoolingHttpClientConnectionManager(reg, new FakeDnsResolver());
try (CloseableHttpClient httpclient = HttpClients.custom().setConnectionManager(cm).build()) {
try {
httpclient = HttpClients.custom().setConnectionManager(cm).build();
InetSocketAddress socksAddress = new InetSocketAddress(socks5Proxy.getInetAddress(), socks5Proxy.getPort());
// remove me: Use this to test with system-wide Tor proxy, or change port for another proxy.
@ -191,11 +208,15 @@ public class HttpClientImpl implements HttpClient {
request.setHeader(headerKey, headerValue);
log.debug("Executing request " + request + " proxy: " + socksAddress);
try (CloseableHttpResponse response = httpclient.execute(request, context)) {
try (CloseableHttpResponse response = checkNotNull(httpclient).execute(request, context)) {
return convertInputStreamToString(response.getEntity().getContent());
}
} catch (Throwable t) {
throw new IOException("Error at requestWithGETProxy with URL: " + (baseUrl + param) + ". Throwable=" + t.getMessage());
} finally {
if (httpclient != null) {
httpclient.close();
}
}
}