Merge pull request #2939 from freimair/envelope_of_envelopes

Bundle of envelopes
This commit is contained in:
Christoph Atteneder 2019-07-08 19:45:34 +02:00 committed by GitHub
commit 7b29511415
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 154 additions and 15 deletions

View File

@ -32,5 +32,6 @@ public enum Capability {
BLIND_VOTE,
ACK_MSG,
BSQ_BLOCK,
DAO_STATE
DAO_STATE,
ENVELOPE_OF_ENVELOPES
}

View File

@ -66,6 +66,8 @@ message NetworkEnvelope {
NewBlindVoteStateHashMessage new_blind_vote_state_hash_message = 40;
GetBlindVoteStateHashesRequest get_blind_vote_state_hashes_request = 41;
GetBlindVoteStateHashesResponse get_blind_vote_state_hashes_response = 42;
BundleOfEnvelopes bundle_of_envelopes = 43;
}
}
@ -73,6 +75,10 @@ message NetworkEnvelope {
// Implementations of NetworkEnvelope
///////////////////////////////////////////////////////////////////////////////////////////
message BundleOfEnvelopes {
repeated NetworkEnvelope envelopes = 1;
}
// get data
message PreliminaryGetDataRequest {

View File

@ -54,6 +54,7 @@ import bisq.core.trade.statistics.TradeStatistics;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.PrefixedSealedAndSignedMessage;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
@ -190,6 +191,9 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo
case GET_BLIND_VOTE_STATE_HASHES_RESPONSE:
return GetBlindVoteStateHashesResponse.fromProto(proto.getGetBlindVoteStateHashesResponse(), messageVersion);
case BUNDLE_OF_ENVELOPES:
return BundleOfEnvelopes.fromProto(proto.getBundleOfEnvelopes(), this, messageVersion);
default:
throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" +
proto.getMessageCase() + "; proto raw data=" + proto.toString());

View File

@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
public class CoreNetworkCapabilities {
public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) {
Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG);
Capabilities.app.addAll(Capability.ENVELOPE_OF_ENVELOPES);
if (BisqEnvironment.isDaoActivated(bisqEnvironment)) {
Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.BSQ_BLOCK, Capability.DAO_STATE);

View File

@ -0,0 +1,83 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p;
import bisq.common.app.Version;
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Value;
@EqualsAndHashCode(callSuper = true)
@Value
public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission {
private final List<NetworkEnvelope> envelopes;
public BundleOfEnvelopes() {
this(new ArrayList<>(), Version.getP2PMessageVersion());
}
public void add(NetworkEnvelope networkEnvelope) {
envelopes.add(networkEnvelope);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private BundleOfEnvelopes(List<NetworkEnvelope> envelopes, int messageVersion) {
super(messageVersion);
this.envelopes = envelopes;
}
@Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setBundleOfEnvelopes(PB.BundleOfEnvelopes.newBuilder().addAllEnvelopes(envelopes.stream()
.map(NetworkEnvelope::toProtoNetworkEnvelope)
.collect(Collectors.toList())))
.build();
}
public static BundleOfEnvelopes fromProto(PB.BundleOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) {
List<NetworkEnvelope> envelopes = proto.getEnvelopesList()
.stream()
.map(envelope -> {
try {
return resolver.fromProto(envelope);
} catch (ProtobufferException e) {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return new BundleOfEnvelopes(envelopes, messageVersion);
}
}

View File

@ -17,6 +17,7 @@
package bisq.network.p2p.network;
import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.ExtendedDataSizePermission;
import bisq.network.p2p.NodeAddress;
@ -38,6 +39,7 @@ import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.Proto;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.HasCapabilities;
import bisq.common.app.Version;
import bisq.common.proto.ProtobufferException;
@ -69,11 +71,14 @@ import java.io.StreamCorruptedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -186,7 +191,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
uid = UUID.randomUUID().toString();
statistic = new Statistic();
if(connectionConfig == null)
if (connectionConfig == null)
connectionConfig = new ConnectionConfig(MSG_THROTTLE_PER_SEC, MSG_THROTTLE_PER_10_SEC, SEND_MSG_THROTTLE_TRIGGER, SEND_MSG_THROTTLE_SLEEP);
msgThrottlePerSec = connectionConfig.getMsgThrottlePerSec();
msgThrottlePer10Sec = connectionConfig.getMsgThrottlePer10Sec();
@ -230,6 +235,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
Object lock = new Object();
Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
@ -237,18 +246,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
try {
// Throttle outbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < sendMsgThrottleTrigger) {
log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
Thread.sleep(sendMsgThrottleSleep);
}
lastSendTimeStamp = now;
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
PB.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope();
@ -278,6 +275,47 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), proto.getSerializedSize());
}
// Throttle outbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < sendMsgThrottleTrigger) {
log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
// check if BundleOfEnvelopes is supported
if (getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
if (queueOfBundles.isEmpty() || queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
queueOfBundles.add(new BundleOfEnvelopes());
// - and schedule it for sending
lastSendTimeStamp += sendMsgThrottleSleep;
bundleSender.schedule(() -> {
if (!stopped) {
synchronized (lock) {
protoOutputStream.writeEnvelope(queueOfBundles.poll());
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
}
// - yes? add to bucket
queueOfBundles.element().add(networkEnvelope);
}
return;
}
Thread.sleep(sendMsgThrottleSleep);
}
lastSendTimeStamp = now;
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
}
@ -377,7 +415,13 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkArgument(connection.equals(this));
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
if (networkEnvelope instanceof BundleOfEnvelopes)
for (NetworkEnvelope current : ((BundleOfEnvelopes) networkEnvelope).getEnvelopes()) {
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection)));
}
else
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
}