Merge pull request #3178 from chimp1984/Improve-logging

Improve logging
This commit is contained in:
Christoph Atteneder 2019-09-02 09:18:16 +02:00 committed by GitHub
commit 800b677730
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 117 additions and 86 deletions

View file

@ -190,7 +190,7 @@ public class FileManager<T extends PersistableEnvelope> {
private void saveNowInternal(T persistable) { private void saveNowInternal(T persistable) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile); saveToFile(persistable, dir, storageFile);
log.trace("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now); log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
} }
private synchronized void saveToFile(T persistable, File dir, File storageFile) { private synchronized void saveToFile(T persistable, File dir, File storageFile) {

View file

@ -281,12 +281,14 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
injector.getInstance(TradeManager.class).shutDown(); injector.getInstance(TradeManager.class).shutDown();
injector.getInstance(DaoSetup.class).shutDown(); injector.getInstance(DaoSetup.class).shutDown();
injector.getInstance(OpenOfferManager.class).shutDown(() -> { injector.getInstance(OpenOfferManager.class).shutDown(() -> {
log.info("OpenOfferManager shutdown completed");
injector.getInstance(P2PService.class).shutDown(() -> { injector.getInstance(P2PService.class).shutDown(() -> {
log.info("P2PService shutdown completed");
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
log.info("WalletsSetup shutdown completed");
module.close(injector); module.close(injector);
log.debug("Graceful shutdown completed");
resultHandler.handleResult(); resultHandler.handleResult();
log.info("Graceful shutdown completed. Exiting now.");
System.exit(0); System.exit(0);
}); });
injector.getInstance(WalletsSetup.class).shutDown(); injector.getInstance(WalletsSetup.class).shutDown();

View file

@ -289,6 +289,11 @@ public class BisqSetup {
} }
public void start() { public void start() {
if (log.isDebugEnabled()) {
UserThread.runPeriodically(() -> {
log.debug("1 second heartbeat");
}, 1);
}
maybeReSyncSPVChain(); maybeReSyncSPVChain();
maybeShowTac(); maybeShowTac();
} }

View file

@ -79,8 +79,6 @@ public final class DisputeList implements PersistableEnvelope, PersistedDataHost
public static DisputeList fromProto(protobuf.DisputeList proto, public static DisputeList fromProto(protobuf.DisputeList proto,
CoreProtoResolver coreProtoResolver, CoreProtoResolver coreProtoResolver,
Storage<DisputeList> storage) { Storage<DisputeList> storage) {
log.debug("DisputeList fromProto of {} ", proto);
List<Dispute> list = proto.getDisputeList().stream() List<Dispute> list = proto.getDisputeList().stream()
.map(disputeProto -> Dispute.fromProto(disputeProto, coreProtoResolver)) .map(disputeProto -> Dispute.fromProto(disputeProto, coreProtoResolver))
.collect(Collectors.toList()); .collect(Collectors.toList());

View file

@ -114,7 +114,7 @@ public abstract class ProposalValidator implements ConsensusCritical {
if (isTxConfirmed) { if (isTxConfirmed) {
int txHeight = optionalTx.get().getBlockHeight(); int txHeight = optionalTx.get().getBlockHeight();
if (!periodService.isTxInCorrectCycle(txHeight, chainHeight)) { if (!periodService.isTxInCorrectCycle(txHeight, chainHeight)) {
log.debug("Tx is not in current cycle. proposal.getTxId()={}", proposal.getTxId()); log.trace("Tx is not in current cycle. proposal.getTxId()={}", proposal.getTxId());
return false; return false;
} }
if (!periodService.isInPhase(txHeight, DaoPhase.Phase.PROPOSAL)) { if (!periodService.isInPhase(txHeight, DaoPhase.Phase.PROPOSAL)) {

View file

@ -157,7 +157,14 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat
System.currentTimeMillis() - ts); System.currentTimeMillis() - ts);
} }
} }
maybeUpdateHashChain(blockHeight);
long ts = System.currentTimeMillis();
boolean updated = maybeUpdateHashChain(blockHeight);
if (updated) {
log.info("updateHashChain for block {} took {} ms",
blockHeight,
System.currentTimeMillis() - ts);
}
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -241,11 +248,11 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat
// Private // Private
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void maybeUpdateHashChain(int blockHeight) { private boolean maybeUpdateHashChain(int blockHeight) {
// We use first block in blind vote phase to create the hash of our blindVotes. We prefer to wait as long as // We use first block in blind vote phase to create the hash of our blindVotes. We prefer to wait as long as
// possible to increase the chance that we have received all blindVotes. // possible to increase the chance that we have received all blindVotes.
if (!isFirstBlockOfBlindVotePhase(blockHeight)) { if (!isFirstBlockOfBlindVotePhase(blockHeight)) {
return; return false;
} }
periodService.getCycle(blockHeight).ifPresent(cycle -> { periodService.getCycle(blockHeight).ifPresent(cycle -> {
@ -281,9 +288,12 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat
UserThread.runAfter(() -> blindVoteStateNetworkService.broadcastMyStateHash(myBlindVoteStateHash), delayInSec); UserThread.runAfter(() -> blindVoteStateNetworkService.broadcastMyStateHash(myBlindVoteStateHash), delayInSec);
} }
}); });
return true;
} }
private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) { private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteStateHash,
Optional<NodeAddress> peersNodeAddress,
boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false); AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode); AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode); AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);

