Remove handling of BundleOfEnvelopes in send method.

It is basically never executed and throttling would avoid it as well.
Cleanups, Rename
This commit is contained in:
chimp1984 2021-10-21 10:12:28 +02:00
parent 1c0b52cd31
commit 07354191a6
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3

View File

@ -34,7 +34,6 @@ import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.Proto;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.HasCapabilities;
import bisq.common.app.Version;
import bisq.common.config.Config;
@ -70,15 +69,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -123,14 +119,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
private final Socket socket;
// private final MessageListener messageListener;
private final ConnectionListener connectionListener;
@Nullable
private final NetworkFilter networkFilter;
@Getter
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service"));
// holder of state shared between InputHandler and Connection
@Getter
private final Statistic statistic;
@Getter
@ -221,10 +215,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
long ts = System.currentTimeMillis();
@ -242,7 +232,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return;
}
if (!noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
if (!testCapability(networkEnvelope)) {
log.debug("Capability for networkEnvelope is required but not supported");
return;
}
@ -257,58 +247,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
getSendMsgThrottleTrigger(), getSendMsgThrottleSleep(), lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
// check if BundleOfEnvelopes is supported
if (getCapabilities().containsAll(new Capabilities(Capability.BUNDLE_OF_ENVELOPES))) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
int size = !queueOfBundles.isEmpty() ? queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelopeSize : 0;
if (queueOfBundles.isEmpty() || size > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
queueOfBundles.add(new BundleOfEnvelopes());
// - and schedule it for sending
lastSendTimeStamp += getSendMsgThrottleSleep();
bundleSender.schedule(() -> {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes bundle = queueOfBundles.poll();
if (bundle != null && !stopped) {
NetworkEnvelope envelope;
int msgSize;
if (bundle.getEnvelopes().size() == 1) {
envelope = bundle.getEnvelopes().get(0);
msgSize = envelope.toProtoNetworkEnvelope().getSerializedSize();
} else {
envelope = bundle;
msgSize = networkEnvelopeSize;
}
try {
protoOutputStream.writeEnvelope(envelope);
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessageSent(envelope, this)));
UserThread.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, msgSize));
} catch (Throwable t) {
log.error("Sending envelope of class {} to address {} " +
"failed due {}",
envelope.getClass().getSimpleName(),
this.getPeersNodeAddressOptional(),
t.toString());
log.error("envelope: {}", envelope);
}
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
}
// - yes? add to bucket
queueOfBundles.element().add(networkEnvelope);
}
return;
}
Thread.sleep(getSendMsgThrottleSleep());
}
@ -326,7 +264,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// TODO: If msg is BundleOfEnvelopes we should check each individual message for capability and filter out those
// which fail.
public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
public boolean testCapability(Proto msg) {
boolean result;
if (msg instanceof AddDataMessage) {
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) msg).getProtectedStorageEntry()).getProtectedStoragePayload();
@ -531,7 +469,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
stopped = true;
//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
@ -576,10 +513,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this.toString());
log.debug("Connection shutdown complete {}", this);
// Use UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
@ -655,7 +590,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), this);
"connection={}", numRuleViolations, ruleViolation, ruleViolations, this);
this.ruleViolation = ruleViolation;
if (ruleViolation == RuleViolation.PEER_BANNED) {
log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", getPeersNodeAddressOptional());
@ -690,13 +625,13 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.info("SocketException (expected if connection lost). closeConnectionReason={}; connection={}", closeConnectionReason, this);
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.info("Shut down caused by exception {} on connection={}", e.toString(), this);
log.info("Shut down caused by exception {} on connection={}", e, this);
} else if (e instanceof EOFException) {
closeConnectionReason = CloseConnectionReason.TERMINATED;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
log.warn("Shut down caused by exception {} on connection={}", e, this);
} else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
log.warn("Shut down caused by exception {} on connection={}", e, this);
} else {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;