Use Thread for parsing BsqBlocks at BsqLiteNode

This commit is contained in:
Manfred Karrer 2017-04-16 23:38:50 -05:00
parent 2955c0efff
commit 27462f0e83
6 changed files with 59 additions and 18 deletions

View file

@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
// Used for non blocking access to blockchain data and parsing. Encapsulate thread context, so caller // Used for non blocking access to blockchain data and parsing. Encapsulate thread context, so caller
@ -176,4 +177,40 @@ public class BsqBlockchainRequest {
public void addBlockHandler(Consumer<Block> blockHandler) { public void addBlockHandler(Consumer<Block> blockHandler) {
bsqBlockchainService.registerBlockHandler(blockHandler); bsqBlockchainService.registerBlockHandler(blockHandler);
} }
// BsqLiteNode parse with delivered BsqBlocks. Much faster than requesting via RPC....
void parseBsqBlocks(List<BsqBlock> bsqBlockList,
int genesisBlockHeight,
String genesisTxId,
Consumer<BsqBlock> newBlockHandler,
ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
ListenableFuture<Void> future = parseBlocksExecutor.submit(() -> {
long startTs = System.currentTimeMillis();
bsqParser.parseBsqBlocks(bsqBlockList,
genesisBlockHeight,
genesisTxId,
newBsqBlock -> {
UserThread.execute(() -> newBlockHandler.accept(newBsqBlock));
});
log.info("parseBlocks took {} ms for {} blocks", System.currentTimeMillis() - startTs, bsqBlockList.size());
return null;
});
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void ignore) {
UserThread.execute(() -> {
UserThread.execute(resultHandler::handleResult);
});
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error(throwable.toString());
throwable.printStackTrace();
UserThread.execute(() -> errorHandler.accept(throwable));
}
});
}
} }

View file

@ -49,7 +49,6 @@ import java.util.List;
@Slf4j @Slf4j
public class BsqFullNode extends BsqNode { public class BsqFullNode extends BsqNode {
private final BsqBlockchainRequest bsqBlockchainRequest;
private final JsonExporter jsonExporter; private final JsonExporter jsonExporter;
@Getter @Getter
private boolean parseBlockchainComplete; private boolean parseBlockchainComplete;
@ -73,9 +72,9 @@ public class BsqFullNode extends BsqNode {
p2PService, p2PService,
bsqChainState, bsqChainState,
bsqParser, bsqParser,
bsqBlockchainRequest,
persistenceProtoResolver, persistenceProtoResolver,
storageDir); storageDir);
this.bsqBlockchainRequest = bsqBlockchainRequest;
this.jsonExporter = jsonExporter; this.jsonExporter = jsonExporter;
} }

View file