View file

@ -117,6 +117,8 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
); );
private boolean checkpointFailed; private boolean checkpointFailed;
private boolean ignoreDevMsg; private boolean ignoreDevMsg;
private int numCalls;
private long accumulatedDuration;
private final File storageDir; private final File storageDir;
@ -176,6 +178,12 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
if (!ignoreDevMsg) { if (!ignoreDevMsg) {
verifyCheckpoints(); verifyCheckpoints();
} }
log.info("ParseBlockChainComplete: Accumulated updateHashChain() calls for {} block took {} ms " +
"({} ms in average / block)",
numCalls,
accumulatedDuration,
(int) ((double) accumulatedDuration / (double) numCalls));
} }
@Override @Override
@ -277,6 +285,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void updateHashChain(Block block) { private void updateHashChain(Block block) {
long ts = System.currentTimeMillis();
byte[] prevHash; byte[] prevHash;
int height = block.getHeight(); int height = block.getHeight();
if (daoStateBlockChain.isEmpty()) { if (daoStateBlockChain.isEmpty()) {
@ -316,6 +325,13 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
int delayInSec = 5 + new Random().nextInt(10); int delayInSec = 5 + new Random().nextInt(10);
UserThread.runAfter(() -> daoStateNetworkService.broadcastMyStateHash(myDaoStateHash), delayInSec); UserThread.runAfter(() -> daoStateNetworkService.broadcastMyStateHash(myDaoStateHash), delayInSec);
} }
long duration = System.currentTimeMillis() - ts;
// We don't want to spam the output. We log accumulated time after parsing is completed.
log.trace("updateHashChain for block {} took {} ms",
block.getHeight(),
duration);
accumulatedDuration += duration;
numCalls++;
} }
private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<NodeAddress> peersNodeAddress, private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<NodeAddress> peersNodeAddress,

View file

@ -160,7 +160,13 @@ public class ProposalStateMonitoringService implements DaoSetupService, DaoState
System.currentTimeMillis() - ts); System.currentTimeMillis() - ts);
} }
} }
maybeUpdateHashChain(blockHeight); long ts = System.currentTimeMillis();
boolean updated = maybeUpdateHashChain(blockHeight);
if (updated) {
log.info("updateHashChain for block {} took {} ms",
blockHeight,
System.currentTimeMillis() - ts);
}
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")

View file

@ -45,9 +45,10 @@ public class BsqNodeProvider {
!preferences.getRpcPw().isEmpty() && !preferences.getRpcPw().isEmpty() &&
preferences.getBlockNotifyPort() > 0; preferences.getBlockNotifyPort() > 0;
boolean daoFullNode = preferences.isDaoFullNode(); boolean daoFullNode = preferences.isDaoFullNode();
if (daoFullNode && !rpcDataSet) if (daoFullNode && !rpcDataSet) {
log.warn("daoFullNode is set but RPC user and pw are missing"); log.warn("daoFullNode is set in preferences but RPC user and pw are missing. We reset daoFullNode in preferences to false.");
preferences.setDaoFullNode(false);
}
bsqNode = daoFullNode && rpcDataSet ? bsqFullNode : bsqLiteNode; bsqNode = daoFullNode && rpcDataSet ? bsqFullNode : bsqLiteNode;
} }
} }

