Add reorg handling

This commit is contained in:
Manfred Karrer 2017-04-13 13:50:03 -05:00
parent 8516781ed7
commit 552d8f1910
9 changed files with 156 additions and 58 deletions

View File

@ -92,6 +92,7 @@ public class BsqWalletService extends WalletService {
@Override
public void onReorganize(Wallet wallet) {
log.warn("onReorganize ");
updateBsqWalletTransactions();
}
@ -196,7 +197,7 @@ public class BsqWalletService extends WalletService {
final boolean isPending = parentTx.getConfidence().getConfidenceType() == PENDING;
final boolean isMine = out.isMine(wallet);
return (isPending && isMine) ||
bsqBlockchainManager.getTxOutputMap().contains(parentTx.getHashAsString(), out.getIndex());
bsqBlockchainManager.getTxOutputMap() != null && bsqBlockchainManager.getTxOutputMap().contains(parentTx.getHashAsString(), out.getIndex());
})
.map(TransactionOutput::getParentTransaction)
.collect(Collectors.toSet());

View File

@ -288,7 +288,7 @@ public abstract class WalletService {
public TransactionConfidence getConfidenceForAddress(Address address) {
List<TransactionConfidence> transactionConfidenceList = new ArrayList<>();
if (wallet != null) {
Set<Transaction> transactions = wallet.getTransactions(true);
Set<Transaction> transactions = wallet.getTransactions(false);
if (transactions != null) {
transactionConfidenceList.addAll(transactions.stream().map(tx ->
getTransactionConfidence(tx, address)).collect(Collectors.toList()));
@ -300,8 +300,7 @@ public abstract class WalletService {
@Nullable
public TransactionConfidence getConfidenceForTxId(String txId) {
if (wallet != null) {
// TODO includeDead txs?
Set<Transaction> transactions = wallet.getTransactions(true);
Set<Transaction> transactions = wallet.getTransactions(false);
for (Transaction tx : transactions) {
if (tx.getHashAsString().equals(txId))
return tx.getConfidence();
@ -384,7 +383,7 @@ public abstract class WalletService {
public int getNumTxOutputsForAddress(Address address) {
List<TransactionOutput> transactionOutputs = new ArrayList<>();
wallet.getTransactions(true).stream().forEach(t -> transactionOutputs.addAll(t.getOutputs()));
wallet.getTransactions(false).stream().forEach(t -> transactionOutputs.addAll(t.getOutputs()));
int outputs = 0;
for (TransactionOutput output : transactionOutputs) {
if (WalletUtils.isOutputScriptConvertableToAddress(output) &&
@ -574,6 +573,11 @@ public abstract class WalletService {
notifyBalanceListeners(tx);
}
@Override
public void onReorganize(Wallet wallet) {
log.warn("onReorganize ");
}
@Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
for (AddressConfidenceListener addressConfidenceListener : addressConfidenceListeners) {

View File

@ -183,12 +183,12 @@ public class BsqBlockchainManager {
private void onSetupComplete() {
final int genesisBlockHeight = getGenesisBlockHeight();
final String genesisTxId = getGenesisTxId();
int startBlockHeight = Math.max(genesisBlockHeight, txOutputMap.getBlockHeight());
int startBlockHeight = Math.max(genesisBlockHeight, txOutputMap.getBlockHeight() + 1);
log.info("parseBlocks with:\n" +
"genesisTxId={}\n" +
"genesisBlockHeight={}\n" +
"startBlockHeight={}\n" +
"txOutputMap.lastBlockHeight={}",
"txOutputMap.blockHeight={}",
genesisTxId,
genesisBlockHeight,
startBlockHeight,
@ -199,8 +199,6 @@ public class BsqBlockchainManager {
genesisTxId);
}
// TODO handle reorgs
private void parseBlocks(int startBlockHeight, int genesisBlockHeight, String genesisTxId) {
blockchainService.requestChainHeadHeight(chainHeadHeight -> {
if (chainHeadHeight != startBlockHeight) {
@ -220,8 +218,12 @@ public class BsqBlockchainManager {
genesisBlockHeight,
genesisTxId);
}, throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
if (throwable instanceof OrphanDetectedException) {
startReOrgFromLastSnapshot(((OrphanDetectedException) throwable).getBlockHeight());
} else {
log.error(throwable.toString());
throwable.printStackTrace();
}
});
} else {
// We dont have received new blocks in the meantime so we are completed and we register our handler
@ -230,7 +232,6 @@ public class BsqBlockchainManager {
// We register our handler for new blocks
blockchainService.addBlockHandler(bsqBlock -> {
blockchainService.parseBlock(bsqBlock,
genesisBlockHeight,
genesisTxId,
@ -241,21 +242,25 @@ public class BsqBlockchainManager {
updateSnapshotOnTrigger(newTxOutputMap.getBlockHeight());
log.debug("new block parsed. bsqBlock={}", bsqBlock);
} else {
log.warn("We got a newTxOutputMap with a lower block height than the one form the " +
log.warn("We got a newTxOutputMap with a lower block height than the one from the " +
"map we requested. That should not happen, but theoretically could be " +
"if 2 blocks arrive at nearly the same time and the second is faster in " +
"parsing than the first, so the callback of the first will have a lower " +
"height. " +
"txOutputMap.getBlockHeight()={}; " +
"newTxOutputMap.getBlockHeight()={}",
"newTxOutputMap.getBlockHeight()={}\n" +
"To avoid conflicts we start a reorg from the last snapshot.",
txOutputMap.getBlockHeight(),
newTxOutputMap.getBlockHeight());
checkArgument(txOutputMap.getBlockHeight() < newTxOutputMap.getBlockHeight(),
"blockheight of requesting map and callback cannot be the same");
startReOrgFromLastSnapshot(newTxOutputMap.getBlockHeight());
}
}, throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
if (throwable instanceof OrphanDetectedException) {
startReOrgFromLastSnapshot(((OrphanDetectedException) throwable).getBlockHeight());
} else {
log.error(throwable.toString());
throwable.printStackTrace();
}
});
});
}
@ -265,6 +270,27 @@ public class BsqBlockchainManager {
});
}
private void startReOrgFromLastSnapshot(int blockHeight) {
log.warn("We have to do a re-org because a new block did not connect to our chain.");
int startBlockHeight = snapshotTxOutputMap != null ? snapshotTxOutputMap.getBlockHeight() : getGenesisBlockHeight();
checkArgument(snapshotTxOutputMap == null || startBlockHeight >= blockHeight - SNAPSHOT_TRIGGER);
blockchainService.requestBlock(startBlockHeight,
block -> {
if (snapshotTxOutputMap != null) {
checkArgument(startBlockHeight <= block.getHeight());
checkArgument(block.getHash().equals(snapshotTxOutputMap.getBlockHash()));
applyNewTxOutputMap(snapshotTxOutputMap);
} else {
applyNewTxOutputMap(new TxOutputMap());
}
parseBlocks(startBlockHeight,
getGenesisBlockHeight(),
getGenesisTxId());
}, throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
});
}
private void applyNewTxOutputMap(TxOutputMap newTxOutputMap) {
txOutputMap = newTxOutputMap;

View File

@ -65,6 +65,7 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
private final ListeningExecutorService setupExecutor = Utilities.getListeningExecutorService("RpcServiceSetup", 1, 1, 5);
private final ListeningExecutorService parseBlocksExecutor = Utilities.getListeningExecutorService("ParseBlocks", 1, 1, 60);
private final ListeningExecutorService getChainHeightExecutor = Utilities.getListeningExecutorService("GetChainHeight", 1, 1, 60);
private final ListeningExecutorService getBlockExecutor = Utilities.getListeningExecutorService("GetBlock", 1, 1, 60);
private BtcdClientImpl client;
private BtcdDaemonImpl daemon;
@ -158,6 +159,21 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
});
}
@Override
void requestBlock(int blockHeight, Consumer<Block> resultHandler, Consumer<Throwable> errorHandler) {
ListenableFuture<Block> future = getBlockExecutor.submit(() -> requestBlock(blockHeight));
Futures.addCallback(future, new FutureCallback<Block>() {
public void onSuccess(Block block) {
UserThread.execute(() -> resultHandler.accept(block));
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> errorHandler.accept(throwable));
}
});
}
@Override
void parseBlocks(int startBlockHeight,
int chainHeadHeight,
@ -266,7 +282,8 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
@VisibleForTesting
@Override
Block requestBlock(int blockHeight) throws BitcoindException, CommunicationException {
return client.getBlock(client.getBlockHash(blockHeight));
final String blockHash = client.getBlockHash(blockHeight);
return client.getBlock(blockHash);
}
@VisibleForTesting

View File

@ -53,6 +53,8 @@ abstract public class BsqBlockchainService {
abstract void requestChainHeadHeight(Consumer<Integer> resultHandler, Consumer<Throwable> errorHandler);
abstract void requestBlock(int blockHeight, Consumer<Block> resultHandler, Consumer<Throwable> errorHandler);
abstract void parseBlocks(int startBlockHeight,
int chainHeadHeight,
int genesisBlockHeight,

View File

@ -23,10 +23,7 @@ import io.bisq.common.app.DevEnv;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.concurrent.Immutable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -40,6 +37,7 @@ public class BsqParser {
// Map<Integer, String> recursionMap = new HashMap<>();
private final BsqBlockchainService bsqBlockchainService;
private Map<String, Integer> blockHeightByHashMap = new HashMap<>();
public BsqParser(BsqBlockchainService bsqBlockchainService) {
this.bsqBlockchainService = bsqBlockchainService;
@ -51,7 +49,7 @@ public class BsqParser {
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> snapShotHandler) throws BsqBlockchainException {
Consumer<TxOutputMap> snapShotHandler) throws BsqBlockchainException, OrphanDetectedException {
try {
log.info("chainHeadHeight=" + chainHeadHeight);
long startTotalTs = System.currentTimeMillis();
@ -59,7 +57,6 @@ public class BsqParser {
long startBlockTs = System.currentTimeMillis();
Block block = bsqBlockchainService.requestBlock(blockHeight);
log.debug("Current block blockHeight=" + blockHeight);
parseBlock(block,
genesisBlockHeight,
genesisTxId,
@ -91,6 +88,8 @@ public class BsqParser {
startBlockHeight,
chainHeadHeight,
System.currentTimeMillis() - startTotalTs);
} catch (OrphanDetectedException e) {
throw e;
} catch (Throwable t) {
log.error(t.toString());
t.printStackTrace();
@ -102,37 +101,53 @@ public class BsqParser {
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap)
throws BsqBlockchainException {
throws BsqBlockchainException, OrphanDetectedException {
int blockHeight = block.getHeight();
log.debug("Parse block at height={} ", blockHeight);
// We add all transactions to the block
List<Tx> txList = new ArrayList<>();
Tx genesisTx = null;
for (String txId : block.getTx()) {
final Tx tx = bsqBlockchainService.requestTransaction(txId, blockHeight);
txList.add(tx);
if (txId.equals(genesisTxId))
genesisTx = tx;
if (txOutputMap.getBlockHeight() >= blockHeight) {
log.warn("blockHeight from txOutputMap must not be larger than blockHeight in parser iteration");
throw new OrphanDetectedException(blockHeight);
}
if (genesisTx != null) {
checkArgument(blockHeight == genesisBlockHeight,
"If we have a matching genesis tx the block height must match as well");
parseGenesisTx(genesisTx, txOutputMap);
final String previousBlockHash = block.getPreviousBlockHash();
if (blockHeightByHashMap.isEmpty() ||
(blockHeightByHashMap.containsKey(previousBlockHash) &&
blockHeightByHashMap.containsKey(previousBlockHash) &&
blockHeight == blockHeightByHashMap.get(previousBlockHash) + 1)) {
blockHeightByHashMap.put(block.getHash(), blockHeight);
// check if the new block is the same chain we have built on.
log.debug("Parse block at height={} ", blockHeight);
// We add all transactions to the block
List<Tx> txList = new ArrayList<>();
Tx genesisTx = null;
for (String txId : block.getTx()) {
final Tx tx = bsqBlockchainService.requestTransaction(txId, blockHeight);
txList.add(tx);
if (txId.equals(genesisTxId))
genesisTx = tx;
}
if (genesisTx != null) {
checkArgument(blockHeight == genesisBlockHeight,
"If we have a matching genesis tx the block height must match as well");
parseGenesisTx(genesisTx, txOutputMap);
}
//txSize = block.getTxList().size();
// Worst case is that all txs in a block are depending on another, so only one get resolved at each iteration.
// Min tx size is 189 bytes (normally about 240 bytes), 1 MB can contain max. about 5300 txs (usually 2000).
// Realistically we don't expect more then a few recursive calls.
// There are some blocks with testing such dependency chains like block 130768 where at each iteration only
// one get resolved.
// Lately there is a patter with 24 iterations observed
parseTransactions(txList, txOutputMap, blockHeight, 0, 5300);
txOutputMap.setBlockHeight(blockHeight);
txOutputMap.setBlockHash(block.getHash());
} else {
log.warn("We need to do a re-org. We got a new block which does not connect to our current chain.");
throw new OrphanDetectedException(blockHeight);
}
//txSize = block.getTxList().size();
// Worst case is that all txs in a block are depending on another, so only one get resolved at each iteration.
// Min tx size is 189 bytes (normally about 240 bytes), 1 MB can contain max. about 5300 txs (usually 2000).
// Realistically we don't expect more then a few recursive calls.
// There are some blocks with testing such dependency chains like block 130768 where at each iteration only
// one get resolved.
// Lately there is a patter with 24 iterations observed
parseTransactions(txList, txOutputMap, blockHeight, 0, 5300);
checkArgument(txOutputMap.getBlockHeight() <= blockHeight,
"blockHeight from txOutputMap must not be larger than blockHeight in parser iteration");
txOutputMap.setBlockHeight(blockHeight);
}
@VisibleForTesting

View File

@ -0,0 +1,30 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.core.dao.blockchain;
import lombok.Getter;
@Getter
public class OrphanDetectedException extends Exception {
private int blockHeight;
public OrphanDetectedException(int blockHeight) {
this.blockHeight = blockHeight;
}
}

View File

@ -54,6 +54,9 @@ public class TxOutputMap implements Serializable {
@Getter
@Setter
private int blockHeight;
@Getter
@Setter
private String blockHash;
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -70,7 +70,7 @@ public class BsqBlockchainServiceTest {
}
@Test
public void testGenTx() throws BsqBlockchainException, BitcoindException, CommunicationException {
public void testGenTx() throws BsqBlockchainException, BitcoindException, CommunicationException, OrphanDetectedException {
// GENESIS_TX (block 0):
// Input 0: output from GEN_FUNDING_TX_ID
// Output 0: ADDRESS_GEN_1 ADDRESS_GEN_1_VALUE
@ -97,7 +97,7 @@ public class BsqBlockchainServiceTest {
@Test
public void testGenToTx1() throws BsqBlockchainException, BitcoindException, CommunicationException {
public void testGenToTx1() throws BsqBlockchainException, BitcoindException, CommunicationException, OrphanDetectedException {
// GENESIS_TX (block 0):
// Input 0: Output 0 from GEN_FUNDING_TX_ID
// Output 0: ADDRESS_GEN_1 ADDRESS_GEN_1_VALUE
@ -137,7 +137,7 @@ public class BsqBlockchainServiceTest {
}
@Test
public void testGenToTx1ToTx2InBlock1() throws BsqBlockchainException, BitcoindException, CommunicationException {
public void testGenToTx1ToTx2InBlock1() throws BsqBlockchainException, BitcoindException, CommunicationException, OrphanDetectedException {
// GENESIS_TX (block 0):
// Input 0: Output 0 from GEN_FUNDING_TX_ID
// Output 0: ADDRESS_GEN_1 ADDRESS_GEN_1_VALUE
@ -196,7 +196,7 @@ public class BsqBlockchainServiceTest {
}
@Test
public void testGenToTx1ToTx2InBlock2() throws BsqBlockchainException, BitcoindException, CommunicationException {
public void testGenToTx1ToTx2InBlock2() throws BsqBlockchainException, BitcoindException, CommunicationException, OrphanDetectedException {
// GENESIS_TX (block 0):
// Input 0: Output 0 from GEN_FUNDING_TX_ID
// Output 0: ADDRESS_GEN_1 ADDRESS_GEN_1_VALUE
@ -254,7 +254,7 @@ public class BsqBlockchainServiceTest {
}
@Test
public void testGenToTx1ToTx2AndGenToTx2InBlock1() throws BsqBlockchainException, BitcoindException, CommunicationException {
public void testGenToTx1ToTx2AndGenToTx2InBlock1() throws BsqBlockchainException, BitcoindException, CommunicationException, OrphanDetectedException {
// GENESIS_TX (block 0):
// Input 0: Output 0 from GEN_FUNDING_TX_ID
// Output 0: ADDRESS_GEN_1 ADDRESS_GEN_1_VALUE
@ -396,7 +396,7 @@ public class BsqBlockchainServiceTest {
private void parseAllBlocksFromGenesis()
throws BitcoindException, CommunicationException, BsqBlockchainException {
throws BitcoindException, CommunicationException, BsqBlockchainException, OrphanDetectedException {
BsqParser bsqParser = new BsqParser(service);
bsqParser.parseBlocks(BLOCK_0,
service.requestChainHeadHeight(),