From 073337914103e8a77a271b644a142e1806f12abd Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Sat, 6 Jul 2019 13:34:17 +0200 Subject: [PATCH] Split on resulting message size --- .../bisq/network/p2p/EnvelopeOfEnvelopes.java | 2 +- .../bisq/network/p2p/network/Connection.java | 22 +++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/EnvelopeOfEnvelopes.java b/p2p/src/main/java/bisq/network/p2p/EnvelopeOfEnvelopes.java index 8a89b6057f..68a84d0a3d 100644 --- a/p2p/src/main/java/bisq/network/p2p/EnvelopeOfEnvelopes.java +++ b/p2p/src/main/java/bisq/network/p2p/EnvelopeOfEnvelopes.java @@ -34,7 +34,7 @@ import lombok.Value; @EqualsAndHashCode(callSuper = true) @Value -public final class EnvelopeOfEnvelopes extends NetworkEnvelope { +public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission { private final List envelopes; diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 9c6af942f0..8433aa834c 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -72,8 +72,10 @@ import java.io.StreamCorruptedException; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -235,7 +237,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { return capabilities; } - EnvelopeOfEnvelopes envelopeOfEnvelopes = null; + Object lock = new Object(); + Queue envelopeOfEnvelopes = new ConcurrentLinkedQueue<>(); ScheduledExecutorService envelopeOfEnvelopesSender = Executors.newSingleThreadScheduledExecutor(); // Called from various threads @@ -285,25 +288,26 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { // check if EnvelopeOfEnvelopes is supported if(getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) { - // check if a bucket is already there - synchronized (envelopeOfEnvelopes) { - if(envelopeOfEnvelopes == null) { + synchronized (lock) { + // check if current envelope fits size + // - no? create new envelope + if(envelopeOfEnvelopes.isEmpty() || envelopeOfEnvelopes.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) { // - no? create a bucket - envelopeOfEnvelopes = new EnvelopeOfEnvelopes(); + envelopeOfEnvelopes.add(new EnvelopeOfEnvelopes()); + System.err.println("added fresh container"); // - and schedule it for sending envelopeOfEnvelopesSender.schedule(() -> { if (!stopped) { - synchronized (envelopeOfEnvelopes) { + synchronized (lock) { lastSendTimeStamp = System.currentTimeMillis(); - protoOutputStream.writeEnvelope(envelopeOfEnvelopes); - envelopeOfEnvelopes = null; + protoOutputStream.writeEnvelope(envelopeOfEnvelopes.poll()); } } }, sendMsgThrottleSleep, TimeUnit.MILLISECONDS); } // - yes? add to bucket - envelopeOfEnvelopes.add(networkEnvelope); + envelopeOfEnvelopes.element().add(networkEnvelope); } return; }