View file

@ -88,9 +88,6 @@ public class LiteNode extends BsqNode {
liteNodeNetworkService.start(); liteNodeNetworkService.start();
bsqWalletService.addNewBestBlockListener(block -> { bsqWalletService.addNewBestBlockListener(block -> {
int height = block.getHeight();
log.info("New block at height {} from bsqWalletService", height);
// Check if we are done with parsing // Check if we are done with parsing
if (!daoStateService.isParseBlockChainComplete()) if (!daoStateService.isParseBlockChainComplete())
return; return;
@ -100,6 +97,9 @@ public class LiteNode extends BsqNode {
checkForBlockReceivedTimer.stop(); checkForBlockReceivedTimer.stop();
} }
int height = block.getHeight();
log.info("New block at height {} from bsqWalletService", height);
// We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it. // We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it.
// If we don't receive it we request it manually from seed nodes // If we don't receive it we request it manually from seed nodes
checkForBlockReceivedTimer = UserThread.runAfter(() -> { checkForBlockReceivedTimer = UserThread.runAfter(() -> {

View file

@ -143,6 +143,10 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
listeners.add(listener); listeners.add(listener);
} }
/**
*
* @param startBlockHeight Block height from where we expect new blocks (current block height in bsqState + 1)
*/
public void requestBlocks(int startBlockHeight) { public void requestBlocks(int startBlockHeight) {
lastRequestedBlockHeight = startBlockHeight; lastRequestedBlockHeight = startBlockHeight;
Optional<Connection> connectionToSeedNodeOptional = networkNode.getConfirmedConnections().stream() Optional<Connection> connectionToSeedNodeOptional = networkNode.getConfirmedConnections().stream()

View file

@ -80,7 +80,7 @@ public class BlockParser {
*/ */
public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
int blockHeight = rawBlock.getHeight(); int blockHeight = rawBlock.getHeight();
log.debug("Parse block at height={} ", blockHeight); log.trace("Parse block at height={} ", blockHeight);
validateIfBlockIsConnecting(rawBlock); validateIfBlockIsConnecting(rawBlock);

View file

@ -385,7 +385,7 @@ public class TxParser {
} }
// TRANSFER_BSQ has no fee, no opReturn and no UNLOCK_OUTPUT at first output // TRANSFER_BSQ has no fee, no opReturn and no UNLOCK_OUTPUT at first output
log.debug("No burned fee and no OP_RETURN, so this is a TRANSFER_BSQ tx."); log.trace("No burned fee and no OP_RETURN, so this is a TRANSFER_BSQ tx.");
return TxType.TRANSFER_BSQ; return TxType.TRANSFER_BSQ;
} }

View file

@ -187,7 +187,7 @@ public class Offer implements NetworkPayload, PersistablePayload {
return null; return null;
} }
} else { } else {
log.debug("We don't have a market price.\n" + log.trace("We don't have a market price. " +
"That case could only happen if you don't have a price feed."); "That case could only happen if you don't have a price feed.");
return null; return null;
} }

View file

@ -196,15 +196,14 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRepublishOffersTimer(); stopPeriodicRepublishOffersTimer();
stopRetryRepublishOffersTimer(); stopRetryRepublishOffersTimer();
log.debug("remove all open offers at shutDown"); log.info("Remove open offers at shutDown. Number of open offers: {}", openOffers.size());
// we remove own offers from offerbook when we go offline // we remove own offers from offerbook when we go offline
// Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out // Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out
int size = openOffers != null ? openOffers.size() : 0;
final int size = openOffers != null ? openOffers.size() : 0;
if (offerBookService.isBootstrapped() && size > 0) { if (offerBookService.isBootstrapped() && size > 0) {
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())); openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()));
if (completeHandler != null) if (completeHandler != null)
UserThread.runAfter(completeHandler::run, size * 200 + 500, TimeUnit.MILLISECONDS); UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS);
} else { } else {
if (completeHandler != null) if (completeHandler != null)
completeHandler.run(); completeHandler.run();
@ -223,7 +222,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}, errorMessage -> { }, errorMessage -> {
})); }));
if (completeHandler != null) if (completeHandler != null)
UserThread.runAfter(completeHandler::run, size * 200 + 500, TimeUnit.MILLISECONDS); UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS);
} }

