mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-20 02:12:00 +01:00
Refactor authentication handling
This commit is contained in:
parent
873402d941
commit
6d68cf8470
@ -136,7 +136,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||
PeerGroup.setSimulateAuthTorNode(200);
|
||||
|
||||
// P2P network data storage
|
||||
dataStorage = new P2PDataStorage(peerGroup, storageDir);
|
||||
dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir);
|
||||
dataStorage.addHashMapChangedListener(this);
|
||||
|
||||
// Request initial data manager
|
||||
@ -270,7 +270,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||
private void authenticateSeedNode() {
|
||||
Log.traceCall();
|
||||
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
|
||||
peerGroup.authenticateSeedNode(connectedSeedNode, seedNodeAddresses);
|
||||
peerGroup.authenticateToSeedNode(connectedSeedNode, seedNodeAddresses);
|
||||
}
|
||||
|
||||
|
||||
|
@ -16,6 +16,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -85,7 +86,7 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
if (verified) {
|
||||
GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress,
|
||||
authenticationResponse.responderNonce,
|
||||
new HashSet<>(peerGroup.getReportedPeers()));
|
||||
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthRequest);
|
||||
log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@ -115,7 +116,7 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
if (verified) {
|
||||
// we create the msg with our already collected peer addresses (before adding the new ones)
|
||||
GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress,
|
||||
new HashSet<>(peerGroup.getReportedPeers()));
|
||||
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthResponse);
|
||||
log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
|
||||
|
||||
@ -200,8 +201,8 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
// Responding to authentication request
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public SettableFuture<Connection> respondToAuthenticationRequest(AuthenticationRequest
|
||||
authenticationRequest, Connection connection) {
|
||||
public SettableFuture<Connection> respondToAuthenticationRequest(AuthenticationRequest authenticationRequest,
|
||||
Connection connection) {
|
||||
Log.traceCall("peerAddress " + peerAddress);
|
||||
// Responding peer
|
||||
|
||||
@ -222,7 +223,7 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse);
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.trace("onSuccess sending AuthenticationResponse");
|
||||
|
||||
connection.setPeerAddress(peerAddress);
|
||||
@ -244,6 +245,15 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Cancel if we send reject message
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void cancel() {
|
||||
failed(new CancelAuthenticationException());
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Private
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -290,4 +300,5 @@ public class AuthenticationHandshake implements MessageListener {
|
||||
public int hashCode() {
|
||||
return peerAddress != null ? peerAddress.hashCode() : 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,4 @@
|
||||
package io.bitsquare.p2p.peers;
|
||||
|
||||
public class CancelAuthenticationException extends Exception {
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
package io.bitsquare.p2p.peers;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.bitsquare.app.Log;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
import io.bitsquare.p2p.network.MessageListener;
|
||||
import io.bitsquare.p2p.network.NetworkNode;
|
||||
import io.bitsquare.p2p.peers.messages.maintenance.MaintenanceMessage;
|
||||
import io.bitsquare.p2p.peers.messages.maintenance.PingMessage;
|
||||
import io.bitsquare.p2p.peers.messages.maintenance.PongMessage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MaintenanceManager implements MessageListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class);
|
||||
|
||||
private Timer sendPingTimer;
|
||||
private PeerGroup peerGroup;
|
||||
private NetworkNode networkNode;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public MaintenanceManager(PeerGroup peerGroup, NetworkNode networkNode) {
|
||||
this.peerGroup = peerGroup;
|
||||
this.networkNode = networkNode;
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
startMaintenanceTimer();
|
||||
}
|
||||
|
||||
|
||||
public void shutDown() {
|
||||
Log.traceCall();
|
||||
if (sendPingTimer != null)
|
||||
sendPingTimer.cancel();
|
||||
}
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, Connection connection) {
|
||||
if (message instanceof MaintenanceMessage) {
|
||||
Log.traceCall(message.toString());
|
||||
log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
|
||||
if (message instanceof PingMessage) {
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("PongMessage sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("PongMessage sending failed " + throwable.getMessage());
|
||||
peerGroup.removePeer(connection.getPeerAddress());
|
||||
}
|
||||
});
|
||||
} else if (message instanceof PongMessage) {
|
||||
if (connection.getPeerAddress() != null) {
|
||||
Peer peer = peerGroup.getAuthenticatedPeers().get(connection.getPeerAddress());
|
||||
if (peer != null) {
|
||||
if (((PongMessage) message).nonce != peer.getPingNonce()) {
|
||||
log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + connection.getPeerAddress());
|
||||
peerGroup.removePeer(peer.address);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void startMaintenanceTimer() {
|
||||
Log.traceCall();
|
||||
if (sendPingTimer != null)
|
||||
sendPingTimer.cancel();
|
||||
|
||||
sendPingTimer = UserThread.runAfterRandomDelay(() -> {
|
||||
pingPeers();
|
||||
startMaintenanceTimer();
|
||||
}, 5, 10, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
|
||||
private void pingPeers() {
|
||||
Set<Peer> connectedPeersList = new HashSet<>(peerGroup.getAuthenticatedPeers().values());
|
||||
if (!connectedPeersList.isEmpty()) {
|
||||
Log.traceCall();
|
||||
connectedPeersList.stream()
|
||||
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PeerGroup.INACTIVITY_PERIOD_BEFORE_PING)
|
||||
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("PingMessage sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("PingMessage sending failed " + throwable.getMessage());
|
||||
peerGroup.removePeer(e.address);
|
||||
}
|
||||
});
|
||||
}, 1, 10));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
package io.bitsquare.p2p.peers;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.bitsquare.app.Log;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
import io.bitsquare.p2p.network.MessageListener;
|
||||
import io.bitsquare.p2p.network.NetworkNode;
|
||||
import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersRequest;
|
||||
import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersResponse;
|
||||
import io.bitsquare.p2p.peers.messages.peerexchange.PeerExchangeMessage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class PeerExchangeManager implements MessageListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
|
||||
|
||||
private final PeerGroup peerGroup;
|
||||
private final NetworkNode networkNode;
|
||||
|
||||
private Timer getPeersTimer;
|
||||
|
||||
public PeerExchangeManager(PeerGroup peerGroup, NetworkNode networkNode) {
|
||||
this.peerGroup = peerGroup;
|
||||
this.networkNode = networkNode;
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
startGetPeersTimer();
|
||||
}
|
||||
|
||||
public void shutDown() {
|
||||
Log.traceCall();
|
||||
if (getPeersTimer != null)
|
||||
getPeersTimer.cancel();
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, Connection connection) {
|
||||
if (message instanceof PeerExchangeMessage) {
|
||||
Log.traceCall(message.toString());
|
||||
log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
|
||||
if (message instanceof GetPeersRequest) {
|
||||
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
|
||||
HashSet<ReportedPeer> reportedPeers = getPeersRequestMessage.reportedPeers;
|
||||
log.trace("Received peers: " + reportedPeers);
|
||||
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
||||
new GetPeersResponse(new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("GetPeersResponse sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("GetPeersResponse sending failed " + throwable.getMessage());
|
||||
peerGroup.removePeer(getPeersRequestMessage.address);
|
||||
}
|
||||
});
|
||||
|
||||
peerGroup.addToReportedPeers(reportedPeers, connection);
|
||||
} else if (message instanceof GetPeersResponse) {
|
||||
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
|
||||
HashSet<ReportedPeer> reportedPeers = getPeersResponse.reportedPeers;
|
||||
log.trace("Received peers: " + reportedPeers);
|
||||
peerGroup.addToReportedPeers(reportedPeers, connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void startGetPeersTimer() {
|
||||
Log.traceCall();
|
||||
if (getPeersTimer != null)
|
||||
getPeersTimer.cancel();
|
||||
|
||||
getPeersTimer = UserThread.runAfterRandomDelay(() -> {
|
||||
trySendGetPeersRequest();
|
||||
startGetPeersTimer();
|
||||
}, 1, 2, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
private void trySendGetPeersRequest() {
|
||||
Set<Peer> connectedPeersList = new HashSet<>(peerGroup.getAuthenticatedPeers().values());
|
||||
if (!connectedPeersList.isEmpty()) {
|
||||
Log.traceCall();
|
||||
connectedPeersList.stream()
|
||||
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
|
||||
new GetPeersRequest(peerGroup.getMyAddress(), new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("sendGetPeersRequest sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
|
||||
peerGroup.removePeer(e.address);
|
||||
}
|
||||
});
|
||||
}, 5, 10));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -18,6 +18,10 @@ public class ReportedPeer implements Serializable {
|
||||
this.lastActivityDate = lastActivityDate;
|
||||
}
|
||||
|
||||
public ReportedPeer(Address address) {
|
||||
this(address, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
@ -38,11 +38,12 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
}
|
||||
|
||||
|
||||
private NetworkNode networkNode;
|
||||
private Address connectedSeedNodeAddress;
|
||||
private Collection<Address> seedNodeAddresses;
|
||||
private P2PDataStorage dataStorage;
|
||||
private Listener listener;
|
||||
private final NetworkNode networkNode;
|
||||
private final P2PDataStorage dataStorage;
|
||||
private final Listener listener;
|
||||
|
||||
private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty();
|
||||
private Optional<Collection<Address>> optionalSeedNodeAddresses = Optional.empty();
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -62,8 +63,8 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void requestData(Collection<Address> seedNodeAddresses) {
|
||||
if (this.seedNodeAddresses == null)
|
||||
this.seedNodeAddresses = seedNodeAddresses;
|
||||
if (!optionalSeedNodeAddresses.isPresent())
|
||||
optionalSeedNodeAddresses = Optional.of(seedNodeAddresses);
|
||||
|
||||
Log.traceCall(seedNodeAddresses.toString());
|
||||
if (!seedNodeAddresses.isEmpty()) {
|
||||
@ -78,8 +79,8 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
|
||||
checkArgument(connectedSeedNodeAddress == null, "We have already a connectedSeedNode. That must not happen.");
|
||||
connectedSeedNodeAddress = candidate;
|
||||
checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen.");
|
||||
optionalConnectedSeedNodeAddress = Optional.of(candidate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -100,7 +101,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
listener.onNoSeedNodeAvailable();
|
||||
|
||||
// We re try after 20-30 sec.
|
||||
UserThread.runAfterRandomDelay(() -> requestData(this.seedNodeAddresses),
|
||||
UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()),
|
||||
20, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
@ -124,7 +125,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
// we keep that connection open as the bootstrapping peer will use that for the authentication
|
||||
// as we are not authenticated yet the data adding will not be broadcasted
|
||||
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
|
||||
listener.onDataReceived(connectedSeedNodeAddress);
|
||||
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress));
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,8 +136,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
|
||||
@Override
|
||||
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
|
||||
if (connectedSeedNodeAddress.equals(peerAddress))
|
||||
requestDataFromAuthenticatedSeedNode(peerAddress, connection);
|
||||
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
|
||||
if (connectedSeedNodeAddress.equals(peerAddress))
|
||||
requestDataFromAuthenticatedSeedNode(peerAddress, connection);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -158,7 +161,8 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
|
||||
+ "\nWe will try again to request data from any of our seed nodes.");
|
||||
|
||||
// We will try again to request data from any of our seed nodes.
|
||||
requestData(seedNodeAddresses);
|
||||
if (optionalSeedNodeAddresses.isPresent())
|
||||
requestData(optionalSeedNodeAddresses.get());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -20,8 +20,8 @@ public abstract class AuthenticationMessage implements Message {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AuthenticationMessage{" +
|
||||
"networkId=" + networkId +
|
||||
return ", address=" + address.toString() +
|
||||
", networkId=" + networkId +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,24 @@
|
||||
package io.bitsquare.p2p.peers.messages.auth;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
||||
|
||||
public final class AuthenticationRejection extends AuthenticationMessage {
|
||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||
|
||||
public final long requesterNonce;
|
||||
|
||||
public AuthenticationRejection(Address address, long requesterNonce) {
|
||||
super(address);
|
||||
this.requesterNonce = requesterNonce;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AuthenticationReject{" +
|
||||
"address=" + address +
|
||||
", requesterNonce=" + requesterNonce +
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
@ -19,6 +19,6 @@ public final class AuthenticationRequest extends AuthenticationMessage {
|
||||
return "AuthenticationRequest{" +
|
||||
"address=" + address +
|
||||
", nonce=" + requesterNonce +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,6 @@ public final class AuthenticationResponse extends AuthenticationMessage {
|
||||
"address=" + address +
|
||||
", requesterNonce=" + requesterNonce +
|
||||
", challengerNonce=" + responderNonce +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,6 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
|
||||
"address=" + address +
|
||||
", challengerNonce=" + responderNonce +
|
||||
", reportedPeers=" + reportedPeers +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,6 @@ public final class GetPeersAuthResponse extends AuthenticationMessage {
|
||||
return "GetPeersAuthResponse{" +
|
||||
"address=" + address +
|
||||
", reportedPeers=" + reportedPeers +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -13,8 +13,7 @@ public abstract class MaintenanceMessage implements Message {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MaintenanceMessage{" +
|
||||
"networkId=" + networkId +
|
||||
return ", networkId=" + networkId +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,6 @@ public final class PingMessage extends MaintenanceMessage {
|
||||
public String toString() {
|
||||
return "PingMessage{" +
|
||||
"nonce=" + nonce +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,6 @@ public final class PongMessage extends MaintenanceMessage {
|
||||
public String toString() {
|
||||
return "PongMessage{" +
|
||||
"nonce=" + nonce +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.peers.messages.maintenance;
|
||||
package io.bitsquare.p2p.peers.messages.peerexchange;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
||||
@ -6,7 +6,7 @@ import io.bitsquare.p2p.peers.ReportedPeer;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
||||
public final class GetPeersRequest extends MaintenanceMessage {
|
||||
public final class GetPeersRequest extends PeerExchangeMessage {
|
||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||
|
||||
@ -23,6 +23,6 @@ public final class GetPeersRequest extends MaintenanceMessage {
|
||||
return "GetPeersRequest{" +
|
||||
"address=" + address +
|
||||
", reportedPeers=" + reportedPeers +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
@ -1,11 +1,11 @@
|
||||
package io.bitsquare.p2p.peers.messages.maintenance;
|
||||
package io.bitsquare.p2p.peers.messages.peerexchange;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.peers.ReportedPeer;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
||||
public final class GetPeersResponse extends MaintenanceMessage {
|
||||
public final class GetPeersResponse extends PeerExchangeMessage {
|
||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||
|
||||
@ -19,6 +19,6 @@ public final class GetPeersResponse extends MaintenanceMessage {
|
||||
public String toString() {
|
||||
return "GetPeersResponse{" +
|
||||
"reportedPeers=" + reportedPeers +
|
||||
"} " + super.toString();
|
||||
super.toString() + "} ";
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package io.bitsquare.p2p.peers.messages.peerexchange;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Message;
|
||||
|
||||
public abstract class PeerExchangeMessage implements Message {
|
||||
private final int networkId = Version.NETWORK_ID;
|
||||
|
||||
@Override
|
||||
public int networkId() {
|
||||
return networkId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return ", networkId=" + networkId +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -51,10 +51,12 @@ public class P2PDataStorage implements MessageListener {
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public P2PDataStorage(PeerGroup peerGroup, File storageDir) {
|
||||
public P2PDataStorage(PeerGroup peerGroup, NetworkNode networkNode, File storageDir) {
|
||||
Log.traceCall();
|
||||
this.peerGroup = peerGroup;
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
|
||||
storage = new Storage<>(storageDir);
|
||||
|
||||
init();
|
||||
@ -62,13 +64,9 @@ public class P2PDataStorage implements MessageListener {
|
||||
|
||||
private void init() {
|
||||
Log.traceCall();
|
||||
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "SequenceNumberMap");
|
||||
if (persisted != null) {
|
||||
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted("SequenceNumberMap");
|
||||
if (persisted != null)
|
||||
sequenceNumberMap = persisted;
|
||||
}
|
||||
|
||||
NetworkNode networkNode = peerGroup.getNetworkNode();
|
||||
networkNode.addMessageListener(this);
|
||||
|
||||
timer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
@ -149,10 +147,12 @@ public class P2PDataStorage implements MessageListener {
|
||||
}
|
||||
|
||||
public boolean add(ProtectedData protectedData, @Nullable Address sender) {
|
||||
Log.traceCall();
|
||||
return doAdd(protectedData, sender, false);
|
||||
}
|
||||
|
||||
public boolean rePublish(ProtectedData protectedData, @Nullable Address sender) {
|
||||
Log.traceCall();
|
||||
return doAdd(protectedData, sender, true);
|
||||
}
|
||||
|
||||
@ -170,7 +170,8 @@ public class P2PDataStorage implements MessageListener {
|
||||
if (result) {
|
||||
map.put(hashOfPayload, protectedData);
|
||||
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
|
||||
|
||||
storage.queueUpForSave(sequenceNumberMap);
|
||||
|
||||
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
|
||||
sb.append("Data set after addProtectedExpirableData:");
|
||||
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n"));
|
||||
@ -180,7 +181,7 @@ public class P2PDataStorage implements MessageListener {
|
||||
if (rePublish || !containsKey)
|
||||
broadcast(new AddDataMessage(protectedData), sender);
|
||||
|
||||
storage.queueUpForSave();
|
||||
|
||||
hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData));
|
||||
} else {
|
||||
log.trace("add failed");
|
||||
@ -206,7 +207,7 @@ public class P2PDataStorage implements MessageListener {
|
||||
broadcast(new RemoveDataMessage(protectedData), sender);
|
||||
|
||||
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
|
||||
storage.queueUpForSave();
|
||||
storage.queueUpForSave(sequenceNumberMap);
|
||||
} else {
|
||||
log.debug("remove failed");
|
||||
}
|
||||
@ -231,7 +232,7 @@ public class P2PDataStorage implements MessageListener {
|
||||
broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender);
|
||||
|
||||
sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber);
|
||||
storage.queueUpForSave();
|
||||
storage.queueUpForSave(sequenceNumberMap);
|
||||
} else {
|
||||
log.debug("removeMailboxData failed");
|
||||
}
|
||||
@ -288,7 +289,6 @@ public class P2PDataStorage implements MessageListener {
|
||||
Log.traceCall();
|
||||
map.remove(hashOfPayload);
|
||||
log.trace("Data removed from our map. We broadcast the message to our peers.");
|
||||
storage.queueUpForSave();
|
||||
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData));
|
||||
|
||||
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" +
|
||||
|
@ -112,7 +112,7 @@ public class PeerGroupTest {
|
||||
P2PService p2PService1 = seedNode1.getP2PService();
|
||||
latch.await();
|
||||
Thread.sleep(500);
|
||||
Assert.assertEquals(0, p2PService1.getPeerGroup().getReportedPeers().size());
|
||||
Assert.assertEquals(0, p2PService1.getPeerGroup().getAuthenticatedAndReportedPeers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -194,8 +194,8 @@ public class PeerGroupTest {
|
||||
});
|
||||
P2PService p2PService2 = seedNode2.getP2PService();
|
||||
latch.await();
|
||||
Assert.assertEquals(1, p2PService1.getPeerGroup().getReportedPeers().size());
|
||||
Assert.assertEquals(1, p2PService2.getPeerGroup().getReportedPeers().size());
|
||||
Assert.assertEquals(1, p2PService1.getPeerGroup().getAuthenticatedAndReportedPeers().size());
|
||||
Assert.assertEquals(1, p2PService2.getPeerGroup().getAuthenticatedAndReportedPeers().size());
|
||||
}
|
||||
|
||||
// @Test
|
||||
|
@ -66,8 +66,7 @@ public class ProtectedDataStorageTest {
|
||||
encryptionService1 = new EncryptionService(keyRing1);
|
||||
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getP2PService().getNetworkNode();
|
||||
peerGroup1 = new PeerGroup(networkNode1);
|
||||
peerGroup1.setSeedNodeAddresses(seedNodes);
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, new File("dummy"));
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, networkNode1, new File("dummy"));
|
||||
|
||||
// for mailbox
|
||||
keyRing2 = new KeyRing(new KeyStorage(dir2));
|
||||
@ -109,7 +108,7 @@ public class ProtectedDataStorageTest {
|
||||
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
|
||||
P2PDataStorage.CHECK_TTL_INTERVAL = 10;
|
||||
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, new File("dummy"));
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, networkNode1, new File("dummy"));
|
||||
mockData.ttl = 50;
|
||||
|
||||
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
|
||||
|
Loading…
Reference in New Issue
Block a user