@ -29,7 +29,6 @@ import io.bisq.common.storage.Storage;
import io.bisq.common.util.Utilities; import io.bisq.common.util.Utilities;
import io.bisq.core.app.BisqEnvironment; import io.bisq.core.app.BisqEnvironment;
import io.bisq.core.dao.blockchain.exceptions.BlockNotConnectingException; import io.bisq.core.dao.blockchain.exceptions.BlockNotConnectingException;
import io.bisq.core.dao.blockchain.exceptions.BsqBlockchainException;
import io.bisq.core.dao.blockchain.p2p.GetBsqBlocksRequest; import io.bisq.core.dao.blockchain.p2p.GetBsqBlocksRequest;
import io.bisq.core.dao.blockchain.p2p.GetBsqBlocksResponse; import io.bisq.core.dao.blockchain.p2p.GetBsqBlocksResponse;
import io.bisq.core.dao.blockchain.p2p.NewBsqBlockBroadcastMsg; import io.bisq.core.dao.blockchain.p2p.NewBsqBlockBroadcastMsg;
@ -60,12 +59,14 @@ public class BsqLiteNode extends BsqNode {
P2PService p2PService, P2PService p2PService,
BsqChainState bsqChainState, BsqChainState bsqChainState,
BsqParser bsqParser, BsqParser bsqParser,
BsqBlockchainRequest bsqBlockchainRequest,
PersistenceProtoResolver persistenceProtoResolver, PersistenceProtoResolver persistenceProtoResolver,
@Named(Storage.STORAGE_DIR) File storageDir) { @Named(Storage.STORAGE_DIR) File storageDir) {
super(bisqEnvironment, super(bisqEnvironment,
p2PService, p2PService,
bsqChainState, bsqChainState,
bsqParser, bsqParser,
bsqBlockchainRequest,
persistenceProtoResolver, persistenceProtoResolver,
storageDir); storageDir);
} }
@ -134,16 +135,20 @@ public class BsqLiteNode extends BsqNode {
byte[] bsqBlocksBytes = getBsqBlocksResponse.getBsqBlocksBytes(); byte[] bsqBlocksBytes = getBsqBlocksResponse.getBsqBlocksBytes();
List<BsqBlock> bsqBlockList = Utilities.<ArrayList<BsqBlock>>deserialize(bsqBlocksBytes); List<BsqBlock> bsqBlockList = Utilities.<ArrayList<BsqBlock>>deserialize(bsqBlocksBytes);
log.debug("received msg with {} items", bsqBlockList.size()); log.debug("received msg with {} items", bsqBlockList.size());
try { bsqBlockchainRequest.parseBsqBlocks(bsqBlockList,
bsqParser.parseBsqBlocks(bsqBlockList, getGenesisBlockHeight(), getGenesisTxId(), getGenesisBlockHeight(),
this::onNewBsqBlock); getGenesisTxId(),
this::onNewBsqBlock,
onParseBlockchainComplete(getGenesisBlockHeight(), getGenesisTxId()); () -> {
} catch (BsqBlockchainException e) { onParseBlockchainComplete(getGenesisBlockHeight(), getGenesisTxId());
e.printStackTrace(); }, throwable -> {
} catch (BlockNotConnectingException e) { if (throwable instanceof BlockNotConnectingException) {
e.printStackTrace(); startReOrgFromLastSnapshot(((BlockNotConnectingException) throwable).getBlock());
} } else {
log.error(throwable.toString());
throwable.printStackTrace();
}
});
} else if (parseBlockchainComplete && msg instanceof NewBsqBlockBroadcastMsg) { } else if (parseBlockchainComplete && msg instanceof NewBsqBlockBroadcastMsg) {
NewBsqBlockBroadcastMsg newBsqBlockBroadcastMsg = (NewBsqBlockBroadcastMsg) msg; NewBsqBlockBroadcastMsg newBsqBlockBroadcastMsg = (NewBsqBlockBroadcastMsg) msg;
byte[] bsqBlockBytes = newBsqBlockBroadcastMsg.getBsqBlockBytes(); byte[] bsqBlockBytes = newBsqBlockBroadcastMsg.getBsqBlockBytes();

View file

@ -103,6 +103,8 @@ public abstract class BsqNode {
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
protected final BsqParser bsqParser; protected final BsqParser bsqParser;
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
protected final BsqBlockchainRequest bsqBlockchainRequest;
@SuppressWarnings("WeakerAccess")
protected final List<BsqChainStateListener> bsqChainStateListeners = new ArrayList<>(); protected final List<BsqChainStateListener> bsqChainStateListeners = new ArrayList<>();
private final Storage<BsqChainState> snapshotBsqChainStateStorage; private final Storage<BsqChainState> snapshotBsqChainStateStorage;
@ -123,12 +125,14 @@ public abstract class BsqNode {
P2PService p2PService, P2PService p2PService,
BsqChainState bsqChainState, BsqChainState bsqChainState,
BsqParser bsqParser, BsqParser bsqParser,
BsqBlockchainRequest bsqBlockchainRequest,
PersistenceProtoResolver persistenceProtoResolver, PersistenceProtoResolver persistenceProtoResolver,
@Named(Storage.STORAGE_DIR) File storageDir) { @Named(Storage.STORAGE_DIR) File storageDir) {
this.p2PService = p2PService; this.p2PService = p2PService;
this.bsqChainState = bsqChainState; this.bsqChainState = bsqChainState;
this.bsqParser = bsqParser; this.bsqParser = bsqParser;
this.bsqBlockchainRequest = bsqBlockchainRequest;
isRegTest = bisqEnvironment.getBitcoinNetwork() == BitcoinNetwork.REGTEST; isRegTest = bisqEnvironment.getBitcoinNetwork() == BitcoinNetwork.REGTEST;
snapshotBsqChainStateStorage = new Storage<>(storageDir, persistenceProtoResolver); snapshotBsqChainStateStorage = new Storage<>(storageDir, persistenceProtoResolver);

View file

@ -57,8 +57,6 @@ public class BsqParser {
String genesisTxId, String genesisTxId,
Consumer<BsqBlock> newBlockHandler) Consumer<BsqBlock> newBlockHandler)
throws BsqBlockchainException, BlockNotConnectingException { throws BsqBlockchainException, BlockNotConnectingException {
long startTs = System.currentTimeMillis();
// TODO thread
for (BsqBlock bsqBlock : bsqBlocks) { for (BsqBlock bsqBlock : bsqBlocks) {
parseBsqBlock(bsqBlock, parseBsqBlock(bsqBlock,
genesisBlockHeight, genesisBlockHeight,
@ -66,8 +64,6 @@ public class BsqParser {
bsqChainState.addBlock(bsqBlock); bsqChainState.addBlock(bsqBlock);
newBlockHandler.accept(bsqBlock); newBlockHandler.accept(bsqBlock);
} }
log.info("parseBlocks took {} ms for {} blocks", System.currentTimeMillis() - startTs, bsqBlocks.size());
} }
void parseBsqBlock(BsqBlock bsqBlock, void parseBsqBlock(BsqBlock bsqBlock,

View file

@ -321,7 +321,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
private void onTradeStateChanged(Trade.State tradeState) { private void onTradeStateChanged(Trade.State tradeState) {
Log.traceCall(tradeState.toString()); Log.traceCall(tradeState.toString());
log.info("UI tradeState={}, id={}", log.debug("UI tradeState={}, id={}",
tradeState, tradeState,
trade != null ? trade.getShortId() : "trade is null"); trade != null ? trade.getShortId() : "trade is null");
// TODO what is first valid state for trade? // TODO what is first valid state for trade?