View file

@ -49,7 +49,7 @@ public class PriceRequest {
Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() { Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> marketPriceTuple) { public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> marketPriceTuple) {
log.debug("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider); log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider);
resultFuture.set(marketPriceTuple); resultFuture.set(marketPriceTuple);
} }

View file

@ -40,8 +40,6 @@ import java.util.stream.Stream;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j @Slf4j
public final class TradableList<T extends Tradable> implements PersistableEnvelope { public final class TradableList<T extends Tradable> implements PersistableEnvelope {
transient final private Storage<TradableList<T>> storage; transient final private Storage<TradableList<T>> storage;
@ -80,13 +78,10 @@ public final class TradableList<T extends Tradable> implements PersistableEnvelo
.build(); .build();
} }
@Nullable
public static TradableList fromProto(protobuf.TradableList proto, public static TradableList fromProto(protobuf.TradableList proto,
CoreProtoResolver coreProtoResolver, CoreProtoResolver coreProtoResolver,
Storage<TradableList<Tradable>> storage, Storage<TradableList<Tradable>> storage,
BtcWalletService btcWalletService) { BtcWalletService btcWalletService) {
log.debug("TradableList fromProto of {} ", proto);
List<Tradable> list = proto.getTradableList().stream() List<Tradable> list = proto.getTradableList().stream()
.map(tradable -> { .map(tradable -> {
switch (tradable.getMessageCase()) { switch (tradable.getMessageCase()) {

View file

@ -141,7 +141,7 @@ public class AssetTradeActivityCheck {
"\n\n" + newAssets.toString() + "\n\n" + newAssets.toString() +
"\n\n" + sufficientlyTraded.toString(); "\n\n" + sufficientlyTraded.toString();
// Utilities.copyToClipboard(result); // Utilities.copyToClipboard(result);
log.debug(result); log.trace(result);
} }
private boolean isWarmingUp(String code) { private boolean isWarmingUp(String code) {

View file

@ -170,7 +170,7 @@ public class TradeStatisticsManager {
private void addToMap(TradeStatistics2 tradeStatistics, Map<String, TradeStatistics2> map) { private void addToMap(TradeStatistics2 tradeStatistics, Map<String, TradeStatistics2> map) {
TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
if (prevValue != null) if (prevValue != null)
log.debug("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics"); log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics");
} }
private void dump() { private void dump() {

View file

@ -537,7 +537,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
log.error("Protobuffer data could not be processed: {}", e.toString()); log.error("Protobuffer data could not be processed: {}", e.toString());
} }
} else { } else {
log.debug("Wrong blurredAddressHash. The message is not intended for us."); log.trace("Wrong blurredAddressHash. The message is not intended for us.");
} }
} }
} }

View file

@ -247,7 +247,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null"); String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
protobuf.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope(); protobuf.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope();
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000)); log.trace("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));
if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) { if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we don't want to log in production // pings and offer refresh msg we don't want to log in production

View file

@ -483,7 +483,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers); List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
reportedPeersClone.stream().forEach(e -> result.append("\n").append(e)); reportedPeersClone.stream().forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n"); result.append("\n------------------------------------------------------------\n");
log.debug(result.toString()); log.trace(result.toString());
} }
log.debug("Number of reported peers: {}", reportedPeers.size()); log.debug("Number of reported peers: {}", reportedPeers.size());
} }
@ -495,7 +495,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
StringBuilder result = new StringBuilder("We received new reportedPeers:"); StringBuilder result = new StringBuilder("We received new reportedPeers:");
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers); List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
reportedPeersClone.stream().forEach(e -> result.append("\n\t").append(e)); reportedPeersClone.stream().forEach(e -> result.append("\n\t").append(e));
log.debug(result.toString()); log.trace(result.toString());
} }
log.debug("Number of new arrived reported peers: {}", reportedPeers.size()); log.debug("Number of new arrived reported peers: {}", reportedPeers.size());
} }

View file

