diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 97319ff866..c3e124364f 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -40,6 +40,8 @@ import bisq.common.crypto.PubKeyRing; import bisq.common.proto.network.NetworkEnvelope; import bisq.common.taskrunner.Task; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -86,8 +88,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D // change and leave that for a later PR UserThread.runAfter(() -> { mailboxMessageService.addDecryptedMailboxListener(this); - mailboxMessageService.getMyDecryptedMailboxMessages() - .forEach(this::handleDecryptedMailboxMessageWithPubKey); + handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); }, 100, TimeUnit.MILLISECONDS); } @@ -106,15 +107,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { - NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); - if (networkEnvelope instanceof TradeMessage && - isMyMessage((TradeMessage) networkEnvelope) && - isPubKeyValid(message)) { + public void onDirectMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress peer) { + if (!isPubKeyValid(decryptedMessageWithPubKey)) { + return; + } + + NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope(); + if (!isMyMessage(networkEnvelope)) { + return; + } + + if (networkEnvelope instanceof TradeMessage) { onTradeMessage((TradeMessage) networkEnvelope, peer); - } else if (networkEnvelope instanceof AckMessage && - isMyMessage((AckMessage) networkEnvelope) && - isPubKeyValid(message)) { + } else if (networkEnvelope instanceof AckMessage) { onAckMessage((AckMessage) networkEnvelope, peer); } } @@ -125,44 +130,41 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) { - handleDecryptedMailboxMessageWithPubKey(message, peer); + public void onMailboxMessageAdded(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress peer) { + handleMailboxCollection(Collections.singletonList(decryptedMessageWithPubKey)); } - private void handleDecryptedMailboxMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { - MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); - NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress(); - handleDecryptedMailboxMessageWithPubKey(decryptedMessageWithPubKey, senderNodeAddress); + private void handleMailboxCollection(Collection collection) { + collection.stream() + .filter(this::isPubKeyValid) + .map(DecryptedMessageWithPubKey::getNetworkEnvelope) + .filter(this::isMyMessage) + .filter(e -> e instanceof MailboxMessage) + .map(e -> (MailboxMessage) e) + .forEach(this::handleMailboxMessage); } - protected void handleDecryptedMailboxMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey, - NodeAddress peer) { - NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope(); - if (networkEnvelope instanceof MailboxMessage && networkEnvelope instanceof TradeMessage) { - TradeMessage tradeMessage = (TradeMessage) networkEnvelope; - if (isMyMessage(tradeMessage) && isPubKeyValid(decryptedMessageWithPubKey)) { - // We only remove here if we have already completed the trade. - // Otherwise removal is done after successfully applied the task runner. - if (trade.isWithdrawn()) { - MailboxMessage mailboxMessage = (MailboxMessage) tradeMessage; - processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(mailboxMessage); - log.info("Remove {} from the P2P network as trade is already completed.", - tradeMessage.getClass().getSimpleName()); - return; - } - onMailboxMessage(tradeMessage, peer); + private void handleMailboxMessage(MailboxMessage mailboxMessage) { + if (mailboxMessage instanceof TradeMessage) { + TradeMessage tradeMessage = (TradeMessage) mailboxMessage; + // We only remove here if we have already completed the trade. + // Otherwise removal is done after successfully applied the task runner. + if (trade.isWithdrawn()) { + processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(mailboxMessage); + log.info("Remove {} from the P2P network as trade is already completed.", + tradeMessage.getClass().getSimpleName()); + return; } - } else if (networkEnvelope instanceof AckMessage) { - AckMessage ackMessage = (AckMessage) networkEnvelope; - if (isMyMessage(ackMessage) && isPubKeyValid(decryptedMessageWithPubKey)) { - if (!trade.isWithdrawn()) { - // We only apply the msg if we have not already completed the trade - onAckMessage(ackMessage, peer); - } - // In any case we remove the msg - processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(ackMessage); - log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName()); + onMailboxMessage(tradeMessage, mailboxMessage.getSenderNodeAddress()); + } else if (mailboxMessage instanceof AckMessage) { + AckMessage ackMessage = (AckMessage) mailboxMessage; + if (!trade.isWithdrawn()) { + // We only apply the msg if we have not already completed the trade + onAckMessage(ackMessage, mailboxMessage.getSenderNodeAddress()); } + // In any case we remove the msg + processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(ackMessage); + log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName()); } } @@ -380,6 +382,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D cleanup(); } + private boolean isMyMessage(NetworkEnvelope message) { + if (message instanceof TradeMessage) { + return isMyMessage((TradeMessage) message); + } else if (message instanceof AckMessage) { + return isMyMessage((AckMessage) message); + } else { + return false; + } + } + private boolean isMyMessage(TradeMessage message) { return message.getTradeId().equals(trade.getId()); }