Fix BurningManAccountingStore data races

Multiple threads read and write to the accounting blocks list causing
data races. Luckily, the LinkedList threw a ConcurrentModificationException
to limit damage. Now, a ReadWriteLock protects the LinkedList against
data races. Multiple threads can read the list at the same time but only
one thread can write to it. Other writing threads wait until it's their
turn.

Fixes #6545
This commit is contained in:
Alva Swanson 2023-02-03 23:41:51 +01:00
parent 8dbdecd6f1
commit 4779c82d0b
No known key found for this signature in database
GPG key ID: 004760E77F753090
4 changed files with 139 additions and 48 deletions

View file

@ -45,7 +45,6 @@ import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
@ -113,7 +112,7 @@ public class BurningManAccountingService implements DaoSetupService {
CompletableFuture.runAsync(() -> {
Map<String, BalanceModel> map = new HashMap<>();
// addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread.
getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block));
burningManAccountingStoreService.forEachBlock(block -> addAccountingBlockToBalanceModel(map, block));
UserThread.execute(() -> balanceModelByBurningManName.putAll(map));
});
}
@ -125,7 +124,7 @@ public class BurningManAccountingService implements DaoSetupService {
public void onInitialBlockRequestsComplete() {
updateBalanceModelByAddress();
getBlocks().forEach(this::addAccountingBlockToBalanceModel);
burningManAccountingStoreService.forEachBlock(this::addAccountingBlockToBalanceModel);
}
public void onNewBlockReceived(AccountingBlock accountingBlock) {
@ -134,25 +133,7 @@ public class BurningManAccountingService implements DaoSetupService {
}
public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
if (!getBlocks().contains(block)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (block.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(block.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (block.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", block.getHeight(),
new Date(block.getDate()), block.getTxs().size());
burningManAccountingStoreService.addBlock(block);
} else {
log.info("We have that block already. Height: {}", block.getHeight());
}
burningManAccountingStoreService.addIfNewBlock(block);
}
public int getBlockHeightOfLastBlock() {
@ -160,11 +141,11 @@ public class BurningManAccountingService implements DaoSetupService {
}
public Optional<AccountingBlock> getLastBlock() {
return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight));
return burningManAccountingStoreService.getLastBlock();
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
return getBlocks().stream().filter(block -> block.getHeight() == height).findAny();
return burningManAccountingStoreService.getBlockAtHeight(height);
}
public Map<Date, Price> getAverageBsqPriceByMonth() {
@ -213,8 +194,8 @@ public class BurningManAccountingService implements DaoSetupService {
// Delegates
///////////////////////////////////////////////////////////////////////////////////////////
public List<AccountingBlock> getBlocks() {
return burningManAccountingStoreService.getBlocks();
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return burningManAccountingStoreService.getBlocksAtLeastWithHeight(minHeight);
}
public Map<String, String> getBurningManNameByAddress() {

View file

@ -39,7 +39,6 @@ import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -92,9 +91,7 @@ class GetAccountingBlocksRequestHandler {
public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) {
long ts = System.currentTimeMillis();
List<AccountingBlock> blocks = burningManAccountingService.getBlocks().stream()
.filter(block -> block.getHeight() >= request.getFromBlockHeight())
.collect(Collectors.toList());
List<AccountingBlock> blocks = burningManAccountingService.getBlocksAtLeastWithHeight(request.getFromBlockHeight());
byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey);
GetAccountingBlocksResponse getBlocksResponse = new GetAccountingBlocksResponse(blocks, request.getNonce(), bmOracleNodePubKey, signature);
log.info("Received GetAccountingBlocksRequest from {} for blocks from height {}. " +

View file

@ -19,27 +19,135 @@ package bisq.core.dao.burningman.accounting.storage;
import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;
import bisq.common.proto.persistable.PersistableEnvelope;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static bisq.core.dao.burningman.accounting.BurningManAccountingService.EARLIEST_BLOCK_HEIGHT;
@Slf4j
@Getter
public class BurningManAccountingStore implements PersistableEnvelope {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LinkedList<AccountingBlock> blocks = new LinkedList<>();
public BurningManAccountingStore(List<AccountingBlock> blocks) {
this.blocks.addAll(blocks);
}
public void addIfNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
tryToAddNewBlock(newBlock);
} finally {
writeLock.unlock();
}
}
public void forEachBlock(Consumer<AccountingBlock> consumer) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
blocks.forEach(consumer);
} finally {
readLock.unlock();
}
}
public void purgeLastTenBlocks() {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
purgeLast10Blocks();
} finally {
writeLock.unlock();
}
}
public Optional<AccountingBlock> getLastBlock() {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.max(Comparator.comparing(AccountingBlock::getHeight));
} finally {
readLock.unlock();
}
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
Lock readLock = readWriteLock.readLock();
try {
return blocks.stream()
.filter(block -> block.getHeight() == height)
.findAny();
} finally {
readLock.unlock();
}
}
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.filter(block -> block.getHeight() >= minHeight)
.collect(Collectors.toList());
} finally {
readLock.unlock();
}
}
private void tryToAddNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
if (!blocks.contains(newBlock)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (newBlock.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(newBlock.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (newBlock.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", newBlock.getHeight(),
new Date(newBlock.getDate()), newBlock.getTxs().size());
blocks.add(newBlock);
} else {
log.info("We have that block already. Height: {}", newBlock.getHeight());
}
}
private void purgeLast10Blocks() {
if (blocks.size() <= 10) {
blocks.clear();
return;
}
List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
}
public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder()

View file

@ -18,6 +18,8 @@
package bisq.core.dao.burningman.accounting.storage;
import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
import bisq.network.p2p.storage.persistence.StoreService;
@ -32,8 +34,9 @@ import javax.inject.Singleton;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@ -60,29 +63,31 @@ public class BurningManAccountingStoreService extends StoreService<BurningManAcc
persistenceManager.requestPersistence();
}
public List<AccountingBlock> getBlocks() {
return Collections.unmodifiableList(store.getBlocks());
public void addIfNewBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
store.addIfNewBlock(block);
requestPersistence();
}
public void addBlock(AccountingBlock block) {
store.getBlocks().add(block);
requestPersistence();
public void forEachBlock(Consumer<AccountingBlock> consumer) {
store.forEachBlock(consumer);
}
public void purgeLastTenBlocks() {
List<AccountingBlock> blocks = store.getBlocks();
if (blocks.size() <= 10) {
blocks.clear();
requestPersistence();
return;
}
List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
store.purgeLastTenBlocks();
requestPersistence();
}
public Optional<AccountingBlock> getLastBlock() {
return store.getLastBlock();
}
public Optional<AccountingBlock> getBlockAtHeight(int height) {
return store.getBlockAtHeight(height);
}
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return store.getBlocksAtLeastWithHeight(minHeight);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Protected