mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-19 01:41:11 +01:00
Merge pull request #6947 from djing-chan/fix-initial-data-request-handling
Fix initial data request handling
This commit is contained in:
commit
95e74937c1
@ -140,7 +140,7 @@ public class SignedWitnessService {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onBootstrapComplete();
|
||||
}
|
||||
});
|
||||
|
@ -214,7 +214,7 @@ public class AccountAgeWitnessService {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onBootStrapped();
|
||||
}
|
||||
});
|
||||
|
@ -48,7 +48,7 @@ public class AppStartupState {
|
||||
private final BooleanProperty walletAndNetworkReady = new SimpleBooleanProperty();
|
||||
private final BooleanProperty allDomainServicesInitialized = new SimpleBooleanProperty();
|
||||
private final BooleanProperty applicationFullyInitialized = new SimpleBooleanProperty();
|
||||
private final BooleanProperty updatedDataReceived = new SimpleBooleanProperty();
|
||||
private final BooleanProperty dataReceived = new SimpleBooleanProperty();
|
||||
private final BooleanProperty isBlockDownloadComplete = new SimpleBooleanProperty();
|
||||
private final BooleanProperty hasSufficientPeersForBroadcast = new SimpleBooleanProperty();
|
||||
|
||||
@ -57,8 +57,8 @@ public class AppStartupState {
|
||||
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
updatedDataReceived.set(true);
|
||||
public void onDataReceived() {
|
||||
dataReceived.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
@ -72,7 +72,7 @@ public class AppStartupState {
|
||||
hasSufficientPeersForBroadcast.set(true);
|
||||
});
|
||||
|
||||
p2pNetworkAndWalletInitialized = EasyBind.combine(updatedDataReceived,
|
||||
p2pNetworkAndWalletInitialized = EasyBind.combine(dataReceived,
|
||||
isBlockDownloadComplete,
|
||||
hasSufficientPeersForBroadcast,
|
||||
allDomainServicesInitialized,
|
||||
@ -122,14 +122,6 @@ public class AppStartupState {
|
||||
return applicationFullyInitialized;
|
||||
}
|
||||
|
||||
public boolean isUpdatedDataReceived() {
|
||||
return updatedDataReceived.get();
|
||||
}
|
||||
|
||||
public ReadOnlyBooleanProperty updatedDataReceivedProperty() {
|
||||
return updatedDataReceived;
|
||||
}
|
||||
|
||||
public boolean isBlockDownloadComplete() {
|
||||
return isBlockDownloadComplete.get();
|
||||
}
|
||||
|
@ -849,8 +849,8 @@ public class BisqSetup {
|
||||
return p2PNetworkSetup.getP2PNetworkStatusIconId();
|
||||
}
|
||||
|
||||
public BooleanProperty getUpdatedDataReceived() {
|
||||
return p2PNetworkSetup.getUpdatedDataReceived();
|
||||
public BooleanProperty getDataReceived() {
|
||||
return p2PNetworkSetup.getDataReceived();
|
||||
}
|
||||
|
||||
public StringProperty getP2pNetworkLabelId() {
|
||||
|
@ -73,7 +73,7 @@ public class P2PNetworkSetup {
|
||||
@Getter
|
||||
final StringProperty p2pNetworkWarnMsg = new SimpleStringProperty();
|
||||
@Getter
|
||||
final BooleanProperty updatedDataReceived = new SimpleBooleanProperty();
|
||||
final BooleanProperty dataReceived = new SimpleBooleanProperty();
|
||||
@Getter
|
||||
final BooleanProperty p2pNetworkFailed = new SimpleBooleanProperty();
|
||||
final FilterManager filterManager;
|
||||
@ -97,12 +97,11 @@ public class P2PNetworkSetup {
|
||||
StringProperty bootstrapState = new SimpleStringProperty();
|
||||
StringProperty bootstrapWarning = new SimpleStringProperty();
|
||||
BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
|
||||
BooleanProperty initialP2PNetworkDataReceived = new SimpleBooleanProperty();
|
||||
|
||||
addP2PMessageFilter();
|
||||
|
||||
p2PNetworkInfoBinding = EasyBind.combine(bootstrapState, bootstrapWarning, p2PService.getNumConnectedPeers(),
|
||||
walletsSetup.numPeersProperty(), hiddenServicePublished, initialP2PNetworkDataReceived,
|
||||
walletsSetup.numPeersProperty(), hiddenServicePublished, dataReceived,
|
||||
(state, warning, numP2pPeers, numBtcPeers, hiddenService, dataReceived) -> {
|
||||
String result;
|
||||
String daoFullNode = preferences.isDaoFullNode() ? Res.get("mainView.footer.daoFullNode") + " / " : "";
|
||||
@ -171,8 +170,8 @@ public class P2PNetworkSetup {
|
||||
@Override
|
||||
public void onDataReceived() {
|
||||
log.debug("onRequestingDataCompleted");
|
||||
initialP2PNetworkDataReceived.set(true);
|
||||
bootstrapState.set(Res.get("mainView.bootstrapState.initialDataReceived"));
|
||||
dataReceived.set(true);
|
||||
splashP2PNetworkAnimationVisible.set(false);
|
||||
p2pNetworkInitialized.set(true);
|
||||
}
|
||||
@ -208,7 +207,6 @@ public class P2PNetworkSetup {
|
||||
public void onUpdatedDataReceived() {
|
||||
log.debug("onUpdatedDataReceived");
|
||||
splashP2PNetworkAnimationVisible.set(false);
|
||||
updatedDataReceived.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -183,7 +183,7 @@ public abstract class AccountingNode implements DaoSetupService, DaoStateListene
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onP2PNetworkReady();
|
||||
}
|
||||
};
|
||||
|
@ -113,6 +113,7 @@ public abstract class BsqNode implements DaoSetupService {
|
||||
|
||||
@Override
|
||||
public void onDataReceived() {
|
||||
onP2PNetworkReady();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -126,7 +127,6 @@ public abstract class BsqNode implements DaoSetupService {
|
||||
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
onP2PNetworkReady();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -91,9 +91,12 @@ class GetBlocksRequestHandler {
|
||||
|
||||
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
|
||||
long ts = System.currentTimeMillis();
|
||||
// We limit number of blocks to 3000 which is about 3 weeks.
|
||||
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 3000));
|
||||
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
|
||||
// We limit number of blocks to 3000 which is about 3 weeks and about 5 MB on data
|
||||
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight());
|
||||
List<RawBlock> rawBlocks = new LinkedList<>(blocks).stream()
|
||||
.map(RawBlock::fromBlock)
|
||||
.limit(3000)
|
||||
.collect(Collectors.toList());
|
||||
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
|
||||
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
|
||||
"Building GetBlocksResponse with {} blocks took {} ms.",
|
||||
|
@ -87,6 +87,7 @@ public class LiteNode extends BsqNode {
|
||||
blockDownloadListener = (observable, oldValue, newValue) -> {
|
||||
if ((double) newValue == 1) {
|
||||
setupWalletBestBlockListener();
|
||||
maybeStartRequestingBlocks();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -176,8 +177,13 @@ public class LiteNode extends BsqNode {
|
||||
}
|
||||
});
|
||||
|
||||
if (!parseBlockchainComplete)
|
||||
maybeStartRequestingBlocks();
|
||||
}
|
||||
|
||||
private void maybeStartRequestingBlocks() {
|
||||
if (walletsSetup.isDownloadComplete() && p2pNetworkReady && !parseBlockchainComplete) {
|
||||
startParseBlocks();
|
||||
}
|
||||
}
|
||||
|
||||
// First we request the blocks from a full node
|
||||
@ -192,7 +198,8 @@ public class LiteNode extends BsqNode {
|
||||
return;
|
||||
}
|
||||
|
||||
// If we request blocks we increment the ConnectionState counter.
|
||||
// If we request blocks we increment the ConnectionState counter so that the connection does not get reset from
|
||||
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
|
||||
ConnectionState.incrementExpectedInitialDataResponses();
|
||||
|
||||
if (chainHeight == daoStateService.getGenesisBlockHeight()) {
|
||||
@ -229,6 +236,11 @@ public class LiteNode extends BsqNode {
|
||||
return;
|
||||
}
|
||||
|
||||
if (walletsSetup.isDownloadComplete() && chainTipHeight < bsqWalletService.getBestChainHeight()) {
|
||||
// We need to request more blocks and increment the ConnectionState counter so that the connection does not get reset from
|
||||
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
|
||||
ConnectionState.incrementExpectedInitialDataResponses();
|
||||
}
|
||||
runDelayedBatchProcessing(new ArrayList<>(blockList),
|
||||
() -> {
|
||||
double duration = System.currentTimeMillis() - ts;
|
||||
@ -239,8 +251,12 @@ public class LiteNode extends BsqNode {
|
||||
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
|
||||
// We deal with that case at the setupWalletBestBlockListener method above.
|
||||
if (walletsSetup.isDownloadComplete() && daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
|
||||
log.info("We have completed batch processing of {} blocks but we have still {} missing blocks and request again.",
|
||||
blockList.size(), bsqWalletService.getBestChainHeight() - daoStateService.getChainHeight());
|
||||
|
||||
liteNodeNetworkService.requestBlocks(daoStateService.getChainHeight() + 1);
|
||||
} else {
|
||||
log.info("We have completed batch processing of {} blocks and we have reached the chain tip of the wallet.", blockList.size());
|
||||
onParsingComplete.run();
|
||||
onParseBlockChainComplete();
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ public class RequestBlocksHandler implements MessageListener {
|
||||
},
|
||||
TIMEOUT_MIN, TimeUnit.MINUTES);
|
||||
|
||||
log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
|
||||
log.info("\n\n>> We request blocks from peer {} from block height {}.\n", nodeAddress, getBlocksRequest.getFromBlockHeight());
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
|
||||
@ -136,7 +136,7 @@ public class RequestBlocksHandler implements MessageListener {
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
|
||||
log.debug("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -190,7 +190,9 @@ public class RequestBlocksHandler implements MessageListener {
|
||||
}
|
||||
|
||||
terminate();
|
||||
log.info("We received from peer {} a BlocksResponse with {} blocks",
|
||||
log.info("\n#################################################################\n" +
|
||||
"We received from peer {} a BlocksResponse with {} blocks" +
|
||||
"\n#################################################################\n",
|
||||
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
|
||||
listener.onComplete(getBlocksResponse);
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ public final class GetBlocksResponse extends NetworkEnvelope implements DirectMe
|
||||
List<RawBlock> list = proto.getRawBlocksList().stream()
|
||||
.map(RawBlock::fromProto)
|
||||
.collect(Collectors.toList());
|
||||
log.info("Received a GetBlocksResponse with {} blocks and {} kB size", list.size(), proto.getSerializedSize() / 1000d);
|
||||
log.info("\n\n<< Received a GetBlocksResponse with {} blocks and {} kB size\n", list.size(), proto.getSerializedSize() / 1000d);
|
||||
return new GetBlocksResponse(proto.getRawBlocksList().isEmpty() ?
|
||||
new ArrayList<>() :
|
||||
list,
|
||||
|
@ -349,15 +349,8 @@ public class DaoStateService implements DaoSetupService {
|
||||
}
|
||||
|
||||
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) {
|
||||
return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) {
|
||||
// We limit requests to numMaxBlocks blocks, to avoid performance issues and too
|
||||
// large network data in case a node requests too far back in history.
|
||||
return getBlocks().stream()
|
||||
.filter(block -> block.getHeight() >= fromBlockHeight)
|
||||
.limit(numMaxBlocks)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
@ -188,6 +188,12 @@ public class FilterManager {
|
||||
p2PService.addP2PServiceListener(new P2PServiceListener() {
|
||||
@Override
|
||||
public void onDataReceived() {
|
||||
// We should have received all data at that point and if the filters were not set we
|
||||
// clean up the persisted banned nodes in the options file as it might be that we missed the filter
|
||||
// remove message if we have not been online.
|
||||
if (filterProperty.get() == null) {
|
||||
clearBannedNodes();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -200,12 +206,6 @@ public class FilterManager {
|
||||
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
// We should have received all data at that point and if the filters were not set we
|
||||
// clean up the persisted banned nodes in the options file as it might be that we missed the filter
|
||||
// remove message if we have not been online.
|
||||
if (filterProperty.get() == null) {
|
||||
clearBannedNodes();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,7 +112,7 @@ public class OfferBookService {
|
||||
if (dumpStatistics) {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
addOfferBookChangedListener(new OfferBookChangedListener() {
|
||||
@Override
|
||||
public void onAdded(Offer offer) {
|
||||
|
@ -211,7 +211,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onBootstrapComplete();
|
||||
}
|
||||
});
|
||||
|
@ -78,7 +78,7 @@ public class TriggerPriceService {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onBootstrapComplete();
|
||||
}
|
||||
});
|
||||
|
@ -122,7 +122,7 @@ public class OpenBsqSwapOfferService {
|
||||
};
|
||||
bootstrapListener = new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
onP2PServiceReady();
|
||||
p2PService.removeP2PServiceListener(bootstrapListener);
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
|
||||
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
tryApplyMessages();
|
||||
checkDisputesForUpdates();
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ public abstract class DisputeAgentManager<T extends DisputeAgent> {
|
||||
else
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
startRepublishDisputeAgent();
|
||||
}
|
||||
});
|
||||
|
@ -400,7 +400,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
initPersistedTrades();
|
||||
}
|
||||
});
|
||||
|
@ -69,7 +69,7 @@ public class CleanupMailboxMessagesService {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
cleanupMailboxMessages(trades);
|
||||
}
|
||||
});
|
||||
|
@ -95,7 +95,7 @@ public class TradeStatisticsConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -337,7 +337,7 @@ public class XmrTxProofService implements AssetTxProofService {
|
||||
} else {
|
||||
bootstrapListener = new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
p2PService.removeP2PServiceListener(bootstrapListener);
|
||||
result.set(true);
|
||||
}
|
||||
|
@ -820,7 +820,7 @@ public class MainView extends InitializableView<StackPane, MainViewModel>
|
||||
}
|
||||
});
|
||||
|
||||
model.getUpdatedDataReceived().addListener((observable, oldValue, newValue) -> {
|
||||
model.getDataReceived().addListener((observable, oldValue, newValue) -> {
|
||||
p2PNetworkIcon.setOpacity(1);
|
||||
p2pNetworkProgressBar.setProgress(0);
|
||||
});
|
||||
|
@ -536,7 +536,7 @@ public class MainViewModel implements ViewModel, BisqSetup.BisqSetupListener {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
setupInvalidOpenOffersHandler();
|
||||
}
|
||||
});
|
||||
@ -699,7 +699,7 @@ public class MainViewModel implements ViewModel, BisqSetup.BisqSetupListener {
|
||||
} else {
|
||||
p2PService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
accountAgeWitnessService.publishMyAccountAgeWitness(aliPayAccount.getPaymentAccountPayload());
|
||||
}
|
||||
});
|
||||
@ -842,8 +842,8 @@ public class MainViewModel implements ViewModel, BisqSetup.BisqSetupListener {
|
||||
return bisqSetup.getP2PNetworkStatusIconId();
|
||||
}
|
||||
|
||||
BooleanProperty getUpdatedDataReceived() {
|
||||
return bisqSetup.getUpdatedDataReceived();
|
||||
BooleanProperty getDataReceived() {
|
||||
return bisqSetup.getDataReceived();
|
||||
}
|
||||
|
||||
StringProperty getP2pNetworkLabelId() {
|
||||
|
@ -208,7 +208,7 @@ public abstract class TradeStepView extends AnchorPane {
|
||||
} else {
|
||||
bootstrapListener = new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
registerSubscriptions();
|
||||
}
|
||||
};
|
||||
|
@ -2,10 +2,12 @@
|
||||
<configuration>
|
||||
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{30}: %msg %xEx%n)</pattern>
|
||||
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level%logger{40}: %msg %xEx%n)</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- <logger name="org.bitcoinj" level="WARN"/>-->
|
||||
<logger name="org.berndpruenster.netlayer.tor.Tor" level="WARN"/>
|
||||
<root level="TRACE">
|
||||
<appender-ref ref="CONSOLE_APPENDER"/>
|
||||
</root>
|
||||
|
@ -40,11 +40,11 @@ public abstract class BootstrapListener implements P2PServiceListener {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataReceived() {
|
||||
public void onUpdatedDataReceived() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract void onUpdatedDataReceived();
|
||||
public abstract void onDataReceived();
|
||||
|
||||
@Override
|
||||
public void onRequestCustomBridges() {
|
||||
|
@ -310,7 +310,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
applyIsBootstrapped(P2PServiceListener::onUpdatedDataReceived);
|
||||
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -325,7 +325,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||
|
||||
@Override
|
||||
public void onDataReceived() {
|
||||
p2pServiceListeners.forEach(P2PServiceListener::onDataReceived);
|
||||
applyIsBootstrapped(P2PServiceListener::onDataReceived);
|
||||
|
||||
}
|
||||
|
||||
private void applyIsBootstrapped(Consumer<P2PServiceListener> listenerHandler) {
|
||||
|
@ -142,7 +142,7 @@ class RequestDataHandler implements MessageListener {
|
||||
}
|
||||
|
||||
getDataRequestType = getDataRequest.getClass().getSimpleName();
|
||||
log.info("We send a {} to peer {}. ", getDataRequestType, nodeAddress);
|
||||
log.info("\n\n>> We send a {} to peer {}\n", getDataRequestType, nodeAddress);
|
||||
networkNode.addMessageListener(this);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
|
||||
//noinspection UnstableApiUsage
|
||||
@ -242,7 +242,7 @@ class RequestDataHandler implements MessageListener {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String sep = System.lineSeparator();
|
||||
sb.append(sep).append("#################################################################").append(sep);
|
||||
sb.append("Connected to node: ").append(peersNodeAddress.getFullAddress()).append(sep);
|
||||
sb.append("Data provided by node: ").append(peersNodeAddress.getFullAddress()).append(sep);
|
||||
int items = dataSet.size() + persistableNetworkPayloadSet.size();
|
||||
sb.append("Received ").append(items).append(" instances from a ")
|
||||
.append(getDataRequestType).append(sep);
|
||||
@ -252,7 +252,7 @@ class RequestDataHandler implements MessageListener {
|
||||
.append(" / ")
|
||||
.append(Utilities.readableFileSize(value.second.get()))
|
||||
.append(sep));
|
||||
sb.append("#################################################################");
|
||||
sb.append("#################################################################\n");
|
||||
log.info(sb.toString());
|
||||
}
|
||||
|
||||
|
@ -109,6 +109,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
|
||||
private Optional<NodeAddress> nodeAddressOfPreliminaryDataRequest = Optional.empty();
|
||||
private Timer retryTimer;
|
||||
private boolean dataUpdateRequested;
|
||||
private boolean allDataReceived;
|
||||
private boolean stopped;
|
||||
private int numRepeatedRequests = 0;
|
||||
|
||||
@ -359,17 +360,26 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
|
||||
checkNotNull(listener).onUpdatedDataReceived();
|
||||
}
|
||||
|
||||
checkNotNull(listener).onDataReceived();
|
||||
|
||||
if (wasTruncated) {
|
||||
if (numRepeatedRequests < 10) {
|
||||
if (numRepeatedRequests < 20) {
|
||||
// If we had allDataReceived already set to true but get a response with truncated flag,
|
||||
// we still repeat the request to that node for higher redundancy. Otherwise, one seed node
|
||||
// providing incomplete data would stop others to fill the gaps.
|
||||
log.info("DataResponse did not contain all data, so we repeat request until we got all data");
|
||||
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2);
|
||||
} else {
|
||||
log.info("DataResponse still did not contained all data but we requested already 10 times and stop now.");
|
||||
} else if (!allDataReceived) {
|
||||
allDataReceived = true;
|
||||
log.warn("\n#################################################################\n" +
|
||||
"Loading initial data from {} did not complete after 20 repeated requests. \n" +
|
||||
"#################################################################\n", nodeAddress);
|
||||
checkNotNull(listener).onDataReceived();
|
||||
}
|
||||
} else {
|
||||
log.info("DataResponse contained all data");
|
||||
} else if (!allDataReceived) {
|
||||
allDataReceived = true;
|
||||
log.info("\n\n#################################################################\n" +
|
||||
"Loading initial data from {} completed\n" +
|
||||
"#################################################################\n", nodeAddress);
|
||||
checkNotNull(listener).onDataReceived();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,9 +126,9 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
|
||||
NetworkProtoResolver resolver,
|
||||
int messageVersion) {
|
||||
boolean wasTruncated = proto.getWasTruncated();
|
||||
log.info("Received a GetDataResponse with {} {}",
|
||||
log.info("\n\n<< Received a GetDataResponse with {} {}\n",
|
||||
Utilities.readableFileSize(proto.getSerializedSize()),
|
||||
wasTruncated ? " (was truncated)" : "");
|
||||
wasTruncated ? " (still data missing)" : " (all data received)");
|
||||
Set<ProtectedStorageEntry> dataSet = proto.getDataSetList().stream()
|
||||
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet());
|
||||
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()
|
||||
|
@ -145,7 +145,7 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
|
||||
@Override
|
||||
protected void readFromResources(String postFix, Runnable completeHandler) {
|
||||
readStore(persisted -> {
|
||||
log.info("We have created the {} store for the live data and filled it with {} entries from the persisted data.",
|
||||
log.debug("We have created the {} store for the live data and filled it with {} entries from the persisted data.",
|
||||
getFileName(), getMapOfLiveData().size());
|
||||
|
||||
// Now we add our historical data stores.
|
||||
@ -185,7 +185,7 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
|
||||
persistenceManager.readPersisted(fileName, persisted -> {
|
||||
storesByVersion.put(version, persisted);
|
||||
allHistoricalPayloads.putAll(persisted.getMap());
|
||||
log.info("We have read from {} {} historical items.", fileName, persisted.getMap().size());
|
||||
log.debug("We have read from {} {} historical items.", fileName, persisted.getMap().size());
|
||||
pruneStore(persisted, version);
|
||||
completeHandler.run();
|
||||
},
|
||||
@ -199,11 +199,11 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
|
||||
mapOfLiveData.keySet().removeAll(historicalStore.getMap().keySet());
|
||||
int postLive = mapOfLiveData.size();
|
||||
if (preLive > postLive) {
|
||||
log.info("We pruned data from our live data store which are already contained in the historical data store with version {}. " +
|
||||
log.debug("We pruned data from our live data store which are already contained in the historical data store with version {}. " +
|
||||
"The live map had {} entries before pruning and has {} entries afterwards.",
|
||||
version, preLive, postLive);
|
||||
} else {
|
||||
log.info("No pruning from historical data store with version {} was applied", version);
|
||||
log.debug("No pruning from historical data store with version {} was applied", version);
|
||||
}
|
||||
requestPersistence();
|
||||
}
|
||||
|
@ -113,18 +113,18 @@ public abstract class StoreService<T extends PersistableEnvelope> {
|
||||
File destinationFile = new File(Paths.get(absolutePathOfStorageDir, fileName).toString());
|
||||
if (!destinationFile.exists()) {
|
||||
try {
|
||||
log.info("We copy resource to file: resourceFileName={}, destinationFile={}", resourceFileName, destinationFile);
|
||||
log.debug("We copy resource to file: resourceFileName={}, destinationFile={}", resourceFileName, destinationFile);
|
||||
FileUtil.resourceToFile(resourceFileName, destinationFile);
|
||||
return true;
|
||||
} catch (ResourceNotFoundException e) {
|
||||
log.info("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
|
||||
log.debug("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
|
||||
} catch (Throwable e) {
|
||||
log.error("Could not copy resourceFile " + resourceFileName + " to " +
|
||||
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
log.info("No resource file was copied. {} exists already.", fileName);
|
||||
log.debug("No resource file was copied. {} exists already.", fileName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
// TorNode created. Took 6 sec.
|
||||
// Hidden service created. Took 40-50 sec.
|
||||
@ -94,7 +93,7 @@ public class PeerServiceTest {
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
//@Test
|
||||
public void testSingleSeedNode() throws InterruptedException {
|
||||
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
|
||||
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
|
||||
|
@ -55,7 +55,7 @@ public class Statistics {
|
||||
priceFeedService.setCurrencyCode("USD");
|
||||
p2pService.addP2PServiceListener(new BootstrapListener() {
|
||||
@Override
|
||||
public void onUpdatedDataReceived() {
|
||||
public void onDataReceived() {
|
||||
// we need to have tor ready
|
||||
log.info("onBootstrapComplete: we start requestPriceFeed");
|
||||
priceFeedService.requestPriceFeed(price -> log.info("requestPriceFeed. price=" + price),
|
||||
|
Loading…
Reference in New Issue
Block a user