Move snapshot handling to manager class. use callback for each block

This commit is contained in:
Manfred Karrer 2017-04-13 15:16:11 -05:00
parent 552d8f1910
commit b95ab354e5
5 changed files with 29 additions and 31 deletions

View file

@ -207,9 +207,9 @@ public class BsqBlockchainManager {
genesisBlockHeight,
genesisTxId,
txOutputMap,
snapshotTxOutputMap -> {
applyNewTxOutputMap(snapshotTxOutputMap);
updateSnapshotOnTrigger(snapshotTxOutputMap.getBlockHeight());
newBlockMap -> {
applyNewTxOutputMap(newBlockMap);
updateSnapshotIfTrigger(newBlockMap.getBlockHeight());
}, chainTipTxOutputMap -> {
// we are done but it might be that new blocks have arrived in the meantime,
// so we try again with startBlockHeight set to current chainHeadHeight
@ -236,23 +236,23 @@ public class BsqBlockchainManager {
genesisBlockHeight,
genesisTxId,
txOutputMap,
newTxOutputMap -> {
if (txOutputMap.getBlockHeight() < newTxOutputMap.getBlockHeight()) {
applyNewTxOutputMap(newTxOutputMap);
updateSnapshotOnTrigger(newTxOutputMap.getBlockHeight());
newBlockMap -> {
if (txOutputMap.getBlockHeight() < newBlockMap.getBlockHeight()) {
applyNewTxOutputMap(newBlockMap);
updateSnapshotIfTrigger(newBlockMap.getBlockHeight());
log.debug("new block parsed. bsqBlock={}", bsqBlock);
} else {
log.warn("We got a newTxOutputMap with a lower block height than the one from the " +
log.warn("We got a newBlockMap with a lower block height than the one from the " +
"map we requested. That should not happen, but theoretically could be " +
"if 2 blocks arrive at nearly the same time and the second is faster in " +
"parsing than the first, so the callback of the first will have a lower " +
"height. " +
"txOutputMap.getBlockHeight()={}; " +
"newTxOutputMap.getBlockHeight()={}\n" +
"newBlockMap.getBlockHeight()={}\n" +
"To avoid conflicts we start a reorg from the last snapshot.",
txOutputMap.getBlockHeight(),
newTxOutputMap.getBlockHeight());
startReOrgFromLastSnapshot(newTxOutputMap.getBlockHeight());
newBlockMap.getBlockHeight());
startReOrgFromLastSnapshot(newBlockMap.getBlockHeight());
}
}, throwable -> {
if (throwable instanceof OrphanDetectedException) {
@ -297,16 +297,18 @@ public class BsqBlockchainManager {
txOutputMapListeners.stream().forEach(l -> l.onTxOutputMapChanged(txOutputMap));
}
private void updateSnapshotOnTrigger(int blockHeight) {
// At trigger time we store the last memory stored map to disc
if (snapshotTxOutputMap != null) {
// We clone because storage is in a threaded context
TxOutputMap clonedSnapshotTxOutputMap = TxOutputMap.getClonedMap(snapshotTxOutputMap);
snapshotTxOutputMapStorage.queueUpForSave(clonedSnapshotTxOutputMap);
}
private void updateSnapshotIfTrigger(int blockHeight) {
if (triggersSnapshot(blockHeight)) {
// At trigger time we store the last memory stored map to disc
if (snapshotTxOutputMap != null) {
// We clone because storage is in a threaded context
TxOutputMap clonedSnapshotTxOutputMap = TxOutputMap.getClonedMap(snapshotTxOutputMap);
snapshotTxOutputMapStorage.queueUpForSave(clonedSnapshotTxOutputMap);
}
// Now we save the map in memory for the next trigger
snapshotTxOutputMap = TxOutputMap.getClonedMap(txOutputMap);
// Now we save the map in memory for the next trigger
snapshotTxOutputMap = TxOutputMap.getClonedMap(txOutputMap);
}
}
private void onBsqTxoChanged() {

View file

@ -180,7 +180,7 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> snapShotHandler,
Consumer<TxOutputMap> newBlockHandler,
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler) {
ListenableFuture<TxOutputMap> future = parseBlocksExecutor.submit(() -> {
@ -193,9 +193,9 @@ public class BsqBlockchainRpcService extends BsqBlockchainService {
genesisBlockHeight,
genesisTxId,
clonedMap,
clonedSnapShotMap -> {
newBlockMap -> {
// We map to UserThread. We don't need to clone as it was created already newly in the parser.
UserThread.execute(() -> snapShotHandler.accept(clonedSnapShotMap));
UserThread.execute(() -> newBlockHandler.accept(newBlockMap));
});
log.info("parseBlockchain took {} ms", System.currentTimeMillis() - startTs);
return clonedMap;

View file

@ -60,7 +60,7 @@ abstract public class BsqBlockchainService {
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> snapShotHandler,
Consumer<TxOutputMap> newBlockHandler,
Consumer<TxOutputMap> resultHandler,
Consumer<Throwable> errorHandler);

View file

@ -49,7 +49,7 @@ public class BsqParser {
int genesisBlockHeight,
String genesisTxId,
TxOutputMap txOutputMap,
Consumer<TxOutputMap> snapShotHandler) throws BsqBlockchainException, OrphanDetectedException {
Consumer<TxOutputMap> newBlockHandler) throws BsqBlockchainException, OrphanDetectedException {
try {
log.info("chainHeadHeight=" + chainHeadHeight);
long startTotalTs = System.currentTimeMillis();
@ -62,11 +62,7 @@ public class BsqParser {
genesisTxId,
txOutputMap);
if (BsqBlockchainManager.triggersSnapshot(blockHeight)) {
TxOutputMap clonedSnapShotMap = TxOutputMap.getClonedMap(txOutputMap);
//clonedSnapShotMap.printUnspentTxOutputs("triggersSnapshot");
snapShotHandler.accept(clonedSnapShotMap);
}
newBlockHandler.accept(TxOutputMap.getClonedMap(txOutputMap));
/* StringBuilder sb = new StringBuilder("recursionMap:\n");
List<String> list = new ArrayList<>();

View file

@ -160,7 +160,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> {
if (!invalidBsqTransactions.isEmpty() && bsqBlockchainManager.isParseBlockchainComplete()) {
Set<String> txIds = invalidBsqTransactions.stream()
.filter(t -> t != null)
.map(t -> t.getHashAsString()).collect(Collectors.toSet());
.map(Transaction::getHashAsString).collect(Collectors.toSet());
log.error("invalidBsqTransactions " + txIds);
String key = "invalidBsqTransactionsWarning_" + txIds;