Cleanup Connection. Remove protoInputStream.available() (is always 0)

This commit is contained in:
Manfred Karrer 2017-05-13 12:50:34 +02:00
parent cecb0b0838
commit 48dd93db82

View file

@ -4,7 +4,6 @@ import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.bisq.common.UserThread;
import io.bisq.common.app.DevEnv;
import io.bisq.common.app.Log;
import io.bisq.common.app.Version;
import io.bisq.common.network.NetworkEnvelope;
@ -176,10 +175,11 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Called from various threads
public void sendMessage(NetworkEnvelope wireEnvelope) {
public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
if (!stopped) {
if (!isCapabilityRequired(wireEnvelope) || isCapabilitySupported(wireEnvelope)) {
if (!isCapabilityRequired(networkEnvelope) || isCapabilitySupported(networkEnvelope)) {
try {
Log.traceCall();
@ -193,48 +193,46 @@ public class Connection implements MessageListener {
Thread.sleep(50);
}
PB.NetworkEnvelope envelope;
lastSendTimeStamp = now;
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
envelope = wireEnvelope.toProtoNetworkEnvelope();
log.debug("Sending message: {}", Utilities.toTruncatedString(envelope.toString(), 10000));
PB.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope();
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));
if (wireEnvelope instanceof Ping | wireEnvelope instanceof RefreshOfferMessage) {
if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, envelope.toString(), envelope.getSerializedSize());
} else if (wireEnvelope instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
peersNodeAddress, uid, proto.toString(), proto.getSerializedSize());
} else if (networkEnvelope instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(wireEnvelope), -1);
} else if (wireEnvelope instanceof GetDataResponse && ((GetDataResponse) wireEnvelope).isGetUpdatedDataResponse()) {
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), -1);
} else if (networkEnvelope instanceof GetDataResponse && ((GetDataResponse) networkEnvelope).isGetUpdatedDataResponse()) {
setPeerType(Connection.PeerType.PEER);
} else {
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(wireEnvelope), envelope.getSerializedSize());
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), proto.getSerializedSize());
}
if (!stopped) {
protoOutputStreamLock.lock();
envelope.writeDelimitedTo(protoOutputStream);
proto.writeDelimitedTo(protoOutputStream);
protoOutputStream.flush();
statistic.addSentBytes(envelope.getSerializedSize());
statistic.addSentMessage(wireEnvelope);
statistic.addSentBytes(proto.getSerializedSize());
statistic.addSentMessage(networkEnvelope);
// We don't want to get the activity ts updated by ping/pong msg
if (!(wireEnvelope instanceof KeepAliveMessage))
if (!(networkEnvelope instanceof KeepAliveMessage))
statistic.updateLastActivityTimestamp();
}
} catch (Throwable t) {
@ -247,7 +245,6 @@ public class Connection implements MessageListener {
} else {
log.debug("called sendMessage but was already stopped");
}
}
public boolean isCapabilitySupported(NetworkEnvelope wireEnvelope) {
@ -741,14 +738,12 @@ public class Connection implements MessageListener {
}
try {
if (sharedModel.getSocket() != null &&
sharedModel.getSocket().isClosed() ||
protoInputStream.available() < 0) {
log.warn("Shutdown because protoInputStream.available() < 0. protoInputStream.available()=" + protoInputStream.available());
stopAndShutDown(CloseConnectionReason.NO_PROTO_BUFFER_DATA);
sharedModel.getSocket().isClosed()) {
stopAndShutDown(CloseConnectionReason.SOCKET_CLOSED);
return;
}
Connection connection = sharedModel.connection;
Connection connection = checkNotNull(sharedModel.connection, "connection must not be null");
log.trace("InputHandler waiting for incoming network_messages.\n\tConnection=" + connection);
// Throttle inbound network_messages
@ -762,31 +757,18 @@ public class Connection implements MessageListener {
}
// Reading the protobuffer message from the inputstream
PB.NetworkEnvelope envelope;
try {
//TODO check
// if (protoInputStream.available() > 0) {
envelope = PB.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
/* } else {
// we probably got a network issue so return here
// stop();
log.error("protoInputStream.available()=0. we probably got a network issue so return here");
return;
}*/
} catch (Throwable t) {
handleException(t);
return;
}
if (envelope == null) {
PB.NetworkEnvelope proto = PB.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
if (proto == null) {
log.error("proto is null. Should not happen...");
stopAndShutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
return;
}
NetworkEnvelope networkEnvelope = networkProtoResolver.fromProto(envelope);
NetworkEnvelope networkEnvelope = networkProtoResolver.fromProto(proto);
lastReadTimeStamp = now;
log.debug("<< Received networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
int size = envelope.getSerializedSize();
int size = proto.getSerializedSize();
if (networkEnvelope instanceof Pong || networkEnvelope instanceof RefreshOfferMessage) {
// We only log Pong and RefreshOfferMsg when in dev environment (trace)
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
@ -794,7 +776,7 @@ public class Connection implements MessageListener {
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(envelope.toString()),
Utilities.toTruncatedString(proto.toString()),
size);
} else {
// We want to log all incoming network_messages (except Pong and RefreshOfferMsg)
@ -805,7 +787,7 @@ public class Connection implements MessageListener {
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(envelope.toString()),
Utilities.toTruncatedString(proto.toString()),
size);
}
@ -819,16 +801,17 @@ public class Connection implements MessageListener {
boolean exceeds;
if (networkEnvelope instanceof ExtendedDataSizePermission) {
exceeds = size > MAX_PERMITTED_MESSAGE_SIZE;
log.debug("size={}; object={}", size, Utilities.toTruncatedString(envelope, 100));
log.debug("size={}; object={}", size, Utilities.toTruncatedString(proto, 100));
} else {
exceeds = size > PERMITTED_MESSAGE_SIZE;
}
if (exceeds)
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, Utilities.toTruncatedString(envelope));
if (exceeds) {
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, Utilities.toTruncatedString(proto));
if (exceeds && reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
}
// Then check data throttle limit. Do that for non-message type objects as well,
// so that's why we use serializable here.
@ -837,38 +820,32 @@ public class Connection implements MessageListener {
return;
// Check P2P network ID
int messageVersion = envelope.getMessageVersion();
if (messageVersion != Version.getP2PMessageVersion()) {
reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
if (DevEnv.DEV_MODE)
throw new RuntimeException("messageVersion is not set " + messageVersion +
" / message=" + envelope.toString());
if (proto.getMessageVersion() != Version.getP2PMessageVersion()
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID))
return;
}
// TODO no inheritance & is this even needed? We can send unsupported stuff to old nodes
if (sharedModel.getSupportedCapabilities() == null && networkEnvelope instanceof SupportedCapabilitiesMessage)
sharedModel.setSupportedCapabilities(((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities());
if (networkEnvelope instanceof CloseConnectionMessage) {
// If we get a CloseConnectionMessage we shut down
log.debug("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", envelope.getCloseConnectionMessage().getReason(), connection);
if (CloseConnectionReason.PEER_BANNED.name().equals(envelope.getCloseConnectionMessage().getReason())) {
"connection={}", proto.getCloseConnectionMessage().getReason(), connection);
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
stopAndShutDown(CloseConnectionReason.PEER_BANNED);
} else {
stopAndShutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
}
return;
} else if (!stopped) {
// We don't want to get the activity ts updated by ping/pong msg
if (!(networkEnvelope instanceof KeepAliveMessage)) {
if (!(networkEnvelope instanceof KeepAliveMessage))
connection.statistic.updateLastActivityTimestamp();
}
if (networkEnvelope instanceof GetDataRequest) {
if (networkEnvelope instanceof GetDataRequest)
connection.setPeerType(PeerType.INITIAL_DATA_REQUEST);
}
// First a seed node gets a message from a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not have its hidden service
// published, so it does not know its address. As the IncomingConnection does not have the
@ -902,19 +879,16 @@ public class Connection implements MessageListener {
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage)
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
// Further treatment of the incoming message
messageListener.onMessage(networkEnvelope, connection);
}
} catch (InvalidClassException e) {
log.error(e.getMessage());
e.printStackTrace();
reportInvalidRequest(RuleViolation.INVALID_CLASS);
return;
} catch (NoClassDefFoundError e) {
log.warn(e.getMessage());
log.error(e.getMessage());
e.printStackTrace();
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return;
} catch (Throwable t) {
handleException(t);
}