Split on resulting message size

This commit is contained in:
Florian Reimair 2019-07-06 13:34:17 +02:00
parent f73fa94984
commit 0733379141
No known key found for this signature in database
GPG key ID: 05634D8D7A7954C8
2 changed files with 14 additions and 10 deletions

View file

@ -34,7 +34,7 @@ import lombok.Value;
@EqualsAndHashCode(callSuper = true)
@Value
public final class EnvelopeOfEnvelopes extends NetworkEnvelope {
public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission {
private final List<NetworkEnvelope> envelopes;

View file

@ -72,8 +72,10 @@ import java.io.StreamCorruptedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -235,7 +237,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
EnvelopeOfEnvelopes envelopeOfEnvelopes = null;
Object lock = new Object();
Queue<EnvelopeOfEnvelopes> envelopeOfEnvelopes = new ConcurrentLinkedQueue<>();
ScheduledExecutorService envelopeOfEnvelopesSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
@ -285,25 +288,26 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// check if EnvelopeOfEnvelopes is supported
if(getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) {
// check if a bucket is already there
synchronized (envelopeOfEnvelopes) {
if(envelopeOfEnvelopes == null) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
if(envelopeOfEnvelopes.isEmpty() || envelopeOfEnvelopes.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
envelopeOfEnvelopes = new EnvelopeOfEnvelopes();
envelopeOfEnvelopes.add(new EnvelopeOfEnvelopes());
System.err.println("added fresh container");
// - and schedule it for sending
envelopeOfEnvelopesSender.schedule(() -> {
if (!stopped) {
synchronized (envelopeOfEnvelopes) {
synchronized (lock) {
lastSendTimeStamp = System.currentTimeMillis();
protoOutputStream.writeEnvelope(envelopeOfEnvelopes);
envelopeOfEnvelopes = null;
protoOutputStream.writeEnvelope(envelopeOfEnvelopes.poll());
}
}
}, sendMsgThrottleSleep, TimeUnit.MILLISECONDS);
}
// - yes? add to bucket
envelopeOfEnvelopes.add(networkEnvelope);
envelopeOfEnvelopes.element().add(networkEnvelope);
}
return;
}