Add missing listener notification and clean up class

This commit is contained in:
chimp1984 2019-08-27 22:03:23 +02:00
parent 1be6abf262
commit f95a7f5d3f
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -121,8 +121,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private static ConnectionConfig connectionConfig;
// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);
@ -172,7 +172,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private RuleViolation ruleViolation;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();
private Capabilities capabilities = new Capabilities();
private final Capabilities capabilities = new Capabilities();
///////////////////////////////////////////////////////////////////////////////////////////
@ -233,9 +233,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
Object lock = new Object();
Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
@ -250,7 +250,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));
if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
// pings and offer refresh msg we don't want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
@ -298,10 +298,11 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes current = queueOfBundles.poll();
if(current.getEnvelopes().size() == 1)
if (current != null && current.getEnvelopes().size() == 1) {
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
else
} else {
protoOutputStream.writeEnvelope(current);
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
@ -386,10 +387,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
messageTimeStamps.add(now);
// clean list
while(messageTimeStamps.size() > msgThrottlePer10Sec)
while (messageTimeStamps.size() > msgThrottlePer10Sec)
messageTimeStamps.remove(0);
return violatesThrottleLimit(now,1, msgThrottlePerSec) || violatesThrottleLimit(now,10, msgThrottlePer10Sec);
return violatesThrottleLimit(now, 1, msgThrottlePerSec) || violatesThrottleLimit(now, 10, msgThrottlePer10Sec);
}
private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) {
@ -399,7 +400,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - messageCountLimit);
// if duration < seconds sec we received too much network_messages
if(now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
if (now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
log.error("violatesThrottleLimit {}/{} second(s)", messageCountLimit, seconds);
return true;
@ -436,7 +437,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
this.peerType = peerType;
}
public void setPeersNodeAddress(NodeAddress peerNodeAddress) {
private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);
@ -494,6 +495,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
stopped = true;
//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
@ -534,6 +536,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
e.printStackTrace();
}
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete " + this.toString());
@ -705,7 +708,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
Thread.sleep(20);
}
// Reading the protobuffer message from the inputstream
// Reading the protobuffer message from the inputStream
protobuf.NetworkEnvelope proto = protobuf.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
if (proto == null) {
@ -794,6 +797,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
capabilities.set(supportedCapabilities);
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
supportedCapabilitiesListener.onChanged(supportedCapabilities);
}
});
}
}