Refactor P2PDataStorage::onDisconnect

1. Remove delete during stream iteration
2. Minimize branching w/ early returns for bad states
3. Use stream filter for readability
4. Implement additional checks that should be done when removing entries
This commit is contained in:
Julian Knutsen 2019-12-04 17:04:05 -08:00
parent 688405bc6d
commit df2e4cc013
No known key found for this signature in database
GPG Key ID: D85F536DB3615B2D
2 changed files with 50 additions and 52 deletions

View File

@ -440,46 +440,51 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.hasPeersNodeAddress() && !closeConnectionReason.isIntended) {
map.values()
.forEach(protectedStorageEntry -> {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof ExpirablePayload && protectedStoragePayload instanceof RequiresOwnerIsOnlinePayload) {
NodeAddress ownerNodeAddress = ((RequiresOwnerIsOnlinePayload) protectedStoragePayload).getOwnerNodeAddress();
if (connection.getPeersNodeAddressOptional().isPresent() &&
ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
if (closeConnectionReason.isIntended)
return;
// Check if we have the data (e.g. OfferPayload)
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
log.debug("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
if (!connection.getPeersNodeAddressOptional().isPresent())
return;
// We only set the data back by half of the TTL and remove the data only if is has
// expired after that back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
protectedStorageEntry.backDate();
if (protectedStorageEntry.isExpired(this.clock)) {
log.info("We found an expired data entry which we have already back dated. " +
"We remove the protectedStoragePayload:\n\t" + Utilities.toTruncatedString(protectedStorageEntry.getProtectedStoragePayload(), 100));
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
}
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
}
}
});
}
NodeAddress peersNodeAddress = connection.getPeersNodeAddressOptional().get();
// Retrieve all the eligible payloads based on the node that disconnected
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toBackDate =
map.entrySet().stream()
.filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof ExpirablePayload)
.filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
.filter(entry -> ((RequiresOwnerIsOnlinePayload) entry.getValue().getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
.collect(Collectors.toCollection(ArrayList::new));
// Backdate each payload
toBackDate.forEach(mapEntry -> {
// We only set the data back by half of the TTL and remove the data only if is has
// expired after that back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
log.debug("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
mapEntry.getValue().backDate();
});
// Remove each backdated payload that is now expired
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList =
toBackDate.stream().filter(mapEntry -> mapEntry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
toRemoveList.forEach(toRemoveItem -> {
log.debug("We found an expired data entry. We remove the protectedData:\n\t" +
Utilities.toTruncatedString(toRemoveItem.getValue()));
});
removeFromMapAndDataStore(toRemoveList);
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
}
@Override

View File

@ -86,8 +86,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Bad peer info
@Test
public void peerConnectionUnknown() throws CryptoException, NoSuchAlgorithmException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(false);
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty());
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
@ -100,8 +99,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Intended disconnects don't trigger expiration
@Test
public void connectionClosedIntended() throws CryptoException, NoSuchAlgorithmException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry);
@ -114,8 +112,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Peer NodeAddress unknown
@Test
public void connectionClosedSkipsItemsPeerInfoBadState() throws NoSuchAlgorithmException, CryptoException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty());
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty());
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
@ -129,8 +126,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Unintended disconnects reduce the TTL for entrys that match disconnected peer
@Test
public void connectionClosedReduceTTL() throws NoSuchAlgorithmException, CryptoException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress()));
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, TimeUnit.DAYS.toMillis(90));
@ -144,8 +140,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Unintended disconnects don't reduce TTL for entrys that are not from disconnected peer
@Test
public void connectionClosedSkipsItemsNotFromPeer() throws NoSuchAlgorithmException, CryptoException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(new NodeAddress("notTestNode", 2020)));
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(new NodeAddress("notTestNode", 2020)));
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
@ -159,8 +154,7 @@ public class P2PDataStoreDisconnectTest {
// TESTCASE: Unintended disconnects expire entrys that match disconnected peer and TTL is low enough for expire
@Test
public void connectionClosedReduceTTLAndExpireItemsFromPeer() throws NoSuchAlgorithmException, CryptoException {
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress()));
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2);
@ -188,8 +182,7 @@ public class P2PDataStoreDisconnectTest {
}
}
when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true);
when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress()));
when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress()));
KeyPair ownerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload protectedStoragePayload =