P2PDataStorage and FileManager improvements (#3690)

* [PR COMMENTS] Make maxSequenceNumberBeforePurge final

Instead of using a subclass that overwrites a value, utilize Guice
to inject the real value of 10000 in the app and let the tests overwrite
it with their own.

* [TESTS] Clean up 'Analyze Code' warnings

Remove unused imports and clean up some access modifiers now that
the final test structure is complete

* [REFACTOR] HashMapListener::onAdded/onRemoved

Previously, this interface was called each time an item was changed. This
required listeners to understand performance implications of multiple
adds or removes in a short time span.

Instead, give each listener the ability to process a list of added or
removed entrys which can help them avoid performance issues.

This patch is just a refactor. Each listener is called once for each
ProtectedStorageEntry. Future patches will change this.

* [REFACTOR] removeFromMapAndDataStore can operate on Collections

Minor performance overhead for constructing MapEntry and Collections
of one element, but keeps the code cleaner and all removes can still
use the same logic to remove from map, delete from data store, signal
listeners, etc.

The MapEntry type is used instead of Pair since it will require less
operations when this is eventually used in the removeExpiredEntries path.

* Change removeFromMapAndDataStore to signal listeners at the end in a batch

All current users still call this one-at-a-time. But, it gives the ability
for the expire code path to remove in a batch.

* Update removeExpiredEntries to remove all items in a batch

This will cause HashMapChangedListeners to receive just one onRemoved()
call for the expire work instead of multiple onRemoved() calls for each
item.

This required a bit of updating for the remove validation in tests so
that it correctly compares onRemoved with multiple items.

* ProposalService::onProtectedDataRemoved signals listeners once on batch removes

#3143 identified an issue that tempProposals listeners were being
signaled once for each item that was removed during the P2PDataStore
operation that expired old TempProposal objects. Some of the listeners
are very expensive (ProposalListPresentation::updateLists()) which results
in large UI performance issues.

Now that the infrastructure is in place to receive updates from the
P2PDataStore in a batch, the ProposalService can apply all of the removes
received from the P2PDataStore at once. This results in only 1 onChanged()
callback for each listener.

The end result is that updateLists() is only called once and the performance
problems are reduced.

This removes the need for #3148 and those interfaces will be removed in
the next patch.

* Remove HashmapChangedListener::onBatch operations

Now that the only user of this interface has been removed, go ahead
and delete it. This is a partial revert of
f5d75c4f60 that includes the code that was
added into ProposalService that subscribed to the P2PDataStore.

* [TESTS] Regression test for #3629

Write a test that shows the incorrect behavior for #3629, the hashmap
is rebuilt from disk using the 20-byte key instead of the 32-byte key.

* [BUGFIX] Reconstruct HashMap using 32-byte key

Addresses the first half of #3629 by ensuring that the reconstructed
HashMap always has the 32-byte key for each payload.

It turns out, the TempProposalStore persists the ProtectedStorageEntrys
on-disk as a List and doesn't persist the key at all. Then, on
reconstruction, it creates the 20-byte key for its internal map.

The fix is to update the TempProposalStore to use the 32-byte key instead.
This means that all writes, reads, and reconstrution of the TempProposalStore
uses the 32-byte key which matches perfectly with the in-memory map
of the P2PDataStorage that expects 32-byte keys.

Important to note that until all seednodes receive this update, nodes
will continue to have both the 20-byte and 32-byte keys in their HashMap.

* [BUGFIX] Use 32-byte key in requestData path

Addresses the second half of #3629 by using the HashMap, not the
protectedDataStore to generate the known keys in the requestData path.

This won't have any bandwidth reduction until all seednodes have the
update and only have the 32-byte key in their HashMap.

fixes #3629

* [DEAD CODE] Remove getProtectedDataStoreMap

The only user has been migrated to getMap(). Delete it so future
development doesn't have the same 20-byte vs 32-byte key issue.

* [TESTS] Allow tests to validate SequenceNumberMap write separately

In order to implement remove-before-add behavior, we need a way to
verify that the SequenceNumberMap was the only item updated.

* Implement remove-before-add message sequence behavior

It is possible to receive a RemoveData or RemoveMailboxData message
before the relevant AddData, but the current code does not handle
it.

This results in internal state updates and signal handler's being called
when an Add is received with a lower sequence number than a previously
seen Remove.

Minor test validation changes to allow tests to specify that only the
SequenceNumberMap should be written during an operation.

* [TESTS] Allow remove() verification to be more flexible

Now that we have introduced remove-before-add, we need a way
to validate that the SequenceNumberMap was written, but nothing
else. Add this feature to the validation path.

* Broadcast remove-before-add messages to P2P network

In order to aid in propagation of remove() messages, broadcast them
in the event the remove is seen before the add.

* [TESTS] Clean up remove verification helpers

Now that there are cases where the SequenceNumberMap and Broadcast
are called, but no other internal state is updated, the existing helper
functions conflate too many decisions. Remove them in favor of explicitly
defining each state change expected.

* [BUGFIX] Fix duplicate sequence number use case (startup)

Fix a bug introduced in d484617385 that
did not properly handle a valid use case for duplicate sequence numbers.

For in-memory-only ProtectedStoragePayloads, the client nodes need a way
to reconstruct the Payloads after startup from peer and seed nodes. This
involves sending a ProtectedStorageEntry with a sequence number that
is equal to the last one the client had already seen.

This patch adds tests to confirm the bug and fix as well as the changes
necessary to allow adding of Payloads that were previously seen, but
removed during a restart.

* Clean up AtomicBoolean usage in FileManager

Although the code was correct, it was hard to understand the relationship
between the to-be-written object and the savePending flag.

Trade two dependent atomics for one and comment the code to make it more
clear for the next reader.

* [DEADCODE] Clean up FileManager.java

* [BUGFIX] Shorter delay values not taking precedence

Fix a bug in the FileManager where a saveLater called with a low delay
won't execute until the delay specified by a previous saveLater call.

The trade off here is the execution of a task that returns early vs.
losing the requested delay.

* [REFACTOR] Inline saveNowInternal

Only one caller after deadcode removal.
This commit is contained in:
Christoph Atteneder 2019-11-26 14:34:32 +01:00 committed by GitHub
commit 66b2306ed9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 605 additions and 344 deletions

View File

@ -36,7 +36,7 @@ import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
@ -46,10 +46,9 @@ public class FileManager<T extends PersistableEnvelope> {
private final File dir;
private final File storageFile;
private final ScheduledThreadPoolExecutor executor;
private final AtomicBoolean savePending;
private final long delay;
private final Callable<Void> saveFileTask;
private T persistable;
private final AtomicReference<T> nextWrite;
private final PersistenceProtoResolver persistenceProtoResolver;
private final ReentrantLock writeLock = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW).newReentrantLock("writeLock");
@ -61,26 +60,28 @@ public class FileManager<T extends PersistableEnvelope> {
this.dir = dir;
this.storageFile = storageFile;
this.persistenceProtoResolver = persistenceProtoResolver;
this.nextWrite = new AtomicReference<>(null);
executor = Utilities.getScheduledThreadPoolExecutor("FileManager", 1, 10, 5);
// File must only be accessed from the auto-save executor from now on, to avoid simultaneous access.
savePending = new AtomicBoolean();
this.delay = delay;
saveFileTask = () -> {
try {
Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000));
// Runs in an auto save thread.
// TODO: this looks like it could cause corrupt data as the savePending is unset before the actual
// save. By moving to after the save there might be some persist operations that are not performed
// and data would be lost. Probably all persist operations should happen sequencially rather than
// skip one when there is already one scheduled
if (!savePending.getAndSet(false)) {
// Some other scheduled request already beat us to it.
// Atomically take the next object to write and set the value to null so concurrent saveFileTask
// won't duplicate work.
T persistable = this.nextWrite.getAndSet(null);
// If null, a concurrent saveFileTask already grabbed the data. Don't duplicate work.
if (persistable == null)
return null;
}
saveNowInternal(persistable);
long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
} catch (Throwable e) {
log.error("Error during saveFileTask", e);
}
@ -96,26 +97,20 @@ public class FileManager<T extends PersistableEnvelope> {
// API
///////////////////////////////////////////////////////////////////////////////////////////
/**
* Actually write the wallet file to disk, using an atomic rename when possible. Runs on the current thread.
*/
public void saveNow(T persistable) {
saveNowInternal(persistable);
}
/**
* Queues up a save in the background. Useful for not very important wallet changes.
*/
public void saveLater(T persistable) {
void saveLater(T persistable) {
saveLater(persistable, delay);
}
public void saveLater(T persistable, long delayInMilli) {
this.persistable = persistable;
if (savePending.getAndSet(true))
return; // Already pending.
// Atomically set the value of the next write. This allows batching of multiple writes of the same data
// structure if there are multiple calls to saveLater within a given `delayInMillis`.
this.nextWrite.set(persistable);
// Always schedule a write. It is possible that a previous saveLater was called with a larger `delayInMilli`
// and we want the lower delay to execute. The saveFileTask handles concurrent operations.
executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS);
}
@ -134,7 +129,7 @@ public class FileManager<T extends PersistableEnvelope> {
}
}
public synchronized void removeFile(String fileName) {
synchronized void removeFile(String fileName) {
File file = new File(dir, fileName);
boolean result = file.delete();
if (!result)
@ -155,7 +150,7 @@ public class FileManager<T extends PersistableEnvelope> {
/**
* Shut down auto-saving.
*/
void shutDown() {
private void shutDown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
@ -175,11 +170,11 @@ public class FileManager<T extends PersistableEnvelope> {
FileUtil.renameFile(storageFile, corruptedFile);
}
public synchronized void removeAndBackupFile(String fileName) throws IOException {
synchronized void removeAndBackupFile(String fileName) throws IOException {
removeAndBackupFile(dir, storageFile, fileName, "backup_of_corrupted_data");
}
public synchronized void backupFile(String fileName, int numMaxBackupFiles) {
synchronized void backupFile(String fileName, int numMaxBackupFiles) {
FileUtil.rollingBackup(dir, fileName, numMaxBackupFiles);
}
@ -187,12 +182,6 @@ public class FileManager<T extends PersistableEnvelope> {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void saveNowInternal(T persistable) {
long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
}
private synchronized void saveToFile(T persistable, File dir, File storageFile) {
File tempFile = null;
FileOutputStream fileOutputStream = null;

View File

@ -44,6 +44,8 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,22 +81,26 @@ public class AlertManager {
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}

View File

@ -20,16 +20,11 @@ package bisq.core.dao.governance.proposal;
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;
import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.common.UserThread;
import org.bitcoinj.core.TransactionConfidence;
@ -55,8 +50,7 @@ import lombok.extern.slf4j.Slf4j;
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;
///////////////////////////////////////////////////////////////////////////////////////////
@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
@ -87,7 +79,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
this.validatorProvider = validatorProvider;
daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);
proposalListChangeListener = c -> updateLists();
@ -124,44 +115,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
updateLists();
}
///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}
@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}
@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}
@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -50,6 +50,8 @@ import javax.inject.Named;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -133,13 +135,15 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}
@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
onProtectedDataRemoved(protectedStorageEntries);
}
@ -266,30 +270,39 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
}
}
private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
private void onProtectedDataRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the
// updates at once using retainAll which will cause all listeners to be updated only once.
ArrayList<Proposal> tempProposalsWithUpdates = new ArrayList<>(tempProposals);
protectedStorageEntries.forEach(protectedStorageEntry -> {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposalsWithUpdates.contains(proposal)) {
tempProposalsWithUpdates.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
}
});
tempProposals.retainAll(tempProposalsWithUpdates);
}
private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) {

View File

@ -56,7 +56,7 @@ public class TempProposalStore implements PersistableEnvelope {
///////////////////////////////////////////////////////////////////////////////////////////
private TempProposalStore(List<ProtectedStorageEntry> list) {
list.forEach(entry -> map.put(P2PDataStorage.getCompactHashAsByteArray(entry.getProtectedStoragePayload()), entry));
list.forEach(entry -> map.put(P2PDataStorage.get32ByteHashAsByteArray(entry.getProtectedStoragePayload()), entry));
}
public Message toProtoMessage() {

View File

@ -52,6 +52,7 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
@ -133,23 +134,27 @@ public class FilterManager {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1);
}
}
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
});
}
});
}

