Do not apply large persisted messages

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-11-24 13:36:47 -05:00
parent 143af43eba
commit 2de86222b7
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307

View File

@ -163,37 +163,45 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
Map<String, Tuple2<AtomicLong, List<Integer>>> numItemsPerDay = new HashMap<>();
AtomicLong totalSize = new AtomicLong();
// We sort by creation date and limit to max 3000 entries, so the oldest items get skipped even if TTL
// is not reached to cap the memory footprint. 3000 items is about 10 MB.
// is not reached. 3000 items is about 60 MB with max size of 20kb supported for storage.
persisted.stream()
.sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed())
.limit(3000)
.filter(e -> !e.isExpired(clock))
.filter(e -> !mailboxItemsByUid.containsKey(e.getUid()))
.limit(3000)
.forEach(mailboxItem -> {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry();
int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize();
// Usual size is 3-4kb. A few are about 15kb and very few are larger and about 100kb or
// more (probably attachments in disputes)
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
if (serializedSize > 20000) {
log.warn("Large mailboxItem: size={}; date={}; sender={}", Utilities.readableFileSize(serializedSize), date,
mailboxItem.getProtectedMailboxStorageEntry().getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getSenderNodeAddress());
}
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, new Tuple2<>(new AtomicLong(0), new ArrayList<>()));
Tuple2<AtomicLong, List<Integer>> tuple = numItemsPerDay.get(day);
tuple.first.getAndIncrement();
tuple.second.add(serializedSize);
String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);
// We only keep small items, to reduce the potential impact of missed remove messages.
// E.g. if a seed at a longer restart period missed the remove messages, then when loading from
// persisted data the messages, they would add those again and distribute then later at requests to peers.
// Those outdated messages would then stay in the network until TTL triggers removal.
// By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive.
if (serializedSize < 20000) {
mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem);
mailboxMessageList.add(mailboxItem);
totalSize.getAndAdd(serializedSize);
// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);
} else {
log.info("We ignore this large persisted mailboxItem. If still valid we will reload it from seed nodes at getData requests.\n" +
"Size={}; date={}; sender={}", Utilities.readableFileSize(serializedSize), date,
mailboxItem.getProtectedMailboxStorageEntry().getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getSenderNodeAddress());
}
});
List<String> perDay = numItemsPerDay.entrySet().stream()
@ -206,13 +214,15 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
.filter(s -> s > 20000)
.map(Utilities::readableFileSize)
.collect(Collectors.toList());
return entry.getKey() + ": Num messages: " + tuple.first + "; total size: " +
Utilities.readableFileSize(sum) + "; large messages: " + largeItems;
String largeMsgInfo = largeItems.isEmpty() ? "" : "; Large messages: " + largeItems;
return entry.getKey() + ": Num messages: " + tuple.first + "; Total size: " +
Utilities.readableFileSize(sum) + largeMsgInfo;
})
.collect(Collectors.toList());
log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}",
log.info("We loaded {} persisted mailbox messages with {}.\nPer day distribution:\n{}",
mailboxMessageList.size(),
Utilities.readableFileSize(totalSize.get()),
Joiner.on("\n").join(perDay));
requestPersistence();