Improve getBlocks request handling

This commit is contained in:
chimp1984 2020-11-26 11:23:48 -05:00
parent 976caeb14e
commit 4a05e5bf0d
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
6 changed files with 333 additions and 265 deletions

View file

@ -189,7 +189,7 @@ public abstract class BsqNode implements DaoSetupService {
if (chainHeight > genesisBlockHeight)
startBlockHeight = chainHeight + 1;
log.info("Start parse blocks:\n" +
log.info("getStartBlockHeight:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
" Genesis block height={}\n" +
@ -223,7 +223,7 @@ public abstract class BsqNode implements DaoSetupService {
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}

View file

@ -136,51 +136,62 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksRequest) {
// We received a GetBlocksRequest from a liteNode
if (!stopped) {
final String uid = connection.getUid();
if (!getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else if (networkEnvelope instanceof RepublishGovernanceDataRequest) {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
handleRepublishGovernanceDataRequest();
}
}
private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
if (stopped) {
log.warn("We have stopped already. We ignore that onMessage call.");
return;
}
String uid = connection.getUid();
if (getBlocksRequestHandlers.containsKey(uid)) {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
return;
}
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest(getBlocksRequest, connection);
}
private void handleRepublishGovernanceDataRequest() {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
}
}

View file

@ -49,7 +49,7 @@ import org.jetbrains.annotations.NotNull;
*/
@Slf4j
class GetBlocksRequestHandler {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;
///////////////////////////////////////////////////////////////////////////////////////////
@ -89,22 +89,28 @@ class GetBlocksRequestHandler {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) {
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 6000 which is about 1.5 month.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT, TimeUnit.SECONDS);
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(),
rawBlocks.size(), System.currentTimeMillis() - ts);
if (timeoutTimer != null) {
timeoutTimer.stop();
log.warn("Timeout was already running. We stopped it.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection: " + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT_MIN, TimeUnit.MINUTES);
SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<>() {
@ -145,7 +151,7 @@ class GetBlocksRequestHandler {
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
if (!stopped) {
log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {

View file

@ -17,6 +17,7 @@
package bisq.core.dao.node.lite;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.node.BsqNode;
import bisq.core.dao.node.explorer.ExportJsonFilesService;
@ -37,8 +38,11 @@ import bisq.common.UserThread;
import com.google.inject.Inject;
import javafx.beans.value.ChangeListener;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -54,7 +58,9 @@ public class LiteNode extends BsqNode {
private final LiteNodeNetworkService liteNodeNetworkService;
private final BsqWalletService bsqWalletService;
private final WalletsSetup walletsSetup;
private Timer checkForBlockReceivedTimer;
private ChangeListener<Number> blockDownloadListener;
///////////////////////////////////////////////////////////////////////////////////////////
@ -69,11 +75,13 @@ public class LiteNode extends BsqNode {
P2PService p2PService,
LiteNodeNetworkService liteNodeNetworkService,
BsqWalletService bsqWalletService,
WalletsSetup walletsSetup,
ExportJsonFilesService exportJsonFilesService) {
super(blockParser, daoStateService, daoStateSnapshotService, p2PService, exportJsonFilesService);
this.liteNodeNetworkService = liteNodeNetworkService;
this.bsqWalletService = bsqWalletService;
this.walletsSetup = walletsSetup;
}
@ -87,7 +95,24 @@ public class LiteNode extends BsqNode {
liteNodeNetworkService.start();
bsqWalletService.addNewBestBlockListener(block -> {
// We wait until the wallet is synced before using it trigger requests
if (walletsSetup.isDownloadComplete()) {
setupWalletBestBlockListener();
} else {
if (blockDownloadListener == null) {
blockDownloadListener = (observable, oldValue, newValue) -> {
if ((double) newValue == 1) {
setupWalletBestBlockListener();
UserThread.execute(() -> walletsSetup.downloadPercentageProperty().removeListener(blockDownloadListener));
}
};
walletsSetup.downloadPercentageProperty().addListener(blockDownloadListener);
}
}
}
private void setupWalletBestBlockListener() {
bsqWalletService.addNewBestBlockListener(btcBlock -> {
// Check if we are done with parsing
if (!daoStateService.isParseBlockChainComplete())
return;
@ -97,20 +122,20 @@ public class LiteNode extends BsqNode {
checkForBlockReceivedTimer.stop();
}
int height = block.getHeight();
log.info("New block at height {} from bsqWalletService", height);
int btcWalletHeight = btcBlock.getHeight();
log.error("New block at height {} from bsqWalletService", btcWalletHeight);
// We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it.
// If we don't receive it we request it manually from seed nodes
checkForBlockReceivedTimer = UserThread.runAfter(() -> {
int chainHeight = daoStateService.getChainHeight();
if (chainHeight < height) {
int daoChainHeight = daoStateService.getChainHeight();
if (daoChainHeight < btcWalletHeight) {
log.warn("We did not receive a block from the network {} seconds after we saw the new block in BicoinJ. " +
"We request from our seed nodes missing blocks from block height {}.",
CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, chainHeight + 1);
liteNodeNetworkService.requestBlocks(chainHeight + 1);
CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, daoChainHeight + 1);
liteNodeNetworkService.requestBlocks(daoChainHeight + 1);
}
}, CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC);
}, CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, TimeUnit.MILLISECONDS);
});
}
@ -157,7 +182,6 @@ public class LiteNode extends BsqNode {
// First we request the blocks from a full node
@Override
protected void startParseBlocks() {
log.info("startParseBlocks");
liteNodeNetworkService.requestBlocks(getStartBlockHeight());
}
@ -199,8 +223,12 @@ public class LiteNode extends BsqNode {
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
log.debug("Parsing {} blocks took {} seconds.", blockList.size(), (System.currentTimeMillis() - ts) / 1000d);
if (daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
log.info("runDelayedBatchProcessing Parsing {} blocks took {} seconds.", blockList.size(),
(System.currentTimeMillis() - ts) / 1000d);
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
// We deal with that case at the setupWalletBestBlockListener method above.
if (walletsSetup.isDownloadComplete() &&
daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
liteNodeNetworkService.requestBlocks(getStartBlockHeight());
} else {
onParsingComplete.run();
@ -229,11 +257,12 @@ public class LiteNode extends BsqNode {
// We received a new block
private void onNewBlockReceived(RawBlock block) {
int blockHeight = block.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash());
log.info("onNewBlockReceived: block at height {}, hash={}. Our DAO chainHeight={}", blockHeight, block.getHash(), chainTipHeight);
// We only update chainTipHeight if we get a newer block
if (blockHeight > chainTipHeight)
if (blockHeight > chainTipHeight) {
chainTipHeight = blockHeight;
}
try {
doParseBlock(block);

View file

@ -99,7 +99,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
private final Map<Tuple2<NodeAddress, Integer>, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>();
private Timer retryTimer;
private boolean stopped;
private Set<String> receivedBlocks = new HashSet<>();
private final Set<String> receivedBlocks = new HashSet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -129,7 +129,6 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
peerManager.addListener(this);
}
@SuppressWarnings("Duplicates")
public void shutDown() {
stopped = true;
stopRetryTimer();
@ -154,7 +153,9 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
.findAny();
if (connectionToSeedNodeOptional.isPresent() &&
connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().isPresent()) {
requestBlocks(connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get(), startBlockHeight);
NodeAddress candidate = connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get();
seedNodeAddresses.remove(candidate);
requestBlocks(candidate, startBlockHeight);
} else {
tryWithNewSeedNode(startBlockHeight);
}
@ -203,14 +204,18 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
stopRetryTimer();
stopped = true;
tryWithNewSeedNode(lastRequestedBlockHeight);
if (lastRequestedBlockHeight > 0) {
tryWithNewSeedNode(lastRequestedBlockHeight);
}
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
closeAllHandlers();
stopped = false;
tryWithNewSeedNode(lastRequestedBlockHeight);
if (lastRequestedBlockHeight > 0) {
tryWithNewSeedNode(lastRequestedBlockHeight);
}
}
@Override
@ -218,8 +223,9 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
log.info("onAwakeFromStandby");
closeAllHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
if (!networkNode.getAllConnections().isEmpty() && lastRequestedBlockHeight > 0) {
tryWithNewSeedNode(lastRequestedBlockHeight);
}
}
@ -234,15 +240,17 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
// We combine blockHash and txId list in case we receive blocks with different transactions.
List<String> txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList());
String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds;
if (!receivedBlocks.contains(extBlockId)) {
log.debug("We received a new message from peer {} and broadcast it to our peers. extBlockId={}",
connection.getPeersNodeAddressOptional().orElse(null), extBlockId);
receivedBlocks.add(extBlockId);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null));
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
} else {
if (receivedBlocks.contains(extBlockId)) {
log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId);
return;
}
log.info("We received a NewBlockBroadcastMessage from peer {} and broadcast it to our peers. extBlockId={}",
connection.getPeersNodeAddressOptional().orElse(null), extBlockId);
receivedBlocks.add(extBlockId);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null));
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
}
}
@ -252,78 +260,86 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
///////////////////////////////////////////////////////////////////////////////////////////
private void requestBlocks(NodeAddress peersNodeAddress, int startBlockHeight) {
if (!stopped) {
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(peersNodeAddress, startBlockHeight);
if (!requestBlocksHandlerMap.containsKey(key)) {
if (startBlockHeight >= lastReceivedBlockHeight) {
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode,
peerManager,
peersNodeAddress,
startBlockHeight,
new RequestBlocksHandler.Listener() {
@Override
public void onComplete(GetBlocksResponse getBlocksResponse) {
log.debug("requestBlocksHandler of outbound connection complete. nodeAddress={}",
peersNodeAddress);
stopRetryTimer();
// need to remove before listeners are notified as they cause the update call
requestBlocksHandlerMap.remove(key);
// we only notify if our request was latest
if (startBlockHeight >= lastReceivedBlockHeight) {
lastReceivedBlockHeight = startBlockHeight;
listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse,
() -> {
// After we received the blocks we allow to disconnect seed nodes.
// We delay 20 seconds to allow multiple requests to finish.
UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20);
}));
} else {
log.warn("We got a response which is already obsolete because we receive a " +
"response from a request with a higher block height. " +
"This could theoretically happen, but is very unlikely.");
}
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", peersNodeAddress, errorMessage);
peerManager.handleConnectionFault(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
listeners.forEach(listener -> listener.onFault(errorMessage, connection));
tryWithNewSeedNode(startBlockHeight);
}
});
requestBlocksHandlerMap.put(key, requestBlocksHandler);
log.info("requestBlocks with startBlockHeight={} from peer {}", startBlockHeight, peersNodeAddress);
requestBlocksHandler.requestBlocks();
} else {
log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." +
"startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight);
DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" +
startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight);
}
} else {
log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.",
peersNodeAddress, startBlockHeight);
UserThread.runAfter(() -> {
if (requestBlocksHandlerMap.containsKey(key)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(key);
handler.stop();
requestBlocksHandlerMap.remove(key);
}
}, CLEANUP_TIMER);
}
} else {
if (stopped) {
log.warn("We have stopped already. We ignore that requestData call.");
return;
}
Tuple2<NodeAddress, Integer> key = new Tuple2<>(peersNodeAddress, startBlockHeight);
if (requestBlocksHandlerMap.containsKey(key)) {
log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.",
peersNodeAddress, startBlockHeight);
UserThread.runAfter(() -> {
if (requestBlocksHandlerMap.containsKey(key)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(key);
handler.stop();
requestBlocksHandlerMap.remove(key);
}
}, CLEANUP_TIMER);
return;
}
if (startBlockHeight < lastReceivedBlockHeight) {
log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." +
"startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight);
DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" +
startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight);
return;
}
// In case we would have had an earlier request and had set allowDisconnectSeedNodes to true we un-do that
// if we get a repeated request.
peerManager.setAllowDisconnectSeedNodes(false);
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode,
peerManager,
peersNodeAddress,
startBlockHeight,
new RequestBlocksHandler.Listener() {
@Override
public void onComplete(GetBlocksResponse getBlocksResponse) {
log.info("requestBlocksHandler of outbound connection complete. nodeAddress={}",
peersNodeAddress);
stopRetryTimer();
// need to remove before listeners are notified as they cause the update call
requestBlocksHandlerMap.remove(key);
// we only notify if our request was latest
if (startBlockHeight >= lastReceivedBlockHeight) {
lastReceivedBlockHeight = startBlockHeight;
listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse,
() -> {
// After we received the blocks we allow to disconnect seed nodes.
// We delay 20 seconds to allow multiple requests to finish.
UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20);
}));
} else {
log.warn("We got a response which is already obsolete because we received a " +
"response from a request with a higher block height. " +
"This could theoretically happen, but is very unlikely.");
}
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", peersNodeAddress, errorMessage);
peerManager.handleConnectionFault(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
listeners.forEach(listener -> listener.onFault(errorMessage, connection));
// We allow now to disconnect from that seed.
peerManager.setAllowDisconnectSeedNodes(true);
tryWithNewSeedNode(startBlockHeight);
}
});
requestBlocksHandlerMap.put(key, requestBlocksHandler);
requestBlocksHandler.requestBlocks();
}
@ -332,37 +348,40 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
///////////////////////////////////////////////////////////////////////////////////////////
private void tryWithNewSeedNode(int startBlockHeight) {
if (retryTimer == null) {
retryCounter++;
if (retryCounter <= MAX_RETRY) {
retryTimer = UserThread.runAfter(() -> {
stopped = false;
stopRetryTimer();
List<NodeAddress> list = seedNodeAddresses.stream()
.filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e))
.collect(Collectors.toList());
Collections.shuffle(list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
seedNodeAddresses.remove(nextCandidate);
log.info("We try requestBlocks with {}", nextCandidate);
requestBlocks(nextCandidate, startBlockHeight);
} else {
log.warn("No more seed nodes available we could try.");
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
},
RETRY_DELAY_SEC);
} else {
log.warn("We tried {} times but could not connect to a seed node.", retryCounter);
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
} else {
if (retryTimer != null) {
log.warn("We have a retry timer already running.");
return;
}
retryCounter++;
if (retryCounter > MAX_RETRY) {
log.warn("We tried {} times but could not connect to a seed node.", retryCounter);
listeners.forEach(Listener::onNoSeedNodeAvailable);
return;
}
retryTimer = UserThread.runAfter(() -> {
stopped = false;
stopRetryTimer();
List<NodeAddress> list = seedNodeAddresses.stream()
.filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e))
.collect(Collectors.toList());
Collections.shuffle(list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
seedNodeAddresses.remove(nextCandidate);
log.info("We try requestBlocks from {} with startBlockHeight={}", nextCandidate, startBlockHeight);
requestBlocks(nextCandidate, startBlockHeight);
} else {
log.warn("No more seed nodes available we could try.");
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
},
RETRY_DELAY_SEC);
}
private void stopRetryTimer() {
@ -386,15 +405,12 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
requestBlocksHandlerMap.entrySet().stream()
.filter(e -> e.getKey().first.equals(nodeAddress))
.findAny()
.map(Map.Entry::getValue)
.ifPresent(handler -> {
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(handler.getNodeAddress(), handler.getStartBlockHeight());
requestBlocksHandlerMap.get(key).cancel();
requestBlocksHandlerMap.remove(key);
.ifPresent(e -> {
e.getValue().cancel();
requestBlocksHandlerMap.remove(e.getKey());
});
}
private void closeAllHandlers() {
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel);
requestBlocksHandlerMap.clear();

View file

@ -36,7 +36,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -44,14 +46,12 @@ import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Sends a GetBlocksRequest to a full node and listens on corresponding GetBlocksResponse from the full node.
*/
@Slf4j
public class RequestBlocksHandler implements MessageListener {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;
///////////////////////////////////////////////////////////////////////////////////////////
@ -108,56 +108,55 @@ public class RequestBlocksHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void requestBlocks() {
if (!stopped) {
GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress());
log.debug("getBlocksRequest " + getBlocksRequest);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest +
" on peersNodeAddress:" + nodeAddress;
log.debug(errorMessage + " / RequestDataHandler=" + RequestBlocksHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
if (stopped) {
log.warn("We have stopped already. We ignore that requestData call.");
return;
}
GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress());
if (timeoutTimer != null) {
log.warn("We had a timer already running and stop it.");
timeoutTimer.stop();
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest +
" on peersNodeAddress:" + nodeAddress;
log.debug("{} / RequestDataHandler={}", errorMessage, RequestBlocksHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT_MIN, TimeUnit.MINUTES);
log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBlocksRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
}
log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBlocksRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by a previous timeout.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getBlocksRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getBlocksRequest=" + getBlocksRequest + "." +
"\n\tException=" + throwable.getMessage();
log.error(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getBlocksRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getBlocksRequest=" + getBlocksRequest + "." +
"\n\tException=" + throwable.getMessage();
log.error(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}
}
}, MoreExecutors.directExecutor());
}
@ -168,31 +167,36 @@ public class RequestBlocksHandler implements MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksResponse) {
if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(nodeAddress)) {
if (!stopped) {
GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope;
if (getBlocksResponse.getRequestNonce() == nonce) {
stopTimeoutTimer();
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment");
cleanup();
log.info("We received from peer {} a BlocksResponse with {} blocks",
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
listener.onComplete(getBlocksResponse);
} else {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getBlocksResponse.getRequestNonce());
}
} else {
log.warn("We have stopped already. We ignore that onDataRequest call.");
}
} else {
log.warn("We got a message from ourselves. That should never happen.");
if (stopped) {
log.warn("We have stopped already. We ignore that onDataRequest call.");
return;
}
Optional<NodeAddress> optionalNodeAddress = connection.getPeersNodeAddressOptional();
if (!optionalNodeAddress.isPresent()) {
log.warn("Peers node address is not present, that is not expected.");
// We do not return here as in case the connection has been created from the peers side we might not
// have the address set. As we check the nonce later we do not care that much for the check if the
// connection address is the same as the one we used.
} else if (!optionalNodeAddress.get().equals(nodeAddress)) {
log.warn("Peers node address is the same we requested. We ignore that message.");
return;
}
GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope;
if (getBlocksResponse.getRequestNonce() != nonce) {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getBlocksResponse.getRequestNonce());
return;
}
cleanup();
log.info("We received from peer {} a BlocksResponse with {} blocks",
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
listener.onComplete(getBlocksResponse);
}
}
@ -206,7 +210,9 @@ public class RequestBlocksHandler implements MessageListener {
@SuppressWarnings("UnusedParameters")
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
private void handleFault(String errorMessage,
NodeAddress nodeAddress,
CloseConnectionReason closeConnectionReason) {
cleanup();
peerManager.handleConnectionFault(nodeAddress);
listener.onFault(errorMessage, null);