From 8b738796dbcded02aa2f7303b5c3f1ffa1260f5a Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 11 Nov 2020 20:31:42 -0500 Subject: [PATCH] Remove mailbox msg early from network --- .../alert/PrivateNotificationManager.java | 2 +- .../bisq/core/support/SupportManager.java | 4 +- .../trade/closed/CleanupMailboxMessages.java | 8 +- .../core/trade/protocol/TradeProtocol.java | 10 +- .../java/bisq/network/p2p/P2PService.java | 121 ++++++++++-------- .../network/p2p/storage/P2PDataStorage.java | 4 +- 6 files changed, 81 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/bisq/core/alert/PrivateNotificationManager.java b/core/src/main/java/bisq/core/alert/PrivateNotificationManager.java index 467605757d..b009e59eb9 100644 --- a/core/src/main/java/bisq/core/alert/PrivateNotificationManager.java +++ b/core/src/main/java/bisq/core/alert/PrivateNotificationManager.java @@ -133,7 +133,7 @@ public class PrivateNotificationManager { } public void removePrivateNotification() { - p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); + p2PService.removeMailboxMsg(decryptedMessageWithPubKey); } private boolean isKeyValid(String privKeyString) { diff --git a/core/src/main/java/bisq/core/support/SupportManager.java b/core/src/main/java/bisq/core/support/SupportManager.java index 4017686fa8..20419714d9 100644 --- a/core/src/main/java/bisq/core/support/SupportManager.java +++ b/core/src/main/java/bisq/core/support/SupportManager.java @@ -177,7 +177,7 @@ public abstract class SupportManager { requestPersistence(); if (decryptedMessageWithPubKey != null) - p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); + p2PService.removeMailboxMsg(decryptedMessageWithPubKey); } } @@ -314,7 +314,7 @@ public abstract class SupportManager { log.debug("decryptedMessageWithPubKey.message " + networkEnvelope); if (networkEnvelope instanceof SupportMessage) { dispatchMessage((SupportMessage) networkEnvelope); - p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); + p2PService.removeMailboxMsg(decryptedMessageWithPubKey); } else if (networkEnvelope instanceof AckMessage) { onAckMessage((AckMessage) networkEnvelope, decryptedMessageWithPubKey); } diff --git a/core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java b/core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java index 388b670172..5b71684ade 100644 --- a/core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java +++ b/core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java @@ -35,6 +35,9 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; +//TODO with the redesign of mailbox messages that is not required anymore. We leave it for now as we want to minimize +// changes for the 1.5.0 release but we should clean up afterwards... + /** * Util for removing pending mailbox messages in case the trade has been closed by the seller after confirming receipt * and a AckMessage as mailbox message will be sent by the buyer once they go online. In that case the seller's trade @@ -73,8 +76,7 @@ public class CleanupMailboxMessages { } private void cleanupMailboxMessages(List trades) { - p2PService.getMailboxItemsByUid().values() - .stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey) + p2PService.getMailBoxMessages() .forEach(message -> handleDecryptedMessageWithPubKey(message, trades)); } @@ -102,7 +104,7 @@ public class CleanupMailboxMessages { private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) { log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.", decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId()); - p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); + p2PService.removeMailboxMsg(decryptedMessageWithPubKey); } private boolean isMyMessage(TradeMessage message, Trade trade) { diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index af259e25c7..505eca1efd 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -30,7 +30,6 @@ import bisq.network.p2p.DecryptedDirectMessageListener; import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.MailboxMessage; import bisq.network.p2p.NodeAddress; -import bisq.network.p2p.P2PService; import bisq.network.p2p.SendMailboxMessageListener; import bisq.network.p2p.messaging.DecryptedMailboxListener; @@ -78,8 +77,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D processModel.getP2PService().addDecryptedDirectMessageListener(this); } processModel.getP2PService().addDecryptedMailboxListener(this); - processModel.getP2PService().getMailboxItemsByUid().values() - .stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey) + processModel.getP2PService().getMailBoxMessages() .forEach(this::handleDecryptedMessageWithPubKey); } @@ -138,7 +136,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D // We only remove here if we have already completed the trade. // Otherwise removal is done after successfully applied the task runner. if (trade.isWithdrawn()) { - processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey); log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); return; } @@ -152,7 +150,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D onAckMessage((AckMessage) networkEnvelope, peer); } // In any case we remove the msg - processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey); log.info("Remove {} from the P2P network.", networkEnvelope.getClass().getSimpleName()); } } @@ -165,7 +163,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey(); // We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey); - processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey); log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); } } diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index a290406fbe..bed8d80b86 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -124,8 +124,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final Set decryptedDirectMessageListeners = new CopyOnWriteArraySet<>(); private final Set decryptedMailboxListeners = new CopyOnWriteArraySet<>(); private final Set p2pServiceListeners = new CopyOnWriteArraySet<>(); - @Getter - private final Map mailboxItemsByUid = new HashMap<>(); + private final Map> mailboxItemsByUid = new HashMap<>(); private final Set shutDownResultHandlers = new CopyOnWriteArraySet<>(); private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); @@ -447,7 +446,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis checkArgument(protectedMailboxStorageEntries.size() == 1); var decryptedEntries = new ArrayList<>(getDecryptedEntries(protectedMailboxStorageEntries)); if (decryptedEntries.size() == 1) { - storeMailboxDataAndNotifyListeners(decryptedEntries.get(0)); + processMailboxItem(decryptedEntries.get(0)); } } @@ -466,7 +465,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Futures.addCallback(future, new FutureCallback<>() { public void onSuccess(Set decryptedMailboxMessageWithEntries) { - UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> storeMailboxDataAndNotifyListeners(e))); + UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> processMailboxItem(e))); } public void onFailure(@NotNull Throwable throwable) { @@ -502,14 +501,51 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis return null; } - private void storeMailboxDataAndNotifyListeners(MailboxItem mailboxItem) { + private void processMailboxItem(MailboxItem mailboxItem) { DecryptedMessageWithPubKey decryptedMessageWithPubKey = mailboxItem.getDecryptedMessageWithPubKey(); MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); + String uid = mailboxMessage.getUid(); + mailboxItemsByUid.putIfAbsent(uid, new ArrayList<>()); + mailboxItemsByUid.get(uid).add(mailboxItem); + NodeAddress sender = mailboxMessage.getSenderNodeAddress(); - mailboxItemsByUid.put(mailboxMessage.getUid(), mailboxItem); log.info("Received a {} mailbox message with uid {} and senderAddress {}", - mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), sender); + mailboxMessage.getClass().getSimpleName(), uid, sender); decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender)); + + if (isBootstrapped()) { + // After we notified our listeners we remove the data immediately from the network. + // In case the client has not been ready it need to take it via getMailBoxMessages. + removeMailboxEntryFromNetwork(mailboxItem.getProtectedMailboxStorageEntry()); + } else { + log.info("We are not bootstrapped yet, so we remove later once the message got processed."); + } + } + + private void removeMailboxEntryFromNetwork(ProtectedMailboxStorageEntry protectedMailboxStorageEntry) { + MailboxStoragePayload mailboxStoragePayload = (MailboxStoragePayload) protectedMailboxStorageEntry.getProtectedStoragePayload(); + PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey(); + try { + ProtectedMailboxStorageEntry updatedEntry = p2PDataStorage.getMailboxDataWithSignedSeqNr( + mailboxStoragePayload, + keyRing.getSignatureKeyPair(), + receiversPubKey); + + P2PDataStorage.ByteArray hashOfPayload = p2PDataStorage.get32ByteHashAsByteArray(mailboxStoragePayload); + if (p2PDataStorage.getMap().containsKey(hashOfPayload)) { + boolean result = p2PDataStorage.remove(updatedEntry, networkNode.getNodeAddress()); + if (result) { + log.info("Removed mailboxEntry from network"); + } else { + log.warn("Removing mailboxEntry from network failed"); + } + } else { + log.info("The mailboxEntry was already removed earlier."); + } + } catch (CryptoException e) { + e.printStackTrace(); + log.error("Could not remove ProtectedMailboxStorageEntry from network. Error: {}", e.toString()); + } } @@ -736,56 +772,25 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - public void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { - // We need to delay a bit to avoid that we remove our msg then get it from other peers again and reapply it again. - // If we delay the removal we have better chances that repeated network_messages we got from other peers are already filtered - // at the P2PService layer. - // Though we have to check in the client classes to not apply the same message again as there is no guarantee - // when we would get a message again from the network. - try { - UserThread.runAfter(() -> delayedRemoveEntryFromMailbox(decryptedMessageWithPubKey), 2); - } catch (NetworkNotReadyException t) { - // If we called too early it might throw a NetworkNotReadyException. We will try again - try { - UserThread.runAfter(() -> delayedRemoveEntryFromMailbox(decryptedMessageWithPubKey), 60); - } catch (NetworkNotReadyException ignore) { - log.warn("We tried to call delayedRemoveEntryFromMailbox 60 sec. after we received an " + - "NetworkNotReadyException but it failed again. We give up here."); - } - } - } + public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { + if (isBootstrapped()) { + // We need to delay a bit to not get a ConcurrentModificationException as we might iterate over + // mailboxItemsByUid while getting called. + UserThread.execute(() -> { + MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); + String uid = mailboxMessage.getUid(); + if (mailboxItemsByUid.containsKey(uid)) { + List list = mailboxItemsByUid.get(uid); - private void delayedRemoveEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { - if (!isBootstrapped()) { - // We don't throw an NetworkNotReadyException here. - // This case should not happen anyway as we check for isBootstrapped in the callers. - log.warn("You must have bootstrapped before adding data to the P2P network."); - } - - MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); - String uid = mailboxMessage.getUid(); - if (mailboxItemsByUid.containsKey(uid)) { - ProtectedMailboxStorageEntry mailboxData = mailboxItemsByUid.get(uid).getProtectedMailboxStorageEntry(); - if (mailboxData != null && mailboxData.getProtectedStoragePayload() instanceof MailboxStoragePayload) { - MailboxStoragePayload expirableMailboxStoragePayload = (MailboxStoragePayload) mailboxData.getProtectedStoragePayload(); - PublicKey receiversPubKey = mailboxData.getReceiversPubKey(); - checkArgument(receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()), - "receiversPubKey is not matching with our key. That must not happen."); - try { - ProtectedMailboxStorageEntry protectedMailboxStorageEntry = p2PDataStorage.getMailboxDataWithSignedSeqNr( - expirableMailboxStoragePayload, - keyRing.getSignatureKeyPair(), - receiversPubKey); - p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress()); - } catch (CryptoException e) { - log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); + // In case we have not been bootstrapped when we tried to remove the message at the time when we + // received the message, we remove it now. + list.forEach(mailboxItem -> removeMailboxEntryFromNetwork(mailboxItem.getProtectedMailboxStorageEntry())); + mailboxItemsByUid.remove(uid); } - - mailboxItemsByUid.remove(uid); - log.info("Removed successfully decryptedMsgWithPubKey. uid={}", uid); - } + }); } else { - log.warn("uid for mailbox entry not found in mailboxMap." + "uid={}", uid); + // In case the network was not ready yet we try again later + UserThread.runAfter(() -> removeMailboxMsg(decryptedMessageWithPubKey), 30); } } @@ -922,6 +927,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis .findAny(); } + public Set getMailBoxMessages() { + return mailboxItemsByUid.values().stream() + .filter(list -> !list.isEmpty()) + .map(list -> list.get(0)) + .map(P2PService.MailboxItem::getDecryptedMessageWithPubKey) + .collect(Collectors.toSet()); + } + @Value public class MailboxItem { private final ProtectedMailboxStorageEntry protectedMailboxStorageEntry; diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index bba3c44e28..0d6147349c 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -931,8 +931,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry))); } - private void removeFromMapAndDataStore( - Collection> entriesToRemoveWithPayloadHash) { + private void removeFromMapAndDataStore(Collection> entriesToRemoveWithPayloadHash) { if (entriesToRemoveWithPayloadHash.isEmpty()) return;