mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-19 18:03:12 +01:00
Merge pull request #5790 from chimp1984/extract-persistence-of-blocks
Extract persistence of BSQ blocks out of DaoStateStore [5]
This commit is contained in:
commit
7b6f971acd
1
.gitattributes
vendored
1
.gitattributes
vendored
@ -13,3 +13,4 @@
|
||||
*.jpeg binary
|
||||
*.png binary
|
||||
p2p/src/main/resources/*BTC_MAINNET filter=lfs diff=lfs merge=lfs -text
|
||||
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/* filter=lfs diff=lfs merge=lfs -text
|
||||
|
10
build.gradle
10
build.gradle
@ -238,7 +238,7 @@ configure(project(':common')) {
|
||||
|
||||
javafx {
|
||||
version = "$javafxVersion"
|
||||
modules = ['javafx.graphics']
|
||||
modules = ['javafx.graphics']
|
||||
}
|
||||
|
||||
dependencies {
|
||||
@ -284,7 +284,7 @@ configure(project(':p2p')) {
|
||||
|
||||
javafx {
|
||||
version = "$javafxVersion"
|
||||
modules = ['javafx.base']
|
||||
modules = ['javafx.base']
|
||||
}
|
||||
|
||||
dependencies {
|
||||
@ -324,7 +324,7 @@ configure(project(':core')) {
|
||||
|
||||
javafx {
|
||||
version = "$javafxVersion"
|
||||
modules = ['javafx.base']
|
||||
modules = ['javafx.base']
|
||||
}
|
||||
|
||||
dependencies {
|
||||
@ -411,7 +411,7 @@ configure(project(':desktop')) {
|
||||
|
||||
javafx {
|
||||
version = "$javafxVersion"
|
||||
modules = ['javafx.controls', 'javafx.fxml']
|
||||
modules = ['javafx.controls', 'javafx.fxml']
|
||||
}
|
||||
|
||||
version = '1.7.5-SNAPSHOT'
|
||||
@ -459,7 +459,7 @@ configure(project(':monitor')) {
|
||||
|
||||
javafx {
|
||||
version = "$javafxVersion"
|
||||
modules = ['javafx.base']
|
||||
modules = ['javafx.base']
|
||||
}
|
||||
|
||||
mainClassName = 'bisq.monitor.Monitor'
|
||||
|
@ -18,7 +18,6 @@
|
||||
package bisq.common.util;
|
||||
|
||||
import bisq.common.UserThread;
|
||||
import bisq.common.app.DevEnv;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -27,8 +26,10 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class GcUtil {
|
||||
@Setter
|
||||
private static boolean DISABLE_GC_CALLS = false;
|
||||
private static int TRIGGER_MEM = 1000;
|
||||
private static int TRIGGER_MAX_MEM = 3000;
|
||||
private static final int TRIGGER_MEM = 1000;
|
||||
private static final int TRIGGER_MAX_MEM = 3000;
|
||||
private static int totalInvocations;
|
||||
private static long totalGCTime;
|
||||
|
||||
public static void autoReleaseMemory() {
|
||||
if (DISABLE_GC_CALLS)
|
||||
@ -59,20 +60,25 @@ public class GcUtil {
|
||||
long preGcMemory = Runtime.getRuntime().totalMemory();
|
||||
if (preGcMemory > trigger * 1024 * 1024) {
|
||||
System.gc();
|
||||
totalInvocations++;
|
||||
long postGcMemory = Runtime.getRuntime().totalMemory();
|
||||
log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms.",
|
||||
long duration = System.currentTimeMillis() - ts;
|
||||
totalGCTime += duration;
|
||||
log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms. Total GC invocations: {} / Total GC time {} sec",
|
||||
Utilities.readableFileSize(preGcMemory - postGcMemory),
|
||||
Utilities.readableFileSize(preGcMemory),
|
||||
Utilities.readableFileSize(postGcMemory),
|
||||
System.currentTimeMillis() - ts);
|
||||
if (DevEnv.isDevMode()) {
|
||||
duration,
|
||||
totalInvocations,
|
||||
totalGCTime / 1000d);
|
||||
/* if (DevEnv.isDevMode()) {
|
||||
try {
|
||||
// To see from where we got called
|
||||
throw new RuntimeException("Dummy Exception for print stacktrace at maybeReleaseMemory");
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import bisq.core.dao.governance.bond.unlock.UnlockTxService;
|
||||
import bisq.core.dao.governance.myvote.MyVote;
|
||||
import bisq.core.dao.governance.myvote.MyVoteListService;
|
||||
import bisq.core.dao.governance.param.Param;
|
||||
import bisq.core.dao.governance.period.CycleService;
|
||||
import bisq.core.dao.governance.period.PeriodService;
|
||||
import bisq.core.dao.governance.proposal.MyProposalListService;
|
||||
import bisq.core.dao.governance.proposal.ProposalConsensus;
|
||||
@ -99,14 +100,13 @@ import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -128,6 +128,7 @@ public class DaoFacade implements DaoSetupService {
|
||||
private final DaoStateService daoStateService;
|
||||
private final DaoStateMonitoringService daoStateMonitoringService;
|
||||
private final PeriodService periodService;
|
||||
private final CycleService cycleService;
|
||||
private final MyBlindVoteListService myBlindVoteListService;
|
||||
private final MyVoteListService myVoteListService;
|
||||
private final CompensationProposalFactory compensationProposalFactory;
|
||||
@ -155,6 +156,7 @@ public class DaoFacade implements DaoSetupService {
|
||||
DaoStateService daoStateService,
|
||||
DaoStateMonitoringService daoStateMonitoringService,
|
||||
PeriodService periodService,
|
||||
CycleService cycleService,
|
||||
MyBlindVoteListService myBlindVoteListService,
|
||||
MyVoteListService myVoteListService,
|
||||
CompensationProposalFactory compensationProposalFactory,
|
||||
@ -178,6 +180,7 @@ public class DaoFacade implements DaoSetupService {
|
||||
this.daoStateService = daoStateService;
|
||||
this.daoStateMonitoringService = daoStateMonitoringService;
|
||||
this.periodService = periodService;
|
||||
this.cycleService = cycleService;
|
||||
this.myBlindVoteListService = myBlindVoteListService;
|
||||
this.myVoteListService = myVoteListService;
|
||||
this.compensationProposalFactory = compensationProposalFactory;
|
||||
@ -438,12 +441,10 @@ public class DaoFacade implements DaoSetupService {
|
||||
}
|
||||
|
||||
public Map<Integer, Date> getBlockStartDateByCycleIndex() {
|
||||
AtomicInteger index = new AtomicInteger();
|
||||
Map<Integer, Date> map = new HashMap<>();
|
||||
periodService.getCycles()
|
||||
.forEach(cycle -> daoStateService.getBlockAtHeight(cycle.getHeightOfFirstBlock())
|
||||
.ifPresent(block -> map.put(index.getAndIncrement(), new Date(block.getTime()))));
|
||||
return map;
|
||||
return periodService.getCycles().stream().collect(Collectors.toMap(
|
||||
cycleService::getCycleIndex,
|
||||
cycle -> new Date(daoStateService.getBlockTimeAtBlockHeight(cycle.getHeightOfFirstBlock()))
|
||||
));
|
||||
}
|
||||
|
||||
// Because last block in request and voting phases must not be used for making a tx as it will get confirmed in the
|
||||
|
@ -35,6 +35,7 @@ import bisq.network.p2p.network.Connection;
|
||||
|
||||
import bisq.common.Timer;
|
||||
import bisq.common.UserThread;
|
||||
import bisq.common.util.MathUtils;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
@ -222,8 +223,11 @@ public class LiteNode extends BsqNode {
|
||||
|
||||
runDelayedBatchProcessing(new ArrayList<>(blockList),
|
||||
() -> {
|
||||
log.info("Parsing {} blocks took {} seconds.", blockList.size(),
|
||||
(System.currentTimeMillis() - ts) / 1000d);
|
||||
double duration = System.currentTimeMillis() - ts;
|
||||
log.info("Parsing {} blocks took {} seconds ({} min.) / {} ms in average / block", blockList.size(),
|
||||
MathUtils.roundDouble(duration / 1000d, 2),
|
||||
MathUtils.roundDouble(duration / 1000d / 60, 2),
|
||||
MathUtils.roundDouble(duration / blockList.size(), 2));
|
||||
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
|
||||
// We deal with that case at the setupWalletBestBlockListener method above.
|
||||
if (walletsSetup.isDownloadComplete() &&
|
||||
|
@ -135,6 +135,8 @@ public class BlockParser {
|
||||
}
|
||||
|
||||
private boolean isBlockAlreadyAdded(RawBlock rawBlock) {
|
||||
return daoStateService.isBlockHashKnown(rawBlock.getHash());
|
||||
return daoStateService.getBlockAtHeight(rawBlock.getHeight())
|
||||
.map(block -> block.getHash().equals(rawBlock.getHash()))
|
||||
.orElse(false);
|
||||
}
|
||||
}
|
||||
|
@ -149,6 +149,10 @@ public class DaoStateService implements DaoSetupService {
|
||||
return DaoState.getClone(daoState);
|
||||
}
|
||||
|
||||
public protobuf.DaoState getBsqStateCloneExcludingBlocks() {
|
||||
return DaoState.getBsqStateCloneExcludingBlocks(daoState);
|
||||
}
|
||||
|
||||
public byte[] getSerializedStateForHashChain() {
|
||||
return daoState.getSerializedStateForHashChain();
|
||||
}
|
||||
@ -286,17 +290,6 @@ public class DaoStateService implements DaoSetupService {
|
||||
return daoState.getBlocks();
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether specified block hash belongs to a block we already know about.
|
||||
*
|
||||
* @param blockHash The hash of a {@link Block}.
|
||||
* @return True if the hash belongs to a {@link Block} we know about, otherwise
|
||||
* {@code false}.
|
||||
*/
|
||||
public boolean isBlockHashKnown(String blockHash) {
|
||||
return daoState.getBlockHashes().contains(blockHash);
|
||||
}
|
||||
|
||||
public Optional<Block> getLastBlock() {
|
||||
if (!getBlocks().isEmpty())
|
||||
return Optional.of(daoState.getLastBlock());
|
||||
@ -316,6 +309,10 @@ public class DaoStateService implements DaoSetupService {
|
||||
return Optional.ofNullable(daoState.getBlocksByHeight().get(height));
|
||||
}
|
||||
|
||||
public long getBlockTimeAtBlockHeight(int height) {
|
||||
return getBlockAtHeight(height).map(Block::getTime).orElse(0L);
|
||||
}
|
||||
|
||||
public boolean containsBlock(Block block) {
|
||||
return getBlocks().contains(block);
|
||||
}
|
||||
@ -324,12 +321,15 @@ public class DaoStateService implements DaoSetupService {
|
||||
return getBlockAtHeight(height).map(Block::getTime).orElse(0L);
|
||||
}
|
||||
|
||||
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) {
|
||||
return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) {
|
||||
// We limit requests to numMaxBlocks blocks, to avoid performance issues and too
|
||||
// large network data in case a node requests too far back in history.
|
||||
return getBlocks().stream()
|
||||
.filter(block -> block.getHeight() >= fromBlockHeight)
|
||||
.sorted(Comparator.comparing(Block::getHeight))
|
||||
.limit(numMaxBlocks)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
@ -37,12 +37,15 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
/**
|
||||
* Manages periodical snapshots of the DaoState.
|
||||
* At startup we apply a snapshot if available.
|
||||
@ -61,8 +64,10 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
private final Preferences preferences;
|
||||
private final File storageDir;
|
||||
|
||||
private DaoState daoStateSnapshotCandidate;
|
||||
private LinkedList<DaoStateHash> daoStateHashChainSnapshotCandidate = new LinkedList<>();
|
||||
private protobuf.DaoState daoStateCandidate;
|
||||
private LinkedList<DaoStateHash> hashChainCandidate = new LinkedList<>();
|
||||
private List<Block> blocksCandidate;
|
||||
private int snapshotHeight;
|
||||
private int chainHeightOfLastApplySnapshot;
|
||||
@Setter
|
||||
@Nullable
|
||||
@ -143,12 +148,20 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
long ts = System.currentTimeMillis();
|
||||
// We do not keep a copy of the clone as we use it immediately for persistence.
|
||||
GcUtil.maybeReleaseMemory();
|
||||
log.info("Create snapshot at height {}", daoStateService.getChainHeight());
|
||||
daoStateStorageService.requestPersistence(daoStateService.getClone(),
|
||||
new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()),
|
||||
int chainHeight = daoStateService.getChainHeight();
|
||||
log.info("Create snapshot at height {}", chainHeight);
|
||||
// We do not keep the data in our fields to enable gc as soon its released in the store
|
||||
|
||||
protobuf.DaoState daoStateForSnapshot = getDaoStateForSnapshot();
|
||||
List<Block> blocksForSnapshot = getBlocksForSnapshot();
|
||||
LinkedList<DaoStateHash> hashChainForSnapshot = getHashChainForSnapshot();
|
||||
daoStateStorageService.requestPersistence(daoStateForSnapshot,
|
||||
blocksForSnapshot,
|
||||
hashChainForSnapshot,
|
||||
() -> {
|
||||
GcUtil.maybeReleaseMemory();
|
||||
log.info("Persisted daoState after parsing completed at height {}. Took {} ms",
|
||||
daoStateService.getChainHeight(), System.currentTimeMillis() - ts);
|
||||
chainHeight, System.currentTimeMillis() - ts);
|
||||
});
|
||||
GcUtil.maybeReleaseMemory();
|
||||
});
|
||||
@ -166,8 +179,8 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
|
||||
// Either we don't have a snapshot candidate yet, or if we have one the height at that snapshot candidate must be
|
||||
// different to our current height.
|
||||
boolean noSnapshotCandidateOrDifferentHeight = daoStateSnapshotCandidate == null ||
|
||||
daoStateSnapshotCandidate.getChainHeight() != chainHeight;
|
||||
boolean noSnapshotCandidateOrDifferentHeight = daoStateCandidate == null ||
|
||||
snapshotHeight != chainHeight;
|
||||
if (isSnapshotHeight(chainHeight) &&
|
||||
!daoStateService.getBlocks().isEmpty() &&
|
||||
isValidHeight(daoStateService.getBlockHeightOfLastBlock()) &&
|
||||
@ -190,10 +203,10 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
return;
|
||||
}
|
||||
|
||||
if (daoStateSnapshotCandidate != null) {
|
||||
if (daoStateCandidate != null) {
|
||||
persist();
|
||||
} else {
|
||||
createClones();
|
||||
createSnapshot();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -201,41 +214,32 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
private void persist() {
|
||||
long ts = System.currentTimeMillis();
|
||||
readyForPersisting = false;
|
||||
daoStateStorageService.requestPersistence(daoStateSnapshotCandidate,
|
||||
daoStateHashChainSnapshotCandidate,
|
||||
daoStateStorageService.requestPersistence(daoStateCandidate,
|
||||
blocksCandidate,
|
||||
hashChainCandidate,
|
||||
() -> {
|
||||
log.info("Serializing snapshotCandidate for writing to Disc at chainHeight {} took {} ms.\n" +
|
||||
"daoStateSnapshotCandidate.height={};\n" +
|
||||
"daoStateHashChainSnapshotCandidate.height={}",
|
||||
daoStateService.getChainHeight(),
|
||||
System.currentTimeMillis() - ts,
|
||||
daoStateSnapshotCandidate != null ? daoStateSnapshotCandidate.getChainHeight() : "N/A",
|
||||
daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ?
|
||||
daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A");
|
||||
log.info("Serializing daoStateCandidate for writing to Disc at chainHeight {} took {} ms.",
|
||||
snapshotHeight, System.currentTimeMillis() - ts);
|
||||
|
||||
createClones();
|
||||
createSnapshot();
|
||||
readyForPersisting = true;
|
||||
});
|
||||
}
|
||||
|
||||
private void createClones() {
|
||||
private void createSnapshot() {
|
||||
long ts = System.currentTimeMillis();
|
||||
// Now we clone and keep it in memory for the next trigger event
|
||||
// We do not fit into the target grid of 20 blocks as we get called here once persistence is
|
||||
// done from the write thread (mapped back to user thread).
|
||||
// As we want to prevent to maintain 2 clones we prefer that strategy. If we would do the clone
|
||||
// after the persist call we would keep an additional copy in memory.
|
||||
daoStateSnapshotCandidate = daoStateService.getClone();
|
||||
daoStateHashChainSnapshotCandidate = new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
|
||||
daoStateCandidate = getDaoStateForSnapshot();
|
||||
blocksCandidate = getBlocksForSnapshot();
|
||||
hashChainCandidate = getHashChainForSnapshot();
|
||||
snapshotHeight = daoStateService.getChainHeight();
|
||||
GcUtil.maybeReleaseMemory();
|
||||
|
||||
log.info("Cloned new snapshotCandidate at chainHeight {} took {} ms.\n" +
|
||||
"daoStateSnapshotCandidate.height={};\n" +
|
||||
"daoStateHashChainSnapshotCandidate.height={}",
|
||||
daoStateService.getChainHeight(), System.currentTimeMillis() - ts,
|
||||
daoStateSnapshotCandidate != null ? daoStateSnapshotCandidate.getChainHeight() : "N/A",
|
||||
daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ?
|
||||
daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A");
|
||||
log.info("Cloned new daoStateCandidate at height {} took {} ms.", snapshotHeight, System.currentTimeMillis() - ts);
|
||||
}
|
||||
|
||||
public void applySnapshot(boolean fromReorg) {
|
||||
@ -245,13 +249,14 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
int chainHeightOfPersisted = persistedBsqState.getChainHeight();
|
||||
if (!persistedBsqState.getBlocks().isEmpty()) {
|
||||
int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight();
|
||||
log.debug("applySnapshot from persistedBsqState daoState with height of last block {}", heightOfLastBlock);
|
||||
checkArgument(heightOfLastBlock == chainHeightOfPersisted,
|
||||
"chainHeightOfPersisted must be same as heightOfLastBlock");
|
||||
if (isValidHeight(heightOfLastBlock)) {
|
||||
if (chainHeightOfLastApplySnapshot != chainHeightOfPersisted) {
|
||||
chainHeightOfLastApplySnapshot = chainHeightOfPersisted;
|
||||
daoStateService.applySnapshot(persistedBsqState);
|
||||
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
|
||||
daoStateStorageService.pruneStore();
|
||||
daoStateStorageService.releaseMemory();
|
||||
} else {
|
||||
// The reorg might have been caused by the previous parsing which might contains a range of
|
||||
// blocks.
|
||||
@ -268,6 +273,8 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
"We remove all dao store files and shutdown. " +
|
||||
"After a restart resource files will be applied if available.");
|
||||
resyncDaoStateFromResources();
|
||||
} else {
|
||||
log.info("No Bsq blocks in DaoState. Expected if no data are provided yet from resources or persisted data.");
|
||||
}
|
||||
} else {
|
||||
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
|
||||
@ -309,4 +316,17 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
|
||||
private boolean isSnapshotHeight(int height) {
|
||||
return isSnapshotHeight(genesisTxInfo.getGenesisBlockHeight(), height, SNAPSHOT_GRID);
|
||||
}
|
||||
|
||||
private protobuf.DaoState getDaoStateForSnapshot() {
|
||||
return daoStateService.getBsqStateCloneExcludingBlocks();
|
||||
}
|
||||
|
||||
private List<Block> getBlocksForSnapshot() {
|
||||
int fromBlockHeight = daoStateStorageService.getChainHeightOfPersistedBlocks() + 1;
|
||||
return daoStateService.getBlocksFromBlockHeight(fromBlockHeight);
|
||||
}
|
||||
|
||||
private LinkedList<DaoStateHash> getHashChainForSnapshot() {
|
||||
return new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
|
||||
}
|
||||
}
|
||||
|
@ -71,6 +71,10 @@ public class DaoState implements PersistablePayload {
|
||||
return DaoState.fromProto(daoState.getBsqStateBuilder().build());
|
||||
}
|
||||
|
||||
public static protobuf.DaoState getBsqStateCloneExcludingBlocks(DaoState daoState) {
|
||||
return daoState.getBsqStateBuilderExcludingBlocks().build();
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Fields
|
||||
@ -115,8 +119,6 @@ public class DaoState implements PersistablePayload {
|
||||
@JsonExclude
|
||||
private transient final Map<Integer, Block> blocksByHeight; // Blocks indexed by height
|
||||
@JsonExclude
|
||||
private transient final Set<String> blockHashes; // Cache of known block hashes
|
||||
@JsonExclude
|
||||
private transient final Map<TxOutputType, Set<TxOutput>> txOutputsByTxOutputType = new HashMap<>();
|
||||
|
||||
|
||||
@ -172,10 +174,6 @@ public class DaoState implements PersistablePayload {
|
||||
.peek(this::addToTxOutputsByTxOutputTypeMap)
|
||||
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));
|
||||
|
||||
blockHashes = blocks.stream()
|
||||
.map(Block::getHash)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
blocksByHeight = blocks.stream()
|
||||
.collect(Collectors.toMap(Block::getHeight, Function.identity(), (x, y) -> x, HashMap::new));
|
||||
}
|
||||
@ -212,6 +210,10 @@ public class DaoState implements PersistablePayload {
|
||||
LinkedList<Block> blocks = proto.getBlocksList().stream()
|
||||
.map(Block::fromProto)
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
return fromProto(proto, blocks);
|
||||
}
|
||||
|
||||
public static DaoState fromProto(protobuf.DaoState proto, LinkedList<Block> blocks) {
|
||||
LinkedList<Cycle> cycles = proto.getCyclesList().stream()
|
||||
.map(Cycle::fromProto).collect(Collectors.toCollection(LinkedList::new));
|
||||
TreeMap<TxOutputKey, TxOutput> unspentTxOutputMap = new TreeMap<>(proto.getUnspentTxOutputMapMap().entrySet().stream()
|
||||
@ -293,10 +295,6 @@ public class DaoState implements PersistablePayload {
|
||||
return Collections.unmodifiableMap(txCache);
|
||||
}
|
||||
|
||||
public Set<String> getBlockHashes() {
|
||||
return Collections.unmodifiableSet(blockHashes);
|
||||
}
|
||||
|
||||
public Map<Integer, Block> getBlocksByHeight() {
|
||||
return Collections.unmodifiableMap(blocksByHeight);
|
||||
}
|
||||
@ -322,7 +320,6 @@ public class DaoState implements PersistablePayload {
|
||||
|
||||
public void addBlock(Block block) {
|
||||
blocks.add(block);
|
||||
blockHashes.add(block.getHash());
|
||||
blocksByHeight.put(block.getHeight(), block);
|
||||
}
|
||||
|
||||
@ -337,7 +334,6 @@ public class DaoState implements PersistablePayload {
|
||||
public void clearAndSetBlocks(List<Block> newBlocks) {
|
||||
blocks.clear();
|
||||
blocksByHeight.clear();
|
||||
blockHashes.clear();
|
||||
|
||||
addBlocks(newBlocks);
|
||||
}
|
||||
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.core.dao.state.storage;
|
||||
|
||||
import bisq.common.file.FileUtil;
|
||||
import bisq.common.proto.persistable.PersistenceProtoResolver;
|
||||
|
||||
import protobuf.BaseBlock;
|
||||
|
||||
import java.nio.file.Path;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class BlocksPersistence {
|
||||
public static final int BUCKET_SIZE = 1000; // results in about 1 MB files and about 1 new file per week
|
||||
|
||||
private final File storageDir;
|
||||
private final String fileName;
|
||||
private final PersistenceProtoResolver persistenceProtoResolver;
|
||||
private Path usedTempFilePath;
|
||||
|
||||
public BlocksPersistence(File storageDir, String fileName, PersistenceProtoResolver persistenceProtoResolver) {
|
||||
this.storageDir = storageDir;
|
||||
this.fileName = fileName;
|
||||
this.persistenceProtoResolver = persistenceProtoResolver;
|
||||
}
|
||||
|
||||
public void writeBlocks(List<BaseBlock> protobufBlocks) {
|
||||
long ts = System.currentTimeMillis();
|
||||
if (!storageDir.exists()) {
|
||||
storageDir.mkdir();
|
||||
}
|
||||
List<BaseBlock> temp = new ArrayList<>();
|
||||
int bucketIndex = 0;
|
||||
for (BaseBlock block : protobufBlocks) {
|
||||
temp.add(block);
|
||||
|
||||
int height = block.getHeight();
|
||||
bucketIndex = height / BUCKET_SIZE;
|
||||
int remainder = height % BUCKET_SIZE;
|
||||
boolean isLastBucketItem = remainder == 0;
|
||||
if (isLastBucketItem) {
|
||||
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
|
||||
int last = bucketIndex * BUCKET_SIZE;
|
||||
File storageFile = new File(storageDir, fileName + "_" + first + "-" + last);
|
||||
writeToDisk(storageFile, new BsqBlockStore(temp));
|
||||
temp = new ArrayList<>();
|
||||
}
|
||||
}
|
||||
if (!temp.isEmpty()) {
|
||||
bucketIndex++;
|
||||
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
|
||||
int last = bucketIndex * BUCKET_SIZE;
|
||||
File storageFile = new File(storageDir, fileName + "_" + first + "-" + last);
|
||||
writeToDisk(storageFile, new BsqBlockStore(temp));
|
||||
|
||||
}
|
||||
log.info("Write {} blocks to disk took {} msec", protobufBlocks.size(), System.currentTimeMillis() - ts);
|
||||
}
|
||||
|
||||
public void removeBlocksDirectory() {
|
||||
if (storageDir.exists()) {
|
||||
try {
|
||||
FileUtil.deleteDirectory(storageDir);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<BaseBlock> readBlocks(int from, int to) {
|
||||
if (!storageDir.exists()) {
|
||||
storageDir.mkdir();
|
||||
}
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
List<BaseBlock> buckets = new ArrayList<>();
|
||||
int start = from / BUCKET_SIZE + 1;
|
||||
int end = to / BUCKET_SIZE + 1;
|
||||
for (int bucketIndex = start; bucketIndex <= end; bucketIndex++) {
|
||||
List<BaseBlock> bucket = readBucket(bucketIndex);
|
||||
buckets.addAll(bucket);
|
||||
}
|
||||
log.info("Reading {} blocks took {} msec", buckets.size(), System.currentTimeMillis() - ts);
|
||||
return buckets;
|
||||
}
|
||||
|
||||
|
||||
private List<BaseBlock> readBucket(int bucketIndex) {
|
||||
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
|
||||
int last = bucketIndex * BUCKET_SIZE;
|
||||
String child = fileName + "_" + first + "-" + last;
|
||||
File storageFile = new File(storageDir, child);
|
||||
if (!storageFile.exists()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
try (FileInputStream fileInputStream = new FileInputStream(storageFile)) {
|
||||
protobuf.PersistableEnvelope proto = protobuf.PersistableEnvelope.parseDelimitedFrom(fileInputStream);
|
||||
BsqBlockStore bsqBlockStore = (BsqBlockStore) persistenceProtoResolver.fromProto(proto);
|
||||
return bsqBlockStore.getBlocksAsProto();
|
||||
} catch (Throwable t) {
|
||||
log.info("Reading {} failed with {}.", fileName, t.getMessage());
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
private void writeToDisk(File storageFile, BsqBlockStore bsqBlockStore) {
|
||||
File tempFile = null;
|
||||
FileOutputStream fileOutputStream = null;
|
||||
try {
|
||||
tempFile = usedTempFilePath != null
|
||||
? FileUtil.createNewFile(usedTempFilePath)
|
||||
: File.createTempFile("temp_" + fileName, null, storageDir);
|
||||
|
||||
// Don't use a new temp file path each time, as that causes the delete-on-exit hook to leak memory:
|
||||
tempFile.deleteOnExit();
|
||||
|
||||
fileOutputStream = new FileOutputStream(tempFile);
|
||||
bsqBlockStore.toProtoMessage().writeDelimitedTo(fileOutputStream);
|
||||
|
||||
// Attempt to force the bits to hit the disk. In reality the OS or hard disk itself may still decide
|
||||
// to not write through to physical media for at least a few seconds, but this is the best we can do.
|
||||
fileOutputStream.flush();
|
||||
fileOutputStream.getFD().sync();
|
||||
|
||||
// Close resources before replacing file with temp file because otherwise it causes problems on windows
|
||||
// when rename temp file
|
||||
fileOutputStream.close();
|
||||
|
||||
FileUtil.renameFile(tempFile, storageFile);
|
||||
usedTempFilePath = tempFile.toPath();
|
||||
} catch (Throwable t) {
|
||||
// If an error occurred, don't attempt to reuse this path again, in case temp file cleanup fails.
|
||||
usedTempFilePath = null;
|
||||
log.error("Error at saveToFile, storageFile={}", fileName, t);
|
||||
} finally {
|
||||
if (tempFile != null && tempFile.exists()) {
|
||||
log.warn("Temp file still exists after failed save. We will delete it now. storageFile={}", fileName);
|
||||
if (!tempFile.delete()) {
|
||||
log.error("Cannot delete temp file.");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (fileOutputStream != null) {
|
||||
fileOutputStream.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
log.error("Cannot close resources." + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.core.dao.state.storage;
|
||||
|
||||
import bisq.common.proto.persistable.PersistableEnvelope;
|
||||
|
||||
import protobuf.BaseBlock;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Wrapper for list of blocks
|
||||
*/
|
||||
@Slf4j
|
||||
public class BsqBlockStore implements PersistableEnvelope {
|
||||
@Getter
|
||||
private final List<BaseBlock> blocksAsProto;
|
||||
|
||||
public BsqBlockStore(List<protobuf.BaseBlock> blocksAsProto) {
|
||||
this.blocksAsProto = blocksAsProto;
|
||||
}
|
||||
|
||||
public Message toProtoMessage() {
|
||||
return protobuf.PersistableEnvelope.newBuilder()
|
||||
.setBsqBlockStore(protobuf.BsqBlockStore.newBuilder().addAllBlocks(blocksAsProto))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static BsqBlockStore fromProto(protobuf.BsqBlockStore proto) {
|
||||
return new BsqBlockStore(proto.getBlocksList());
|
||||
}
|
||||
}
|
@ -0,0 +1,162 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.core.dao.state.storage;
|
||||
|
||||
import bisq.core.dao.state.GenesisTxInfo;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
|
||||
import bisq.common.config.Config;
|
||||
import bisq.common.proto.persistable.PersistenceProtoResolver;
|
||||
|
||||
import protobuf.BaseBlock;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.net.URL;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class BsqBlocksStorageService {
|
||||
public final static String NAME = "BsqBlocks";
|
||||
|
||||
private final int genesisBlockHeight;
|
||||
private final File storageDir;
|
||||
private final BlocksPersistence blocksPersistence;
|
||||
@Getter
|
||||
private int chainHeightOfPersistedBlocks;
|
||||
|
||||
@Inject
|
||||
public BsqBlocksStorageService(GenesisTxInfo genesisTxInfo,
|
||||
PersistenceProtoResolver persistenceProtoResolver,
|
||||
@Named(Config.STORAGE_DIR) File dbStorageDir) {
|
||||
genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight();
|
||||
storageDir = new File(dbStorageDir.getAbsolutePath() + File.separator + NAME);
|
||||
blocksPersistence = new BlocksPersistence(storageDir, NAME, persistenceProtoResolver);
|
||||
}
|
||||
|
||||
public void persistBlocks(List<Block> blocks) {
|
||||
long ts = System.currentTimeMillis();
|
||||
List<BaseBlock> protobufBlocks = blocks.stream()
|
||||
.map(Block::toProtoMessage)
|
||||
.collect(Collectors.toList());
|
||||
blocksPersistence.writeBlocks(protobufBlocks);
|
||||
|
||||
if (!blocks.isEmpty()) {
|
||||
chainHeightOfPersistedBlocks = Math.max(chainHeightOfPersistedBlocks,
|
||||
getHeightOfLastFullBucket(blocks));
|
||||
}
|
||||
log.info("Persist (serialize+write) {} blocks took {} ms",
|
||||
blocks.size(),
|
||||
System.currentTimeMillis() - ts);
|
||||
}
|
||||
|
||||
public LinkedList<Block> readBlocks(int chainHeight) {
|
||||
long ts = System.currentTimeMillis();
|
||||
LinkedList<Block> blocks = new LinkedList<>();
|
||||
List<BaseBlock> list = blocksPersistence.readBlocks(genesisBlockHeight, chainHeight);
|
||||
list.stream().map(Block::fromProto)
|
||||
.forEach(blocks::add);
|
||||
log.info("Reading and deserializing {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
|
||||
if (!blocks.isEmpty()) {
|
||||
chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks);
|
||||
}
|
||||
return blocks;
|
||||
}
|
||||
|
||||
public LinkedList<Block> migrateBlocks(List<protobuf.BaseBlock> protobufBlocks) {
|
||||
long ts = System.currentTimeMillis();
|
||||
blocksPersistence.writeBlocks(protobufBlocks);
|
||||
LinkedList<Block> blocks = new LinkedList<>();
|
||||
protobufBlocks.forEach(protobufBlock -> blocks.add(Block.fromProto(protobufBlock)));
|
||||
if (!blocks.isEmpty()) {
|
||||
chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks);
|
||||
}
|
||||
|
||||
log.info("Migrating blocks (write+deserialization) from DaoStateStore took {} ms", System.currentTimeMillis() - ts);
|
||||
return blocks;
|
||||
}
|
||||
|
||||
|
||||
void copyFromResources(String postFix) {
|
||||
long ts = System.currentTimeMillis();
|
||||
try {
|
||||
String dirName = BsqBlocksStorageService.NAME;
|
||||
String resourceDir = dirName + postFix;
|
||||
|
||||
if (storageDir.exists()) {
|
||||
log.info("No resource directory was copied. {} exists already.", dirName);
|
||||
return;
|
||||
}
|
||||
|
||||
URL dirUrl = getClass().getClassLoader().getResource(resourceDir);
|
||||
if (dirUrl == null) {
|
||||
log.info("Directory {} in resources does not exist.", resourceDir);
|
||||
return;
|
||||
}
|
||||
File dir = new File(dirUrl.toURI());
|
||||
String[] fileNames = dir.list();
|
||||
if (fileNames == null) {
|
||||
log.info("No files in directory. {}", dir.getAbsolutePath());
|
||||
return;
|
||||
}
|
||||
if (!storageDir.exists()) {
|
||||
storageDir.mkdir();
|
||||
}
|
||||
for (String fileName : fileNames) {
|
||||
URL url = getClass().getClassLoader().getResource(resourceDir + File.separator + fileName);
|
||||
File resourceFile = new File(url.toURI());
|
||||
File destinationFile = new File(storageDir, fileName);
|
||||
FileUtils.copyFile(resourceFile, destinationFile);
|
||||
}
|
||||
log.info("Copying {} resource files took {} ms", fileNames.length, System.currentTimeMillis() - ts);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private int getHeightOfLastFullBucket(List<Block> blocks) {
|
||||
int bucketIndex = blocks.get(blocks.size() - 1).getHeight() / BlocksPersistence.BUCKET_SIZE;
|
||||
return bucketIndex * BlocksPersistence.BUCKET_SIZE;
|
||||
}
|
||||
|
||||
public void removeBlocksDirectory() {
|
||||
blocksPersistence.removeBlocksDirectory();
|
||||
}
|
||||
|
||||
// We recreate the directory so that we don't fill the blocks after restart from resources
|
||||
// In copyFromResources we only check for the directory not the files inside.
|
||||
public void removeBlocksInDirectory() {
|
||||
blocksPersistence.removeBlocksDirectory();
|
||||
if (!storageDir.exists()) {
|
||||
storageDir.mkdir();
|
||||
}
|
||||
}
|
||||
}
|
@ -17,13 +17,14 @@
|
||||
|
||||
package bisq.core.dao.state.storage;
|
||||
|
||||
import bisq.core.dao.monitoring.DaoStateMonitoringService;
|
||||
import bisq.core.dao.monitoring.model.DaoStateHash;
|
||||
import bisq.core.dao.state.model.DaoState;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
|
||||
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
|
||||
import bisq.network.p2p.storage.persistence.StoreService;
|
||||
|
||||
import bisq.common.UserThread;
|
||||
import bisq.common.config.Config;
|
||||
import bisq.common.file.FileUtil;
|
||||
import bisq.common.persistence.PersistenceManager;
|
||||
@ -36,9 +37,12 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
/**
|
||||
* Manages persistence of the daoState.
|
||||
*/
|
||||
@ -46,8 +50,9 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class DaoStateStorageService extends StoreService<DaoStateStore> {
|
||||
private static final String FILE_NAME = "DaoStateStore";
|
||||
|
||||
private final DaoState daoState;
|
||||
private final DaoStateMonitoringService daoStateMonitoringService;
|
||||
private final BsqBlocksStorageService bsqBlocksStorageService;
|
||||
private final File storageDir;
|
||||
private final LinkedList<Block> blocks = new LinkedList<>();
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -56,13 +61,12 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
|
||||
|
||||
@Inject
|
||||
public DaoStateStorageService(ResourceDataStoreService resourceDataStoreService,
|
||||
DaoState daoState,
|
||||
DaoStateMonitoringService daoStateMonitoringService,
|
||||
BsqBlocksStorageService bsqBlocksStorageService,
|
||||
@Named(Config.STORAGE_DIR) File storageDir,
|
||||
PersistenceManager<DaoStateStore> persistenceManager) {
|
||||
super(storageDir, persistenceManager);
|
||||
this.daoState = daoState;
|
||||
this.daoStateMonitoringService = daoStateMonitoringService;
|
||||
this.bsqBlocksStorageService = bsqBlocksStorageService;
|
||||
this.storageDir = storageDir;
|
||||
|
||||
resourceDataStoreService.addService(this);
|
||||
}
|
||||
@ -77,55 +81,122 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
|
||||
return FILE_NAME;
|
||||
}
|
||||
|
||||
public void requestPersistence(DaoState daoState,
|
||||
public int getChainHeightOfPersistedBlocks() {
|
||||
return bsqBlocksStorageService.getChainHeightOfPersistedBlocks();
|
||||
}
|
||||
|
||||
public void requestPersistence(protobuf.DaoState daoStateAsProto,
|
||||
List<Block> blocks,
|
||||
LinkedList<DaoStateHash> daoStateHashChain,
|
||||
Runnable completeHandler) {
|
||||
if (daoState == null) {
|
||||
if (daoStateAsProto == null) {
|
||||
completeHandler.run();
|
||||
return;
|
||||
}
|
||||
|
||||
store.setDaoState(daoState);
|
||||
store.setDaoStateHashChain(daoStateHashChain);
|
||||
|
||||
// We let the persistence run in a thread to avoid the slow protobuf serialisation to happen on the user
|
||||
// thread. We also call it immediately to get notified about the completion event.
|
||||
new Thread(() -> {
|
||||
Thread.currentThread().setName("Serialize and write DaoState");
|
||||
Thread.currentThread().setName("Write-blocks-and-DaoState");
|
||||
bsqBlocksStorageService.persistBlocks(blocks);
|
||||
|
||||
store.setDaoStateAsProto(daoStateAsProto);
|
||||
store.setDaoStateHashChain(daoStateHashChain);
|
||||
long ts = System.currentTimeMillis();
|
||||
persistenceManager.persistNow(() -> {
|
||||
// After we have written to disk we remove the daoState in the store to avoid that it stays in
|
||||
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
|
||||
// memory there until the next persist call.
|
||||
pruneStore();
|
||||
completeHandler.run();
|
||||
log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts);
|
||||
store.releaseMemory();
|
||||
GcUtil.maybeReleaseMemory();
|
||||
UserThread.execute(completeHandler);
|
||||
});
|
||||
}).start();
|
||||
}
|
||||
|
||||
public void pruneStore() {
|
||||
store.setDaoState(null);
|
||||
store.setDaoStateHashChain(null);
|
||||
GcUtil.maybeReleaseMemory();
|
||||
@Override
|
||||
protected void readFromResources(String postFix, Runnable completeHandler) {
|
||||
new Thread(() -> {
|
||||
Thread.currentThread().setName("copyBsqBlocksFromResources");
|
||||
bsqBlocksStorageService.copyFromResources(postFix);
|
||||
|
||||
super.readFromResources(postFix, () -> {
|
||||
// We got mapped back to user thread so we need to create a new thread again as we dont want to
|
||||
// execute on user thread
|
||||
new Thread(() -> {
|
||||
Thread.currentThread().setName("Read-BsqBlocksStore");
|
||||
protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto();
|
||||
if (daoStateAsProto != null) {
|
||||
LinkedList<Block> list;
|
||||
if (daoStateAsProto.getBlocksList().isEmpty()) {
|
||||
int chainHeight = daoStateAsProto.getChainHeight();
|
||||
list = bsqBlocksStorageService.readBlocks(chainHeight);
|
||||
if (!list.isEmpty()) {
|
||||
int heightOfLastBlock = list.getLast().getHeight();
|
||||
checkArgument(heightOfLastBlock == chainHeight,
|
||||
"heightOfLastBlock must match chainHeight");
|
||||
}
|
||||
} else {
|
||||
list = bsqBlocksStorageService.migrateBlocks(daoStateAsProto.getBlocksList());
|
||||
}
|
||||
blocks.clear();
|
||||
blocks.addAll(list);
|
||||
}
|
||||
UserThread.execute(completeHandler);
|
||||
}).start();
|
||||
});
|
||||
}).start();
|
||||
}
|
||||
|
||||
public DaoState getPersistedBsqState() {
|
||||
return store.getDaoState();
|
||||
protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto();
|
||||
if (daoStateAsProto != null) {
|
||||
long ts = System.currentTimeMillis();
|
||||
DaoState daoState = DaoState.fromProto(daoStateAsProto, blocks);
|
||||
log.info("Deserializing DaoState with {} blocks took {} ms",
|
||||
daoState.getBlocks().size(), System.currentTimeMillis() - ts);
|
||||
return daoState;
|
||||
}
|
||||
return new DaoState();
|
||||
}
|
||||
|
||||
public LinkedList<DaoStateHash> getPersistedDaoStateHashChain() {
|
||||
return store.getDaoStateHashChain();
|
||||
}
|
||||
|
||||
public void releaseMemory() {
|
||||
blocks.clear();
|
||||
store.releaseMemory();
|
||||
GcUtil.maybeReleaseMemory();
|
||||
}
|
||||
|
||||
public void resyncDaoStateFromGenesis(Runnable resultHandler) {
|
||||
store.setDaoState(new DaoState());
|
||||
String backupDirName = "out_of_sync_dao_data";
|
||||
try {
|
||||
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
|
||||
} catch (Throwable t) {
|
||||
log.error(t.toString());
|
||||
}
|
||||
|
||||
store.setDaoStateAsProto(DaoState.getBsqStateCloneExcludingBlocks(new DaoState()));
|
||||
store.setDaoStateHashChain(new LinkedList<>());
|
||||
persistenceManager.persistNow(resultHandler);
|
||||
bsqBlocksStorageService.removeBlocksInDirectory();
|
||||
}
|
||||
|
||||
public void resyncDaoStateFromResources(File storageDir) throws IOException {
|
||||
// We delete all DAO consensus payload data and remove the daoState so it will rebuild from latest
|
||||
// We delete all DAO consensus data and remove the daoState so it will rebuild from latest
|
||||
// resource files.
|
||||
long currentTime = System.currentTimeMillis();
|
||||
String backupDirName = "out_of_sync_dao_data";
|
||||
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
|
||||
|
||||
String newFileName = "DaoStateStore_" + System.currentTimeMillis();
|
||||
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName);
|
||||
|
||||
bsqBlocksStorageService.removeBlocksDirectory();
|
||||
}
|
||||
|
||||
private void removeAndBackupDaoConsensusFiles(File storageDir, String backupDirName) throws IOException {
|
||||
// We delete all DAO related data. Some will be rebuild from resources.
|
||||
long currentTime = System.currentTimeMillis();
|
||||
String newFileName = "BlindVoteStore_" + currentTime;
|
||||
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BlindVoteStore"), newFileName, backupDirName);
|
||||
|
||||
@ -138,9 +209,6 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
|
||||
|
||||
newFileName = "UnconfirmedBsqChangeOutputList_" + currentTime;
|
||||
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "UnconfirmedBsqChangeOutputList"), newFileName, backupDirName);
|
||||
|
||||
newFileName = "DaoStateStore_" + currentTime;
|
||||
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName);
|
||||
}
|
||||
|
||||
|
||||
@ -150,7 +218,7 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
|
||||
|
||||
@Override
|
||||
protected DaoStateStore createStore() {
|
||||
return new DaoStateStore(null, new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()));
|
||||
return new DaoStateStore(null, new LinkedList<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,7 +18,6 @@
|
||||
package bisq.core.dao.state.storage;
|
||||
|
||||
import bisq.core.dao.monitoring.model.DaoStateHash;
|
||||
import bisq.core.dao.state.model.DaoState;
|
||||
|
||||
import bisq.common.proto.persistable.PersistableEnvelope;
|
||||
|
||||
@ -38,18 +37,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
@Slf4j
|
||||
public class DaoStateStore implements PersistableEnvelope {
|
||||
// DaoState is always a clone and must not be used for read access beside initial read from disc when we apply
|
||||
// the snapshot!
|
||||
@Getter
|
||||
@Setter
|
||||
@Nullable
|
||||
private DaoState daoState;
|
||||
private protobuf.DaoState daoStateAsProto;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private LinkedList<DaoStateHash> daoStateHashChain;
|
||||
|
||||
DaoStateStore(@Nullable DaoState daoState, LinkedList<DaoStateHash> daoStateHashChain) {
|
||||
this.daoState = daoState;
|
||||
DaoStateStore(@Nullable protobuf.DaoState daoStateAsProto, LinkedList<DaoStateHash> daoStateHashChain) {
|
||||
this.daoStateAsProto = daoStateAsProto;
|
||||
this.daoStateHashChain = daoStateHashChain;
|
||||
}
|
||||
|
||||
@ -59,9 +57,9 @@ public class DaoStateStore implements PersistableEnvelope {
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public Message toProtoMessage() {
|
||||
checkNotNull(daoState, "daoState must not be null when toProtoMessage is invoked");
|
||||
checkNotNull(daoStateAsProto, "daoStateAsProto must not be null when toProtoMessage is invoked");
|
||||
protobuf.DaoStateStore.Builder builder = protobuf.DaoStateStore.newBuilder()
|
||||
.setDaoState(daoState.getBsqStateBuilder())
|
||||
.setDaoState(daoStateAsProto)
|
||||
.addAllDaoStateHash(daoStateHashChain.stream()
|
||||
.map(DaoStateHash::toProtoMessage)
|
||||
.collect(Collectors.toList()));
|
||||
@ -76,6 +74,11 @@ public class DaoStateStore implements PersistableEnvelope {
|
||||
new LinkedList<>(proto.getDaoStateHashList().stream()
|
||||
.map(DaoStateHash::fromProto)
|
||||
.collect(Collectors.toList()));
|
||||
return new DaoStateStore(DaoState.fromProto(proto.getDaoState()), daoStateHashList);
|
||||
return new DaoStateStore(proto.getDaoState(), daoStateHashList);
|
||||
}
|
||||
|
||||
public void releaseMemory() {
|
||||
daoStateAsProto = null;
|
||||
daoStateHashChain = null;
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import bisq.core.dao.governance.proposal.MyProposalList;
|
||||
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStore;
|
||||
import bisq.core.dao.governance.proposal.storage.temp.TempProposalStore;
|
||||
import bisq.core.dao.state.model.governance.BallotList;
|
||||
import bisq.core.dao.state.storage.BsqBlockStore;
|
||||
import bisq.core.dao.state.storage.DaoStateStore;
|
||||
import bisq.core.dao.state.unconfirmed.UnconfirmedBsqChangeOutputList;
|
||||
import bisq.core.payment.PaymentAccountList;
|
||||
@ -138,6 +139,8 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
|
||||
return IgnoredMailboxMap.fromProto(proto.getIgnoredMailboxMap());
|
||||
case REMOVED_PAYLOADS_MAP:
|
||||
return RemovedPayloadsMap.fromProto(proto.getRemovedPayloadsMap());
|
||||
case BSQ_BLOCK_STORE:
|
||||
return BsqBlockStore.fromProto(proto.getBsqBlockStore());
|
||||
default:
|
||||
throw new ProtobufferRuntimeException("Unknown proto message case(PB.PersistableEnvelope). " +
|
||||
"messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString());
|
||||
|
@ -661,7 +661,7 @@ portfolio.context.notYourOffer=You can only duplicate offers where you were the
|
||||
|
||||
portfolio.closedTrades.deviation.help=Percentage price deviation from market
|
||||
|
||||
portfolio.pending.invalidTx=There is an issue wi~th a missing or invalid transaction.\n\n\
|
||||
portfolio.pending.invalidTx=There is an issue with a missing or invalid transaction.\n\n\
|
||||
Please do NOT send the fiat or altcoin payment.\n\n\
|
||||
Open a support ticket to get assistance from a Mediator.\n\n\
|
||||
Error message: {0}
|
||||
|
@ -1,72 +0,0 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.core.dao.state;
|
||||
|
||||
import bisq.core.dao.state.model.DaoState;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
import bisq.core.util.coin.BsqFormatter;
|
||||
|
||||
import org.bitcoinj.core.Coin;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DaoStateServiceTest {
|
||||
@Test
|
||||
public void testIsBlockHashKnown() {
|
||||
DaoStateService stateService = new DaoStateService(
|
||||
new DaoState(),
|
||||
new GenesisTxInfo("fakegenesistxid", 100, Coin.parseCoin("2.5").value),
|
||||
new BsqFormatter());
|
||||
Assert.assertEquals(
|
||||
"Unknown block should not exist.",
|
||||
false,
|
||||
stateService.isBlockHashKnown("fakeblockhash0")
|
||||
);
|
||||
|
||||
Block block = new Block(0, 1534800000, "fakeblockhash0", null);
|
||||
stateService.onNewBlockHeight(0);
|
||||
stateService.onNewBlockWithEmptyTxs(block);
|
||||
Assert.assertEquals(
|
||||
"Block has to be genesis block to get added.",
|
||||
false,
|
||||
stateService.isBlockHashKnown("fakeblockhash0")
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"Block that was never added should still not exist.",
|
||||
false,
|
||||
stateService.isBlockHashKnown("fakeblockhash1")
|
||||
);
|
||||
|
||||
block = new Block(1, 1534800001, "fakeblockhash1", null);
|
||||
stateService.onNewBlockHeight(1);
|
||||
stateService.onNewBlockWithEmptyTxs(block);
|
||||
block = new Block(2, 1534800002, "fakeblockhash2", null);
|
||||
stateService.onNewBlockHeight(2);
|
||||
stateService.onNewBlockWithEmptyTxs(block);
|
||||
block = new Block(3, 1534800003, "fakeblockhash3", null);
|
||||
stateService.onNewBlockHeight(3);
|
||||
stateService.onNewBlockWithEmptyTxs(block);
|
||||
Assert.assertEquals(
|
||||
"Block that was never added should still not exist after adding more blocks.",
|
||||
false,
|
||||
stateService.isBlockHashKnown("fakeblockhash4")
|
||||
);
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@
|
||||
package bisq.desktop.main.dao.governance.result;
|
||||
|
||||
import bisq.core.dao.state.DaoStateService;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
import bisq.core.dao.state.model.governance.Cycle;
|
||||
import bisq.core.dao.state.model.governance.DecryptedBallotsWithMerits;
|
||||
import bisq.core.dao.state.model.governance.EvaluatedProposal;
|
||||
@ -81,9 +80,7 @@ class ResultsOfCycle {
|
||||
// At a new cycle we have cycleStartTime 0 as the block is not processed yet.
|
||||
// To display a correct value we access again from the daoStateService
|
||||
if (cycleStartTime == 0)
|
||||
cycleStartTime = daoStateService.getBlockAtHeight(cycle.getHeightOfFirstBlock())
|
||||
.map(Block::getTime)
|
||||
.orElse(0L);
|
||||
cycleStartTime = daoStateService.getBlockTimeAtBlockHeight(cycle.getHeightOfFirstBlock());
|
||||
return cycleStartTime;
|
||||
}
|
||||
}
|
||||
|
@ -418,9 +418,7 @@ public class VoteResultView extends ActivatableView<GridPane, Void> implements D
|
||||
.peek(decryptedBallotsWithMerits -> stakeAndMerit.getAndAdd(decryptedBallotsWithMerits.getStake() + decryptedBallotsWithMerits.getMerit(daoStateService)))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
long cycleStartTime = daoStateService.getBlockAtHeight(cycle.getHeightOfFirstBlock())
|
||||
.map(Block::getTime)
|
||||
.orElse(0L);
|
||||
long cycleStartTime = daoStateService.getBlockTimeAtBlockHeight(cycle.getHeightOfFirstBlock());
|
||||
int cycleIndex = cycleService.getCycleIndex(cycle);
|
||||
ResultsOfCycle resultsOfCycle = new ResultsOfCycle(cycle,
|
||||
cycleIndex,
|
||||
|
2578
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_571001-572000
Normal file
2578
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_571001-572000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7192
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_591001-592000
Normal file
7192
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_591001-592000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
9463
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_621001-622000
Normal file
9463
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_621001-622000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
5029
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_625001-626000
Normal file
5029
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_625001-626000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
6293
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_627001-628000
Normal file
6293
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_627001-628000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7506
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_646001-647000
Normal file
7506
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_646001-647000
Normal file
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
6631
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_650001-651000
Normal file
6631
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_650001-651000
Normal file
File diff suppressed because it is too large
Load Diff
6863
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_651001-652000
Normal file
6863
p2p/src/main/resources/BsqBlocks_BTC_MAINNET/BsqBlocks_651001-652000
Normal file
File diff suppressed because it is too large
Load Diff
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user