Update removeExpiredEntries to remove all items in a batch

This will cause HashMapChangedListeners to receive just one onRemoved()
call for the expire work instead of multiple onRemoved() calls for each
item.

This required a bit of updating for the remove validation in tests so
that it correctly compares onRemoved with multiple items.
This commit is contained in:
Julian Knutsen 2019-11-16 11:50:10 -08:00
parent 489b25aa13
commit eae641ee73
No known key found for this signature in database
GPG key ID: D85F536DB3615B2D
3 changed files with 72 additions and 35 deletions

View file

@ -203,13 +203,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying
// about start and end of iteration.
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted);
toRemoveList.forEach(mapEntry -> {
ProtectedStorageEntry protectedStorageEntry = mapEntry.getValue();
ByteArray payloadHash = mapEntry.getKey();
log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry));
removeFromMapAndDataStore(protectedStorageEntry, payloadHash);
toRemoveList.forEach(toRemoveItem -> {
log.debug("We found an expired data entry. We remove the protectedData:\n\t" +
Utilities.toTruncatedString(toRemoveItem.getValue()));
});
removeFromMapAndDataStore(toRemoveList);
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted);
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)

View file

@ -32,6 +32,7 @@ import bisq.common.crypto.CryptoException;
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -149,10 +150,13 @@ public class P2PDataStorageRemoveExpiredTest {
public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchAlgorithmException {
final int initialClockIncrement = 5;
ArrayList<ProtectedStorageEntry> expectedRemoves = new ArrayList<>();
// Add 4 entries to our sequence number map that will be purged
KeyPair purgedOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload purgedProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(purgedOwnerKeys.getPublic(), 0);
ProtectedStorageEntry purgedProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(purgedProtectedStoragePayload, purgedOwnerKeys);
expectedRemoves.add(purgedProtectedStorageEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));
@ -160,6 +164,7 @@ public class P2PDataStorageRemoveExpiredTest {
KeyPair ownerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload protectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(ownerKeys.getPublic(), 0);
ProtectedStorageEntry tmpEntry = testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys);
expectedRemoves.add(tmpEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(tmpEntry, TestState.getTestNodeAddress(), null, true));
}
@ -171,6 +176,7 @@ public class P2PDataStorageRemoveExpiredTest {
KeyPair keepOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload keepProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(keepOwnerKeys.getPublic(), 0);
ProtectedStorageEntry keepProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(keepProtectedStoragePayload, keepOwnerKeys);
expectedRemoves.add(keepProtectedStorageEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));
@ -178,17 +184,17 @@ public class P2PDataStorageRemoveExpiredTest {
// Advance time past it so they will be valid purge targets
this.testState.clockFake.increment(TimeUnit.DAYS.toMillis(P2PDataStorage.PURGE_AGE_DAYS + 1 - initialClockIncrement));
// The first entry (11 days old) should be purged
// The first 4 entries (11 days old) should be purged from the SequenceNumberMap
SavedTestState beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, purgedProtectedStorageEntry, true, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, expectedRemoves, true, false, false, false);
// Which means that an addition of a purged entry should succeed.
beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
Assert.assertTrue(this.testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, purgedProtectedStorageEntry, true, false);
// The second entry (5 days old) should still exist which means trying to add it again should fail.
// The last entry (5 days old) should still exist in the SequenceNumberMap which means trying to add it again should fail.
beforeState = this.testState.saveTestState(keepProtectedStorageEntry);
Assert.assertFalse(this.testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, keepProtectedStorageEntry, false, false);

View file

@ -45,7 +45,10 @@ import bisq.common.storage.Storage;
import java.security.PublicKey;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -205,38 +208,68 @@ public class TestState {
boolean expectedBroadcastOnStateChange,
boolean expectedSeqNrWriteOnStateChange,
boolean expectedIsDataOwner) {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
verifyProtectedStorageRemove(beforeState, Collections.singletonList(protectedStorageEntry),
expectedStateChange, expectedBroadcastOnStateChange,
expectedSeqNrWriteOnStateChange, expectedIsDataOwner);
}
void verifyProtectedStorageRemove(SavedTestState beforeState,
Collection<ProtectedStorageEntry> protectedStorageEntries,
boolean expectedStateChange,
boolean expectedBroadcastOnStateChange,
boolean expectedSeqNrWriteOnStateChange,
boolean expectedIsDataOwner) {
// The default matcher expects orders to stay the same. So, create a custom matcher function since
// we don't care about the order.
if (expectedStateChange) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));
final ArgumentCaptor<Collection<ProtectedStorageEntry>> argument = ArgumentCaptor.forClass(Collection.class);
verify(this.hashMapChangedListener).onRemoved(argument.capture());
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
verify(this.hashMapChangedListener).onRemoved(Collections.singletonList(protectedStorageEntry));
if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
if (expectedBroadcastOnStateChange) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
else
verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
}
Set<ProtectedStorageEntry> actual = new HashSet<>(argument.getValue());
Set<ProtectedStorageEntry> expected = new HashSet<>(protectedStorageEntries);
// Ensure we didn't remove duplicates
Assert.assertEquals(protectedStorageEntries.size(), expected.size());
Assert.assertEquals(argument.getValue().size(), actual.size());
Assert.assertEquals(expected, actual);
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
verify(this.hashMapChangedListener, never()).onRemoved(any());
}
protectedStorageEntries.forEach(protectedStorageEntry -> {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
if (expectedStateChange) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
if (expectedBroadcastOnStateChange) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
else
verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
}
} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
});
}
void verifyRefreshTTL(SavedTestState beforeState,