mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-19 18:03:12 +01:00
Merge pull request #4689 from chimp1984/revert-dao-data-json-export-changes
Revert changes for json export of dao data
This commit is contained in:
commit
917ab4ee32
@ -209,7 +209,7 @@ public abstract class BsqNode implements DaoSetupService {
|
||||
parseBlockchainComplete = true;
|
||||
daoStateService.onParseBlockChainComplete();
|
||||
|
||||
exportJsonFilesService.onParseBlockChainComplete();
|
||||
maybeExportToJson();
|
||||
}
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@ -291,7 +291,7 @@ public abstract class BsqNode implements DaoSetupService {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
protected void maybeExportNewBlockToJson(Block block) {
|
||||
exportJsonFilesService.onNewBlock(block);
|
||||
protected void maybeExportToJson() {
|
||||
exportJsonFilesService.maybeExportToJson();
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package bisq.core.dao.node.explorer;
|
||||
|
||||
import bisq.core.dao.DaoSetupService;
|
||||
import bisq.core.dao.state.DaoStateService;
|
||||
import bisq.core.dao.state.model.DaoState;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
import bisq.core.dao.state.model.blockchain.PubKeyScript;
|
||||
import bisq.core.dao.state.model.blockchain.Tx;
|
||||
@ -26,6 +27,7 @@ import bisq.core.dao.state.model.blockchain.TxOutput;
|
||||
import bisq.core.dao.state.model.blockchain.TxType;
|
||||
|
||||
import bisq.common.config.Config;
|
||||
import bisq.common.file.FileUtil;
|
||||
import bisq.common.file.JsonFileManager;
|
||||
import bisq.common.util.Utilities;
|
||||
|
||||
@ -35,11 +37,18 @@ import com.google.inject.Inject;
|
||||
|
||||
import javax.inject.Named;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
@ -47,13 +56,17 @@ import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
@Slf4j
|
||||
public class ExportJsonFilesService implements DaoSetupService {
|
||||
private final DaoStateService daoStateService;
|
||||
private final File storageDir;
|
||||
private boolean dumpBlockchainData;
|
||||
private JsonFileManager blockFileManager, txFileManager, txOutputFileManager, bsqStateFileManager;
|
||||
private File blockDir;
|
||||
private final boolean dumpBlockchainData;
|
||||
|
||||
private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter",
|
||||
1, 1, 1200);
|
||||
private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager;
|
||||
|
||||
@Inject
|
||||
public ExportJsonFilesService(DaoStateService daoStateService,
|
||||
@ -75,135 +88,88 @@ public class ExportJsonFilesService implements DaoSetupService {
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (!dumpBlockchainData) {
|
||||
return;
|
||||
if (dumpBlockchainData) {
|
||||
File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
|
||||
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
|
||||
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
|
||||
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());
|
||||
try {
|
||||
if (txDir.exists())
|
||||
FileUtil.deleteDirectory(txDir);
|
||||
if (txOutputDir.exists())
|
||||
FileUtil.deleteDirectory(txOutputDir);
|
||||
if (bsqStateDir.exists())
|
||||
FileUtil.deleteDirectory(bsqStateDir);
|
||||
if (jsonDir.exists())
|
||||
FileUtil.deleteDirectory(jsonDir);
|
||||
} catch (IOException e) {
|
||||
log.error(e.toString());
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
if (!jsonDir.mkdir())
|
||||
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());
|
||||
|
||||
if (!txDir.mkdir())
|
||||
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());
|
||||
|
||||
if (!txOutputDir.mkdir())
|
||||
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());
|
||||
|
||||
if (!bsqStateDir.mkdir())
|
||||
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());
|
||||
|
||||
txFileManager = new JsonFileManager(txDir);
|
||||
txOutputFileManager = new JsonFileManager(txOutputDir);
|
||||
bsqStateFileManager = new JsonFileManager(bsqStateDir);
|
||||
}
|
||||
|
||||
File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
|
||||
blockDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "block").toString());
|
||||
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
|
||||
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
|
||||
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());
|
||||
|
||||
if (!jsonDir.mkdir())
|
||||
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());
|
||||
|
||||
if (!blockDir.mkdir())
|
||||
log.warn("make blockDir failed.\njsonDir=" + blockDir.getAbsolutePath());
|
||||
|
||||
if (!txDir.mkdir())
|
||||
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());
|
||||
|
||||
if (!txOutputDir.mkdir())
|
||||
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());
|
||||
|
||||
if (!bsqStateDir.mkdir())
|
||||
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());
|
||||
|
||||
blockFileManager = new JsonFileManager(blockDir);
|
||||
txFileManager = new JsonFileManager(txDir);
|
||||
txOutputFileManager = new JsonFileManager(txOutputDir);
|
||||
bsqStateFileManager = new JsonFileManager(bsqStateDir);
|
||||
}
|
||||
|
||||
public void shutDown() {
|
||||
if (!dumpBlockchainData) {
|
||||
return;
|
||||
}
|
||||
|
||||
blockFileManager.shutDown();
|
||||
txFileManager.shutDown();
|
||||
txOutputFileManager.shutDown();
|
||||
bsqStateFileManager.shutDown();
|
||||
dumpBlockchainData = false;
|
||||
}
|
||||
|
||||
public void onNewBlock(Block block) {
|
||||
if (!dumpBlockchainData) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We do write the block on the main thread as the overhead to create a thread and risk for inconsistency is not
|
||||
// worth the potential performance gain.
|
||||
processBlock(block, true);
|
||||
}
|
||||
|
||||
private void processBlock(Block block, boolean doDumpDaoState) {
|
||||
int lastPersistedBlock = getLastPersistedBlock();
|
||||
if (block.getHeight() <= lastPersistedBlock) {
|
||||
return;
|
||||
}
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
JsonBlock jsonBlock = getJsonBlock(block);
|
||||
blockFileManager.writeToDisc(Utilities.objectToJson(jsonBlock), String.valueOf(jsonBlock.getHeight()));
|
||||
|
||||
jsonBlock.getTxs().forEach(jsonTx -> {
|
||||
txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId());
|
||||
|
||||
jsonTx.getOutputs().forEach(jsonTxOutput ->
|
||||
txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
|
||||
});
|
||||
|
||||
log.info("Write json data for block {} took {} ms", block.getHeight(), System.currentTimeMillis() - ts);
|
||||
|
||||
if (doDumpDaoState) {
|
||||
dumpDaoState();
|
||||
if (dumpBlockchainData && txFileManager != null) {
|
||||
txFileManager.shutDown();
|
||||
txOutputFileManager.shutDown();
|
||||
bsqStateFileManager.shutDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void onParseBlockChainComplete() {
|
||||
if (!dumpBlockchainData) {
|
||||
return;
|
||||
}
|
||||
public void maybeExportToJson() {
|
||||
if (dumpBlockchainData &&
|
||||
daoStateService.isParseBlockChainComplete()) {
|
||||
// We store the data we need once we write the data to disk (in the thread) locally.
|
||||
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
|
||||
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();
|
||||
|
||||
int lastPersistedBlock = getLastPersistedBlock();
|
||||
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(lastPersistedBlock + 1, Integer.MAX_VALUE);
|
||||
List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
|
||||
.map(tx -> {
|
||||
JsonTx jsonTx = getJsonTx(tx);
|
||||
allJsonTxOutputs.addAll(jsonTx.getOutputs());
|
||||
return jsonTx;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// We use a thread here to write all past blocks to avoid that the main thread gets blocked for too long.
|
||||
new Thread(() -> {
|
||||
Thread.currentThread().setName("Write all blocks to json");
|
||||
blocks.forEach(e -> processBlock(e, false));
|
||||
}).start();
|
||||
|
||||
dumpDaoState();
|
||||
}
|
||||
|
||||
private void dumpDaoState() {
|
||||
// TODO we should get rid of that data structure and use the individual jsonBlocks instead as we cannot cache data
|
||||
// here and re-write each time the full blockchain which is already > 200 MB
|
||||
// Once the webapp has impl the changes we can delete that here.
|
||||
long ts = System.currentTimeMillis();
|
||||
List<JsonBlock> jsonBlockList = daoStateService.getBlocks().stream()
|
||||
.map(this::getJsonBlock)
|
||||
.collect(Collectors.toList());
|
||||
JsonBlocks jsonBlocks = new JsonBlocks(daoStateService.getChainHeight(), jsonBlockList);
|
||||
|
||||
// We use here the thread write method as the data is quite large and write can take a bit
|
||||
bsqStateFileManager.writeToDiscThreaded(Utilities.objectToJson(jsonBlocks), "blocks");
|
||||
log.info("Dumping full bsqState with {} blocks took {} ms",
|
||||
jsonBlocks.getBlocks().size(), System.currentTimeMillis() - ts);
|
||||
}
|
||||
|
||||
private int getLastPersistedBlock() {
|
||||
// At start we use one block before genesis
|
||||
int result = daoStateService.getGenesisBlockHeight() - 1;
|
||||
String[] list = blockDir.list();
|
||||
if (list != null && list.length > 0) {
|
||||
List<Integer> blocks = Arrays.stream(list)
|
||||
.filter(e -> !e.endsWith(".tmp"))
|
||||
.map(e -> e.replace(".json", ""))
|
||||
.map(Integer::valueOf)
|
||||
.sorted()
|
||||
DaoState daoState = daoStateService.getClone();
|
||||
List<JsonBlock> jsonBlockList = daoState.getBlocks().stream()
|
||||
.map(this::getJsonBlock)
|
||||
.collect(Collectors.toList());
|
||||
if (!blocks.isEmpty()) {
|
||||
Integer lastBlockHeight = blocks.get(blocks.size() - 1);
|
||||
if (lastBlockHeight > result) {
|
||||
result = lastBlockHeight;
|
||||
JsonBlocks jsonBlocks = new JsonBlocks(daoState.getChainHeight(), jsonBlockList);
|
||||
|
||||
ListenableFuture<Void> future = executor.submit(() -> {
|
||||
bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks");
|
||||
allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
|
||||
jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()));
|
||||
return null;
|
||||
});
|
||||
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
public void onSuccess(Void ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.error(throwable.toString());
|
||||
throwable.printStackTrace();
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private JsonBlock getJsonBlock(Block block) {
|
||||
|
@ -168,7 +168,7 @@ public class FullNode extends BsqNode {
|
||||
}
|
||||
|
||||
private void onNewBlock(Block block) {
|
||||
maybeExportNewBlockToJson(block);
|
||||
maybeExportToJson();
|
||||
|
||||
if (p2pNetworkReady && parseBlockchainComplete)
|
||||
fullNodeNetworkService.publishNewBlock(block);
|
||||
|
@ -28,7 +28,6 @@ import bisq.core.dao.node.parser.BlockParser;
|
||||
import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException;
|
||||
import bisq.core.dao.state.DaoStateService;
|
||||
import bisq.core.dao.state.DaoStateSnapshotService;
|
||||
import bisq.core.dao.state.model.blockchain.Block;
|
||||
|
||||
import bisq.network.p2p.P2PService;
|
||||
import bisq.network.p2p.network.Connection;
|
||||
@ -40,7 +39,6 @@ import com.google.inject.Inject;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -229,18 +227,19 @@ public class LiteNode extends BsqNode {
|
||||
}
|
||||
|
||||
// We received a new block
|
||||
private void onNewBlockReceived(RawBlock rawBlock) {
|
||||
int blockHeight = rawBlock.getHeight();
|
||||
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, rawBlock.getHash());
|
||||
private void onNewBlockReceived(RawBlock block) {
|
||||
int blockHeight = block.getHeight();
|
||||
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash());
|
||||
|
||||
// We only update chainTipHeight if we get a newer block
|
||||
if (blockHeight > chainTipHeight)
|
||||
chainTipHeight = blockHeight;
|
||||
|
||||
try {
|
||||
Optional<Block> optionalBlock = doParseBlock(rawBlock);
|
||||
optionalBlock.ifPresent(this::maybeExportNewBlockToJson);
|
||||
doParseBlock(block);
|
||||
} catch (RequiredReorgFromSnapshotException ignore) {
|
||||
}
|
||||
|
||||
maybeExportToJson();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user