Refactor sendMessage method: Return early

This commit is contained in:
chimp1984 2021-01-01 16:24:17 -05:00
parent cc433ab57d
commit f57d4e041b
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -241,95 +241,99 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug(">> Send networkEnvelope of type: {}", networkEnvelope.getClass().getSimpleName());
if (!stopped) {
if (networkFilter != null &&
peersNodeAddressOptional.isPresent() &&
networkFilter.isPeerBanned(peersNodeAddressOptional.get())) {
reportInvalidRequest(RuleViolation.PEER_BANNED);
return;
if (stopped) {
log.debug("called sendMessage but was already stopped");
return;
}
if (networkFilter != null &&
peersNodeAddressOptional.isPresent() &&
networkFilter.isPeerBanned(peersNodeAddressOptional.get())) {
reportInvalidRequest(RuleViolation.PEER_BANNED);
return;
}
if (!noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
log.debug("Capability for networkEnvelope is required but not supported");
return;
}
try {
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), -1);
} else if (networkEnvelope instanceof GetDataResponse && ((GetDataResponse) networkEnvelope).isGetUpdatedDataResponse()) {
setPeerType(Connection.PeerType.PEER);
}
if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
try {
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
// Throttle outbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < getSendMsgThrottleTrigger()) {
log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
getSendMsgThrottleTrigger(), getSendMsgThrottleSleep(), lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
// check if BundleOfEnvelopes is supported
if (getCapabilities().containsAll(new Capabilities(Capability.BUNDLE_OF_ENVELOPES))) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
if (queueOfBundles.isEmpty() || queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
queueOfBundles.add(new BundleOfEnvelopes());
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), -1);
} else if (networkEnvelope instanceof GetDataResponse && ((GetDataResponse) networkEnvelope).isGetUpdatedDataResponse()) {
setPeerType(Connection.PeerType.PEER);
}
// - and schedule it for sending
lastSendTimeStamp += getSendMsgThrottleSleep();
// Throttle outbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < getSendMsgThrottleTrigger()) {
log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
getSendMsgThrottleTrigger(), getSendMsgThrottleSleep(), lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
// check if BundleOfEnvelopes is supported
if (getCapabilities().containsAll(new Capabilities(Capability.BUNDLE_OF_ENVELOPES))) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
if (queueOfBundles.isEmpty() || queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelope.toProtoNetworkEnvelope().getSerializedSize() > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
queueOfBundles.add(new BundleOfEnvelopes());
// - and schedule it for sending
lastSendTimeStamp += getSendMsgThrottleSleep();
bundleSender.schedule(() -> {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes bundle = queueOfBundles.poll();
if (bundle != null && !stopped) {
NetworkEnvelope envelope = bundle.getEnvelopes().size() == 1 ?
bundle.getEnvelopes().get(0) :
bundle;
try {
protoOutputStream.writeEnvelope(envelope);
} catch (Throwable t) {
log.error("Sending envelope of class {} to address {} " +
"failed due {}",
envelope.getClass().getSimpleName(),
this.getPeersNodeAddressOptional(),
t.toString());
log.error("envelope: {}", envelope);
}
}
bundleSender.schedule(() -> {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes bundle = queueOfBundles.poll();
if (bundle != null && !stopped) {
NetworkEnvelope envelope = bundle.getEnvelopes().size() == 1 ?
bundle.getEnvelopes().get(0) :
bundle;
try {
protoOutputStream.writeEnvelope(envelope);
} catch (Throwable t) {
log.error("Sending envelope of class {} to address {} " +
"failed due {}",
envelope.getClass().getSimpleName(),
this.getPeersNodeAddressOptional(),
t.toString());
log.error("envelope: {}", envelope);
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
}
}
// - yes? add to bucket
queueOfBundles.element().add(networkEnvelope);
}
return;
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
}
Thread.sleep(getSendMsgThrottleSleep());
// - yes? add to bucket
queueOfBundles.element().add(networkEnvelope);
}
lastSendTimeStamp = now;
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
}
} catch (Throwable t) {
handleException(t);
return;
}
Thread.sleep(getSendMsgThrottleSleep());
}
} else {
log.debug("called sendMessage but was already stopped");
lastSendTimeStamp = now;
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
}
} catch (Throwable t) {
handleException(t);
}
}