Clear capabilitiesListeners at shutdown

Improve logs
This commit is contained in:
chimp1984 2020-10-07 13:03:18 -05:00
parent 8aec306159
commit 9821dd6271
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -159,6 +159,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final List<Long> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private volatile long lastSendTimeStamp = 0;
// We use a weak reference here to ensure that no connection causes a memory leak in case it get closed without
// the shutDown being called.
private final CopyOnWriteArraySet<WeakReference<SupportedCapabilitiesListener>> capabilitiesListeners = new CopyOnWriteArraySet<>();
@Getter
@ -514,6 +516,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} finally {
protoOutputStream.onConnectionShutdown();
capabilitiesListeners.clear();
try {
protoInputStream.close();
} catch (IOException e) {
@ -559,7 +563,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
'}';
}
@SuppressWarnings("unused")
public String printDetails() {
String portInfo;
if (socket.getLocalPort() == 0)
@ -783,19 +786,26 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
if (networkEnvelope instanceof SupportedCapabilitiesMessage) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
if (!capabilities.equals(supportedCapabilities)) {
capabilities.set(supportedCapabilities);
Capabilities capabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (capabilities != null) {
if (!this.capabilities.equals(capabilities)) {
this.capabilities.set(capabilities);
// Capabilities can be empty. We only check for mandatory if we get some capabilities.
if (!capabilities.isEmpty() && !Capabilities.hasMandatoryCapability(capabilities)) {
String senderNodeAddress = networkEnvelope instanceof SendersNodeAddressMessage ?
((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress().getFullAddress() :
"[unknown address]";
log.info("We close a connection to old node {}. " +
"Capabilities of old node: {}, networkEnvelope class name={}",
senderNodeAddress, capabilities.prettyPrint(), networkEnvelope.getClass().getSimpleName());
if (!this.capabilities.isEmpty() && !Capabilities.hasMandatoryCapability(this.capabilities)) {
String senderNodeAddress = getPeersNodeAddressOptional().isPresent() ?
getPeersNodeAddressOptional().get().getFullAddress() :
networkEnvelope instanceof SendersNodeAddressMessage ?
((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress().getFullAddress() :
"[unknown address]";
log.info("We close a connection because of " +
"CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED " +
"to node {}. Capabilities of old node: {}, " +
"networkEnvelope class name={}",
senderNodeAddress,
this.capabilities.prettyPrint(),
networkEnvelope.getClass().getSimpleName());
shutDown(CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED);
return;
}
@ -803,7 +813,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(capabilities));
}
});
}