Move handling of mailbox messages from TradeManager to TradeProtocol

Make removal of mailbox messages automated in TradeProtocol
This commit is contained in:
chimp1984 2020-09-26 23:38:01 -05:00
parent 84a4982732
commit 6deeecb846
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
15 changed files with 91 additions and 109 deletions

View File

@ -33,7 +33,6 @@ import bisq.core.trade.protocol.ProcessModel;
import bisq.core.trade.protocol.ProcessModelServiceProvider;
import bisq.core.trade.txproof.AssetTxProofResult;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.NodeAddress;
import bisq.common.crypto.PubKeyRing;
@ -69,9 +68,7 @@ import javafx.collections.ObservableList;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
@ -399,8 +396,6 @@ public abstract class Trade implements Tradable, Model {
transient private ObjectProperty<Coin> tradeAmountProperty;
transient private ObjectProperty<Volume> tradeVolumeProperty;
@Getter
final transient private Set<DecryptedMessageWithPubKey> decryptedMessageWithPubKeySet = new HashSet<>();
// Added in v1.1.6
@Getter
@ -712,10 +707,6 @@ public abstract class Trade implements Tradable, Model {
return delayedPayoutTx;
}
public void removeDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
decryptedMessageWithPubKeySet.remove(decryptedMessageWithPubKey);
}
public void addAndPersistChatMessage(ChatMessage chatMessage) {
if (!chatMessages.contains(chatMessage)) {
chatMessages.add(chatMessage);
@ -1208,7 +1199,6 @@ public abstract class Trade implements Tradable, Model {
",\n tradeAmount=" + tradeAmount +
",\n tradeAmountProperty=" + tradeAmountProperty +
",\n tradeVolumeProperty=" + tradeVolumeProperty +
",\n decryptedMessageWithPubKeySet=" + decryptedMessageWithPubKeySet +
",\n mediationResultState=" + mediationResultState +
",\n mediationResultStateProperty=" + mediationResultStateProperty +
",\n lockTime=" + lockTime +

View File

@ -34,7 +34,6 @@ import bisq.core.trade.closed.ClosedTradableManager;
import bisq.core.trade.failed.FailedTradesManager;
import bisq.core.trade.handlers.TradeResultHandler;
import bisq.core.trade.messages.TakeOfferRequest;
import bisq.core.trade.messages.TradeMessage;
import bisq.core.trade.protocol.MakerProtocol;
import bisq.core.trade.protocol.ProcessModel;
import bisq.core.trade.protocol.ProcessModelServiceProvider;
@ -45,14 +44,11 @@ import bisq.core.trade.statistics.TradeStatisticsManager;
import bisq.core.user.User;
import bisq.core.util.Validator;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.AckMessageSourceType;
import bisq.network.p2p.BootstrapListener;
import bisq.network.p2p.DecryptedDirectMessageListener;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.messaging.DecryptedMailboxListener;
import bisq.common.ClockWatcher;
import bisq.common.config.Config;
@ -107,7 +103,7 @@ import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class TradeManager implements PersistedDataHost, DecryptedDirectMessageListener, DecryptedMailboxListener {
public class TradeManager implements PersistedDataHost, DecryptedDirectMessageListener {
private static final Logger log = LoggerFactory.getLogger(TradeManager.class);
private final User user;
@ -184,7 +180,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
tradableListStorage = storage;
p2PService.addDecryptedDirectMessageListener(this);
p2PService.addDecryptedMailboxListener(this);
failedTradesManager.setUnFailTradeCallback(this::unFailTrade);
}
@ -280,43 +275,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
///////////////////////////////////////////////////////////////////////////////////////////
// DecryptedMailboxListener
///////////////////////////////////////////////////////////////////////////////////////////
// Might get called at startup after HS is published. Can be before or after initPendingTrades.
@Override
public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (networkEnvelope instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) networkEnvelope;
getTradeById(tradeMessage.getTradeId())
.ifPresent(trade -> {
// We don't need to persist the msg as if we don't processes the message it will not be
// removed from the P2P network and we will receive it again on next startup.
// This might happen in edge cases when the user shuts down after we received the msg but
// before it is processed.
//TODO
Set<DecryptedMessageWithPubKey> decryptedMessageWithPubKeySet = trade.getDecryptedMessageWithPubKeySet();
if (!decryptedMessageWithPubKeySet.contains(message)) {
decryptedMessageWithPubKeySet.add(message);
// The message will be removed after processed
TradeProtocol tradeProtocol = getTradeProtocol(trade);
tradeProtocol.applyMailboxMessage(message);
}
});
} else if (networkEnvelope instanceof AckMessage) {
AckMessage ackMessage = (AckMessage) networkEnvelope;
if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE) {
// We remove here the message not in the trade protocol as it might be that the trade is already
// completed and the protocol is not listening.
p2PService.removeEntryFromMailbox(message);
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Lifecycle
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -28,7 +28,8 @@ import lombok.Value;
* We do the re-sending of the payment sent message via the BuyerSendCounterCurrencyTransferStartedMessage task on the
* buyer side, so seller do not need to do anything interactively.
*/
@SuppressWarnings("deprecation")
@Deprecated
@SuppressWarnings("ALL")
@EqualsAndHashCode(callSuper = true)
@Value
public class RefreshTradeStateRequest extends TradeMessage implements MailboxMessage {

View File

@ -27,6 +27,10 @@ import bisq.common.app.Version;
import lombok.EqualsAndHashCode;
import lombok.Value;
/**
* Not used anymore since v1.4.0
*/
@Deprecated
@SuppressWarnings("ALL")
@EqualsAndHashCode(callSuper = true)
@Value

View File

@ -111,7 +111,7 @@ public abstract class BuyerProtocol extends DisputeProtocol {
"arrive and the peer repeats sending us the message. We send another ACK msg.");
stopTimeout();
sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class,
BuyerVerifiesFinalDelayedPayoutTx.class,

View File

@ -42,8 +42,6 @@ import bisq.core.trade.statistics.TradeStatisticsManager;
import bisq.core.user.User;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.MailboxMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
@ -91,8 +89,6 @@ public class ProcessModel implements Model, PersistablePayload {
transient private Transaction takeOfferFeeTx;
@Setter
transient private TradeMessage tradeMessage;
@Setter
transient private DecryptedMessageWithPubKey decryptedMessageWithPubKey;
// Added in v1.2.0
@Setter
@ -251,16 +247,6 @@ public class ProcessModel implements Model, PersistablePayload {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void removeMailboxMessageAfterProcessing(Trade trade) {
if (tradeMessage instanceof MailboxMessage &&
decryptedMessageWithPubKey != null &&
decryptedMessageWithPubKey.getNetworkEnvelope().equals(tradeMessage)) {
log.debug("Remove decryptedMsgWithPubKey from P2P network. decryptedMsgWithPubKey = " + decryptedMessageWithPubKey);
getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey);
trade.removeDecryptedMessageWithPubKey(decryptedMessageWithPubKey);
}
}
@Override
public void persist() {
log.warn("persist is not implemented in that class");

View File

@ -106,7 +106,7 @@ public abstract class SellerProtocol extends DisputeProtocol {
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(
SellerProcessCounterCurrencyTransferStartedMessage.class,

View File

@ -31,6 +31,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.MailboxMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.SendMailboxMessageListener;
import bisq.network.p2p.messaging.DecryptedMailboxListener;
import bisq.common.Timer;
import bisq.common.UserThread;
@ -38,19 +39,18 @@ import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.taskrunner.Task;
import java.util.HashSet;
import java.security.PublicKey;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j
public abstract class TradeProtocol implements DecryptedDirectMessageListener {
public abstract class TradeProtocol implements DecryptedDirectMessageListener, DecryptedMailboxListener {
protected final ProcessModel processModel;
protected final Trade trade;
private Timer timeoutTimer;
private boolean initialized;
///////////////////////////////////////////////////////////////////////////////////////////
@ -69,7 +69,6 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
public void initialize(ProcessModelServiceProvider serviceProvider, TradeManager tradeManager, Offer offer) {
processModel.applyTransient(serviceProvider, tradeManager, offer);
initialized = true;
onInitialized();
}
@ -77,29 +76,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
if (!trade.isWithdrawn()) {
processModel.getP2PService().addDecryptedDirectMessageListener(this);
}
// Apply mailbox messages
// Clone to avoid ConcurrentModificationException. We remove items at the applyMailboxMessage call...
new HashSet<>(trade.getDecryptedMessageWithPubKeySet()).forEach(this::applyMailboxMessage);
processModel.getP2PService().addDecryptedMailboxListener(this);
processModel.getP2PService().getMailboxMap().values()
.stream().map(e -> e.second)
.forEach(this::handleDecryptedMessageWithPubKey);
}
public void onWithdrawCompleted() {
cleanup();
}
public void applyMailboxMessage(DecryptedMessageWithPubKey message) {
if (initialized && isPubKeyValid(message)) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (networkEnvelope instanceof MailboxMessage &&
networkEnvelope instanceof TradeMessage) {
processModel.setDecryptedMessageWithPubKey(message);
TradeMessage tradeMessage = (TradeMessage) networkEnvelope;
NodeAddress peerNodeAddress = ((MailboxMessage) networkEnvelope).getSenderNodeAddress();
onMailboxMessage(tradeMessage, peerNodeAddress);
}
}
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
@ -117,11 +103,75 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
if (networkEnvelope instanceof TradeMessage && isMyTradeMessage((TradeMessage) networkEnvelope)) {
onTradeMessage((TradeMessage) networkEnvelope, peer);
} else if (networkEnvelope instanceof AckMessage) {
onAckMessage((AckMessage) networkEnvelope, peer);
AckMessage ackMessage = (AckMessage) networkEnvelope;
if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE) {
onAckMessage((AckMessage) networkEnvelope, peer);
}
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// DecryptedMailboxListener
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMailboxMessageAdded(DecryptedMessageWithPubKey message, NodeAddress peer) {
handleDecryptedMessageWithPubKey(message, peer);
}
private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress();
handleDecryptedMessageWithPubKey(decryptedMessageWithPubKey, senderNodeAddress);
}
protected void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
NodeAddress peer) {
if (!isPubKeyValid(decryptedMessageWithPubKey)) {
return;
}
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (networkEnvelope instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) networkEnvelope;
if (!isMyTradeMessage(tradeMessage)) {
return;
}
if (trade.isWithdrawn()) {
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
return;
}
onMailboxMessage(tradeMessage, peer);
} else if (networkEnvelope instanceof AckMessage) {
AckMessage ackMessage = (AckMessage) networkEnvelope;
if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
ackMessage.getSourceId().equals(trade.getId())) {
if (!trade.isWithdrawn()) {
onAckMessage((AckMessage) networkEnvelope, peer);
}
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName());
}
}
}
public void removeMailboxMessageAfterProcessing(TradeMessage tradeMessage) {
if (tradeMessage instanceof MailboxMessage &&
processModel.getTradingPeer() != null &&
processModel.getTradingPeer().getPubKeyRing() != null &&
processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey() != null) {
PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey();
// We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey
DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey);
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Abstract
@ -313,6 +363,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
log.info("TaskRunner successfully completed. Triggered from {}, tradeId={}", source, trade.getId());
if (message != null) {
sendAckMessage(message, true, null);
// Once a taskRunner is completed we remove the mailbox message. To not remove it directly at the task
// adds some resilience in case of minor errors, so after a restart the mailbox message can be applied
// again.
removeMailboxMessageAfterProcessing(message);
}
}

View File

@ -52,13 +52,9 @@ public class ProcessPeerPublishedDelayedPayoutTxMessage extends TradeTask {
Transaction delayedPayoutTx = checkNotNull(trade.getDelayedPayoutTx());
WalletService.maybeAddSelfTxToWallet(delayedPayoutTx, processModel.getBtcWalletService().getWallet());
// todo trade.setState
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -76,8 +76,6 @@ public class BuyerProcessDepositTxAndDelayedPayoutTxMessage extends TradeTask {
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -76,8 +76,6 @@ public class BuyerProcessPayoutTxPublishedMessage extends TradeTask {
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -54,8 +54,6 @@ public class ProcessMediatedPayoutSignatureMessage extends TradeTask {
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -77,8 +77,6 @@ public class ProcessMediatedPayoutTxPublishedMessage extends TradeTask {
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -64,8 +64,6 @@ public class SellerProcessCounterCurrencyTransferStartedMessage extends TradeTas
complete();
} catch (Throwable t) {
failed(t);
} finally {
processModel.removeMailboxMessageAfterProcessing(trade);
}
}
}

View File

@ -52,6 +52,7 @@ import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Tuple2;
import com.google.inject.Inject;
@ -116,7 +117,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final Set<DecryptedDirectMessageListener> decryptedDirectMessageListeners = new CopyOnWriteArraySet<>();
private final Set<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
private final Set<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
private final Map<String, ProtectedMailboxStorageEntry> mailboxMap = new HashMap<>();
@Getter
private final Map<String, Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>> mailboxMap = new HashMap<>();
private final Set<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty();
@ -531,7 +533,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress();
checkNotNull(senderNodeAddress, "senderAddress must not be null for mailbox network_messages");
mailboxMap.put(mailboxMessage.getUid(), protectedMailboxStorageEntry);
mailboxMap.put(mailboxMessage.getUid(), new Tuple2<>(protectedMailboxStorageEntry, decryptedMessageWithPubKey));
log.info("Received a {} mailbox message with messageUid {} and senderAddress {}", mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), senderNodeAddress);
decryptedMailboxListeners.forEach(
e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderNodeAddress));
@ -741,7 +743,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
String uid = mailboxMessage.getUid();
if (mailboxMap.containsKey(uid)) {
ProtectedMailboxStorageEntry mailboxData = mailboxMap.get(uid);
ProtectedMailboxStorageEntry mailboxData = mailboxMap.get(uid).first;
if (mailboxData != null && mailboxData.getProtectedStoragePayload() instanceof MailboxStoragePayload) {
MailboxStoragePayload expirableMailboxStoragePayload = (MailboxStoragePayload) mailboxData.getProtectedStoragePayload();
PublicKey receiversPubKey = mailboxData.getReceiversPubKey();