Fix wrong handling of Capabilities. Add missing handling of CapabilityRequiringPayload

This commit is contained in:
Manfred Karrer 2017-11-09 14:33:42 -05:00
parent 706710a555
commit 44c7c712a3
No known key found for this signature in database
GPG Key ID: 401250966A6B2C46
10 changed files with 486 additions and 259 deletions

View File

@ -1,9 +1,9 @@
package io.bisq.network.p2p;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public interface SupportedCapabilitiesMessage {
@Nullable
ArrayList<Integer> getSupportedCapabilities();
List<Integer> getSupportedCapabilities();
}

View File

@ -22,6 +22,7 @@ import io.bisq.network.p2p.storage.messages.AddDataMessage;
import io.bisq.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage;
import io.bisq.network.p2p.storage.messages.RefreshOfferMessage;
import io.bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import io.bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import io.bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
@ -189,8 +190,8 @@ public class Connection implements MessageListener {
long elapsed = now - lastSendTimeStamp;
if (elapsed < 20) {
log.debug("We got 2 sendMessage requests in less than 20 ms. We set the thread to sleep " +
"for 50 ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, now, elapsed);
"for 50 ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, now, elapsed);
Thread.sleep(50);
}
@ -203,25 +204,25 @@ public class Connection implements MessageListener {
if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, proto.toString(), proto.getSerializedSize());
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, proto.toString(), proto.getSerializedSize());
} else if (networkEnvelope instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), -1);
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), -1);
} else if (networkEnvelope instanceof GetDataResponse && ((GetDataResponse) networkEnvelope).isGetUpdatedDataResponse()) {
setPeerType(Connection.PeerType.PEER);
} else {
log.debug("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), proto.getSerializedSize());
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, Utilities.toTruncatedString(networkEnvelope), proto.getSerializedSize());
}
if (!stopped) {
@ -249,6 +250,7 @@ public class Connection implements MessageListener {
}
public boolean isCapabilitySupported(NetworkEnvelope networkEnvelop) {
//TODO refactor, does not cover multiple items, add tests
if (networkEnvelop instanceof AddDataMessage) {
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload();
if (protectedStoragePayload instanceof CapabilityRequiringPayload) {
@ -262,16 +264,44 @@ public class Connection implements MessageListener {
}
}
log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"Supported capabilities is: " + supportedCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"Supported capabilities is: " + supportedCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
return false;
} else {
log.debug("We do not send the message to the peer because he uses an old version which does not support capabilities.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
return false;
}
} else {
return true;
}
} else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) {
final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload();
if (persistableNetworkPayload instanceof CapabilityRequiringPayload) {
final List<Integer> requiredCapabilities = ((CapabilityRequiringPayload) persistableNetworkPayload).getRequiredCapabilities();
final List<Integer> supportedCapabilities = sharedModel.getSupportedCapabilities();
if (supportedCapabilities != null) {
for (int messageCapability : requiredCapabilities) {
for (int connectionCapability : supportedCapabilities) {
if (messageCapability == connectionCapability)
return true;
}
}
log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"Supported capabilities is: " + supportedCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(persistableNetworkPayload));
return false;
} else {
log.debug("We do not send the message to the peer because he uses an old version which does not support capabilities.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"connection: " + this.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(persistableNetworkPayload));
return false;
}
} else {
@ -284,7 +314,10 @@ public class Connection implements MessageListener {
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean isCapabilityRequired(NetworkEnvelope networkEnvelop) {
return networkEnvelop instanceof AddDataMessage && (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload() instanceof CapabilityRequiringPayload;
return (networkEnvelop instanceof AddDataMessage &&
(((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload() instanceof CapabilityRequiringPayload) ||
(networkEnvelop instanceof AddPersistableNetworkPayloadMessage &&
(((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload() instanceof CapabilityRequiringPayload));
}
public List<Integer> getSupportedCapabilities() {
@ -301,7 +334,7 @@ public class Connection implements MessageListener {
boolean contained = messageListeners.remove(messageListener);
if (!contained)
log.debug("Try to remove a messageListener which was never added.\n\t" +
"That might happen because of async behaviour of CopyOnWriteArraySet");
"That might happen because of async behaviour of CopyOnWriteArraySet");
}
@SuppressWarnings({"unused", "UnusedReturnValue"})
@ -323,8 +356,8 @@ public class Connection implements MessageListener {
log.error("violatesThrottleLimit MSG_THROTTLE_PER_SEC ");
log.error("elapsed " + (now - compareValue));
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream()
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.getClass().getName())
.collect(Collectors.toList()).toString());
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.getClass().getName())
.collect(Collectors.toList()).toString());
}
}
@ -339,8 +372,8 @@ public class Connection implements MessageListener {
log.error("violatesThrottleLimit MSG_THROTTLE_PER_10_SEC ");
log.error("elapsed " + (now - compareValue));
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream()
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.getClass().getName())
.collect(Collectors.toList()).toString());
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.getClass().getName())
.collect(Collectors.toList()).toString());
}
}
// we limit to max 50 (MSG_THROTTLE_PER_10SEC) entries
@ -379,10 +412,10 @@ public class Connection implements MessageListener {
String peersNodeAddress = getPeersNodeAddressOptional().isPresent() ? getPeersNodeAddressOptional().get().getFullAddress() : "";
if (this instanceof InboundConnection) {
log.debug("\n\n############################################################\n" +
"We got the peers node address set.\n" +
"peersNodeAddress= " + peersNodeAddress +
"\nconnection.uid=" + getUid() +
"\n############################################################\n");
"We got the peers node address set.\n" +
"peersNodeAddress= " + peersNodeAddress +
"\nconnection.uid=" + getUid() +
"\n############################################################\n");
}
peersNodeAddressProperty.set(peerNodeAddress);
@ -444,11 +477,11 @@ public class Connection implements MessageListener {
if (!stopped) {
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\ncloseConnectionReason=" + closeConnectionReason
+ "\nuid=" + uid
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
"ShutDown connection:"
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\ncloseConnectionReason=" + closeConnectionReason
+ "\nuid=" + uid
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
if (closeConnectionReason.sendCloseMessage) {
new Thread(() -> {
@ -456,7 +489,7 @@ public class Connection implements MessageListener {
Log.traceCall("sendCloseConnectionMessage");
try {
String reason = closeConnectionReason == CloseConnectionReason.RULE_VIOLATION ?
sharedModel.getRuleViolation().name() : closeConnectionReason.name();
sharedModel.getRuleViolation().name() : closeConnectionReason.name();
sendMessage(new CloseConnectionMessage(reason));
setStopFlags();
@ -545,22 +578,22 @@ public class Connection implements MessageListener {
@Override
public String toString() {
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", uid='" + uid + '\'' +
'}';
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", uid='" + uid + '\'' +
'}';
}
@SuppressWarnings("unused")
public String printDetails() {
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", portInfo=" + portInfo +
", uid='" + uid + '\'' +
", sharedSpace=" + sharedModel.toString() +
", stopped=" + stopped +
'}';
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", portInfo=" + portInfo +
", uid='" + uid + '\'' +
", sharedSpace=" + sharedModel.toString() +
", stopped=" + stopped +
'}';
}
@ -605,10 +638,10 @@ public class Connection implements MessageListener {
if (numRuleViolations >= ruleViolation.maxTolerance) {
log.warn("We close connection as we received too many corrupt requests.\n" +
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), connection);
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), connection);
this.ruleViolation = ruleViolation;
if (ruleViolation == RuleViolation.PEER_BANNED) {
log.warn("We detected a connection to a banned peer. We will close that connection. (reportInvalidRequest)");
@ -653,11 +686,11 @@ public class Connection implements MessageListener {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket {}\n\t" +
"connection={}\n\t" +
"Exception=",
socket.toString(),
this,
e.toString());
"connection={}\n\t" +
"Exception=",
socket.toString(),
this,
e.toString());
e.printStackTrace();
}
@ -686,9 +719,9 @@ public class Connection implements MessageListener {
@Override
public String toString() {
return "SharedSpace{" +
"socket=" + socket +
", ruleViolations=" + ruleViolations +
'}';
"socket=" + socket +
", ruleViolations=" + ruleViolations +
'}';
}
}
@ -744,13 +777,13 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("InputHandler");
while (!stopped && !Thread.currentThread().isInterrupted()) {
if (!threadNameSet && sharedModel.connection != null &&
sharedModel.connection.getPeersNodeAddressOptional().isPresent()) {
sharedModel.connection.getPeersNodeAddressOptional().isPresent()) {
Thread.currentThread().setName("InputHandler-" + sharedModel.connection.getPeersNodeAddressOptional().get().getFullAddress());
threadNameSet = true;
}
try {
if (sharedModel.getSocket() != null &&
sharedModel.getSocket().isClosed()) {
sharedModel.getSocket().isClosed()) {
stopAndShutDown(CloseConnectionReason.SOCKET_CLOSED);
return;
}
@ -763,8 +796,8 @@ public class Connection implements MessageListener {
long elapsed = now - lastReadTimeStamp;
if (elapsed < 10) {
log.debug("We got 2 network_messages received in less than 10 ms. We set the thread to sleep " +
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, now, elapsed);
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, now, elapsed);
Thread.sleep(20);
}
@ -786,23 +819,23 @@ public class Connection implements MessageListener {
if (networkEnvelope instanceof Pong || networkEnvelope instanceof RefreshOfferMessage) {
// We only log Pong and RefreshOfferMsg when in dev environment (trace)
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(proto.toString()),
size);
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(proto.toString()),
size);
} else {
// We want to log all incoming network_messages (except Pong and RefreshOfferMsg)
// so we log before the data type checks
//log.info("size={}; object={}", size, Utilities.toTruncatedString(rawInputObject.toString(), 100));
log.debug("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(proto.toString()),
size);
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
Utilities.toTruncatedString(proto.toString()),
size);
}
// We want to track the size of each object even if it is invalid data
@ -821,10 +854,10 @@ public class Connection implements MessageListener {
}
if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage &&
!((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) {
!((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) {
log.warn("PersistableNetworkPayload.verifyHashSize failed. hashSize={}; object={}",
((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length,
Utilities.toTruncatedString(proto));
((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length,
Utilities.toTruncatedString(proto));
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
}
@ -837,16 +870,16 @@ public class Connection implements MessageListener {
}
if (connection.violatesThrottleLimit(networkEnvelope)
&& reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
&& reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
return;
// Check P2P network ID
if (proto.getMessageVersion() != Version.getP2PMessageVersion()
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
log.warn("RuleViolation.WRONG_NETWORK_ID. version of message={}, app version={}, " +
"proto.toTruncatedString={}", proto.getMessageVersion(),
Version.getP2PMessageVersion(),
Utilities.toTruncatedString(proto.toString()));
"proto.toTruncatedString={}", proto.getMessageVersion(),
Version.getP2PMessageVersion(),
Utilities.toTruncatedString(proto.toString()));
return;
}
@ -856,7 +889,7 @@ public class Connection implements MessageListener {
if (networkEnvelope instanceof CloseConnectionMessage) {
// If we get a CloseConnectionMessage we shut down
log.debug("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", proto.getCloseConnectionMessage().getReason(), connection);
"connection={}", proto.getCloseConnectionMessage().getReason(), connection);
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
stopAndShutDown(CloseConnectionReason.PEER_BANNED);
@ -895,8 +928,8 @@ public class Connection implements MessageListener {
if (peersNodeAddressOptional.isPresent()) {
// If we have already the peers address we check again if it matches our stored one
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
"senderNodeAddress not matching connections peer address.\n\t" +
"message=" + networkEnvelope);
"senderNodeAddress not matching connections peer address.\n\t" +
"message=" + networkEnvelope);
} else {
connection.setPeersNodeAddress(senderNodeAddress);
}
@ -946,10 +979,10 @@ public class Connection implements MessageListener {
@Override
public String toString() {
return "InputHandler{" +
"sharedSpace=" + sharedModel +
", port=" + portInfo +
", stopped=" + stopped +
'}';
"sharedSpace=" + sharedModel +
", port=" + portInfo +
", stopped=" + stopped +
'}';
}
}
}

View File

@ -0,0 +1,82 @@
{
"offerPayload": {
"id": "bynzq-3dbea64f-8028-4235-bfc7-7b7329bff180-053",
"date": 1510249787501,
"ownerNodeAddress": {
"hostName": "localhost",
"port": 3682
},
"direction": "BUY",
"price": 0,
"marketPriceMargin": 0.01,
"useMarketBasedPrice": true,
"amount": 1000000,
"minAmount": 1000000,
"baseCurrencyCode": "BTC",
"counterCurrencyCode": "USD",
"arbitratorNodeAddresses": [
{
"hostName": "localhost",
"port": 9222
}
],
"mediatorNodeAddresses": [
{
"hostName": "localhost",
"port": 9222
}
],
"paymentMethodId": "PERFECT_MONEY",
"makerPaymentAccountId": "619c802d-48ff-40a1-944e-77d5df5623eb",
"offerFeePaymentTxId": "1e5f0c750a8bc91c92cdd2132c916089bafd649b1d288980a65ce8dfdfc9a2ca",
"versionNr": "0.5.3",
"blockHeightAtOfferCreation": 166,
"txFee": 168000,
"makerFee": 20000,
"isCurrencyForMakerFeeBtc": true,
"buyerSecurityDeposit": 3000000,
"sellerSecurityDeposit": 1000000,
"maxTradeLimit": 100000000,
"maxTradePeriod": 86400000,
"useAutoClose": false,
"useReOpenAfterAutoClose": false,
"lowerClosePrice": 0,
"upperClosePrice": 0,
"isPrivateOffer": false,
"protocolVersion": 1
},
"tradeAmount": 1000000,
"tradePrice": 71132787,
"takerFeeTxID": "7361ac51789257022dc07189d56a42ad918922fad81d3d074ebf85ebbfdcb3c9",
"buyerNodeAddress": {
"hostName": "localhost",
"port": 3682
},
"sellerNodeAddress": {
"hostName": "localhost",
"port": 3632
},
"arbitratorNodeAddress": {
"hostName": "localhost",
"port": 9222
},
"mediatorNodeAddress": {
"hostName": "localhost",
"port": 9222
},
"isBuyerMakerAndSellerTaker": true,
"makerAccountId": "919666446",
"takerAccountId": "1680033503",
"makerPaymentAccountPayload": {
"accountNr": "adsf",
"paymentMethodId": "PERFECT_MONEY",
"id": "619c802d-48ff-40a1-944e-77d5df5623eb"
},
"takerPaymentAccountPayload": {
"accountNr": "dummy_31",
"paymentMethodId": "PERFECT_MONEY",
"id": "1f78ffab-082d-4646-aa3a-2089698748d0"
},
"makerPayoutAddressString": "mtEiwZUKnyEGunzTbpqNFwrNv4wEAbVoHK",
"takerPayoutAddressString": "mk8gDkjz4gKA3qWvRbbcc82VhEBx1c43P3"
}

View File

@ -0,0 +1,84 @@
{
"offerPayload": {
"id": "bynzq-3dbea64f-8028-4235-bfc7-7b7329bff180-053",
"date": 1510249787501,
"ownerNodeAddress": {
"hostName": "localhost",
"port": 3682
},
"direction": "BUY",
"price": 0,
"marketPriceMargin": 0.01,
"useMarketBasedPrice": true,
"amount": 1000000,
"minAmount": 1000000,
"baseCurrencyCode": "BTC",
"counterCurrencyCode": "USD",
"arbitratorNodeAddresses": [
{
"hostName": "localhost",
"port": 9222
}
],
"mediatorNodeAddresses": [
{
"hostName": "localhost",
"port": 9222
}
],
"paymentMethodId": "PERFECT_MONEY",
"makerPaymentAccountId": "619c802d-48ff-40a1-944e-77d5df5623eb",
"offerFeePaymentTxId": "1e5f0c750a8bc91c92cdd2132c916089bafd649b1d288980a65ce8dfdfc9a2ca",
"versionNr": "0.5.3",
"blockHeightAtOfferCreation": 166,
"txFee": 168000,
"makerFee": 20000,
"isCurrencyForMakerFeeBtc": true,
"buyerSecurityDeposit": 3000000,
"sellerSecurityDeposit": 1000000,
"maxTradeLimit": 100000000,
"maxTradePeriod": 86400000,
"useAutoClose": false,
"useReOpenAfterAutoClose": false,
"lowerClosePrice": 0,
"upperClosePrice": 0,
"isPrivateOffer": false,
"protocolVersion": 1
},
"tradeAmount": 1000000,
"tradePrice": 71132787,
"takerFeeTxID": "7361ac51789257022dc07189d56a42ad918922fad81d3d074ebf85ebbfdcb3c9",
"buyerNodeAddress": {
"hostName": "localhost",
"port": 3682
},
"sellerNodeAddress": {
"hostName": "localhost",
"port": 3632
},
"arbitratorNodeAddress": {
"hostName": "localhost",
"port": 9222
},
"mediatorNodeAddress": {
"hostName": "localhost",
"port": 9222
},
"isBuyerMakerAndSellerTaker": true,
"makerAccountId": "919666446",
"takerAccountId": "1680033503",
"makerPaymentAccountPayload": {
"accountNr": "adsf",
"paymentMethodId": "PERFECT_MONEY",
"id": "619c802d-48ff-40a1-944e-77d5df5623eb",
"maxTradePeriod": 86400000
},
"takerPaymentAccountPayload": {
"accountNr": "dummy_31",
"paymentMethodId": "PERFECT_MONEY",
"id": "1f78ffab-082d-4646-aa3a-2089698748d0",
"maxTradePeriod": 0
},
"makerPayoutAddressString": "mtEiwZUKnyEGunzTbpqNFwrNv4wEAbVoHK",
"takerPayoutAddressString": "mk8gDkjz4gKA3qWvRbbcc82VhEBx1c43P3"
}

View File

@ -1,6 +1,5 @@
package io.bisq.network.p2p.peers.getdata.messages;
import io.bisq.common.app.Capabilities;
import io.bisq.common.app.Version;
import io.bisq.common.proto.network.NetworkEnvelope;
import io.bisq.common.proto.network.NetworkProtoResolver;
@ -15,8 +14,8 @@ import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -32,10 +31,11 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
// We added that in v 0.6 and we would get a null object from older peers, so keep it annotated with @Nullable
@Nullable
private final Set<PersistableNetworkPayload> persistableNetworkPayloadSet;
private final int requestNonce;
private final boolean isGetUpdatedDataResponse;
private final ArrayList<Integer> supportedCapabilities = Capabilities.getCapabilities();
@Nullable
private final List<Integer> supportedCapabilities;
public GetDataResponse(Set<ProtectedStorageEntry> dataSet,
@Nullable Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
@ -45,6 +45,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
requestNonce,
isGetUpdatedDataResponse,
null,
Version.getP2PMessageVersion());
}
@ -56,6 +57,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
@Nullable Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse,
@Nullable List<Integer> supportedCapabilities,
int messageVersion) {
super(messageVersion);
@ -63,6 +65,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
this.persistableNetworkPayloadSet = persistableNetworkPayloadSet;
this.requestNonce = requestNonce;
this.isGetUpdatedDataResponse = isGetUpdatedDataResponse;
this.supportedCapabilities = supportedCapabilities;
}
@Override
@ -79,9 +82,9 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
.build())
.collect(Collectors.toList()))
.setRequestNonce(requestNonce)
.setIsGetUpdatedDataResponse(isGetUpdatedDataResponse)
.addAllSupportedCapabilities(supportedCapabilities);
.setIsGetUpdatedDataResponse(isGetUpdatedDataResponse);
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(persistableNetworkPayloadSet).ifPresent(set -> builder.addAllPersistableNetworkPayloadItems(set.stream()
.map(PersistableNetworkPayload::toProtoMessage)
.collect(Collectors.toList())));
@ -109,6 +112,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
messageVersion);
}
}

View File

@ -1,7 +1,6 @@
package io.bisq.network.p2p.peers.getdata.messages;
import com.google.protobuf.ByteString;
import io.bisq.common.app.Capabilities;
import io.bisq.common.app.Version;
import io.bisq.common.proto.ProtoUtil;
import io.bisq.generated.protobuffer.PB;
@ -11,7 +10,9 @@ import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -20,11 +21,12 @@ import java.util.stream.Collectors;
@Value
public final class PreliminaryGetDataRequest extends GetDataRequest implements AnonymousMessage, SupportedCapabilitiesMessage {
// ordinals of enum
private final ArrayList<Integer> supportedCapabilities = Capabilities.getCapabilities();
@Nullable
private final List<Integer> supportedCapabilities;
public PreliminaryGetDataRequest(int nonce,
Set<byte[]> excludedKeys) {
this(nonce, excludedKeys, Version.getP2PMessageVersion());
this(nonce, excludedKeys, null, Version.getP2PMessageVersion());
}
@ -34,8 +36,11 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
private PreliminaryGetDataRequest(int nonce,
Set<byte[]> excludedKeys,
@Nullable List<Integer> supportedCapabilities,
int messageVersion) {
super(messageVersion, nonce, excludedKeys);
this.supportedCapabilities = supportedCapabilities;
}
@Override
@ -44,8 +49,9 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
.setNonce(nonce)
.addAllExcludedKeys(excludedKeys.stream()
.map(ByteString::copyFrom)
.collect(Collectors.toList()))
.addAllSupportedCapabilities(supportedCapabilities);
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
return getNetworkEnvelopeBuilder()
.setPreliminaryGetDataRequest(builder)
@ -55,6 +61,7 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
public static PreliminaryGetDataRequest fromProto(PB.PreliminaryGetDataRequest proto, int messageVersion) {
return new PreliminaryGetDataRequest(proto.getNonce(),
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
messageVersion);
}
}

View File

@ -1,6 +1,5 @@
package io.bisq.network.p2p.peers.peerexchange.messages;
import io.bisq.common.app.Capabilities;
import io.bisq.common.app.Version;
import io.bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
@ -11,8 +10,10 @@ import io.bisq.network.p2p.peers.peerexchange.Peer;
import lombok.EqualsAndHashCode;
import lombok.Value;
import java.util.ArrayList;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@ -23,10 +24,11 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
private final NodeAddress senderNodeAddress;
private final int nonce;
private final HashSet<Peer> reportedPeers;
private final ArrayList<Integer> supportedCapabilities = Capabilities.getCapabilities();
@Nullable
private final List<Integer> supportedCapabilities;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<Peer> reportedPeers) {
this(senderNodeAddress, nonce, reportedPeers, Version.getP2PMessageVersion());
this(senderNodeAddress, nonce, reportedPeers, null, Version.getP2PMessageVersion());
}
@ -34,24 +36,32 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<Peer> reportedPeers, int messageVersion) {
private GetPeersRequest(NodeAddress senderNodeAddress,
int nonce,
HashSet<Peer> reportedPeers,
@Nullable List<Integer> supportedCapabilities,
int messageVersion) {
super(messageVersion);
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetPeersRequest");
this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce;
this.reportedPeers = reportedPeers;
this.supportedCapabilities = supportedCapabilities;
}
@Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() {
final PB.GetPeersRequest.Builder builder = PB.GetPeersRequest.newBuilder()
.setSenderNodeAddress(senderNodeAddress.toProtoMessage())
.setNonce(nonce)
.addAllReportedPeers(reportedPeers.stream()
.map(Peer::toProtoMessage)
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
return getNetworkEnvelopeBuilder()
.setGetPeersRequest(PB.GetPeersRequest.newBuilder()
.setSenderNodeAddress(senderNodeAddress.toProtoMessage())
.setNonce(nonce)
.addAllReportedPeers(reportedPeers.stream()
.map(Peer::toProtoMessage)
.collect(Collectors.toList()))
.addAllSupportedCapabilities(supportedCapabilities))
.setGetPeersRequest(builder)
.build();
}
@ -61,6 +71,7 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
new HashSet<>(proto.getReportedPeersList().stream()
.map(Peer::fromProto)
.collect(Collectors.toSet())),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
messageVersion);
}
}

View File

@ -1,6 +1,5 @@
package io.bisq.network.p2p.peers.peerexchange.messages;
import io.bisq.common.app.Capabilities;
import io.bisq.common.app.Version;
import io.bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
@ -10,8 +9,10 @@ import io.bisq.network.p2p.peers.peerexchange.Peer;
import lombok.EqualsAndHashCode;
import lombok.Value;
import java.util.ArrayList;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@EqualsAndHashCode(callSuper = true)
@ -19,10 +20,11 @@ import java.util.stream.Collectors;
public final class GetPeersResponse extends NetworkEnvelope implements PeerExchangeMessage, SupportedCapabilitiesMessage {
private final int requestNonce;
private final HashSet<Peer> reportedPeers;
private final ArrayList<Integer> supportedCapabilities = Capabilities.getCapabilities();
@Nullable
private final List<Integer> supportedCapabilities;
public GetPeersResponse(int requestNonce, HashSet<Peer> reportedPeers) {
this(requestNonce, reportedPeers, Version.getP2PMessageVersion());
this(requestNonce, reportedPeers, null, Version.getP2PMessageVersion());
}
@ -30,31 +32,41 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private GetPeersResponse(int requestNonce, HashSet<Peer> reportedPeers, int messageVersion) {
private GetPeersResponse(int requestNonce,
HashSet<Peer> reportedPeers,
@Nullable List<Integer> supportedCapabilities,
int messageVersion) {
super(messageVersion);
this.requestNonce = requestNonce;
this.reportedPeers = reportedPeers;
this.supportedCapabilities = supportedCapabilities;
}
@Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setGetPeersResponse(PB.GetPeersResponse.newBuilder()
final PB.GetPeersResponse.Builder builder = PB.GetPeersResponse.newBuilder()
.setRequestNonce(requestNonce)
.addAllReportedPeers(reportedPeers.stream()
.map(Peer::toProtoMessage)
.collect(Collectors.toList()))
.addAllSupportedCapabilities(supportedCapabilities))
.build();
.map(Peer::toProtoMessage)
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
return getNetworkEnvelopeBuilder()
.setGetPeersResponse(builder)
.build();
}
public static GetPeersResponse fromProto(PB.GetPeersResponse getPeersResponse, int messageVersion) {
public static GetPeersResponse fromProto(PB.GetPeersResponse proto, int messageVersion) {
HashSet<Peer> reportedPeers = new HashSet<>(
getPeersResponse.getReportedPeersList()
.stream()
.map(peer -> new Peer(new NodeAddress(peer.getNodeAddress().getHostName(),
peer.getNodeAddress().getPort())))
.collect(Collectors.toList()));
return new GetPeersResponse(getPeersResponse.getRequestNonce(), reportedPeers, messageVersion);
proto.getReportedPeersList()
.stream()
.map(peer -> new Peer(new NodeAddress(peer.getNodeAddress().getHostName(),
peer.getNodeAddress().getPort())))
.collect(Collectors.toList()));
return new GetPeersResponse(proto.getRequestNonce(),
reportedPeers,
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
messageVersion);
}
}

View File

@ -129,7 +129,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
log.info("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
} catch (Throwable e) {
log.error("Could not copy resourceFile " + resourceFileName + " to " +
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
e.printStackTrace();
}
} else {
@ -145,7 +145,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// In case another object is already listening...
if (!hashMapChangedListeners.isEmpty())
map.values().stream()
.forEach(protectedStorageEntry -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedStorageEntry)));
.forEach(protectedStorageEntry -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedStorageEntry)));
} else {
persistableEntryMap = new PersistableEntryMap();
}
@ -168,7 +168,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
log.info("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
} catch (Throwable e) {
log.error("Could not copy resourceFile " + resourceFileName + " to " +
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
e.printStackTrace();
}
} else {
@ -182,7 +182,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// In case another object is already listening...
if (!persistableNetworkPayloadMapListeners.isEmpty())
persistableNetworkPayloadCollection.getMap().values().stream()
.forEach(payload -> persistableNetworkPayloadMapListeners.stream().forEach(e -> e.onAdded(payload)));
.forEach(payload -> persistableNetworkPayloadMapListeners.stream().forEach(e -> e.onAdded(payload)));
} else {
persistableNetworkPayloadCollection = new PersistableNetworkPayloadCollection();
}
@ -209,20 +209,20 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
Map<ByteArray, ProtectedStorageEntry> temp = new HashMap<>(map);
Set<ProtectedStorageEntry> toRemoveSet = new HashSet<>();
temp.entrySet().stream()
.filter(entry -> entry.getValue().isExpired())
.forEach(entry -> {
ByteArray hashOfPayload = entry.getKey();
ProtectedStorageEntry protectedStorageEntry = map.get(hashOfPayload);
if (!(protectedStorageEntry.getProtectedStoragePayload() instanceof PersistableNetworkPayload)) {
toRemoveSet.add(protectedStorageEntry);
log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry));
map.remove(hashOfPayload);
}
});
.filter(entry -> entry.getValue().isExpired())
.forEach(entry -> {
ByteArray hashOfPayload = entry.getKey();
ProtectedStorageEntry protectedStorageEntry = map.get(hashOfPayload);
if (!(protectedStorageEntry.getProtectedStoragePayload() instanceof PersistableNetworkPayload)) {
toRemoveSet.add(protectedStorageEntry);
log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry));
map.remove(hashOfPayload);
}
});
toRemoveSet.stream().forEach(
protectedDataToRemove -> hashMapChangedListeners.stream().forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
protectedDataToRemove -> hashMapChangedListeners.stream().forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
if (sequenceNumberMap.size() > 1000)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
@ -249,7 +249,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
refreshTTL((RefreshOfferMessage) networkEnvelop, peersNodeAddress, false);
} else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) {
addPersistableNetworkPayload(((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(),
peersNodeAddress, false, true, false, true);
peersNodeAddress, false, true, false, true);
}
});
}
@ -268,46 +268,46 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.hasPeersNodeAddress() && !closeConnectionReason.isIntended) {
map.values().stream()
.forEach(protectedData -> {
ExpirablePayload expirablePayload = protectedData.getProtectedStoragePayload();
if (expirablePayload instanceof RequiresOwnerIsOnlinePayload) {
RequiresOwnerIsOnlinePayload requiresOwnerIsOnlinePayload = (RequiresOwnerIsOnlinePayload) expirablePayload;
NodeAddress ownerNodeAddress = requiresOwnerIsOnlinePayload.getOwnerNodeAddress();
if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
.forEach(protectedData -> {
ExpirablePayload expirablePayload = protectedData.getProtectedStoragePayload();
if (expirablePayload instanceof RequiresOwnerIsOnlinePayload) {
RequiresOwnerIsOnlinePayload requiresOwnerIsOnlinePayload = (RequiresOwnerIsOnlinePayload) expirablePayload;
NodeAddress ownerNodeAddress = requiresOwnerIsOnlinePayload.getOwnerNodeAddress();
if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
// Check if we have the data (e.g. OfferPayload)
ByteArray hashOfPayload = getHashAsByteArray(expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
log.debug("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
// Check if we have the data (e.g. OfferPayload)
ByteArray hashOfPayload = getHashAsByteArray(expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
log.debug("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
//noinspection ConstantConditions
Log.logIfStressTests("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason +
" / isIntended=" + closeConnectionReason.isIntended +
" / peer=" + (connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get() : "PeersNode unknown"));
//noinspection ConstantConditions
Log.logIfStressTests("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason +
" / isIntended=" + closeConnectionReason.isIntended +
" / peer=" + (connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get() : "PeersNode unknown"));
// We only set the data back by half of the TTL and remove the data only if is has
// expired after tha back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
protectedData.backDate();
if (protectedData.isExpired())
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
// We only set the data back by half of the TTL and remove the data only if is has
// expired after tha back dating.
// We might get connection drops which are not caused by the node going offline, so
// we give more tolerance with that approach, giving the node the change to
// refresh the TTL with a refresh message.
// We observed those issues during stress tests, but it might have been caused by the
// test set up (many nodes/connections over 1 router)
// TODO investigate what causes the disconnections.
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
protectedData.backDate();
if (protectedData.isExpired())
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
}
}
}
});
});
}
}
@ -339,12 +339,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
persistableNetworkPayloadMapStorage.queueUpForSave(persistableNetworkPayloadCollection, 2000);
persistableNetworkPayloadMapListeners.stream().forEach(e -> e.onAdded(payload));
}
if (allowBroadcast)
if (allowBroadcast) {
log.error("broadcast AddPersistableNetworkPayloadMessage");
broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null, isDataOwner);
}
return true;
} else {
log.warn("Publish date of payload is not matching our current time and outside of our tolerance.\n" +
"Payload={}; now={}", payload.toString(), new Date());
"Payload={}; now={}", payload.toString(), new Date());
return false;
}
} else {
@ -370,8 +372,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ByteArray hashOfPayload = getHashAsByteArray(protectedStoragePayload);
boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
boolean result = checkPublicKeys(protectedStorageEntry, true)
&& checkSignature(protectedStorageEntry)
&& sequenceNrValid;
&& checkSignature(protectedStorageEntry)
&& sequenceNrValid;
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey)
@ -432,10 +434,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
final boolean checkSignature = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature);
final boolean hasSequenceNrIncreased = hasSequenceNrIncreased(sequenceNumber, hashOfPayload);
final boolean checkIfStoredDataPubKeyMatchesNewDataPubKey = checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey,
hashOfPayload);
hashOfPayload);
boolean allValid = checkSignature &&
hasSequenceNrIncreased &&
checkIfStoredDataPubKeyMatchesNewDataPubKey;
hasSequenceNrIncreased &&
checkIfStoredDataPubKeyMatchesNewDataPubKey;
// printData("before refreshTTL");
if (allValid) {
@ -464,10 +466,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
if (!containsKey)
log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedStorageEntry, false)
&& isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload)
&& checkSignature(protectedStorageEntry)
&& checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload);
&& checkPublicKeys(protectedStorageEntry, false)
&& isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload)
&& checkSignature(protectedStorageEntry)
&& checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload);
// printData("before remove");
if (result) {
@ -491,11 +493,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
if (!containsKey)
log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedMailboxStorageEntry, false)
&& isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfData)
&& protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(protectedMailboxStorageEntry.getReceiversPubKey()) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxStorageEntry)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfData);
&& checkPublicKeys(protectedMailboxStorageEntry, false)
&& isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfData)
&& protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(protectedMailboxStorageEntry.getReceiversPubKey()) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxStorageEntry)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfData);
// printData("before removeMailboxData");
if (result) {
@ -512,7 +514,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey)
throws CryptoException {
throws CryptoException {
ByteArray hashOfData = getHashAsByteArray(protectedStoragePayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
@ -526,7 +528,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey)
throws CryptoException {
throws CryptoException {
ByteArray hashOfPayload = getHashAsByteArray(protectedStoragePayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfPayload))
@ -541,7 +543,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
public ProtectedMailboxStorageEntry getMailboxDataWithSignedSeqNr(MailboxStoragePayload expirableMailboxStoragePayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
throws CryptoException {
ByteArray hashOfData = getHashAsByteArray(expirableMailboxStoragePayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
@ -552,7 +554,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
byte[] hashOfDataAndSeqNr = P2PDataStorage.getHash(new DataAndSeqNrPair(expirableMailboxStoragePayload, sequenceNumber));
byte[] signature = Sig.sign(storageSignaturePubKey.getPrivate(), hashOfDataAndSeqNr);
return new ProtectedMailboxStorageEntry(expirableMailboxStoragePayload,
storageSignaturePubKey.getPublic(), sequenceNumber, signature, receiversPublicKey);
storageSignaturePubKey.getPublic(), sequenceNumber, signature, receiversPublicKey);
}
public void addHashMapChangedListener(HashMapChangedListener hashMapChangedListener) {
@ -587,12 +589,12 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
int storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr;
if (newSequenceNumber >= storedSequenceNumber) {
log.trace("Sequence number is valid (>=). sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
return true;
} else {
log.debug("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
return false;
}
} else {
@ -606,23 +608,23 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
int storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr;
if (newSequenceNumber > storedSequenceNumber) {
log.trace("Sequence number has increased (>). sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + " / hashOfData=" + hashOfData.toString());
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + " / hashOfData=" + hashOfData.toString());
return true;
} else if (newSequenceNumber == storedSequenceNumber) {
String msg;
if (newSequenceNumber == 0) {
msg = "Sequence number is equal to the stored one and both are 0." +
"That is expected for network_messages which never got updated (mailbox msg).";
"That is expected for network_messages which never got updated (mailbox msg).";
} else {
msg = "Sequence number is equal to the stored one. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber;
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber;
}
log.debug(msg);
return false;
} else {
log.debug("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
return false;
}
} else {
@ -636,7 +638,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean result = Sig.verify(ownerPubKey, hashOfDataAndSeqNr, signature);
if (!result)
log.warn("Signature verification failed at checkSignature. " +
"That should not happen.");
"That should not happen.");
return result;
} catch (CryptoException e) {
@ -659,25 +661,25 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
MailboxStoragePayload payload = (MailboxStoragePayload) protectedStoragePayload;
if (isAddOperation)
result = payload.getSenderPubKeyForAddOperation() != null &&
payload.getSenderPubKeyForAddOperation().equals(protectedStorageEntry.getOwnerPubKey());
payload.getSenderPubKeyForAddOperation().equals(protectedStorageEntry.getOwnerPubKey());
else
result = payload.getOwnerPubKey() != null &&
payload.getOwnerPubKey().equals(protectedStorageEntry.getOwnerPubKey());
payload.getOwnerPubKey().equals(protectedStorageEntry.getOwnerPubKey());
} else {
result = protectedStorageEntry.getOwnerPubKey() != null &&
protectedStoragePayload != null &&
protectedStorageEntry.getOwnerPubKey().equals(protectedStoragePayload.getOwnerPubKey());
protectedStoragePayload != null &&
protectedStorageEntry.getOwnerPubKey().equals(protectedStoragePayload.getOwnerPubKey());
}
if (!result) {
String res1 = protectedStorageEntry.toString();
String res2 = "null";
if (protectedStoragePayload != null &&
protectedStoragePayload.getOwnerPubKey() != null)
protectedStoragePayload.getOwnerPubKey() != null)
res2 = Utilities.encodeToHex(protectedStoragePayload.getOwnerPubKey().getEncoded(), true);
log.warn("PublicKey of payload data and ProtectedStorageEntry are not matching. protectedStorageEntry=" + res1 +
"protectedStorageEntry.getStoragePayload().getOwnerPubKey()=" + res2);
"protectedStorageEntry.getStoragePayload().getOwnerPubKey()=" + res2);
}
return result;
}
@ -687,8 +689,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean result = storedData.getOwnerPubKey() != null && storedData.getOwnerPubKey().equals(ownerPubKey);
if (!result)
log.warn("New data entry does not match our stored data. storedData.ownerPubKey=" +
(storedData.getOwnerPubKey() != null ? storedData.getOwnerPubKey().toString() : "null") +
", ownerPubKey=" + ownerPubKey);
(storedData.getOwnerPubKey() != null ? storedData.getOwnerPubKey().toString() : "null") +
", ownerPubKey=" + ownerPubKey);
return result;
}
@ -699,10 +701,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedMailboxStorageEntry entry = (ProtectedMailboxStorageEntry) storedData;
// publicKey is not the same (stored: sender, new: receiver)
boolean result = entry.getReceiversPubKey().equals(receiversPubKey)
&& getHashAsByteArray(entry.getProtectedStoragePayload()).equals(hashOfData);
&& getHashAsByteArray(entry.getProtectedStoragePayload()).equals(hashOfData);
if (!result)
log.warn("New data entry does not match our stored data. entry.receiversPubKey=" + entry.getReceiversPubKey()
+ ", receiversPubKey=" + receiversPubKey);
+ ", receiversPubKey=" + receiversPubKey);
return result;
} else {
@ -737,28 +739,28 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
sb.append("Data set ").append(info).append(" operation");
// We print the items sorted by hash with the payload class name and id
List<Tuple2<String, ProtectedStorageEntry>> tempList = map.values().stream()
.map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getProtectedStoragePayload()).bytes), e))
.collect(Collectors.toList());
.map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getProtectedStoragePayload()).bytes), e))
.collect(Collectors.toList());
tempList.sort((o1, o2) -> o1.first.compareTo(o2.first));
tempList.stream().forEach(e -> {
final ProtectedStorageEntry storageEntry = e.second;
final ProtectedStoragePayload protectedStoragePayload = storageEntry.getProtectedStoragePayload();
final MapValue mapValue = sequenceNumberMap.get(getHashAsByteArray(protectedStoragePayload));
sb.append("\n")
.append("Hash=")
.append(e.first)
.append("; Class=")
.append(protectedStoragePayload.getClass().getSimpleName())
.append("; SequenceNumbers (Object/Stored)=")
.append(storageEntry.getSequenceNumber())
.append(" / ")
.append(mapValue != null ? mapValue.sequenceNr : "null")
.append("; TimeStamp (Object/Stored)=")
.append(storageEntry.getCreationTimeStamp())
.append(" / ")
.append(mapValue != null ? mapValue.timeStamp : "null")
.append("; Payload=")
.append(Utilities.toTruncatedString(protectedStoragePayload));
.append("Hash=")
.append(e.first)
.append("; Class=")
.append(protectedStoragePayload.getClass().getSimpleName())
.append("; SequenceNumbers (Object/Stored)=")
.append(storageEntry.getSequenceNumber())
.append(" / ")
.append(mapValue != null ? mapValue.sequenceNr : "null")
.append("; TimeStamp (Object/Stored)=")
.append(storageEntry.getCreationTimeStamp())
.append(" / ")
.append(mapValue != null ? mapValue.timeStamp : "null")
.append("; Payload=")
.append(Utilities.toTruncatedString(protectedStoragePayload));
});
sb.append("\n------------------------------------------------------------\n");
log.debug(sb.toString());
@ -798,9 +800,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Override
public com.google.protobuf.Message toProtoMessage() {
return PB.DataAndSeqNrPair.newBuilder()
.setPayload((PB.StoragePayload) protectedStoragePayload.toProtoMessage())
.setSequenceNumber(sequenceNumber)
.build();
.setPayload((PB.StoragePayload) protectedStoragePayload.toProtoMessage())
.setSequenceNumber(sequenceNumber)
.build();
}
}
@ -817,8 +819,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Override
public String toString() {
return "ByteArray{" +
"bytes as Hex=" + Hex.toHexString(bytes) +
'}';
"bytes as Hex=" + Hex.toHexString(bytes) +
'}';
}
public ByteArray(byte[] bytes) {
@ -849,10 +851,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
public static Set<P2PDataStorage.ByteArray> convertBytesSetToByteArraySet(Set<byte[]> set) {
return set != null ?
set.stream()
.map(P2PDataStorage.ByteArray::new)
.collect(Collectors.toSet())
: new HashSet<>();
set.stream()
.map(P2PDataStorage.ByteArray::new)
.collect(Collectors.toSet())
: new HashSet<>();
}
}

View File

@ -3,16 +3,13 @@ package io.bisq.network.p2p.storage.messages;
import io.bisq.common.app.Version;
import io.bisq.common.proto.network.NetworkProtoResolver;
import io.bisq.generated.protobuffer.PB;
import io.bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import io.bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import lombok.EqualsAndHashCode;
import lombok.Value;
import java.util.List;
@EqualsAndHashCode(callSuper = true)
@Value
public final class AddPersistableNetworkPayloadMessage extends BroadcastMessage implements CapabilityRequiringPayload {
public final class AddPersistableNetworkPayloadMessage extends BroadcastMessage {
private final PersistableNetworkPayload persistableNetworkPayload;
public AddPersistableNetworkPayloadMessage(PersistableNetworkPayload persistableNetworkPayload) {
@ -43,9 +40,4 @@ public final class AddPersistableNetworkPayloadMessage extends BroadcastMessage
return new AddPersistableNetworkPayloadMessage((PersistableNetworkPayload) resolver.fromProto(proto.getPayload()),
messageVersion);
}
@Override
public List<Integer> getRequiredCapabilities() {
return null;
}
}