From 6deeecb8464918e48bbce3fca45443ca11621ba6 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sat, 26 Sep 2020 23:38:01 -0500 Subject: [PATCH] Move handling of mailbox messages from TradeManager to TradeProtocol Make removal of mailbox messages automated in TradeProtocol --- core/src/main/java/bisq/core/trade/Trade.java | 10 -- .../java/bisq/core/trade/TradeManager.java | 44 +-------- .../messages/RefreshTradeStateRequest.java | 3 +- .../messages/TraderSignedWitnessMessage.java | 4 + .../core/trade/protocol/BuyerProtocol.java | 2 +- .../core/trade/protocol/ProcessModel.java | 14 --- .../core/trade/protocol/SellerProtocol.java | 2 +- .../core/trade/protocol/TradeProtocol.java | 99 ++++++++++++++----- ...ssPeerPublishedDelayedPayoutTxMessage.java | 4 - ...essDepositTxAndDelayedPayoutTxMessage.java | 2 - .../BuyerProcessPayoutTxPublishedMessage.java | 2 - ...ProcessMediatedPayoutSignatureMessage.java | 2 - ...ocessMediatedPayoutTxPublishedMessage.java | 2 - ...CounterCurrencyTransferStartedMessage.java | 2 - .../java/bisq/network/p2p/P2PService.java | 8 +- 15 files changed, 91 insertions(+), 109 deletions(-) diff --git a/core/src/main/java/bisq/core/trade/Trade.java b/core/src/main/java/bisq/core/trade/Trade.java index f3c36215c1..a4f49efa4c 100644 --- a/core/src/main/java/bisq/core/trade/Trade.java +++ b/core/src/main/java/bisq/core/trade/Trade.java @@ -33,7 +33,6 @@ import bisq.core.trade.protocol.ProcessModel; import bisq.core.trade.protocol.ProcessModelServiceProvider; import bisq.core.trade.txproof.AssetTxProofResult; -import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.NodeAddress; import bisq.common.crypto.PubKeyRing; @@ -69,9 +68,7 @@ import javafx.collections.ObservableList; import java.time.temporal.ChronoUnit; import java.util.Date; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import lombok.Getter; @@ -399,8 +396,6 @@ public abstract class Trade implements Tradable, Model { transient private ObjectProperty tradeAmountProperty; transient private ObjectProperty tradeVolumeProperty; - @Getter - final transient private Set decryptedMessageWithPubKeySet = new HashSet<>(); // Added in v1.1.6 @Getter @@ -712,10 +707,6 @@ public abstract class Trade implements Tradable, Model { return delayedPayoutTx; } - public void removeDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { - decryptedMessageWithPubKeySet.remove(decryptedMessageWithPubKey); - } - public void addAndPersistChatMessage(ChatMessage chatMessage) { if (!chatMessages.contains(chatMessage)) { chatMessages.add(chatMessage); @@ -1208,7 +1199,6 @@ public abstract class Trade implements Tradable, Model { ",\n tradeAmount=" + tradeAmount + ",\n tradeAmountProperty=" + tradeAmountProperty + ",\n tradeVolumeProperty=" + tradeVolumeProperty + - ",\n decryptedMessageWithPubKeySet=" + decryptedMessageWithPubKeySet + ",\n mediationResultState=" + mediationResultState + ",\n mediationResultStateProperty=" + mediationResultStateProperty + ",\n lockTime=" + lockTime + diff --git a/core/src/main/java/bisq/core/trade/TradeManager.java b/core/src/main/java/bisq/core/trade/TradeManager.java index 900e4db64c..30c98ce5fe 100644 --- a/core/src/main/java/bisq/core/trade/TradeManager.java +++ b/core/src/main/java/bisq/core/trade/TradeManager.java @@ -34,7 +34,6 @@ import bisq.core.trade.closed.ClosedTradableManager; import bisq.core.trade.failed.FailedTradesManager; import bisq.core.trade.handlers.TradeResultHandler; import bisq.core.trade.messages.TakeOfferRequest; -import bisq.core.trade.messages.TradeMessage; import bisq.core.trade.protocol.MakerProtocol; import bisq.core.trade.protocol.ProcessModel; import bisq.core.trade.protocol.ProcessModelServiceProvider; @@ -45,14 +44,11 @@ import bisq.core.trade.statistics.TradeStatisticsManager; import bisq.core.user.User; import bisq.core.util.Validator; -import bisq.network.p2p.AckMessage; -import bisq.network.p2p.AckMessageSourceType; import bisq.network.p2p.BootstrapListener; import bisq.network.p2p.DecryptedDirectMessageListener; import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.P2PService; -import bisq.network.p2p.messaging.DecryptedMailboxListener; import bisq.common.ClockWatcher; import bisq.common.config.Config; @@ -107,7 +103,7 @@ import javax.annotation.Nullable; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -public class TradeManager implements PersistedDataHost, DecryptedDirectMessageListener, DecryptedMailboxListener { +public class TradeManager implements PersistedDataHost, DecryptedDirectMessageListener { private static final Logger log = LoggerFactory.getLogger(TradeManager.class); private final User user; @@ -184,7 +180,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi tradableListStorage = storage; p2PService.addDecryptedDirectMessageListener(this); - p2PService.addDecryptedMailboxListener(this); failedTradesManager.setUnFailTradeCallback(this::unFailTrade); } @@ -280,43 +275,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } - /////////////////////////////////////////////////////////////////////////////////////////// - // DecryptedMailboxListener - /////////////////////////////////////////////////////////////////////////////////////////// - - // Might get called at startup after HS is published. Can be before or after initPendingTrades. - @Override - public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) { - NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); - if (networkEnvelope instanceof TradeMessage) { - TradeMessage tradeMessage = (TradeMessage) networkEnvelope; - getTradeById(tradeMessage.getTradeId()) - .ifPresent(trade -> { - // We don't need to persist the msg as if we don't processes the message it will not be - // removed from the P2P network and we will receive it again on next startup. - // This might happen in edge cases when the user shuts down after we received the msg but - // before it is processed. - //TODO - Set decryptedMessageWithPubKeySet = trade.getDecryptedMessageWithPubKeySet(); - if (!decryptedMessageWithPubKeySet.contains(message)) { - decryptedMessageWithPubKeySet.add(message); - - // The message will be removed after processed - TradeProtocol tradeProtocol = getTradeProtocol(trade); - tradeProtocol.applyMailboxMessage(message); - } - - }); - } else if (networkEnvelope instanceof AckMessage) { - AckMessage ackMessage = (AckMessage) networkEnvelope; - if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE) { - // We remove here the message not in the trade protocol as it might be that the trade is already - // completed and the protocol is not listening. - p2PService.removeEntryFromMailbox(message); - } - } - } - /////////////////////////////////////////////////////////////////////////////////////////// // Lifecycle /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/java/bisq/core/trade/messages/RefreshTradeStateRequest.java b/core/src/main/java/bisq/core/trade/messages/RefreshTradeStateRequest.java index c2a6da5416..f865ea58de 100644 --- a/core/src/main/java/bisq/core/trade/messages/RefreshTradeStateRequest.java +++ b/core/src/main/java/bisq/core/trade/messages/RefreshTradeStateRequest.java @@ -28,7 +28,8 @@ import lombok.Value; * We do the re-sending of the payment sent message via the BuyerSendCounterCurrencyTransferStartedMessage task on the * buyer side, so seller do not need to do anything interactively. */ -@SuppressWarnings("deprecation") +@Deprecated +@SuppressWarnings("ALL") @EqualsAndHashCode(callSuper = true) @Value public class RefreshTradeStateRequest extends TradeMessage implements MailboxMessage { diff --git a/core/src/main/java/bisq/core/trade/messages/TraderSignedWitnessMessage.java b/core/src/main/java/bisq/core/trade/messages/TraderSignedWitnessMessage.java index 56667b1dbf..bae708b577 100644 --- a/core/src/main/java/bisq/core/trade/messages/TraderSignedWitnessMessage.java +++ b/core/src/main/java/bisq/core/trade/messages/TraderSignedWitnessMessage.java @@ -27,6 +27,10 @@ import bisq.common.app.Version; import lombok.EqualsAndHashCode; import lombok.Value; +/** + * Not used anymore since v1.4.0 + */ +@Deprecated @SuppressWarnings("ALL") @EqualsAndHashCode(callSuper = true) @Value diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java index e69e4a6872..2e7c4d9ac2 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java @@ -111,7 +111,7 @@ public abstract class BuyerProtocol extends DisputeProtocol { "arrive and the peer repeats sending us the message. We send another ACK msg."); stopTimeout(); sendAckMessage(message, true, null); - processModel.removeMailboxMessageAfterProcessing(trade); + removeMailboxMessageAfterProcessing(message); })) .setup(tasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class, BuyerVerifiesFinalDelayedPayoutTx.class, diff --git a/core/src/main/java/bisq/core/trade/protocol/ProcessModel.java b/core/src/main/java/bisq/core/trade/protocol/ProcessModel.java index 150db21210..445dc02545 100644 --- a/core/src/main/java/bisq/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/bisq/core/trade/protocol/ProcessModel.java @@ -42,8 +42,6 @@ import bisq.core.trade.statistics.TradeStatisticsManager; import bisq.core.user.User; import bisq.network.p2p.AckMessage; -import bisq.network.p2p.DecryptedMessageWithPubKey; -import bisq.network.p2p.MailboxMessage; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.P2PService; @@ -91,8 +89,6 @@ public class ProcessModel implements Model, PersistablePayload { transient private Transaction takeOfferFeeTx; @Setter transient private TradeMessage tradeMessage; - @Setter - transient private DecryptedMessageWithPubKey decryptedMessageWithPubKey; // Added in v1.2.0 @Setter @@ -251,16 +247,6 @@ public class ProcessModel implements Model, PersistablePayload { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void removeMailboxMessageAfterProcessing(Trade trade) { - if (tradeMessage instanceof MailboxMessage && - decryptedMessageWithPubKey != null && - decryptedMessageWithPubKey.getNetworkEnvelope().equals(tradeMessage)) { - log.debug("Remove decryptedMsgWithPubKey from P2P network. decryptedMsgWithPubKey = " + decryptedMessageWithPubKey); - getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); - trade.removeDecryptedMessageWithPubKey(decryptedMessageWithPubKey); - } - } - @Override public void persist() { log.warn("persist is not implemented in that class"); diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java index b7ec415ffa..d8a06cfd4d 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java @@ -106,7 +106,7 @@ public abstract class SellerProtocol extends DisputeProtocol { "so we ignore the message. This can happen if the ACK message to the peer did not " + "arrive and the peer repeats sending us the message. We send another ACK msg."); sendAckMessage(message, true, null); - processModel.removeMailboxMessageAfterProcessing(trade); + removeMailboxMessageAfterProcessing(message); })) .setup(tasks( SellerProcessCounterCurrencyTransferStartedMessage.class, diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 3f53833b3b..cacb643f69 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -31,6 +31,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.MailboxMessage; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.SendMailboxMessageListener; +import bisq.network.p2p.messaging.DecryptedMailboxListener; import bisq.common.Timer; import bisq.common.UserThread; @@ -38,19 +39,18 @@ import bisq.common.crypto.PubKeyRing; import bisq.common.proto.network.NetworkEnvelope; import bisq.common.taskrunner.Task; -import java.util.HashSet; +import java.security.PublicKey; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; @Slf4j -public abstract class TradeProtocol implements DecryptedDirectMessageListener { +public abstract class TradeProtocol implements DecryptedDirectMessageListener, DecryptedMailboxListener { protected final ProcessModel processModel; protected final Trade trade; private Timer timeoutTimer; - private boolean initialized; /////////////////////////////////////////////////////////////////////////////////////////// @@ -69,7 +69,6 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener { public void initialize(ProcessModelServiceProvider serviceProvider, TradeManager tradeManager, Offer offer) { processModel.applyTransient(serviceProvider, tradeManager, offer); - initialized = true; onInitialized(); } @@ -77,29 +76,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener { if (!trade.isWithdrawn()) { processModel.getP2PService().addDecryptedDirectMessageListener(this); } - - // Apply mailbox messages - // Clone to avoid ConcurrentModificationException. We remove items at the applyMailboxMessage call... - new HashSet<>(trade.getDecryptedMessageWithPubKeySet()).forEach(this::applyMailboxMessage); + processModel.getP2PService().addDecryptedMailboxListener(this); + processModel.getP2PService().getMailboxMap().values() + .stream().map(e -> e.second) + .forEach(this::handleDecryptedMessageWithPubKey); } public void onWithdrawCompleted() { cleanup(); } - public void applyMailboxMessage(DecryptedMessageWithPubKey message) { - if (initialized && isPubKeyValid(message)) { - NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); - if (networkEnvelope instanceof MailboxMessage && - networkEnvelope instanceof TradeMessage) { - processModel.setDecryptedMessageWithPubKey(message); - TradeMessage tradeMessage = (TradeMessage) networkEnvelope; - NodeAddress peerNodeAddress = ((MailboxMessage) networkEnvelope).getSenderNodeAddress(); - onMailboxMessage(tradeMessage, peerNodeAddress); - } - } - } - protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); @@ -117,11 +103,75 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener { if (networkEnvelope instanceof TradeMessage && isMyTradeMessage((TradeMessage) networkEnvelope)) { onTradeMessage((TradeMessage) networkEnvelope, peer); } else if (networkEnvelope instanceof AckMessage) { - onAckMessage((AckMessage) networkEnvelope, peer); + AckMessage ackMessage = (AckMessage) networkEnvelope; + if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE) { + onAckMessage((AckMessage) networkEnvelope, peer); + } } } } + /////////////////////////////////////////////////////////////////////////////////////////// + // DecryptedMailboxListener + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) { + handleDecryptedMessageWithPubKey(message, peer); + } + + private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { + MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); + NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress(); + handleDecryptedMessageWithPubKey(decryptedMessageWithPubKey, senderNodeAddress); + } + + protected void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey, + NodeAddress peer) { + if (!isPubKeyValid(decryptedMessageWithPubKey)) { + return; + } + + NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope(); + if (networkEnvelope instanceof TradeMessage) { + TradeMessage tradeMessage = (TradeMessage) networkEnvelope; + if (!isMyTradeMessage(tradeMessage)) { + return; + } + + if (trade.isWithdrawn()) { + processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); + return; + } + + onMailboxMessage(tradeMessage, peer); + } else if (networkEnvelope instanceof AckMessage) { + AckMessage ackMessage = (AckMessage) networkEnvelope; + if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE && + ackMessage.getSourceId().equals(trade.getId())) { + if (!trade.isWithdrawn()) { + onAckMessage((AckMessage) networkEnvelope, peer); + } + processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName()); + } + } + } + + public void removeMailboxMessageAfterProcessing(TradeMessage tradeMessage) { + if (tradeMessage instanceof MailboxMessage && + processModel.getTradingPeer() != null && + processModel.getTradingPeer().getPubKeyRing() != null && + processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey() != null) { + PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey(); + // We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey + DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey); + processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); + log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); + } + } + /////////////////////////////////////////////////////////////////////////////////////////// // Abstract @@ -313,6 +363,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener { log.info("TaskRunner successfully completed. Triggered from {}, tradeId={}", source, trade.getId()); if (message != null) { sendAckMessage(message, true, null); + + // Once a taskRunner is completed we remove the mailbox message. To not remove it directly at the task + // adds some resilience in case of minor errors, so after a restart the mailbox message can be applied + // again. + removeMailboxMessageAfterProcessing(message); } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessPeerPublishedDelayedPayoutTxMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessPeerPublishedDelayedPayoutTxMessage.java index 385e863850..a104b433b8 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessPeerPublishedDelayedPayoutTxMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessPeerPublishedDelayedPayoutTxMessage.java @@ -52,13 +52,9 @@ public class ProcessPeerPublishedDelayedPayoutTxMessage extends TradeTask { Transaction delayedPayoutTx = checkNotNull(trade.getDelayedPayoutTx()); WalletService.maybeAddSelfTxToWallet(delayedPayoutTx, processModel.getBtcWalletService().getWallet()); - // todo trade.setState - complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessDepositTxAndDelayedPayoutTxMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessDepositTxAndDelayedPayoutTxMessage.java index 2bfea25a9c..dfecf3abde 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessDepositTxAndDelayedPayoutTxMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessDepositTxAndDelayedPayoutTxMessage.java @@ -76,8 +76,6 @@ public class BuyerProcessDepositTxAndDelayedPayoutTxMessage extends TradeTask { complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessPayoutTxPublishedMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessPayoutTxPublishedMessage.java index f82bb916be..45cd777a21 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessPayoutTxPublishedMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/buyer/BuyerProcessPayoutTxPublishedMessage.java @@ -76,8 +76,6 @@ public class BuyerProcessPayoutTxPublishedMessage extends TradeTask { complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutSignatureMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutSignatureMessage.java index a049d0932d..ea1a02a02a 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutSignatureMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutSignatureMessage.java @@ -54,8 +54,6 @@ public class ProcessMediatedPayoutSignatureMessage extends TradeTask { complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutTxPublishedMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutTxPublishedMessage.java index 9fdcc4b75c..226fefbb5a 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutTxPublishedMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/mediation/ProcessMediatedPayoutTxPublishedMessage.java @@ -77,8 +77,6 @@ public class ProcessMediatedPayoutTxPublishedMessage extends TradeTask { complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/seller/SellerProcessCounterCurrencyTransferStartedMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/seller/SellerProcessCounterCurrencyTransferStartedMessage.java index d2a1296445..5f8d6bc50e 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/seller/SellerProcessCounterCurrencyTransferStartedMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/seller/SellerProcessCounterCurrencyTransferStartedMessage.java @@ -64,8 +64,6 @@ public class SellerProcessCounterCurrencyTransferStartedMessage extends TradeTas complete(); } catch (Throwable t) { failed(t); - } finally { - processModel.removeMailboxMessageAfterProcessing(trade); } } } diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 8eb0df0903..4d2a5ef902 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -52,6 +52,7 @@ import bisq.common.crypto.PubKeyRing; import bisq.common.proto.ProtobufferException; import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.persistable.PersistedDataHost; +import bisq.common.util.Tuple2; import com.google.inject.Inject; @@ -116,7 +117,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final Set decryptedDirectMessageListeners = new CopyOnWriteArraySet<>(); private final Set decryptedMailboxListeners = new CopyOnWriteArraySet<>(); private final Set p2pServiceListeners = new CopyOnWriteArraySet<>(); - private final Map mailboxMap = new HashMap<>(); + @Getter + private final Map> mailboxMap = new HashMap<>(); private final Set shutDownResultHandlers = new CopyOnWriteArraySet<>(); private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); @@ -531,7 +533,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress(); checkNotNull(senderNodeAddress, "senderAddress must not be null for mailbox network_messages"); - mailboxMap.put(mailboxMessage.getUid(), protectedMailboxStorageEntry); + mailboxMap.put(mailboxMessage.getUid(), new Tuple2<>(protectedMailboxStorageEntry, decryptedMessageWithPubKey)); log.info("Received a {} mailbox message with messageUid {} and senderAddress {}", mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), senderNodeAddress); decryptedMailboxListeners.forEach( e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderNodeAddress)); @@ -741,7 +743,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); String uid = mailboxMessage.getUid(); if (mailboxMap.containsKey(uid)) { - ProtectedMailboxStorageEntry mailboxData = mailboxMap.get(uid); + ProtectedMailboxStorageEntry mailboxData = mailboxMap.get(uid).first; if (mailboxData != null && mailboxData.getProtectedStoragePayload() instanceof MailboxStoragePayload) { MailboxStoragePayload expirableMailboxStoragePayload = (MailboxStoragePayload) mailboxData.getProtectedStoragePayload(); PublicKey receiversPubKey = mailboxData.getReceiversPubKey();