Merge pull request #4635 from chimp1984/fix-performance-issues-at-dump-dao-data

Change write DAO json files to disk strategy
This commit is contained in:
Christoph Atteneder 2020-10-13 09:20:51 +02:00 committed by GitHub
commit 88f9eee75b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 204 additions and 144 deletions

View File

@ -24,13 +24,26 @@ import java.nio.file.Paths;
import java.io.File;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Slf4j
public class JsonFileManager {
private final ThreadPoolExecutor executor;
private final static List<JsonFileManager> INSTANCES = new ArrayList<>();
public static void shutDownAllInstances() {
INSTANCES.forEach(JsonFileManager::shutDown);
}
@Nullable
private ThreadPoolExecutor executor;
private final File dir;
@ -41,54 +54,62 @@ public class JsonFileManager {
public JsonFileManager(File dir) {
this.dir = dir;
this.executor = Utilities.getThreadPoolExecutor("JsonFileManagerExecutor", 5, 50, 60);
if (!dir.exists() && !dir.mkdir()) {
log.warn("make dir failed");
}
if (!dir.exists())
if (!dir.mkdir())
log.warn("make dir failed");
INSTANCES.add(this);
}
Runtime.getRuntime().addShutdownHook(new Thread(JsonFileManager.this::shutDown,
"JsonFileManager.ShutDownHook"));
@NotNull
protected ThreadPoolExecutor getExecutor() {
if (executor == null) {
executor = Utilities.getThreadPoolExecutor("JsonFileManagerExecutor", 5, 50, 60);
}
return executor;
}
public void shutDown() {
executor.shutdown();
if (executor != null) {
executor.shutdown();
}
}
public void writeToDiscThreaded(String json, String fileName) {
getExecutor().execute(() -> writeToDisc(json, fileName));
}
public void writeToDisc(String json, String fileName) {
executor.execute(() -> {
File jsonFile = new File(Paths.get(dir.getAbsolutePath(), fileName + ".json").toString());
File tempFile = null;
PrintWriter printWriter = null;
try {
tempFile = File.createTempFile("temp", null, dir);
if (!executor.isShutdown() && !executor.isTerminated() && !executor.isTerminating())
tempFile.deleteOnExit();
File jsonFile = new File(Paths.get(dir.getAbsolutePath(), fileName + ".json").toString());
File tempFile = null;
PrintWriter printWriter = null;
try {
tempFile = File.createTempFile("temp", null, dir);
tempFile.deleteOnExit();
printWriter = new PrintWriter(tempFile);
printWriter.println(json);
printWriter = new PrintWriter(tempFile);
printWriter.println(json);
// This close call and comment is borrowed from FileManager. Not 100% sure it that is really needed but
// seems that had fixed in the past and we got reported issues on Windows so that fix might be still
// required.
// Close resources before replacing file with temp file because otherwise it causes problems on windows
// when rename temp file
printWriter.close();
// This close call and comment is borrowed from FileManager. Not 100% sure it that is really needed but
// seems that had fixed in the past and we got reported issues on Windows so that fix might be still
// required.
// Close resources before replacing file with temp file because otherwise it causes problems on windows
// when rename temp file
printWriter.close();
FileUtil.renameFile(tempFile, jsonFile);
} catch (Throwable t) {
log.error("storageFile " + jsonFile.toString());
t.printStackTrace();
} finally {
if (tempFile != null && tempFile.exists()) {
log.warn("Temp file still exists after failed save. We will delete it now. storageFile=" + fileName);
if (!tempFile.delete())
log.error("Cannot delete temp file.");
}
if (printWriter != null)
printWriter.close();
FileUtil.renameFile(tempFile, jsonFile);
} catch (Throwable t) {
log.error("storageFile " + jsonFile.toString());
t.printStackTrace();
} finally {
if (tempFile != null && tempFile.exists()) {
log.warn("Temp file still exists after failed save. We will delete it now. storageFile=" + fileName);
if (!tempFile.delete())
log.error("Cannot delete temp file.");
}
});
if (printWriter != null)
printWriter.close();
}
}
}

View File

@ -21,6 +21,7 @@ import bisq.core.app.BisqExecutable;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.btc.wallet.BtcWalletService;
import bisq.core.dao.DaoSetup;
import bisq.core.offer.OpenOfferManager;
import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
@ -31,6 +32,7 @@ import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.common.UserThread;
import bisq.common.app.DevEnv;
import bisq.common.config.Config;
import bisq.common.file.JsonFileManager;
import bisq.common.handlers.ResultHandler;
import bisq.common.persistence.PersistenceManager;
import bisq.common.setup.GracefulShutDownHandler;
@ -83,6 +85,8 @@ public abstract class ExecutableForAppWithP2p extends BisqExecutable {
log.info("gracefulShutDown");
try {
if (injector != null) {
JsonFileManager.shutDownAllInstances();
injector.getInstance(DaoSetup.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(OpenOfferManager.class).shutDown(() -> injector.getInstance(P2PService.class).shutDown(() -> {
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {

View File

@ -209,7 +209,7 @@ public abstract class BsqNode implements DaoSetupService {
parseBlockchainComplete = true;
daoStateService.onParseBlockChainComplete();
maybeExportToJson();
exportJsonFilesService.onParseBlockChainComplete();
}
@SuppressWarnings("WeakerAccess")
@ -291,7 +291,7 @@ public abstract class BsqNode implements DaoSetupService {
return Optional.empty();
}
protected void maybeExportToJson() {
exportJsonFilesService.maybeExportToJson();
protected void maybeExportNewBlockToJson(Block block) {
exportJsonFilesService.onNewBlock(block);
}
}

View File

@ -19,7 +19,6 @@ package bisq.core.dao.node.explorer;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.DaoState;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.PubKeyScript;
import bisq.core.dao.state.model.blockchain.Tx;
@ -27,7 +26,6 @@ import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxType;
import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.file.JsonFileManager;
import bisq.common.util.Utilities;
@ -37,18 +35,11 @@ import com.google.inject.Inject;
import javax.inject.Named;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.file.Paths;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -56,17 +47,13 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@Slf4j
public class ExportJsonFilesService implements DaoSetupService {
private final DaoStateService daoStateService;
private final File storageDir;
private final boolean dumpBlockchainData;
private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter",
1, 1, 1200);
private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager;
private boolean dumpBlockchainData;
private JsonFileManager blockFileManager, txFileManager, txOutputFileManager, bsqStateFileManager;
private File blockDir;
@Inject
public ExportJsonFilesService(DaoStateService daoStateService,
@ -88,88 +75,135 @@ public class ExportJsonFilesService implements DaoSetupService {
@Override
public void start() {
if (dumpBlockchainData) {
File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());
try {
if (txDir.exists())
FileUtil.deleteDirectory(txDir);
if (txOutputDir.exists())
FileUtil.deleteDirectory(txOutputDir);
if (bsqStateDir.exists())
FileUtil.deleteDirectory(bsqStateDir);
if (jsonDir.exists())
FileUtil.deleteDirectory(jsonDir);
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
}
if (!jsonDir.mkdir())
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());
if (!txDir.mkdir())
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());
if (!txOutputDir.mkdir())
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());
if (!bsqStateDir.mkdir())
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());
txFileManager = new JsonFileManager(txDir);
txOutputFileManager = new JsonFileManager(txOutputDir);
bsqStateFileManager = new JsonFileManager(bsqStateDir);
if (!dumpBlockchainData) {
return;
}
File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
blockDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "block").toString());
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());
if (!jsonDir.mkdir())
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());
if (!blockDir.mkdir())
log.warn("make blockDir failed.\njsonDir=" + blockDir.getAbsolutePath());
if (!txDir.mkdir())
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());
if (!txOutputDir.mkdir())
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());
if (!bsqStateDir.mkdir())
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());
blockFileManager = new JsonFileManager(blockDir);
txFileManager = new JsonFileManager(txDir);
txOutputFileManager = new JsonFileManager(txOutputDir);
bsqStateFileManager = new JsonFileManager(bsqStateDir);
}
public void shutDown() {
if (dumpBlockchainData && txFileManager != null) {
txFileManager.shutDown();
txOutputFileManager.shutDown();
bsqStateFileManager.shutDown();
if (!dumpBlockchainData) {
return;
}
blockFileManager.shutDown();
txFileManager.shutDown();
txOutputFileManager.shutDown();
bsqStateFileManager.shutDown();
dumpBlockchainData = false;
}
public void onNewBlock(Block block) {
if (!dumpBlockchainData) {
return;
}
// We do write the block on the main thread as the overhead to create a thread and risk for inconsistency is not
// worth the potential performance gain.
processBlock(block, true);
}
private void processBlock(Block block, boolean doDumpDaoState) {
int lastPersistedBlock = getLastPersistedBlock();
if (block.getHeight() <= lastPersistedBlock) {
return;
}
long ts = System.currentTimeMillis();
JsonBlock jsonBlock = getJsonBlock(block);
blockFileManager.writeToDisc(Utilities.objectToJson(jsonBlock), String.valueOf(jsonBlock.getHeight()));
jsonBlock.getTxs().forEach(jsonTx -> {
txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId());
jsonTx.getOutputs().forEach(jsonTxOutput ->
txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
});
log.info("Write json data for block {} took {} ms", block.getHeight(), System.currentTimeMillis() - ts);
if (doDumpDaoState) {
dumpDaoState();
}
}
public void maybeExportToJson() {
if (dumpBlockchainData &&
daoStateService.isParseBlockChainComplete()) {
// We store the data we need once we write the data to disk (in the thread) locally.
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();
List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
.map(tx -> {
JsonTx jsonTx = getJsonTx(tx);
allJsonTxOutputs.addAll(jsonTx.getOutputs());
return jsonTx;
}).collect(Collectors.toList());
DaoState daoState = daoStateService.getClone();
List<JsonBlock> jsonBlockList = daoState.getBlocks().stream()
.map(this::getJsonBlock)
.collect(Collectors.toList());
JsonBlocks jsonBlocks = new JsonBlocks(daoState.getChainHeight(), jsonBlockList);
ListenableFuture<Void> future = executor.submit(() -> {
bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks");
allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()));
return null;
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Void ignore) {
}
public void onFailure(@NotNull Throwable throwable) {
log.error(throwable.toString());
throwable.printStackTrace();
}
}, MoreExecutors.directExecutor());
public void onParseBlockChainComplete() {
if (!dumpBlockchainData) {
return;
}
int lastPersistedBlock = getLastPersistedBlock();
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(lastPersistedBlock + 1, Integer.MAX_VALUE);
// We use a thread here to write all past blocks to avoid that the main thread gets blocked for too long.
new Thread(() -> {
Thread.currentThread().setName("Write all blocks to json");
blocks.forEach(e -> processBlock(e, false));
}).start();
dumpDaoState();
}
private void dumpDaoState() {
// TODO we should get rid of that data structure and use the individual jsonBlocks instead as we cannot cache data
// here and re-write each time the full blockchain which is already > 200 MB
// Once the webapp has impl the changes we can delete that here.
long ts = System.currentTimeMillis();
List<JsonBlock> jsonBlockList = daoStateService.getBlocks().stream()
.map(this::getJsonBlock)
.collect(Collectors.toList());
JsonBlocks jsonBlocks = new JsonBlocks(daoStateService.getChainHeight(), jsonBlockList);
// We use here the thread write method as the data is quite large and write can take a bit
bsqStateFileManager.writeToDiscThreaded(Utilities.objectToJson(jsonBlocks), "blocks");
log.info("Dumping full bsqState with {} blocks took {} ms",
jsonBlocks.getBlocks().size(), System.currentTimeMillis() - ts);
}
private int getLastPersistedBlock() {
// At start we use one block before genesis
int result = daoStateService.getGenesisBlockHeight() - 1;
String[] list = blockDir.list();
if (list != null && list.length > 0) {
List<Integer> blocks = Arrays.stream(list)
.filter(e -> !e.endsWith(".tmp"))
.map(e -> e.replace(".json", ""))
.map(Integer::valueOf)
.sorted()
.collect(Collectors.toList());
if (!blocks.isEmpty()) {
Integer lastBlockHeight = blocks.get(blocks.size() - 1);
if (lastBlockHeight > result) {
result = lastBlockHeight;
}
}
}
return result;
}
private JsonBlock getJsonBlock(Block block) {

View File

@ -168,7 +168,7 @@ public class FullNode extends BsqNode {
}
private void onNewBlock(Block block) {
maybeExportToJson();
maybeExportNewBlockToJson(block);
if (p2pNetworkReady && parseBlockchainComplete)
fullNodeNetworkService.publishNewBlock(block);

View File

@ -28,6 +28,7 @@ import bisq.core.dao.node.parser.BlockParser;
import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
@ -39,6 +40,7 @@ import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@ -227,19 +229,18 @@ public class LiteNode extends BsqNode {
}
// We received a new block
private void onNewBlockReceived(RawBlock block) {
int blockHeight = block.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash());
private void onNewBlockReceived(RawBlock rawBlock) {
int blockHeight = rawBlock.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, rawBlock.getHash());
// We only update chainTipHeight if we get a newer block
if (blockHeight > chainTipHeight)
chainTipHeight = blockHeight;
try {
doParseBlock(block);
Optional<Block> optionalBlock = doParseBlock(rawBlock);
optionalBlock.ifPresent(this::maybeExportNewBlockToJson);
} catch (RequiredReorgFromSnapshotException ignore) {
}
maybeExportToJson();
}
}

View File

@ -245,6 +245,6 @@ public class OfferBookService {
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
jsonFileManager.writeToDisc(Utilities.objectToJson(offerForJsonList), "offers_statistics");
jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(offerForJsonList), "offers_statistics");
}
}

