From df2e4cc0132f5aa8b28ae640962bdbfd9366594d Mon Sep 17 00:00:00 2001 From: Julian Knutsen Date: Wed, 4 Dec 2019 17:04:05 -0800 Subject: [PATCH] Refactor P2PDataStorage::onDisconnect 1. Remove delete during stream iteration 2. Minimize branching w/ early returns for bad states 3. Use stream filter for readability 4. Implement additional checks that should be done when removing entries --- .../network/p2p/storage/P2PDataStorage.java | 81 ++++++++++--------- .../storage/P2PDataStoreDisconnectTest.java | 21 ++--- 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 15516da492..9b14f508c0 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -440,46 +440,51 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - if (connection.hasPeersNodeAddress() && !closeConnectionReason.isIntended) { - map.values() - .forEach(protectedStorageEntry -> { - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof ExpirablePayload && protectedStoragePayload instanceof RequiresOwnerIsOnlinePayload) { - NodeAddress ownerNodeAddress = ((RequiresOwnerIsOnlinePayload) protectedStoragePayload).getOwnerNodeAddress(); - if (connection.getPeersNodeAddressOptional().isPresent() && - ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) { - // We have a RequiresLiveOwnerData data object with the node address of the - // disconnected peer. We remove that data from our map. + if (closeConnectionReason.isIntended) + return; - // Check if we have the data (e.g. OfferPayload) - ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - boolean containsKey = map.containsKey(hashOfPayload); - if (containsKey) { - log.debug("We remove the data as the data owner got disconnected with " + - "closeConnectionReason=" + closeConnectionReason); + if (!connection.getPeersNodeAddressOptional().isPresent()) + return; - // We only set the data back by half of the TTL and remove the data only if is has - // expired after that back dating. - // We might get connection drops which are not caused by the node going offline, so - // we give more tolerance with that approach, giving the node the change to - // refresh the TTL with a refresh message. - // We observed those issues during stress tests, but it might have been caused by the - // test set up (many nodes/connections over 1 router) - // TODO investigate what causes the disconnections. - // Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException) - protectedStorageEntry.backDate(); - if (protectedStorageEntry.isExpired(this.clock)) { - log.info("We found an expired data entry which we have already back dated. " + - "We remove the protectedStoragePayload:\n\t" + Utilities.toTruncatedString(protectedStorageEntry.getProtectedStoragePayload(), 100)); - removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload); - } - } else { - log.debug("Remove data ignored as we don't have an entry for that data."); - } - } - } - }); - } + NodeAddress peersNodeAddress = connection.getPeersNodeAddressOptional().get(); + + // Retrieve all the eligible payloads based on the node that disconnected + ArrayList> toBackDate = + map.entrySet().stream() + .filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof ExpirablePayload) + .filter(entry -> entry.getValue().getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload) + .filter(entry -> ((RequiresOwnerIsOnlinePayload) entry.getValue().getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress)) + .collect(Collectors.toCollection(ArrayList::new)); + + // Backdate each payload + toBackDate.forEach(mapEntry -> { + // We only set the data back by half of the TTL and remove the data only if is has + // expired after that back dating. + // We might get connection drops which are not caused by the node going offline, so + // we give more tolerance with that approach, giving the node the change to + // refresh the TTL with a refresh message. + // We observed those issues during stress tests, but it might have been caused by the + // test set up (many nodes/connections over 1 router) + // TODO investigate what causes the disconnections. + // Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException) + log.debug("We remove the data as the data owner got disconnected with " + + "closeConnectionReason=" + closeConnectionReason); + mapEntry.getValue().backDate(); + }); + + // Remove each backdated payload that is now expired + ArrayList> toRemoveList = + toBackDate.stream().filter(mapEntry -> mapEntry.getValue().isExpired(this.clock)) + .collect(Collectors.toCollection(ArrayList::new)); + + toRemoveList.forEach(toRemoveItem -> { + log.debug("We found an expired data entry. We remove the protectedData:\n\t" + + Utilities.toTruncatedString(toRemoveItem.getValue())); + }); + removeFromMapAndDataStore(toRemoveList); + + if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) + sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap())); } @Override diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java index 11f43ee0a4..4d9deb1bf5 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java @@ -86,8 +86,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Bad peer info @Test public void peerConnectionUnknown() throws CryptoException, NoSuchAlgorithmException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(false); - + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty()); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2); SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); @@ -100,8 +99,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Intended disconnects don't trigger expiration @Test public void connectionClosedIntended() throws CryptoException, NoSuchAlgorithmException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress())); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2); SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); @@ -114,8 +112,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Peer NodeAddress unknown @Test public void connectionClosedSkipsItemsPeerInfoBadState() throws NoSuchAlgorithmException, CryptoException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty()); + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.empty()); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2); @@ -129,8 +126,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Unintended disconnects reduce the TTL for entrys that match disconnected peer @Test public void connectionClosedReduceTTL() throws NoSuchAlgorithmException, CryptoException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress())); + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress())); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, TimeUnit.DAYS.toMillis(90)); @@ -144,8 +140,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Unintended disconnects don't reduce TTL for entrys that are not from disconnected peer @Test public void connectionClosedSkipsItemsNotFromPeer() throws NoSuchAlgorithmException, CryptoException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(new NodeAddress("notTestNode", 2020))); + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(new NodeAddress("notTestNode", 2020))); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2); @@ -159,8 +154,7 @@ public class P2PDataStoreDisconnectTest { // TESTCASE: Unintended disconnects expire entrys that match disconnected peer and TTL is low enough for expire @Test public void connectionClosedReduceTTLAndExpireItemsFromPeer() throws NoSuchAlgorithmException, CryptoException { - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress())); + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress())); ProtectedStorageEntry protectedStorageEntry = populateTestState(testState, 2); @@ -188,8 +182,7 @@ public class P2PDataStoreDisconnectTest { } } - when(this.mockedConnection.hasPeersNodeAddress()).thenReturn(true); - when(mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(TestState.getTestNodeAddress())); + when(this.mockedConnection.getPeersNodeAddressOptional()).thenReturn(Optional.of(getTestNodeAddress())); KeyPair ownerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload protectedStoragePayload =