Republish trade stats on interval. tune delays

This commit is contained in:
Manfred Karrer 2016-07-26 21:31:27 +02:00
parent f298201964
commit 281c9f5fa7
5 changed files with 49 additions and 30 deletions

View file

@ -66,10 +66,7 @@ import org.spongycastle.crypto.params.KeyParameter;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -78,6 +75,8 @@ import static io.bitsquare.util.Validator.nonEmptyStringOf;
public class TradeManager { public class TradeManager {
private static final Logger log = LoggerFactory.getLogger(TradeManager.class); private static final Logger log = LoggerFactory.getLogger(TradeManager.class);
private static final long REPUBLISH_STATISTICS_INTERVAL_MIN = TimeUnit.HOURS.toMillis(1);
private final User user; private final User user;
private final KeyRing keyRing; private final KeyRing keyRing;
private final WalletService walletService; private final WalletService walletService;
@ -93,6 +92,8 @@ public class TradeManager {
private final Storage<TradableList<Trade>> tradableListStorage; private final Storage<TradableList<Trade>> tradableListStorage;
private final TradableList<Trade> trades; private final TradableList<Trade> trades;
private final BooleanProperty pendingTradesInitialized = new SimpleBooleanProperty(); private final BooleanProperty pendingTradesInitialized = new SimpleBooleanProperty();
private boolean stopped;
private List<Trade> tradesForStatistics;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -166,6 +167,11 @@ public class TradeManager {
}); });
} }
///////////////////////////////////////////////////////////////////////////////////////////
// Lifecycle
///////////////////////////////////////////////////////////////////////////////////////////
public void onAllServicesInitialized() { public void onAllServicesInitialized() {
Log.traceCall(); Log.traceCall();
if (p2PService.isBootstrapped()) if (p2PService.isBootstrapped())
@ -181,17 +187,16 @@ public class TradeManager {
}); });
} }
public void shutDown() {
/////////////////////////////////////////////////////////////////////////////////////////// stopped = true;
// Lifecycle }
///////////////////////////////////////////////////////////////////////////////////////////
private void initPendingTrades() { private void initPendingTrades() {
Log.traceCall(); Log.traceCall();
List<Trade> toAdd = new ArrayList<>(); List<Trade> toAdd = new ArrayList<>();
List<Trade> toRemove = new ArrayList<>(); List<Trade> toRemove = new ArrayList<>();
List<Trade> tradesForStatistics = new ArrayList<>(); tradesForStatistics = new ArrayList<>();
for (Trade trade : trades) { for (Trade trade : trades) {
trade.setStorage(tradableListStorage); trade.setStorage(tradableListStorage);
@ -217,7 +222,17 @@ public class TradeManager {
tradesForStatistics.add((Trade) tradable); tradesForStatistics.add((Trade) tradable);
} }
publishTradeStatistics(tradesForStatistics); // We start later to have better connectivity to the network
UserThread.runPeriodically(() -> publishTradeStatistics(tradesForStatistics),
30, TimeUnit.SECONDS);
//TODO can be removed at next release
// For the first 2 weeks of the release we re publish the trades to get faster good distribution
// otherwise the trades would only be published again at restart and if a client dont do that the stats might be missing
// for a longer time as initially there are not many peer upgraded and supporting flooding of the stats data.
if (new Date().before(new Date(2016 - 1900, Calendar.AUGUST, 8)))
UserThread.runPeriodically(() -> publishTradeStatistics(tradesForStatistics),
REPUBLISH_STATISTICS_INTERVAL_MIN, TimeUnit.MILLISECONDS);
pendingTradesInitialized.set(true); pendingTradesInitialized.set(true);
} }
@ -235,11 +250,13 @@ public class TradeManager {
// Only trades from last 30 days // Only trades from last 30 days
if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(30)) { if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(30)) {
final long minDelay = i + 30; long delay = 3000;
final long maxDelay = i + 32; final long minDelay = (i + 1) * delay;
// We start after 30 sec. to have better connection and use a delay to avoid flooding the network to intensely final long maxDelay = (i + 2) * delay;
// roughly 1 item per 1-2 seconds UserThread.runAfterRandomDelay(() -> {
UserThread.runAfterRandomDelay(() -> p2PService.addData(tradeStatistics, true), minDelay, maxDelay, TimeUnit.SECONDS); if (!stopped)
p2PService.addData(tradeStatistics, true);
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
} }
} }
} }

View file

