Fix issues in RequestManager. Refactor BsqLiteNode. Add comments, Refactorings.

This commit is contained in:
Manfred Karrer 2018-03-06 22:34:53 -05:00
parent fc59873092
commit ad48986395
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
13 changed files with 222 additions and 212 deletions

View file

@ -58,4 +58,11 @@ public class DaoManager {
bsqNode.onAllServicesInitialized(errorMessageHandler);
}
}
public void shutDown() {
daoPeriodService.shutDown();
voteManager.shutDown();
compensationRequestManager.shutDown();
bsqNode.shutDown();
}
}

View file

@ -25,6 +25,7 @@ import io.bisq.core.dao.blockchain.BsqFullNode;
import io.bisq.core.dao.blockchain.BsqLiteNode;
import io.bisq.core.dao.blockchain.BsqNodeProvider;
import io.bisq.core.dao.blockchain.json.JsonBlockChainExporter;
import io.bisq.core.dao.blockchain.p2p.RequestManager;
import io.bisq.core.dao.blockchain.parse.*;
import io.bisq.core.dao.request.compensation.CompensationRequestManager;
import io.bisq.core.dao.vote.VotingDefaultValues;
@ -44,6 +45,7 @@ public class DaoModule extends AppModule {
protected void configure() {
bind(DaoManager.class).in(Singleton.class);
bind(RequestManager.class).in(Singleton.class);
bind(BsqLiteNode.class).in(Singleton.class);
bind(BsqFullNode.class).in(Singleton.class);
bind(BsqNodeProvider.class).in(Singleton.class);

View file

@ -112,6 +112,10 @@ public class DaoPeriodService {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
}
public void onAllServicesInitialized() {
btcWalletService.getChainHeightProperty().addListener((observable, oldValue, newValue) -> {
onChainHeightChanged((int) newValue);

View file

@ -23,20 +23,18 @@ import io.bisq.common.handlers.ErrorMessageHandler;
import io.bisq.core.dao.blockchain.exceptions.BlockNotConnectingException;
import io.bisq.core.dao.blockchain.json.JsonBlockChainExporter;
import io.bisq.core.dao.blockchain.p2p.RequestManager;
import io.bisq.core.dao.blockchain.p2p.messages.GetBsqBlocksResponse;
import io.bisq.core.dao.blockchain.p2p.messages.NewBsqBlockBroadcastMessage;
import io.bisq.core.dao.blockchain.parse.BsqBlockChain;
import io.bisq.core.dao.blockchain.parse.BsqFullNodeExecutor;
import io.bisq.core.dao.blockchain.parse.BsqParser;
import io.bisq.core.dao.blockchain.vo.BsqBlock;
import io.bisq.core.provider.fee.FeeService;
import io.bisq.network.p2p.P2PService;
import io.bisq.network.p2p.network.Connection;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
// We are in UserThread context. We get callbacks from threaded classes which are already mapped to the UserThread.
/**
* Main class for a full node which have Bitcoin Core with rpc running and does the blockchain lookup itself.
* It also provides the BSQ transactions to lit nodes on request.
*/
@Slf4j
public class BsqFullNode extends BsqNode {
@ -56,12 +54,12 @@ public class BsqFullNode extends BsqNode {
BsqBlockChain bsqBlockChain,
JsonBlockChainExporter jsonBlockChainExporter,
FeeService feeService,
SeedNodesRepository seedNodesRepository) {
RequestManager requestManager) {
super(p2PService,
bsqParser,
bsqBlockChain,
feeService,
seedNodesRepository);
requestManager);
this.bsqFullNodeExecutor = bsqFullNodeExecutor;
this.jsonBlockChainExporter = jsonBlockChainExporter;
}
@ -71,22 +69,30 @@ public class BsqFullNode extends BsqNode {
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
super.shutDown();
jsonBlockChainExporter.shutDown();
}
@Override
public void onAllServicesInitialized(ErrorMessageHandler errorMessageHandler) {
// bsqFullNodeExecutor.setup need to return with result handler before
// super.onAllServicesInitialized(errorMessageHandler) is called
// bsqFullNodeExecutor.setup is and async call.
bsqFullNodeExecutor.setup(() -> {
super.onAllServicesInitialized(errorMessageHandler);
super.onInitialized();
startParseBlocks();
},
throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
errorMessageHandler.handleErrorMessage("Initializing BsqFullNode failed: Error=" + throwable.toString());
});
}
@Override
protected void parseBlocksWithChainHeadHeight(int startBlockHeight, int genesisBlockHeight, String genesisTxId) {
protected void startParseBlocks() {
parseBlocks(getStartBlockHeight(), genesisBlockHeight, genesisTxId);
}
private void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId) {
log.info("parseBlocksWithChainHeadHeight startBlockHeight={}", startBlockHeight);
bsqFullNodeExecutor.requestChainHeadHeight(chainHeadHeight -> parseBlocks(startBlockHeight, genesisBlockHeight, genesisTxId, chainHeadHeight),
throwable -> {
@ -95,8 +101,7 @@ public class BsqFullNode extends BsqNode {
});
}
@Override
protected void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId, Integer chainHeadHeight) {
private void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId, Integer chainHeadHeight) {
log.info("parseBlocks with from={} with chainHeadHeight={}", startBlockHeight, chainHeadHeight);
if (chainHeadHeight != startBlockHeight) {
if (startBlockHeight <= chainHeadHeight) {
@ -111,7 +116,7 @@ public class BsqFullNode extends BsqNode {
// We also set up the listener in the else main branch where we check
// if we at chainTip, so do nto include here another check as it would
// not trigger the listener registration.
parseBlocksWithChainHeadHeight(chainHeadHeight,
parseBlocks(chainHeadHeight,
genesisBlockHeight,
genesisTxId);
}, throwable -> {
@ -124,10 +129,10 @@ public class BsqFullNode extends BsqNode {
});
} else {
log.warn("We are trying to start with a block which is above the chain height of bitcoin core. We need probably wait longer until bitcoin core has fully synced. We try again after a delay of 1 min.");
UserThread.runAfter(() -> parseBlocksWithChainHeadHeight(startBlockHeight, genesisBlockHeight, genesisTxId), 60);
UserThread.runAfter(() -> parseBlocks(startBlockHeight, genesisBlockHeight, genesisTxId), 60);
}
} else {
// We dont have received new blocks in the meantime so we are completed and we register our handler
// We don't have received new blocks in the meantime so we are completed and we register our handler
onParseBlockchainComplete(genesisBlockHeight, genesisTxId);
}
}
@ -136,55 +141,22 @@ public class BsqFullNode extends BsqNode {
protected void onP2PNetworkReady() {
super.onP2PNetworkReady();
if (requestManager == null && p2pNetworkReady) {
createRequestBlocksManager();
if (parseBlockchainComplete)
addBlockHandler();
}
}
@Override
protected void onParseBlockchainComplete(int genesisBlockHeight, String genesisTxId) {
private void onParseBlockchainComplete(int genesisBlockHeight, String genesisTxId) {
log.info("onParseBlockchainComplete");
parseBlockchainComplete = true;
if (requestManager == null && p2pNetworkReady) {
createRequestBlocksManager();
if (p2pNetworkReady)
addBlockHandler();
bsqBlockChainListeners.forEach(BsqBlockChainListener::onBsqBlockChainChanged);
}
bsqBlockChainListeners.stream().forEach(BsqBlockChainListener::onBsqBlockChainChanged);
}
private void createRequestBlocksManager() {
requestManager = new RequestManager(p2PService.getNetworkNode(),
p2PService.getPeerManager(),
p2PService.getBroadcaster(),
seedNodesRepository.getSeedNodeAddresses(),
bsqBlockChain,
new RequestManager.Listener() {
@Override
public void onBlockReceived(GetBsqBlocksResponse getBsqBlocksResponse) {
}
@Override
public void onNewBsqBlockBroadcastMessage(NewBsqBlockBroadcastMessage newBsqBlockBroadcastMessage) {
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
}
});
}
private void addBlockHandler() {
// We register our handler for new blocks
bsqFullNodeExecutor.addBlockHandler(btcdBlock -> bsqFullNodeExecutor.parseBtcdBlock(btcdBlock,
genesisBlockHeight,
genesisTxId,
@ -203,7 +175,7 @@ public class BsqFullNode extends BsqNode {
protected void onNewBsqBlock(BsqBlock bsqBlock) {
super.onNewBsqBlock(bsqBlock);
jsonBlockChainExporter.maybeExport();
if (parseBlockchainComplete && p2pNetworkReady && requestManager != null)
if (parseBlockchainComplete && p2pNetworkReady)
requestManager.publishNewBlock(bsqBlock);
}
}

View file

@ -31,14 +31,17 @@ import io.bisq.core.dao.blockchain.vo.BsqBlock;
import io.bisq.core.provider.fee.FeeService;
import io.bisq.network.p2p.P2PService;
import io.bisq.network.p2p.network.Connection;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
// We are in UserThread context. We get callbacks from threaded classes which are already mapped to the UserThread.
/**
* Main class for lite nodes which receive the BSQ transactions from a full node (e.g. seed nodes)
*/
@Slf4j
public class BsqLiteNode extends BsqNode {
private final BsqLiteNodeExecutor bsqLiteNodeExecutor;
@ -55,12 +58,12 @@ public class BsqLiteNode extends BsqNode {
BsqLiteNodeExecutor bsqLiteNodeExecutor,
BsqBlockChain bsqBlockChain,
FeeService feeService,
SeedNodesRepository seedNodesRepository) {
RequestManager requestManager) {
super(p2PService,
bsqParser,
bsqBlockChain,
feeService,
seedNodesRepository);
requestManager);
this.bsqLiteNodeExecutor = bsqLiteNodeExecutor;
}
@ -71,70 +74,30 @@ public class BsqLiteNode extends BsqNode {
@Override
public void onAllServicesInitialized(ErrorMessageHandler errorMessageHandler) {
super.onAllServicesInitialized(errorMessageHandler);
super.onInitialized();
}
@Override
protected void onP2PNetworkReady() {
super.onP2PNetworkReady();
requestManager = new RequestManager(p2PService.getNetworkNode(),
p2PService.getPeerManager(),
p2PService.getBroadcaster(),
seedNodesRepository.getSeedNodeAddresses(),
bsqBlockChain,
new RequestManager.Listener() {
requestManager.addListener(new RequestManager.Listener() {
@Override
public void onBlockReceived(GetBsqBlocksResponse getBsqBlocksResponse) {
List<BsqBlock> bsqBlockList = new ArrayList<>(getBsqBlocksResponse.getBsqBlocks());
log.info("received msg with {} items", bsqBlockList.size());
if (bsqBlockList.size() > 0)
log.info("block height of last item: {}", bsqBlockList.get(bsqBlockList.size() - 1).getHeight());
// Be safe and reset all mutable data in case the provider would not have done it
bsqBlockList.stream().forEach(BsqBlock::reset);
bsqLiteNodeExecutor.parseBsqBlocksForLiteNode(bsqBlockList,
genesisBlockHeight,
genesisTxId,
BsqLiteNode.this::onNewBsqBlock,
() -> onParseBlockchainComplete(genesisBlockHeight, genesisTxId), throwable -> {
if (throwable instanceof BlockNotConnectingException) {
startReOrgFromLastSnapshot();
} else {
log.error(throwable.toString());
throwable.printStackTrace();
}
});
public void onRequestedBlocksReceived(GetBsqBlocksResponse getBsqBlocksResponse) {
BsqLiteNode.this.onRequestedBlocksReceived(new ArrayList<>(getBsqBlocksResponse.getBsqBlocks()));
}
@Override
public void onNewBsqBlockBroadcastMessage(NewBsqBlockBroadcastMessage newBsqBlockBroadcastMessage) {
BsqBlock bsqBlock = newBsqBlockBroadcastMessage.getBsqBlock();
// Be safe and reset all mutable data in case the provider would not have done it
bsqBlock.reset();
log.info("received broadcastNewBsqBlock bsqBlock {}", bsqBlock.getHeight());
if (!bsqBlockChain.containsBlock(bsqBlock)) {
bsqLiteNodeExecutor.parseBsqBlockForLiteNode(bsqBlock,
genesisBlockHeight,
genesisTxId,
() -> onNewBsqBlock(bsqBlock), throwable -> {
if (throwable instanceof BlockNotConnectingException) {
startReOrgFromLastSnapshot();
} else {
log.error(throwable.toString());
throwable.printStackTrace();
}
});
}
public void onNewBlockReceived(NewBsqBlockBroadcastMessage newBsqBlockBroadcastMessage) {
BsqLiteNode.this.onNewBlockReceived(newBsqBlockBroadcastMessage.getBsqBlock());
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
}
});
@ -142,19 +105,55 @@ public class BsqLiteNode extends BsqNode {
UserThread.runAfter(this::startParseBlocks, 2);
}
// First we request the blocks from a full node
@Override
protected void parseBlocksWithChainHeadHeight(int startBlockHeight, int genesisBlockHeight, String genesisTxId) {
parseBlocks(startBlockHeight, genesisBlockHeight, genesisTxId, 0);
protected void startParseBlocks() {
requestManager.requestBlocks(getStartBlockHeight());
}
@Override
protected void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId, Integer chainHeadHeight) {
requestManager.requestBlocks(startBlockHeight);
// We received the missing blocks
private void onRequestedBlocksReceived(List<BsqBlock> bsqBlockList) {
log.info("onRequestedBlocksReceived: blocks with {} items", bsqBlockList.size());
if (bsqBlockList.size() > 0)
log.info("block height of last item: {}", bsqBlockList.get(bsqBlockList.size() - 1).getHeight());
// We reset all mutable data in case the provider would not have done it.
bsqBlockList.forEach(BsqBlock::reset);
bsqLiteNodeExecutor.parseBlocks(bsqBlockList,
genesisBlockHeight,
genesisTxId,
BsqLiteNode.this::onNewBsqBlock,
this::onParseBlockchainComplete,
getErrorHandler());
}
@Override
protected void onParseBlockchainComplete(int genesisBlockHeight, String genesisTxId) {
// We received a new block
private void onNewBlockReceived(BsqBlock bsqBlock) {
// We reset all mutable data in case the provider would not have done it.
bsqBlock.reset();
log.info("onNewBlockReceived: bsqBlock={}", bsqBlock.getHeight());
if (!bsqBlockChain.containsBlock(bsqBlock)) {
bsqLiteNodeExecutor.parseBlock(bsqBlock,
genesisBlockHeight,
genesisTxId,
() -> onNewBsqBlock(bsqBlock),
getErrorHandler());
}
}
private void onParseBlockchainComplete() {
parseBlockchainComplete = true;
bsqBlockChainListeners.stream().forEach(BsqBlockChainListener::onBsqBlockChainChanged);
bsqBlockChainListeners.forEach(BsqBlockChainListener::onBsqBlockChainChanged);
}
@NotNull
private Consumer<Throwable> getErrorHandler() {
return throwable -> {
if (throwable instanceof BlockNotConnectingException) {
startReOrgFromLastSnapshot();
} else {
log.error(throwable.toString());
throwable.printStackTrace();
}
};
}
}

View file

@ -26,14 +26,17 @@ import io.bisq.core.dao.blockchain.vo.BsqBlock;
import io.bisq.core.provider.fee.FeeService;
import io.bisq.network.p2p.P2PService;
import io.bisq.network.p2p.P2PServiceListener;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
// We are in UserThread context. We get callbacks from threaded classes which are already mapped to the UserThread.
/**
* Base class for the lite and full node.
* <p>
* We are in UserThread context. We get callbacks from threaded classes which are already mapped to the UserThread.
*/
@Slf4j
public abstract class BsqNode {
@ -47,12 +50,11 @@ public abstract class BsqNode {
protected final BsqParser bsqParser;
@SuppressWarnings("WeakerAccess")
protected final BsqBlockChain bsqBlockChain;
protected final SeedNodesRepository seedNodesRepository;
@SuppressWarnings("WeakerAccess")
protected final List<BsqBlockChainListener> bsqBlockChainListeners = new ArrayList<>();
protected final String genesisTxId;
protected final int genesisBlockHeight;
protected RequestManager requestManager;
protected final RequestManager requestManager;
@Getter
protected boolean parseBlockchainComplete;
@ -69,12 +71,12 @@ public abstract class BsqNode {
BsqParser bsqParser,
BsqBlockChain bsqBlockChain,
FeeService feeService,
SeedNodesRepository seedNodesRepository) {
RequestManager requestManager) {
this.p2PService = p2PService;
this.bsqParser = bsqParser;
this.bsqBlockChain = bsqBlockChain;
this.seedNodesRepository = seedNodesRepository;
this.requestManager = requestManager;
genesisTxId = bsqBlockChain.getGenesisTxId();
genesisBlockHeight = bsqBlockChain.getGenesisBlockHeight();
@ -87,10 +89,16 @@ public abstract class BsqNode {
///////////////////////////////////////////////////////////////////////////////////////////
// Public methods
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onAllServicesInitialized(ErrorMessageHandler errorMessageHandler) {
public void shutDown() {
requestManager.shutDown();
}
public abstract void onAllServicesInitialized(ErrorMessageHandler errorMessageHandler);
public void onInitialized() {
applySnapshot();
log.info("onAllServicesInitialized");
if (p2PService.isBootstrapped()) {
@ -139,7 +147,7 @@ public abstract class BsqNode {
private void applySnapshot() {
bsqBlockChain.applySnapshot();
bsqBlockChainListeners.stream().forEach(BsqBlockChainListener::onBsqBlockChainChanged);
bsqBlockChainListeners.forEach(BsqBlockChainListener::onBsqBlockChainChanged);
}
@SuppressWarnings("WeakerAccess")
@ -147,9 +155,8 @@ public abstract class BsqNode {
p2pNetworkReady = true;
}
@SuppressWarnings("WeakerAccess")
protected void startParseBlocks() {
int startBlockHeight = Math.max(genesisBlockHeight, bsqBlockChain.getChainHeadHeight() + 1);
protected int getStartBlockHeight() {
final int startBlockHeight = Math.max(genesisBlockHeight, bsqBlockChain.getChainHeadHeight() + 1);
log.info("Start parse blocks:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
@ -160,22 +167,14 @@ public abstract class BsqNode {
genesisBlockHeight,
bsqBlockChain.getChainHeadHeight());
parseBlocksWithChainHeadHeight(startBlockHeight,
genesisBlockHeight,
genesisTxId);
return startBlockHeight;
}
abstract protected void parseBlocksWithChainHeadHeight(int startBlockHeight, int genesisBlockHeight, String genesisTxId);
abstract protected void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId, Integer chainHeadHeight);
abstract protected void onParseBlockchainComplete(int genesisBlockHeight, String genesisTxId);
abstract protected void startParseBlocks();
@SuppressWarnings("WeakerAccess")
protected void onNewBsqBlock(BsqBlock bsqBlock) {
//TODO called at each block at startup parsing. cause a lot of cpu waste at listeners...
// -> make more fine grained callbacks so UI only listens on final results when parsing is complete
bsqBlockChainListeners.stream().forEach(BsqBlockChainListener::onBsqBlockChainChanged);
bsqBlockChainListeners.forEach(BsqBlockChainListener::onBsqBlockChainChanged);
}
@SuppressWarnings("WeakerAccess")

View file

@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import javax.inject.Named;
/**
* Basic wiring of blockchain related services and event listeners
* Returns a bsqFullNode or bsqLiteNode based on the DaoOptionKeys.FULL_DAO_NODE option.
*/
@Slf4j
public class BsqNodeProvider {

View file

@ -15,6 +15,7 @@ import io.bisq.network.p2p.network.Connection;
import io.bisq.network.p2p.network.MessageListener;
import io.bisq.network.p2p.network.NetworkNode;
import io.bisq.network.p2p.peers.PeerManager;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -46,12 +47,14 @@ public class RequestBlocksHandler implements MessageListener {
private final NetworkNode networkNode;
private final PeerManager peerManager;
@Getter
private final NodeAddress nodeAddress;
@Getter
private final int startBlockHeight;
private final Listener listener;
private Timer timeoutTimer;
private final int nonce = new Random().nextInt();
private boolean stopped;
private Connection connection;
private NodeAddress peersNodeAddress;
///////////////////////////////////////////////////////////////////////////////////////////
@ -60,9 +63,13 @@ public class RequestBlocksHandler implements MessageListener {
public RequestBlocksHandler(NetworkNode networkNode,
PeerManager peerManager,
NodeAddress nodeAddress,
int startBlockHeight,
Listener listener) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.nodeAddress = nodeAddress;
this.startBlockHeight = startBlockHeight;
this.listener = listener;
}
@ -75,10 +82,7 @@ public class RequestBlocksHandler implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestBlocks(NodeAddress nodeAddress, int startBlockHeight) {
Log.traceCall("nodeAddress=" + nodeAddress);
this.peersNodeAddress = nodeAddress;
public void requestBlocks() {
if (!stopped) {
GetBsqBlocksRequest getBsqBlocksRequest = new GetBsqBlocksRequest(startBlockHeight, nonce);
log.debug("getBsqBlocksRequest " + getBsqBlocksRequest);
@ -86,9 +90,9 @@ public class RequestBlocksHandler implements MessageListener {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending getBsqBlocksRequest:" + getBsqBlocksRequest +
" on peersNodeAddress:" + peersNodeAddress;
" on peersNodeAddress:" + nodeAddress;
log.debug(errorMessage + " / RequestDataHandler=" + RequestBlocksHandler.this);
handleFault(errorMessage, peersNodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
@ -97,15 +101,14 @@ public class RequestBlocksHandler implements MessageListener {
TIMEOUT);
}
log.debug("We send a {} to peer {}. ", getBsqBlocksRequest.getClass().getSimpleName(), peersNodeAddress);
log.debug("We send a {} to peer {}. ", getBsqBlocksRequest.getClass().getSimpleName(), nodeAddress);
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, getBsqBlocksRequest);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBsqBlocksRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
RequestBlocksHandler.this.connection = connection;
log.trace("Send " + getBsqBlocksRequest + " to " + peersNodeAddress + " succeeded.");
log.trace("Send " + getBsqBlocksRequest + " to " + nodeAddress + " succeeded.");
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
@ -115,12 +118,12 @@ public class RequestBlocksHandler implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getBsqBlocksRequest to " + peersNodeAddress +
String errorMessage = "Sending getBsqBlocksRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getBsqBlocksRequest=" + getBsqBlocksRequest + "." +
"\n\tException=" + throwable.getMessage();
log.error(errorMessage);
handleFault(errorMessage, peersNodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
@ -140,7 +143,7 @@ public class RequestBlocksHandler implements MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) {
if (networkEnvelop instanceof GetBsqBlocksResponse) {
if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress)) {
if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(nodeAddress)) {
Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection);
if (!stopped) {
GetBsqBlocksResponse getBsqBlocksResponse = (GetBsqBlocksResponse) networkEnvelop;

View file

@ -15,9 +15,11 @@ import io.bisq.network.p2p.NodeAddress;
import io.bisq.network.p2p.network.*;
import io.bisq.network.p2p.peers.Broadcaster;
import io.bisq.network.p2p.peers.PeerManager;
import io.bisq.network.p2p.seed.SeedNodesRepository;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import javax.inject.Inject;
import java.util.*;
import java.util.stream.Collectors;
@ -40,9 +42,9 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
public interface Listener {
void onNoSeedNodeAvailable();
void onBlockReceived(GetBsqBlocksResponse getBsqBlocksResponse);
void onRequestedBlocksReceived(GetBsqBlocksResponse getBsqBlocksResponse);
void onNewBsqBlockBroadcastMessage(NewBsqBlockBroadcastMessage newBsqBlockBroadcastMessage);
void onNewBlockReceived(NewBsqBlockBroadcastMessage newBsqBlockBroadcastMessage);
void onFault(String errorMessage, @Nullable Connection connection);
}
@ -57,7 +59,8 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
private final Broadcaster broadcaster;
private final BsqBlockChain bsqBlockChain;
private final Collection<NodeAddress> seedNodeAddresses;
private final Listener listener;
private final List<Listener> listeners = new ArrayList<>();
private final Map<Tuple2<NodeAddress, Integer>, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>();
private final Map<String, GetBlocksRequestHandler> getBlocksRequestHandlers = new HashMap<>();
@ -69,25 +72,29 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public RequestManager(NetworkNode networkNode,
PeerManager peerManager,
Broadcaster broadcaster,
Set<NodeAddress> seedNodeAddresses,
BsqBlockChain bsqBlockChain,
Listener listener) {
SeedNodesRepository seedNodesRepository,
BsqBlockChain bsqBlockChain) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.broadcaster = broadcaster;
this.bsqBlockChain = bsqBlockChain;
// seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes)
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
this.listener = listener;
this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
peerManager.addListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("Duplicates")
public void shutDown() {
Log.traceCall();
stopped = true;
@ -98,10 +105,9 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
closeAllHandlers();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void addListener(Listener listener) {
listeners.add(listener);
}
public void requestBlocks(int startBlockHeight) {
Log.traceCall();
@ -139,13 +145,10 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
closeHandler(connection);
if (peerManager.isNodeBanned(closeConnectionReason, connection)) {
final NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
seedNodeAddresses.remove(nodeAddress);
requestBlocksHandlerMap.entrySet().stream()
.filter(e -> e.getKey().first.equals(nodeAddress))
.findAny()
.map(Map.Entry::getValue)
.ifPresent(requestBlocksHandlerMap::remove);
removeFromRequestBlocksHandlerMap(nodeAddress);
});
}
}
@ -239,7 +242,7 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
log.warn("We have stopped already. We ignore that onMessage call.");
}
} else if (networkEnvelop instanceof NewBsqBlockBroadcastMessage) {
listener.onNewBsqBlockBroadcastMessage((NewBsqBlockBroadcastMessage) networkEnvelop);
listeners.forEach(listener -> listener.onNewBlockReceived((NewBsqBlockBroadcastMessage) networkEnvelop));
}
}
@ -252,7 +255,10 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(peersNodeAddress, startBlockHeight);
if (!requestBlocksHandlerMap.containsKey(key)) {
if (startBlockHeight >= lastReceivedBlockHeight) {
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode, peerManager,
RequestBlocksHandler requestBlocksHandler = new RequestBlocksHandler(networkNode,
peerManager,
peersNodeAddress,
startBlockHeight,
new RequestBlocksHandler.Listener() {
@Override
public void onComplete(GetBsqBlocksResponse getBsqBlocksResponse) {
@ -265,7 +271,7 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
// we only notify if our request was latest
if (startBlockHeight >= lastReceivedBlockHeight) {
lastReceivedBlockHeight = startBlockHeight;
listener.onBlockReceived(getBsqBlocksResponse);
listeners.forEach(listener -> listener.onRequestedBlocksReceived(getBsqBlocksResponse));
} else {
log.warn("We got a response which is already obsolete because we receive a " +
"response from a request with a higher block height. " +
@ -281,13 +287,13 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
peerManager.handleConnectionFault(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
listener.onFault(errorMessage, connection);
listeners.forEach(listener -> listener.onFault(errorMessage, connection));
tryWithNewSeedNode(startBlockHeight);
}
});
requestBlocksHandlerMap.put(key, requestBlocksHandler);
requestBlocksHandler.requestBlocks(peersNodeAddress, startBlockHeight);
requestBlocksHandler.requestBlocks();
} else {
//TODO check with re-orgs
// FIXME when a lot of blocks are created we get caught here. Seems to be a threading issue...
@ -302,10 +308,10 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");
UserThread.runAfter(() -> {
if (requestBlocksHandlerMap.containsKey(peersNodeAddress)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(peersNodeAddress);
if (requestBlocksHandlerMap.containsKey(key)) {
RequestBlocksHandler handler = requestBlocksHandlerMap.get(key);
handler.stop();
requestBlocksHandlerMap.remove(peersNodeAddress);
requestBlocksHandlerMap.remove(key);
}
}, CLEANUP_TIMER);
}
@ -342,13 +348,13 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
requestBlocks(nextCandidate, startBlockHeight);
} else {
log.warn("No more seed nodes available we could try.");
listener.onNoSeedNodeAvailable();
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
},
RETRY_DELAY_SEC);
} else {
log.warn("We tried {} times but could not connect to a seed node.", retryCounter);
listener.onNoSeedNodeAvailable();
listeners.forEach(Listener::onNoSeedNodeAvailable);
}
} else {
log.warn("We have a retry timer already running.");
@ -366,17 +372,27 @@ public class RequestManager implements MessageListener, ConnectionListener, Peer
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
NodeAddress nodeAddress = peersNodeAddressOptional.get();
if (requestBlocksHandlerMap.containsKey(nodeAddress)) {
requestBlocksHandlerMap.get(nodeAddress).cancel();
requestBlocksHandlerMap.remove(nodeAddress);
}
removeFromRequestBlocksHandlerMap(nodeAddress);
} else {
log.trace("closeHandler: nodeAddress not set in connection " + connection);
}
}
private void removeFromRequestBlocksHandlerMap(NodeAddress nodeAddress) {
requestBlocksHandlerMap.entrySet().stream()
.filter(e -> e.getKey().first.equals(nodeAddress))
.findAny()
.map(Map.Entry::getValue)
.ifPresent(handler -> {
final Tuple2<NodeAddress, Integer> key = new Tuple2<>(handler.getNodeAddress(), handler.getStartBlockHeight());
requestBlocksHandlerMap.get(key).cancel();
requestBlocksHandlerMap.remove(key);
});
}
private void closeAllHandlers() {
requestBlocksHandlerMap.values().stream().forEach(RequestBlocksHandler::cancel);
requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel);
requestBlocksHandlerMap.clear();
}
}

View file

@ -53,7 +53,7 @@ public class BsqLiteNodeExecutor {
this.bsqBlockChain = bsqBlockChain;
}
public void parseBsqBlocksForLiteNode(List<BsqBlock> bsqBlockList,
public void parseBlocks(List<BsqBlock> bsqBlockList,
int genesisBlockHeight,
String genesisTxId,
Consumer<BsqBlock> newBlockHandler,
@ -83,7 +83,7 @@ public class BsqLiteNodeExecutor {
}
// TODO check why it's not handled in the parser
public void parseBsqBlockForLiteNode(BsqBlock bsqBlock,
public void parseBlock(BsqBlock bsqBlock,
int genesisBlockHeight,
String genesisTxId,
ResultHandler resultHandler,

View file

@ -111,6 +111,10 @@ public class CompensationRequestManager implements PersistedDataHost, BsqBlockCh
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
}
public void onAllServicesInitialized() {
p2PService.addHashSetChangedListener(this);

View file

@ -91,6 +91,10 @@ public class VotingManager implements PersistedDataHost {
this.votingDefaultValues = votingDefaultValues;
}
public void shutDown() {
}
@Override
public void readPersisted() {
if (BisqEnvironment.isDAOActivatedAndBaseCurrencySupportingBsq()) {

View file

@ -45,7 +45,7 @@ import io.bisq.core.arbitration.DisputeManager;
import io.bisq.core.btc.AddressEntryList;
import io.bisq.core.btc.BaseCurrencyNetwork;
import io.bisq.core.btc.wallet.*;
import io.bisq.core.dao.blockchain.json.JsonBlockChainExporter;
import io.bisq.core.dao.DaoManager;
import io.bisq.core.dao.request.compensation.CompensationRequestManager;
import io.bisq.core.dao.vote.VotingManager;
import io.bisq.core.filter.FilterManager;
@ -453,7 +453,7 @@ public class BisqApp extends Application {
if (injector != null) {
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(TradeManager.class).shutDown();
injector.getInstance(JsonBlockChainExporter.class).shutDown();
injector.getInstance(DaoManager.class).shutDown();
//noinspection CodeBlock2Expr
injector.getInstance(OpenOfferManager.class).shutDown(() -> {
injector.getInstance(P2PService.class).shutDown(() -> {