add ReportedPeer class

This commit is contained in:
Manfred Karrer 2015-11-16 19:51:22 +01:00
parent e90c1cabfe
commit 2950f6e347
9 changed files with 153 additions and 74 deletions

View File

@ -82,7 +82,7 @@ public class AuthenticationHandshake implements MessageListener {
if (verified) {
GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress,
authenticationResponse.responderNonce,
new HashSet<>(peerGroup.getAllPeerAddresses()));
new HashSet<>(peerGroup.getReportedPeers()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthRequest);
log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@ -113,14 +113,14 @@ public class AuthenticationHandshake implements MessageListener {
if (verified) {
// we create the msg with our already collected peer addresses (before adding the new ones)
GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress,
new HashSet<>(peerGroup.getAllPeerAddresses()));
new HashSet<>(peerGroup.getReportedPeers()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthResponse);
log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
// now we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received reported peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
HashSet<ReportedPeer> reportedPeers = getPeersAuthRequest.reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
peerGroup.addToReportedPeers(reportedPeers, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -148,9 +148,9 @@ public class AuthenticationHandshake implements MessageListener {
GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
Address peerAddress = getPeersAuthResponse.address;
log.trace("GetPeersAuthResponse from " + peerAddress + " at " + myAddress);
HashSet<Address> peerAddresses = getPeersAuthResponse.peerAddresses;
log.trace("Received reported peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
HashSet<ReportedPeer> reportedPeers = getPeersAuthResponse.reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
peerGroup.addToReportedPeers(reportedPeers, connection);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "

View File

@ -6,10 +6,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
public class Peer implements Serializable {
public class Peer {
private static final Logger log = LoggerFactory.getLogger(Peer.class);
public final Connection connection;

View File

@ -48,7 +48,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final Set<Address> seedNodeAddresses;
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<Address> reportedPeerAddresses = new HashSet<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private Timer sendPingTimer = new Timer();
@ -256,7 +256,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else if (reportedPeerAddresses.size() > 0) {
} else if (reportedPeers.size() > 0) {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToRemainingReportedPeers(true);
} else {
@ -293,7 +293,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void authenticateToRemainingReportedPeers(boolean calledFromAuthenticateToSeedNode) {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomNotAuthPeerAndRemainingSet(reportedPeerAddresses);
Optional<Tuple2<ReportedPeer, Set<ReportedPeer>>> tupleOptional = getReportedPeerAndRemainingSet(reportedPeers);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random peer. " + tupleOptional.get().first);
authenticateToReportedPeer(tupleOptional.get().first);
@ -310,20 +310,21 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed.
// If we succeed we repeat until we are out of addresses.
private void authenticateToReportedPeer(Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
private void authenticateToReportedPeer(ReportedPeer reportedPeer) {
Log.traceCall(reportedPeer.toString());
final Address reportedPeerAddress = reportedPeer.address;
checkArgument(!authenticatedPeers.containsKey(reportedPeerAddress),
"We have that peer already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
if (!authenticationHandshakes.containsKey(reportedPeerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(peerAddress);
authenticationHandshakes.put(reportedPeerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(reportedPeerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
setAuthenticated(connection, peerAddress);
setAuthenticated(connection, reportedPeerAddress);
if (getAuthenticatedPeers().size() < MAX_CONNECTIONS_LOW_PRIO) {
if (reportedPeerAddresses.size() > 0) {
if (reportedPeers.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers(false);
@ -340,12 +341,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to a reported peer with address " + peerAddress + " failed." +
log.info("Send RequestAuthenticationMessage to a reported peer with address " + reportedPeer + " failed." +
"\nThat is expected if the nodes was offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
removeReportedPeer(reportedPeer);
if (reportedPeerAddresses.size() > 0) {
if (reportedPeers.size() > 0) {
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers(false);
} else {
@ -357,7 +358,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
});
} else {
log.warn("An authentication handshake is already created for that peerAddress ({})", peerAddress);
log.warn("An authentication handshake is already created for that peerAddress ({})", reportedPeer);
}
}
@ -421,7 +422,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(peer.toString());
Address peerAddress = peer.address;
authenticatedPeers.put(peerAddress, peer);
reportedPeerAddresses.remove(peerAddress);
reportedPeers.remove(new ReportedPeer(peerAddress, new Date()));
if (!checkIfConnectedPeersExceeds())
printAuthenticatedPeers();
@ -541,7 +543,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
new GetPeersRequest(getMyAddress(), new HashSet<>(getReportedPeers())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -587,12 +589,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
} else if (message instanceof GetPeersRequest) {
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
HashSet<Address> peerAddresses = getPeersRequestMessage.peerAddresses;
log.trace("Received peers: " + peerAddresses);
addToReportedPeers(peerAddresses, connection);
HashSet<ReportedPeer> reportedPeers = getPeersRequestMessage.reportedPeers;
log.trace("Received peers: " + reportedPeers);
addToReportedPeers(reportedPeers, connection);
SettableFuture<Connection> future = networkNode.sendMessage(connection,
new GetPeersResponse(new HashSet<>(getAllPeerAddresses())));
new GetPeersResponse(new HashSet<>(getReportedPeers())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -607,9 +609,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
});
} else if (message instanceof GetPeersResponse) {
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
HashSet<Address> peerAddresses = getPeersResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
addToReportedPeers(peerAddresses, connection);
HashSet<ReportedPeer> reportedPeers = getPeersResponse.reportedPeers;
log.trace("Received peers: " + reportedPeers);
addToReportedPeers(reportedPeers, connection);
}
}
@ -622,11 +624,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return authenticatedPeers;
}
public Set<Address> getAllPeerAddresses() {
Set<Address> allPeerAddresses = new HashSet<>(reportedPeerAddresses);
allPeerAddresses.addAll(authenticatedPeers.values().stream()
.map(e -> e.address).collect(Collectors.toSet()));
return allPeerAddresses;
public Set<ReportedPeer> getReportedPeers() {
Set<ReportedPeer> all = new HashSet<>(reportedPeers);
Set<ReportedPeer> authenticated = authenticatedPeers.values().stream()
.map(e -> new ReportedPeer(e.address, new Date()))
.collect(Collectors.toSet());
all.addAll(authenticated);
return all;
}
public Set<Address> getSeedNodeAddresses() {
@ -642,16 +646,16 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Reported peers
///////////////////////////////////////////////////////////////////////////////////////////
void addToReportedPeers(HashSet<Address> peerAddresses, Connection connection) {
void addToReportedPeers(HashSet<ReportedPeer> reportedPeers, Connection connection) {
Log.traceCall();
// we disconnect misbehaving nodes trying to send too many peers
// reported peers include the peers connected peers which is normally max. 8 but we give some headroom
// for safety
if (peerAddresses.size() > 1100) {
if (reportedPeers.size() > 1100) {
connection.shutDown();
} else {
peerAddresses.remove(getMyAddress());
reportedPeerAddresses.addAll(peerAddresses);
reportedPeers.remove(new ReportedPeer(getMyAddress(), new Date()));
this.reportedPeers.addAll(reportedPeers);
purgeReportedPeersIfExceeds();
}
@ -660,25 +664,25 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = reportedPeerAddresses.size();
int size = reportedPeers.size();
if (size > MAX_REPORTED_PEERS) {
log.trace("We have more then {} reported peers. size={}. " +
"We remove random peers from the reported peers list.", MAX_REPORTED_PEERS, size);
int diff = size - MAX_REPORTED_PEERS;
List<Address> list = new LinkedList<>(getReportedNotConnectedPeerAddresses());
List<ReportedPeer> list = new LinkedList<>(getReportedNotConnectedPeerAddresses());
for (int i = 0; i < diff; i++) {
Address toRemove = getAndRemoveRandomAddress(list);
reportedPeerAddresses.remove(toRemove);
ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list);
reportedPeers.remove(toRemove);
}
} else {
log.trace("We don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
}
}
private Set<Address> getReportedNotConnectedPeerAddresses() {
private Set<ReportedPeer> getReportedNotConnectedPeerAddresses() {
Log.traceCall();
Set<Address> set = new HashSet<>(reportedPeerAddresses);
authenticatedPeers.values().stream().forEach(e -> set.remove(e.address));
Set<ReportedPeer> set = new HashSet<>(reportedPeers);
authenticatedPeers.values().stream().forEach(e -> set.remove(new ReportedPeer(e.address, new Date())));
return set;
}
@ -687,16 +691,27 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Peers
///////////////////////////////////////////////////////////////////////////////////////////
private void removeReportedPeer(@Nullable ReportedPeer reportedPeer) {
Log.traceCall("reportedPeer=" + reportedPeer);
if (reportedPeer != null) {
boolean wasInReportedPeers = reportedPeers.remove(reportedPeer);
if (wasInReportedPeers)
printReportedPeers();
removePeer(reportedPeer.address);
}
}
private void removePeer(@Nullable Address peerAddress) {
Log.traceCall("peerAddress=" + peerAddress);
if (peerAddress != null) {
if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
boolean wasInReportedPeers = reportedPeerAddresses.remove(peerAddress);
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (wasInReportedPeers || disconnectedPeer != null)
printAllPeers();
if (disconnectedPeer != null)
printAuthenticatedPeers();
}
}
@ -710,6 +725,19 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private Optional<Tuple2<ReportedPeer, Set<ReportedPeer>>> getReportedPeerAndRemainingSet(Set<ReportedPeer> remainingReportedPeers) {
Log.traceCall();
List<ReportedPeer> list = new ArrayList<>(remainingReportedPeers);
authenticatedPeers.values().stream().forEach(e -> list.remove(new ReportedPeer(e.address, new Date())));
if (!list.isEmpty()) {
ReportedPeer item = getAndRemoveRandomReportedPeer(list);
return Optional.of(new Tuple2<>(item, new HashSet<>(list)));
} else {
return Optional.empty();
}
}
private Optional<Tuple2<Address, Set<Address>>> getRandomNotAuthPeerAndRemainingSet(Set<Address> remainingAddresses) {
Log.traceCall();
List<Address> list = new ArrayList<>(remainingAddresses);
@ -722,6 +750,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private ReportedPeer getAndRemoveRandomReportedPeer(List<ReportedPeer> list) {
Log.traceCall();
return list.remove(new Random().nextInt(list.size()));
}
private Address getAndRemoveRandomAddress(List<Address> list) {
Log.traceCall();
return list.remove(new Random().nextInt(list.size()));
@ -744,7 +777,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void printReportedPeers() {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Reported peers for node " + getMyAddress() + ":");
reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e));
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}

View File

@ -0,0 +1,44 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import java.io.Serializable;
import java.util.Date;
public class ReportedPeer implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final Date lastActivityDate;
public ReportedPeer(Address address, Date lastActivityDate) {
this.address = address;
this.lastActivityDate = lastActivityDate;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ReportedPeer)) return false;
ReportedPeer that = (ReportedPeer) o;
return !(address != null ? !address.equals(that.address) : that.address != null);
}
@Override
public int hashCode() {
return address != null ? address.hashCode() : 0;
}
@Override
public String toString() {
return "ReportedPeer{" +
"address=" + address +
", lastActivityDate=" + lastActivityDate +
'}';
}
}

View File

@ -2,6 +2,7 @@ package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
@ -11,12 +12,12 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
public final Address address;
public final long responderNonce;
public final HashSet<Address> peerAddresses;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersAuthRequest(Address address, long responderNonce, HashSet<Address> peerAddresses) {
public GetPeersAuthRequest(Address address, long responderNonce, HashSet<ReportedPeer> reportedPeers) {
this.address = address;
this.responderNonce = responderNonce;
this.peerAddresses = peerAddresses;
this.reportedPeers = reportedPeers;
}
@Override
@ -24,7 +25,7 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
return "GetPeersAuthRequest{" +
"address=" + address +
", challengerNonce=" + responderNonce +
", peerAddresses=" + peerAddresses +
", reportedPeers=" + reportedPeers +
"} " + super.toString();
}
}

View File

@ -2,6 +2,7 @@ package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
@ -10,18 +11,18 @@ public final class GetPeersAuthResponse extends AuthenticationMessage {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersAuthResponse(Address address, HashSet<Address> peerAddresses) {
public GetPeersAuthResponse(Address address, HashSet<ReportedPeer> reportedPeers) {
this.address = address;
this.peerAddresses = peerAddresses;
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersAuthResponse{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
", reportedPeers=" + reportedPeers +
"} " + super.toString();
}
}

View File

@ -2,6 +2,7 @@ package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
@ -10,18 +11,18 @@ public final class GetPeersRequest extends MaintenanceMessage {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(Address address, HashSet<Address> peerAddresses) {
public GetPeersRequest(Address address, HashSet<ReportedPeer> reportedPeers) {
this.address = address;
this.peerAddresses = peerAddresses;
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersRequest{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
", reportedPeers=" + reportedPeers +
"} " + super.toString();
}
}

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
@ -9,16 +9,16 @@ public final class GetPeersResponse extends MaintenanceMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final HashSet<Address> peerAddresses;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersResponse(HashSet<Address> peerAddresses) {
this.peerAddresses = peerAddresses;
public GetPeersResponse(HashSet<ReportedPeer> reportedPeers) {
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersResponse{" +
"peerAddresses=" + peerAddresses +
"reportedPeers=" + reportedPeers +
"} " + super.toString();
}
}

View File

@ -109,7 +109,7 @@ public class PeerGroupTest {
P2PService p2PService1 = seedNode1.getP2PService();
latch.await();
Thread.sleep(500);
Assert.assertEquals(0, p2PService1.getPeerGroup().getAllPeerAddresses().size());
Assert.assertEquals(0, p2PService1.getPeerGroup().getReportedPeers().size());
}
@Test
@ -184,8 +184,8 @@ public class PeerGroupTest {
});
P2PService p2PService2 = seedNode2.getP2PService();
latch.await();
Assert.assertEquals(1, p2PService1.getPeerGroup().getAllPeerAddresses().size());
Assert.assertEquals(1, p2PService2.getPeerGroup().getAllPeerAddresses().size());
Assert.assertEquals(1, p2PService1.getPeerGroup().getReportedPeers().size());
Assert.assertEquals(1, p2PService2.getPeerGroup().getReportedPeers().size());
}
// @Test