Sort persisted mail box messages by age and limit total number.

Add size limit.
Add log for number of persisted mailbox msg per day
This commit is contained in:
chimp1984 2021-01-12 22:59:44 -05:00
parent 2fb3bba74d
commit d434cb3022
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
3 changed files with 56 additions and 22 deletions

View File

@ -311,7 +311,7 @@ public abstract class SupportManager {
decryptedMailboxMessageWithPubKeys.forEach(decryptedMessageWithPubKey -> {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
log.debug("decryptedMessageWithPubKey.message " + networkEnvelope);
log.trace("## decryptedMessageWithPubKey message={}", networkEnvelope.getClass().getSimpleName());
if (networkEnvelope instanceof SupportMessage) {
SupportMessage supportMessage = (SupportMessage) networkEnvelope;
onSupportMessage(supportMessage);

View File

@ -55,6 +55,7 @@ import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -69,6 +70,8 @@ import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -94,12 +97,18 @@ import static com.google.common.base.Preconditions.checkNotNull;
* We store mailbox messages locally to cause less load at initial data requests (using excluded keys) and to get better
* resilience in case processing failed. In such cases it would be re-applied after restart.
*
* We use a map with the uid of the decrypted mailboxMessage if it was our own mailbox message or of the uid of the
* prefixedSealedAndSignedMessage if it was a foreign one. It would be better to use the hash of the payload but that
* We use a map with the uid of the decrypted mailboxMessage if it was our own mailbox message. Otherwise we use the uid of the
* prefixedSealedAndSignedMessage if it was a mailboxMessage not addressed to us. It would be better to use the hash of the payload but that
* would require a large refactoring in the trade protocol. We call the remove method after a message got processed and pass
* the tradeMessage to the remove method. We do not have the outer envelope which would be needed for the hash and
* we do not want to pass around that to all trade methods just for that use case. So we use the uid as lookup to get
* the mailboxItem containing the data we need for removal.
*
* If a node was not online and the remove mailbox message was sent during that time, the persisted mailbox message
* does not get removed. So we need to take care that the persisted data is not growing too much and we apply some
* filtering and limiting at reading the persisted data.
* Any message gets removed once the expiry data (max 15 days) is reached. Missing messages would be delivered from
* initial data requests.
*/
@Singleton
@Slf4j
@ -160,25 +169,50 @@ public class MailboxMessageService implements SetupListener, RequestDataManager.
@Override
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
log.trace("## readPersisted persisted {}", persisted.size());
Map<String, Long> numItemsPerDay = new HashMap<>();
// We sort by creation date and limit to max 3000 entries, so oldest items get skipped even if TTL
// is not reached to cap the memory footprint. 3000 items is about 10 MB.
persisted.stream()
.sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed())
.limit(3000)
.filter(e -> !e.isExpired(clock))
.filter(e -> !mailboxItemsByUid.containsKey(e.getUid()))
.forEach(mailboxItem -> {
String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
log.trace("## readPersisted uid={}\nhash={}\nmailboxItemsByUid={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()),
mailboxItemsByUid);
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry();
int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize();
// Usual size is 3-4kb. A few are about 15kb and very few are larger and about 100kb or
// more (probably attachments in disputes)
// We ignore those large data to reduce memory footprint.
if (serializedSize < 20000) {
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, 0L);
numItemsPerDay.put(day, numItemsPerDay.get(day) + 1);
// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
//todo check if listeners are called too early
p2PDataStorage.addProtectedMailboxStorageEntryToMap(mailboxItem.getProtectedMailboxStorageEntry());
String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
//todo check if listeners are called too early
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);
log.trace("## readPersisted uid={}\nhash={}\nisMine={}\ndate={}\nsize={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()),
mailboxItem.isMine(),
date,
serializedSize);
}
});
log.info("We have loaded {} persisted mailboxMessage items", mailboxMessageList.size());
List<Map.Entry<String, Long>> perDay = numItemsPerDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.collect(Collectors.toList());
log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}", mailboxMessageList.size(), Joiner.on("\n").join(perDay));
requestPersistence();
completeHandler.run();
},
@ -343,6 +377,7 @@ public class MailboxMessageService implements SetupListener, RequestDataManager.
@Override
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
log.trace("## onAdded");
Collection<ProtectedMailboxStorageEntry> entries = protectedStorageEntries.stream()
.filter(e -> e instanceof ProtectedMailboxStorageEntry)
.map(e -> (ProtectedMailboxStorageEntry) e)
@ -449,10 +484,9 @@ public class MailboxMessageService implements SetupListener, RequestDataManager.
if (!mailboxItemsByUid.containsKey(uid)) {
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
log.trace("## handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}",
log.trace("## handleMailboxItem uid={}\nhash={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()),
mailboxItemsByUid);
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()));
requestPersistence();
}

View File

@ -58,7 +58,7 @@ public class RemovedPayloadsService implements PersistedDataHost {
persisted.getDateByHashes().entrySet().stream()
.filter(e -> e.getValue() > cutOffDate)
.forEach(e -> removedPayloadsMap.getDateByHashes().put(e.getKey(), e.getValue()));
log.trace("readPersisted: removedPayloadsMap={}", removedPayloadsMap);
log.trace("## readPersisted: removedPayloadsMap size={}", removedPayloadsMap.getDateByHashes().size());
persistenceManager.requestPersistence();
completeHandler.run();
},
@ -66,12 +66,12 @@ public class RemovedPayloadsService implements PersistedDataHost {
}
public boolean wasRemoved(P2PDataStorage.ByteArray hashOfPayload) {
log.trace("called wasRemoved: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap);
log.trace("## called wasRemoved: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap);
return removedPayloadsMap.getDateByHashes().containsKey(hashOfPayload);
}
public void addHash(P2PDataStorage.ByteArray hashOfPayload) {
log.trace("called addHash: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap);
log.trace("## called addHash: hashOfPayload={}, removedPayloadsMap={}", hashOfPayload.toString(), removedPayloadsMap);
removedPayloadsMap.getDateByHashes().putIfAbsent(hashOfPayload, System.currentTimeMillis());
persistenceManager.requestPersistence();
}