Use functional style for read/write locks. Add callable for throwable method calls.

This commit is contained in:
Manfred Karrer 2017-04-19 14:45:23 -05:00
parent 9dec307faa
commit a7c152f4ee
2 changed files with 117 additions and 200 deletions

View File

@ -1,13 +1,17 @@
package io.bisq.common.util;
import lombok.Getter;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
public class FunctionalReadWriteLock {
@Getter
private final Lock readLock;
@Getter
private final Lock writeLock;
public FunctionalReadWriteLock(boolean isFair) {
@ -37,6 +41,15 @@ public class FunctionalReadWriteLock {
}
}
public void write1(Callable block) throws Exception {
readLock.lock();
try {
block.call();
} finally {
readLock.unlock();
}
}
public <T> T write(Supplier<T> block) {
writeLock.lock();
try {
@ -55,4 +68,12 @@ public class FunctionalReadWriteLock {
}
}
public void write2(Callable block) throws Exception {
writeLock.lock();
try {
block.call();
} finally {
writeLock.unlock();
}
}
}

View File

@ -36,7 +36,6 @@ import javax.inject.Inject;
import javax.inject.Named;
import java.io.File;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@ -115,8 +114,6 @@ public class BsqChainState implements Persistable {
transient private final Storage<BsqChainState> snapshotBsqChainStateStorage;
transient private BsqChainState snapshotCandidate;
transient private final FunctionalReadWriteLock lock;
transient private final ReentrantReadWriteLock.WriteLock writeLock;
transient private final ReentrantReadWriteLock.ReadLock readLock;
///////////////////////////////////////////////////////////////////////////////////////////
@ -144,11 +141,7 @@ public class BsqChainState implements Persistable {
genesisBlockHeight = TEST_NET_GENESIS_BLOCK_HEIGHT;
}
// TODO choose between two styles + consider using fair so that threads don't have to wait too long?
lock = new FunctionalReadWriteLock(true);
ReentrantReadWriteLock lock2 = new ReentrantReadWriteLock(true);
readLock = lock2.readLock();
writeLock = lock2.writeLock();
}
@ -190,12 +183,9 @@ public class BsqChainState implements Persistable {
}
public void setVotingFee(long fee, int blockHeight) {
try {
writeLock.lock();
lock.write(() -> {
votingFees.add(new Tuple2<>(fee, blockHeight));
} finally {
writeLock.unlock();
}
});
}
@ -205,13 +195,12 @@ public class BsqChainState implements Persistable {
void addBlock(BsqBlock block) throws BlockNotConnectingException {
try {
writeLock.lock();
lock.write2(() -> {
if (!blocks.contains(block)) {
if (blocks.isEmpty() || (blocks.getLast().getHash().equals(block.getPreviousBlockHash()) &&
blocks.getLast().getHeight() + 1 == block.getHeight())) {
blocks.add(block);
block.getTxs().stream().forEach(this::addTx);
block.getTxs().stream().forEach(BsqChainState.this::addTx);
chainHeadHeight = block.getHeight();
maybeMakeSnapshot();
//printDetails();
@ -224,107 +213,81 @@ public class BsqChainState implements Persistable {
} else {
log.trace("We got that block already");
}
} finally {
writeLock.unlock();
return null;
});
} catch (Exception e) {
throw new BlockNotConnectingException(block);
} catch (Throwable e) {
log.error(e.toString());
e.printStackTrace();
throw e;
}
}
void addTx(Tx tx) {
try {
writeLock.lock();
lock.write(() -> {
txMap.put(tx.getId(), tx);
} finally {
writeLock.unlock();
}
});
}
void addSpentTxWithSpentInfo(TxOutput spentTxOutput, SpentInfo spentInfo) {
// we only use spentInfoByTxOutputMap for json export
if (dumpBlockchainData) {
try {
writeLock.lock();
lock.write(() -> {
spentInfoByTxOutputMap.put(spentTxOutput.getTxIdIndexTuple(), spentInfo);
} finally {
writeLock.unlock();
}
});
}
}
void setGenesisTx(Tx tx) {
try {
writeLock.lock();
lock.write(() -> {
genesisTx = tx;
} finally {
writeLock.unlock();
}
});
}
void addUnspentTxOutput(TxOutput txOutput) {
try {
writeLock.lock();
lock.write(() -> {
unspentTxOutputSet.add(txOutput);
} finally {
writeLock.unlock();
}
});
}
void removeUnspentTxOutput(TxOutput txOutput) {
try {
writeLock.lock();
lock.write(() -> {
unspentTxOutputSet.remove(txOutput);
} finally {
writeLock.unlock();
}
});
}
void addBurnedFee(String txId, long burnedFee) {
try {
writeLock.lock();
lock.write(() -> {
burnedFeeByTxIdMap.put(txId, burnedFee);
} finally {
writeLock.unlock();
}
});
}
void addCompensationRequestOpReturnOutput(TxOutput opReturnTxOutput) {
try {
writeLock.lock();
lock.write(() -> {
compensationRequestOpReturnTxOutputs.add(opReturnTxOutput);
} finally {
writeLock.unlock();
}
});
}
void adCompensationRequestBtcTxOutputs(String btcAddress) {
try {
writeLock.lock();
lock.write(() -> {
compensationRequestBtcAddresses.add(btcAddress);
} finally {
writeLock.unlock();
});
}
}
void addVotingOpReturnOutput(TxOutput opReturnTxOutput) {
try {
writeLock.lock();
lock.write(() -> {
votingTxOutputs.add(opReturnTxOutput);
} finally {
writeLock.unlock();
}
});
}
void addIssuanceBtcTxOutput(TxOutput btcTxOutput) {
try {
writeLock.lock();
lock.write(() -> {
if (!issuanceBtcTxOutputsByBtcAddressMap.containsKey(btcTxOutput.getAddress()))
issuanceBtcTxOutputsByBtcAddressMap.put(btcTxOutput.getAddress(), new HashSet<>());
issuanceBtcTxOutputsByBtcAddressMap.get(btcTxOutput.getAddress()).add(btcTxOutput);
} finally {
writeLock.unlock();
}
});
}
@ -332,91 +295,60 @@ public class BsqChainState implements Persistable {
// Public read access
///////////////////////////////////////////////////////////////////////////////////////////
// TODO doesn't need a lock, it's a final String variable so should be thread safe
public String getGenesisTxId() {
try {
readLock.lock();
return genesisTxId;
} finally {
readLock.unlock();
}
}
public int getGenesisBlockHeight() {
try {
readLock.lock();
return lock.read(() -> {
return genesisBlockHeight;
} finally {
readLock.unlock();
}
});
}
public BsqChainState getClone() {
final byte[] serialize;
try {
readLock.lock();
serialize = Utilities.serialize(this);
} finally {
readLock.unlock();
}
return lock.read(() -> {
final byte[] serialize = Utilities.serialize(this);
return Utilities.<BsqChainState>deserialize(serialize);
});
}
public boolean containsBlock(BsqBlock bsqBlock) {
try {
readLock.lock();
return lock.read(() -> {
return blocks.contains(bsqBlock);
} finally {
readLock.unlock();
}
});
}
public boolean isTxOutputSpendable(String txId, int index) {
try {
readLock.lock();
return lock.read(() -> {
return getSpendableTxOutput(txId, index).isPresent();
} finally {
readLock.unlock();
}
});
}
public byte[] getSerializedBlocksFrom(int fromBlockHeight) {
try {
readLock.lock();
return lock.read(() -> {
List<BsqBlock> filtered = blocks.stream()
.filter(block -> block.getHeight() >= fromBlockHeight)
.collect(Collectors.toList());
return Utilities.<ArrayList<BsqBlock>>serialize(new ArrayList<>(filtered));
} finally {
readLock.unlock();
}
});
}
public boolean hasTxBurnedFee(String txId) {
try {
readLock.lock();
return lock.read(() -> {
return burnedFeeByTxIdMap.containsKey(txId) && burnedFeeByTxIdMap.get(txId) > 0;
} finally {
readLock.unlock();
}
});
}
public boolean containsTx(String txId) {
try {
readLock.lock();
return lock.read(() -> {
return getTx(txId).isPresent();
} finally {
readLock.unlock();
}
});
}
public int getChainHeadHeight() {
try {
readLock.lock();
return lock.read(() -> {
return chainHeadHeight;
} finally {
readLock.unlock();
}
});
}
@ -425,8 +357,7 @@ public class BsqChainState implements Persistable {
///////////////////////////////////////////////////////////////////////////////////////////
Optional<TxOutput> getSpendableTxOutput(String txId, int index) {
try {
readLock.lock();
return lock.read(() -> {
final Optional<TxOutput> spendingTxOutputOptional = getTx(txId).flatMap(e -> e.getTxOutput(index));
if (spendingTxOutputOptional.isPresent() &&
unspentTxOutputSet.contains(spendingTxOutputOptional.get()) &&
@ -435,14 +366,11 @@ public class BsqChainState implements Persistable {
} else {
return Optional.<TxOutput>empty();
}
} finally {
readLock.unlock();
}
});
}
long getCreateCompensationRequestFee(int blockHeight) {
try {
readLock.lock();
return lock.read(() -> {
long fee = -1;
for (Tuple2<Long, Integer> feeAtHeight : compensationRequestFees) {
if (feeAtHeight.second <= blockHeight)
@ -450,25 +378,19 @@ public class BsqChainState implements Persistable {
}
checkArgument(fee > -1, "compensationRequestFees must be set");
return fee;
} finally {
readLock.unlock();
}
});
}
//TODO not impl yet
boolean isCompensationRequestPeriodValid(int blockHeight) {
try {
readLock.lock();
return lock.read(() -> {
return true;
} finally {
readLock.unlock();
}
});
}
long getVotingFee(int blockHeight) {
try {
readLock.lock();
return lock.read(() -> {
long fee = -1;
for (Tuple2<Long, Integer> feeAtHeight : votingFees) {
if (feeAtHeight.second <= blockHeight)
@ -476,37 +398,34 @@ public class BsqChainState implements Persistable {
}
checkArgument(fee > -1, "compensationRequestFees must be set");
return fee;
} finally {
readLock.unlock();
}
});
}
//TODO not impl yet
boolean isVotingPeriodValid(int blockHeight) {
try {
readLock.lock();
return lock.read(() -> {
return true;
} finally {
readLock.unlock();
}
});
}
boolean containsCompensationRequestBtcAddress(String btcAddress) {
try {
readLock.lock();
return lock.read(() -> {
return compensationRequestBtcAddresses.contains(btcAddress);
} finally {
readLock.unlock();
}
});
}
Set<TxOutput> issuanceTxOutputsByBtcAddress(String btcAddress) {
try {
readLock.lock();
return lock.read(() -> {
return issuanceBtcTxOutputsByBtcAddressMap.get(btcAddress);
} finally {
readLock.unlock();
});
}
//TODO
// for genesis we dont need it and for issuance we need more implemented first
boolean isTxOutputMature(TxOutput spendingTxOutput) {
return lock.read(() -> {
return true;
});
}
@ -514,41 +433,18 @@ public class BsqChainState implements Persistable {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private BsqChainState getClone(BsqChainState bsqChainState) {
final byte[] serialize = Utilities.serialize(bsqChainState);
return Utilities.<BsqChainState>deserialize(serialize);
}
//TODO
// for genesis we dont need it and for issuance we need more implemented first
private boolean isTxOutputMature(TxOutput spendingTxOutput) {
try {
readLock.lock();
return true;
} finally {
readLock.unlock();
}
}
private Optional<Tx> getTx(String txId) {
return lock.read(() -> {
readLock.lock();
return txMap.get(txId) != null ? Optional.of(txMap.get(txId)) : Optional.<Tx>empty();
});
}
private int getSnapshotHeight(int height) {
return getSnapshotHeight(genesisBlockHeight, height, SNAPSHOT_GRID);
}
private boolean isSnapshotHeight(int height) {
return isSnapshotHeight(genesisBlockHeight, height, SNAPSHOT_GRID);
}
private void maybeMakeSnapshot() {
try {
readLock.lock();
lock.read(() -> {
// dont access snapshotCandidate.getChainHeadHeight() as locks are transient and woudl give a null pointer!
if (isSnapshotHeight(getChainHeadHeight()) &&
(snapshotCandidate == null ||
@ -556,19 +452,19 @@ public class BsqChainState implements Persistable {
// At trigger event we store the latest snapshotCandidate to disc
if (snapshotCandidate != null) {
// We clone because storage is in a threaded context
final BsqChainState cloned = getClone(snapshotCandidate);
final byte[] serialize = Utilities.serialize(snapshotCandidate);
final BsqChainState cloned = Utilities.<BsqChainState>deserialize(serialize);
snapshotBsqChainStateStorage.queueUpForSave(cloned);
// dont access cloned anymore with methods as locks are transient!
log.info("Saved snapshotCandidate to Disc at height " + cloned.chainHeadHeight);
}
// Now we clone and keep it in memory for the next trigger
snapshotCandidate = getClone(this);
final byte[] serialize = Utilities.serialize(this);
snapshotCandidate = Utilities.<BsqChainState>deserialize(serialize);
// dont access cloned anymore with methods as locks are transient!
log.debug("Cloned new snapshotCandidate at height " + snapshotCandidate.chainHeadHeight);
}
} finally {
readLock.unlock();
}
});
}
private void printDetails() {