diff --git a/core/src/main/java/bisq/core/support/SupportManager.java b/core/src/main/java/bisq/core/support/SupportManager.java index b5a815ed9d..82ecc106d3 100644 --- a/core/src/main/java/bisq/core/support/SupportManager.java +++ b/core/src/main/java/bisq/core/support/SupportManager.java @@ -311,7 +311,7 @@ public abstract class SupportManager { decryptedMailboxMessageWithPubKeys.forEach(decryptedMessageWithPubKey -> { NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope(); - log.debug("decryptedMessageWithPubKey.message " + networkEnvelope); + log.trace("## decryptedMessageWithPubKey message={}", networkEnvelope.getClass().getSimpleName()); if (networkEnvelope instanceof SupportMessage) { SupportMessage supportMessage = (SupportMessage) networkEnvelope; onSupportMessage(supportMessage); diff --git a/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java b/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java index 49ae850c5b..db7146e6b3 100644 --- a/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java +++ b/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java @@ -55,6 +55,7 @@ import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; +import com.google.common.base.Joiner; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -69,6 +70,8 @@ import java.time.Clock; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -94,12 +97,18 @@ import static com.google.common.base.Preconditions.checkNotNull; * We store mailbox messages locally to cause less load at initial data requests (using excluded keys) and to get better * resilience in case processing failed. In such cases it would be re-applied after restart. * - * We use a map with the uid of the decrypted mailboxMessage if it was our own mailbox message or of the uid of the - * prefixedSealedAndSignedMessage if it was a foreign one. It would be better to use the hash of the payload but that + * We use a map with the uid of the decrypted mailboxMessage if it was our own mailbox message. Otherwise we use the uid of the + * prefixedSealedAndSignedMessage if it was a mailboxMessage not addressed to us. It would be better to use the hash of the payload but that * would require a large refactoring in the trade protocol. We call the remove method after a message got processed and pass * the tradeMessage to the remove method. We do not have the outer envelope which would be needed for the hash and * we do not want to pass around that to all trade methods just for that use case. So we use the uid as lookup to get * the mailboxItem containing the data we need for removal. + * + * If a node was not online and the remove mailbox message was sent during that time, the persisted mailbox message + * does not get removed. So we need to take care that the persisted data is not growing too much and we apply some + * filtering and limiting at reading the persisted data. + * Any message gets removed once the expiry data (max 15 days) is reached. Missing messages would be delivered from + * initial data requests. */ @Singleton @Slf4j @@ -160,25 +169,50 @@ public class MailboxMessageService implements SetupListener, RequestDataManager. @Override public void readPersisted(Runnable completeHandler) { persistenceManager.readPersisted(persisted -> { + log.trace("## readPersisted persisted {}", persisted.size()); + Map numItemsPerDay = new HashMap<>(); + // We sort by creation date and limit to max 3000 entries, so oldest items get skipped even if TTL + // is not reached to cap the memory footprint. 3000 items is about 10 MB. persisted.stream() + .sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed()) + .limit(3000) .filter(e -> !e.isExpired(clock)) .filter(e -> !mailboxItemsByUid.containsKey(e.getUid())) .forEach(mailboxItem -> { - String uid = mailboxItem.getUid(); - mailboxItemsByUid.put(uid, mailboxItem); - mailboxMessageList.add(mailboxItem); - log.trace("## readPersisted uid={}\nhash={}\nmailboxItemsByUid={}", - uid, - P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()), - mailboxItemsByUid); + ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry(); + int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize(); + // Usual size is 3-4kb. A few are about 15kb and very few are larger and about 100kb or + // more (probably attachments in disputes) + // We ignore those large data to reduce memory footprint. + if (serializedSize < 20000) { + String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString(); + String day = date.substring(4, 10); + numItemsPerDay.putIfAbsent(day, 0L); + numItemsPerDay.put(day, numItemsPerDay.get(day) + 1); - // We add it to our map so that it get added to the excluded key set we send for - // the initial data requests. So that helps to lower the load for mailbox messages at - // initial data requests. - //todo check if listeners are called too early - p2PDataStorage.addProtectedMailboxStorageEntryToMap(mailboxItem.getProtectedMailboxStorageEntry()); + String uid = mailboxItem.getUid(); + mailboxItemsByUid.put(uid, mailboxItem); + mailboxMessageList.add(mailboxItem); + + // We add it to our map so that it get added to the excluded key set we send for + // the initial data requests. So that helps to lower the load for mailbox messages at + // initial data requests. + //todo check if listeners are called too early + p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry); + + log.trace("## readPersisted uid={}\nhash={}\nisMine={}\ndate={}\nsize={}", + uid, + P2PDataStorage.get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()), + mailboxItem.isMine(), + date, + serializedSize); + } }); - log.info("We have loaded {} persisted mailboxMessage items", mailboxMessageList.size()); + + List> perDay = numItemsPerDay.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .collect(Collectors.toList()); + log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}", mailboxMessageList.size(), Joiner.on("\n").join(perDay)); requestPersistence(); completeHandler.run(); }, @@ -343,6 +377,7 @@ public class MailboxMessageService implements SetupListener, RequestDataManager. @Override public void onAdded(Collection protectedStorageEntries) { + log.trace("## onAdded"); Collection entries = protectedStorageEntries.stream() .filter(e -> e instanceof ProtectedMailboxStorageEntry) .map(e -> (ProtectedMailboxStorageEntry) e) @@ -449,10 +484,9 @@ public class MailboxMessageService implements SetupListener, RequestDataManager. if (!mailboxItemsByUid.containsKey(uid)) { mailboxItemsByUid.put(uid, mailboxItem); mailboxMessageList.add(mailboxItem); - log.trace("## handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}", + log.trace("## handleMailboxItem uid={}\nhash={}", uid, - P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()), - mailboxItemsByUid); + P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload())); requestPersistence(); } diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/RemovedPayloadsService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/RemovedPayloadsService.java index af0a80f56a..f0fdd4fa4f 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/RemovedPayloadsService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/RemovedPayloadsService.java @@ -58,7 +58,7 @@ public class RemovedPayloadsService implements PersistedDataHost { persisted.getDateByHashes().entrySet().stream() .filter(e -> e.getValue() > cutOffDate) .forEach(e -> removedPayloadsMap.getDateByHashes().put(e.getKey(), e.getValue())); - log.trace("readPersisted: removedPayloadsMap={}", removedPayloadsMap); + log.trace("## readPersisted: removedPayloadsMap size={}", removedPayloadsMap.getDateByHashes().size()); persistenceManager.requestPersistence(); completeHandler.run(); }, @@ -66,12 +66,12 @@ public class RemovedPayloadsService implements PersistedDataHost { } public boolean wasRemoved(P2PDataStorage.ByteArray hashOfPayload) { - log.trace("called wasRemoved: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap); + log.trace("## called wasRemoved: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap); return removedPayloadsMap.getDateByHashes().containsKey(hashOfPayload); } public void addHash(P2PDataStorage.ByteArray hashOfPayload) { - log.trace("called addHash: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap); + log.trace("## called addHash: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap); removedPayloadsMap.getDateByHashes().putIfAbsent(hashOfPayload, System.currentTimeMillis()); persistenceManager.requestPersistence(); }