View File

@ -40,6 +40,7 @@ import javax.inject.Inject;
import java.io.File;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -87,26 +88,30 @@ public class OfferBookService {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
});
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
});
});
}
});

View File

@ -46,6 +46,7 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -131,18 +132,22 @@ public abstract class DisputeAgentManager<T extends DisputeAgent> {
public void onAllServicesInitialized() {
disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
removeAcceptedDisputeAgentFromUser(data);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
removeAcceptedDisputeAgentFromUser(protectedStorageEntry);
}
});
}
});

View File

@ -0,0 +1,127 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.governance.proposal;
import bisq.core.dao.governance.period.PeriodService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStorageService;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalStorageService;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.governance.DaoPhase;
import bisq.core.dao.state.model.governance.Proposal;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
import bisq.network.p2p.storage.persistence.ProtectedDataStoreService;
import javafx.collections.ListChangeListener;
import java.util.Arrays;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests of the P2PDataStorage::onRemoved callback behavior to ensure that the proper number of signal events occur.
*/
public class ProposalServiceP2PDataStorageListenerTest {
private ProposalService proposalService;
@Mock
private PeriodService periodService;
@Mock
private DaoStateService daoStateService;
@Mock
private ListChangeListener<Proposal> tempProposalListener;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
this.proposalService = new ProposalService(
mock(P2PService.class),
this.periodService,
mock(ProposalStorageService.class),
mock(TempProposalStorageService.class),
mock(AppendOnlyDataStoreService.class),
mock(ProtectedDataStoreService.class),
this.daoStateService,
mock(ProposalValidatorProvider.class),
true);
// Create a state so that all added/removed Proposals will actually update the tempProposals list.
when(this.periodService.isInPhase(anyInt(), any(DaoPhase.Phase.class))).thenReturn(true);
when(this.daoStateService.isParseBlockChainComplete()).thenReturn(false);
}
private static ProtectedStorageEntry buildProtectedStorageEntry() {
ProtectedStorageEntry protectedStorageEntry = mock(ProtectedStorageEntry.class);
TempProposalPayload tempProposalPayload = mock(TempProposalPayload.class);
Proposal tempProposal = mock(Proposal.class);
when(protectedStorageEntry.getProtectedStoragePayload()).thenReturn(tempProposalPayload);
when(tempProposalPayload.getProposal()).thenReturn(tempProposal);
return protectedStorageEntry;
}
// TESTCASE: If an onRemoved callback is called which does not remove anything the tempProposals listeners
// are not signaled.
@Test
public void onRemoved_noSignalIfNoChange() {
this.proposalService.onRemoved(Collections.singletonList(mock(ProtectedStorageEntry.class)));
verify(this.tempProposalListener, never()).onChanged(any());
}
// TESTCASE: If an onRemoved callback is called with 1 element AND it creates a remove of 1 element, the tempProposal
// listeners are signaled once.
@Test
public void onRemoved_signalOnceOnOneChange() {
ProtectedStorageEntry one = buildProtectedStorageEntry();
this.proposalService.onAdded(Collections.singletonList(one));
this.proposalService.getTempProposals().addListener(this.tempProposalListener);
this.proposalService.onRemoved(Collections.singletonList(one));
verify(this.tempProposalListener).onChanged(any());
}
// TESTCASE: If an onRemoved callback is called with 2 elements AND it creates a remove of 2 elements, the
// tempProposal listeners are signaled once.
@Test
public void onRemoved_signalOnceOnMultipleChanges() {
ProtectedStorageEntry one = buildProtectedStorageEntry();
ProtectedStorageEntry two = buildProtectedStorageEntry();
this.proposalService.onAdded(Arrays.asList(one, two));
this.proposalService.getTempProposals().addListener(this.tempProposalListener);
this.proposalService.onRemoved(Arrays.asList(one, two));
verify(this.tempProposalListener).onChanged(any());
}
}

