Merge pull request #4851 from chimp1984/improve-bsq-get-block-request-handling

Improve getBlocks request handling
This commit is contained in:
sqrrm 2020-12-29 13:55:40 +01:00 committed by GitHub
commit 9cc3f687c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 385 additions and 318 deletions

View file

@ -102,7 +102,7 @@ public final class RepublishGovernanceDataHandler {
connectToNextNode();
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
@ -118,7 +118,7 @@ public final class RepublishGovernanceDataHandler {
stop();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
@ -133,7 +133,7 @@ public final class RepublishGovernanceDataHandler {
connectToNextNode();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());

View file

@ -116,7 +116,7 @@ abstract class RequestStateHashesHandler<Req extends GetStateHashesRequest, Res
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
@ -134,7 +134,7 @@ abstract class RequestStateHashesHandler<Req extends GetStateHashesRequest, Res
nodeAddress.getFullAddress());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
@ -149,7 +149,7 @@ abstract class RequestStateHashesHandler<Req extends GetStateHashesRequest, Res
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());

View file

@ -189,7 +189,7 @@ public abstract class BsqNode implements DaoSetupService {
if (chainHeight > genesisBlockHeight)
startBlockHeight = chainHeight + 1;
log.info("Start parse blocks:\n" +
log.info("getStartBlockHeight:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
" Genesis block height={}\n" +
@ -223,15 +223,14 @@ public abstract class BsqNode implements DaoSetupService {
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}
try {
Block block = blockParser.parseBlock(rawBlock);
if (pendingBlocks.contains(rawBlock))
pendingBlocks.remove(rawBlock);
pendingBlocks.remove(rawBlock);
// After parsing we check if we have pending blocks we might have received earlier but which have been
// not connecting from the latest height we had. The list is sorted by height

View file

@ -43,7 +43,7 @@ enum JsonTxOutputType {
INVALID_OUTPUT("Invalid");
@Getter
private String displayString;
private final String displayString;
JsonTxOutputType(String displayString) {
this.displayString = displayString;

View file

@ -40,7 +40,7 @@ enum JsonTxType {
IRREGULAR("Irregular");
@Getter
private String displayString;
private final String displayString;
JsonTxType(String displayString) {
this.displayString = displayString;

View file

@ -136,51 +136,62 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksRequest) {
// We received a GetBlocksRequest from a liteNode
if (!stopped) {
final String uid = connection.getUid();
if (!getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else if (networkEnvelope instanceof RepublishGovernanceDataRequest) {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
handleRepublishGovernanceDataRequest();
}
}
private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
if (stopped) {
log.warn("We have stopped already. We ignore that onMessage call.");
return;
}
String uid = connection.getUid();
if (getBlocksRequestHandlers.containsKey(uid)) {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
return;
}
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest(getBlocksRequest, connection);
}
private void handleRepublishGovernanceDataRequest() {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
}
}

View file

@ -49,7 +49,7 @@ import org.jetbrains.annotations.NotNull;
*/
@Slf4j
class GetBlocksRequestHandler {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;
///////////////////////////////////////////////////////////////////////////////////////////
@ -89,22 +89,28 @@ class GetBlocksRequestHandler {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) {
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 6000 which is about 1.5 month.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT, TimeUnit.SECONDS);
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(),
rawBlocks.size(), System.currentTimeMillis() - ts);
if (timeoutTimer != null) {
timeoutTimer.stop();
log.warn("Timeout was already running. We stopped it.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection: " + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT_MIN, TimeUnit.MINUTES);
SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<>() {
@ -145,7 +151,7 @@ class GetBlocksRequestHandler {
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
if (!stopped) {
log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {

View file

@ -17,6 +17,7 @@
package bisq.core.dao.node.lite;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.node.BsqNode;
import bisq.core.dao.node.explorer.ExportJsonFilesService;
@ -37,6 +38,8 @@ import bisq.common.UserThread;
import com.google.inject.Inject;
import javafx.beans.value.ChangeListener;
import java.util.ArrayList;
import java.util.List;
@ -54,7 +57,9 @@ public class LiteNode extends BsqNode {
private final LiteNodeNetworkService liteNodeNetworkService;
private final BsqWalletService bsqWalletService;
private final WalletsSetup walletsSetup;
private Timer checkForBlockReceivedTimer;
private final ChangeListener<Number> blockDownloadListener;
///////////////////////////////////////////////////////////////////////////////////////////
@ -69,11 +74,19 @@ public class LiteNode extends BsqNode {
P2PService p2PService,
LiteNodeNetworkService liteNodeNetworkService,
BsqWalletService bsqWalletService,
WalletsSetup walletsSetup,
ExportJsonFilesService exportJsonFilesService) {
super(blockParser, daoStateService, daoStateSnapshotService, p2PService, exportJsonFilesService);
this.liteNodeNetworkService = liteNodeNetworkService;
this.bsqWalletService = bsqWalletService;
this.walletsSetup = walletsSetup;
blockDownloadListener = (observable, oldValue, newValue) -> {
if ((double) newValue == 1) {
setupWalletBestBlockListener();
}
};
}
@ -87,7 +100,18 @@ public class LiteNode extends BsqNode {
liteNodeNetworkService.start();
bsqWalletService.addNewBestBlockListener(block -> {
// We wait until the wallet is synced before using it for triggering requests
if (walletsSetup.isDownloadComplete()) {
setupWalletBestBlockListener();
} else {
walletsSetup.downloadPercentageProperty().addListener(blockDownloadListener);
}
}
private void setupWalletBestBlockListener() {
walletsSetup.downloadPercentageProperty().removeListener(blockDownloadListener);
bsqWalletService.addNewBestBlockListener(blockFromWallet -> {
// Check if we are done with parsing
if (!daoStateService.isParseBlockChainComplete())
return;
@ -97,18 +121,18 @@ public class LiteNode extends BsqNode {
checkForBlockReceivedTimer.stop();
}
int height = block.getHeight();
log.info("New block at height {} from bsqWalletService", height);
int walletBlockHeight = blockFromWallet.getHeight();
log.info("New block at height {} from bsqWalletService", walletBlockHeight);
// We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it.
// If we don't receive it we request it manually from seed nodes
checkForBlockReceivedTimer = UserThread.runAfter(() -> {
int chainHeight = daoStateService.getChainHeight();
if (chainHeight < height) {
log.warn("We did not receive a block from the network {} seconds after we saw the new block in BicoinJ. " +
int daoChainHeight = daoStateService.getChainHeight();
if (daoChainHeight < walletBlockHeight) {
log.warn("We did not receive a block from the network {} seconds after we saw the new block in BitcoinJ. " +
"We request from our seed nodes missing blocks from block height {}.",
CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, chainHeight + 1);
liteNodeNetworkService.requestBlocks(chainHeight + 1);
CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC, daoChainHeight + 1);
liteNodeNetworkService.requestBlocks(daoChainHeight + 1);
}
}, CHECK_FOR_BLOCK_RECEIVED_DELAY_SEC);
});
@ -157,7 +181,6 @@ public class LiteNode extends BsqNode {
// First we request the blocks from a full node
@Override
protected void startParseBlocks() {
log.info("startParseBlocks");
liteNodeNetworkService.requestBlocks(getStartBlockHeight());
}
@ -199,8 +222,12 @@ public class LiteNode extends BsqNode {
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
log.debug("Parsing {} blocks took {} seconds.", blockList.size(), (System.currentTimeMillis() - ts) / 1000d);
if (daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
log.info("runDelayedBatchProcessing Parsing {} blocks took {} seconds.", blockList.size(),
(System.currentTimeMillis() - ts) / 1000d);
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
// We deal with that case at the setupWalletBestBlockListener method above.
if (walletsSetup.isDownloadComplete() &&
daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
liteNodeNetworkService.requestBlocks(getStartBlockHeight());
} else {
onParsingComplete.run();
@ -229,11 +256,13 @@ public class LiteNode extends BsqNode {
// We received a new block
private void onNewBlockReceived(RawBlock block) {
int blockHeight = block.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash());
log.info("onNewBlockReceived: block at height {}, hash={}. Our DAO chainHeight={}",
blockHeight, block.getHash(), chainTipHeight);
// We only update chainTipHeight if we get a newer block
if (blockHeight > chainTipHeight)
if (blockHeight > chainTipHeight) {
chainTipHeight = blockHeight;
}
try {
doParseBlock(block);

View file

@ -99,7 +99,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
private final Map<Tuple2<NodeAddress, Integer>, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>();
private Timer retryTimer;
private boolean stopped;
private Set<String> receivedBlocks = new HashSet<>();
private final Set<String> receivedBlocks = new HashSet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -129,7 +129,6 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
peerManager.addListener(this);
}
@SuppressWarnings("Duplicates")
public void shutDown() {
stopped = true;
stopRetryTimer();
@ -152,19 +151,21 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
Optional<Connection> connectionToSeedNodeOptional = networkNode.getConfirmedConnections().stream()
.filter(peerManager::isSeedNode)
.findAny();
if (connectionToSeedNodeOptional.isPresent() &&
connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().isPresent()) {
requestBlocks(connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get(), startBlockHeight);
} else {
tryWithNewSeedNode(startBlockHeight);
}
connectionToSeedNodeOptional.flatMap(Connection::getPeersNodeAddressOptional)
.ifPresentOrElse(candidate -> {
seedNodeAddresses.remove(candidate);
requestBlocks(candidate, startBlockHeight);
}, () -> {
tryWithNewSeedNode(startBlockHeight);
});
}
public void reset() {
lastRequestedBlockHeight = 0;
lastReceivedBlockHeight = 0;
retryCounter = 0;
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel);
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::terminate);
}
@ -202,7 +203,6 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
closeAllHandlers();
stopRetryTimer();
stopped = true;
tryWithNewSeedNode(lastRequestedBlockHeight);
}
@ -218,8 +218,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
log.info("onAwakeFromStandby");
closeAllHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
tryWithNewSeedNode(lastRequestedBlockHeight);
tryWithNewSeedNode(lastRequestedBlockHeight);
}
@ -232,17 +231,20 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
if (networkEnvelope instanceof NewBlockBroadcastMessage) {
NewBlockBroadcastMessage newBlockBroadcastMessage = (NewBlockBroadcastMessage) networkEnvelope;
// We combine blockHash and txId list in case we receive blocks with different transactions.
List<String> txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList());
String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds;
if (!receivedBlocks.contains(extBlockId)) {
log.debug("We received a new message from peer {} and broadcast it to our peers. extBlockId={}",
connection.getPeersNodeAddressOptional().orElse(null), extBlockId);
receivedBlocks.add(extBlockId);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null));
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
} else {
log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId);
List<String> txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream()
.map(BaseTx::getId)
.collect(Collectors.toList());
String blockUid = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds;
if (receivedBlocks.contains(blockUid)) {
log.debug("We had that message already and do not further broadcast it. blockUid={}", blockUid);
return;
}
log.info("We received a NewBlockBroadcastMessage from peer {} and broadcast it to our peers. blockUid={}",
connection.getPeersNodeAddressOptional().orElse(null), blockUid);
receivedBlocks.add(blockUid);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null));
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
}
}
@ -252,78 +254,85 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
///////////////////////////////////////////////////////////////////////////////////////////
private void requestBlocks(NodeAddress peersNodeAddress, int startBlockHeight) {
if (!stopped) {
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(peersNodeAddress, startBlockHeight);
if (!requestBlocksHandlerMap.containsKey(key)) {
if (startBlockHeight >= lastReceivedBlockHeight) {
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode,
peerManager,
peersNodeAddress,
startBlockHeight,
new RequestBlocksHandler.Listener() {
@Override
public void onComplete(GetBlocksResponse getBlocksResponse) {
log.debug("requestBlocksHandler of outbound connection complete. nodeAddress={}",
peersNodeAddress);
stopRetryTimer();
// need to remove before listeners are notified as they cause the update call
requestBlocksHandlerMap.remove(key);
// we only notify if our request was latest
if (startBlockHeight >= lastReceivedBlockHeight) {
lastReceivedBlockHeight = startBlockHeight;
listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse,
() -> {
// After we received the blocks we allow to disconnect seed nodes.
// We delay 20 seconds to allow multiple requests to finish.
UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20);
}));
} else {
log.warn("We got a response which is already obsolete because we receive a " +
"response from a request with a higher block height. " +
"This could theoretically happen, but is very unlikely.");
}
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", peersNodeAddress, errorMessage);
peerManager.handleConnectionFault(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
listeners.forEach(listener -> listener.onFault(errorMessage, connection));
tryWithNewSeedNode(startBlockHeight);
}
});
requestBlocksHandlerMap.put(key, requestBlocksHandler);
log.info("requestBlocks with startBlockHeight={} from peer {}", startBlockHeight, peersNodeAddress);
requestBlocksHandler.requestBlocks();
} else {
log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." +
"startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight);
DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" +
startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight);
}
} else {
log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.",
peersNodeAddress, startBlockHeight);
UserThread.runAfter(() -> {
if (requestBlocksHandlerMap.containsKey(key)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(key);
handler.stop();
requestBlocksHandlerMap.remove(key);
}
}, CLEANUP_TIMER);
}
} else {
if (stopped) {
log.warn("We have stopped already. We ignore that requestData call.");
return;
}
Tuple2<NodeAddress, Integer> key = new Tuple2<>(peersNodeAddress, startBlockHeight);
if (requestBlocksHandlerMap.containsKey(key)) {
log.warn("We have started already a requestDataHandshake for startBlockHeight {} to peer. nodeAddress={}\n" +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.",
peersNodeAddress, startBlockHeight);
UserThread.runAfter(() -> {
if (requestBlocksHandlerMap.containsKey(key)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(key);
handler.terminate();
requestBlocksHandlerMap.remove(key);
}
}, CLEANUP_TIMER);
return;
}
if (startBlockHeight < lastReceivedBlockHeight) {
log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." +
"startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight);
DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" +
startBlockHeight + " / lastReceivedBlockHeight=" + lastReceivedBlockHeight);
return;
}
// In case we would have had an earlier request and had set allowDisconnectSeedNodes to true we un-do that
// if we get a repeated request.
peerManager.setAllowDisconnectSeedNodes(false);
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode,
peerManager,
peersNodeAddress,
startBlockHeight,
new RequestBlocksHandler.Listener() {
@Override
public void onComplete(GetBlocksResponse getBlocksResponse) {
log.info("requestBlocksHandler to {} completed", peersNodeAddress);
stopRetryTimer();
// need to remove before listeners are notified as they cause the update call
requestBlocksHandlerMap.remove(key);
// we only notify if our request was latest
if (startBlockHeight >= lastReceivedBlockHeight) {
lastReceivedBlockHeight = startBlockHeight;
listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBlocksResponse,
() -> {
// After we received the blocks we allow to disconnect seed nodes.
// We delay 20 seconds to allow multiple requests to finish.
UserThread.runAfter(() -> peerManager.setAllowDisconnectSeedNodes(true), 20);
}));
} else {
log.warn("We got a response which is already obsolete because we received a " +
"response from a request with a higher block height. " +
"This could theoretically happen, but is very unlikely.");
}
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.warn("requestBlocksHandler with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", peersNodeAddress, errorMessage);
peerManager.handleConnectionFault(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
listeners.forEach(listener -> listener.onFault(errorMessage, connection));
// We allow now to disconnect from that seed.
peerManager.setAllowDisconnectSeedNodes(true);
tryWithNewSeedNode(startBlockHeight);
}
});
requestBlocksHandlerMap.put(key, requestBlocksHandler);
requestBlocksHandler.requestBlocks();
}
@ -332,37 +341,52 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
///////////////////////////////////////////////////////////////////////////////////////////
private void tryWithNewSeedNode(int startBlockHeight) {
if (retryTimer == null) {
retryCounter++;
if (retryCounter <= MAX_RETRY) {
retryTimer = UserThread.runAfter(() -> {
stopped = false;
stopRetryTimer();
List<NodeAddress> list = seedNodeAddresses.stream()
.filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e))
.collect(Collectors.toList());
Collections.shuffle(list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
seedNodeAddresses.remove(nextCandidate);
log.info("We try requestBlocks with {}", nextCandidate);
requestBlocks(nextCandidate, startBlockHeight);
} else {
log.warn("No more seed nodes available we could try.");
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
},
RETRY_DELAY_SEC);
} else {
log.warn("We tried {} times but could not connect to a seed node.", retryCounter);
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
} else {
log.warn("We have a retry timer already running.");
if (networkNode.getAllConnections().isEmpty()) {
return;
}
if (lastRequestedBlockHeight == 0) {
return;
}
if (stopped) {
return;
}
if (retryTimer != null) {
log.warn("We have a retry timer already running.");
return;
}
retryCounter++;
if (retryCounter > MAX_RETRY) {
log.warn("We tried {} times but could not connect to a seed node.", retryCounter);
listeners.forEach(Listener::onNoSeedNodeAvailable);
return;
}
retryTimer = UserThread.runAfter(() -> {
stopped = false;
stopRetryTimer();
List<NodeAddress> list = seedNodeAddresses.stream()
.filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e))
.collect(Collectors.toList());
Collections.shuffle(list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
seedNodeAddresses.remove(nextCandidate);
log.info("We try requestBlocks from {} with startBlockHeight={}", nextCandidate, startBlockHeight);
requestBlocks(nextCandidate, startBlockHeight);
} else {
log.warn("No more seed nodes available we could try.");
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
},
RETRY_DELAY_SEC);
}
private void stopRetryTimer() {
@ -386,17 +410,14 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
requestBlocksHandlerMap.entrySet().stream()
.filter(e -> e.getKey().first.equals(nodeAddress))
.findAny()
.map(Map.Entry::getValue)
.ifPresent(handler -> {
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(handler.getNodeAddress(), handler.getStartBlockHeight());
requestBlocksHandlerMap.get(key).cancel();
requestBlocksHandlerMap.remove(key);
.ifPresent(e -> {
e.getValue().terminate();
requestBlocksHandlerMap.remove(e.getKey());
});
}
private void closeAllHandlers() {
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel);
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::terminate);
requestBlocksHandlerMap.clear();
}
}

View file

@ -36,7 +36,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -44,14 +46,12 @@ import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Sends a GetBlocksRequest to a full node and listens on corresponding GetBlocksResponse from the full node.
*/
@Slf4j
public class RequestBlocksHandler implements MessageListener {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;
///////////////////////////////////////////////////////////////////////////////////////////
@ -98,66 +98,61 @@ public class RequestBlocksHandler implements MessageListener {
this.listener = listener;
}
public void cancel() {
cleanup();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestBlocks() {
if (!stopped) {
GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress());
log.debug("getBlocksRequest " + getBlocksRequest);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest +
" on peersNodeAddress:" + nodeAddress;
log.debug(errorMessage + " / RequestDataHandler=" + RequestBlocksHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
if (stopped) {
log.warn("We have stopped already. We ignore that requestData call.");
return;
}
GetBlocksRequest getBlocksRequest = new GetBlocksRequest(startBlockHeight, nonce, networkNode.getNodeAddress());
if (timeoutTimer != null) {
log.warn("We had a timer already running and stop it.");
timeoutTimer.stop();
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred when sending getBlocksRequest:" + getBlocksRequest +
" on peersNodeAddress:" + nodeAddress;
log.debug("{} / RequestDataHandler={}", errorMessage, RequestBlocksHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT_MIN, TimeUnit.MINUTES);
log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBlocksRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
}
log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBlocksRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by a previous timeout.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getBlocksRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getBlocksRequest=" + getBlocksRequest + "." +
"\n\tException=" + throwable.getMessage();
log.error(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getBlocksRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getBlocksRequest=" + getBlocksRequest + "." +
"\n\tException=" + throwable.getMessage();
log.error(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}
}
}, MoreExecutors.directExecutor());
}
@ -168,56 +163,60 @@ public class RequestBlocksHandler implements MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksResponse) {
if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(nodeAddress)) {
if (!stopped) {
GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope;
if (getBlocksResponse.getRequestNonce() == nonce) {
stopTimeoutTimer();
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment");
cleanup();
log.info("We received from peer {} a BlocksResponse with {} blocks",
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
listener.onComplete(getBlocksResponse);
} else {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getBlocksResponse.getRequestNonce());
}
} else {
log.warn("We have stopped already. We ignore that onDataRequest call.");
}
} else {
log.warn("We got a message from ourselves. That should never happen.");
if (stopped) {
log.warn("We have stopped already. We ignore that onDataRequest call.");
return;
}
Optional<NodeAddress> optionalNodeAddress = connection.getPeersNodeAddressOptional();
if (!optionalNodeAddress.isPresent()) {
log.warn("Peers node address is not present, that is not expected.");
// We do not return here as in case the connection has been created from the peers side we might not
// have the address set. As we check the nonce later we do not care that much for the check if the
// connection address is the same as the one we used.
} else if (!optionalNodeAddress.get().equals(nodeAddress)) {
log.warn("Peers node address is not the same we used for the request. This is not expected. We ignore that message.");
return;
}
GetBlocksResponse getBlocksResponse = (GetBlocksResponse) networkEnvelope;
if (getBlocksResponse.getRequestNonce() != nonce) {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getBlocksResponse.getRequestNonce());
return;
}
terminate();
log.info("We received from peer {} a BlocksResponse with {} blocks",
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
listener.onComplete(getBlocksResponse);
}
}
public void stop() {
cleanup();
public void terminate() {
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("UnusedParameters")
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
private void handleFault(String errorMessage,
NodeAddress nodeAddress,
CloseConnectionReason closeConnectionReason) {
terminate();
peerManager.handleConnectionFault(nodeAddress);
listener.onFault(errorMessage, null);
}
private void cleanup() {
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();

View file

@ -27,7 +27,8 @@ import com.google.common.collect.ImmutableList;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import javax.annotation.Nullable;
@ -36,7 +37,8 @@ import javax.annotation.Nullable;
* After parsing it will get cloned to the immutable Tx.
* We don't need to implement the ProtoBuffer methods as it is not persisted or sent over the wire.
*/
@Data
@Getter
@Setter
public class TempTx extends BaseTx {
static TempTx fromRawTx(RawTx rawTx) {
return new TempTx(rawTx.getTxVersion(),

View file

@ -24,7 +24,8 @@ import bisq.core.dao.state.model.blockchain.TxOutputType;
import java.util.Objects;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import javax.annotation.Nullable;
@ -32,7 +33,8 @@ import javax.annotation.Nullable;
* Contains mutable BSQ specific data (TxOutputType) and used only during tx parsing.
* Will get converted to immutable TxOutput after tx parsing is completed.
*/
@Data
@Getter
@Setter
public class TempTxOutput extends BaseTxOutput {
static TempTxOutput fromRawTxOutput(RawTxOutput txOutput) {
return new TempTxOutput(txOutput.getIndex(),
@ -78,6 +80,10 @@ public class TempTxOutput extends BaseTxOutput {
this.unlockBlockHeight = unlockBlockHeight;
}
public boolean isOpReturnOutput() {
// We do not check for pubKeyScript.scriptType.NULL_DATA because that is only set if dumpBlockchainData is true
return getOpReturnData() != null;
}
@Override
public String toString() {
@ -88,12 +94,6 @@ public class TempTxOutput extends BaseTxOutput {
"\n} " + super.toString();
}
public boolean isOpReturnOutput() {
// We do not check for pubKeyScript.scriptType.NULL_DATA because that is only set if dumpBlockchainData is true
return getOpReturnData() != null;
}
// Enums must not be used directly for hashCode or equals as it delivers the Object.hashCode (internal address)!
// The equals and hashCode methods cannot be overwritten in Enums.
@Override

View file

@ -87,9 +87,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
@Slf4j
class TxOutputParser {
private static int ACTIVATE_HARD_FORK_1_HEIGHT_MAINNET = 605000;
private static int ACTIVATE_HARD_FORK_1_HEIGHT_TESTNET = 1583054;
private static int ACTIVATE_HARD_FORK_1_HEIGHT_REGTEST = 1;
private static final int ACTIVATE_HARD_FORK_1_HEIGHT_MAINNET = 605000;
private static final int ACTIVATE_HARD_FORK_1_HEIGHT_TESTNET = 1583054;
private static final int ACTIVATE_HARD_FORK_1_HEIGHT_REGTEST = 1;
private final DaoStateService daoStateService;
// Setters

View file

@ -24,7 +24,7 @@ import lombok.Getter;
@Getter
public class BlockHashNotConnectingException extends Exception {
private RawBlock rawBlock;
private final RawBlock rawBlock;
public BlockHashNotConnectingException(RawBlock rawBlock) {
this.rawBlock = rawBlock;

View file

@ -24,7 +24,7 @@ import lombok.Getter;
@Getter
public class BlockHeightNotConnectingException extends Exception {
private RawBlock rawBlock;
private final RawBlock rawBlock;
public BlockHeightNotConnectingException(RawBlock rawBlock) {
this.rawBlock = rawBlock;

View file

@ -24,7 +24,7 @@ import lombok.Getter;
@Getter
public class RequiredReorgFromSnapshotException extends Exception {
private RawBlock rawBlock;
private final RawBlock rawBlock;
public RequiredReorgFromSnapshotException(RawBlock rawBlock) {
this.rawBlock = rawBlock;

View file

@ -134,7 +134,7 @@ class RequestDataHandler implements MessageListener {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
@ -152,7 +152,7 @@ class RequestDataHandler implements MessageListener {
log.trace("Send {} to {} succeeded.", getDataRequest, nodeAddress);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
@ -166,7 +166,7 @@ class RequestDataHandler implements MessageListener {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());