mirror of
https://github.com/bisq-network/bisq.git
synced 2025-02-22 14:42:37 +01:00
Apply formatting (no code change)
This commit is contained in:
parent
36f8b0275a
commit
2410f38796
1 changed files with 160 additions and 159 deletions
|
@ -291,14 +291,14 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
|
|||
if (msg instanceof AddDataMessage) {
|
||||
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) msg).getProtectedStorageEntry()).getProtectedStoragePayload();
|
||||
result = !(protectedStoragePayload instanceof CapabilityRequiringPayload);
|
||||
if(!result)
|
||||
if (!result)
|
||||
result = capabilities.containsAll(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities());
|
||||
} else if (msg instanceof AddPersistableNetworkPayloadMessage) {
|
||||
final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) msg).getPersistableNetworkPayload();
|
||||
result = !(persistableNetworkPayload instanceof CapabilityRequiringPayload);
|
||||
if(!result)
|
||||
result = capabilities.containsAll(((CapabilityRequiringPayload) persistableNetworkPayload).getRequiredCapabilities());
|
||||
} else if(msg instanceof CapabilityRequiringPayload) {
|
||||
if (!result)
|
||||
result = capabilities.containsAll(((CapabilityRequiringPayload) persistableNetworkPayload).getRequiredCapabilities());
|
||||
} else if (msg instanceof CapabilityRequiringPayload) {
|
||||
result = capabilities.containsAll(((CapabilityRequiringPayload) msg).getRequiredCapabilities());
|
||||
} else {
|
||||
result = true;
|
||||
|
@ -549,6 +549,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SharedSpace
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Holds all shared data between Connection and InputHandler
|
||||
* Runs in same thread as Connection
|
||||
|
@ -627,58 +628,58 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
|
|||
// Runs in same thread as Connection, receives a message, performs several checks on it
|
||||
// (including throttling limits, validity and statistics)
|
||||
// and delivers it to the message listener given in the constructor.
|
||||
private InputStream protoInputStream;
|
||||
private final NetworkProtoResolver networkProtoResolver;
|
||||
private InputStream protoInputStream;
|
||||
private final NetworkProtoResolver networkProtoResolver;
|
||||
|
||||
private long lastReadTimeStamp;
|
||||
private boolean threadNameSet;
|
||||
private long lastReadTimeStamp;
|
||||
private boolean threadNameSet;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().setName("InputHandler");
|
||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
if (!threadNameSet && getPeersNodeAddressOptional().isPresent()) {
|
||||
Thread.currentThread().setName("InputHandler-" + getPeersNodeAddressOptional().get().getFullAddress());
|
||||
threadNameSet = true;
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().setName("InputHandler");
|
||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
if (!threadNameSet && getPeersNodeAddressOptional().isPresent()) {
|
||||
Thread.currentThread().setName("InputHandler-" + getPeersNodeAddressOptional().get().getFullAddress());
|
||||
threadNameSet = true;
|
||||
}
|
||||
try {
|
||||
if (socket != null &&
|
||||
socket.isClosed()) {
|
||||
log.warn("Socket is null or closed socket={}", socket);
|
||||
shutDown(CloseConnectionReason.SOCKET_CLOSED);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (socket != null &&
|
||||
socket.isClosed()) {
|
||||
log.warn("Socket is null or closed socket={}", socket);
|
||||
shutDown(CloseConnectionReason.SOCKET_CLOSED);
|
||||
return;
|
||||
}
|
||||
|
||||
// Throttle inbound network_messages
|
||||
long now = System.currentTimeMillis();
|
||||
long elapsed = now - lastReadTimeStamp;
|
||||
if (elapsed < 10) {
|
||||
log.debug("We got 2 network_messages received in less than 10 ms. We set the thread to sleep " +
|
||||
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
|
||||
lastReadTimeStamp, now, elapsed);
|
||||
Thread.sleep(20);
|
||||
}
|
||||
// Throttle inbound network_messages
|
||||
long now = System.currentTimeMillis();
|
||||
long elapsed = now - lastReadTimeStamp;
|
||||
if (elapsed < 10) {
|
||||
log.debug("We got 2 network_messages received in less than 10 ms. We set the thread to sleep " +
|
||||
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
|
||||
lastReadTimeStamp, now, elapsed);
|
||||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
// Reading the protobuffer message from the inputstream
|
||||
PB.NetworkEnvelope proto = PB.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
|
||||
// Reading the protobuffer message from the inputstream
|
||||
PB.NetworkEnvelope proto = PB.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
|
||||
|
||||
if (proto == null) {
|
||||
if (protoInputStream.read() == -1)
|
||||
log.info("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
|
||||
else
|
||||
log.warn("proto is null. protoInputStream.read()=" + protoInputStream.read());
|
||||
shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
|
||||
return;
|
||||
}
|
||||
if (proto == null) {
|
||||
if (protoInputStream.read() == -1)
|
||||
log.info("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
|
||||
else
|
||||
log.warn("proto is null. protoInputStream.read()=" + protoInputStream.read());
|
||||
shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
|
||||
return;
|
||||
}
|
||||
|
||||
NetworkEnvelope networkEnvelope = networkProtoResolver.fromProto(proto);
|
||||
lastReadTimeStamp = now;
|
||||
log.debug("<< Received networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
|
||||
NetworkEnvelope networkEnvelope = networkProtoResolver.fromProto(proto);
|
||||
lastReadTimeStamp = now;
|
||||
log.debug("<< Received networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
|
||||
|
||||
int size = proto.getSerializedSize();
|
||||
// We comment out that part as only debug and trace log level is used. For debugging purposes
|
||||
// we leave the code though.
|
||||
int size = proto.getSerializedSize();
|
||||
// We comment out that part as only debug and trace log level is used. For debugging purposes
|
||||
// we leave the code though.
|
||||
/*if (networkEnvelope instanceof Pong || networkEnvelope instanceof RefreshOfferMessage) {
|
||||
// We only log Pong and RefreshOfferMsg when in dev environment (trace)
|
||||
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
|
||||
|
@ -701,121 +702,121 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
|
|||
size);
|
||||
}*/
|
||||
|
||||
// We want to track the size of each object even if it is invalid data
|
||||
statistic.addReceivedBytes(size);
|
||||
// We want to track the size of each object even if it is invalid data
|
||||
statistic.addReceivedBytes(size);
|
||||
|
||||
// We want to track the network_messages also before the checks, so do it early...
|
||||
statistic.addReceivedMessage(networkEnvelope);
|
||||
// We want to track the network_messages also before the checks, so do it early...
|
||||
statistic.addReceivedMessage(networkEnvelope);
|
||||
|
||||
// First we check the size
|
||||
boolean exceeds;
|
||||
if (networkEnvelope instanceof ExtendedDataSizePermission) {
|
||||
exceeds = size > MAX_PERMITTED_MESSAGE_SIZE;
|
||||
log.debug("size={}; object={}", size, Utilities.toTruncatedString(proto, 100));
|
||||
} else {
|
||||
exceeds = size > PERMITTED_MESSAGE_SIZE;
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage &&
|
||||
!((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) {
|
||||
log.warn("PersistableNetworkPayload.verifyHashSize failed. hashSize={}; object={}",
|
||||
((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length,
|
||||
Utilities.toTruncatedString(proto));
|
||||
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
|
||||
return;
|
||||
}
|
||||
|
||||
if (exceeds) {
|
||||
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, Utilities.toTruncatedString(proto));
|
||||
|
||||
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
|
||||
return;
|
||||
}
|
||||
|
||||
if (violatesThrottleLimit(networkEnvelope)
|
||||
&& reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
|
||||
return;
|
||||
|
||||
// Check P2P network ID
|
||||
if (proto.getMessageVersion() != Version.getP2PMessageVersion()
|
||||
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
|
||||
log.warn("RuleViolation.WRONG_NETWORK_ID. version of message={}, app version={}, " +
|
||||
"proto.toTruncatedString={}", proto.getMessageVersion(),
|
||||
Version.getP2PMessageVersion(),
|
||||
Utilities.toTruncatedString(proto.toString()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof SupportedCapabilitiesMessage)
|
||||
capabilities.set(((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities());
|
||||
|
||||
if (networkEnvelope instanceof CloseConnectionMessage) {
|
||||
// If we get a CloseConnectionMessage we shut down
|
||||
log.info("CloseConnectionMessage received. Reason={}\n\t" +
|
||||
"connection={}", proto.getCloseConnectionMessage().getReason(), this);
|
||||
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
|
||||
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
|
||||
shutDown(CloseConnectionReason.PEER_BANNED);
|
||||
} else {
|
||||
shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
|
||||
}
|
||||
return;
|
||||
} else if (!stopped) {
|
||||
// We don't want to get the activity ts updated by ping/pong msg
|
||||
if (!(networkEnvelope instanceof KeepAliveMessage))
|
||||
statistic.updateLastActivityTimestamp();
|
||||
|
||||
if (networkEnvelope instanceof GetDataRequest)
|
||||
setPeerType(PeerType.INITIAL_DATA_REQUEST);
|
||||
|
||||
// First a seed node gets a message from a peer (PreliminaryDataRequest using
|
||||
// AnonymousMessage interface) which does not have its hidden service
|
||||
// published, so it does not know its address. As the IncomingConnection does not have the
|
||||
// peersNodeAddress set that connection cannot be used for outgoing network_messages until we
|
||||
// get the address set.
|
||||
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
|
||||
// after the HS is published we get the peer's address set.
|
||||
|
||||
// There are only those network_messages used for new connections to a peer:
|
||||
// 1. PreliminaryDataRequest
|
||||
// 2. DataRequest (implements SendersNodeAddressMessage)
|
||||
// 3. GetPeersRequest (implements SendersNodeAddressMessage)
|
||||
// 4. DirectMessage (implements SendersNodeAddressMessage)
|
||||
if (networkEnvelope instanceof SendersNodeAddressMessage) {
|
||||
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress();
|
||||
Optional<NodeAddress> peersNodeAddressOptional = getPeersNodeAddressOptional();
|
||||
if (peersNodeAddressOptional.isPresent()) {
|
||||
// If we have already the peers address we check again if it matches our stored one
|
||||
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
|
||||
"senderNodeAddress not matching connections peer address.\n\t" +
|
||||
"message=" + networkEnvelope);
|
||||
} else {
|
||||
// We must not shut down a banned peer at that moment as it would trigger a connection termination
|
||||
// and we could not send the CloseConnectionMessage.
|
||||
// We check for a banned peer inside setPeersNodeAddress() and shut down if banned.
|
||||
setPeersNodeAddress(senderNodeAddress);
|
||||
}
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage)
|
||||
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
|
||||
|
||||
onMessage(networkEnvelope, this);
|
||||
}
|
||||
} catch (InvalidClassException e) {
|
||||
log.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_CLASS);
|
||||
} catch (ProtobufferException | NoClassDefFoundError e) {
|
||||
log.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
// First we check the size
|
||||
boolean exceeds;
|
||||
if (networkEnvelope instanceof ExtendedDataSizePermission) {
|
||||
exceeds = size > MAX_PERMITTED_MESSAGE_SIZE;
|
||||
log.debug("size={}; object={}", size, Utilities.toTruncatedString(proto, 100));
|
||||
} else {
|
||||
exceeds = size > PERMITTED_MESSAGE_SIZE;
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage &&
|
||||
!((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) {
|
||||
log.warn("PersistableNetworkPayload.verifyHashSize failed. hashSize={}; object={}",
|
||||
((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length,
|
||||
Utilities.toTruncatedString(proto));
|
||||
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
|
||||
return;
|
||||
}
|
||||
|
||||
if (exceeds) {
|
||||
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, Utilities.toTruncatedString(proto));
|
||||
|
||||
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
|
||||
return;
|
||||
}
|
||||
|
||||
if (violatesThrottleLimit(networkEnvelope)
|
||||
&& reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
|
||||
return;
|
||||
|
||||
// Check P2P network ID
|
||||
if (proto.getMessageVersion() != Version.getP2PMessageVersion()
|
||||
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
|
||||
log.warn("RuleViolation.WRONG_NETWORK_ID. version of message={}, app version={}, " +
|
||||
"proto.toTruncatedString={}", proto.getMessageVersion(),
|
||||
Version.getP2PMessageVersion(),
|
||||
Utilities.toTruncatedString(proto.toString()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof SupportedCapabilitiesMessage)
|
||||
capabilities.set(((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities());
|
||||
|
||||
if (networkEnvelope instanceof CloseConnectionMessage) {
|
||||
// If we get a CloseConnectionMessage we shut down
|
||||
log.info("CloseConnectionMessage received. Reason={}\n\t" +
|
||||
"connection={}", proto.getCloseConnectionMessage().getReason(), this);
|
||||
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
|
||||
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
|
||||
shutDown(CloseConnectionReason.PEER_BANNED);
|
||||
} else {
|
||||
shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
|
||||
}
|
||||
return;
|
||||
} else if (!stopped) {
|
||||
// We don't want to get the activity ts updated by ping/pong msg
|
||||
if (!(networkEnvelope instanceof KeepAliveMessage))
|
||||
statistic.updateLastActivityTimestamp();
|
||||
|
||||
if (networkEnvelope instanceof GetDataRequest)
|
||||
setPeerType(PeerType.INITIAL_DATA_REQUEST);
|
||||
|
||||
// First a seed node gets a message from a peer (PreliminaryDataRequest using
|
||||
// AnonymousMessage interface) which does not have its hidden service
|
||||
// published, so it does not know its address. As the IncomingConnection does not have the
|
||||
// peersNodeAddress set that connection cannot be used for outgoing network_messages until we
|
||||
// get the address set.
|
||||
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
|
||||
// after the HS is published we get the peer's address set.
|
||||
|
||||
// There are only those network_messages used for new connections to a peer:
|
||||
// 1. PreliminaryDataRequest
|
||||
// 2. DataRequest (implements SendersNodeAddressMessage)
|
||||
// 3. GetPeersRequest (implements SendersNodeAddressMessage)
|
||||
// 4. DirectMessage (implements SendersNodeAddressMessage)
|
||||
if (networkEnvelope instanceof SendersNodeAddressMessage) {
|
||||
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress();
|
||||
Optional<NodeAddress> peersNodeAddressOptional = getPeersNodeAddressOptional();
|
||||
if (peersNodeAddressOptional.isPresent()) {
|
||||
// If we have already the peers address we check again if it matches our stored one
|
||||
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
|
||||
"senderNodeAddress not matching connections peer address.\n\t" +
|
||||
"message=" + networkEnvelope);
|
||||
} else {
|
||||
// We must not shut down a banned peer at that moment as it would trigger a connection termination
|
||||
// and we could not send the CloseConnectionMessage.
|
||||
// We check for a banned peer inside setPeersNodeAddress() and shut down if banned.
|
||||
setPeersNodeAddress(senderNodeAddress);
|
||||
}
|
||||
}
|
||||
|
||||
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage)
|
||||
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
|
||||
|
||||
onMessage(networkEnvelope, this);
|
||||
}
|
||||
} catch (InvalidClassException e) {
|
||||
log.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_CLASS);
|
||||
} catch (ProtobufferException | NoClassDefFoundError e) {
|
||||
log.error(e.getMessage());
|
||||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
handleException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue