Stream block batch processing on render frames

To avoid that the UI gets frozen at batch processing of blocks we
delay each parsing to the next render frame. The total parsing time is
just about 5% slower that way but the UI can render updates.
We also changed the hash for the daoState as the hashing of the full
state becomes quite heavy. The size of the blocks is about 1,4 MB for
7000 blocks (dao testnet). As on a new block only the last block in the
chain got added and as we use the previous hash in the hash chain we
do not need to hash the full blocks list but only the last block.
By that we decrease batch processing time from 30 sec to 7 sec. and data
size of the daoState from 1,4 MB to 200 kb.
Also added progress display of missing blocks in the Tx UI.
This commit is contained in:
Manfred Karrer 2019-03-21 00:27:12 -05:00
parent f780cf5a15
commit 735f619fee
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
10 changed files with 124 additions and 42 deletions

View file

@ -45,6 +45,7 @@ import lombok.extern.slf4j.Slf4j;
public class BallotListPresentation implements BallotListService.BallotListChangeListener, DaoStateListener {
private final BallotListService ballotListService;
private final PeriodService periodService;
private final DaoStateService daoStateService;
private final ProposalValidatorProvider proposalValidatorProvider;
@Getter
@ -64,6 +65,7 @@ public class BallotListPresentation implements BallotListService.BallotListChang
ProposalValidatorProvider proposalValidatorProvider) {
this.ballotListService = ballotListService;
this.periodService = periodService;
this.daoStateService = daoStateService;
this.proposalValidatorProvider = proposalValidatorProvider;
daoStateService.addDaoStateListener(this);
@ -76,8 +78,14 @@ public class BallotListPresentation implements BallotListService.BallotListChang
@Override
public void onNewBlockHeight(int blockHeight) {
//TODO should it be in onParseTxsComplete?
ballotsOfCycle.setPredicate(ballot -> periodService.isTxInCorrectCycle(ballot.getTxId(), blockHeight));
if (daoStateService.isParseBlockChainComplete()) {
ballotsOfCycle.setPredicate(ballot -> periodService.isTxInCorrectCycle(ballot.getTxId(), blockHeight));
}
}
@Override
public void onParseBlockChainComplete() {
ballotsOfCycle.setPredicate(ballot -> periodService.isTxInCorrectCycle(ballot.getTxId(), daoStateService.getChainHeight()));
}
@Override

View file

@ -77,9 +77,8 @@ public class CycleService implements DaoStateListener, DaoSetupService {
@Override
public void onNewBlockHeight(int blockHeight) {
if (blockHeight != genesisBlockHeight)
maybeCreateNewCycle(blockHeight, daoStateService.getCycles())
.ifPresent(daoStateService::addCycle);
maybeCreateNewCycle(blockHeight, daoStateService.getCycles())
.ifPresent(daoStateService::addCycle);
}
@ -118,7 +117,7 @@ public class CycleService implements DaoStateListener, DaoSetupService {
// applied the new cycle yet. But the first block of the old cycle will always be the same as the
// first block of the new cycle.
Cycle cycle = null;
if (blockHeight != genesisBlockHeight && isFirstBlockAfterPreviousCycle(blockHeight, cycles) && !cycles.isEmpty()) {
if (blockHeight > genesisBlockHeight && !cycles.isEmpty() && isFirstBlockAfterPreviousCycle(blockHeight, cycles)) {
// We have the not update daoStateService.getCurrentCycle() so we grab here the previousCycle
Cycle previousCycle = cycles.getLast();
// We create the new cycle as clone of the previous cycle and only if there have been change events we use
@ -160,8 +159,8 @@ public class CycleService implements DaoStateListener, DaoSetupService {
}
private boolean isFirstBlockAfterPreviousCycle(int height, LinkedList<Cycle> cycles) {
final int previousBlockHeight = height - 1;
final Optional<Cycle> previousCycle = getCycle(previousBlockHeight, cycles);
int previousBlockHeight = height - 1;
Optional<Cycle> previousCycle = getCycle(previousBlockHeight, cycles);
return previousCycle
.filter(cycle -> cycle.getHeightOfLastBlock() + 1 == height)
.isPresent();

View file

@ -256,7 +256,6 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat
byte[] combined = ArrayUtils.addAll(prevHash, serializedBlindVotes);
byte[] hash = Hash.getSha256Ripemd160hash(combined);
BlindVoteStateHash myBlindVoteStateHash = new BlindVoteStateHash(blockHeight, hash, prevHash, blindVotes.size());
BlindVoteStateBlock blindVoteStateBlock = new BlindVoteStateBlock(myBlindVoteStateHash);
blindVoteStateBlockChain.add(blindVoteStateBlock);

View file

@ -235,7 +235,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
height, daoStateBlockChain.getLast().getHeight());
prevHash = daoStateBlockChain.getLast().getHash();
}
byte[] stateHash = daoStateService.getSerializedDaoState();
byte[] stateHash = daoStateService.getSerializedStateForHashChain();
// We include the prev. hash in our new hash so we can be sure that if one hash is matching all the past would
// match as well.
byte[] combined = ArrayUtils.addAll(prevHash, stateHash);

View file

@ -30,6 +30,8 @@ import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
import bisq.common.UserThread;
import com.google.inject.Inject;
import java.util.ArrayList;
@ -142,23 +144,43 @@ public class LiteNode extends BsqNode {
log.info("We received blocks from height {} to {}", blockList.get(0).getHeight(), chainTipHeight);
}
// 4000 blocks take about 3 seconds if DAO UI is not displayed or 7 sec. if it is displayed.
// We stream the parsing over each render frame to avoid that the UI get blocked in case we parse a lot of blocks.
// Parsing itself is very fast (3 sec. for 7000 blocks) but creating the hash chain slows down batch processing a lot
// (30 sec for 7000 blocks).
// The updates at block height change are not much optimized yet, so that can be for sure improved
// 144 blocks a day would result in about 4000 in a month, so if a user downloads the app after 1 months latest
// release it will be a bit of a performance hit. It is a one time event as the snapshots gets created and be
// used at next startup.
// used at next startup. New users will get the shipped snapshot. Users who have not used Bisq for longer might
// experience longer durations for batch processing.
long ts = System.currentTimeMillis();
for (RawBlock block : blockList) {
if (blockList.isEmpty()) {
onParseBlockChainComplete();
return;
}
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
log.info("Parsing {} blocks took {} seconds.", blockList.size(), (System.currentTimeMillis() - ts) / 1000d);
onParseBlockChainComplete();
});
}
private void runDelayedBatchProcessing(List<RawBlock> blocks, Runnable resultHandler) {
UserThread.execute(() -> {
if (blocks.isEmpty()) {
resultHandler.run();
return;
}
RawBlock block = blocks.remove(0);
try {
doParseBlock(block);
} catch (RequiredReorgFromSnapshotException e1) {
// In case we got a reorg we break the iteration
break;
runDelayedBatchProcessing(blocks, resultHandler);
} catch (RequiredReorgFromSnapshotException e) {
resultHandler.run();
}
}
log.info("Parsing {} blocks took {} seconds.", blockList.size(), (System.currentTimeMillis() - ts) / 1000d);
onParseBlockChainComplete();
});
}
// We received a new block

View file

@ -117,7 +117,8 @@ public class BlockParser {
.ifPresent(txList::add));
if (System.currentTimeMillis() - startTs > 0)
log.info("Parsing {} transactions took {} ms", rawBlock.getRawTxs().size(), System.currentTimeMillis() - startTs);
log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);
daoStateService.onParseBlockComplete(block);
return block;

