Merge pull request #6551 from alvasw/fix_burningman_accounting_store_data_races

Fix BurningManAccountingStore data races
This commit is contained in:
Alejandro García 2023-02-04 12:43:36 +00:00 committed by GitHub
commit e25c27ec99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 48 deletions

View file

@ -45,7 +45,6 @@ import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
@ -113,7 +112,7 @@ public class BurningManAccountingService implements DaoSetupService {
CompletableFuture.runAsync(() -> {
Map<String, BalanceModel> map = new HashMap<>();
// addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread.
getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block));
burningManAccountingStoreService.forEachBlock(block -> addAccountingBlockToBalanceModel(map, block));
UserThread.execute(() -> balanceModelByBurningManName.putAll(map));
});
}
@ -125,7 +124,7 @@ public class BurningManAccountingService implements DaoSetupService {
public void onInitialBlockRequestsComplete() {
updateBalanceModelByAddress();
getBlocks().forEach(this::addAccountingBlockToBalanceModel);
burningManAccountingStoreService.forEachBlock(this::addAccountingBlockToBalanceModel);
}
public void onNewBlockReceived(AccountingBlock accountingBlock) {
@ -134,25 +133,7 @@ public class BurningManAccountingService implements DaoSetupService {
}
public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
if (!getBlocks().contains(block)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (block.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(block.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (block.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", block.getHeight(),
new Date(block.getDate()), block.getTxs().size());
burningManAccountingStoreService.addBlock(block);
} else {
log.info("We have that block already. Height: {}", block.getHeight());
}
burningManAccountingStoreService.addIfNewBlock(block);
}
public int getBlockHeightOfLastBlock() {
@ -160,11 +141,11 @@ public class BurningManAccountingService implements DaoSetupService {
}
public Optional<AccountingBlock> getLastBlock() {
return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight));
return burningManAccountingStoreService.getLastBlock();
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
return getBlocks().stream().filter(block -> block.getHeight() == height).findAny();
return burningManAccountingStoreService.getBlockAtHeight(height);
}
public Map<Date, Price> getAverageBsqPriceByMonth() {
@ -213,8 +194,8 @@ public class BurningManAccountingService implements DaoSetupService {
// Delegates
///////////////////////////////////////////////////////////////////////////////////////////
public List<AccountingBlock> getBlocks() {
return burningManAccountingStoreService.getBlocks();
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return burningManAccountingStoreService.getBlocksAtLeastWithHeight(minHeight);
}
public Map<String, String> getBurningManNameByAddress() {

View file

@ -39,7 +39,6 @@ import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -92,9 +91,7 @@ class GetAccountingBlocksRequestHandler {
public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) {
long ts = System.currentTimeMillis();
List<AccountingBlock> blocks = burningManAccountingService.getBlocks().stream()
.filter(block -> block.getHeight() >= request.getFromBlockHeight())
.collect(Collectors.toList());
List<AccountingBlock> blocks = burningManAccountingService.getBlocksAtLeastWithHeight(request.getFromBlockHeight());
byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey);
GetAccountingBlocksResponse getBlocksResponse = new GetAccountingBlocksResponse(blocks, request.getNonce(), bmOracleNodePubKey, signature);
log.info("Received GetAccountingBlocksRequest from {} for blocks from height {}. " +

View file

@ -19,27 +19,135 @@ package bisq.core.dao.burningman.accounting.storage;
import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;
import bisq.common.proto.persistable.PersistableEnvelope;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static bisq.core.dao.burningman.accounting.BurningManAccountingService.EARLIEST_BLOCK_HEIGHT;
@Slf4j
@Getter
public class BurningManAccountingStore implements PersistableEnvelope {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LinkedList<AccountingBlock> blocks = new LinkedList<>();
public BurningManAccountingStore(List<AccountingBlock> blocks) {
this.blocks.addAll(blocks);
}
public void addIfNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
tryToAddNewBlock(newBlock);
} finally {
writeLock.unlock();
}
}
public void forEachBlock(Consumer<AccountingBlock> consumer) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
blocks.forEach(consumer);
} finally {
readLock.unlock();
}
}
public void purgeLastTenBlocks() {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
purgeLast10Blocks();
} finally {
writeLock.unlock();
}
}
public Optional<AccountingBlock> getLastBlock() {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.max(Comparator.comparing(AccountingBlock::getHeight));
} finally {
readLock.unlock();
}
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
Lock readLock = readWriteLock.readLock();
try {
return blocks.stream()
.filter(block -> block.getHeight() == height)
.findAny();
} finally {
readLock.unlock();
}
}
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.filter(block -> block.getHeight() >= minHeight)
.collect(Collectors.toList());
} finally {
readLock.unlock();
}
}
private void tryToAddNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
if (!blocks.contains(newBlock)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (newBlock.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(newBlock.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (newBlock.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", newBlock.getHeight(),
new Date(newBlock.getDate()), newBlock.getTxs().size());
blocks.add(newBlock);
} else {
log.info("We have that block already. Height: {}", newBlock.getHeight());
}
}
private void purgeLast10Blocks() {
if (blocks.size() <= 10) {
blocks.clear();
return;
}
List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
}
public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder()

View file

@ -18,6 +18,8 @@
package bisq.core.dao.burningman.accounting.storage;
import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
import bisq.network.p2p.storage.persistence.StoreService;
@ -32,8 +34,9 @@ import javax.inject.Singleton;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@ -60,29 +63,31 @@ public class BurningManAccountingStoreService extends StoreService<BurningManAcc
persistenceManager.requestPersistence();
}
public List<AccountingBlock> getBlocks() {
return Collections.unmodifiableList(store.getBlocks());
public void addIfNewBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
store.addIfNewBlock(block);
requestPersistence();
}
public void addBlock(AccountingBlock block) {
store.getBlocks().add(block);
requestPersistence();
public void forEachBlock(Consumer<AccountingBlock> consumer) {
store.forEachBlock(consumer);
}
public void purgeLastTenBlocks() {
List<AccountingBlock> blocks = store.getBlocks();
if (blocks.size() <= 10) {
blocks.clear();
requestPersistence();
return;
}
List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
store.purgeLastTenBlocks();
requestPersistence();
}
public Optional<AccountingBlock> getLastBlock() {
return store.getLastBlock();
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
return store.getBlockAtHeight(height);
}
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return store.getBlocksAtLeastWithHeight(minHeight);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Protected