From aefa986467e0f65aa59ecbc3468664bf05016f66 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 24 Sep 2018 14:03:10 -0500 Subject: [PATCH 1/2] Fix threading issues with ExportJsonFilesService - Rename JsonBlockChainExporter to ExportJsonFilesService - Remove getBlocksFromState method - Add ExportJsonFilesService to DaoSetup and implement DaoSetupService - Add json dir for all json sub dirs - Move access to bsqStateService out from thread in ExportJsonFilesService - Only do write to disk in thread --- .../main/java/bisq/core/dao/DaoModule.java | 4 +- .../src/main/java/bisq/core/dao/DaoSetup.java | 9 +- .../bisq/core/dao/node/full/FullNode.java | 12 +- .../dao/node/json/ExportJsonFilesService.java | 229 ++++++++++++++++++ .../dao/node/json/JsonBlockChainExporter.java | 206 ---------------- .../bisq/core/dao/state/BsqStateService.java | 4 - .../bisq/core/dao/state/SnapshotManager.java | 4 +- 7 files changed, 247 insertions(+), 221 deletions(-) create mode 100644 core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java delete mode 100644 core/src/main/java/bisq/core/dao/node/json/JsonBlockChainExporter.java diff --git a/core/src/main/java/bisq/core/dao/DaoModule.java b/core/src/main/java/bisq/core/dao/DaoModule.java index 293a609630..48d1c3727b 100644 --- a/core/src/main/java/bisq/core/dao/DaoModule.java +++ b/core/src/main/java/bisq/core/dao/DaoModule.java @@ -53,7 +53,7 @@ import bisq.core.dao.node.BsqNodeProvider; import bisq.core.dao.node.full.FullNode; import bisq.core.dao.node.full.RpcService; import bisq.core.dao.node.full.network.FullNodeNetworkService; -import bisq.core.dao.node.json.JsonBlockChainExporter; +import bisq.core.dao.node.json.ExportJsonFilesService; import bisq.core.dao.node.lite.LiteNode; import bisq.core.dao.node.lite.network.LiteNodeNetworkService; import bisq.core.dao.node.parser.BlockParser; @@ -99,7 +99,7 @@ public class DaoModule extends AppModule { bind(BsqState.class).in(Singleton.class); bind(BsqStateService.class).in(Singleton.class); bind(SnapshotManager.class).in(Singleton.class); - bind(JsonBlockChainExporter.class).in(Singleton.class); + bind(ExportJsonFilesService.class).in(Singleton.class); // Period bind(CycleService.class).in(Singleton.class); diff --git a/core/src/main/java/bisq/core/dao/DaoSetup.java b/core/src/main/java/bisq/core/dao/DaoSetup.java index f09246988d..5c4740bb2c 100644 --- a/core/src/main/java/bisq/core/dao/DaoSetup.java +++ b/core/src/main/java/bisq/core/dao/DaoSetup.java @@ -25,6 +25,7 @@ import bisq.core.dao.governance.voteresult.VoteResultService; import bisq.core.dao.governance.votereveal.VoteRevealService; import bisq.core.dao.node.BsqNode; import bisq.core.dao.node.BsqNodeProvider; +import bisq.core.dao.node.json.ExportJsonFilesService; import bisq.core.dao.state.BsqStateService; import bisq.core.dao.state.period.CycleService; @@ -35,7 +36,6 @@ import com.google.inject.Inject; /** * High level entry point for Dao domain. * We initialize all main service classes here to be sure they are started. - * */ public class DaoSetup { private final BsqStateService bsqStateService; @@ -47,6 +47,7 @@ public class DaoSetup { private final VoteRevealService voteRevealService; private final VoteResultService voteResultService; private final BsqNode bsqNode; + private final ExportJsonFilesService exportJsonFilesService; @Inject public DaoSetup(BsqNodeProvider bsqNodeProvider, @@ -57,7 +58,8 @@ public class DaoSetup { BlindVoteListService blindVoteListService, MyBlindVoteListService myBlindVoteListService, VoteRevealService voteRevealService, - VoteResultService voteResultService) { + VoteResultService voteResultService, + ExportJsonFilesService exportJsonFilesService) { this.bsqStateService = bsqStateService; this.cycleService = cycleService; this.proposalService = proposalService; @@ -66,6 +68,7 @@ public class DaoSetup { this.myBlindVoteListService = myBlindVoteListService; this.voteRevealService = voteRevealService; this.voteResultService = voteResultService; + this.exportJsonFilesService = exportJsonFilesService; bsqNode = bsqNodeProvider.getBsqNode(); } @@ -81,6 +84,7 @@ public class DaoSetup { myBlindVoteListService.addListeners(); voteRevealService.addListeners(); voteResultService.addListeners(); + exportJsonFilesService.addListeners(); bsqStateService.start(); cycleService.start(); @@ -90,6 +94,7 @@ public class DaoSetup { myBlindVoteListService.start(); voteRevealService.start(); voteResultService.start(); + exportJsonFilesService.start(); bsqNode.setErrorMessageHandler(errorMessageHandler); bsqNode.start(); diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index 05b9ac7db8..df5bd81cf8 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -19,7 +19,7 @@ package bisq.core.dao.node.full; import bisq.core.dao.node.BsqNode; import bisq.core.dao.node.full.network.FullNodeNetworkService; -import bisq.core.dao.node.json.JsonBlockChainExporter; +import bisq.core.dao.node.json.ExportJsonFilesService; import bisq.core.dao.node.parser.BlockParser; import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException; import bisq.core.dao.state.BsqStateService; @@ -49,7 +49,7 @@ public class FullNode extends BsqNode { private final RpcService rpcService; private final FullNodeNetworkService fullNodeNetworkService; - private final JsonBlockChainExporter jsonBlockChainExporter; + private final ExportJsonFilesService exportJsonFilesService; private boolean addBlockHandlerAdded; @@ -64,12 +64,12 @@ public class FullNode extends BsqNode { SnapshotManager snapshotManager, P2PService p2PService, RpcService rpcService, - JsonBlockChainExporter jsonBlockChainExporter, + ExportJsonFilesService exportJsonFilesService, FullNodeNetworkService fullNodeNetworkService) { super(blockParser, bsqStateService, snapshotManager, p2PService); this.rpcService = rpcService; - this.jsonBlockChainExporter = jsonBlockChainExporter; + this.exportJsonFilesService = exportJsonFilesService; this.fullNodeNetworkService = fullNodeNetworkService; } @@ -88,7 +88,7 @@ public class FullNode extends BsqNode { } public void shutDown() { - jsonBlockChainExporter.shutDown(); + exportJsonFilesService.shutDown(); fullNodeNetworkService.shutDown(); } @@ -147,7 +147,7 @@ public class FullNode extends BsqNode { } private void onNewBlock(Block block) { - jsonBlockChainExporter.maybeExport(); + exportJsonFilesService.maybeExport(); if (p2pNetworkReady && parseBlockchainComplete) fullNodeNetworkService.publishNewBlock(block); diff --git a/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java new file mode 100644 index 0000000000..928d626194 --- /dev/null +++ b/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java @@ -0,0 +1,229 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.node.json; + +import bisq.core.dao.DaoOptionKeys; +import bisq.core.dao.DaoSetupService; +import bisq.core.dao.state.BsqState; +import bisq.core.dao.state.BsqStateService; +import bisq.core.dao.state.blockchain.PubKeyScript; +import bisq.core.dao.state.blockchain.SpentInfo; +import bisq.core.dao.state.blockchain.Tx; +import bisq.core.dao.state.blockchain.TxOutput; +import bisq.core.dao.state.blockchain.TxType; + +import bisq.common.storage.FileUtil; +import bisq.common.storage.JsonFileManager; +import bisq.common.storage.Storage; +import bisq.common.util.Utilities; + +import org.bitcoinj.core.Utils; + +import com.google.inject.Inject; + +import javax.inject.Named; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +import java.nio.file.Paths; + +import java.io.File; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.jetbrains.annotations.NotNull; + +@Slf4j +public class ExportJsonFilesService implements DaoSetupService { + private final BsqStateService bsqStateService; + private final File storageDir; + private final boolean dumpBlockchainData; + + private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", 1, 1, 1200); + private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; + + @Inject + public ExportJsonFilesService(BsqStateService bsqStateService, + @Named(Storage.STORAGE_DIR) File storageDir, + @Named(DaoOptionKeys.DUMP_BLOCKCHAIN_DATA) boolean dumpBlockchainData) { + this.bsqStateService = bsqStateService; + this.storageDir = storageDir; + this.dumpBlockchainData = dumpBlockchainData; + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // DaoSetupService + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void addListeners() { + } + + @Override + public void start() { + if (dumpBlockchainData) { + File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString()); + File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString()); + File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString()); + File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString()); + try { + if (txDir.exists()) + FileUtil.deleteDirectory(txDir); + if (txOutputDir.exists()) + FileUtil.deleteDirectory(txOutputDir); + if (bsqStateDir.exists()) + FileUtil.deleteDirectory(bsqStateDir); + if (jsonDir.exists()) + FileUtil.deleteDirectory(jsonDir); + } catch (IOException e) { + log.error(e.toString()); + e.printStackTrace(); + } + + if (!jsonDir.mkdir()) + log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath()); + + if (!txDir.mkdir()) + log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); + + if (!txOutputDir.mkdir()) + log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); + + if (!bsqStateDir.mkdir()) + log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath()); + + txFileManager = new JsonFileManager(txDir); + txOutputFileManager = new JsonFileManager(txOutputDir); + bsqStateFileManager = new JsonFileManager(bsqStateDir); + } + } + + public void shutDown() { + if (dumpBlockchainData) { + txFileManager.shutDown(); + txOutputFileManager.shutDown(); + bsqStateFileManager.shutDown(); + } + } + + public void maybeExport() { + if (dumpBlockchainData) { + // We store the data we need once we write the data to disk (in the thread) locally. + // Access to bsqStateService is single threaded, we must not access bsqStateService from the thread. + List jsonTxOutputs = new ArrayList<>(); + List jsonTxs = new ArrayList<>(); + BsqState bsqStateClone = bsqStateService.getClone(); + + Map txMap = new LinkedList<>(bsqStateService.getBlocks()).stream() + .filter(Objects::nonNull) + .flatMap(block -> block.getTxs().stream()) + .collect(Collectors.toMap(Tx::getId, tx -> tx)); + for (Tx tx : txMap.values()) { + String txId = tx.getId(); + Optional optionalTxType = bsqStateService.getOptionalTxType(txId); + optionalTxType.ifPresent(txType -> { + JsonTxType jsonTxType = txType != TxType.UNDEFINED_TX_TYPE ? JsonTxType.valueOf(txType.name()) : null; + + tx.getTxOutputs().forEach(txOutput -> { + Optional optionalSpentInfo = bsqStateService.getSpentInfo(txOutput); + boolean isBsqOutput = bsqStateService.isBsqTxOutputType(txOutput); + PubKeyScript pubKeyScript = txOutput.getPubKeyScript(); + JsonTxOutput jsonTxOutput = new JsonTxOutput(txId, + txOutput.getIndex(), + isBsqOutput ? txOutput.getValue() : 0, + !isBsqOutput ? txOutput.getValue() : 0, + txOutput.getBlockHeight(), + isBsqOutput, + bsqStateService.getBurntFee(tx.getId()), + txOutput.getAddress(), + pubKeyScript != null ? new JsonScriptPubKey(pubKeyScript) : null, + optionalSpentInfo.map(JsonSpentInfo::new).orElse(null), + tx.getTime(), + jsonTxType, + jsonTxType != null ? jsonTxType.getDisplayString() : "", + txOutput.getOpReturnData() != null ? Utils.HEX.encode(txOutput.getOpReturnData()) : null + ); + jsonTxOutputs.add(jsonTxOutput); + }); + + List inputs = tx.getTxInputs().stream() + .map(txInput -> { + Optional optionalTxOutput = bsqStateService.getConnectedTxOutput(txInput); + if (optionalTxOutput.isPresent()) { + final TxOutput connectedTxOutput = optionalTxOutput.get(); + final boolean isBsqOutput = bsqStateService.isBsqTxOutputType(connectedTxOutput); + return new JsonTxInput(txInput.getConnectedTxOutputIndex(), + txInput.getConnectedTxOutputTxId(), + connectedTxOutput.getValue(), + isBsqOutput, + connectedTxOutput.getAddress(), + tx.getTime()); + } else { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + JsonTx jsonTx = new JsonTx(txId, + tx.getBlockHeight(), + tx.getBlockHash(), + tx.getTime(), + inputs, + jsonTxOutputs, + jsonTxType, + jsonTxType != null ? jsonTxType.getDisplayString() : "", + bsqStateService.getBurntFee(tx.getId())); + + jsonTxs.add(jsonTx); + }); + } + + ListenableFuture future = executor.submit(() -> { + bsqStateFileManager.writeToDisc(Utilities.objectToJson(bsqStateClone), "BsqStateService"); + jsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); + jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId())); + return null; + }); + + Futures.addCallback(future, new FutureCallback<>() { + public void onSuccess(Void ignore) { + log.trace("onSuccess"); + } + + public void onFailure(@NotNull Throwable throwable) { + log.error(throwable.toString()); + throwable.printStackTrace(); + } + }); + } + } +} diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonBlockChainExporter.java b/core/src/main/java/bisq/core/dao/node/json/JsonBlockChainExporter.java deleted file mode 100644 index 25745e3556..0000000000 --- a/core/src/main/java/bisq/core/dao/node/json/JsonBlockChainExporter.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.core.dao.node.json; - -import bisq.core.dao.DaoOptionKeys; -import bisq.core.dao.state.BsqState; -import bisq.core.dao.state.BsqStateService; -import bisq.core.dao.state.blockchain.PubKeyScript; -import bisq.core.dao.state.blockchain.SpentInfo; -import bisq.core.dao.state.blockchain.Tx; -import bisq.core.dao.state.blockchain.TxOutput; -import bisq.core.dao.state.blockchain.TxType; - -import bisq.common.storage.FileUtil; -import bisq.common.storage.JsonFileManager; -import bisq.common.storage.Storage; -import bisq.common.util.Utilities; - -import org.bitcoinj.core.Utils; - -import com.google.inject.Inject; - -import javax.inject.Named; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; - -import java.nio.file.Paths; - -import java.io.File; -import java.io.IOException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - -import lombok.extern.slf4j.Slf4j; - -import org.jetbrains.annotations.NotNull; - -@Slf4j -public class JsonBlockChainExporter { - private final BsqStateService bsqStateService; - private final boolean dumpBlockchainData; - - private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", 1, 1, 1200); - private JsonFileManager txFileManager, txOutputFileManager, jsonFileManager; - - @Inject - public JsonBlockChainExporter(BsqStateService bsqStateService, - @Named(Storage.STORAGE_DIR) File storageDir, - @Named(DaoOptionKeys.DUMP_BLOCKCHAIN_DATA) boolean dumpBlockchainData) { - this.bsqStateService = bsqStateService; - this.dumpBlockchainData = dumpBlockchainData; - - init(storageDir, dumpBlockchainData); - } - - private void init(@Named(Storage.STORAGE_DIR) File storageDir, @Named(DaoOptionKeys.DUMP_BLOCKCHAIN_DATA) boolean dumpBlockchainData) { - if (dumpBlockchainData) { - File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "tx").toString()); - File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "txo").toString()); - File blockchainDir = new File(Paths.get(storageDir.getAbsolutePath(), "all").toString()); - try { - if (txDir.exists()) - FileUtil.deleteDirectory(txDir); - if (txOutputDir.exists()) - FileUtil.deleteDirectory(txOutputDir); - if (blockchainDir.exists()) - FileUtil.deleteDirectory(blockchainDir); - } catch (IOException e) { - e.printStackTrace(); - } - - if (!txDir.mkdir()) - log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); - - if (!txOutputDir.mkdir()) - log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); - - if (!blockchainDir.mkdir()) - log.warn("make blockchainDir failed.\nblockchainDir=" + blockchainDir.getAbsolutePath()); - - txFileManager = new JsonFileManager(txDir); - txOutputFileManager = new JsonFileManager(txOutputDir); - jsonFileManager = new JsonFileManager(blockchainDir); - } - } - - public void shutDown() { - if (dumpBlockchainData) { - txFileManager.shutDown(); - txOutputFileManager.shutDown(); - jsonFileManager.shutDown(); - } - } - - public void maybeExport() { - if (dumpBlockchainData) { - ListenableFuture future = executor.submit(() -> { - final BsqState bsqStateClone = bsqStateService.getClone(); - Map txMap = bsqStateService.getBlocksFromState(bsqStateClone).stream() - .filter(Objects::nonNull) - .flatMap(block -> block.getTxs().stream()) - .collect(Collectors.toMap(Tx::getId, tx -> tx)); - for (Tx tx : txMap.values()) { - String txId = tx.getId(); - final Optional optionalTxType = bsqStateService.getOptionalTxType(txId); - optionalTxType.ifPresent(txType1 -> { - JsonTxType txType = txType1 != TxType.UNDEFINED_TX_TYPE ? - JsonTxType.valueOf(txType1.name()) : null; - List outputs = new ArrayList<>(); - tx.getTxOutputs().forEach(txOutput -> { - final Optional optionalSpentInfo = bsqStateService.getSpentInfo(txOutput); - final boolean isBsqOutput = bsqStateService.isBsqTxOutputType(txOutput); - final PubKeyScript pubKeyScript = txOutput.getPubKeyScript(); - final JsonTxOutput outputForJson = new JsonTxOutput(txId, - txOutput.getIndex(), - isBsqOutput ? txOutput.getValue() : 0, - !isBsqOutput ? txOutput.getValue() : 0, - txOutput.getBlockHeight(), - isBsqOutput, - bsqStateService.getBurntFee(tx.getId()), - txOutput.getAddress(), - pubKeyScript != null ? new JsonScriptPubKey(pubKeyScript) : null, - optionalSpentInfo.map(JsonSpentInfo::new).orElse(null), - tx.getTime(), - txType, - txType != null ? txType.getDisplayString() : "", - txOutput.getOpReturnData() != null ? Utils.HEX.encode(txOutput.getOpReturnData()) : null - ); - outputs.add(outputForJson); - txOutputFileManager.writeToDisc(Utilities.objectToJson(outputForJson), outputForJson.getId()); - }); - - - List inputs = tx.getTxInputs().stream() - .map(txInput -> { - Optional optionalTxOutput = bsqStateService.getConnectedTxOutput(txInput); - if (optionalTxOutput.isPresent()) { - final TxOutput connectedTxOutput = optionalTxOutput.get(); - final boolean isBsqOutput = bsqStateService.isBsqTxOutputType(connectedTxOutput); - return new JsonTxInput(txInput.getConnectedTxOutputIndex(), - txInput.getConnectedTxOutputTxId(), - connectedTxOutput.getValue(), - isBsqOutput, - connectedTxOutput.getAddress(), - tx.getTime()); - } else { - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - final JsonTx jsonTx = new JsonTx(txId, - tx.getBlockHeight(), - tx.getBlockHash(), - tx.getTime(), - inputs, - outputs, - txType, - txType != null ? txType.getDisplayString() : "", - bsqStateService.getBurntFee(tx.getId())); - - txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), txId); - }); - } - - jsonFileManager.writeToDisc(Utilities.objectToJson(bsqStateClone), "BsqStateService"); - return null; - }); - - Futures.addCallback(future, new FutureCallback() { - public void onSuccess(Void ignore) { - log.trace("onSuccess"); - } - - public void onFailure(@NotNull Throwable throwable) { - log.error(throwable.toString()); - throwable.printStackTrace(); - } - }); - } - } -} diff --git a/core/src/main/java/bisq/core/dao/state/BsqStateService.java b/core/src/main/java/bisq/core/dao/state/BsqStateService.java index 97eabda13d..926bf6a8b3 100644 --- a/core/src/main/java/bisq/core/dao/state/BsqStateService.java +++ b/core/src/main/java/bisq/core/dao/state/BsqStateService.java @@ -120,10 +120,6 @@ public class BsqStateService implements DaoSetupService { return bsqState.getClone(); } - public LinkedList getBlocksFromState(BsqState bsqState) { - return new LinkedList<>(bsqState.getBlocks()); - } - /////////////////////////////////////////////////////////////////////////////////////////// // ChainHeight diff --git a/core/src/main/java/bisq/core/dao/state/SnapshotManager.java b/core/src/main/java/bisq/core/dao/state/SnapshotManager.java index d86e98db34..dcdd65a0bd 100644 --- a/core/src/main/java/bisq/core/dao/state/SnapshotManager.java +++ b/core/src/main/java/bisq/core/dao/state/SnapshotManager.java @@ -29,6 +29,8 @@ import com.google.common.annotations.VisibleForTesting; import java.io.File; +import java.util.LinkedList; + import lombok.extern.slf4j.Slf4j; import static com.google.common.base.Preconditions.checkNotNull; @@ -101,7 +103,7 @@ public class SnapshotManager implements BsqStateListener { checkNotNull(storage, "storage must not be null"); BsqState persisted = storage.initAndGetPersisted(bsqState, 100); if (persisted != null) { - log.info("applySnapshot persisted.chainHeadHeight=" + bsqStateService.getBlocksFromState(persisted).getLast().getHeight()); + log.info("applySnapshot persisted.chainHeadHeight=" + new LinkedList<>(persisted.getBlocks()).getLast().getHeight()); bsqStateService.applySnapshot(persisted); } else { log.info("Try to apply snapshot but no stored snapshot available"); From 835db24e535c66626f5a1a0348f5f450b61f0a7a Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 24 Sep 2018 17:39:23 -0500 Subject: [PATCH 2/2] Fix threading issues with ExportJsonFilesService - Rename JsonBlockChainExporter to ExportJsonFilesService - Remove getBlocksFromState method - Add ExportJsonFilesService to DaoSetup and implement DaoSetupService - Add json dir for all json sub dirs - Move access to bsqStateService out from thread in ExportJsonFilesService - Only do write to disk in thread - Sync model files --- .../bisq/core/dao/node/full/FullNode.java | 2 +- .../dao/node/json/ExportJsonFilesService.java | 144 +++++++++--------- .../java/bisq/core/dao/node/json/JsonTx.java | 5 +- .../bisq/core/dao/node/json/JsonTxInput.java | 9 +- .../bisq/core/dao/node/json/JsonTxOutput.java | 11 +- .../core/dao/node/json/JsonTxOutputType.java | 47 ++++++ .../bisq/core/dao/node/json/JsonTxType.java | 9 +- 7 files changed, 143 insertions(+), 84 deletions(-) create mode 100644 core/src/main/java/bisq/core/dao/node/json/JsonTxOutputType.java diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index df5bd81cf8..5843368acc 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -147,7 +147,7 @@ public class FullNode extends BsqNode { } private void onNewBlock(Block block) { - exportJsonFilesService.maybeExport(); + exportJsonFilesService.exportToJson(); if (p2pNetworkReady && parseBlockchainComplete) fullNodeNetworkService.publishNewBlock(block); diff --git a/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java index 928d626194..bd64c30e9c 100644 --- a/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/json/ExportJsonFilesService.java @@ -22,8 +22,6 @@ import bisq.core.dao.DaoSetupService; import bisq.core.dao.state.BsqState; import bisq.core.dao.state.BsqStateService; import bisq.core.dao.state.blockchain.PubKeyScript; -import bisq.core.dao.state.blockchain.SpentInfo; -import bisq.core.dao.state.blockchain.Tx; import bisq.core.dao.state.blockchain.TxOutput; import bisq.core.dao.state.blockchain.TxType; @@ -49,9 +47,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -66,7 +62,8 @@ public class ExportJsonFilesService implements DaoSetupService { private final File storageDir; private final boolean dumpBlockchainData; - private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", 1, 1, 1200); + private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", + 1, 1, 1200); private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; @Inject @@ -134,82 +131,91 @@ public class ExportJsonFilesService implements DaoSetupService { } } - public void maybeExport() { + public void exportToJson() { if (dumpBlockchainData) { // We store the data we need once we write the data to disk (in the thread) locally. // Access to bsqStateService is single threaded, we must not access bsqStateService from the thread. - List jsonTxOutputs = new ArrayList<>(); + List allJsonTxOutputs = new ArrayList<>(); List jsonTxs = new ArrayList<>(); BsqState bsqStateClone = bsqStateService.getClone(); - Map txMap = new LinkedList<>(bsqStateService.getBlocks()).stream() - .filter(Objects::nonNull) - .flatMap(block -> block.getTxs().stream()) - .collect(Collectors.toMap(Tx::getId, tx -> tx)); - for (Tx tx : txMap.values()) { + bsqStateService.getTxStream().forEach(tx -> { + List jsonTxOutputs = new ArrayList<>(); String txId = tx.getId(); - Optional optionalTxType = bsqStateService.getOptionalTxType(txId); - optionalTxType.ifPresent(txType -> { - JsonTxType jsonTxType = txType != TxType.UNDEFINED_TX_TYPE ? JsonTxType.valueOf(txType.name()) : null; - - tx.getTxOutputs().forEach(txOutput -> { - Optional optionalSpentInfo = bsqStateService.getSpentInfo(txOutput); - boolean isBsqOutput = bsqStateService.isBsqTxOutputType(txOutput); - PubKeyScript pubKeyScript = txOutput.getPubKeyScript(); - JsonTxOutput jsonTxOutput = new JsonTxOutput(txId, - txOutput.getIndex(), - isBsqOutput ? txOutput.getValue() : 0, - !isBsqOutput ? txOutput.getValue() : 0, - txOutput.getBlockHeight(), - isBsqOutput, - bsqStateService.getBurntFee(tx.getId()), - txOutput.getAddress(), - pubKeyScript != null ? new JsonScriptPubKey(pubKeyScript) : null, - optionalSpentInfo.map(JsonSpentInfo::new).orElse(null), - tx.getTime(), - jsonTxType, - jsonTxType != null ? jsonTxType.getDisplayString() : "", - txOutput.getOpReturnData() != null ? Utils.HEX.encode(txOutput.getOpReturnData()) : null - ); - jsonTxOutputs.add(jsonTxOutput); - }); - - List inputs = tx.getTxInputs().stream() - .map(txInput -> { - Optional optionalTxOutput = bsqStateService.getConnectedTxOutput(txInput); - if (optionalTxOutput.isPresent()) { - final TxOutput connectedTxOutput = optionalTxOutput.get(); - final boolean isBsqOutput = bsqStateService.isBsqTxOutputType(connectedTxOutput); - return new JsonTxInput(txInput.getConnectedTxOutputIndex(), - txInput.getConnectedTxOutputTxId(), - connectedTxOutput.getValue(), - isBsqOutput, - connectedTxOutput.getAddress(), - tx.getTime()); - } else { - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - JsonTx jsonTx = new JsonTx(txId, - tx.getBlockHeight(), - tx.getBlockHash(), - tx.getTime(), - inputs, - jsonTxOutputs, + long time = tx.getTime(); + int blockHeight = tx.getBlockHeight(); + long burntFee = bsqStateService.getBurntFee(tx.getId()); + TxType txType = tx.getTxType(); + JsonTxType jsonTxType = txType != null ? JsonTxType.valueOf(txType.name()) : null; + String jsonTxTypeDisplayString = jsonTxType != null ? jsonTxType.getDisplayString() : ""; + tx.getTxOutputs().forEach(txOutput -> { + boolean isBsqTxOutputType = bsqStateService.isBsqTxOutputType(txOutput); + long bsqAmount = isBsqTxOutputType ? txOutput.getValue() : 0; + long btcAmount = !isBsqTxOutputType ? txOutput.getValue() : 0; + PubKeyScript pubKeyScript = txOutput.getPubKeyScript(); + JsonScriptPubKey scriptPubKey = pubKeyScript != null ? new JsonScriptPubKey(pubKeyScript) : null; + JsonSpentInfo spentInfo = bsqStateService.getSpentInfo(txOutput).map(JsonSpentInfo::new).orElse(null); + JsonTxOutputType txOutputType = JsonTxOutputType.valueOf(txOutput.getTxOutputType().name()); + int lockTime = txOutput.getLockTime(); + String opReturn = txOutput.getOpReturnData() != null ? Utils.HEX.encode(txOutput.getOpReturnData()) : null; + JsonTxOutput jsonTxOutput = new JsonTxOutput(txId, + txOutput.getIndex(), + bsqAmount, + btcAmount, + blockHeight, + isBsqTxOutputType, + burntFee, + txOutput.getAddress(), + scriptPubKey, + spentInfo, + time, jsonTxType, - jsonTxType != null ? jsonTxType.getDisplayString() : "", - bsqStateService.getBurntFee(tx.getId())); - - jsonTxs.add(jsonTx); + jsonTxTypeDisplayString, + txOutputType, + txOutputType.getDisplayString(), + opReturn, + lockTime + ); + jsonTxOutputs.add(jsonTxOutput); + allJsonTxOutputs.add(jsonTxOutput); }); - } + + List inputs = tx.getTxInputs().stream() + .map(txInput -> { + Optional optionalTxOutput = bsqStateService.getConnectedTxOutput(txInput); + if (optionalTxOutput.isPresent()) { + TxOutput connectedTxOutput = optionalTxOutput.get(); + boolean isBsqTxOutputType = bsqStateService.isBsqTxOutputType(connectedTxOutput); + return new JsonTxInput(txInput.getConnectedTxOutputIndex(), + txInput.getConnectedTxOutputTxId(), + connectedTxOutput.getValue(), + isBsqTxOutputType, + connectedTxOutput.getAddress(), + time); + } else { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + JsonTx jsonTx = new JsonTx(txId, + blockHeight, + tx.getBlockHash(), + time, + inputs, + jsonTxOutputs, + jsonTxType, + jsonTxTypeDisplayString, + burntFee, + tx.getUnlockBlockHeight()); + + jsonTxs.add(jsonTx); + }); ListenableFuture future = executor.submit(() -> { bsqStateFileManager.writeToDisc(Utilities.objectToJson(bsqStateClone), "BsqStateService"); - jsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); + allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId())); return null; }); diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonTx.java b/core/src/main/java/bisq/core/dao/node/json/JsonTx.java index 5ea7d38e4b..e68e74859e 100644 --- a/core/src/main/java/bisq/core/dao/node/json/JsonTx.java +++ b/core/src/main/java/bisq/core/dao/node/json/JsonTx.java @@ -23,9 +23,8 @@ import java.util.List; import lombok.Value; -//TODO sync up with data model @Value -public class JsonTx { +class JsonTx { private final String txVersion = Version.BSQ_TX_VERSION; private final String id; private final int blockHeight; @@ -36,4 +35,6 @@ public class JsonTx { private final JsonTxType txType; private final String txTypeDisplayString; private final long burntFee; + // If not set it is -1. LockTime of 0 is a valid value. + private final int unlockBlockHeight; } diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonTxInput.java b/core/src/main/java/bisq/core/dao/node/json/JsonTxInput.java index 329e75a1e0..684d6031d8 100644 --- a/core/src/main/java/bisq/core/dao/node/json/JsonTxInput.java +++ b/core/src/main/java/bisq/core/dao/node/json/JsonTxInput.java @@ -21,14 +21,13 @@ import lombok.Value; import javax.annotation.concurrent.Immutable; -//TODO sync up with data model @Value @Immutable -public class JsonTxInput { - private final int spendingTxOutputIndex; - private final String spendingTxId; +class JsonTxInput { + private final int spendingTxOutputIndex; // connectedTxOutputIndex + private final String spendingTxId; // connectedTxOutputTxId private final long bsqAmount; - private final boolean isVerified; + private final boolean isVerified; // isBsqTxOutputType private final String address; private final long time; } diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonTxOutput.java b/core/src/main/java/bisq/core/dao/node/json/JsonTxOutput.java index 069066a645..c19b7bd198 100644 --- a/core/src/main/java/bisq/core/dao/node/json/JsonTxOutput.java +++ b/core/src/main/java/bisq/core/dao/node/json/JsonTxOutput.java @@ -21,7 +21,8 @@ import bisq.common.app.Version; import lombok.Value; -//TODO sync up with data model +import javax.annotation.Nullable; + @Value public class JsonTxOutput { private final String txVersion = Version.BSQ_TX_VERSION; @@ -30,15 +31,21 @@ public class JsonTxOutput { private final long bsqAmount; private final long btcAmount; private final int height; - private final boolean isVerified; + private final boolean isVerified; // isBsqTxOutputType private final long burntFee; private final String address; + @Nullable private final JsonScriptPubKey scriptPubKey; + @Nullable private final JsonSpentInfo spentInfo; private final long time; private final JsonTxType txType; private final String txTypeDisplayString; + private final JsonTxOutputType txOutputType; // new + private final String txOutputTypeDisplayString; // new + @Nullable private final String opReturn; + private final int lockTime; // new public String getId() { return txId + ":" + outputIndex; diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonTxOutputType.java b/core/src/main/java/bisq/core/dao/node/json/JsonTxOutputType.java new file mode 100644 index 0000000000..018a95f9c4 --- /dev/null +++ b/core/src/main/java/bisq/core/dao/node/json/JsonTxOutputType.java @@ -0,0 +1,47 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.node.json; + +import lombok.Getter; + +// Need to be in sync with TxOutputType +public enum JsonTxOutputType { + UNDEFINED("Undefined"), + GENESIS_OUTPUT("Genesis"), + BSQ_OUTPUT("BSQ"), + BTC_OUTPUT("BTC"), + PROPOSAL_OP_RETURN_OUTPUT("Proposal opReturn"), + COMP_REQ_OP_RETURN_OUTPUT("Compensation request opReturn"), + CONFISCATE_BOND_OP_RETURN_OUTPUT("Confiscate bond opReturn"), + ISSUANCE_CANDIDATE_OUTPUT("Issuance candidate"), + BLIND_VOTE_LOCK_STAKE_OUTPUT("Blind vote lock stake"), + BLIND_VOTE_OP_RETURN_OUTPUT("Blind vote opReturn"), + VOTE_REVEAL_UNLOCK_STAKE_OUTPUT("Vote reveal unlock stake"), + VOTE_REVEAL_OP_RETURN_OUTPUT("Vote reveal opReturn"), + LOCKUP("Lockup"), + LOCKUP_OP_RETURN_OUTPUT("Lockup opReturn"), + UNLOCK("Unlock"), + INVALID_OUTPUT("Invalid"); + + @Getter + private String displayString; + + JsonTxOutputType(String displayString) { + this.displayString = displayString; + } +} diff --git a/core/src/main/java/bisq/core/dao/node/json/JsonTxType.java b/core/src/main/java/bisq/core/dao/node/json/JsonTxType.java index e624247377..3833341a11 100644 --- a/core/src/main/java/bisq/core/dao/node/json/JsonTxType.java +++ b/core/src/main/java/bisq/core/dao/node/json/JsonTxType.java @@ -19,7 +19,7 @@ package bisq.core.dao.node.json; import lombok.Getter; -//TODO sync up with data model +// Need to be in sync with TxOutputType public enum JsonTxType { UNDEFINED_TX_TYPE("Undefined"), UNVERIFIED("Unverified"), @@ -27,13 +27,12 @@ public enum JsonTxType { GENESIS("Genesis"), TRANSFER_BSQ("Transfer BSQ"), PAY_TRADE_FEE("Pay trade fee"), - PROPOSAL("Ballot"), + PROPOSAL("Proposal"), COMPENSATION_REQUEST("Compensation request"), - VOTE("Vote"), BLIND_VOTE("Blind vote"), VOTE_REVEAL("Vote reveal"), - LOCK_UP("Lockup"), - UN_LOCK("Unlock"); + LOCKUP("Lockup"), + UNLOCK("Unlock"); @Getter private String displayString;