diff --git a/common/src/main/java/bisq/common/app/Capability.java b/common/src/main/java/bisq/common/app/Capability.java index 83e4d0f113..dc15baaf24 100644 --- a/common/src/main/java/bisq/common/app/Capability.java +++ b/common/src/main/java/bisq/common/app/Capability.java @@ -32,5 +32,6 @@ public enum Capability { BLIND_VOTE, ACK_MSG, BSQ_BLOCK, - DAO_STATE + DAO_STATE, + ENVELOPE_OF_ENVELOPES } diff --git a/common/src/main/proto/pb.proto b/common/src/main/proto/pb.proto index aba03e79d8..b1e3d291d6 100644 --- a/common/src/main/proto/pb.proto +++ b/common/src/main/proto/pb.proto @@ -66,6 +66,8 @@ message NetworkEnvelope { NewBlindVoteStateHashMessage new_blind_vote_state_hash_message = 40; GetBlindVoteStateHashesRequest get_blind_vote_state_hashes_request = 41; GetBlindVoteStateHashesResponse get_blind_vote_state_hashes_response = 42; + + BundleOfEnvelopes bundle_of_envelopes = 43; } } @@ -73,6 +75,10 @@ message NetworkEnvelope { // Implementations of NetworkEnvelope /////////////////////////////////////////////////////////////////////////////////////////// +message BundleOfEnvelopes { + repeated NetworkEnvelope envelopes = 1; +} + // get data message PreliminaryGetDataRequest { diff --git a/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java b/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java index 85a02942bc..4c9509a947 100644 --- a/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java +++ b/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java @@ -54,6 +54,7 @@ import bisq.core.trade.statistics.TradeStatistics; import bisq.network.p2p.AckMessage; import bisq.network.p2p.CloseConnectionMessage; +import bisq.network.p2p.BundleOfEnvelopes; import bisq.network.p2p.PrefixedSealedAndSignedMessage; import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest; @@ -190,6 +191,9 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo case GET_BLIND_VOTE_STATE_HASHES_RESPONSE: return GetBlindVoteStateHashesResponse.fromProto(proto.getGetBlindVoteStateHashesResponse(), messageVersion); + case BUNDLE_OF_ENVELOPES: + return BundleOfEnvelopes.fromProto(proto.getBundleOfEnvelopes(), this, messageVersion); + default: throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString()); diff --git a/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java b/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java index 3eafe49a1e..e0e2bbaa9b 100644 --- a/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java +++ b/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; public class CoreNetworkCapabilities { public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) { Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG); + Capabilities.app.addAll(Capability.ENVELOPE_OF_ENVELOPES); if (BisqEnvironment.isDaoActivated(bisqEnvironment)) { Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.BSQ_BLOCK, Capability.DAO_STATE); diff --git a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java new file mode 100644 index 0000000000..d7943d35cb --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java @@ -0,0 +1,83 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p; + +import bisq.common.app.Version; +import bisq.common.proto.ProtobufferException; +import bisq.common.proto.network.NetworkEnvelope; +import bisq.common.proto.network.NetworkProtoResolver; + +import io.bisq.generated.protobuffer.PB; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import lombok.EqualsAndHashCode; +import lombok.Value; + +@EqualsAndHashCode(callSuper = true) +@Value +public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission { + + private final List envelopes; + + public BundleOfEnvelopes() { + this(new ArrayList<>(), Version.getP2PMessageVersion()); + } + + public void add(NetworkEnvelope networkEnvelope) { + envelopes.add(networkEnvelope); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // PROTO BUFFER + /////////////////////////////////////////////////////////////////////////////////////////// + + private BundleOfEnvelopes(List envelopes, int messageVersion) { + super(messageVersion); + this.envelopes = envelopes; + } + + + @Override + public PB.NetworkEnvelope toProtoNetworkEnvelope() { + return getNetworkEnvelopeBuilder() + .setBundleOfEnvelopes(PB.BundleOfEnvelopes.newBuilder().addAllEnvelopes(envelopes.stream() + .map(NetworkEnvelope::toProtoNetworkEnvelope) + .collect(Collectors.toList()))) + .build(); + } + + public static BundleOfEnvelopes fromProto(PB.BundleOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) { + List envelopes = proto.getEnvelopesList() + .stream() + .map(envelope -> { + try { + return resolver.fromProto(envelope); + } catch (ProtobufferException e) { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + return new BundleOfEnvelopes(envelopes, messageVersion); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 3b1851a5a6..e36fb5b729 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -17,6 +17,7 @@ package bisq.network.p2p.network; +import bisq.network.p2p.BundleOfEnvelopes; import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.ExtendedDataSizePermission; import bisq.network.p2p.NodeAddress; @@ -38,6 +39,7 @@ import bisq.network.p2p.storage.payload.ProtectedStoragePayload; import bisq.common.Proto; import bisq.common.UserThread; import bisq.common.app.Capabilities; +import bisq.common.app.Capability; import bisq.common.app.HasCapabilities; import bisq.common.app.Version; import bisq.common.proto.ProtobufferException; @@ -69,11 +71,14 @@ import java.io.StreamCorruptedException; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -186,7 +191,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { uid = UUID.randomUUID().toString(); statistic = new Statistic(); - if(connectionConfig == null) + if (connectionConfig == null) connectionConfig = new ConnectionConfig(MSG_THROTTLE_PER_SEC, MSG_THROTTLE_PER_10_SEC, SEND_MSG_THROTTLE_TRIGGER, SEND_MSG_THROTTLE_SLEEP); msgThrottlePerSec = connectionConfig.getMsgThrottlePerSec(); msgThrottlePer10Sec = connectionConfig.getMsgThrottlePer10Sec(); @@ -230,6 +235,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { return capabilities; } + Object lock = new Object(); + Queue queueOfBundles = new ConcurrentLinkedQueue<>(); + ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor(); + // Called from various threads public void sendMessage(NetworkEnvelope networkEnvelope) { log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName()); @@ -237,18 +246,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { if (!stopped) { if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) { try { - // Throttle outbound network_messages - long now = System.currentTimeMillis(); - long elapsed = now - lastSendTimeStamp; - if (elapsed < sendMsgThrottleTrigger) { - log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " + - "for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}", - sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed, - networkEnvelope.getClass().getSimpleName()); - Thread.sleep(sendMsgThrottleSleep); - } - - lastSendTimeStamp = now; String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null"); PB.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope(); @@ -278,6 +275,47 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), proto.getSerializedSize()); } + // Throttle outbound network_messages + long now = System.currentTimeMillis(); + long elapsed = now - lastSendTimeStamp; + if (elapsed < sendMsgThrottleTrigger) { + log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " + + "for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}", + sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed, + networkEnvelope.getClass().getSimpleName()); + + // check if BundleOfEnvelopes is supported + if (getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) { + synchronized (lock) { + // check if current envelope fits size + // - no? create new envelope + if (queueOfBundles.isEmpty() || queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) { + // - no? create a bucket + queueOfBundles.add(new BundleOfEnvelopes()); + + // - and schedule it for sending + lastSendTimeStamp += sendMsgThrottleSleep; + + bundleSender.schedule(() -> { + if (!stopped) { + synchronized (lock) { + protoOutputStream.writeEnvelope(queueOfBundles.poll()); + } + } + }, lastSendTimeStamp - now, TimeUnit.MILLISECONDS); + } + + // - yes? add to bucket + queueOfBundles.element().add(networkEnvelope); + } + return; + } + + Thread.sleep(sendMsgThrottleSleep); + } + + lastSendTimeStamp = now; + if (!stopped) { protoOutputStream.writeEnvelope(networkEnvelope); } @@ -377,7 +415,13 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { checkArgument(connection.equals(this)); - UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); + + if (networkEnvelope instanceof BundleOfEnvelopes) + for (NetworkEnvelope current : ((BundleOfEnvelopes) networkEnvelope).getEnvelopes()) { + UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection))); + } + else + UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); }