Remove expire optimization in onDisconnect

We already have a garbage collection thread that runs every minute
to clean up items. Doing it again during onDisconnect is an unnecessary
optimization that adds complexity and caused bugs.

For example, the original implementation did not handle the sequence
number map correctly and was removing entries during a stream iteration.

This also reduces the complexity of testing. There is one code path
responsible for reducing ttls and one code path responsible for
expiring entries. Much easier to reason about.
This commit is contained in:
Julian Knutsen 2019-12-04 17:20:19 -08:00
parent df2e4cc013
commit b166009398
No known key found for this signature in database
GPG Key ID: D85F536DB3615B2D
2 changed files with 24 additions and 99 deletions

View File

@ -448,43 +448,24 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
NodeAddress peersNodeAddress = connection.getPeersNodeAddressOptional().get();
// Retrieve all the eligible payloads based on the node that disconnected
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toBackDate =
map.entrySet().stream()
.filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof ExpirablePayload)
.filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
.filter(entry -> ((RequiresOwnerIsOnlinePayload) entry.getValue().getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
.collect(Collectors.toCollection(ArrayList::new));
// Backdate each payload
toBackDate.forEach(mapEntry -> {
// We only set the data back by half of the TTL and remove the data only if is has
// expired after that back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
log.debug("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
mapEntry.getValue().backDate();
});
// Remove each backdated payload that is now expired
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList =
toBackDate.stream().filter(mapEntry -> mapEntry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
toRemoveList.forEach(toRemoveItem -> {
log.debug("We found an expired data entry. We remove the protectedData:\n\t" +
Utilities.toTruncatedString(toRemoveItem.getValue()));
});
removeFromMapAndDataStore(toRemoveList);
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
// Backdate all the eligible payloads based on the node that disconnected
map.values().stream()
.filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof ExpirablePayload)
.filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
.filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
.forEach(protectedStorageEntry -> {
// We only set the data back by half of the TTL and remove the data only if is has
// expired after that back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
log.debug("Backdating {} due to closeConnectionReason={}", protectedStorageEntry, closeConnectionReason);
protectedStorageEntry.backDate();
});
}
@Override

View File

@ -26,11 +26,9 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.crypto.CryptoException;
import bisq.common.proto.persistable.PersistablePayload;
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -64,12 +62,11 @@ public class P2PDataStoreDisconnectTest {
private static void verifyStateAfterDisconnect(TestState currentState,
SavedTestState beforeState,
boolean wasRemoved,
boolean wasTTLReduced) {
ProtectedStorageEntry protectedStorageEntry = beforeState.protectedStorageEntryBeforeOp;
currentState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry,
wasRemoved, wasRemoved, false, false);
false, false, false, false);
if (wasTTLReduced)
Assert.assertTrue(protectedStorageEntry.getCreationTimeStamp() < beforeState.creationTimestampBeforeUpdate);
@ -93,7 +90,7 @@ public class P2PDataStoreDisconnectTest {
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, false, false);
verifyStateAfterDisconnect(this.testState, beforeState, false);
}
// TESTCASE: Intended disconnects don't trigger expiration
@ -106,7 +103,7 @@ public class P2PDataStoreDisconnectTest {
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, false, false);
verifyStateAfterDisconnect(this.testState, beforeState, false);
}
// TESTCASE: Peer NodeAddress unknown
@ -120,7 +117,7 @@ public class P2PDataStoreDisconnectTest {
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, false, false);
verifyStateAfterDisconnect(this.testState, beforeState, false);
}
// TESTCASE: Unintended disconnects reduce the TTL for entrys that match disconnected peer
@ -134,7 +131,7 @@ public class P2PDataStoreDisconnectTest {
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, false, true);
verifyStateAfterDisconnect(this.testState, beforeState, true);
}
// TESTCASE: Unintended disconnects don't reduce TTL for entrys that are not from disconnected peer
@ -148,59 +145,6 @@ public class P2PDataStoreDisconnectTest {
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, false, false);
}
// TESTCASE: Unintended disconnects expire entrys that match disconnected peer and TTL is low enough for expire
@Test
public void connectionClosedReduceTTLAndExpireItemsFromPeer() throws NoSuchAlgorithmException, CryptoException {
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
// Increment the time by 1 hour which will put the protectedStorageState outside TTL
this.testState.incrementClock();
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, true, true);
}
// TESTCASE: ProtectedStoragePayloads implementing the PersistablePayload interface are correctly removed
// from the persistent store during the onDisconnect path.
@Test
public void connectionClosedReduceTTLAndExpireItemsFromPeerPersistable()
throws NoSuchAlgorithmException, CryptoException {
class ExpirablePersistentProtectedStoragePayloadStub
extends ExpirableProtectedStoragePayloadStub implements PersistablePayload {
private ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) {
super(ownerPubKey, 0);
}
}
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
KeyPair ownerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload protectedStoragePayload =
new ExpirablePersistentProtectedStoragePayloadStub(ownerKeys.getPublic());
ProtectedStorageEntry protectedStorageEntry =
testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys);
testState.mockedStorage.addProtectedStorageEntry(
protectedStorageEntry, getTestNodeAddress(), null);
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
// Increment the time by 1 hour which will put the protectedStorageState outside TTL
this.testState.incrementClock();
this.testState.mockedStorage.onDisconnect(CloseConnectionReason.SOCKET_CLOSED, mockedConnection);
verifyStateAfterDisconnect(this.testState, beforeState, true, false);
verifyStateAfterDisconnect(this.testState, beforeState, false);
}
}