mirror of
https://github.com/bisq-network/bisq.git
synced 2025-02-23 15:00:30 +01:00
working ping/pong
This commit is contained in:
parent
e224422b83
commit
dd17069b1e
6 changed files with 398 additions and 53 deletions
|
@ -22,5 +22,10 @@
|
|||
<artifactId>spring-core</artifactId>
|
||||
<version>4.1.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<version>3.1.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
42
common/src/main/proto/bsmessage.proto
Normal file
42
common/src/main/proto/bsmessage.proto
Normal file
|
@ -0,0 +1,42 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package io.bitsquare.proto;
|
||||
|
||||
//
|
||||
// A simple protocol for describing signed sets of IP addresses. Intended to be distributed via HTTP[S] or in files.
|
||||
//
|
||||
|
||||
option java_package = "io.bitsquare.common.wire.proto";
|
||||
option java_outer_classname = "Messages";
|
||||
|
||||
message Envelope {
|
||||
int64 p2p_network_version = 1;
|
||||
oneof message {
|
||||
Ping ping = 2;
|
||||
Pong pong = 3;
|
||||
RefreshTTLMessage refresh_ttl_message = 4;
|
||||
CloseConnectionMessage close_connection_message = 5;
|
||||
}
|
||||
}
|
||||
|
||||
message Ping {
|
||||
int32 nonce = 1;
|
||||
int32 lastRoundTripTime = 2;
|
||||
}
|
||||
|
||||
message Pong {
|
||||
int32 requestNonce = 1;
|
||||
}
|
||||
|
||||
message RefreshTTLMessage {
|
||||
bytes hashOfDataAndSeqNr = 1; // 32 bytes
|
||||
bytes signature = 2; // 46 bytes
|
||||
bytes hashOfPayload = 3; // 32 bytes
|
||||
int32 sequenceNumber = 4; // 4 bytes
|
||||
}
|
||||
|
||||
message CloseConnectionMessage {
|
||||
int64 messageVersion = 1;
|
||||
string reason = 2;
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
package io.bitsquare.common.wire.proto;
|
||||
|
||||
import com.google.protobuf.AbstractParser;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.MessageLite;
|
||||
import com.sun.xml.internal.messaging.saaj.util.ByteInputStream;
|
||||
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.bitsquare.common.wire.proto.Messages;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
@Slf4j
|
||||
public class ProtoBufferTest {
|
||||
|
||||
@Test
|
||||
public void protoTest() {
|
||||
Messages.Ping Ping = Messages.Ping.newBuilder().setNonce(100).build();
|
||||
Messages.Pong Pong = Messages.Pong.newBuilder().setRequestNonce(1000).build();
|
||||
Messages.Envelope envelope1 = Messages.Envelope.newBuilder().setPing(Ping).build();
|
||||
Messages.Envelope envelope2 = Messages.Envelope.newBuilder().setPong(Pong).build();
|
||||
log.info(Ping.toString());
|
||||
log.info(Pong.toString());
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
assertFalse(Messages.Envelope.newBuilder().setP2PNetworkVersion(1).isInitialized());
|
||||
try {
|
||||
envelope1.writeDelimitedTo(outputStream);
|
||||
envelope2.writeDelimitedTo(outputStream);
|
||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
|
||||
Messages.Envelope envelope3 = Messages.Envelope.parseDelimitedFrom(inputStream);
|
||||
Messages.Envelope envelope4 = Messages.Envelope.parseDelimitedFrom(inputStream);
|
||||
|
||||
|
||||
log.info("message: {}", envelope3.getPing());
|
||||
//log.info("peerseesd empty: '{}'",envelope3.getPong().equals(Messages.Envelope.) == "");
|
||||
log.info("3 = {} 4 = {}",isPing(envelope3), isPing(envelope4));
|
||||
log.info(envelope3.toString());
|
||||
log.info(envelope4.toString());
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isPing(Messages.Envelope envelope) {
|
||||
return !envelope.getPing().getDefaultInstanceForType().equals(envelope.getPing());
|
||||
}
|
||||
}
|
97
doc/protobuffer.md
Normal file
97
doc/protobuffer.md
Normal file
|
@ -0,0 +1,97 @@
|
|||
# Protobuffer migration
|
||||
|
||||
* classes sent over the wire (P2P network)
|
||||
* classes serialized to disk
|
||||
|
||||
If possible we'll start with the P2P network because this has wider backward compatibility impact.
|
||||
|
||||
## Frameworks
|
||||
|
||||
- Drop-in replacement for serialization, fixes size concerns but not backward compat:
|
||||
https://ruedigermoeller.github.io/fast-serialization/
|
||||
-
|
||||
|
||||
## Protobuffer Setup
|
||||
|
||||
### installing
|
||||
|
||||
* install the latest protobuffer release on your machine (3.1.0 at this time of writing):
|
||||
https://github.com/google/protobuf/releases
|
||||
|
||||
### maven plugin vs ant plugin
|
||||
|
||||
* Bitcoinj uses an ant plugin to call the 'protoc' executable.
|
||||
This was tried but didn't work, although command-line invocation with the same params worked.
|
||||
|
||||
* There is also a protobuf maven plugin
|
||||
This worked immediately + is executed automatically when doing 'mvn clean install'.
|
||||
Output is in target/generated-sources which avoids the temptation of checking in generated classes.
|
||||
|
||||
### multiple different messages in one stream
|
||||
|
||||
In order to support this, we need to use .writeDelimitedTo(outputstream) and parseDelimitedFrom(inputstream).
|
||||
The writeDelimited writes a length varint before the object, allowing the parseDelimited to know the extent of the message.
|
||||
|
||||
## P2P Network
|
||||
|
||||
### Extends Payload search results
|
||||
|
||||
```
|
||||
public final class PubKeyRing implements Payload {
|
||||
public final class SealedAndSigned implements Payload {
|
||||
public final class PrivateNotification implements Payload {
|
||||
public final class Dispute implements Payload {
|
||||
public final class DisputeResult implements Payload {
|
||||
public final class Attachment implements Payload {
|
||||
public final class RawTransactionInput implements Payload {
|
||||
public abstract class PaymentAccountContractData implements Payload {
|
||||
public final class Contract implements Payload {
|
||||
public final class NodeAddress implements Persistable, Payload {
|
||||
public final class Peer implements Payload, Persistable {
|
||||
public interface CapabilityRequiringPayload extends Payload {
|
||||
public interface ExpirablePayload extends Payload {
|
||||
public interface RequiresOwnerIsOnlinePayload extends Payload {
|
||||
public class ProtectedStorageEntry implements Payload {
|
||||
```
|
||||
|
||||
### Messages
|
||||
|
||||
```
|
||||
public interface DirectMessage extends Message {
|
||||
public interface SupportedCapabilitiesMessage extends Message {
|
||||
public final class MockPayload implements Message, ExpirablePayload {
|
||||
public interface AnonymousMessage extends Message {
|
||||
public final class CloseConnectionMessage implements Message {
|
||||
public interface SendersNodeAddressMessage extends Message {
|
||||
public interface GetDataRequest extends Message {
|
||||
public abstract class KeepAliveMessage implements Message {
|
||||
abstract class PeerExchangeMessage implements Message {
|
||||
public abstract class BroadcastMessage implements Message {
|
||||
|
||||
```
|
||||
|
||||
## Disk serialization
|
||||
|
||||
extends Serializable
|
||||
|
||||
```
|
||||
private static class MockMessage implements Serializable {
|
||||
public interface Persistable extends Serializable {
|
||||
public class Tuple2<A, B> implements Serializable {
|
||||
public class Tuple3<A, B, C> implements Serializable {
|
||||
public class Tuple4<A, B, C, D> implements Serializable {
|
||||
public static <T extends Serializable> T deserialize(byte[] data) {
|
||||
public interface Payload extends Serializable {
|
||||
public class PlainTextWrapper implements Serializable {
|
||||
public class Storage<T extends Serializable> {
|
||||
public abstract class HttpClientProvider implements Serializable {
|
||||
public class PaymentAccountFilter implements Serializable {
|
||||
public class CurrencyTuple implements Serializable {
|
||||
public class ProcessModel implements Model, Serializable {
|
||||
public final class Altcoin implements Monetary, Comparable<Altcoin>, Serializable {
|
||||
public class AltcoinExchangeRate implements Serializable {
|
||||
public interface Message extends Serializable {
|
||||
public interface Message extends Serializable {
|
||||
public static final class DataAndSeqNrPair implements Serializable {
|
||||
|
||||
```
|
|
@ -3,12 +3,15 @@ package io.bitsquare.p2p.network;
|
|||
import com.google.common.util.concurrent.CycleDetectingLockFactory;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import io.bitsquare.app.Log;
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.common.ByteArrayUtils;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Tuple2;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.common.wire.proto.Messages;
|
||||
import io.bitsquare.io.LookAheadObjectInputStream;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.NodeAddress;
|
||||
|
@ -96,14 +99,14 @@ public class Connection implements MessageListener {
|
|||
private final String portInfo;
|
||||
private final String uid;
|
||||
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
|
||||
private final ReentrantLock objectOutputStreamLock = cycleDetectingLockFactory.newReentrantLock("objectOutputStreamLock");
|
||||
private final ReentrantLock protoOutputStreamLock = cycleDetectingLockFactory.newReentrantLock("protoOutputStreamLock");
|
||||
// holder of state shared between InputHandler and Connection
|
||||
private final SharedModel sharedModel;
|
||||
private final Statistic statistic;
|
||||
|
||||
// set in init
|
||||
private InputHandler inputHandler;
|
||||
private ObjectOutputStream objectOutputStream;
|
||||
private OutputStream protoOutputStream;
|
||||
|
||||
// mutable data, set from other threads but not changed internally.
|
||||
private Optional<NodeAddress> peersNodeAddressOptional = Optional.empty();
|
||||
|
@ -143,13 +146,13 @@ public class Connection implements MessageListener {
|
|||
socket.setSoTimeout(SOCKET_TIMEOUT);
|
||||
// Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block
|
||||
// See: https://stackoverflow.com/questions/5658089/java-creating-a-new-objectinputstream-blocks/5658109#5658109
|
||||
// When you construct an ObjectInputStream, in the constructor the class attempts to read a header that
|
||||
// When you construct an ObjectInputStream, in the constructor the class attempts to read a header that
|
||||
// the associated ObjectOutputStream on the other end of the connection has written.
|
||||
// It will not return until that header has been read.
|
||||
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
|
||||
ObjectInputStream objectInputStream = new LookAheadObjectInputStream(socket.getInputStream(), true);
|
||||
// It will not return until that header has been read.
|
||||
protoOutputStream = socket.getOutputStream();
|
||||
InputStream protoInputStream = socket.getInputStream();
|
||||
// We create a thread for handling inputStream data
|
||||
inputHandler = new InputHandler(sharedModel, objectInputStream, portInfo, this);
|
||||
inputHandler = new InputHandler(sharedModel, protoInputStream, portInfo, this);
|
||||
singleThreadExecutor.submit(inputHandler);
|
||||
|
||||
// Use Peer as default, in case of other types they will set it as soon as possible.
|
||||
|
@ -174,6 +177,7 @@ public class Connection implements MessageListener {
|
|||
|
||||
// Called form various threads
|
||||
public void sendMessage(Message message) {
|
||||
|
||||
if (!stopped) {
|
||||
if (!isCapabilityRequired(message) || isCapabilitySupported(message)) {
|
||||
try {
|
||||
|
@ -189,18 +193,39 @@ public class Connection implements MessageListener {
|
|||
Thread.sleep(50);
|
||||
}
|
||||
|
||||
Messages.Envelope.Builder builder = Messages.Envelope.newBuilder().setP2PNetworkVersion(Version.P2P_NETWORK_VERSION);
|
||||
Messages.Envelope envelope = null;
|
||||
|
||||
lastSendTimeStamp = now;
|
||||
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
|
||||
int size = ByteArrayUtils.objectToByteArray(message).length;
|
||||
|
||||
if (message instanceof Ping || message instanceof RefreshTTLMessage) {
|
||||
if (message instanceof Ping) {
|
||||
envelope = builder.setPing(Messages.Ping.newBuilder()
|
||||
.setNonce(((Ping) message).nonce)
|
||||
.setLastRoundTripTime(((Ping) message).lastRoundTripTime).build()).build();
|
||||
// pings and offer refresh msg we dont want to log in production
|
||||
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
|
||||
"Sending direct message to peer" +
|
||||
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
|
||||
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
|
||||
peersNodeAddress, uid, Utilities.toTruncatedString(message), size);
|
||||
} else if (message instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
|
||||
peersNodeAddress, uid, envelope.toString(), envelope.getSerializedSize());
|
||||
} else if (message instanceof RefreshTTLMessage) {
|
||||
envelope = builder.setRefreshTtlMessage(Messages.RefreshTTLMessage.newBuilder()
|
||||
.setHashOfDataAndSeqNr(ByteString.copyFrom(((RefreshTTLMessage) message).hashOfDataAndSeqNr))
|
||||
.setHashOfPayload(ByteString.copyFrom(((RefreshTTLMessage) message).hashOfPayload))
|
||||
.setSequenceNumber(((RefreshTTLMessage) message).sequenceNumber)
|
||||
.setSignature(ByteString.copyFrom(((RefreshTTLMessage) message).signature)).build()).build();
|
||||
|
||||
// pings and offer refresh msg we dont want to log in production
|
||||
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
|
||||
"Sending direct message to peer" +
|
||||
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
|
||||
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
|
||||
peersNodeAddress, uid, envelope.toString(), envelope.getSerializedSize());
|
||||
}
|
||||
/*
|
||||
|
||||
else if (message instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
|
||||
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
|
||||
|
||||
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
|
||||
|
@ -216,13 +241,15 @@ public class Connection implements MessageListener {
|
|||
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
|
||||
peersNodeAddress, uid, Utilities.toTruncatedString(message), size);
|
||||
}
|
||||
*/
|
||||
|
||||
if (!stopped) {
|
||||
objectOutputStreamLock.lock();
|
||||
objectOutputStream.writeObject(message);
|
||||
objectOutputStream.flush();
|
||||
if (!stopped && envelope != null) {
|
||||
|
||||
statistic.addSentBytes(size);
|
||||
protoOutputStreamLock.lock();
|
||||
envelope.writeDelimitedTo(protoOutputStream);
|
||||
protoOutputStream.flush();
|
||||
|
||||
statistic.addSentBytes(envelope.getSerializedSize());
|
||||
statistic.addSentMessage(message);
|
||||
|
||||
// We don't want to get the activity ts updated by ping/pong msg
|
||||
|
@ -232,18 +259,21 @@ public class Connection implements MessageListener {
|
|||
} catch (IOException e) {
|
||||
// an exception lead to a shutdown
|
||||
sharedModel.handleConnectionException(e);
|
||||
|
||||
} catch (Throwable t) {
|
||||
log.error(t.getMessage());
|
||||
t.printStackTrace();
|
||||
sharedModel.handleConnectionException(t);
|
||||
} finally {
|
||||
if (objectOutputStreamLock.isLocked())
|
||||
objectOutputStreamLock.unlock();
|
||||
if (protoOutputStreamLock.isLocked())
|
||||
protoOutputStreamLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
log.debug("called sendMessage but was already stopped");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isCapabilitySupported(Message message) {
|
||||
|
@ -681,7 +711,7 @@ public class Connection implements MessageListener {
|
|||
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
|
||||
|
||||
private final SharedModel sharedModel;
|
||||
private final ObjectInputStream objectInputStream;
|
||||
private final InputStream protoInputStream;
|
||||
private final String portInfo;
|
||||
private final MessageListener messageListener;
|
||||
|
||||
|
@ -689,9 +719,9 @@ public class Connection implements MessageListener {
|
|||
private long lastReadTimeStamp;
|
||||
private boolean threadNameSet;
|
||||
|
||||
public InputHandler(SharedModel sharedModel, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener) {
|
||||
public InputHandler(SharedModel sharedModel, InputStream protoInputStream, String portInfo, MessageListener messageListener) {
|
||||
this.sharedModel = sharedModel;
|
||||
this.objectInputStream = objectInputStream;
|
||||
this.protoInputStream = protoInputStream;
|
||||
this.portInfo = portInfo;
|
||||
this.messageListener = messageListener;
|
||||
}
|
||||
|
@ -699,7 +729,7 @@ public class Connection implements MessageListener {
|
|||
public void stop() {
|
||||
if (!stopped) {
|
||||
try {
|
||||
objectInputStream.close();
|
||||
protoInputStream.close();
|
||||
} catch (IOException e) {
|
||||
log.error("IOException at InputHandler.stop\n" + e.getMessage());
|
||||
e.printStackTrace();
|
||||
|
@ -719,8 +749,8 @@ public class Connection implements MessageListener {
|
|||
threadNameSet = true;
|
||||
}
|
||||
try {
|
||||
if (sharedModel.getSocket().isClosed() || objectInputStream.available() < 0) {
|
||||
log.warn("Shutdown because objectInputStream.available() < 0. objectInputStream.available()=" + objectInputStream.available());
|
||||
if (sharedModel.getSocket().isClosed() || protoInputStream.available() < 0) {
|
||||
log.warn("Shutdown because protoInputStream.available() < 0. protoInputStream.available()=" + protoInputStream.available());
|
||||
sharedModel.shutDown(CloseConnectionReason.TERMINATED);
|
||||
return;
|
||||
}
|
||||
|
@ -728,7 +758,11 @@ public class Connection implements MessageListener {
|
|||
Connection connection = sharedModel.connection;
|
||||
log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + connection);
|
||||
|
||||
Object rawInputObject = objectInputStream.readObject();
|
||||
Messages.Envelope envelope = Messages.Envelope.parseDelimitedFrom(protoInputStream);
|
||||
if(envelope == null) {
|
||||
log.warn("Envelope is null");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Throttle inbound messages
|
||||
|
@ -741,20 +775,38 @@ public class Connection implements MessageListener {
|
|||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
Message message = null;
|
||||
lastReadTimeStamp = now;
|
||||
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
|
||||
int size = envelope.getSerializedSize();
|
||||
|
||||
if (rawInputObject instanceof Pong || rawInputObject instanceof RefreshTTLMessage) {
|
||||
if (isPong(envelope) || isRefreshTTLMessage(envelope)) {
|
||||
message = new Pong(envelope.getPong().getRequestNonce());
|
||||
// We only log Pong and RefreshTTLMessage when in dev environment (trace)
|
||||
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
|
||||
"New data arrived at inputHandler of connection {}.\n" +
|
||||
"Received object (truncated)={} / size={}"
|
||||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
|
||||
connection,
|
||||
Utilities.toTruncatedString(rawInputObject),
|
||||
envelope.toString(),
|
||||
size);
|
||||
} else if (rawInputObject instanceof Message) {
|
||||
// We want to log all incoming messages (except Pong and RefreshTTLMessage)
|
||||
} else
|
||||
if (isPong(envelope) || isRefreshTTLMessage(envelope)) {
|
||||
Messages.RefreshTTLMessage msg = envelope.getRefreshTtlMessage();
|
||||
message = new RefreshTTLMessage(msg.getHashOfDataAndSeqNr().toByteArray(),
|
||||
msg.getSignature().toByteArray(),
|
||||
msg.getHashOfPayload().toByteArray(), msg.getSequenceNumber());
|
||||
// We only log Pong and RefreshTTLMessage when in dev environment (trace)
|
||||
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
|
||||
"New data arrived at inputHandler of connection {}.\n" +
|
||||
"Received object (truncated)={} / size={}"
|
||||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
|
||||
connection,
|
||||
envelope.toString(),
|
||||
size);
|
||||
}
|
||||
/*
|
||||
else if (rawInputObject instanceof Message) {
|
||||
// We want to log all incoming messages (except Pong and RefreshTTLMessage)
|
||||
// so we log before the data type checks
|
||||
//log.info("size={}; object={}", size, Utilities.toTruncatedString(rawInputObject.toString(), 100));
|
||||
log.debug("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
|
||||
|
@ -764,17 +816,20 @@ public class Connection implements MessageListener {
|
|||
connection,
|
||||
Utilities.toTruncatedString(rawInputObject),
|
||||
size);
|
||||
} else {
|
||||
}
|
||||
*/
|
||||
else {
|
||||
log.error("Invalid data arrived at inputHandler of connection {} Size={}", connection, size);
|
||||
try {
|
||||
// Don't call toString on rawInputObject
|
||||
log.error("rawInputObject.className=" + rawInputObject.getClass().getName());
|
||||
log.error("rawInputObject.className=" + envelope.toString());
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
// We want to track the size of each object even if it is invalid data
|
||||
connection.statistic.addReceivedBytes(size);
|
||||
/* TODO
|
||||
|
||||
// We want to track the messages also before the checks, so do it early...
|
||||
Message message = null;
|
||||
|
@ -783,23 +838,24 @@ public class Connection implements MessageListener {
|
|||
connection.statistic.addReceivedMessage((Message) rawInputObject);
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
// First we check the size
|
||||
boolean exceeds;
|
||||
if (rawInputObject instanceof GetDataResponse) {
|
||||
exceeds = size > MAX_MSG_SIZE_GET_DATA;
|
||||
log.info("size={}; object={}", size, Utilities.toTruncatedString(rawInputObject.toString(), 100));
|
||||
if (envelope.getSerializedSize() > MAX_MSG_SIZE_GET_DATA) { // TODO should be datamessage
|
||||
exceeds = true;
|
||||
log.info("size={}; object={}", size, envelope.toString());
|
||||
} else {
|
||||
exceeds = size > MAX_MSG_SIZE;
|
||||
}
|
||||
if (exceeds)
|
||||
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, message);
|
||||
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, envelope.toString());
|
||||
|
||||
if (exceeds && reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
|
||||
return;
|
||||
|
||||
/* TODO protobuffer equivalent?
|
||||
|
||||
// Then we check if data is of type Serializable (objectInputStream supports
|
||||
// Then we check if data is of type Serializable (protoInputStream supports
|
||||
// Externalizable objects as well)
|
||||
Serializable serializable;
|
||||
if (rawInputObject instanceof Serializable) {
|
||||
|
@ -809,41 +865,47 @@ public class Connection implements MessageListener {
|
|||
// We return anyway here independent of the return value of reportInvalidRequest
|
||||
return;
|
||||
}
|
||||
*/
|
||||
|
||||
// Then check data throttle limit. Do that for non-message type objects as well,
|
||||
// Then check data throttle limit. Do that for non-message type objects as well,
|
||||
// so that's why we use serializable here.
|
||||
if (connection.violatesThrottleLimit(serializable) && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
|
||||
if (connection.violatesThrottleLimit(null) && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
|
||||
return;
|
||||
|
||||
// We do the message type check after the size/throttle checks.
|
||||
/* TODO catch exception during parse instead of this
|
||||
|
||||
// We do the message type check after the size/throttle checks.
|
||||
// The type check was done already earlier so we only check if message is not null.
|
||||
if (message == null) {
|
||||
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
|
||||
// We return anyway here independent of the return value of reportInvalidRequest
|
||||
return;
|
||||
}
|
||||
*/
|
||||
|
||||
// Check P2P network ID
|
||||
int messageVersion = message.getMessageVersion();
|
||||
int messageVersion = (int)envelope.getP2PNetworkVersion();
|
||||
int p2PMessageVersion = Version.getP2PMessageVersion();
|
||||
if (messageVersion != p2PMessageVersion) {
|
||||
log.warn("message.getMessageVersion()=" + messageVersion);
|
||||
log.warn("Version.getP2PMessageVersion()=" + p2PMessageVersion);
|
||||
log.warn("message=" + message);
|
||||
log.warn("message=" + envelope.toString());
|
||||
/* TODO
|
||||
reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
|
||||
// We return anyway here independent of the return value of reportInvalidRequest
|
||||
return;
|
||||
*/
|
||||
}
|
||||
|
||||
if (sharedModel.getSupportedCapabilities() == null && message instanceof SupportedCapabilitiesMessage)
|
||||
sharedModel.setSupportedCapabilities(((SupportedCapabilitiesMessage) message).getSupportedCapabilities());
|
||||
|
||||
if (message instanceof CloseConnectionMessage) {
|
||||
if (isCloseConnectionMessage(envelope)) {
|
||||
// If we get a CloseConnectionMessage we shut down
|
||||
log.debug("CloseConnectionMessage received. Reason={}\n\t" +
|
||||
"connection={}", ((CloseConnectionMessage) message).reason, connection);
|
||||
"connection={}", envelope.getCloseConnectionMessage().getReason(), connection);
|
||||
stop();
|
||||
if (CloseConnectionReason.PEER_BANNED.name().equals(((CloseConnectionMessage) message).reason)) {
|
||||
if (CloseConnectionReason.PEER_BANNED.name().equals(envelope.getCloseConnectionMessage().getReason())) {
|
||||
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
|
||||
sharedModel.shutDown(CloseConnectionReason.PEER_BANNED);
|
||||
} else {
|
||||
|
@ -851,18 +913,18 @@ public class Connection implements MessageListener {
|
|||
}
|
||||
} else if (!stopped) {
|
||||
// We don't want to get the activity ts updated by ping/pong msg
|
||||
if (!(message instanceof KeepAliveMessage))
|
||||
if (!(isPing(envelope) || isPong(envelope))) {
|
||||
connection.statistic.updateLastActivityTimestamp();
|
||||
}
|
||||
|
||||
if (message instanceof GetDataRequest)
|
||||
connection.setPeerType(PeerType.INITIAL_DATA_REQUEST);
|
||||
|
||||
// First a seed node gets a message from a peer (PreliminaryDataRequest using
|
||||
// AnonymousMessage interface) which does not have its hidden service
|
||||
// published, so it does not know its address. As the IncomingConnection does not have the
|
||||
// peersNodeAddress set that connection cannot be used for outgoing messages until we
|
||||
// peersNodeAddress set that connection cannot be used for outgoing messages until we
|
||||
// get the address set.
|
||||
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
|
||||
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
|
||||
// after the HS is published we get the peer's address set.
|
||||
|
||||
// There are only those messages used for new connections to a peer:
|
||||
|
@ -872,7 +934,7 @@ public class Connection implements MessageListener {
|
|||
// 4. DirectMessage (implements SendersNodeAddressMessage)
|
||||
if (message instanceof SendersNodeAddressMessage) {
|
||||
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress();
|
||||
// We must not shut down a banned peer at that moment as it would trigger a connection termination
|
||||
// We must not shut down a banned peer at that moment as it would trigger a connection termination
|
||||
// and we could not send the CloseConnectionMessage.
|
||||
// We shut down a banned peer at the next step at setPeersNodeAddress().
|
||||
|
||||
|
@ -890,6 +952,7 @@ public class Connection implements MessageListener {
|
|||
if (message instanceof PrefixedSealedAndSignedMessage)
|
||||
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
|
||||
|
||||
|
||||
messageListener.onMessage(message, connection);
|
||||
}
|
||||
} catch (InvalidClassException e) {
|
||||
|
@ -897,12 +960,13 @@ public class Connection implements MessageListener {
|
|||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_CLASS);
|
||||
return;
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
}/*
|
||||
catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
log.warn(e.getMessage());
|
||||
e.printStackTrace();
|
||||
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
} */ catch (IOException e) {
|
||||
stop();
|
||||
sharedModel.handleConnectionException(e);
|
||||
} catch (Throwable t) {
|
||||
|
@ -920,6 +984,21 @@ public class Connection implements MessageListener {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isPing(Messages.Envelope envelope) {
|
||||
return Messages.Ping.getDefaultInstance().equals(envelope.getPing());
|
||||
}
|
||||
private boolean isPong(Messages.Envelope envelope) {
|
||||
return Messages.Pong.getDefaultInstance().equals(envelope.getPong());
|
||||
}
|
||||
|
||||
private boolean isRefreshTTLMessage(Messages.Envelope envelope) {
|
||||
return Messages.RefreshTTLMessage.getDefaultInstance().equals(envelope.getRefreshTtlMessage());
|
||||
}
|
||||
|
||||
private boolean isCloseConnectionMessage(Messages.Envelope envelope) {
|
||||
return Messages.CloseConnectionMessage.getDefaultInstance().equals(envelope.getCloseConnectionMessage());
|
||||
}
|
||||
|
||||
private boolean reportInvalidRequest(RuleViolation ruleViolation) {
|
||||
boolean causedShutDown = sharedModel.reportInvalidRequest(ruleViolation);
|
||||
if (causedShutDown)
|
||||
|
|
65
pom.xml
65
pom.xml
|
@ -83,6 +83,23 @@
|
|||
</executions>
|
||||
</plugin>-->
|
||||
|
||||
<plugin>
|
||||
<groupId>org.xolstice.maven.plugins</groupId>
|
||||
<artifactId>protobuf-maven-plugin</artifactId>
|
||||
<version>0.5.0</version>
|
||||
<configuration>
|
||||
<protocExecutable>/usr/local/bin/protoc</protocExecutable>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
<goal>test-compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
@ -166,6 +183,11 @@
|
|||
<version>1.16.12</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<version>3.1.0</version>
|
||||
</dependency>
|
||||
|
||||
<!--logging-->
|
||||
<dependency>
|
||||
|
@ -204,4 +226,47 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>update-protobuf</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>updateProtobuf</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>compile-protoc</id>
|
||||
<phase>generate-sources</phase>
|
||||
<configuration>
|
||||
<tasks>
|
||||
<path id="proto.path">
|
||||
<fileset dir="common/src/main/proto">
|
||||
<include name="**/*.proto"/>
|
||||
</fileset>
|
||||
</path>
|
||||
<pathconvert pathsep=" " property="proto.files" refid="proto.path"/>
|
||||
<exec executable="protoc" failonerror="true">
|
||||
<arg value="--java_out=./common/src/main/java"/>
|
||||
<arg value="-Icommon/src/main/proto"/>
|
||||
<arg line="${proto.files}"/>
|
||||
</exec>
|
||||
</tasks>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
|
Loading…
Add table
Reference in a new issue