Merge pull request #6505 from HenrikJannsen/use_thread_for_processing_BM_AccountingBlocks

Run processAccountingBlocks async in forkjoinpool thread
This commit is contained in:
Alejandro García 2023-01-06 14:09:35 +00:00 committed by GitHub
commit be1691ba0c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 27 deletions

View file

@ -64,11 +64,15 @@ public abstract class AccountingNode implements DaoSetupService, DaoStateListene
@Nullable
public static Sha256Hash getSha256Hash(Collection<AccountingBlock> blocks) {
long ts = System.currentTimeMillis();
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
for (AccountingBlock accountingBlock : blocks) {
outputStream.write(accountingBlock.toProtoMessage().toByteArray());
}
return Sha256Hash.of(outputStream.toByteArray());
Sha256Hash hash = Sha256Hash.of(outputStream.toByteArray());
// 2833 blocks takes about 23 ms
log.info("getSha256Hash for {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
return hash;
} catch (IOException e) {
return null;
}

View file

@ -48,6 +48,8 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@ -191,35 +193,44 @@ public class AccountingLiteNode extends AccountingNode implements AccountingLite
///////////////////////////////////////////////////////////////////////////////////////////
private void processAccountingBlocks(List<AccountingBlock> blocks) {
log.info("We received blocks from height {} to {}",
blocks.get(0).getHeight(),
blocks.get(blocks.size() - 1).getHeight());
CompletableFuture.runAsync(() -> {
long ts = System.currentTimeMillis();
log.info("We received blocks from height {} to {}",
blocks.get(0).getHeight(),
blocks.get(blocks.size() - 1).getHeight());
boolean requiresReOrg = false;
for (AccountingBlock block : blocks) {
try {
burningManAccountingService.addBlock(block);
} catch (BlockHeightNotConnectingException e) {
log.info("Height not connecting. This could happen if we received multiple responses and had already applied a previous one. {}", e.toString());
} catch (BlockHashNotConnectingException e) {
log.warn("Interrupt loop because a reorg is required. {}", e.toString());
requiresReOrg = true;
break;
AtomicBoolean requiresReOrg = new AtomicBoolean(false);
for (AccountingBlock block : blocks) {
try {
burningManAccountingService.addBlock(block);
} catch (BlockHeightNotConnectingException e) {
log.info("Height not connecting. This could happen if we received multiple responses and had already applied a previous one. {}", e.toString());
} catch (BlockHashNotConnectingException e) {
log.warn("Interrupt loop because a reorg is required. {}", e.toString());
requiresReOrg.set(true);
break;
}
}
}
if (requiresReOrg) {
applyReOrg();
return;
}
int heightOfLastBlock = burningManAccountingService.getBlockHeightOfLastBlock();
if (walletsSetup.isDownloadComplete() && heightOfLastBlock < bsqWalletService.getBestChainHeight()) {
accountingLiteNodeNetworkService.requestBlocks(heightOfLastBlock + 1);
} else {
if (!initialBlockRequestsComplete) {
onInitialBlockRequestsComplete();
}
}
UserThread.execute(() -> {
if (requiresReOrg.get()) {
applyReOrg();
return;
}
int heightOfLastBlock = burningManAccountingService.getBlockHeightOfLastBlock();
if (walletsSetup.isDownloadComplete() && heightOfLastBlock < bsqWalletService.getBestChainHeight()) {
accountingLiteNodeNetworkService.requestBlocks(heightOfLastBlock + 1);
} else {
if (!initialBlockRequestsComplete) {
onInitialBlockRequestsComplete();
}
}
// 2833 blocks takes about 24 sec
log.info("processAccountingBlocksAsync for {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
});
});
}
private void processNewAccountingBlock(AccountingBlock accountingBlock) {