Refactor handling of DecryptedMessageWithPubKey

This commit is contained in:
chimp1984 2021-01-12 14:36:13 -05:00
parent d353140e7a
commit c229c3f014
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -40,6 +40,8 @@ import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.taskrunner.Task; import bisq.common.taskrunner.Task;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -86,8 +88,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// change and leave that for a later PR // change and leave that for a later PR
UserThread.runAfter(() -> { UserThread.runAfter(() -> {
mailboxMessageService.addDecryptedMailboxListener(this); mailboxMessageService.addDecryptedMailboxListener(this);
mailboxMessageService.getMyDecryptedMailboxMessages() handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
.forEach(this::handleDecryptedMailboxMessageWithPubKey);
}, 100, TimeUnit.MILLISECONDS); }, 100, TimeUnit.MILLISECONDS);
} }
@ -106,15 +107,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { public void onDirectMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress peer) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); if (!isPubKeyValid(decryptedMessageWithPubKey)) {
if (networkEnvelope instanceof TradeMessage && return;
isMyMessage((TradeMessage) networkEnvelope) && }
isPubKeyValid(message)) {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (!isMyMessage(networkEnvelope)) {
return;
}
if (networkEnvelope instanceof TradeMessage) {
onTradeMessage((TradeMessage) networkEnvelope, peer); onTradeMessage((TradeMessage) networkEnvelope, peer);
} else if (networkEnvelope instanceof AckMessage && } else if (networkEnvelope instanceof AckMessage) {
isMyMessage((AckMessage) networkEnvelope) &&
isPubKeyValid(message)) {
onAckMessage((AckMessage) networkEnvelope, peer); onAckMessage((AckMessage) networkEnvelope, peer);
} }
} }
@ -125,44 +130,41 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) { public void onMailboxMessageAdded(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress peer) {
handleDecryptedMailboxMessageWithPubKey(message, peer); handleMailboxCollection(Collections.singletonList(decryptedMessageWithPubKey));
} }
private void handleDecryptedMailboxMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { private void handleMailboxCollection(Collection<DecryptedMessageWithPubKey> collection) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); collection.stream()
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress(); .filter(this::isPubKeyValid)
handleDecryptedMailboxMessageWithPubKey(decryptedMessageWithPubKey, senderNodeAddress); .map(DecryptedMessageWithPubKey::getNetworkEnvelope)
.filter(this::isMyMessage)
.filter(e -> e instanceof MailboxMessage)
.map(e -> (MailboxMessage) e)
.forEach(this::handleMailboxMessage);
} }
protected void handleDecryptedMailboxMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey, private void handleMailboxMessage(MailboxMessage mailboxMessage) {
NodeAddress peer) { if (mailboxMessage instanceof TradeMessage) {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope(); TradeMessage tradeMessage = (TradeMessage) mailboxMessage;
if (networkEnvelope instanceof MailboxMessage && networkEnvelope instanceof TradeMessage) { // We only remove here if we have already completed the trade.
TradeMessage tradeMessage = (TradeMessage) networkEnvelope; // Otherwise removal is done after successfully applied the task runner.
if (isMyMessage(tradeMessage) && isPubKeyValid(decryptedMessageWithPubKey)) { if (trade.isWithdrawn()) {
// We only remove here if we have already completed the trade. processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(mailboxMessage);
// Otherwise removal is done after successfully applied the task runner. log.info("Remove {} from the P2P network as trade is already completed.",
if (trade.isWithdrawn()) { tradeMessage.getClass().getSimpleName());
MailboxMessage mailboxMessage = (MailboxMessage) tradeMessage; return;
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(mailboxMessage);
log.info("Remove {} from the P2P network as trade is already completed.",
tradeMessage.getClass().getSimpleName());
return;
}
onMailboxMessage(tradeMessage, peer);
} }
} else if (networkEnvelope instanceof AckMessage) { onMailboxMessage(tradeMessage, mailboxMessage.getSenderNodeAddress());
AckMessage ackMessage = (AckMessage) networkEnvelope; } else if (mailboxMessage instanceof AckMessage) {
if (isMyMessage(ackMessage) && isPubKeyValid(decryptedMessageWithPubKey)) { AckMessage ackMessage = (AckMessage) mailboxMessage;
if (!trade.isWithdrawn()) { if (!trade.isWithdrawn()) {
// We only apply the msg if we have not already completed the trade // We only apply the msg if we have not already completed the trade
onAckMessage(ackMessage, peer); onAckMessage(ackMessage, mailboxMessage.getSenderNodeAddress());
}
// In any case we remove the msg
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(ackMessage);
log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName());
} }
// In any case we remove the msg
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(ackMessage);
log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName());
} }
} }
@ -380,6 +382,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
cleanup(); cleanup();
} }
private boolean isMyMessage(NetworkEnvelope message) {
if (message instanceof TradeMessage) {
return isMyMessage((TradeMessage) message);
} else if (message instanceof AckMessage) {
return isMyMessage((AckMessage) message);
} else {
return false;
}
}
private boolean isMyMessage(TradeMessage message) { private boolean isMyMessage(TradeMessage message) {
return message.getTradeId().equals(trade.getId()); return message.getTradeId().equals(trade.getId());
} }