Remove mailbox msg early from network

This commit is contained in:
chimp1984 2020-11-11 20:31:42 -05:00
parent b5eacdb1eb
commit 8b738796db
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
6 changed files with 81 additions and 68 deletions

View file

@ -133,7 +133,7 @@ public class PrivateNotificationManager {
} }
public void removePrivateNotification() { public void removePrivateNotification() {
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); p2PService.removeMailboxMsg(decryptedMessageWithPubKey);
} }
private boolean isKeyValid(String privKeyString) { private boolean isKeyValid(String privKeyString) {

View file

@ -177,7 +177,7 @@ public abstract class SupportManager {
requestPersistence(); requestPersistence();
if (decryptedMessageWithPubKey != null) if (decryptedMessageWithPubKey != null)
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); p2PService.removeMailboxMsg(decryptedMessageWithPubKey);
} }
} }
@ -314,7 +314,7 @@ public abstract class SupportManager {
log.debug("decryptedMessageWithPubKey.message " + networkEnvelope); log.debug("decryptedMessageWithPubKey.message " + networkEnvelope);
if (networkEnvelope instanceof SupportMessage) { if (networkEnvelope instanceof SupportMessage) {
dispatchMessage((SupportMessage) networkEnvelope); dispatchMessage((SupportMessage) networkEnvelope);
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); p2PService.removeMailboxMsg(decryptedMessageWithPubKey);
} else if (networkEnvelope instanceof AckMessage) { } else if (networkEnvelope instanceof AckMessage) {
onAckMessage((AckMessage) networkEnvelope, decryptedMessageWithPubKey); onAckMessage((AckMessage) networkEnvelope, decryptedMessageWithPubKey);
} }

View file

