Merge pull request #5609 from chimp1984/optimize-dao-state-handling

Optimize DaoState snapshot behaviour
This commit is contained in:
sqrrm 2021-07-11 12:18:35 +02:00 committed by GitHub
commit b425f9abac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 200 additions and 48 deletions

View file

@ -26,6 +26,7 @@ import bisq.common.file.FileUtil;
import bisq.common.handlers.ResultHandler;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import com.google.inject.Inject;
@ -319,7 +320,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
new Thread(() -> {
T persisted = getPersisted(fileName);
if (persisted != null) {
UserThread.execute(() -> resultHandler.accept(persisted));
UserThread.execute(() -> {
resultHandler.accept(persisted);
GcUtil.maybeReleaseMemory();
});
} else {
UserThread.execute(orElse);
}
@ -496,6 +501,8 @@ public class PersistenceManager<T extends PersistableEnvelope> {
if (completeHandler != null) {
UserThread.execute(completeHandler);
}
GcUtil.maybeReleaseMemory();
}
}

View file

@ -23,6 +23,7 @@ import bisq.common.app.DevEnv;
import bisq.common.app.Log;
import bisq.common.app.Version;
import bisq.common.config.Config;
import bisq.common.util.GcUtil;
import bisq.common.util.Profiler;
import bisq.common.util.Utilities;
@ -54,6 +55,9 @@ public class CommonSetup {
Version.printVersion();
maybePrintPathOfCodeSource();
Profiler.printSystemLoad();
Profiler.printSystemLoadPeriodically(10, TimeUnit.MINUTES);
GcUtil.autoReleaseMemory();
setSystemProperties();
setupSigIntHandlers(gracefulShutDownHandler);
@ -61,10 +65,6 @@ public class CommonSetup {
DevEnv.setup(config);
}
public static void printSystemLoadPeriodically(int delayMin) {
UserThread.runPeriodically(Profiler::printSystemLoad, delayMin, TimeUnit.MINUTES);
}
public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
// Might come from another thread

View file

@ -0,0 +1,52 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.common.util;
import bisq.common.UserThread;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GcUtil {
public static void autoReleaseMemory() {
autoReleaseMemory(1000);
}
/**
* @param trigger Threshold for free memory in MB when we invoke the garbage collector
*/
public static void autoReleaseMemory(long trigger) {
UserThread.runPeriodically(() -> maybeReleaseMemory(trigger), 60);
}
public static void maybeReleaseMemory() {
maybeReleaseMemory(3000);
}
/**
* @param trigger Threshold for free memory in MB when we invoke the garbage collector
*/
public static void maybeReleaseMemory(long trigger) {
long totalMemory = Runtime.getRuntime().totalMemory();
if (totalMemory > trigger * 1024 * 1024) {
log.info("Invoke garbage collector. Total memory: {} {} {}", Utilities.readableFileSize(totalMemory), totalMemory, trigger * 1024 * 1024);
System.gc();
log.info("Total memory after gc() call: {}", Utilities.readableFileSize(Runtime.getRuntime().totalMemory()));
}
}
}

View file

@ -17,18 +17,30 @@
package bisq.common.util;
import bisq.common.UserThread;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Profiler {
public static void printSystemLoadPeriodically(long delay, TimeUnit timeUnit) {
UserThread.runPeriodically(Profiler::printSystemLoad, delay, timeUnit);
}
public static void printSystemLoad() {
Runtime runtime = Runtime.getRuntime();
long free = runtime.freeMemory() / 1024 / 1024;
long total = runtime.totalMemory() / 1024 / 1024;
long free = runtime.freeMemory();
long total = runtime.totalMemory();
long used = total - free;
log.info("System report: Used memory: {} MB; Free memory: {} MB; Total memory: {} MB; No. of threads: {}",
used, free, total, Thread.activeCount());
log.info("Total memory: {}; Used memory: {}; Free memory: {}; Max memory: {}; No. of threads: {}",
Utilities.readableFileSize(total),
Utilities.readableFileSize(used),
Utilities.readableFileSize(free),
Utilities.readableFileSize(runtime.maxMemory()),
Thread.activeCount());
}
public static long getUsedMemoryInMB() {

View file

@ -130,7 +130,6 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
// Headless versions can call inside launchApplication the onApplicationLaunched() manually
protected void onApplicationLaunched() {
configUserThread();
CommonSetup.printSystemLoadPeriodically(10);
// As the handler method might be overwritten by subclasses and they use the application as handler
// we need to setup the handler after the application is created.
CommonSetup.setupUncaughtExceptionHandler(this);

View file

@ -40,6 +40,7 @@ import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.crypto.Hash;
import bisq.common.file.FileUtil;
import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import javax.inject.Inject;
@ -303,10 +304,10 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
height, daoStateBlockChain.getLast().getHeight());
prevHash = daoStateBlockChain.getLast().getHash();
}
byte[] stateHash = daoStateService.getSerializedStateForHashChain();
byte[] stateAsBytes = daoStateService.getSerializedStateForHashChain();
// We include the prev. hash in our new hash so we can be sure that if one hash is matching all the past would
// match as well.
byte[] combined = ArrayUtils.addAll(prevHash, stateHash);
byte[] combined = ArrayUtils.addAll(prevHash, stateAsBytes);
byte[] hash = Hash.getSha256Ripemd160hash(combined);
DaoStateHash myDaoStateHash = new DaoStateHash(height, hash, prevHash);
@ -333,8 +334,11 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
numCalls++;
}
private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<NodeAddress> peersNodeAddress,
private boolean processPeersDaoStateHash(DaoStateHash daoStateHash,
Optional<NodeAddress> peersNodeAddress,
boolean notifyListeners) {
GcUtil.maybeReleaseMemory();
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
@ -374,11 +378,12 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
log.debug("Conflict with non-seed nodes: {}", conflictMsg);
}
if (notifyListeners && changed.get()) {
listeners.forEach(Listener::onChangeAfterBatchProcessing);
}
GcUtil.maybeReleaseMemory();
return changed.get();
}

View file

@ -29,6 +29,7 @@ import bisq.core.dao.state.model.blockchain.TxType;
import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.file.JsonFileManager;
import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import org.bitcoinj.core.Utils;
@ -144,6 +145,8 @@ public class ExportJsonFilesService implements DaoSetupService {
return jsonTx;
}).collect(Collectors.toList());
GcUtil.maybeReleaseMemory();
DaoState daoState = daoStateService.getClone();
List<JsonBlock> jsonBlockList = daoState.getBlocks().stream()
.map(this::getJsonBlock)
@ -154,6 +157,9 @@ public class ExportJsonFilesService implements DaoSetupService {
bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks");
allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()));
GcUtil.maybeReleaseMemory();
return null;
});