View File

@ -58,7 +58,7 @@ public class DumpDelayedPayoutTx {
.map(trade -> new DelayedPayoutHash(trade.getId(),
Utilities.bytesAsHexString(((Trade) trade).getDelayedPayoutTxBytes())))
.collect(Collectors.toList());
jsonFileManager.writeToDisc(Utilities.objectToJson(delayedPayoutHashes), fileName);
jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(delayedPayoutHashes), fileName);
}
}

View File

@ -123,13 +123,13 @@ public class TradeStatisticsManager {
ArrayList<CurrencyTuple> fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
jsonFileManager.writeToDisc(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list");
jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list");
ArrayList<CurrencyTuple> cryptoCurrencyList = CurrencyUtil.getAllSortedCryptoCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
cryptoCurrencyList.add(0, new CurrencyTuple(Res.getBaseCurrencyCode(), Res.getBaseCurrencyName(), 8));
jsonFileManager.writeToDisc(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list");
jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list");
}
List<TradeStatisticsForJson> list = observableTradeStatisticsSet.stream()
@ -138,6 +138,6 @@ public class TradeStatisticsManager {
.collect(Collectors.toList());
TradeStatisticsForJson[] array = new TradeStatisticsForJson[list.size()];
list.toArray(array);
jsonFileManager.writeToDisc(Utilities.objectToJson(array), "trade_statistics");
jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(array), "trade_statistics");
}
}