View file

@ -153,8 +153,8 @@ public class DaoStateService implements DaoSetupService {
return DaoState.getClone(snapshotCandidate);
}
public byte[] getSerializedDaoState() {
return daoState.toProtoMessage().toByteArray();
public byte[] getSerializedStateForHashChain() {
return daoState.getSerializedStateForHashChain();
}
@ -223,13 +223,15 @@ public class DaoStateService implements DaoSetupService {
} else {
daoState.getBlocks().add(block);
log.info("New Block added at blockHeight {}", block.getHeight());
if (parseBlockChainComplete)
log.info("New Block added at blockHeight {}", block.getHeight());
}
}
// Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
public void onParseBlockComplete(Block block) {
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
if (parseBlockChainComplete)
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
// Need to be called before onParseTxsCompleteAfterBatchProcessing as we use it in
// VoteResult and other listeners like balances usually listen on onParseTxsCompleteAfterBatchProcessing

View file

@ -160,9 +160,14 @@ public class DaoState implements PersistablePayload {
}
public PB.BsqState.Builder getBsqStateBuilder() {
final PB.BsqState.Builder builder = PB.BsqState.newBuilder();
return getBsqStateBuilderExcludingBlocks().addAllBlocks(blocks.stream()
.map(Block::toProtoMessage)
.collect(Collectors.toList()));
}
private PB.BsqState.Builder getBsqStateBuilderExcludingBlocks() {
PB.BsqState.Builder builder = PB.BsqState.newBuilder();
builder.setChainHeight(chainHeight)
.addAllBlocks(blocks.stream().map(Block::toProtoMessage).collect(Collectors.toList()))
.addAllCycles(cycles.stream().map(Cycle::toProtoMessage).collect(Collectors.toList()))
.putAllUnspentTxOutputMap(unspentTxOutputMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toProtoMessage())))
@ -222,6 +227,15 @@ public class DaoState implements PersistablePayload {
this.chainHeight = chainHeight;
}
public byte[] getSerializedStateForHashChain() {
// We only add last block as for the hash chain we include the prev. hash in the new hash so the state of the
// earlier blocks is included in the hash. The past blocks cannot be changed anyway when a new block arrives.
// Reorgs are handled by rebuilding the hash chain from last snapshot.
// Using the full blocks list becomes quite heavy. 7000 blocks are
// about 1.4 MB and creating the hash takes 30 sec. With using just the last block we reduce the time to 7 sec.
return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray();
}
@Override
public String toString() {
return "DaoState{" +

View file

@ -41,6 +41,9 @@ import lombok.Value;
* updated during parsing. If we would set then after the parsing the immutable block we might have inconsistent data.
* There might be a way to do it but it comes with high complexity and risks so for now we prefer to have that known
* issue with not being fully immutable at that level.
*
* An empty block (no BSQ txs) has 146 bytes in Protobuffer serialized form.
*
*/
@EqualsAndHashCode(callSuper = true)
@Value

View file

@ -32,6 +32,7 @@ import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.btc.wallet.BtcWalletService;
import bisq.core.dao.DaoFacade;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.TxType;
import bisq.core.dao.state.model.governance.IssuanceType;
@ -39,6 +40,8 @@ import bisq.core.locale.Res;
import bisq.core.user.Preferences;
import bisq.core.util.BsqFormatter;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.DevEnv;
import org.bitcoinj.core.Coin;
@ -78,6 +81,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@FxmlView
@ -86,6 +90,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
private TableView<BsqTxListItem> tableView;
private final DaoFacade daoFacade;
private final DaoStateService daoStateService;
private final BsqFormatter bsqFormatter;
private final BsqWalletService bsqWalletService;
private final BtcWalletService btcWalletService;
@ -100,6 +105,10 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
private Label chainHeightLabel;
private ProgressBar chainSyncIndicator;
private ChangeListener<Number> walletChainHeightListener;
private Timer updateAnyChainHeightTimer;
private int walletChainHeight;
private int blockHeightBeforeProcessing;
private int missingBlocks;
///////////////////////////////////////////////////////////////////////////////////////////
@ -108,12 +117,14 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
@Inject
private BsqTxView(DaoFacade daoFacade,
DaoStateService daoStateService,
BsqWalletService bsqWalletService,
Preferences preferences,
BtcWalletService btcWalletService,
BsqBalanceUtil bsqBalanceUtil,
BsqFormatter bsqFormatter) {
this.daoFacade = daoFacade;
this.daoStateService = daoStateService;
this.bsqFormatter = bsqFormatter;
this.bsqWalletService = bsqWalletService;
this.preferences = preferences;
@ -163,8 +174,10 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
root.getChildren().add(vBox);
walletBsqTransactionsListener = change -> updateList();
//TODO do we want to get notified from wallet side?
walletChainHeightListener = (observable, oldValue, newValue) -> onUpdateAnyChainHeight();
walletChainHeightListener = (observable, oldValue, newValue) -> {
walletChainHeight = bsqWalletService.getBestChainHeight();
onUpdateAnyChainHeight();
};
}
@Override
@ -180,6 +193,15 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
daoFacade.addBsqStateListener(this);
updateList();
walletChainHeight = bsqWalletService.getBestChainHeight();
blockHeightBeforeProcessing = daoFacade.getChainHeight();
missingBlocks = walletChainHeight - blockHeightBeforeProcessing;
if (!daoStateService.isParseBlockChainComplete()) {
updateAnyChainHeightTimer = UserThread.runPeriodically(() -> {
onUpdateAnyChainHeight();
}, 100, TimeUnit.MILLISECONDS);
}
onUpdateAnyChainHeight();
}
@ -193,6 +215,11 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
daoFacade.removeBsqStateListener(this);
observableList.forEach(BsqTxListItem::cleanup);
if (updateAnyChainHeightTimer != null) {
updateAnyChainHeightTimer.stop();
updateAnyChainHeightTimer = null;
}
}
@ -221,6 +248,14 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
onUpdateAnyChainHeight();
}
@Override
public void onParseBlockChainComplete() {
if (updateAnyChainHeightTimer != null) {
updateAnyChainHeightTimer.stop();
updateAnyChainHeightTimer = null;
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
@ -228,28 +263,27 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
// If chain height from wallet of from the BSQ blockchain parsing changed we update our state.
private void onUpdateAnyChainHeight() {
final int bsqBlockChainHeight = daoFacade.getChainHeight();
final int bsqWalletChainHeight = bsqWalletService.getBestChainHeight();
if (bsqWalletChainHeight > 0) {
final boolean synced = bsqWalletChainHeight == bsqBlockChainHeight;
int currentBlockHeight = daoFacade.getChainHeight();
if (walletChainHeight > 0) {
int processedBlocks = currentBlockHeight - blockHeightBeforeProcessing;
double progress = (double) processedBlocks / (double) missingBlocks;
boolean synced = walletChainHeight == currentBlockHeight;
chainSyncIndicator.setVisible(!synced);
chainSyncIndicator.setManaged(!synced);
if (bsqBlockChainHeight != bsqWalletChainHeight)
chainSyncIndicator.setProgress(-1);
if (synced) {
chainHeightLabel.setText(Res.get("dao.wallet.chainHeightSynced",
bsqBlockChainHeight,
bsqWalletChainHeight));
currentBlockHeight,
walletChainHeight));
} else {
chainSyncIndicator.setProgress(progress);
chainHeightLabel.setText(Res.get("dao.wallet.chainHeightSyncing",
bsqBlockChainHeight,
bsqWalletChainHeight));
currentBlockHeight,
walletChainHeight));
}
} else {
chainHeightLabel.setText(Res.get("dao.wallet.chainHeightSyncing",
bsqBlockChainHeight,
bsqWalletChainHeight));
currentBlockHeight,
walletChainHeight));
}
updateList();
}