@ -35,6 +35,9 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
//TODO with the redesign of mailbox messages that is not required anymore. We leave it for now as we want to minimize
// changes for the 1.5.0 release but we should clean up afterwards...
/** /**
* Util for removing pending mailbox messages in case the trade has been closed by the seller after confirming receipt * Util for removing pending mailbox messages in case the trade has been closed by the seller after confirming receipt
* and a AckMessage as mailbox message will be sent by the buyer once they go online. In that case the seller's trade * and a AckMessage as mailbox message will be sent by the buyer once they go online. In that case the seller's trade
@ -73,8 +76,7 @@ public class CleanupMailboxMessages {
} }
private void cleanupMailboxMessages(List<Trade> trades) { private void cleanupMailboxMessages(List<Trade> trades) {
p2PService.getMailboxItemsByUid().values() p2PService.getMailBoxMessages()
.stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.forEach(message -> handleDecryptedMessageWithPubKey(message, trades)); .forEach(message -> handleDecryptedMessageWithPubKey(message, trades));
} }
@ -102,7 +104,7 @@ public class CleanupMailboxMessages {
private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) { private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.", log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.",
decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId()); decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId());
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey); p2PService.removeMailboxMsg(decryptedMessageWithPubKey);
} }
private boolean isMyMessage(TradeMessage message, Trade trade) { private boolean isMyMessage(TradeMessage message, Trade trade) {

View file

@ -30,7 +30,6 @@ import bisq.network.p2p.DecryptedDirectMessageListener;
import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.MailboxMessage; import bisq.network.p2p.MailboxMessage;
import bisq.network.p2p.NodeAddress; import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.SendMailboxMessageListener; import bisq.network.p2p.SendMailboxMessageListener;
import bisq.network.p2p.messaging.DecryptedMailboxListener; import bisq.network.p2p.messaging.DecryptedMailboxListener;
@ -78,8 +77,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
processModel.getP2PService().addDecryptedDirectMessageListener(this); processModel.getP2PService().addDecryptedDirectMessageListener(this);
} }
processModel.getP2PService().addDecryptedMailboxListener(this); processModel.getP2PService().addDecryptedMailboxListener(this);
processModel.getP2PService().getMailboxItemsByUid().values() processModel.getP2PService().getMailBoxMessages()
.stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.forEach(this::handleDecryptedMessageWithPubKey); .forEach(this::handleDecryptedMessageWithPubKey);
} }
@ -138,7 +136,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// We only remove here if we have already completed the trade. // We only remove here if we have already completed the trade.
// Otherwise removal is done after successfully applied the task runner. // Otherwise removal is done after successfully applied the task runner.
if (trade.isWithdrawn()) { if (trade.isWithdrawn()) {
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
return; return;
} }
@ -152,7 +150,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
onAckMessage((AckMessage) networkEnvelope, peer); onAckMessage((AckMessage) networkEnvelope, peer);
} }
// In any case we remove the msg // In any case we remove the msg
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", networkEnvelope.getClass().getSimpleName()); log.info("Remove {} from the P2P network.", networkEnvelope.getClass().getSimpleName());
} }
} }
@ -165,7 +163,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey(); PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey();
// We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey // We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey
DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey); DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey);
processModel.getP2PService().removeEntryFromMailbox(decryptedMessageWithPubKey); processModel.getP2PService().removeMailboxMsg(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName()); log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
} }
} }

View file

@ -124,8 +124,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final Set<DecryptedDirectMessageListener> decryptedDirectMessageListeners = new CopyOnWriteArraySet<>(); private final Set<DecryptedDirectMessageListener> decryptedDirectMessageListeners = new CopyOnWriteArraySet<>();
private final Set<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>(); private final Set<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
private final Set<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>(); private final Set<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
@Getter private final Map<String, List<MailboxItem>> mailboxItemsByUid = new HashMap<>();
private final Map<String, MailboxItem> mailboxItemsByUid = new HashMap<>();
private final Set<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>(); private final Set<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty();
@ -447,7 +446,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkArgument(protectedMailboxStorageEntries.size() == 1); checkArgument(protectedMailboxStorageEntries.size() == 1);
var decryptedEntries = new ArrayList<>(getDecryptedEntries(protectedMailboxStorageEntries)); var decryptedEntries = new ArrayList<>(getDecryptedEntries(protectedMailboxStorageEntries));
if (decryptedEntries.size() == 1) { if (decryptedEntries.size() == 1) {
storeMailboxDataAndNotifyListeners(decryptedEntries.get(0)); processMailboxItem(decryptedEntries.get(0));
} }
} }
@ -466,7 +465,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) { public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) {
UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> storeMailboxDataAndNotifyListeners(e))); UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> processMailboxItem(e)));
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
@ -502,14 +501,51 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return null; return null;
} }
private void storeMailboxDataAndNotifyListeners(MailboxItem mailboxItem) { private void processMailboxItem(MailboxItem mailboxItem) {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = mailboxItem.getDecryptedMessageWithPubKey(); DecryptedMessageWithPubKey decryptedMessageWithPubKey = mailboxItem.getDecryptedMessageWithPubKey();
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(); MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
String uid = mailboxMessage.getUid();
mailboxItemsByUid.putIfAbsent(uid, new ArrayList<>());
mailboxItemsByUid.get(uid).add(mailboxItem);
NodeAddress sender = mailboxMessage.getSenderNodeAddress(); NodeAddress sender = mailboxMessage.getSenderNodeAddress();
mailboxItemsByUid.put(mailboxMessage.getUid(), mailboxItem);
log.info("Received a {} mailbox message with uid {} and senderAddress {}", log.info("Received a {} mailbox message with uid {} and senderAddress {}",
mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), sender); mailboxMessage.getClass().getSimpleName(), uid, sender);
decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender)); decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender));
if (isBootstrapped()) {
// After we notified our listeners we remove the data immediately from the network.
// In case the client has not been ready it need to take it via getMailBoxMessages.
removeMailboxEntryFromNetwork(mailboxItem.getProtectedMailboxStorageEntry());
} else {
log.info("We are not bootstrapped yet, so we remove later once the message got processed.");
}
}
private void removeMailboxEntryFromNetwork(ProtectedMailboxStorageEntry protectedMailboxStorageEntry) {
MailboxStoragePayload mailboxStoragePayload = (MailboxStoragePayload) protectedMailboxStorageEntry.getProtectedStoragePayload();
PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey();
try {
ProtectedMailboxStorageEntry updatedEntry = p2PDataStorage.getMailboxDataWithSignedSeqNr(
mailboxStoragePayload,
keyRing.getSignatureKeyPair(),
receiversPubKey);
P2PDataStorage.ByteArray hashOfPayload = p2PDataStorage.get32ByteHashAsByteArray(mailboxStoragePayload);
if (p2PDataStorage.getMap().containsKey(hashOfPayload)) {
boolean result = p2PDataStorage.remove(updatedEntry, networkNode.getNodeAddress());
if (result) {
log.info("Removed mailboxEntry from network");
} else {
log.warn("Removing mailboxEntry from network failed");
}
} else {
log.info("The mailboxEntry was already removed earlier.");
}
} catch (CryptoException e) {
e.printStackTrace();
log.error("Could not remove ProtectedMailboxStorageEntry from network. Error: {}", e.toString());
}
} }
@ -736,56 +772,25 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
} }
public void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
// We need to delay a bit to avoid that we remove our msg then get it from other peers again and reapply it again. if (isBootstrapped()) {
// If we delay the removal we have better chances that repeated network_messages we got from other peers are already filtered // We need to delay a bit to not get a ConcurrentModificationException as we might iterate over
// at the P2PService layer. // mailboxItemsByUid while getting called.
// Though we have to check in the client classes to not apply the same message again as there is no guarantee UserThread.execute(() -> {
// when we would get a message again from the network. MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
try { String uid = mailboxMessage.getUid();
UserThread.runAfter(() -> delayedRemoveEntryFromMailbox(decryptedMessageWithPubKey), 2); if (mailboxItemsByUid.containsKey(uid)) {
} catch (NetworkNotReadyException t) { List<MailboxItem> list = mailboxItemsByUid.get(uid);
// If we called too early it might throw a NetworkNotReadyException. We will try again
try {
UserThread.runAfter(() -> delayedRemoveEntryFromMailbox(decryptedMessageWithPubKey), 60);
} catch (NetworkNotReadyException ignore) {
log.warn("We tried to call delayedRemoveEntryFromMailbox 60 sec. after we received an " +
"NetworkNotReadyException but it failed again. We give up here.");
}
}
}
private void delayedRemoveEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey) { // In case we have not been bootstrapped when we tried to remove the message at the time when we
if (!isBootstrapped()) { // received the message, we remove it now.
// We don't throw an NetworkNotReadyException here. list.forEach(mailboxItem -> removeMailboxEntryFromNetwork(mailboxItem.getProtectedMailboxStorageEntry()));
// This case should not happen anyway as we check for isBootstrapped in the callers. mailboxItemsByUid.remove(uid);
log.warn("You must have bootstrapped before adding data to the P2P network.");
}
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
String uid = mailboxMessage.getUid();
if (mailboxItemsByUid.containsKey(uid)) {
ProtectedMailboxStorageEntry mailboxData = mailboxItemsByUid.get(uid).getProtectedMailboxStorageEntry();
if (mailboxData != null && mailboxData.getProtectedStoragePayload() instanceof MailboxStoragePayload) {
MailboxStoragePayload expirableMailboxStoragePayload = (MailboxStoragePayload) mailboxData.getProtectedStoragePayload();
PublicKey receiversPubKey = mailboxData.getReceiversPubKey();
checkArgument(receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = p2PDataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxStoragePayload,
keyRing.getSignatureKeyPair(),
receiversPubKey);
p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
} }
});
mailboxItemsByUid.remove(uid);
log.info("Removed successfully decryptedMsgWithPubKey. uid={}", uid);
}
} else { } else {
log.warn("uid for mailbox entry not found in mailboxMap." + "uid={}", uid); // In case the network was not ready yet we try again later
UserThread.runAfter(() -> removeMailboxMsg(decryptedMessageWithPubKey), 30);
} }
} }
@ -922,6 +927,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
.findAny(); .findAny();
} }
public Set<DecryptedMessageWithPubKey> getMailBoxMessages() {
return mailboxItemsByUid.values().stream()
.filter(list -> !list.isEmpty())
.map(list -> list.get(0))
.map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.collect(Collectors.toSet());
}
@Value @Value
public class MailboxItem { public class MailboxItem {
private final ProtectedMailboxStorageEntry protectedMailboxStorageEntry; private final ProtectedMailboxStorageEntry protectedMailboxStorageEntry;

View file

@ -931,8 +931,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry))); removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry)));
} }
private void removeFromMapAndDataStore( private void removeFromMapAndDataStore(Collection<Map.Entry<ByteArray,
Collection<Map.Entry<ByteArray, ProtectedStorageEntry>> entriesToRemoveWithPayloadHash) { ProtectedStorageEntry>> entriesToRemoveWithPayloadHash) {
if (entriesToRemoveWithPayloadHash.isEmpty()) if (entriesToRemoveWithPayloadHash.isEmpty())
return; return;