This commit is contained in:
Florian Reimair 2019-07-08 09:33:50 +02:00
parent 8dd993fd54
commit 3fb8be3481
No known key found for this signature in database
GPG key ID: 05634D8D7A7954C8
4 changed files with 22 additions and 22 deletions

View file

@ -67,7 +67,7 @@ message NetworkEnvelope {
GetBlindVoteStateHashesRequest get_blind_vote_state_hashes_request = 41; GetBlindVoteStateHashesRequest get_blind_vote_state_hashes_request = 41;
GetBlindVoteStateHashesResponse get_blind_vote_state_hashes_response = 42; GetBlindVoteStateHashesResponse get_blind_vote_state_hashes_response = 42;
EnvelopeOfEnvelopes envelope_of_envelopes = 43; BundleOfEnvelopes bundle_of_envelopes = 43;
} }
} }
@ -75,7 +75,7 @@ message NetworkEnvelope {
// Implementations of NetworkEnvelope // Implementations of NetworkEnvelope
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
message EnvelopeOfEnvelopes { message BundleOfEnvelopes {
repeated NetworkEnvelope envelopes = 1; repeated NetworkEnvelope envelopes = 1;
} }

View file

@ -54,7 +54,7 @@ import bisq.core.trade.statistics.TradeStatistics;
import bisq.network.p2p.AckMessage; import bisq.network.p2p.AckMessage;
import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.EnvelopeOfEnvelopes; import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.PrefixedSealedAndSignedMessage; import bisq.network.p2p.PrefixedSealedAndSignedMessage;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest; import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
@ -191,8 +191,8 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo
case GET_BLIND_VOTE_STATE_HASHES_RESPONSE: case GET_BLIND_VOTE_STATE_HASHES_RESPONSE:
return GetBlindVoteStateHashesResponse.fromProto(proto.getGetBlindVoteStateHashesResponse(), messageVersion); return GetBlindVoteStateHashesResponse.fromProto(proto.getGetBlindVoteStateHashesResponse(), messageVersion);
case ENVELOPE_OF_ENVELOPES: case BUNDLE_OF_ENVELOPES:
return EnvelopeOfEnvelopes.fromProto(proto.getEnvelopeOfEnvelopes(), this, messageVersion); return BundleOfEnvelopes.fromProto(proto.getBundleOfEnvelopes(), this, messageVersion);
default: default:
throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" + throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" +

View file

@ -34,11 +34,11 @@ import lombok.Value;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Value @Value
public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission { public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission {
private final List<NetworkEnvelope> envelopes; private final List<NetworkEnvelope> envelopes;
public EnvelopeOfEnvelopes() { public BundleOfEnvelopes() {
this(new ArrayList<>(), Version.getP2PMessageVersion()); this(new ArrayList<>(), Version.getP2PMessageVersion());
} }
@ -50,7 +50,7 @@ public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements Extend
// PROTO BUFFER // PROTO BUFFER
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private EnvelopeOfEnvelopes(List<NetworkEnvelope> envelopes, int messageVersion) { private BundleOfEnvelopes(List<NetworkEnvelope> envelopes, int messageVersion) {
super(messageVersion); super(messageVersion);
this.envelopes = envelopes; this.envelopes = envelopes;
} }
@ -59,13 +59,13 @@ public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements Extend
@Override @Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() { public PB.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder() return getNetworkEnvelopeBuilder()
.setEnvelopeOfEnvelopes(PB.EnvelopeOfEnvelopes.newBuilder().addAllEnvelopes(envelopes.stream() .setBundleOfEnvelopes(PB.BundleOfEnvelopes.newBuilder().addAllEnvelopes(envelopes.stream()
.map(NetworkEnvelope::toProtoNetworkEnvelope) .map(NetworkEnvelope::toProtoNetworkEnvelope)
.collect(Collectors.toList()))) .collect(Collectors.toList())))
.build(); .build();
} }
public static EnvelopeOfEnvelopes fromProto(PB.EnvelopeOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) { public static BundleOfEnvelopes fromProto(PB.BundleOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) {
List<NetworkEnvelope> envelopes = proto.getEnvelopesList() List<NetworkEnvelope> envelopes = proto.getEnvelopesList()
.stream() .stream()
.map(envelope -> { .map(envelope -> {
@ -78,6 +78,6 @@ public final class EnvelopeOfEnvelopes extends NetworkEnvelope implements Extend
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()); .collect(Collectors.toList());
return new EnvelopeOfEnvelopes(envelopes, messageVersion); return new BundleOfEnvelopes(envelopes, messageVersion);
} }
} }

View file

@ -17,8 +17,8 @@
package bisq.network.p2p.network; package bisq.network.p2p.network;
import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.EnvelopeOfEnvelopes;
import bisq.network.p2p.ExtendedDataSizePermission; import bisq.network.p2p.ExtendedDataSizePermission;
import bisq.network.p2p.NodeAddress; import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.PrefixedSealedAndSignedMessage; import bisq.network.p2p.PrefixedSealedAndSignedMessage;
@ -238,8 +238,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} }
Object lock = new Object(); Object lock = new Object();
Queue<EnvelopeOfEnvelopes> envelopeOfEnvelopes = new ConcurrentLinkedQueue<>(); Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService envelopeOfEnvelopesSender = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads // Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) { public void sendMessage(NetworkEnvelope networkEnvelope) {
@ -286,30 +286,30 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed, sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName()); networkEnvelope.getClass().getSimpleName());
// check if EnvelopeOfEnvelopes is supported // check if BundleOfEnvelopes is supported
if(getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) { if(getCapabilities().containsAll(new Capabilities(Capability.ENVELOPE_OF_ENVELOPES))) {
synchronized (lock) { synchronized (lock) {
// check if current envelope fits size // check if current envelope fits size
// - no? create new envelope // - no? create new envelope
if(envelopeOfEnvelopes.isEmpty() || envelopeOfEnvelopes.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) { if(queueOfBundles.isEmpty() || queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket // - no? create a bucket
envelopeOfEnvelopes.add(new EnvelopeOfEnvelopes()); queueOfBundles.add(new BundleOfEnvelopes());
System.err.println("added fresh container"); System.err.println("added fresh container");
// - and schedule it for sending // - and schedule it for sending
lastSendTimeStamp += sendMsgThrottleSleep; lastSendTimeStamp += sendMsgThrottleSleep;
envelopeOfEnvelopesSender.schedule(() -> { bundleSender.schedule(() -> {
if (!stopped) { if (!stopped) {
synchronized (lock) { synchronized (lock) {
protoOutputStream.writeEnvelope(envelopeOfEnvelopes.poll()); protoOutputStream.writeEnvelope(queueOfBundles.poll());
} }
} }
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS); }, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
} }
// - yes? add to bucket // - yes? add to bucket
envelopeOfEnvelopes.element().add(networkEnvelope); queueOfBundles.element().add(networkEnvelope);
} }
return; return;
} }
@ -433,8 +433,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkArgument(connection.equals(this)); checkArgument(connection.equals(this));
if(networkEnvelope instanceof EnvelopeOfEnvelopes) if(networkEnvelope instanceof BundleOfEnvelopes)
for(NetworkEnvelope current : ((EnvelopeOfEnvelopes) networkEnvelope).getEnvelopes()) { for(NetworkEnvelope current : ((BundleOfEnvelopes) networkEnvelope).getEnvelopes()) {
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection))); UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection)));
} }
else else