View File

@ -105,5 +105,6 @@ public class P2PModule extends AppModule {
bindConstant().annotatedWith(named(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)).to(environment.getRequiredProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC));
bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER));
bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP));
bindConstant().annotatedWith(named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE")).to(1000);
}
}

View File

@ -75,6 +75,7 @@ import javafx.beans.property.SimpleIntegerProperty;
import java.security.PublicKey;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -432,15 +433,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry protectedStorageEntry) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) { }
///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages

View File

@ -132,7 +132,7 @@ class RequestDataHandler implements MessageListener {
.map(e -> e.bytes)
.collect(Collectors.toSet());
Set<byte[]> excludedKeysFromPersistedEntryMap = dataStorage.getProtectedDataStoreMap().keySet()
Set<byte[]> excludedKeysFromPersistedEntryMap = dataStorage.getMap().keySet()
.stream()
.map(e -> e.bytes)
.collect(Collectors.toSet());

View File

@ -19,17 +19,11 @@ package bisq.network.p2p.storage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import java.util.Collection;
public interface HashMapChangedListener {
void onAdded(ProtectedStorageEntry data);
void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries);
@SuppressWarnings("UnusedParameters")
void onRemoved(ProtectedStorageEntry data);
// We process all expired entries after a delay (60 s) after onBootstrapComplete.
// We notify listeners of start and completion so they can optimize to only update after batch processing is done.
default void onBatchRemoveExpiredDataStarted() {
}
default void onBatchRemoveExpiredDataCompleted() {
}
void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries);
}

