From 4a05e5bf0d5c5a84678ff032e987ea5997300af9 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 26 Nov 2020 11:23:48 -0500 Subject: [PATCH] Improve getBlocks request handling --- .../main/java/bisq/core/dao/node/BsqNode.java | 4 +- .../full/network/FullNodeNetworkService.java | 99 ++++--- .../full/network/GetBlocksRequestHandler.java | 32 ++- .../bisq/core/dao/node/lite/LiteNode.java | 55 +++- .../lite/network/LiteNodeNetworkService.java | 252 ++++++++++-------- .../lite/network/RequestBlocksHandler.java | 156 +++++------ 6 files changed, 333 insertions(+), 265 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index e06be38757..037f7b3b7f 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -189,7 +189,7 @@ public abstract class BsqNode implements DaoSetupService { if (chainHeight > genesisBlockHeight) startBlockHeight = chainHeight + 1; - log.info("Start parse blocks:\n" + + log.info("getStartBlockHeight:\n" + " Start block height={}\n" + " Genesis txId={}\n" + " Genesis block height={}\n" + @@ -223,7 +223,7 @@ public abstract class BsqNode implements DaoSetupService { // height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky // to change now as it used in many areas.) if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) { - log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight()); + log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight()); return Optional.empty(); } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java index 640ee94b15..3faf2b8bac 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java @@ -136,51 +136,62 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof GetBlocksRequest) { - // We received a GetBlocksRequest from a liteNode - if (!stopped) { - final String uid = connection.getUid(); - if (!getBlocksRequestHandlers.containsKey(uid)) { - GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode, - daoStateService, - new GetBlocksRequestHandler.Listener() { - @Override - public void onComplete() { - getBlocksRequestHandlers.remove(uid); - } - - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - getBlocksRequestHandlers.remove(uid); - if (!stopped) { - log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + - "ErrorMessage={}", connection, errorMessage); - peerManager.handleConnectionFault(connection); - } else { - log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); - } - } - }); - getBlocksRequestHandlers.put(uid, requestHandler); - requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection); - } else { - log.warn("We have already a GetDataRequestHandler for that connection started. " + - "We start a cleanup timer if the handler has not closed by itself in between 2 minutes."); - - UserThread.runAfter(() -> { - if (getBlocksRequestHandlers.containsKey(uid)) { - GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid); - handler.stop(); - getBlocksRequestHandlers.remove(uid); - } - }, CLEANUP_TIMER); - } - } else { - log.warn("We have stopped already. We ignore that onMessage call."); - } + handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection); } else if (networkEnvelope instanceof RepublishGovernanceDataRequest) { - log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " + - "blindVotePayloads to the P2P network."); - missingDataRequestService.reRepublishAllGovernanceData(); + handleRepublishGovernanceDataRequest(); } } + + private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) { + if (stopped) { + log.warn("We have stopped already. We ignore that onMessage call."); + return; + } + + String uid = connection.getUid(); + if (getBlocksRequestHandlers.containsKey(uid)) { + log.warn("We have already a GetDataRequestHandler for that connection started. " + + "We start a cleanup timer if the handler has not closed by itself in between 2 minutes."); + + UserThread.runAfter(() -> { + if (getBlocksRequestHandlers.containsKey(uid)) { + GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid); + handler.stop(); + getBlocksRequestHandlers.remove(uid); + } + }, CLEANUP_TIMER); + return; + } + + GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode, + daoStateService, + new GetBlocksRequestHandler.Listener() { + @Override + public void onComplete() { + getBlocksRequestHandlers.remove(uid); + } + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + getBlocksRequestHandlers.remove(uid); + if (!stopped) { + log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + + "ErrorMessage={}", connection, errorMessage); + if (connection != null) { + peerManager.handleConnectionFault(connection); + } + } else { + log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); + } + } + }); + getBlocksRequestHandlers.put(uid, requestHandler); + requestHandler.onGetBlocksRequest(getBlocksRequest, connection); + } + + private void handleRepublishGovernanceDataRequest() { + log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " + + "blindVotePayloads to the P2P network."); + missingDataRequestService.reRepublishAllGovernanceData(); + } } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index b197970f1a..dd8d6e9203 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -49,7 +49,7 @@ import org.jetbrains.annotations.NotNull; */ @Slf4j class GetBlocksRequestHandler { - private static final long TIMEOUT = 120; + private static final long TIMEOUT_MIN = 3; /////////////////////////////////////////////////////////////////////////////////////////// @@ -89,22 +89,28 @@ class GetBlocksRequestHandler { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) { + public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) { + long ts = System.currentTimeMillis(); // We limit number of blocks to 6000 which is about 1.5 month. List blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000)); List rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList()); GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce()); - log.info("Received GetBlocksRequest from {} for blocks from height {}", - connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight()); - if (timeoutTimer == null) { - timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions - String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" + - getBlocksResponse.getRequestNonce() + - " on connection:" + connection; - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection); - }, - TIMEOUT, TimeUnit.SECONDS); + log.info("Received GetBlocksRequest from {} for blocks from height {}. " + + "Building GetBlocksResponse with {} blocks took {} ms.", + connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(), + rawBlocks.size(), System.currentTimeMillis() - ts); + + if (timeoutTimer != null) { + timeoutTimer.stop(); + log.warn("Timeout was already running. We stopped it."); } + timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions + String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" + + getBlocksResponse.getRequestNonce() + + " on connection: " + connection; + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection); + }, + TIMEOUT_MIN, TimeUnit.MINUTES); SettableFuture future = networkNode.sendMessage(connection, getBlocksResponse); Futures.addCallback(future, new FutureCallback<>() { @@ -145,7 +151,7 @@ class GetBlocksRequestHandler { private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) { if (!stopped) { - log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason); + log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason); cleanup(); listener.onFault(errorMessage, connection); } else { diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index a51c5a1123..ab7c4b12c8 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -17,6 +17,7 @@ package bisq.core.dao.node.lite; +import bisq.core.btc.setup.WalletsSetup; import bisq.core.btc.wallet.BsqWalletService; import bisq.core.dao.node.BsqNode; import bisq.core.dao.node.explorer.ExportJsonFilesService; @@ -37,8 +38,11 @@ import bisq.common.UserThread; import com.google.inject.Inject; +import javafx.beans.value.ChangeListener; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -54,7 +58,9 @@ public class LiteNode extends BsqNode { private final LiteNodeNetworkService liteNodeNetworkService; private final BsqWalletService bsqWalletService; + private final WalletsSetup walletsSetup; private Timer checkForBlockReceivedTimer; + private ChangeListener blockDownloadListener; /////////////////////////////////////////////////////////////////////////////////////////// @@ -69,11 +75,13 @@ public class LiteNode extends BsqNode { P2PService p2PService, LiteNodeNetworkService liteNodeNetworkService, BsqWalletService bsqWalletService, + WalletsSetup walletsSetup, ExportJsonFilesService exportJsonFilesService) { super(blockParser, daoStateService, daoStateSnapshotService, p2PService, exportJsonFilesService); this.liteNodeNetworkService = liteNodeNetworkService; this.bsqWalletService = bsqWalletService; + this.walletsSetup = walletsSetup; } @@ -87,7 +95,24 @@ public class LiteNode extends BsqNode { liteNodeNetworkService.start(); - bsqWalletService.addNewBestBlockListener(block -> { + // We wait until the wallet is synced before using it trigger requests + if (walletsSetup.isDownloadComplete()) { + setupWalletBestBlockListener(); + } else { + if (blockDownloadListener == null) { + blockDownloadListener = (observable, oldValue, newValue) -> { + if ((double) newValue == 1) { + setupWalletBestBlockListener(); + UserThread.execute(() -> walletsSetup.downloadPercentageProperty().removeListener(blockDownloadListener)); + } + }; + walletsSetup.downloadPercentageProperty().addListener(blockDownloadListener); + } + } + } + + private void setupWalletBestBlockListener() { + bsqWalletService.addNewBestBlockListener(btcBlock -> { // Check if we are done with parsing if (!daoStateService.isParseBlockChainComplete()) return; @@ -97,20 +122,20 @@ public class LiteNode extends BsqNode { checkForBlockReceivedTimer.stop(); } - int height = block.getHeight(); - log.info("New block at height {} from bsqWalletService", height); + int btcWalletHeight = btcBlock.getHeight(); + log.error("New block at height {} from bsqWalletService", btcWalletHeight); // We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it. // If we don't receive it we request it manually from seed nodes checkForBlockReceivedTimer = UserThread.runAfter(() -> { - int chainHeight = daoStateService.getChainHeight(); - if (chainHeight < height) { + int daoChainHeight = daoStateService.getChainHeight(); + if (daoChainHeight < btcWalletHeight) { log.warn("We did not receive a block from the network {} seconds after we saw the new block in BicoinJ. " + "We request from our seed nodes missing blocks from block height {}.", - CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, chainHeight + 1); - liteNodeNetworkService.requestBlocks(chainHeight + 1); + CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, daoChainHeight + 1); + liteNodeNetworkService.requestBlocks(daoChainHeight + 1); } - }, CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC); + }, CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, TimeUnit.MILLISECONDS); }); } @@ -157,7 +182,6 @@ public class LiteNode extends BsqNode { // First we request the blocks from a full node @Override protected void startParseBlocks() { - log.info("startParseBlocks"); liteNodeNetworkService.requestBlocks(getStartBlockHeight()); } @@ -199,8 +223,12 @@ public class LiteNode extends BsqNode { runDelayedBatchProcessing(new ArrayList<>(blockList), () -> { - log.debug("Parsing {} blocks took {} seconds.", blockList.size(), (System.currentTimeMillis() - ts) / 1000d); - if (daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) { + log.info("runDelayedBatchProcessing Parsing {} blocks took {} seconds.", blockList.size(), + (System.currentTimeMillis() - ts) / 1000d); + // We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid. + // We deal with that case at the setupWalletBestBlockListener method above. + if (walletsSetup.isDownloadComplete() && + daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) { liteNodeNetworkService.requestBlocks(getStartBlockHeight()); } else { onParsingComplete.run(); @@ -229,11 +257,12 @@ public class LiteNode extends BsqNode { // We received a new block private void onNewBlockReceived(RawBlock block) { int blockHeight = block.getHeight(); - log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash()); + log.info("onNewBlockReceived: block at height {}, hash={}. Our DAO chainHeight={}", blockHeight, block.getHash(), chainTipHeight); // We only update chainTipHeight if we get a newer block - if (blockHeight > chainTipHeight) + if (blockHeight > chainTipHeight) { chainTipHeight = blockHeight; + } try { doParseBlock(block); diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index 0b40e80b96..b8a975b490 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -99,7 +99,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen private final Map, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>(); private Timer retryTimer; private boolean stopped; - private Set receivedBlocks = new HashSet<>(); + private final Set receivedBlocks = new HashSet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -129,7 +129,6 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen peerManager.addListener(this); } - @SuppressWarnings("Duplicates") public void shutDown() { stopped = true; stopRetryTimer(); @@ -154,7 +153,9 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen .findAny(); if (connectionToSeedNodeOptional.isPresent() && connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().isPresent()) { - requestBlocks(connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get(), startBlockHeight); + NodeAddress candidate = connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get(); + seedNodeAddresses.remove(candidate); + requestBlocks(candidate, startBlockHeight); } else { tryWithNewSeedNode(startBlockHeight); } @@ -203,14 +204,18 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen stopRetryTimer(); stopped = true; - tryWithNewSeedNode(lastRequestedBlockHeight); + if (lastRequestedBlockHeight > 0) { + tryWithNewSeedNode(lastRequestedBlockHeight); + } } @Override public void onNewConnectionAfterAllConnectionsLost() { closeAllHandlers(); stopped = false; - tryWithNewSeedNode(lastRequestedBlockHeight); + if (lastRequestedBlockHeight > 0) { + tryWithNewSeedNode(lastRequestedBlockHeight); + } } @Override @@ -218,8 +223,9 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen log.info("onAwakeFromStandby"); closeAllHandlers(); stopped = false; - if (!networkNode.getAllConnections().isEmpty()) + if (!networkNode.getAllConnections().isEmpty() && lastRequestedBlockHeight > 0) { tryWithNewSeedNode(lastRequestedBlockHeight); + } } @@ -234,15 +240,17 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen // We combine blockHash and txId list in case we receive blocks with different transactions. List txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList()); String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds; - if (!receivedBlocks.contains(extBlockId)) { - log.debug("We received a new message from peer {} and broadcast it to our peers. extBlockId={}", - connection.getPeersNodeAddressOptional().orElse(null), extBlockId); - receivedBlocks.add(extBlockId); - broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null)); - listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); - } else { + + if (receivedBlocks.contains(extBlockId)) { log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); + return; } + + log.info("We received a NewBlockBroadcastMessage from peer {} and broadcast it to our peers. extBlockId={}", + connection.getPeersNodeAddressOptional().orElse(null), extBlockId); + receivedBlocks.add(extBlockId); + broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null)); + listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); } } @@ -252,78 +260,86 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen /////////////////////////////////////////////////////////////////////////////////////////// private void requestBlocks(NodeAddress peersNodeAddress, int startBlockHeight) { - if (!stopped) { - final Tuple2 key = new Tuple2<>(peersNodeAddress, startBlockHeight); - if (!requestBlocksHandlerMap.containsKey(key)) { - if (startBlockHeight >= lastReceivedBlockHeight) { - RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode, - peerManager, - peersNodeAddress, - startBlockHeight, - new RequestBlocksHandler.Listener() { - @Override - public void onComplete(GetBlocksResponse getBlocksResponse) { - log.debug("requestBlocksHandler of outbound connection complete. nodeAddress={}", - peersNodeAddress); - stopRetryTimer(); - - // need to remove before listeners are notified as they cause the update call - requestBlocksHandlerMap.remove(key); - // we only notify if our request was latest - if (startBlockHeight >= lastReceivedBlockHeight) { - lastReceivedBlockHeight = startBlockHeight; - - listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse, - () -> { - // After we received the blocks we allow to disconnect seed nodes. - // We delay 20 seconds to allow multiple requests to finish. - UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20); - })); - } else { - log.warn("We got a response which is already obsolete because we receive a " + - "response from a request with a higher block height. " + - "This could theoretically happen, but is very unlikely."); - } - } - - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" + - "ErrorMessage={}", peersNodeAddress, errorMessage); - - peerManager.handleConnectionFault(peersNodeAddress); - requestBlocksHandlerMap.remove(key); - - listeners.forEach(listener -> listener.onFault(errorMessage, connection)); - - tryWithNewSeedNode(startBlockHeight); - } - }); - requestBlocksHandlerMap.put(key, requestBlocksHandler); - log.info("requestBlocks with startBlockHeight={} from peer {}", startBlockHeight, peersNodeAddress); - requestBlocksHandler.requestBlocks(); - } else { - log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." + - "startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight); - DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" + - startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight); - } - } else { - log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" + - "We start a cleanup timer if the handler has not closed by itself in between 2 minutes.", - peersNodeAddress, startBlockHeight); - - UserThread.runAfter(() -> { - if (requestBlocksHandlerMap.containsKey(key)) { - RequestBlocksHandler handler = requestBlocksHandlerMap.get(key); - handler.stop(); - requestBlocksHandlerMap.remove(key); - } - }, CLEANUP_TIMER); - } - } else { + if (stopped) { log.warn("We have stopped already. We ignore that requestData call."); + return; } + + Tuple2 key = new Tuple2<>(peersNodeAddress, startBlockHeight); + if (requestBlocksHandlerMap.containsKey(key)) { + log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" + + "We start a cleanup timer if the handler has not closed by itself in between 2 minutes.", + peersNodeAddress, startBlockHeight); + + UserThread.runAfter(() -> { + if (requestBlocksHandlerMap.containsKey(key)) { + RequestBlocksHandler handler = requestBlocksHandlerMap.get(key); + handler.stop(); + requestBlocksHandlerMap.remove(key); + } + }, CLEANUP_TIMER); + return; + } + + if (startBlockHeight < lastReceivedBlockHeight) { + log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." + + "startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight); + DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" + + startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight); + return; + } + + // In case we would have had an earlier request and had set allowDisconnectSeedNodes to true we un-do that + // if we get a repeated request. + peerManager.setAllowDisconnectSeedNodes(false); + RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode, + peerManager, + peersNodeAddress, + startBlockHeight, + new RequestBlocksHandler.Listener() { + @Override + public void onComplete(GetBlocksResponse getBlocksResponse) { + log.info("requestBlocksHandler of outbound connection complete. nodeAddress={}", + peersNodeAddress); + stopRetryTimer(); + + // need to remove before listeners are notified as they cause the update call + requestBlocksHandlerMap.remove(key); + // we only notify if our request was latest + if (startBlockHeight >= lastReceivedBlockHeight) { + lastReceivedBlockHeight = startBlockHeight; + + listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse, + () -> { + // After we received the blocks we allow to disconnect seed nodes. + // We delay 20 seconds to allow multiple requests to finish. + UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20); + })); + } else { + log.warn("We got a response which is already obsolete because we received a " + + "response from a request with a higher block height. " + + "This could theoretically happen, but is very unlikely."); + } + } + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" + + "ErrorMessage={}", peersNodeAddress, errorMessage); + + peerManager.handleConnectionFault(peersNodeAddress); + requestBlocksHandlerMap.remove(key); + + listeners.forEach(listener -> listener.onFault(errorMessage, connection)); + + // We allow now to disconnect from that seed. + peerManager.setAllowDisconnectSeedNodes(true); + + tryWithNewSeedNode(startBlockHeight); + } + }); + requestBlocksHandlerMap.put(key, requestBlocksHandler); + requestBlocksHandler.requestBlocks(); } @@ -332,37 +348,40 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen /////////////////////////////////////////////////////////////////////////////////////////// private void tryWithNewSeedNode(int startBlockHeight) { - if (retryTimer == null) { - retryCounter++; - if (retryCounter <= MAX_RETRY) { - retryTimer = UserThread.runAfter(() -> { - stopped = false; - - stopRetryTimer(); - - List list = seedNodeAddresses.stream() - .filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e)) - .collect(Collectors.toList()); - Collections.shuffle(list); - - if (!list.isEmpty()) { - NodeAddress nextCandidate = list.get(0); - seedNodeAddresses.remove(nextCandidate); - log.info("We try requestBlocks with {}", nextCandidate); - requestBlocks(nextCandidate, startBlockHeight); - } else { - log.warn("No more seed nodes available we could try."); - listeners.forEach(Listener::onNoSeedNodeAvailable); - } - }, - RETRY_DELAY_SEC); - } else { - log.warn("We tried {} times but could not connect to a seed node.", retryCounter); - listeners.forEach(Listener::onNoSeedNodeAvailable); - } - } else { + if (retryTimer != null) { log.warn("We have a retry timer already running."); + return; } + + retryCounter++; + + if (retryCounter > MAX_RETRY) { + log.warn("We tried {} times but could not connect to a seed node.", retryCounter); + listeners.forEach(Listener::onNoSeedNodeAvailable); + return; + } + + retryTimer = UserThread.runAfter(() -> { + stopped = false; + + stopRetryTimer(); + + List list = seedNodeAddresses.stream() + .filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e)) + .collect(Collectors.toList()); + Collections.shuffle(list); + + if (!list.isEmpty()) { + NodeAddress nextCandidate = list.get(0); + seedNodeAddresses.remove(nextCandidate); + log.info("We try requestBlocks from {} with startBlockHeight={}", nextCandidate, startBlockHeight); + requestBlocks(nextCandidate, startBlockHeight); + } else { + log.warn("No more seed nodes available we could try."); + listeners.forEach(Listener::onNoSeedNodeAvailable); + } + }, + RETRY_DELAY_SEC); } private void stopRetryTimer() { @@ -386,15 +405,12 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen requestBlocksHandlerMap.entrySet().stream() .filter(e -> e.getKey().first.equals(nodeAddress)) .findAny() - .map(Map.Entry::getValue) - .ifPresent(handler -> { - final Tuple2 key = new Tuple2<>(handler.getNodeAddress(), handler.getStartBlockHeight()); - requestBlocksHandlerMap.get(key).cancel(); - requestBlocksHandlerMap.remove(key); + .ifPresent(e -> { + e.getValue().cancel(); + requestBlocksHandlerMap.remove(e.getKey()); }); } - private void closeAllHandlers() { requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel); requestBlocksHandlerMap.clear(); diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/RequestBlocksHandler.java b/core/src/main/java/bisq/core/dao/node/lite/network/RequestBlocksHandler.java index 60374ffd7d..0b884dc72f 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/RequestBlocksHandler.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/RequestBlocksHandler.java @@ -36,7 +36,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; import java.util.Random; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -44,14 +46,12 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static com.google.common.base.Preconditions.checkArgument; - /** * Sends a GetBlocksRequest to a full node and listens on corresponding GetBlocksResponse from the full node. */ @Slf4j public class RequestBlocksHandler implements MessageListener { - private static final long TIMEOUT = 120; + private static final long TIMEOUT_MIN = 3; /////////////////////////////////////////////////////////////////////////////////////////// @@ -108,56 +108,55 @@ public class RequestBlocksHandler implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// public void requestBlocks() { - if (!stopped) { - GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress()); - log.debug("getBlocksRequest " + getBlocksRequest); - if (timeoutTimer == null) { - timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions - if (!stopped) { - String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest + - " on peersNodeAddress:" + nodeAddress; - log.debug(errorMessage + " / RequestDataHandler=" + RequestBlocksHandler.this); - handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); - } else { - log.trace("We have stopped already. We ignore that timeoutTimer.run call. " + - "Might be caused by an previous networkNode.sendMessage.onFailure."); - } - }, - TIMEOUT); + if (stopped) { + log.warn("We have stopped already. We ignore that requestData call."); + return; + } + + GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress()); + + if (timeoutTimer != null) { + log.warn("We had a timer already running and stop it."); + timeoutTimer.stop(); + } + timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions + if (!stopped) { + String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest + + " on peersNodeAddress:" + nodeAddress; + log.debug("{} / RequestDataHandler={}", errorMessage, RequestBlocksHandler.this); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); + } else { + log.warn("We have stopped already. We ignore that timeoutTimer.run call. " + + "Might be caused by an previous networkNode.sendMessage.onFailure."); + } + }, + TIMEOUT_MIN, TimeUnit.MINUTES); + + log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight()); + + networkNode.addMessageListener(this); + + SettableFuture future = networkNode.sendMessage(nodeAddress, getBlocksRequest); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress()); } - log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight()); - networkNode.addMessageListener(this); - SettableFuture future = networkNode.sendMessage(nodeAddress, getBlocksRequest); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Connection connection) { - if (!stopped) { - log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress()); - } else { - log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." + - "Might be caused by a previous timeout."); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + if (!stopped) { + String errorMessage = "Sending getBlocksRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\t" + + "getBlocksRequest=" + getBlocksRequest + "." + + "\n\tException=" + throwable.getMessage(); + log.error(errorMessage); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); + } else { + log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call."); } - - @Override - public void onFailure(@NotNull Throwable throwable) { - if (!stopped) { - String errorMessage = "Sending getBlocksRequest to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\t" + - "getBlocksRequest=" + getBlocksRequest + "." + - "\n\tException=" + throwable.getMessage(); - log.error(errorMessage); - handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); - } else { - log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " + - "Might be caused by a previous timeout."); - } - } - }, MoreExecutors.directExecutor()); - } else { - log.warn("We have stopped already. We ignore that requestData call."); - } + } + }, MoreExecutors.directExecutor()); } @@ -168,31 +167,36 @@ public class RequestBlocksHandler implements MessageListener { @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof GetBlocksResponse) { - if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(nodeAddress)) { - if (!stopped) { - GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope; - if (getBlocksResponse.getRequestNonce() == nonce) { - stopTimeoutTimer(); - checkArgument(connection.getPeersNodeAddressOptional().isPresent(), - "RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + - "at that moment"); - cleanup(); - log.info("We received from peer {} a BlocksResponse with {} blocks", - nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size()); - listener.onComplete(getBlocksResponse); - } else { - log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " + - "handshake (timeout causes connection close but peer might have sent a msg before " + - "connection was closed).\n\t" + - "We drop that message. nonce={} / requestNonce={}", - nonce, getBlocksResponse.getRequestNonce()); - } - } else { - log.warn("We have stopped already. We ignore that onDataRequest call."); - } - } else { - log.warn("We got a message from ourselves. That should never happen."); + if (stopped) { + log.warn("We have stopped already. We ignore that onDataRequest call."); + return; } + + Optional optionalNodeAddress = connection.getPeersNodeAddressOptional(); + if (!optionalNodeAddress.isPresent()) { + log.warn("Peers node address is not present, that is not expected."); + // We do not return here as in case the connection has been created from the peers side we might not + // have the address set. As we check the nonce later we do not care that much for the check if the + // connection address is the same as the one we used. + } else if (!optionalNodeAddress.get().equals(nodeAddress)) { + log.warn("Peers node address is the same we requested. We ignore that message."); + return; + } + + GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope; + if (getBlocksResponse.getRequestNonce() != nonce) { + log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " + + "handshake (timeout causes connection close but peer might have sent a msg before " + + "connection was closed).\n\t" + + "We drop that message. nonce={} / requestNonce={}", + nonce, getBlocksResponse.getRequestNonce()); + return; + } + + cleanup(); + log.info("We received from peer {} a BlocksResponse with {} blocks", + nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size()); + listener.onComplete(getBlocksResponse); } } @@ -206,7 +210,9 @@ public class RequestBlocksHandler implements MessageListener { @SuppressWarnings("UnusedParameters") - private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { + private void handleFault(String errorMessage, + NodeAddress nodeAddress, + CloseConnectionReason closeConnectionReason) { cleanup(); peerManager.handleConnectionFault(nodeAddress); listener.onFault(errorMessage, null);