View file

@ -24,6 +24,7 @@ import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.common.app.DevEnv;
import bisq.common.util.GcUtil;
import org.bitcoinj.core.Coin;
@ -114,6 +115,8 @@ public class BlockParser {
daoStateService.onParseBlockComplete(block);
log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);
GcUtil.maybeReleaseMemory();
return block;
}

View file

@ -281,7 +281,6 @@ public class DaoStateService implements DaoSetupService {
daoStateListeners.forEach(DaoStateListener::onParseBlockChainComplete);
}
public List<Block> getBlocks() {
return daoState.getBlocks();
}
@ -535,9 +534,7 @@ public class DaoStateService implements DaoSetupService {
///////////////////////////////////////////////////////////////////////////////////////////
private Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getTxOutputType() == txOutputType)
.collect(Collectors.toSet());
return daoState.getTxOutputByTxOutputType(txOutputType);
}
public boolean isBsqTxOutputType(TxOutput txOutput) {

View file

@ -24,6 +24,7 @@ import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.storage.DaoStateStorageService;
import bisq.common.config.Config;
import bisq.common.util.GcUtil;
import javax.inject.Inject;
import javax.inject.Named;
@ -63,6 +64,7 @@ public class DaoStateSnapshotService {
@Setter
@Nullable
private Runnable daoRequiresRestartHandler;
private boolean requestPersistenceCalled;
///////////////////////////////////////////////////////////////////////////////////////////
@ -101,24 +103,43 @@ public class DaoStateSnapshotService {
!daoStateService.getBlocks().isEmpty() &&
isValidHeight(daoStateService.getBlockHeightOfLastBlock()) &&
noSnapshotCandidateOrDifferentHeight) {
// At trigger event we store the latest snapshotCandidate to disc
long ts = System.currentTimeMillis();
if (daoStateSnapshotCandidate != null) {
// Serialisation happens on the userThread so we do not need to clone the data. Write to disk happens
// in a thread but does not interfere with our objects as they got already serialized when passed to the
// write thread. We use requestPersistence so we do not write immediately but at next scheduled interval.
// This avoids frequent write at dao sync and better performance.
daoStateStorageService.requestPersistence(daoStateSnapshotCandidate, daoStateHashChainSnapshotCandidate);
log.info("Serializing snapshotCandidate for writing to Disc with height {} at height {} took {} ms",
daoStateSnapshotCandidate.getChainHeight(), chainHeight, System.currentTimeMillis() - ts);
// We protect to get called while we are not completed with persisting the daoState. This can take about
// 20 seconds and it is not expected that we get triggered another snapshot event in that period, but this
// check guards that we would skip such calls..
if (requestPersistenceCalled) {
log.warn("We try to persist a daoState but the previous call has not completed yet. " +
"We ignore that call and skip that snapshot. " +
"Snapshot will be created at next snapshot height again. This is not to be expected with live " +
"blockchain data.");
return;
}
ts = System.currentTimeMillis();
// Now we clone and keep it in memory for the next trigger event
daoStateSnapshotCandidate = daoStateService.getClone();
daoStateHashChainSnapshotCandidate = new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
GcUtil.maybeReleaseMemory();
log.debug("Cloned new snapshotCandidate at height {} took {} ms", chainHeight, System.currentTimeMillis() - ts);
// At trigger event we store the latest snapshotCandidate to disc
long ts = System.currentTimeMillis();
requestPersistenceCalled = true;
daoStateStorageService.requestPersistence(daoStateSnapshotCandidate,
daoStateHashChainSnapshotCandidate,
() -> {
log.info("Serializing snapshotCandidate for writing to Disc with height {} at height {} took {} ms",
daoStateSnapshotCandidate != null ? daoStateSnapshotCandidate.getChainHeight() : "N/A",
chainHeight,
System.currentTimeMillis() - ts);
long ts2 = System.currentTimeMillis();
GcUtil.maybeReleaseMemory();
// Now we clone and keep it in memory for the next trigger event
daoStateSnapshotCandidate = daoStateService.getClone();
daoStateHashChainSnapshotCandidate = new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
log.info("Cloned new snapshotCandidate at height {} took {} ms", chainHeight, System.currentTimeMillis() - ts2);
requestPersistenceCalled = false;
GcUtil.maybeReleaseMemory();
});
}
}

View file

@ -22,6 +22,7 @@ import bisq.core.dao.state.model.blockchain.SpentInfo;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxOutputKey;
import bisq.core.dao.state.model.blockchain.TxOutputType;
import bisq.core.dao.state.model.governance.Cycle;
import bisq.core.dao.state.model.governance.DecryptedBallotsWithMerits;
import bisq.core.dao.state.model.governance.EvaluatedProposal;
@ -38,6 +39,7 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -114,6 +116,9 @@ public class DaoState implements PersistablePayload {
private transient final Map<Integer, Block> blocksByHeight; // Blocks indexed by height
@JsonExclude
private transient final Set<String> blockHashes; // Cache of known block hashes
@JsonExclude
private transient final Map<TxOutputType, Set<TxOutput>> txOutputsByTxOutputType = new HashMap<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -164,6 +169,7 @@ public class DaoState implements PersistablePayload {
txCache = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.peek(this::addToTxOutputsByTxOutputTypeMap)
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));
blockHashes = blocks.stream()
@ -255,11 +261,32 @@ public class DaoState implements PersistablePayload {
// We shouldn't get duplicate txIds, but use putIfAbsent instead of put for consistency with the map merge
// function used in the constructor to initialise txCache (and to exactly match the pre-caching behaviour).
txCache.putIfAbsent(tx.getId(), tx);
addToTxOutputsByTxOutputTypeMap(tx);
}
public void setTxCache(Map<String, Tx> txCache) {
this.txCache.clear();
this.txCache.putAll(txCache);
txOutputsByTxOutputType.clear();
this.txCache.values().forEach(this::addToTxOutputsByTxOutputTypeMap);
}
private void addToTxOutputsByTxOutputTypeMap(Tx tx) {
tx.getTxOutputs().forEach(txOutput -> {
TxOutputType txOutputType = txOutput.getTxOutputType();
txOutputsByTxOutputType.putIfAbsent(txOutputType, new HashSet<>());
txOutputsByTxOutputType.get(txOutputType).add(txOutput);
});
}
public Set<TxOutput> getTxOutputByTxOutputType(TxOutputType txOutputType) {
if (txOutputsByTxOutputType.containsKey(txOutputType)) {
return Collections.unmodifiableSet(txOutputsByTxOutputType.get(txOutputType));
} else {
return new HashSet<>();
}
}
public Map<String, Tx> getTxCache() {
@ -300,7 +327,7 @@ public class DaoState implements PersistablePayload {
}
public void addBlocks(List<Block> newBlocks) {
newBlocks.forEach(b -> addBlock(b));
newBlocks.forEach(this::addBlock);
}
/**
@ -329,6 +356,7 @@ public class DaoState implements PersistablePayload {
",\n evaluatedProposalList=" + evaluatedProposalList +
",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList +
",\n txCache=" + txCache +
",\n txOutputsByTxOutputType=" + txOutputsByTxOutputType +
"\n}";
}
}

View file

@ -76,10 +76,29 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
return FILE_NAME;
}
public void requestPersistence(DaoState daoState, LinkedList<DaoStateHash> daoStateHashChain) {
public void requestPersistence(DaoState daoState,
LinkedList<DaoStateHash> daoStateHashChain,
Runnable completeHandler) {
if (daoState == null) {
completeHandler.run();
return;
}
store.setDaoState(daoState);
store.setDaoStateHashChain(daoStateHashChain);
persistenceManager.requestPersistence();
// We let the persistence run in a thread to avoid the slow protobuf serialisation to happen on the user
// thread. We also call it immediately to get notified about the completion event.
new Thread(() -> {
Thread.currentThread().setName("Serialize and write DaoState");
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the the daoState in the store to avoid that it stays in
// memory there until the next persist call.
store.setDaoState(null);
completeHandler.run();
});
}).start();
}
public DaoState getPersistedBsqState() {
@ -125,7 +144,7 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
@Override
protected DaoStateStore createStore() {
return new DaoStateStore(DaoState.getClone(daoState), new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()));
return new DaoStateStore(null, new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()));
}
@Override

View file

@ -31,6 +31,8 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkNotNull;
@ -40,12 +42,13 @@ public class DaoStateStore implements PersistableEnvelope {
// the snapshot!
@Getter
@Setter
@Nullable
private DaoState daoState;
@Getter
@Setter
private LinkedList<DaoStateHash> daoStateHashChain;
DaoStateStore(DaoState daoState, LinkedList<DaoStateHash> daoStateHashChain) {
DaoStateStore(@Nullable DaoState daoState, LinkedList<DaoStateHash> daoStateHashChain) {
this.daoState = daoState;
this.daoStateHashChain = daoStateHashChain;
}

View file

@ -247,7 +247,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
map.put(hashOfPayload, protectedStorageEntry);
log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
//log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
}
@ -280,14 +280,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
Map<ByteArray, PersistableNetworkPayload> mapForDataRequest = getMapForDataRequest();
Set<byte[]> excludedKeys = getKeysAsByteSet(mapForDataRequest);
log.trace("## getKnownPayloadHashes map of PersistableNetworkPayloads={}, excludedKeys={}",
/* log.trace("## getKnownPayloadHashes map of PersistableNetworkPayloads={}, excludedKeys={}",
printPersistableNetworkPayloadMap(mapForDataRequest),
excludedKeys.stream().map(Utilities::encodeToHex).toArray());
excludedKeys.stream().map(Utilities::encodeToHex).toArray());*/
Set<byte[]> excludedKeysFromProtectedStorageEntryMap = getKeysAsByteSet(map);
log.trace("## getKnownPayloadHashes map of ProtectedStorageEntrys={}, excludedKeys={}",
/*log.trace("## getKnownPayloadHashes map of ProtectedStorageEntrys={}, excludedKeys={}",
printMap(),
excludedKeysFromProtectedStorageEntryMap.stream().map(Utilities::encodeToHex).toArray());
excludedKeysFromProtectedStorageEntryMap.stream().map(Utilities::encodeToHex).toArray());*/
excludedKeys.addAll(excludedKeysFromProtectedStorageEntryMap);
return excludedKeys;
@ -742,7 +742,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
//log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
// We do that check early as it is a very common case for returning, so we return early
// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
@ -795,7 +795,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
requestPersistence();
log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
//log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
// Optionally, broadcast the add/update depending on the calling environment
if (allowBroadcast) {
@ -823,7 +823,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
log.trace("## call republishProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
//log.trace("## call republishProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) {
log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " +
@ -1023,9 +1023,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ByteArray hashOfPayload = entry.getKey();
ProtectedStorageEntry protectedStorageEntry = entry.getValue();
log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
//log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
map.remove(hashOfPayload);
log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
//log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
// We inform listeners even the entry was not found in our map
removedProtectedStorageEntries.add(protectedStorageEntry);