View File

@ -63,9 +63,13 @@ import bisq.common.util.Utilities;
import com.google.protobuf.ByteString;
import com.google.inject.name.Named;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.security.KeyPair;
import java.security.PublicKey;
@ -73,6 +77,8 @@ import java.security.PublicKey;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@ -115,13 +121,17 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private Timer removeExpiredEntriesTimer;
private final Storage<SequenceNumberMap> sequenceNumberMapStorage;
private final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap();
@VisibleForTesting
final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap();
private final Set<AppendOnlyDataStoreListener> appendOnlyDataStoreListeners = new CopyOnWriteArraySet<>();
private final Set<ProtectedDataStoreListener> protectedDataStoreListeners = new CopyOnWriteArraySet<>();
private final Clock clock;
protected int maxSequenceNumberMapSizeBeforePurge;
/// The maximum number of items that must exist in the SequenceNumberMap before it is scheduled for a purge
/// which removes entries after PURGE_AGE_DAYS.
private final int maxSequenceNumberMapSizeBeforePurge;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -134,12 +144,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedDataStoreService protectedDataStoreService,
ResourceDataStoreService resourceDataStoreService,
Storage<SequenceNumberMap> sequenceNumberMapStorage,
Clock clock) {
Clock clock,
@Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) {
this.broadcaster = broadcaster;
this.appendOnlyDataStoreService = appendOnlyDataStoreService;
this.protectedDataStoreService = protectedDataStoreService;
this.resourceDataStoreService = resourceDataStoreService;
this.clock = clock;
this.maxSequenceNumberMapSizeBeforePurge = maxSequenceNumberBeforePurge;
networkNode.addMessageListener(this);
@ -147,7 +159,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
this.sequenceNumberMapStorage = sequenceNumberMapStorage;
sequenceNumberMapStorage.setNumMaxBackupFiles(5);
this.maxSequenceNumberMapSizeBeforePurge = 1000;
}
@Override
@ -191,17 +202,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
.filter(entry -> entry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
// Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying
// about start and end of iteration.
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted);
toRemoveList.forEach(mapEntry -> {
ProtectedStorageEntry protectedStorageEntry = mapEntry.getValue();
ByteArray payloadHash = mapEntry.getKey();
log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry));
removeFromMapAndDataStore(protectedStorageEntry, payloadHash);
// Batch processing can cause performance issues, so do all of the removes first, then update the listeners
// to let them know about the removes.
toRemoveList.forEach(toRemoveItem -> {
log.debug("We found an expired data entry. We remove the protectedData:\n\t" +
Utilities.toTruncatedString(toRemoveItem.getValue()));
});
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted);
removeFromMapAndDataStore(toRemoveList);
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
@ -215,11 +222,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return appendOnlyDataStoreService.getMap();
}
public Map<P2PDataStorage.ByteArray, ProtectedStorageEntry> getProtectedDataStoreMap() {
return protectedDataStoreService.getMap();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -388,23 +390,33 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return false;
}
// If we have seen a more recent operation for this payload, we ignore the current one
if(!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
if (storedEntry != null &&
!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) {
return false;
}
// We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is
// the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup.
MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload);
if (sequenceNumberMapValue != null &&
protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) {
return false;
}
// Verify the ProtectedStorageEntry is well formed and valid for the add operation
if (!protectedStorageEntry.isValidForAddOperation())
return false;
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
// If we have already seen an Entry with the same hash, verify the metadata is equal
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry))
return false;
// This is an updated entry. Record it and signal listeners.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
@ -416,8 +428,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Persist ProtectedStorageEntrys carrying PersistablePayload payloads and signal listeners on changes
if (protectedStoragePayload instanceof PersistablePayload) {
ByteArray compactHash = P2PDataStorage.getCompactHashAsByteArray(protectedStoragePayload);
ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry);
ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(hashOfPayload, protectedStorageEntry);
if (previous == null)
protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry));
}
@ -481,13 +492,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
// If we don't know about the target of this remove, ignore it
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
if (storedEntry == null) {
log.debug("Remove data ignored as we don't have an entry for that data.");
return false;
}
// If we have seen a more recent operation for this payload, ignore this one
if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
return false;
@ -497,19 +501,26 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return false;
// If we have already seen an Entry with the same hash, verify the metadata is the same
if (!protectedStorageEntry.matchesRelevantPubKey(storedEntry))
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry))
return false;
// Valid remove entry, do the remove and signal listeners
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
printData("after remove");
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
if (storedEntry != null) {
// Valid remove entry, do the remove and signal listeners
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
} /* else {
// This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated
// the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to
// broadcast the remove to peers so they can update their state appropriately
} */
printData("after remove");
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) {
broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender, null, isDataOwner);
} else {
@ -637,19 +648,35 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
///////////////////////////////////////////////////////////////////////////////////////////
private void removeFromMapAndDataStore(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) {
map.remove(hashOfPayload);
hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry)));
}
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {
ByteArray compactHash = getCompactHashAsByteArray(protectedStoragePayload);
ProtectedStorageEntry previous = protectedDataStoreService.remove(compactHash, protectedStorageEntry);
if (previous != null) {
protectedDataStoreListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
} else {
log.info("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist.");
private void removeFromMapAndDataStore(
Collection<Map.Entry<ByteArray, ProtectedStorageEntry>> entriesToRemoveWithPayloadHash) {
if (entriesToRemoveWithPayloadHash.isEmpty())
return;
ArrayList<ProtectedStorageEntry> entriesForSignal = new ArrayList<>(entriesToRemoveWithPayloadHash.size());
entriesToRemoveWithPayloadHash.forEach(entryToRemoveWithPayloadHash -> {
ByteArray hashOfPayload = entryToRemoveWithPayloadHash.getKey();
ProtectedStorageEntry protectedStorageEntry = entryToRemoveWithPayloadHash.getValue();
map.remove(hashOfPayload);
entriesForSignal.add(protectedStorageEntry);
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {
ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry);
if (previous != null) {
protectedDataStoreListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
} else {
log.info("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist.");
}
}
}
});
hashMapChangedListeners.forEach(e -> e.onRemoved(entriesForSignal));
}
private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfData) {
@ -690,14 +717,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return new ByteArray(P2PDataStorage.get32ByteHash(data));
}
public static ByteArray getCompactHashAsByteArray(ProtectedStoragePayload protectedStoragePayload) {
return new ByteArray(getCompactHash(protectedStoragePayload));
}
private static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) {
return Hash.getSha256Ripemd160hash(protectedStoragePayload.toProtoMessage().toByteArray());
}
// Get a new map with entries older than PURGE_AGE_DAYS purged from the given map.
private Map<ByteArray, MapValue> getPurgedSequenceNumberMap(Map<ByteArray, MapValue> persisted) {
Map<ByteArray, MapValue> purged = new HashMap<>();

View File

@ -186,9 +186,9 @@ public class P2PDataStorageClientAPITest {
this.testState.mockedStorage.getMailboxDataWithSignedSeqNr(mailboxStoragePayload, receiverKeys, receiverKeys.getPublic());
SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry);
Assert.assertFalse(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true));
Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true));
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, false, true, true, true);
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, false, false, false, true, true);
}
// TESTCASE: Adding, then removing a mailbox message from the getMailboxDataWithSignedSeqNr API
@ -210,7 +210,7 @@ public class P2PDataStorageClientAPITest {
SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry);
Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true));
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true,true);
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true, true,true);
}
// TESTCASE: Removing a mailbox message that was added from the onMessage handler
@ -237,6 +237,6 @@ public class P2PDataStorageClientAPITest {
SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry);
Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true));
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true,true);
this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true, true,true);
}
}

