Merge pull request #3773 from stejbac/add-tx-map-to-daostate

Add transient tx map to DaoState to speed up getTx queries
This commit is contained in:
Christoph Atteneder 2019-12-16 15:21:03 +01:00 committed by GitHub
commit 20b56c7bde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 39 deletions

View file

@ -624,8 +624,8 @@ public class DaoFacade implements DaoSetupService {
return daoStateService.getUnspentTxOutputs();
}
public Set<Tx> getTxs() {
return daoStateService.getTxs();
public int getNumTxs() {
return daoStateService.getNumTxs();
}
public Optional<TxOutput> getLockupTxOutput(String txId) {

View file

@ -140,7 +140,7 @@ public class ExportJsonFilesService implements DaoSetupService {
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();
List<JsonTx> jsonTxs = daoStateService.getTxStream()
List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
.map(tx -> {
JsonTx jsonTx = getJsonTx(tx);
allJsonTxOutputs.addAll(jsonTx.getOutputs());

View file

@ -22,7 +22,6 @@ import bisq.core.dao.node.parser.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.common.app.DevEnv;
@ -31,7 +30,6 @@ import org.bitcoinj.core.Coin;
import javax.inject.Inject;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@ -55,7 +53,6 @@ public class BlockParser {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("WeakerAccess")
@Inject
public BlockParser(TxParser txParser,
DaoStateService daoStateService) {
@ -106,14 +103,13 @@ public class BlockParser {
// one get resolved.
// Lately there is a patter with 24 iterations observed
long startTs = System.currentTimeMillis();
List<Tx> txList = block.getTxs();
rawBlock.getRawTxs().forEach(rawTx ->
txParser.findTx(rawTx,
genesisTxId,
genesisBlockHeight,
genesisTotalSupply)
.ifPresent(txList::add));
.ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx)));
log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);

View file

@ -35,8 +35,8 @@ import bisq.core.dao.state.model.governance.EvaluatedProposal;
import bisq.core.dao.state.model.governance.Issuance;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.model.governance.ParamChange;
import bisq.core.util.coin.BsqFormatter;
import bisq.core.util.ParsingUtils;
import bisq.core.util.coin.BsqFormatter;
import org.bitcoinj.core.Coin;
@ -115,6 +115,8 @@ public class DaoStateService implements DaoSetupService {
daoState.setChainHeight(snapshot.getChainHeight());
daoState.setTxCache(snapshot.getTxCache());
daoState.getBlocks().clear();
daoState.getBlocks().addAll(snapshot.getBlocks());
@ -226,7 +228,25 @@ public class DaoStateService implements DaoSetupService {
}
}
// Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
// Third we add each successfully parsed BSQ tx to the last block
public void onNewTxForLastBlock(Block block, Tx tx) {
assertDaoStateChange();
getLastBlock().ifPresent(lastBlock -> {
if (block == lastBlock) {
// We need to ensure that the txs in all blocks are in sync with the txs in our txMap (cache).
block.addTx(tx);
daoState.addToTxCache(tx);
} else {
// Not clear if this case can happen but at onNewBlockWithEmptyTxs we handle such a potential edge
// case as well, so we need to reflect that here as well.
log.warn("Block for parsing does not match last block. That might happen in edge cases at reorgs. " +
"Received block={}", block);
}
});
}
// Fourth we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
public void onParseBlockComplete(Block block) {
if (parseBlockChainComplete)
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
@ -343,29 +363,24 @@ public class DaoStateService implements DaoSetupService {
// Tx
///////////////////////////////////////////////////////////////////////////////////////////
public Stream<Tx> getTxStream() {
return getBlocks().stream()
.flatMap(block -> block.getTxs().stream());
public Stream<Tx> getUnorderedTxStream() {
return daoState.getTxCache().values().stream();
}
public TreeMap<String, Tx> getTxMap() {
return new TreeMap<>(getTxStream().collect(Collectors.toMap(Tx::getId, tx -> tx)));
}
public Set<Tx> getTxs() {
return getTxStream().collect(Collectors.toSet());
}
public Optional<Tx> getTx(String txId) {
return getTxStream().filter(tx -> tx.getId().equals(txId)).findAny();
public int getNumTxs() {
return daoState.getTxCache().size();
}
public List<Tx> getInvalidTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
}
public List<Tx> getIrregularTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
}
public Optional<Tx> getTx(String txId) {
return Optional.ofNullable(daoState.getTxCache().get(txId));
}
public boolean containsTx(String txId) {
@ -395,11 +410,11 @@ public class DaoStateService implements DaoSetupService {
}
public long getTotalBurntFee() {
return getTxStream().mapToLong(Tx::getBurntFee).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntFee).sum();
}
public Set<Tx> getBurntFeeTxs() {
return getTxStream()
return getUnorderedTxStream()
.filter(tx -> tx.getBurntFee() > 0)
.collect(Collectors.toSet());
}
@ -418,17 +433,17 @@ public class DaoStateService implements DaoSetupService {
// TxOutput
///////////////////////////////////////////////////////////////////////////////////////////
public Stream<TxOutput> getTxOutputStream() {
return getTxStream()
private Stream<TxOutput> getUnorderedTxOutputStream() {
return getUnorderedTxStream()
.flatMap(tx -> tx.getTxOutputs().stream());
}
public boolean existsTxOutput(TxOutputKey key) {
return getTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
return getUnorderedTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
}
public Optional<TxOutput> getTxOutput(TxOutputKey txOutputKey) {
return getTxOutputStream()
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getKey().equals(txOutputKey))
.findAny();
}
@ -513,8 +528,8 @@ public class DaoStateService implements DaoSetupService {
// TxOutputType
///////////////////////////////////////////////////////////////////////////////////////////
public Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getTxOutputStream()
private Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getTxOutputType() == txOutputType)
.collect(Collectors.toSet());
}
@ -823,12 +838,12 @@ public class DaoStateService implements DaoSetupService {
}
public long getTotalAmountOfInvalidatedBsq() {
return getTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
}
// Contains burnt fee and invalidated bsq due invalid txs
public long getTotalAmountOfBurntBsq() {
return getTxStream().mapToLong(Tx::getBurntBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntBsq).sum();
}
// Confiscate bond

View file

@ -19,6 +19,7 @@ package bisq.core.dao.state.model;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.SpentInfo;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxOutputKey;
import bisq.core.dao.state.model.governance.Cycle;
@ -28,22 +29,25 @@ import bisq.core.dao.state.model.governance.Issuance;
import bisq.core.dao.state.model.governance.ParamChange;
import bisq.common.proto.persistable.PersistablePayload;
import bisq.common.util.JsonExclude;
import com.google.protobuf.Message;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* Root class for mutable state of the DAO.
* Holds both blockchain data as well as data derived from the governance process (voting).
@ -98,6 +102,10 @@ public class DaoState implements PersistablePayload {
@Getter
private final List<DecryptedBallotsWithMerits> decryptedBallotsWithMeritsList;
// Transient data used only as an index - must be kept in sync with the block list
@JsonExclude
private transient final Map<String, Tx> txCache; // key is txId
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -145,6 +153,10 @@ public class DaoState implements PersistablePayload {
this.paramChangeList = paramChangeList;
this.evaluatedProposalList = evaluatedProposalList;
this.decryptedBallotsWithMeritsList = decryptedBallotsWithMeritsList;
txCache = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));
}
@Override
@ -224,6 +236,21 @@ public class DaoState implements PersistablePayload {
return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray();
}
public void addToTxCache(Tx tx) {
// We shouldn't get duplicate txIds, but use putIfAbsent instead of put for consistency with the map merge
// function used in the constructor to initialise txCache (and to exactly match the pre-caching behaviour).
txCache.putIfAbsent(tx.getId(), tx);
}
public void setTxCache(Map<String, Tx> txCache) {
this.txCache.clear();
this.txCache.putAll(txCache);
}
public Map<String, Tx> getTxCache() {
return Collections.unmodifiableMap(txCache);
}
@Override
public String toString() {
return "DaoState{" +
@ -237,6 +264,7 @@ public class DaoState implements PersistablePayload {
",\n paramChangeList=" + paramChangeList +
",\n evaluatedProposalList=" + evaluatedProposalList +
",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList +
",\n txCache=" + txCache +
"\n}";
}
}

View file

@ -24,11 +24,11 @@ import bisq.common.proto.persistable.PersistablePayload;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Value;
/**
* The Block which gets persisted in the DaoState. During parsing transactions can be
@ -44,8 +44,8 @@ import lombok.Value;
*
*/
@EqualsAndHashCode(callSuper = true)
@Value
public final class Block extends BaseBlock implements PersistablePayload, ImmutableDaoStateModel {
// We do not expose txs with a Lombok getter. We cannot make it immutable as we add transactions during parsing.
private final List<Tx> txs;
public Block(int height, long time, String hash, String previousBlockHash) {
@ -93,6 +93,17 @@ public final class Block extends BaseBlock implements PersistablePayload, Immuta
txs);
}
public void addTx(Tx tx) {
txs.add(tx);
}
// We want to guarantee that no client can modify the list. We use unmodifiableList and not ImmutableList as
// we want that clients reflect any change to the source list. Also ImmutableList is more expensive as it
// creates a copy.
public List<Tx> getTxs() {
return Collections.unmodifiableList(txs);
}
@Override
public String toString() {
return "Block{" +

View file

@ -143,7 +143,7 @@ public class BSQTransactionsView extends ActivatableView<GridPane, Void> impleme
///////////////////////////////////////////////////////////////////////////////////////////
private void updateWithBsqBlockChainData() {
allTxTextField.setText(String.valueOf(daoFacade.getTxs().size()));
allTxTextField.setText(String.valueOf(daoFacade.getNumTxs()));
utxoTextField.setText(String.valueOf(daoFacade.getUnspentTxOutputs().size()));
compensationIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.COMPENSATION)));
reimbursementIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.REIMBURSEMENT)));