Use ByteArray for map key, fix auth loops

This commit is contained in:
Manfred Karrer 2015-11-11 00:50:56 +01:00
parent b827a9812d
commit 1f3c2c1479
14 changed files with 227 additions and 131 deletions

View file

@ -0,0 +1,32 @@
package io.bitsquare.common;
import io.bitsquare.app.Version;
import java.io.Serializable;
import java.util.Arrays;
public class ByteArray implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final byte[] bytes;
public ByteArray(byte[] bytes) {
this.bytes = bytes;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ByteArray)) return false;
ByteArray byteArray = (ByteArray) o;
return Arrays.equals(bytes, byteArray.bytes);
}
@Override
public int hashCode() {
return bytes != null ? Arrays.hashCode(bytes) : 0;
}
}

View file

@ -52,7 +52,6 @@
<TextField fx:id="connectedPeersBTC" GridPane.rowIndex="2" GridPane.columnIndex="1"
mouseTransparent="true" focusTraversable="false"/>
<TitledGroupBg text="P2P network" GridPane.rowIndex="3" GridPane.rowSpan="5">
<padding>
<Insets top="50.0"/>

View file

@ -36,6 +36,7 @@ import javafx.beans.value.ChangeListener;
import javafx.collections.FXCollections;
import javafx.fxml.FXML;
import javafx.geometry.Insets;
import javafx.geometry.VPos;
import javafx.scene.control.ComboBox;
import javafx.scene.control.Label;
import javafx.scene.control.TextArea;
@ -84,6 +85,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
public void initialize() {
GridPane.setMargin(authenticatedPeersLabel, new Insets(4, 0, 0, 0));
GridPane.setValignment(authenticatedPeersLabel, VPos.TOP);
bitcoinNetwork.setText(bitcoinNetworkString);
connectedPeersBTC.textProperty().bind(createStringBinding(() -> String.valueOf(walletService.numPeersProperty().get()), walletService
.numPeersProperty()));

View file

@ -1,11 +1,14 @@
package io.bitsquare.p2p;
import io.bitsquare.common.crypto.Hash;
import java.io.Serializable;
import java.util.regex.Pattern;
public class Address implements Serializable {
public final String hostName;
public final int port;
transient private byte[] blurredAddress;
public Address(String hostName, int port) {
this.hostName = hostName;
@ -22,8 +25,11 @@ public class Address implements Serializable {
return hostName + ":" + port;
}
public String getAddressMask() {
return getFullAddress().substring(0, 2);
// We use just a few chars form or address to blur the potential receiver for sent messages
public byte[] getBlurredAddress() {
if (blurredAddress == null)
blurredAddress = Hash.getHash(getFullAddress().substring(0, 2));
return blurredAddress;
}
@Override

View file

@ -7,9 +7,9 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.bitsquare.app.Log;
import io.bitsquare.app.ProgramArguments;
import io.bitsquare.common.ByteArray;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.Hash;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.crypto.EncryptionService;
@ -26,6 +26,7 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ExpirablePayload;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.DataExchangeMessage;
import io.bitsquare.p2p.storage.messages.GetDataRequest;
import io.bitsquare.p2p.storage.messages.GetDataResponse;
import javafx.beans.property.BooleanProperty;
@ -40,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.math.BigInteger;
import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
@ -83,8 +83,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final BooleanProperty authenticated = new SimpleBooleanProperty();
private MonadicBinding<Boolean> readyForAuthentication;
public final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
@Nullable
private byte[] blurredAddressHash = null;
///////////////////////////////////////////////////////////////////////////////////////////
@ -167,20 +165,20 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onMessage(Message message, Connection connection) {
if (message instanceof GetDataRequest) {
Log.traceCall(message.toString());
log.info("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
} else if (message instanceof GetDataResponse) {
Log.traceCall(message.toString());
GetDataResponse getDataResponse = (GetDataResponse) message;
log.info("Received GetDataResponse: " + message);
HashSet<ProtectedData> set = getDataResponse.set;
if (!set.isEmpty()) {
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
} else {
log.trace("Received DataSetMessage: Empty data set");
}
setRequestingDataCompleted();
} else if (message instanceof DataExchangeMessage) {
Log.traceCall(message.toString());
DataExchangeMessage dataExchangeMessage = (DataExchangeMessage) message;
HashSet<ProtectedData> set = dataExchangeMessage.set;
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
setRequestingDataCompleted();
} else if (message instanceof SealedAndSignedMessage) {
Log.traceCall(message.toString());
@ -205,6 +203,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private boolean verifyBlurredAddressHash(SealedAndSignedMessage sealedAndSignedMessage) {
byte[] blurredAddressHash = getAddress().getBlurredAddress();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, sealedAndSignedMessage.blurredAddressHash);
}
@ -259,17 +258,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onPeerAdded(Peer peer) {
Log.traceCall();
}
@Override
public void onPeerRemoved(Address address) {
Log.traceCall();
}
@Override
public void onConnectionAuthenticated(Connection connection) {
Log.traceCall();
}
@ -292,8 +288,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall();
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
blurredAddressHash = Hash.getHash(getAddress().getAddressMask());
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
// 3. (or 2.). Step: Hidden service is published
@ -369,19 +363,20 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// 5. Step:
private void sendGetAllDataMessageAfterAuthentication(final Peer peer) {
Log.traceCall();
log.trace("sendGetDataSetMessageAfterAuthentication");
// After authentication we request again data as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new GetDataRequest());
Log.traceCall(peer.toString());
// We have to exchange the data again as we might have missed pushed data in the meantime
// After authentication we send our data set to the other peer.
// As he will do the same we will get his actual data set.
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new DataExchangeMessage(getDataSet()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " succeeded.");
log.info("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " failed. " +
log.warn("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " failed. " +
"Exception:" + throwable.getMessage());
}
});
@ -463,7 +458,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(pubKeyRing, message), Hash.getHash(peerAddress.getAddressMask()));
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getBlurredAddress());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -509,7 +504,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(peersPubKeyRing, message), Hash.getHash(peerAddress.getAddressMask()));
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getBlurredAddress());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -615,7 +610,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
public Map<BigInteger, ProtectedData> getDataMap() {
public Map<ByteArray, ProtectedData> getDataMap() {
Log.traceCall();
return dataStorage.getMap();
}

View file

@ -87,6 +87,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
if (connection != null) {
return sendMessage(connection, message);
} else {
log.debug("inBoundConnections " + inBoundConnections.toString());
log.debug("outBoundConnections " + outBoundConnections.toString());
log.trace("We have not found any connection for that peerAddress. " +
"We will create a new outbound connection.");
@ -220,7 +222,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onConnection(Connection connection) {
Log.traceCall();
Log.traceCall("NetworkNode connection=" + connection);
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
@ -298,7 +300,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
startServerConnectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
Log.traceCall("startServerConnectionListener connection=" + connection);
// we still have not authenticated so put it to the temp list
inBoundConnections.add(connection);
NetworkNode.this.onConnection(connection);
@ -333,13 +335,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
Log.traceCall();
Log.traceCall(peerAddress.toString());
log.debug("outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
private Optional<Connection> lookupInboundConnection(Address peerAddress) {
Log.traceCall();
Log.traceCall(peerAddress.toString());
log.debug("inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}

View file

@ -67,17 +67,18 @@ public class AuthenticationHandshake implements MessageListener {
if (message instanceof AuthenticationResponse) {
// Requesting peer
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
connection.setPeerAddress(authenticationResponse.address);
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
connection.setPeerAddress(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
new GetPeersAuthRequest(myAddress, authenticationResponse.responderNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersAuthRequest sent successfully from " + myAddress + " to " + peerAddress);
connection.setPeerAddress(peerAddress);
}
@Override
@ -95,21 +96,29 @@ public class AuthenticationHandshake implements MessageListener {
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
Address peerAddress = getPeersAuthRequest.address;
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce;
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce;
if (verified) {
// we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
// we create the msg with our already collected peer addresses (before adding the new ones)
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
log.trace("sent GetPeersAuthResponse to " + peerAddress + " from " + myAddress
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
// now we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersAuthResponse sent successfully from " + myAddress + " to " + peerAddress);
connection.setPeerAddress(peerAddress);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
AuthenticationHandshake.this.onSuccess(connection);
}
@Override
@ -118,12 +127,6 @@ public class AuthenticationHandshake implements MessageListener {
onFault(throwable);
}
});
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
onSuccess(connection);
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
@ -219,7 +222,7 @@ public class AuthenticationHandshake implements MessageListener {
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.requesterNonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {

View file

@ -55,6 +55,7 @@ public class Peer implements Serializable {
return "Peer{" +
"address=" + address +
", pingNonce=" + pingNonce +
", connection=" + connection +
'}';
}
}

View file

@ -6,7 +6,6 @@ import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
@ -44,10 +43,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
MAX_CONNECTIONS = maxConnections;
}
private static final int SEND_PING_INTERVAL = new Random().nextInt(5 * 60 * 1000) + 5 * 60 * 1000;
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int RETRY_FILL_AUTH_PEERS = GET_PEERS_INTERVAL + 5000;
private static final int MAX_REPORTED_PEERS = 1000;
private final NetworkNode networkNode;
@ -58,8 +54,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final Set<Address> reportedPeerAddresses = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new ConcurrentHashMap<>();
private final Timer sendPingTimer = new Timer();
private final Timer getPeersTimer = new Timer();
private Timer sendPingTimer = new Timer();
private Timer getPeersTimer = new Timer();
private boolean shutDownInProgress;
private boolean firstPeerAdded = false;
@ -78,7 +74,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
setupMaintenanceTimer();
startMaintenanceTimer();
startGetPeersTimer();
}
@ -215,6 +212,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"We have that peer already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -224,15 +222,15 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (continueOnSuccess) {
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
authenticateToRemainingReportedPeers(true);
} else {
log.info("We have already enough connections.");
}
} else {
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
}
}
@ -255,12 +253,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else if (reportedPeerAddresses.size() > 0) {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
authenticateToRemainingReportedPeers(true);
} else {
log.info("We don't have any more seed nodes or reported nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
}
});
@ -269,12 +267,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private void authenticateToRemainingReportedPeers() {
private void authenticateToRemainingReportedPeers(boolean calledFromSeedNodeMethod) {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random peer. " + tupleOptional.get().first);
authenticateToReportedPeer(tupleOptional.get().second, tupleOptional.get().first);
} else if (calledFromSeedNodeMethod) {
log.info("We don't have any reported peers for connecting. " +
"As we tried recently the seed nodes we will wait a bit before repeating.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
1, 2, TimeUnit.MINUTES);
} else {
log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
@ -289,6 +292,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"We have that peer already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -299,10 +303,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
authenticateToRemainingReportedPeers(false);
} else {
log.info("We still don't have enough connections. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
log.info("We still don't have enough connections. " +
"Lets wait a bit and then try the remaining seed nodes.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
1, 2, TimeUnit.MINUTES);
}
} else {
log.info("We have already enough connections.");
@ -316,8 +322,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
throwable.printStackTrace();
removePeer(peerAddress);
authenticateToRemainingReportedPeers(false);
if (reportedPeerAddresses.size() > 0) {
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
authenticateToRemainingReportedPeers(false);
} else {
log.info("Authentication failed. " +
"Lets wait a bit and then try the remaining seed nodes.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
1, 2, TimeUnit.MINUTES);
}
}
});
} else {
@ -334,8 +349,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} else {
log.info("We don't have any more seed nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
}
@ -350,6 +365,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"We have that seed node already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthenticationToPeer(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -377,7 +393,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void setAuthenticated(Connection connection, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
if (!authenticationHandshakes.containsKey(peerAddress))
if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
log.info("\n\n############################################################\n" +
"We are authenticated to:" +
@ -412,38 +428,28 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Maintenance
///////////////////////////////////////////////////////////////////////////////////////////
private void setupMaintenanceTimer() {
private void startMaintenanceTimer() {
Log.traceCall();
sendPingTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Utilities.setThreadName("MaintenanceTimer");
try {
UserThread.execute(() -> {
if (sendPingTimer != null)
sendPingTimer.cancel();
sendPingTimer = UserThread.runAfterRandomDelay(() -> {
checkIfConnectedPeersExceeds();
pingPeers();
});
} catch (Throwable t) {
log.error("Executing task failed. " + t.getMessage());
t.printStackTrace();
}
}
}, SEND_PING_INTERVAL, SEND_PING_INTERVAL);
getPeersTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Utilities.setThreadName("GetPeersTimer");
try {
UserThread.execute(() -> trySendGetPeersRequest());
} catch (Throwable t) {
log.error("Executing task failed. " + t.getMessage());
t.printStackTrace();
}
}
}, GET_PEERS_INTERVAL, GET_PEERS_INTERVAL);
startMaintenanceTimer();
}, 5, 10, TimeUnit.MINUTES);
}
private void startGetPeersTimer() {
Log.traceCall();
if (getPeersTimer != null)
getPeersTimer.cancel();
getPeersTimer = UserThread.runAfterRandomDelay(() -> {
trySendGetPeersRequest();
startGetPeersTimer();
}, 1, 2, TimeUnit.MINUTES);
}
private boolean checkIfConnectedPeersExceeds() {
Log.traceCall();
@ -484,7 +490,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
removePeer(e.address);
}
});
}, 5, 10));
}, 1, 10));
}
private void trySendGetPeersRequest() {
@ -659,7 +665,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void removePeer(@Nullable Address peerAddress) {
Log.traceCall("peerAddress=" + peerAddress);
if (peerAddress != null) {
if (!authenticationHandshakes.containsKey(peerAddress))
if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
boolean contained = reportedPeerAddresses.remove(peerAddress);

View file

@ -8,18 +8,18 @@ public final class AuthenticationRequest extends AuthenticationMessage {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final long nonce;
public final long requesterNonce;
public AuthenticationRequest(Address address, long nonce) {
public AuthenticationRequest(Address address, long requesterNonce) {
this.address = address;
this.nonce = nonce;
this.requesterNonce = requesterNonce;
}
@Override
public String toString() {
return "AuthenticationRequest{" +
"address=" + address +
", nonce=" + nonce +
", nonce=" + requesterNonce +
"} " + super.toString();
}
}

View file

@ -9,12 +9,12 @@ public final class AuthenticationResponse extends AuthenticationMessage {
public final Address address;
public final long requesterNonce;
public final long challengerNonce;
public final long responderNonce;
public AuthenticationResponse(Address address, long requesterNonce, long challengerNonce) {
public AuthenticationResponse(Address address, long requesterNonce, long responderNonce) {
this.address = address;
this.requesterNonce = requesterNonce;
this.challengerNonce = challengerNonce;
this.responderNonce = responderNonce;
}
@Override
@ -22,7 +22,7 @@ public final class AuthenticationResponse extends AuthenticationMessage {
return "AuthenticationResponse{" +
"address=" + address +
", requesterNonce=" + requesterNonce +
", challengerNonce=" + challengerNonce +
", challengerNonce=" + responderNonce +
"} " + super.toString();
}
}

View file

@ -10,12 +10,12 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final long challengerNonce;
public final long responderNonce;
public final HashSet<Address> peerAddresses;
public GetPeersAuthRequest(Address address, long challengerNonce, HashSet<Address> peerAddresses) {
public GetPeersAuthRequest(Address address, long responderNonce, HashSet<Address> peerAddresses) {
this.address = address;
this.challengerNonce = challengerNonce;
this.responderNonce = responderNonce;
this.peerAddresses = peerAddresses;
}
@ -23,7 +23,7 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
public String toString() {
return "GetPeersAuthRequest{" +
"address=" + address +
", challengerNonce=" + challengerNonce +
", challengerNonce=" + responderNonce +
", peerAddresses=" + peerAddresses +
"} " + super.toString();
}

View file

@ -2,6 +2,7 @@ package io.bitsquare.p2p.storage;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.app.Log;
import io.bitsquare.common.ByteArray;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.Hash;
@ -25,7 +26,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.HashMap;
@ -42,9 +42,9 @@ public class ProtectedExpirableDataStorage implements MessageListener {
public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000;
private final PeerGroup peerGroup;
private final Map<BigInteger, ProtectedData> map = new HashMap<>();
private final Map<ByteArray, ProtectedData> map = new HashMap<>();
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private HashMap<BigInteger, Integer> sequenceNumberMap = new HashMap<>();
private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage;
private final Timer timer = new Timer();
private volatile boolean shutDownInProgress;
@ -65,7 +65,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
private void init() {
Log.traceCall();
HashMap<BigInteger, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
if (persisted != null) {
sequenceNumberMap = persisted;
}
@ -137,7 +137,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
public boolean add(ProtectedData protectedData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
boolean result = checkPublicKeys(protectedData, true)
&& checkSignature(protectedData);
@ -171,7 +171,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
public boolean remove(ProtectedData protectedData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
@ -196,7 +196,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
public boolean removeMailboxData(ProtectedMailboxData protectedMailboxData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(protectedMailboxData.expirablePayload);
ByteArray hashOfData = getHashAsByteArray(protectedMailboxData.expirablePayload);
boolean containsKey = map.containsKey(hashOfData);
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
@ -219,14 +219,14 @@ public class ProtectedExpirableDataStorage implements MessageListener {
return result;
}
public Map<BigInteger, ProtectedData> getMap() {
public Map<ByteArray, ProtectedData> getMap() {
return map;
}
public ProtectedData getDataWithSignedSeqNr(ExpirablePayload payload, KeyPair ownerStoragePubKey)
throws CryptoException {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(payload);
ByteArray hashOfData = getHashAsByteArray(payload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
sequenceNumber = sequenceNumberMap.get(hashOfData) + 1;
@ -242,7 +242,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(expirableMailboxPayload);
ByteArray hashOfData = getHashAsByteArray(expirableMailboxPayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
sequenceNumber = sequenceNumberMap.get(hashOfData) + 1;
@ -265,7 +265,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) {
private void doRemoveProtectedExpirableData(ProtectedData protectedData, ByteArray hashOfPayload) {
Log.traceCall();
map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our peers.");
@ -278,7 +278,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
log.info(sb.toString());
}
private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) {
private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {
Log.traceCall();
int newSequenceNumber = data.sequenceNumber;
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData);
@ -325,10 +325,10 @@ public class ProtectedExpirableDataStorage implements MessageListener {
return result;
}
private boolean checkIfStoredDataMatchesNewData(ProtectedData data, BigInteger hashOfData) {
private boolean checkIfStoredDataMatchesNewData(ProtectedData data, ByteArray hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
boolean result = getHashAsBigInteger(storedData.expirablePayload).equals(hashOfData)
boolean result = getHashAsByteArray(storedData.expirablePayload).equals(hashOfData)
&& storedData.ownerStoragePubKey.equals(data.ownerStoragePubKey);
if (!result)
log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud");
@ -336,14 +336,14 @@ public class ProtectedExpirableDataStorage implements MessageListener {
return result;
}
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, BigInteger hashOfData) {
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, ByteArray hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
if (storedData instanceof ProtectedMailboxData) {
ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData;
// publicKey is not the same (stored: sender, new: receiver)
boolean result = storedMailboxData.receiversPubKey.equals(data.receiversPubKey)
&& getHashAsBigInteger(storedMailboxData.expirablePayload).equals(hashOfData);
&& getHashAsByteArray(storedMailboxData.expirablePayload).equals(hashOfData);
if (!result)
log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud");
@ -359,8 +359,8 @@ public class ProtectedExpirableDataStorage implements MessageListener {
peerGroup.broadcast(message, sender);
}
private BigInteger getHashAsBigInteger(ExpirablePayload payload) {
return new BigInteger(Hash.getHash(payload));
private ByteArray getHashAsByteArray(ExpirablePayload payload) {
return new ByteArray(Hash.getHash(payload));
}
}

View file

@ -0,0 +1,48 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet;
public final class DataExchangeMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID;
public final HashSet<ProtectedData> set;
public DataExchangeMessage(HashSet<ProtectedData> set) {
this.set = set;
}
@Override
public int networkId() {
return networkId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DataExchangeMessage)) return false;
DataExchangeMessage that = (DataExchangeMessage) o;
return !(set != null ? !set.equals(that.set) : that.set != null);
}
@Override
public int hashCode() {
return set != null ? set.hashCode() : 0;
}
@Override
public String toString() {
return "GetDataResponse{" +
"networkId=" + networkId +
", set=" + set +
'}';
}
}