View File

@ -55,6 +55,7 @@ import static bisq.network.p2p.storage.TestState.*;
* 2 & 3 Client API [addPersistableNetworkPayload(reBroadcast=(true && false))]
* 4. onMessage() [onMessage(AddPersistableNetworkPayloadMessage)]
*/
@SuppressWarnings("unused")
public class P2PDataStoragePersistableNetworkPayloadTest {
@RunWith(Parameterized.class)

View File

@ -38,6 +38,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
@ -65,6 +66,7 @@ import static bisq.network.p2p.storage.TestState.*;
* 1. Client API [addProtectedStorageEntry(), refreshTTL(), remove()]
* 2. onMessage() [AddDataMessage, RefreshOfferMessage, RemoveDataMessage]
*/
@SuppressWarnings("unused")
public class P2PDataStorageProtectedStorageEntryTest {
@RunWith(Parameterized.class)
abstract public static class ProtectedStorageEntryTestBase {
@ -200,7 +202,10 @@ public class P2PDataStorageProtectedStorageEntryTest {
void doProtectedStorageRemoveAndVerify(ProtectedStorageEntry entry,
boolean expectedReturnValue,
boolean expectInternalStateChange) {
boolean expectedHashMapAndDataStoreUpdated,
boolean expectedListenersSignaled,
boolean expectedBroadcast,
boolean expectedSeqNrWrite) {
SavedTestState beforeState = this.testState.saveTestState(entry);
@ -209,7 +214,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
if (!this.useMessageHandler)
Assert.assertEquals(expectedReturnValue, addResult);
this.testState.verifyProtectedStorageRemove(beforeState, entry, expectInternalStateChange, true, true, this.expectIsDataOwner());
this.testState.verifyProtectedStorageRemove(beforeState, entry, expectedHashMapAndDataStoreUpdated, expectedListenersSignaled, expectedBroadcast, expectedSeqNrWrite, this.expectIsDataOwner());
}
/// Valid Add Tests (isValidForAdd() and matchesRelevantPubKey() return true)
@ -262,7 +267,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
doProtectedStorageAddAndVerify(entryForAdd, false, false);
}
@ -310,7 +315,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
}
// TESTCASE: Removing an item after successfully added (remove seq # > add seq #)
@ -320,15 +325,15 @@ public class P2PDataStorageProtectedStorageEntryTest {
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
}
// TESTCASE: Removing an item before it was added
// TESTCASE: Removing an item before it was added. This triggers a SequenceNumberMap write and broadcast
@Test
public void remove_notExists() {
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, true, false, false, true, true);
}
// TESTCASE: Removing an item after successfully adding (remove seq # < add seq #)
@ -338,7 +343,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
}
// TESTCASE: Add after removed (same seq #)
@ -348,7 +353,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
doProtectedStorageAddAndVerify(entryForAdd, false, false);
}
@ -360,7 +365,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
entryForAdd = this.getProtectedStorageEntryForAdd(3);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
@ -375,7 +380,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, false, true);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
}
// TESTCASE: Remove fails if Entry is valid for remove, but metadata doesn't match remove target
@ -385,7 +390,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, true, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
}
// TESTCASE: Remove fails if Entry is not valid for remove and metadata doesn't match remove target
@ -395,7 +400,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false);
}
@ -406,24 +411,42 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(3);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
entryForAdd = this.getProtectedStorageEntryForAdd(1);
doProtectedStorageAddAndVerify(entryForAdd, false, false);
}
// TESTCASE: Received remove for nonexistent item that was later received
// XXXBUGXXX: There may be cases where removes are reordered with adds (remove during pending GetDataRequest?).
// The proper behavior may be to not add the late messages, but the current code will successfully add them
// even in the AddOncePayload (mailbox) case.
@Test
public void remove_lateAdd() {
ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageRemoveAndVerify(entryForRemove, false, false);
this.doRemove(entryForRemove);
doProtectedStorageAddAndVerify(entryForAdd, false, false);
}
// TESTCASE: Invalid remove doesn't block a valid add (isValidForRemove == false | matchesRelevantPubKey == false)
@Test
public void remove_entryNotIsValidForRemoveDoesntBlockAdd1() {
ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1, false, false);
this.doRemove(entryForRemove);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
}
// TESTCASE: Invalid remove doesn't block a valid add (isValidForRemove == false | matchesRelevantPubKey == true)
@Test
public void remove_entryNotIsValidForRemoveDoesntBlockAdd2() {
ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1, false, true);
this.doRemove(entryForRemove);
// should be (false, false)
doProtectedStorageAddAndVerify(entryForAdd, true, true);
}
}
@ -539,7 +562,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageAddAndVerify(entryForAdd, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
doRefreshTTLAndVerify(buildRefreshOfferMessage(entryForAdd, this.payloadOwnerKeys,3), false, false);
}
@ -553,6 +576,34 @@ public class P2PDataStorageProtectedStorageEntryTest {
KeyPair notOwner = TestUtils.generateKeyPair();
doRefreshTTLAndVerify(buildRefreshOfferMessage(entry, notOwner, 2), false, false);
}
// TESTCASE: After restart, identical sequence numbers are accepted ONCE. We need a way to reconstruct
// in-memory ProtectedStorageEntrys from seed and peer nodes around startup time.
@Test
public void addProtectedStorageEntry_afterRestartCanAddDuplicateSeqNr() {
ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1);
doProtectedStorageAddAndVerify(toAdd1, true, true);
this.testState.simulateRestart();
// Can add equal seqNr only once
doProtectedStorageAddAndVerify(toAdd1, true, true);
// Can't add equal seqNr twice
doProtectedStorageAddAndVerify(toAdd1, false, false);
}
// TESTCASE: After restart, old sequence numbers are not accepted
@Test
public void addProtectedStorageEntry_afterRestartCanNotAddLowerSeqNr() {
ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1);
ProtectedStorageEntry toAdd2 = this.getProtectedStorageEntryForAdd(2);
doProtectedStorageAddAndVerify(toAdd2, true, true);
this.testState.simulateRestart();
doProtectedStorageAddAndVerify(toAdd1, false, false);
}
}
/**
@ -570,6 +621,33 @@ public class P2PDataStorageProtectedStorageEntryTest {
return ProtectedStorageEntry.class;
}
// Tests that just apply to PersistablePayload objects
// TESTCASE: Ensure the HashMap is the same before and after a restart
@Test
public void addProtectedStorageEntry_afterReadFromResourcesWithDuplicate_3629RegressionTest() {
ProtectedStorageEntry protectedStorageEntry = this.getProtectedStorageEntryForAdd(1);
doProtectedStorageAddAndVerify(protectedStorageEntry, true, true);
Map<P2PDataStorage.ByteArray, ProtectedStorageEntry> beforeRestart = this.testState.mockedStorage.getMap();
this.testState.simulateRestart();
Assert.assertEquals(beforeRestart, this.testState.mockedStorage.getMap());
}
// TESTCASE: After restart, identical sequence numbers are not accepted for persistent payloads
@Test
public void addProtectedStorageEntry_afterRestartCanNotAddDuplicateSeqNr() {
ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1);
doProtectedStorageAddAndVerify(toAdd1, true, true);
this.testState.simulateRestart();
// Can add equal seqNr only once
doProtectedStorageAddAndVerify(toAdd1, false, false);
}
}
/**
@ -621,7 +699,7 @@ public class P2PDataStorageProtectedStorageEntryTest {
doProtectedStorageAddAndVerify(entryForAdd, true, true);
ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true);
doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true);
entryForAdd = this.getProtectedStorageEntryForAdd(3);
doProtectedStorageAddAndVerify(entryForAdd, false, false);

View File

@ -32,6 +32,7 @@ import bisq.common.crypto.CryptoException;
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -66,7 +67,7 @@ public class P2PDataStorageRemoveExpiredTest {
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false);
}
// TESTCASE: Correctly skips all PersistableNetworkPayloads since they are not expirable
@ -92,7 +93,7 @@ public class P2PDataStorageRemoveExpiredTest {
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false);
}
// TESTCASE: Correctly expires non-persistable entries that are expired
@ -109,7 +110,7 @@ public class P2PDataStorageRemoveExpiredTest {
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, true, false, false, false);
}
// TESTCASE: Correctly skips persistable entries that are not expired
@ -123,7 +124,7 @@ public class P2PDataStorageRemoveExpiredTest {
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false);
}
// TESTCASE: Correctly expires persistable entries that are expired
@ -140,7 +141,7 @@ public class P2PDataStorageRemoveExpiredTest {
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, true, false, false, false);
}
// TESTCASE: Ensure we try to purge old entries sequence number map when size exceeds the maximum size
@ -149,17 +150,21 @@ public class P2PDataStorageRemoveExpiredTest {
public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchAlgorithmException {
final int initialClockIncrement = 5;
ArrayList<ProtectedStorageEntry> expectedRemoves = new ArrayList<>();
// Add 4 entries to our sequence number map that will be purged
KeyPair purgedOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload purgedProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(purgedOwnerKeys.getPublic(), 0);
ProtectedStorageEntry purgedProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(purgedProtectedStoragePayload, purgedOwnerKeys);
expectedRemoves.add(purgedProtectedStorageEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));
for (int i = 0; i < 4; ++i) {
for (int i = 0; i < MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE - 1; ++i) {
KeyPair ownerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload protectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(ownerKeys.getPublic(), 0);
ProtectedStorageEntry tmpEntry = testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys);
expectedRemoves.add(tmpEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(tmpEntry, TestState.getTestNodeAddress(), null, true));
}
@ -171,6 +176,7 @@ public class P2PDataStorageRemoveExpiredTest {
KeyPair keepOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload keepProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(keepOwnerKeys.getPublic(), 0);
ProtectedStorageEntry keepProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(keepProtectedStoragePayload, keepOwnerKeys);
expectedRemoves.add(keepProtectedStorageEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));
@ -178,19 +184,9 @@ public class P2PDataStorageRemoveExpiredTest {
// Advance time past it so they will be valid purge targets
this.testState.clockFake.increment(TimeUnit.DAYS.toMillis(P2PDataStorage.PURGE_AGE_DAYS + 1 - initialClockIncrement));
// The first entry (11 days old) should be purged
// The first 4 entries (11 days old) should be purged from the SequenceNumberMap
SavedTestState beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, purgedProtectedStorageEntry, true, false, false, false);
// Which means that an addition of a purged entry should succeed.
beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
Assert.assertTrue(this.testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, purgedProtectedStorageEntry, true, false);
// The second entry (5 days old) should still exist which means trying to add it again should fail.
beforeState = this.testState.saveTestState(keepProtectedStorageEntry);
Assert.assertFalse(this.testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, keepProtectedStorageEntry, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, expectedRemoves, true, true, false, false, false);
}
}

View File

@ -39,10 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static bisq.network.p2p.storage.TestState.*;
@ -72,7 +69,7 @@ public class P2PDataStoreDisconnectTest {
ProtectedStorageEntry protectedStorageEntry = beforeState.protectedStorageEntryBeforeOp;
currentState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry,
wasRemoved, false, false, false);
wasRemoved, wasRemoved, false, false, false);
if (wasTTLReduced)
Assert.assertTrue(protectedStorageEntry.getCreationTimeStamp() < beforeState.creationTimestampBeforeUpdate);
@ -173,7 +170,7 @@ public class P2PDataStoreDisconnectTest {
class ExpirablePersistentProtectedStoragePayloadStub
extends ExpirableProtectedStoragePayloadStub implements PersistablePayload {
public ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) {
private ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) {
super(ownerPubKey, 0);
}
}

View File

@ -35,7 +35,6 @@ import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
import bisq.network.p2p.storage.persistence.ProtectedDataStoreListener;
import bisq.network.p2p.storage.persistence.ProtectedDataStoreService;
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
@ -47,8 +46,10 @@ import bisq.common.storage.Storage;
import java.security.PublicKey;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -63,51 +64,88 @@ import static org.mockito.Mockito.*;
* Used in the P2PDataStorage*Test(s) in order to leverage common test set up and validation.
*/
public class TestState {
final P2PDataStorage mockedStorage;
static final int MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE = 5;
P2PDataStorage mockedStorage;
final Broadcaster mockBroadcaster;
final AppendOnlyDataStoreListener appendOnlyDataStoreListener;
private final ProtectedDataStoreListener protectedDataStoreListener;
final HashMapChangedListener hashMapChangedListener;
private final HashMapChangedListener hashMapChangedListener;
private final Storage<SequenceNumberMap> mockSeqNrStorage;
private final ProtectedDataStoreService protectedDataStoreService;
final ClockFake clockFake;
/**
* Subclass of P2PDataStorage that allows for easier testing, but keeps all functionality
*/
static class P2PDataStorageForTest extends P2PDataStorage {
P2PDataStorageForTest(NetworkNode networkNode,
Broadcaster broadcaster,
AppendOnlyDataStoreService appendOnlyDataStoreService,
ProtectedDataStoreService protectedDataStoreService,
ResourceDataStoreService resourceDataStoreService,
Storage<SequenceNumberMap> sequenceNumberMapStorage,
Clock clock) {
super(networkNode, broadcaster, appendOnlyDataStoreService, protectedDataStoreService, resourceDataStoreService, sequenceNumberMapStorage, clock);
this.maxSequenceNumberMapSizeBeforePurge = 5;
}
}
TestState() {
this.mockBroadcaster = mock(Broadcaster.class);
this.mockSeqNrStorage = mock(Storage.class);
this.clockFake = new ClockFake();
this.protectedDataStoreService = new ProtectedDataStoreServiceFake();
this.mockedStorage = new P2PDataStorageForTest(mock(NetworkNode.class),
this.mockedStorage = new P2PDataStorage(mock(NetworkNode.class),
this.mockBroadcaster,
new AppendOnlyDataStoreServiceFake(),
new ProtectedDataStoreServiceFake(), mock(ResourceDataStoreService.class),
this.mockSeqNrStorage, this.clockFake);
this.protectedDataStoreService, mock(ResourceDataStoreService.class),
this.mockSeqNrStorage, this.clockFake, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE);
this.appendOnlyDataStoreListener = mock(AppendOnlyDataStoreListener.class);
this.protectedDataStoreListener = mock(ProtectedDataStoreListener.class);
this.hashMapChangedListener = mock(HashMapChangedListener.class);
this.mockedStorage.addHashMapChangedListener(this.hashMapChangedListener);
this.mockedStorage.addAppendOnlyDataStoreListener(this.appendOnlyDataStoreListener);
this.mockedStorage.addProtectedDataStoreListener(this.protectedDataStoreListener);
this.mockedStorage = createP2PDataStorageForTest(
this.mockBroadcaster,
this.protectedDataStoreService,
this.mockSeqNrStorage,
this.clockFake,
this.hashMapChangedListener,
this.appendOnlyDataStoreListener,
this.protectedDataStoreListener);
}
/**
* Re-initializes the in-memory data structures from the storage objects to simulate a node restarting. Important
* to note that the current TestState uses Test Doubles instead of actual disk storage so this is just "simulating"
* not running the entire storage code paths.
*/
void simulateRestart() {
when(this.mockSeqNrStorage.initAndGetPersisted(any(SequenceNumberMap.class), anyLong()))
.thenReturn(this.mockedStorage.sequenceNumberMap);
this.mockedStorage = createP2PDataStorageForTest(
this.mockBroadcaster,
this.protectedDataStoreService,
this.mockSeqNrStorage,
this.clockFake,
this.hashMapChangedListener,
this.appendOnlyDataStoreListener,
this.protectedDataStoreListener);
}
private static P2PDataStorage createP2PDataStorageForTest(
Broadcaster broadcaster,
ProtectedDataStoreService protectedDataStoreService,
Storage<SequenceNumberMap> sequenceNrMapStorage,
ClockFake clock,
HashMapChangedListener hashMapChangedListener,
AppendOnlyDataStoreListener appendOnlyDataStoreListener,
ProtectedDataStoreListener protectedDataStoreListener) {
P2PDataStorage p2PDataStorage = new P2PDataStorage(mock(NetworkNode.class),
broadcaster,
new AppendOnlyDataStoreServiceFake(),
protectedDataStoreService, mock(ResourceDataStoreService.class),
sequenceNrMapStorage, clock, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE);
// Currently TestState only supports reading ProtectedStorageEntries off disk.
p2PDataStorage.readFromResources("unused");
p2PDataStorage.readPersisted();
p2PDataStorage.addHashMapChangedListener(hashMapChangedListener);
p2PDataStorage.addAppendOnlyDataStoreListener(appendOnlyDataStoreListener);
p2PDataStorage.addProtectedDataStoreListener(protectedDataStoreListener);
return p2PDataStorage;
}
private void resetState() {
@ -177,7 +215,6 @@ public class TestState {
boolean expectedStateChange,
boolean expectedIsDataOwner) {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
if (expectedStateChange) {
Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getMap().get(hashMapHash));
@ -187,14 +224,14 @@ public class TestState {
// TODO: Should the behavior be identical between this and the HashMap listeners?
// TODO: Do we want ot overwrite stale values in order to persist updated sequence numbers and timestamps?
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload && beforeState.protectedStorageEntryBeforeOpDataStoreMap == null) {
Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
Assert.assertEquals(protectedStorageEntry, this.protectedDataStoreService.getMap().get(hashMapHash));
verify(this.protectedDataStoreListener).onAdded(protectedStorageEntry);
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.protectedDataStoreService.getMap().get(hashMapHash));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
}
verify(this.hashMapChangedListener).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener).onAdded(Collections.singletonList(protectedStorageEntry));
final ArgumentCaptor<BroadcastMessage> captor = ArgumentCaptor.forClass(BroadcastMessage.class);
verify(this.mockBroadcaster).broadcast(captor.capture(), any(NodeAddress.class),
@ -207,12 +244,12 @@ public class TestState {
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.protectedDataStoreService.getMap().get(hashMapHash));
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
// Internal state didn't change... nothing should be notified
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
@ -220,42 +257,77 @@ public class TestState {
void verifyProtectedStorageRemove(SavedTestState beforeState,
ProtectedStorageEntry protectedStorageEntry,
boolean expectedStateChange,
boolean expectedBroadcastOnStateChange,
boolean expectedSeqNrWriteOnStateChange,
boolean expectedHashMapAndDataStoreUpdated,
boolean expectedListenersSignaled,
boolean expectedBroadcast,
boolean expectedSeqNrWrite,
boolean expectedIsDataOwner) {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
if (expectedStateChange) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));
verifyProtectedStorageRemove(beforeState, Collections.singletonList(protectedStorageEntry),
expectedHashMapAndDataStoreUpdated, expectedListenersSignaled, expectedBroadcast,
expectedSeqNrWrite, expectedIsDataOwner);
}
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
void verifyProtectedStorageRemove(SavedTestState beforeState,
Collection<ProtectedStorageEntry> protectedStorageEntries,
boolean expectedHashMapAndDataStoreUpdated,
boolean expectedListenersSignaled,
boolean expectedBroadcast,
boolean expectedSeqNrWrite,
boolean expectedIsDataOwner) {
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
// The default matcher expects orders to stay the same. So, create a custom matcher function since
// we don't care about the order.
if (expectedListenersSignaled) {
final ArgumentCaptor<Collection<ProtectedStorageEntry>> argument = ArgumentCaptor.forClass(Collection.class);
verify(this.hashMapChangedListener).onRemoved(argument.capture());
verify(this.hashMapChangedListener).onRemoved(protectedStorageEntry);
Set<ProtectedStorageEntry> actual = new HashSet<>(argument.getValue());
Set<ProtectedStorageEntry> expected = new HashSet<>(protectedStorageEntries);
if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
// Ensure we didn't remove duplicates
Assert.assertEquals(protectedStorageEntries.size(), expected.size());
Assert.assertEquals(argument.getValue().size(), actual.size());
Assert.assertEquals(expected, actual);
} else {
verify(this.hashMapChangedListener, never()).onRemoved(any());
verify(this.protectedDataStoreListener, never()).onAdded(any());
}
if (expectedBroadcastOnStateChange) {
if (!expectedSeqNrWrite)
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
if (!expectedBroadcast)
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
protectedStorageEntries.forEach(protectedStorageEntry -> {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
if (expectedSeqNrWrite)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(
protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
if (expectedBroadcast) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
else
verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
}
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
if (expectedHashMapAndDataStoreUpdated) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.protectedDataStoreService.getMap().get(hashMapHash));
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
}
});
}
void verifyRefreshTTL(SavedTestState beforeState,
@ -352,11 +424,10 @@ public class TestState {
private SavedTestState(TestState testState, ProtectedStorageEntry protectedStorageEntry) {
this(testState);
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
this.protectedStorageEntryBeforeOpDataStoreMap = testState.mockedStorage.getProtectedDataStoreMap().get(storageHash);
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
this.protectedStorageEntryBeforeOp = testState.mockedStorage.getMap().get(hashMapHash);
this.protectedStorageEntryBeforeOpDataStoreMap = testState.protectedDataStoreService.getMap().get(hashMapHash);
this.creationTimestampBeforeUpdate = (this.protectedStorageEntryBeforeOp != null) ? this.protectedStorageEntryBeforeOp.getCreationTimeStamp() : 0;
}

View File

@ -45,7 +45,7 @@ public class ProtectedStoragePayloadStub implements ProtectedStoragePayload {
@Getter
private PublicKey ownerPubKey;
protected Message messageMock;
protected final Message messageMock;
public ProtectedStoragePayloadStub(PublicKey ownerPubKey) {
this.ownerPubKey = ownerPubKey;