@ -72,7 +72,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private static final long RETRY_REPUBLISH_DELAY_SEC = 10; private static final long RETRY_REPUBLISH_DELAY_SEC = 10;
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10; private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10;
private static final long REPUBLISH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(DevFlags.STRESS_TEST_MODE ? 12 : 12); private static final long REPUBLISH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(DevFlags.STRESS_TEST_MODE ? 14 : 14);
private static final long REFRESH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(DevFlags.STRESS_TEST_MODE ? 4 : 4); private static final long REFRESH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(DevFlags.STRESS_TEST_MODE ? 4 : 4);
private final KeyRing keyRing; private final KeyRing keyRing;
@ -432,9 +432,10 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer(); stopPeriodicRefreshOffersTimer();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
// we delay to avoid reaching throttle limits // we delay to avoid reaching throttle limits
// roughly 1 offer per second
final long minDelay = i * 500 + 1; long delay = 500;
final long maxDelay = minDelay * 2 + 500; final long minDelay = (i + 1) * delay;
final long maxDelay = (i + 2) * delay;
final OpenOffer openOffer = openOffersList.get(i); final OpenOffer openOffer = openOffersList.get(i);
UserThread.runAfterRandomDelay(() -> { UserThread.runAfterRandomDelay(() -> {
if (openOffers.contains(openOffer)) if (openOffers.contains(openOffer))
@ -503,11 +504,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
// we delay to avoid reaching throttle limits // we delay to avoid reaching throttle limits
// roughly 4 offers per second // roughly 4 offers per second
final int n = i;
final long minDelay = i * 120 + 1; long delay = 150;
final long maxDelay = minDelay * 2; final long minDelay = (i + 1) * delay;
final long maxDelay = (i + 2) * delay;
final OpenOffer openOffer = openOffersList.get(i);
UserThread.runAfterRandomDelay(() -> { UserThread.runAfterRandomDelay(() -> {
OpenOffer openOffer = openOffersList.get(n);
// we need to check if in the meantime the offer has been removed // we need to check if in the meantime the offer has been removed
if (openOffers.contains(openOffer)) if (openOffers.contains(openOffer))
refreshOffer(openOffer); refreshOffer(openOffer);

View file

@ -45,6 +45,7 @@ import io.bitsquare.gui.main.overlays.windows.SendAlertMessageWindow;
import io.bitsquare.gui.util.ImageUtil; import io.bitsquare.gui.util.ImageUtil;
import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.P2PService;
import io.bitsquare.storage.Storage; import io.bitsquare.storage.Storage;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.offer.OpenOfferManager; import io.bitsquare.trade.offer.OpenOfferManager;
import javafx.application.Application; import javafx.application.Application;
import javafx.application.Platform; import javafx.application.Platform;
@ -370,6 +371,7 @@ public class BitsquareApp extends Application {
if (injector != null) { if (injector != null) {
injector.getInstance(ArbitratorManager.class).shutDown(); injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(MainViewModel.class).shutDown(); injector.getInstance(MainViewModel.class).shutDown();
injector.getInstance(TradeManager.class).shutDown();
injector.getInstance(OpenOfferManager.class).shutDown(() -> { injector.getInstance(OpenOfferManager.class).shutDown(() -> {
injector.getInstance(P2PService.class).shutDown(() -> { injector.getInstance(P2PService.class).shutDown(() -> {
injector.getInstance(WalletService.class).shutDownDone.addListener((ov, o, n) -> { injector.getInstance(WalletService.class).shutDownDone.addListener((ov, o, n) -> {

View file

@ -50,9 +50,6 @@ public class HttpClient {
public String requestWithGET(String param) throws IOException, HttpException { public String requestWithGET(String param) throws IOException, HttpException {
checkNotNull(baseUrl, "baseUrl must be set before calling requestWithGET"); checkNotNull(baseUrl, "baseUrl must be set before calling requestWithGET");
Socks5Proxy socks5Proxy = socks5ProxyProvider.getSocks5Proxy(); Socks5Proxy socks5Proxy = socks5ProxyProvider.getSocks5Proxy();
if (useSocks5Proxy && socks5Proxy == null)
log.error("socks5Proxy is null. That might be the case if you use localhost dev environment so no internal proxy was created and you " +
"has tor enabled but no proxy is defined in the program arguments.");
return useSocks5Proxy && socks5Proxy != null ? requestWithGETProxy(param, socks5Proxy) : requestWithGETNoProxy(param); return useSocks5Proxy && socks5Proxy != null ? requestWithGETProxy(param, socks5Proxy) : requestWithGETNoProxy(param);
} }

View file

@ -107,11 +107,11 @@ public class BroadcastHandler implements PeerManager.Listener {
List<Connection> connectedPeersList = new ArrayList<>(connectedPeersSet); List<Connection> connectedPeersList = new ArrayList<>(connectedPeersSet);
Collections.shuffle(connectedPeersList); Collections.shuffle(connectedPeersList);
numOfPeers = connectedPeersList.size(); numOfPeers = connectedPeersList.size();
int factor = 1; int delay = 50;
if (!isDataOwner) { if (!isDataOwner) {
// for not data owner (relay nodes) we send to max. 7 nodes and use a longer delay // for not data owner (relay nodes) we send to max. 7 nodes and use a longer delay
numOfPeers = Math.min(7, connectedPeersList.size()); numOfPeers = Math.min(7, connectedPeersList.size());
factor = 2; delay = 100;
} }
long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers; long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers;
@ -130,8 +130,9 @@ public class BroadcastHandler implements PeerManager.Listener {
for (int i = 0; i < numOfPeers; i++) { for (int i = 0; i < numOfPeers; i++) {
if (stopped) if (stopped)
break; // do not continue sending after a timeout or a cancellation break; // do not continue sending after a timeout or a cancellation
final long minDelay = i * 30 * factor + 1;
final long maxDelay = minDelay * 2 + 30 * factor; final long minDelay = (i + 1) * delay;
final long maxDelay = (i + 2) * delay;
final Connection connection = connectedPeersList.get(i); final Connection connection = connectedPeersList.get(i);
UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS); UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS);
} }