Handle threading issues with parser

This commit is contained in:
Manfred Karrer 2017-04-13 10:37:07 -05:00
parent cd33a623e8
commit 8516781ed7
20 changed files with 204 additions and 267 deletions

View File

@ -50,7 +50,10 @@ public class LookAheadObjectInputStream extends ObjectInputStream {
Pattern.compile("java\\.util\\.Date$"),
Pattern.compile("java\\.util\\.HashSet$"),
Pattern.compile("java\\.util\\.HashMap$"),
Pattern.compile("org\\.bouncycastle\\.jcajce\\.provider\\.asymmetric\\.dsa\\.BCDSAPublicKey$"),
Pattern.compile("java\\.math\\.BigInteger$"),
// Type Signatures
// https://docs.oracle.com/javase/7/docs/technotes/guides/jni/spec/types.html
Pattern.compile("\\[B$") // byte array

View File

@ -120,7 +120,7 @@ public class Storage<T extends Serializable> {
}
// Save delayed and on a background thread
private void queueUpForSave(T serializable) {
public void queueUpForSave(T serializable) {
if (serializable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");

View File

@ -186,7 +186,7 @@ shared.makerTxFee=Maker: {0}
shared.takerTxFee=Taker: {0}
shared.securityDepositBox.description=Security deposit for BTC {0}
shared.iConfirm=I confirm
shared.tradingFeeInBsqInfo=equivalent to {0} burned as mining fee
shared.tradingFeeInBsqInfo=equivalent to {0} used as mining fee
shared.availableBsqBalance=Available BSQ balance
shared.unverifiedBsqBalance=Unverified BSQ balance
shared.totalBsqBalance=Total BSQ balance
@ -633,6 +633,7 @@ funds.tx.withdrawnFromWallet=Withdrawn from wallet
funds.tx.noTxAvailable=No transactions available
funds.tx.revert=Revert
funds.tx.txSent=Transaction successfully sent to a new address in the local bisq wallet.
funds.tx.direction.self=Intra-wallet transaction
####################################################################
@ -954,7 +955,7 @@ dao.wallet.send.setDestinationAddress=Fill in your destination address
dao.wallet.send.send=Send BSQ funds
dao.wallet.send.sendFunds.headline=Confirm withdrawal request
dao.wallet.send.sendFunds.details=Sending: {0}\nTo receiving address: {1}.\nRequired transaction fee is: {2} ({3} Satoshis/byte)\nTransaction size: {4} Kb\n\nThe recipient will receive: {5}\n\nAre you sure you want to withdraw that amount?
dao.wallet.burned=Burned BSQ (fee payment)
dao.wallet.bsqFee=BSQ fee payment
####################################################################

View File

@ -886,7 +886,7 @@ dao.wallet.send.setDestinationAddress=Unesite vašu adresu destinacije
dao.wallet.send.send=Pošalji BSQ sredstva
dao.wallet.send.sendFunds.headline=Potvrdi zahtev za podizanje
dao.wallet.send.sendFunds.details=Slanje\: {0}\nNa adresu primanja\: {1}\nPotrebna provizija transakcije je\: {2} ({3} Satošija/bajt)\nVeličina transakcije\: {4} Kb\n\nPrimalac će dobiti\: {5}\n\nDa li ste sigurni da želite podići taj iznos?
dao.wallet.burned=Spaljen BSQ (uplata provizije)
dao.wallet.bsqFee=Spaljen BSQ (uplata provizije)
####################################################################

View File

@ -23,7 +23,6 @@ import io.bisq.core.btc.Restrictions;
import io.bisq.core.btc.exceptions.TransactionVerificationException;
import io.bisq.core.btc.exceptions.WalletException;
import io.bisq.core.dao.blockchain.BsqBlockchainManager;
import io.bisq.core.dao.blockchain.TxOutputMap;
import io.bisq.core.provider.fee.FeeService;
import io.bisq.core.user.Preferences;
import javafx.collections.FXCollections;
@ -50,7 +49,6 @@ import static org.bitcoinj.core.TransactionConfidence.ConfidenceType.PENDING;
public class BsqWalletService extends WalletService {
private final BsqBlockchainManager bsqBlockchainManager;
private TxOutputMap txOutputMap;
private final BsqCoinSelector bsqCoinSelector;
@Getter
private final ObservableList<Transaction> walletTransactions = FXCollections.observableArrayList();
@ -66,7 +64,6 @@ public class BsqWalletService extends WalletService {
@Inject
public BsqWalletService(WalletsSetup walletsSetup,
BsqBlockchainManager bsqBlockchainManager,
TxOutputMap txOutputMap,
Preferences preferences,
FeeService feeService) {
super(walletsSetup,
@ -74,7 +71,6 @@ public class BsqWalletService extends WalletService {
feeService);
this.bsqBlockchainManager = bsqBlockchainManager;
this.txOutputMap = txOutputMap;
this.bsqCoinSelector = new BsqCoinSelector(true);
walletsSetup.addSetupCompletedHandler(() -> {
@ -200,7 +196,7 @@ public class BsqWalletService extends WalletService {
final boolean isPending = parentTx.getConfidence().getConfidenceType() == PENDING;
final boolean isMine = out.isMine(wallet);
return (isPending && isMine) ||
txOutputMap.contains(parentTx.getHashAsString(), out.getIndex());
bsqBlockchainManager.getTxOutputMap().contains(parentTx.getHashAsString(), out.getIndex());
})
.map(TransactionOutput::getParentTransaction)
.collect(Collectors.toSet());

View File

@ -22,7 +22,6 @@ import io.bisq.common.app.AppModule;
import io.bisq.core.dao.blockchain.BsqBlockchainManager;
import io.bisq.core.dao.blockchain.BsqBlockchainRpcService;
import io.bisq.core.dao.blockchain.BsqBlockchainService;
import io.bisq.core.dao.blockchain.TxOutputMap;
import io.bisq.core.dao.blockchain.json.JsonExporter;
import io.bisq.core.dao.compensation.CompensationRequestManager;
import io.bisq.core.dao.vote.VotingDefaultValues;
@ -46,7 +45,6 @@ public class DaoModule extends AppModule {
bind(DaoManager.class).in(Singleton.class);
bind(BsqBlockchainManager.class).in(Singleton.class);
bind(BsqBlockchainService.class).to(BsqBlockchainRpcService.class).in(Singleton.class);
bind(TxOutputMap.class).in(Singleton.class);
bind(JsonExporter.class).in(Singleton.class);
bind(DaoPeriodService.class).in(Singleton.class);
bind(VotingService.class).in(Singleton.class);

View File

@ -1,48 +0,0 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.core.dao.blockchain;
import lombok.Value;
import java.util.ArrayList;
import java.util.List;
@Value
public class BsqBlock {
private final int height;
private final List<String> txIds;
private final List<Tx> txList = new ArrayList<>();
public BsqBlock(List<String> txIds, int height) {
this.txIds = txIds;
this.height = height;
}
public void addTx(Tx tx) {
txList.add(tx);
}
@Override
public String toString() {
return "Block{" +
"\nheight=" + height +
",\ntxIds=" + txIds +
",\ntxList=" + txList +
"}\n";
}
}

View File

@ -18,7 +18,6 @@
package io.bisq.core.dao.blockchain;
import com.google.inject.Inject;
import io.bisq.common.app.DevEnv;
import io.bisq.common.handlers.ErrorMessageHandler;
import io.bisq.common.storage.Storage;
import io.bisq.core.btc.wallet.WalletUtils;
@ -61,8 +60,8 @@ public class BsqBlockchainManager {
// If we are block 119 and last snapshot was 60 then we get a new trigger for a snapshot at block 120 and
// new snapshot is block 90. We only persist at the new snapshot, so we always re-parse from latest snapshot after
// a restart.
// As we only store snapshots when ne Txos are added it might be that there are bigger gaps than SNAPSHOT_TRIGGER.
private static final int SNAPSHOT_TRIGGER = 10000; // set high to deactivate
// As we only store snapshots when Txos are added it might be that there are bigger gaps than SNAPSHOT_TRIGGER.
private static final int SNAPSHOT_TRIGGER = 50; // set high to deactivate
public static int getSnapshotTrigger() {
return SNAPSHOT_TRIGGER;
@ -94,8 +93,7 @@ public class BsqBlockchainManager {
public static boolean triggersSnapshot(int height) {
return height - SNAPSHOT_TRIGGER == getSnapshotHeight(height);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
@ -106,8 +104,9 @@ public class BsqBlockchainManager {
private final List<TxOutputMapListener> txOutputMapListeners = new ArrayList<>();
@Getter
private final TxOutputMap txOutputMap;
private final TxOutputMap snapshotTxOutputMap = new TxOutputMap();
private TxOutputMap txOutputMap;
private TxOutputMap snapshotTxOutputMap;
@Getter
private int chainHeadHeight;
@Getter
@ -123,13 +122,11 @@ public class BsqBlockchainManager {
@Inject
public BsqBlockchainManager(BsqBlockchainService blockchainService,
P2PService p2PService,
TxOutputMap txOutputMap,
JsonExporter jsonExporter,
@Named(Storage.DIR_KEY) File storageDir,
@Named(RpcOptionKeys.RPC_USER) String rpcUser) {
this.blockchainService = blockchainService;
this.p2PService = p2PService;
this.txOutputMap = txOutputMap;
this.jsonExporter = jsonExporter;
snapshotTxOutputMapStorage = new Storage<>(storageDir);
@ -142,14 +139,14 @@ public class BsqBlockchainManager {
///////////////////////////////////////////////////////////////////////////////////////////
public void onAllServicesInitialized(ErrorMessageHandler errorMessageHandler) {
TxOutputMap persisted = snapshotTxOutputMapStorage.initAndGetPersisted(snapshotTxOutputMap, "TxOutputMap");
TxOutputMap persisted = snapshotTxOutputMapStorage.initAndGetPersistedWithFileName("TxOutputMap");
if (persisted != null) {
txOutputMap.putAll(persisted.getMap());
txOutputMap.setSnapshotHeight(persisted.getSnapshotHeight());
txOutputMap.setBlockHeight(persisted.getBlockHeight());
}
if (!txOutputMap.isEmpty())
txOutputMap = persisted;
// If we have persisted data we notify our listeners
onBsqTxoChanged();
} else {
txOutputMap = new TxOutputMap();
}
if (connectToBtcCore)
blockchainService.setup(this::onSetupComplete, errorMessageHandler);
@ -170,10 +167,8 @@ public class BsqBlockchainManager {
}
public void add(TxOutput txOutput) {
if (!txOutputMap.contains(txOutput.getTxIdIndexTuple())) {
txOutputMap.put(txOutput);
if (txOutputMap.put(txOutput) == null)
onBsqTxoChanged();
}
}
public void addTxOutputMapListener(BsqBlockchainManager.TxOutputMapListener txOutputMapListener) {
@ -188,21 +183,20 @@ public class BsqBlockchainManager {
private void onSetupComplete() {
final int genesisBlockHeight = getGenesisBlockHeight();
final String genesisTxId = getGenesisTxId();
int startBlockHeight = Math.max(genesisBlockHeight, txOutputMap.getSnapshotHeight());
int startBlockHeight = Math.max(genesisBlockHeight, txOutputMap.getBlockHeight());
log.info("parseBlocks with:\n" +
"genesisTxId={}\n" +
"genesisBlockHeight={}\n" +
"startBlockHeight={}\n" +
"snapshotHeight={}",
genesisTxId, genesisBlockHeight, startBlockHeight, txOutputMap.getSnapshotHeight());
"txOutputMap.lastBlockHeight={}",
genesisTxId,
genesisBlockHeight,
startBlockHeight,
txOutputMap.getBlockHeight());
parseBlocks(startBlockHeight,
genesisBlockHeight,
genesisTxId);
// If we have past data we notify our listeners
if (txOutputMap.getSnapshotHeight() > 0)
onBsqTxoChanged();
}
// TODO handle reorgs
@ -216,11 +210,8 @@ public class BsqBlockchainManager {
genesisTxId,
txOutputMap,
snapshotTxOutputMap -> {
// we are done but it might be that new blocks have arrived in the meantime,
// so we try again with startBlockHeight set to current chainHeadHeight
//applyNewTxOutputMapAndPersistSnapshot(snapshotTxOutputMap, snapshotTxOutputMap.getLastBlockHeight());
applyNewTxOutputMap(snapshotTxOutputMap);
checkForSnapshotUpdate(snapshotTxOutputMap.getBlockHeight());
updateSnapshotOnTrigger(snapshotTxOutputMap.getBlockHeight());
}, chainTipTxOutputMap -> {
// we are done but it might be that new blocks have arrived in the meantime,
// so we try again with startBlockHeight set to current chainHeadHeight
@ -239,14 +230,29 @@ public class BsqBlockchainManager {
// We register our handler for new blocks
blockchainService.addBlockHandler(bsqBlock -> {
blockchainService.parseBlock(bsqBlock,
genesisBlockHeight,
genesisTxId,
txOutputMap,
newTxOutputMap -> {
applyNewTxOutputMap(newTxOutputMap);
checkForSnapshotUpdate(bsqBlock.getHeight());
log.debug("new block parsed. bsqBlock={}", bsqBlock);
if (txOutputMap.getBlockHeight() < newTxOutputMap.getBlockHeight()) {
applyNewTxOutputMap(newTxOutputMap);
updateSnapshotOnTrigger(newTxOutputMap.getBlockHeight());
log.debug("new block parsed. bsqBlock={}", bsqBlock);
} else {
log.warn("We got a newTxOutputMap with a lower block height than the one form the " +
"map we requested. That should not happen, but theoretically could be " +
"if 2 blocks arrive at nearly the same time and the second is faster in " +
"parsing than the first, so the callback of the first will have a lower " +
"height. " +
"txOutputMap.getBlockHeight()={}; " +
"newTxOutputMap.getBlockHeight()={}",
txOutputMap.getBlockHeight(),
newTxOutputMap.getBlockHeight());
checkArgument(txOutputMap.getBlockHeight() < newTxOutputMap.getBlockHeight(),
"blockheight of requesting map and callback cannot be the same");
}
}, throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
@ -261,36 +267,20 @@ public class BsqBlockchainManager {
private void applyNewTxOutputMap(TxOutputMap newTxOutputMap) {
this.txOutputMap.putAll(newTxOutputMap.getMap());
txOutputMap = newTxOutputMap;
txOutputMapListeners.stream().forEach(l -> l.onTxOutputMapChanged(txOutputMap));
txOutputMap.printSize();
}
private void checkForSnapshotUpdate(int blockHeight) {
final int snapshotHeight = getSnapshotHeight(blockHeight);
final int lastSnapshotHeight = txOutputMap.getSnapshotHeight();
log.debug("checkForSnapshotUpdate: snapshotHeight={}, lastSnapshotHeight={} , lastBlockHeight={} ",
snapshotHeight,
lastSnapshotHeight,
txOutputMap.getBlockHeight());
if (blockHeight > snapshotHeight && lastSnapshotHeight < snapshotHeight) {
snapshotTxOutputMap.putAll(TxOutputMap.getClonedMapUpToHeight(txOutputMap, snapshotHeight));
snapshotTxOutputMap.setSnapshotHeight(snapshotHeight);
snapshotTxOutputMap.setBlockHeight(blockHeight);
txOutputMap.setSnapshotHeight(snapshotHeight);
snapshotTxOutputMapStorage.queueUpForSave();
if (DevEnv.DEV_MODE) {
snapshotTxOutputMap.values().stream().mapToInt(TxOutput::getBlockHeight).max()
.ifPresent(maxHeight -> {
log.error("max blockHeight in snapshotTxOutputMap=" + maxHeight);
checkArgument(maxHeight <= snapshotHeight, "max blockHeight in snapshotTxOutputMap must " +
"not be higher than snapshotHeight");
});
}
log.error("Saved txOutputMap to disc. snapshotHeight={}, blockHeight={}",
snapshotHeight, blockHeight);
snapshotTxOutputMap.printSize();
private void updateSnapshotOnTrigger(int blockHeight) {
// At trigger time we store the last memory stored map to disc
if (snapshotTxOutputMap != null) {
// We clone because storage is in a threaded context
TxOutputMap clonedSnapshotTxOutputMap = TxOutputMap.getClonedMap(snapshotTxOutputMap);
snapshotTxOutputMapStorage.queueUpForSave(clonedSnapshotTxOutputMap);
}
// Now we save the map in memory for the next trigger
snapshotTxOutputMap = TxOutputMap.getClonedMap(txOutputMap);
}
private void onBsqTxoChanged() {

View File

@ -26,6 +26,7 @@ import com.google.inject.Inject;
import com.neemre.btcdcli4j.core.BitcoindException;
import com.neemre.btcdcli4j.core.CommunicationException;
import com.neemre.btcdcli4j.core.client.BtcdClientImpl;
import com.neemre.btcdcli4j.core.domain.Block;
import com.neemre.btcdcli4j.core.domain.RawTransaction;
import com.neemre.btcdcli4j.daemon.BtcdDaemonImpl;
import com.neemre.btcdcli4j.daemon.event.BlockListener;
@ -61,9 +62,9 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
private final String rpcPassword;
private final String rpcPort;
private final String rpcBlockPort;
private final ListeningExecutorService setupExecutor = Utilities.getListeningExecutorService("RpcService.setup", 1, 1, 5);
private final ListeningExecutorService parseBlockchainExecutor = Utilities.getListeningExecutorService("RpcService.requests", 1, 1, 10);
private final ListeningExecutorService getChainHeightExecutor = Utilities.getListeningExecutorService("RpcService.requests", 1, 1, 10);
private final ListeningExecutorService setupExecutor = Utilities.getListeningExecutorService("RpcServiceSetup", 1, 1, 5);
private final ListeningExecutorService parseBlocksExecutor = Utilities.getListeningExecutorService("ParseBlocks", 1, 1, 60);
private final ListeningExecutorService getChainHeightExecutor = Utilities.getListeningExecutorService("GetChainHeight", 1, 1, 60);
private BtcdClientImpl client;
private BtcdDaemonImpl daemon;
@ -166,7 +167,7 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
Consumer<TxOutputMap> snapShotHandler,
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler) {
ListenableFuture<TxOutputMap> future = parseBlockchainExecutor.submit(() -> {
ListenableFuture<TxOutputMap> future = parseBlocksExecutor.submit(() -> {
long startTs = System.currentTimeMillis();
BsqParser bsqParser = new BsqParser(this);
// txOutputMap us used in UserThread context, so we clone
@ -188,7 +189,7 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
@Override
public void onSuccess(TxOutputMap clonedMap) {
UserThread.execute(() -> {
// We map to UserThread. Map was already cloned
// We map to UserThread. Map was already cloned initially.
UserThread.execute(() -> resultHandler.accept(clonedMap));
});
}
@ -202,13 +203,13 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
}
@Override
void parseBlock(BsqBlock block,
void parseBlock(Block block,
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler) {
ListenableFuture<TxOutputMap> future = parseBlockchainExecutor.submit(() -> {
ListenableFuture<TxOutputMap> future = parseBlocksExecutor.submit(() -> {
BsqParser bsqParser = new BsqParser(this);
// txOutputMap us used in UserThread context, so we clone);
final TxOutputMap clonedMap = TxOutputMap.getClonedMap(txOutputMap);
@ -237,14 +238,14 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
}
@Override
void addBlockHandler(Consumer<BsqBlock> blockHandler) {
void addBlockHandler(Consumer<Block> blockHandler) {
daemon.addBlockListener(new BlockListener() {
@Override
public void blockDetected(com.neemre.btcdcli4j.core.domain.Block btcdBlock) {
if (btcdBlock != null) {
public void blockDetected(Block block) {
if (block != null) {
UserThread.execute(() -> {
log.info("New block received: height={}, id={}", btcdBlock.getHeight(), btcdBlock.getHash());
blockHandler.accept(new BsqBlock(btcdBlock.getTx(), btcdBlock.getHeight()));
log.info("New block received: height={}, id={}", block.getHeight(), block.getHash());
blockHandler.accept(block);
});
}
}
@ -264,7 +265,7 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
@VisibleForTesting
@Override
com.neemre.btcdcli4j.core.domain.Block requestBlock(int blockHeight) throws BitcoindException, CommunicationException {
Block requestBlock(int blockHeight) throws BitcoindException, CommunicationException {
return client.getBlock(client.getBlockHash(blockHeight));
}
@ -285,12 +286,13 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
.stream()
.filter(e -> e != null && e.getN() != null && e.getValue() != null && e.getScriptPubKey() != null)
.map(rawOutput -> new TxOutput(rawOutput.getN(),
rawOutput.getValue().movePointRight(8).longValue(),
rawTransaction.getTxId(),
new PubKeyScript(rawOutput.getScriptPubKey()),
blockHeight,
time,
signaturePubKey))
rawOutput.getValue().movePointRight(8).longValue(),
rawTransaction.getTxId(),
new PubKeyScript(rawOutput.getScriptPubKey()),
blockHeight,
time,
signaturePubKey)
)
.collect(Collectors.toList());
return new Tx(txId,
txInputs,

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.neemre.btcdcli4j.core.BitcoindException;
import com.neemre.btcdcli4j.core.CommunicationException;
import com.neemre.btcdcli4j.core.domain.Block;
import com.neemre.btcdcli4j.core.domain.RawTransaction;
import io.bisq.common.crypto.KeyRing;
import io.bisq.common.handlers.ErrorMessageHandler;
@ -61,14 +62,14 @@ abstract public class BsqBlockchainService {
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler);
abstract void parseBlock(BsqBlock block,
abstract void parseBlock(Block block,
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler);
abstract void addBlockHandler(Consumer<BsqBlock> onNewBlockHandler);
abstract void addBlockHandler(Consumer<Block> onNewBlockHandler);
///////////////////////////////////////////////////////////////////////////////////////////
@ -79,7 +80,7 @@ abstract public class BsqBlockchainService {
abstract int requestChainHeadHeight() throws BitcoindException, CommunicationException;
@VisibleForTesting
abstract com.neemre.btcdcli4j.core.domain.Block requestBlock(int i) throws BitcoindException, CommunicationException;
abstract Block requestBlock(int i) throws BitcoindException, CommunicationException;
@VisibleForTesting
abstract Tx requestTransaction(String txId, int blockHeight) throws BsqBlockchainException;

View File

@ -18,6 +18,7 @@
package io.bisq.core.dao.blockchain;
import com.google.common.annotations.VisibleForTesting;
import com.neemre.btcdcli4j.core.domain.Block;
import io.bisq.common.app.DevEnv;
import lombok.extern.slf4j.Slf4j;
@ -44,11 +45,6 @@ public class BsqParser {
this.bsqBlockchainService = bsqBlockchainService;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Parsing
///////////////////////////////////////////////////////////////////////////////////////////
@VisibleForTesting
void parseBlocks(int startBlockHeight,
int chainHeadHeight,
@ -59,42 +55,36 @@ public class BsqParser {
try {
log.info("chainHeadHeight=" + chainHeadHeight);
long startTotalTs = System.currentTimeMillis();
for (int height = startBlockHeight; height <= chainHeadHeight; height++) {
for (int blockHeight = startBlockHeight; blockHeight <= chainHeadHeight; blockHeight++) {
long startBlockTs = System.currentTimeMillis();
com.neemre.btcdcli4j.core.domain.Block btcdBlock = bsqBlockchainService.requestBlock(height);
log.debug("Current block height=" + height);
Block block = bsqBlockchainService.requestBlock(blockHeight);
log.debug("Current block blockHeight=" + blockHeight);
// 1 block has about 3 MB, but we keep it only in memory as long as needed
final BsqBlock bsqBlock = new BsqBlock(btcdBlock.getTx(), btcdBlock.getHeight());
parseBlock(bsqBlock,
parseBlock(block,
genesisBlockHeight,
genesisTxId,
txOutputMap);
txOutputMap.setBlockHeight(height);
if (BsqBlockchainManager.triggersSnapshot(height)) {
// We clone the map to isolate thread context. TxOutputMap is used in UserThread.
final TxOutputMap clonedSnapShotMap = TxOutputMap.getClonedMapUpToHeight(txOutputMap,
BsqBlockchainManager.getSnapshotHeight(height));
if (BsqBlockchainManager.triggersSnapshot(blockHeight)) {
TxOutputMap clonedSnapShotMap = TxOutputMap.getClonedMap(txOutputMap);
//clonedSnapShotMap.printUnspentTxOutputs("triggersSnapshot");
snapShotHandler.accept(clonedSnapShotMap);
}
/* StringBuilder sb = new StringBuilder("recursionMap:\n");
List<String> list = new ArrayList<>();
//recursionMap.entrySet().stream().forEach(e -> sb.append(e.getKey()).append(": ").append(e.getValue()).append("\n"));
recursionMap.entrySet().stream().forEach(e -> list.add("\nBlock height / Tx graph depth / Nr. of Txs: " + e.getKey()
recursionMap.entrySet().stream().forEach(e -> list.add("\nBlock blockHeight / Tx graph depth / Nr. of Txs: " + e.getKey()
+ " / " + e.getValue()));
Collections.sort(list);
list.stream().forEach(e -> sb.append(e).append("\n"));
log.warn(list.toString().replace(",", "").replace("[", "").replace("]", ""));*/
/* log.info("Parsing for block {} took {} ms. Total: {} ms for {} blocks",
height,
blockHeight,
(System.currentTimeMillis() - startBlockTs),
(System.currentTimeMillis() - startTotalTs),
(height - startBlockHeight + 1));
(blockHeight - startBlockHeight + 1));
Profiler.printSystemLoad(log);*/
}
log.info("Parsing for blocks {} to {} took {} ms",
@ -108,7 +98,7 @@ public class BsqParser {
}
}
void parseBlock(BsqBlock block,
void parseBlock(Block block,
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap)
@ -116,11 +106,11 @@ public class BsqParser {
int blockHeight = block.getHeight();
log.debug("Parse block at height={} ", blockHeight);
// We add all transactions to the block
List<String> txIds = block.getTxIds();
List<Tx> txList = new ArrayList<>();
Tx genesisTx = null;
for (String txId : txIds) {
for (String txId : block.getTx()) {
final Tx tx = bsqBlockchainService.requestTransaction(txId, blockHeight);
block.addTx(tx);
txList.add(tx);
if (txId.equals(genesisTxId))
genesisTx = tx;
}
@ -138,12 +128,15 @@ public class BsqParser {
// There are some blocks with testing such dependency chains like block 130768 where at each iteration only
// one get resolved.
// Lately there is a patter with 24 iterations observed
parseTransactions(block.getTxList(), txOutputMap, blockHeight, 0, 5300);
parseTransactions(txList, txOutputMap, blockHeight, 0, 5300);
checkArgument(txOutputMap.getBlockHeight() <= blockHeight,
"blockHeight from txOutputMap must not be larger than blockHeight in parser iteration");
txOutputMap.setBlockHeight(blockHeight);
}
@VisibleForTesting
void parseGenesisTx(Tx tx,
TxOutputMap txOutputMap) {
void parseGenesisTx(Tx tx, TxOutputMap txOutputMap) {
// Genesis tx uses all outputs as BSQ outputs
tx.getOutputs().stream().forEach(txOutput -> {
txOutput.setVerified(true);

View File

@ -19,10 +19,12 @@ package io.bisq.core.dao.blockchain;
import lombok.Value;
import javax.annotation.concurrent.Immutable;
import java.io.Serializable;
import java.util.List;
@Value
@Immutable
public class Tx implements Serializable {
private final String id;
private final List<TxInput> inputs;

View File

@ -19,9 +19,11 @@ package io.bisq.core.dao.blockchain;
import lombok.Value;
import javax.annotation.concurrent.Immutable;
import java.io.Serializable;
@Value
@Immutable
public class TxInput implements Serializable {
private final int spendingTxOutputIndex;
private final String spendingTxId;

View File

@ -24,8 +24,7 @@ import io.bisq.common.util.JsonExclude;
import io.bisq.core.dao.blockchain.btcd.PubKeyScript;
import io.bisq.network.p2p.storage.payload.LazyProcessedStoragePayload;
import io.bisq.network.p2p.storage.payload.PersistedStoragePayload;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Data;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -35,16 +34,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Getter
@EqualsAndHashCode
@Slf4j
@Data
public class TxOutput implements LazyProcessedStoragePayload, PersistedStoragePayload {
@JsonExclude
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@JsonExclude
public static final long TTL = TimeUnit.DAYS.toMillis(30);
// Immutable
private final int index;
private final long value;
private final String txId;
@ -53,33 +50,27 @@ public class TxOutput implements LazyProcessedStoragePayload, PersistedStoragePa
private final long time;
private final String txVersion = Version.BSQ_TX_VERSION;
@JsonExclude
private PublicKey signaturePubKey;
private final PublicKey signaturePubKey;
// Mutable
// mutable
@Setter
private boolean isBsqCoinBase;
@Setter
private boolean isVerified;
@Setter
private long burnedFee;
@Nullable
@Setter
private long btcTxFee;
@Nullable
@Setter
private SpendInfo spendInfo;
// Lazy set
@Nullable
private String address;
// Should be only used in emergency case if we need to add data but do not want to break backward compatibility
// at the P2P network storage checks. The hash of the object will be used to verify if the data is valid. Any new
// field in a class would break that hash and therefore break the storage mechanism.
@Getter
@Nullable
@Setter
private HashMap<String, String> extraDataMap;
public TxOutput(int index,
@ -108,36 +99,28 @@ public class TxOutput implements LazyProcessedStoragePayload, PersistedStoragePa
return signaturePubKey;
}
@Nullable
@Override
public HashMap<String, String> getExtraDataMap() {
return extraDataMap;
}
public List<String> getAddresses() {
return pubKeyScript.getAddresses();
}
public String getAddress() {
if (address == null) {
// Only at raw MS outputs addresses have more then 1 entry
// We do not support raw MS for BSQ but lets see if is needed anyway, might be removed
final List<String> addresses = pubKeyScript.getAddresses();
if (addresses.size() == 1) {
address = addresses.get(0);
} else if (addresses.size() > 1) {
final String msg = "We got a raw Multisig script. That is not supported for BSQ tokens.";
log.warn(msg);
address = addresses.toString();
if (DevEnv.DEV_MODE)
throw new RuntimeException(msg);
} else if (addresses.isEmpty()) {
final String msg = "We got no address. Unsupported pubKeyScript";
log.warn(msg);
address = "";
if (DevEnv.DEV_MODE)
throw new RuntimeException(msg);
}
String address = "";
// Only at raw MS outputs addresses have more then 1 entry
// We do not support raw MS for BSQ but lets see if is needed anyway, might be removed
final List<String> addresses = pubKeyScript.getAddresses();
if (addresses.size() == 1) {
address = addresses.get(0);
} else if (addresses.size() > 1) {
final String msg = "We got a raw Multisig script. That is not supported for BSQ tokens.";
log.warn(msg);
address = addresses.toString();
if (DevEnv.DEV_MODE)
throw new RuntimeException(msg);
} else {
final String msg = "We got no address. Unsupported pubKeyScript";
log.warn(msg);
if (DevEnv.DEV_MODE)
throw new RuntimeException(msg);
}
return address;
}
@ -154,6 +137,10 @@ public class TxOutput implements LazyProcessedStoragePayload, PersistedStoragePa
return txId + ":" + index;
}
public String getBlockHeightWithTxoId() {
return blockHeight + "/" + getTxoId();
}
public TxIdIndexTuple getTxIdIndexTuple() {
return new TxIdIndexTuple(txId, index);
}

View File

@ -21,13 +21,11 @@ import io.bisq.common.util.Utilities;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.core.Transaction;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
// Map of any TxOutput which was ever used in context of a BSQ TX.
@ -47,28 +45,12 @@ public class TxOutputMap implements Serializable {
///////////////////////////////////////////////////////////////////////////////////////////
public static TxOutputMap getClonedMap(TxOutputMap txOutputMap) {
return new TxOutputMap(txOutputMap);
}
public static TxOutputMap getClonedMapUpToHeight(TxOutputMap txOutputMap, int snapshotHeight) {
final TxOutputMap txOutputMapClone = new TxOutputMap();
txOutputMapClone.setBlockHeight(txOutputMap.getBlockHeight());
txOutputMapClone.setSnapshotHeight(txOutputMap.getSnapshotHeight());
Map<TxIdIndexTuple, TxOutput> map = txOutputMap.entrySet().stream()
.filter(entry -> entry.getValue().getBlockHeight() <= snapshotHeight)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
txOutputMapClone.putAll(map);
return txOutputMapClone;
return Utilities.<TxOutputMap>deserialize(Utilities.serialize(txOutputMap));
}
@Getter
private HashMap<TxIdIndexTuple, TxOutput> map = new HashMap<>();
@Getter
@Setter
private int snapshotHeight = 0;
private HashMap<TxIdIndexTuple, TxOutput> map;
@Getter
@Setter
private int blockHeight;
@ -79,15 +61,9 @@ public class TxOutputMap implements Serializable {
///////////////////////////////////////////////////////////////////////////////////////////
public TxOutputMap() {
map = new HashMap<>();
}
private TxOutputMap(TxOutputMap txOutputMap) {
map = txOutputMap.getMap();
snapshotHeight = txOutputMap.getSnapshotHeight();
blockHeight = txOutputMap.getBlockHeight();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
@ -97,9 +73,13 @@ public class TxOutputMap implements Serializable {
return txOutput != null && txOutput.isUnSpend();
}
public boolean hasTxBurnedFee(String txId) {
final TxOutput txOutput = get(txId, 0);
return txOutput != null && txOutput.hasBurnedFee();
public boolean hasTxBurnedFee(Transaction tx) {
for (int i = 0; i < tx.getOutputs().size(); i++) {
final TxOutput txOutput = get(tx.getHashAsString(), i);
if (txOutput != null && txOutput.hasBurnedFee())
return true;
}
return false;
}
@ -108,17 +88,10 @@ public class TxOutputMap implements Serializable {
///////////////////////////////////////////////////////////////////////////////////////////
public Object put(TxOutput txOutput) {
blockHeight = txOutput.getBlockHeight();
return map.put(txOutput.getTxIdIndexTuple(), txOutput);
}
public void putAll(Map<TxIdIndexTuple, TxOutput> txOutputs) {
map.putAll(txOutputs);
}
public void putAll(TxOutputMap txOutputMap) {
map.putAll(txOutputMap.getMap());
}
@Nullable
public TxOutput get(String txId, int index) {
return get(new TxIdIndexTuple(txId, index));
@ -153,6 +126,10 @@ public class TxOutputMap implements Serializable {
return map.size();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public String toString() {
return "TxOutputMap " + map.toString();
@ -161,5 +138,38 @@ public class TxOutputMap implements Serializable {
public void printSize() {
log.info("Nr of entries={}; Size in kb={}", size(), Utilities.serialize(this).length / 1000d);
}
public String getTuplesAsString() {
return map.keySet().stream().map(TxIdIndexTuple::toString).collect(Collectors.joining(","));
}
public Set<TxOutput> getUnspentTxOutputs() {
return map.values().stream().filter(TxOutput::isUnSpend).collect(Collectors.toSet());
}
public List<TxOutput> getSortedUnspentTxOutputs() {
List<TxOutput> list = getUnspentTxOutputs().stream().collect(Collectors.toList());
Collections.sort(list, (o1, o2) -> o1.getBlockHeightWithTxoId().compareTo(o2.getBlockHeightWithTxoId()));
return list;
}
public void printUnspentTxOutputs(String prefix) {
final String txoIds = getBlocHeightSortedTxoIds();
log.info(prefix + " utxo: size={}, blockHeight={}, hashCode={}, txoids={}",
getSortedUnspentTxOutputs().size(),
blockHeight,
getBlockHeightSortedTxoIdsHashCode(),
txoIds);
}
public int getBlockHeightSortedTxoIdsHashCode() {
return getBlocHeightSortedTxoIds().hashCode();
}
private String getBlocHeightSortedTxoIds() {
return getSortedUnspentTxOutputs().stream()
.map(e -> e.getBlockHeight() + "/" + e.getTxoId())
.collect(Collectors.joining("\n"));
}
}

View File

@ -22,27 +22,27 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import io.bisq.common.app.Version;
import io.bisq.common.util.JsonExclude;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Value;
import javax.annotation.concurrent.Immutable;
import java.io.Serializable;
import java.util.List;
@Data
@NoArgsConstructor
@Value
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = false)
@JsonInclude(Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Immutable
public class PubKeyScript implements Serializable {
@JsonExclude
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private Integer reqSigs;
private ScriptTypes type;
private List<String> addresses;
private String asm;
private String hex;
private final Integer reqSigs;
private final ScriptTypes type;
private final List<String> addresses;
private final String asm;
private final String hex;
public PubKeyScript(com.neemre.btcdcli4j.core.domain.PubKeyScript scriptPubKey) {
this(scriptPubKey.getReqSigs(),

View File

@ -79,7 +79,8 @@ public class CoreProtobufferResolver implements ProtobufferResolver {
log.warn("fromProtoBuf called with empty envelope.");
return Optional.empty();
}
if (envelope.getMessageCase() != PING && envelope.getMessageCase() != PONG && envelope.getMessageCase() != REFRESH_TTL_MESSAGE) {
if (envelope.getMessageCase() != PING && envelope.getMessageCase() != PONG &&
envelope.getMessageCase() != REFRESH_TTL_MESSAGE) {
log.debug("Convert protobuffer envelope: {}, {}", envelope.getMessageCase(), envelope.toString());
} else {
log.debug("Convert protobuffer envelope: {}", envelope.getMessageCase());

View File

@ -110,7 +110,9 @@ public final class Preferences implements Persistable {
private String sellScreenCurrencyCode;
private int tradeStatisticsTickUnitIndex = 3;
// TODO can be removed (wait for PB merge)
private boolean useStickyMarketPrice = false;
private boolean sortMarketCurrenciesNumerically = true;
private boolean usePercentageBasedPrice = true;
private Map<String, String> peerTagMap = new HashMap<>();

View File

@ -130,7 +130,7 @@ class BsqTxListItem {
if (!isBurnedBsqTx)
address = received ? receivedWithAddress : sendToAddress;
else
address = Res.get("dao.wallet.burned");
address = "";
}
private void setupConfidence(BsqWalletService bsqWalletService) {

View File

@ -22,7 +22,6 @@ import io.bisq.common.locale.Res;
import io.bisq.core.btc.wallet.BsqWalletService;
import io.bisq.core.btc.wallet.BtcWalletService;
import io.bisq.core.dao.blockchain.BsqBlockchainManager;
import io.bisq.core.dao.blockchain.TxOutputMap;
import io.bisq.core.user.DontShowAgainLookup;
import io.bisq.core.user.Preferences;
import io.bisq.gui.common.view.ActivatableView;
@ -61,7 +60,6 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
private int gridRow = 0;
private BsqFormatter bsqFormatter;
private BsqWalletService bsqWalletService;
private TxOutputMap txOutputMap;
private BsqBlockchainManager bsqBlockchainManager;
private BtcWalletService btcWalletService;
private BsqBalanceUtil bsqBalanceUtil;
@ -81,13 +79,11 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
@Inject
private BsqTxView(BsqFormatter bsqFormatter, BsqWalletService bsqWalletService,
TxOutputMap txOutputMap,
BsqBlockchainManager bsqBlockchainManager,
BtcWalletService btcWalletService, BsqBalanceUtil bsqBalanceUtil, Preferences preferences) {
this.bsqFormatter = bsqFormatter;
this.bsqWalletService = bsqWalletService;
this.txOutputMap = txOutputMap;
this.bsqBlockchainManager = bsqBlockchainManager;
this.btcWalletService = btcWalletService;
this.bsqBalanceUtil = bsqBalanceUtil;
@ -139,7 +135,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
bsqWalletService.getWalletTransactions().removeListener(walletBsqTransactionsListener);
observableList.forEach(BsqTxListItem::cleanup);
if (rootParent != null)
((Pane) root.getParent()).heightProperty().removeListener(parentHeightListener);
rootParent.heightProperty().removeListener(parentHeightListener);
}
private void updateList() {
@ -153,7 +149,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
return new BsqTxListItem(transaction,
bsqWalletService,
btcWalletService,
txOutputMap.hasTxBurnedFee(transaction.getHashAsString()),
bsqBlockchainManager.getTxOutputMap().hasTxBurnedFee(transaction),
bsqFormatter);
}
)
@ -273,11 +269,12 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
if (item != null && !empty) {
String addressString = item.getAddress();
if (item.isBurnedBsqTx()) {
if (item.isBurnedBsqTx() || item.getAmount().isZero()) {
if (field != null)
field.setOnAction(null);
label = new Label(addressString);
label = new Label(item.isBurnedBsqTx() ?
Res.get("dao.wallet.bsqFee") : Res.get("funds.tx.direction.self"));
setGraphic(label);
} else {
field = new AddressWithIconAndDirection(item.getDirection(), addressString,