@ -42,13 +42,12 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -56,8 +55,6 @@ import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j @Slf4j
class RequestDataHandler implements MessageListener { class RequestDataHandler implements MessageListener {
private static final long TIMEOUT = 90; private static final long TIMEOUT = 90;
@ -93,10 +90,10 @@ class RequestDataHandler implements MessageListener {
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataHandler(NetworkNode networkNode, RequestDataHandler(NetworkNode networkNode,
P2PDataStorage dataStorage, P2PDataStorage dataStorage,
PeerManager peerManager, PeerManager peerManager,
Listener listener) { Listener listener) {
this.networkNode = networkNode; this.networkNode = networkNode;
this.dataStorage = dataStorage; this.dataStorage = dataStorage;
this.peerManager = peerManager; this.peerManager = peerManager;
@ -112,7 +109,7 @@ class RequestDataHandler implements MessageListener {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) { void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) {
peersNodeAddress = nodeAddress; peersNodeAddress = nodeAddress;
if (!stopped) { if (!stopped) {
GetDataRequest getDataRequest; GetDataRequest getDataRequest;
@ -155,6 +152,7 @@ class RequestDataHandler implements MessageListener {
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress); log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
networkNode.addMessageListener(this); networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest); SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
//noinspection UnstableApiUsage
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
@ -198,7 +196,7 @@ class RequestDataHandler implements MessageListener {
GetDataResponse getDataResponse = (GetDataResponse) networkEnvelope; GetDataResponse getDataResponse = (GetDataResponse) networkEnvelope;
Map<String, Set<NetworkPayload>> payloadByClassName = new HashMap<>(); Map<String, Set<NetworkPayload>> payloadByClassName = new HashMap<>();
final Set<ProtectedStorageEntry> dataSet = getDataResponse.getDataSet(); final Set<ProtectedStorageEntry> dataSet = getDataResponse.getDataSet();
dataSet.stream().forEach(e -> { dataSet.forEach(e -> {
final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload(); final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload();
if (protectedStoragePayload == null) { if (protectedStoragePayload == null) {
log.warn("StoragePayload was null: {}", networkEnvelope.toString()); log.warn("StoragePayload was null: {}", networkEnvelope.toString());
@ -216,7 +214,7 @@ class RequestDataHandler implements MessageListener {
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet(); Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) { if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.stream().forEach(persistableNetworkPayload -> { persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
// For logging different data types // For logging different data types
String className = persistableNetworkPayload.getClass().getSimpleName(); String className = persistableNetworkPayload.getClass().getSimpleName();
if (!payloadByClassName.containsKey(className)) if (!payloadByClassName.containsKey(className))
@ -233,62 +231,57 @@ class RequestDataHandler implements MessageListener {
final int items = dataSet.size() + final int items = dataSet.size() +
(persistableNetworkPayloadSet != null ? persistableNetworkPayloadSet.size() : 0); (persistableNetworkPayloadSet != null ? persistableNetworkPayloadSet.size() : 0);
sb.append("Received ").append(items).append(" instances\n"); sb.append("Received ").append(items).append(" instances\n");
payloadByClassName.entrySet().stream().forEach(e -> sb.append(e.getKey()) payloadByClassName.forEach((key, value) -> sb.append(key)
.append(": ") .append(": ")
.append(e.getValue().size()) .append(value.size())
.append("\n")); .append("\n"));
sb.append("#################################################################"); sb.append("#################################################################");
log.info(sb.toString()); log.info(sb.toString());
if (getDataResponse.getRequestNonce() == nonce) { if (getDataResponse.getRequestNonce() == nonce) {
stopTimeoutTimer(); stopTimeoutTimer();
checkArgument(connection.getPeersNodeAddressOptional().isPresent(), if (!connection.getPeersNodeAddressOptional().isPresent()) {
"RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + log.error("RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment"); "at that moment");
return;
}
final NodeAddress sender = connection.getPeersNodeAddressOptional().get(); final NodeAddress sender = connection.getPeersNodeAddressOptional().get();
List<NetworkPayload> processDelayedItems = new ArrayList<>(); long ts = System.currentTimeMillis();
dataSet.stream().forEach(e -> { AtomicInteger counter = new AtomicInteger();
if (e.getProtectedStoragePayload() instanceof LazyProcessedPayload) { dataSet.forEach(e -> {
processDelayedItems.add(e); // We don't broadcast here (last param) as we are only connected to the seed node and would be pointless
} else { dataStorage.addProtectedStorageEntry(e, sender, null, false, false);
// We dont broadcast here (last param) as we are only connected to the seed node and would be pointless counter.getAndIncrement();
dataStorage.addProtectedStorageEntry(e, sender, null, false, false);
}
}); });
log.info("Processing {} protectedStorageEntries took {} ms.", counter.get(), System.currentTimeMillis() - ts);
if (persistableNetworkPayloadSet != null) { if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.stream().forEach(e -> { ts = System.currentTimeMillis();
persistableNetworkPayloadSet.forEach(e -> {
if (e instanceof LazyProcessedPayload) { if (e instanceof LazyProcessedPayload) {
processDelayedItems.add(e); // We use an optimized method as many checks are not required in that case to avoid
// performance issues.
// Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min).
// Usually we only get about a few hundred or max. a few 1000 items. 82645 is all
// trade stats stats and all account age witness data.
dataStorage.addPersistableNetworkPayloadFromInitialRequest(e);
} else { } else {
// We dont broadcast here as we are only connected to the seed node and would be pointless // We don't broadcast here as we are only connected to the seed node and would be pointless
dataStorage.addPersistableNetworkPayload(e, sender, false, false, false, false); dataStorage.addPersistableNetworkPayload(e, sender, false,
false, false, false);
} }
}); });
log.info("Processing {} persistableNetworkPayloads took {} ms.",
persistableNetworkPayloadSet.size(), System.currentTimeMillis() - ts);
} }
long ts = System.currentTimeMillis();
processDelayedItems.forEach(item -> {
if (item instanceof ProtectedStorageEntry)
dataStorage.addProtectedStorageEntry((ProtectedStorageEntry) item, sender, null,
false, false);
else if (item instanceof PersistableNetworkPayload) {
// We use an optimized method as many checks are not required in that case to avoid
// performance issues.
// Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min).
// Usually we only get about a few hundred or max. a few 1000 items. 82645 is all
// trade stats stats and all account age witness data.
dataStorage.addPersistableNetworkPayloadFromInitialRequest((PersistableNetworkPayload) item);
}
});
log.info("Processing {} items took {} ms.", processDelayedItems.size(), System.currentTimeMillis() - ts);
cleanup(); cleanup();
listener.onComplete(); listener.onComplete();
} else { } else {
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " + log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " + "handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" + "connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}", "We drop that message. nonce={} / requestNonce={}",
@ -313,7 +306,9 @@ class RequestDataHandler implements MessageListener {
@SuppressWarnings("UnusedParameters") @SuppressWarnings("UnusedParameters")
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { private void handleFault(String errorMessage,
NodeAddress nodeAddress,
CloseConnectionReason closeConnectionReason) {
cleanup(); cleanup();
log.info(errorMessage); log.info(errorMessage);
//peerManager.shutDownConnection(nodeAddress, closeConnectionReason); //peerManager.shutDownConnection(nodeAddress, closeConnectionReason);

View file

@ -317,7 +317,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean allowBroadcast, boolean allowBroadcast,
boolean reBroadcast, boolean reBroadcast,
boolean checkDate) { boolean checkDate) {
log.debug("addPersistableNetworkPayload payload={}", payload); log.trace("addPersistableNetworkPayload payload={}", payload);
byte[] hash = payload.getHash(); byte[] hash = payload.getHash();
if (payload.verifyHashSize()) { if (payload.verifyHashSize()) {
ByteArray hashAsByteArray = new ByteArray(hash); ByteArray hashAsByteArray = new ByteArray(hash);
@ -683,7 +683,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
msg = "Sequence number is equal to the stored one. sequenceNumber = " msg = "Sequence number is equal to the stored one. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber; + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber;
} }
log.debug(msg); log.trace(msg);
return false; return false;
} else { } else {
log.debug("Sequence number is invalid. sequenceNumber = " log.debug("Sequence number is invalid. sequenceNumber = "
@ -834,7 +834,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
.append(Utilities.toTruncatedString(protectedStoragePayload)); .append(Utilities.toTruncatedString(protectedStoragePayload));
}); });
sb.append("\n------------------------------------------------------------\n"); sb.append("\n------------------------------------------------------------\n");
log.debug(sb.toString()); log.trace(sb.toString());
//log.debug("Data set " + info + " operation: size=" + map.values().size()); //log.debug("Data set " + info + " operation: size=" + map.values().size());
} }
} }