Limit getDataResponse to 90% of MAX_PERMITTED_MESSAGE_SIZE (10MB)

Allocate 25% of the space for PersistableNetworkPayloads and 75% for ProtectedStorageEntries

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
HenrikJannsen 2022-11-23 16:03:59 -05:00
parent 60522218a7
commit 91c25f8aa5
GPG Key ID: 02AA2BAE387C8307
2 changed files with 43 additions and 8 deletions
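
In concrete numbers, the budget works out as follows (an illustrative sketch of the arithmetic only; the variable names are placeholders, and MAX_PERMITTED_MESSAGE_SIZE is assumed to be the 10 MB constant from Connection referenced in the commit title):

// Illustrative only, not part of the patch.
long maxPermittedMessageSize = 10 * 1024 * 1024;                   // assumed value of MAX_PERMITTED_MESSAGE_SIZE (10 MB)
double maxSize = maxPermittedMessageSize * 0.9;                    // 9,437,184 bytes usable; 10% kept as tolerance for message overhead
long persistableNetworkPayloadLimit = Math.round(maxSize * 0.25);  // 2,359,296 bytes for PersistableNetworkPayloads
long protectedStorageEntryLimit = Math.round(maxSize * 0.75);      // 7,077,888 bytes for ProtectedStorageEntries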

Connection.java

@@ -114,6 +114,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return PERMITTED_MESSAGE_SIZE;
}
public static int getMaxPermittedMessageSize() {
return MAX_PERMITTED_MESSAGE_SIZE;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields

P2PDataStorage.java

@@ -105,6 +105,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -317,6 +318,12 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// mapForDataResponse contains the filtered by version data from HistoricalDataStoreService as well as all other
// maps of the remaining appendOnlyDataStoreServices.
Map<ByteArray, PersistableNetworkPayload> mapForDataResponse = getMapForDataResponse(getDataRequest.getVersion());
// Give a bit of tolerance for message overhead
double maxSize = Connection.getMaxPermittedMessageSize() * 0.9;
// 25% of space is allocated for PersistableNetworkPayloads
long limit = Math.round(maxSize * 0.25);
Set<PersistableNetworkPayload> filteredPersistableNetworkPayloads =
filterKnownHashes(
mapForDataResponse,
@@ -324,6 +331,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
limit,
wasPersistableNetworkPayloadsTruncated);
log.info("{} PersistableNetworkPayload entries remained after filtered by excluded keys. " +
"Original map had {} entries.",
@@ -333,6 +341,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
.map(e -> Utilities.encodeToHex(e.getHash()))
.toArray());
// We give 75% space to ProtectedStorageEntries as they contain MailBoxMessages and those can be larger.
limit = Math.round(maxSize * 0.75);
Set<ProtectedStorageEntry> filteredProtectedStorageEntries =
filterKnownHashes(
map,
@@ -340,6 +350,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
limit,
wasProtectedStorageEntriesTruncated);
log.info("{} ProtectedStorageEntry entries remained after filtered by excluded keys. " +
"Original map had {} entries.",
@@ -411,16 +422,42 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
Set<ByteArray> knownHashes,
Capabilities peerCapabilities,
int maxEntries,
long limit,
AtomicBoolean outTruncated) {
log.info("Num knownHashes {}", knownHashes.size());
AtomicLong totalSize = new AtomicLong();
AtomicBoolean exceededSizeLimit = new AtomicBoolean();
// We start with the non-DateSortedTruncatablePayload as they have higher priority. In case we would exceed our
// size limit the following DateSortedTruncatablePayload items would not get added at all.
Set<Map.Entry<ByteArray, T>> entries = toFilter.entrySet();
List<T> filteredResults = entries.stream()
.filter(entry -> !(entry.getValue() instanceof DateSortedTruncatablePayload))
.filter(entry -> !knownHashes.contains(entry.getKey()))
.map(Map.Entry::getValue)
.filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
.filter(payload -> {
if (exceededSizeLimit.get() || totalSize.addAndGet(payload.toProtoMessage().getSerializedSize()) > limit) {
exceededSizeLimit.set(true);
}
return !exceededSizeLimit.get();
})
.collect(Collectors.toList());
log.info("Num filtered non-dateSortedTruncatablePayloads {}", filteredResults.size());
List<T> dateSortedTruncatablePayloads = entries.stream()
.filter(entry -> entry.getValue() instanceof DateSortedTruncatablePayload)
.filter(entry -> !knownHashes.contains(entry.getKey()))
.map(Map.Entry::getValue)
.filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
.filter(payload -> {
if (exceededSizeLimit.get() || totalSize.addAndGet(payload.toProtoMessage().getSerializedSize()) > limit) {
exceededSizeLimit.set(true);
}
return !exceededSizeLimit.get();
})
.sorted(Comparator.comparing(payload -> ((DateSortedTruncatablePayload) payload).getDate()))
.collect(Collectors.toList());
log.info("Num filtered dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
@@ -435,14 +472,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
}
List<T> filteredResults = entries.stream()
.filter(entry -> !(entry.getValue() instanceof DateSortedTruncatablePayload))
.filter(entry -> !knownHashes.contains(entry.getKey()))
.map(Map.Entry::getValue)
.filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
.collect(Collectors.toList());
log.info("Num filtered non-dateSortedTruncatablePayloads {}", filteredResults.size());
// The non-dateSortedTruncatablePayloads have higher prio, so we added dateSortedTruncatablePayloads
// after those so in case we need to truncate we first truncate the dateSortedTruncatablePayloads.
filteredResults.addAll(dateSortedTruncatablePayloads);
@@ -455,6 +484,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
log.info("Num filteredResults {}", filteredResults.size());
}
outTruncated.set(outTruncated.get() || exceededSizeLimit.get());
return new HashSet<>(filteredResults);
}
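
For context, the size cap added to filterKnownHashes boils down to this pattern: accumulate the serialized size of each payload while the stream is filtered, and once the running total exceeds the limit, drop everything that follows and flag the result as truncated. Below is a self-contained sketch of that pattern with a simplified payload type; it is not the actual Bisq code, and SizedPayload/capBySize are illustrative names only.

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

class SizeLimitFilterSketch {
    // Minimal stand-in for the real payloads; only the serialized size matters here.
    static class SizedPayload {
        final int serializedSize;

        SizedPayload(int serializedSize) {
            this.serializedSize = serializedSize;
        }
    }

    // Keeps payloads in stream order until the accumulated size would exceed 'limit';
    // everything after that point is dropped and 'outTruncated' is set, mirroring the
    // totalSize/exceededSizeLimit bookkeeping in the patch.
    static List<SizedPayload> capBySize(List<SizedPayload> payloads, long limit, AtomicBoolean outTruncated) {
        AtomicLong totalSize = new AtomicLong();
        AtomicBoolean exceeded = new AtomicBoolean();
        List<SizedPayload> result = payloads.stream()
                .filter(p -> {
                    if (exceeded.get() || totalSize.addAndGet(p.serializedSize) > limit) {
                        exceeded.set(true);
                    }
                    return !exceeded.get();
                })
                .collect(Collectors.toList());
        outTruncated.set(outTruncated.get() || exceeded.get());
        return result;
    }
}

In the patch the same accumulator is applied twice within one call: first to the non-DateSortedTruncatablePayload entries and then to the date-sorted ones, so the date-sorted payloads are the first to be cut when the budget runs out.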