Add peerrequest messages, clean up logging

This commit is contained in:
Manfred Karrer 2015-11-06 22:34:32 +01:00
parent e3cdad4299
commit 79f3ac99cf
35 changed files with 489 additions and 432 deletions

View File

@ -95,7 +95,7 @@ public class Storage<T extends Serializable> {
// Save delayed and on a background thread
public void queueUpForSave() {
log.debug("save " + fileName);
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
fileManager.saveLater(serializable);
@ -118,12 +118,12 @@ public class Storage<T extends Serializable> {
long now = System.currentTimeMillis();
try {
T persistedObject = fileManager.read(storageFile);
log.info("Read {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
log.trace("Read {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
// If we did not get any exception we can be sure the data are consistent so we make a backup
now = System.currentTimeMillis();
fileManager.backupFile(fileName);
log.info("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
log.trace("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
return persistedObject;
} catch (ClassCastException | IOException e) {

View File

@ -6,14 +6,18 @@
</encoder>
</appender>
<root level="TRACE">
<root level="INFO">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<logger name="io.bitsquare.p2p" level="TRACE"/>
<!-- <logger name="io.bitsquare.p2p.peers.PeerGroup" level="TRACE"/>
<logger name="io.bitsquare.p2p.P2PService" level="TRACE"/>
<logger name="io.bitsquare.p2p.storage.ProtectedExpirableDataStorage" level="TRACE"/>
<logger name="io.bitsquare.p2p.network.LocalhostNetworkNode" level="TRACE"/>
<logger name="io.bitsquare.p2p.network.TorNetworkNode" level="TRACE"/>
<logger name="io.bitsquare.p2p.network.NetworkNode" level="TRACE"/>-->
<logger name="io.bitsquare" level="TRACE"/>
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="WARN"/>
<logger name="io.bitsquare.btc.AddressBasedCoinSelector" level="OFF"/>

View File

@ -15,9 +15,9 @@ import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peer.Peer;
import io.bitsquare.p2p.peer.PeerGroup;
import io.bitsquare.p2p.peer.PeerListener;
import io.bitsquare.p2p.peers.Peer;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.peers.PeerListener;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
@ -25,8 +25,8 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ExpirablePayload;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.AllDataMessage;
import io.bitsquare.p2p.storage.messages.GetAllDataMessage;
import io.bitsquare.p2p.storage.messages.GetDataRequest;
import io.bitsquare.p2p.storage.messages.GetDataResponse;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import org.fxmisc.easybind.EasyBind;
@ -153,17 +153,13 @@ public class P2PService implements SetupListener {
});
networkNode.addMessageListener((message, connection) -> {
if (message instanceof GetAllDataMessage) {
if (message instanceof GetDataRequest) {
log.trace("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new AllDataMessage(getDataSet()));
} else if (message instanceof AllDataMessage) {
AllDataMessage allDataMessage = (AllDataMessage) message;
HashSet<ProtectedData> set = allDataMessage.set;
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
} else if (message instanceof GetDataResponse) {
GetDataResponse getDataResponse = (GetDataResponse) message;
HashSet<ProtectedData> set = getDataResponse.set;
if (!set.isEmpty()) {
StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n");
set.stream().forEach(e -> sb.append(e.toString() + "\n"));
sb.append("\n");
log.trace(sb.toString());
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
@ -272,7 +268,7 @@ public class P2PService implements SetupListener {
Address candidate = remainingSeedNodeAddresses.remove(0);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetAllDataMessage());
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
@ -315,7 +311,7 @@ public class P2PService implements SetupListener {
private void sendGetAllDataMessageAfterAuthentication(final Peer peer) {
log.trace("sendGetDataSetMessageAfterAuthentication");
// After authentication we request again data as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new GetAllDataMessage());
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new GetDataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {

View File

@ -188,15 +188,18 @@ public class Connection {
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
if (!stopped) {
log.info("\n\nShutDown connection:"
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"ShutDown connection:"
+ "\npeerAddress=" + peerAddress
+ "\nobjectId=" + getObjectId()
+ "\nuid=" + getUid()
+ "\nisAuthenticated=" + isAuthenticated()
+ "\nsocket.getPort()=" + sharedSpace.getSocket().getPort()
+ "\n\n");
log.debug("ShutDown " + this.getObjectId());
log.debug("ShutDown connection requested. Connection=" + this.toString());
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nobjectId=" + getObjectId() + " / uid=" + getUid()
+ "\nisAuthenticated=" + isAuthenticated());
result.append("\n############################################################\n");
log.info(result.toString());
log.trace("ShutDown " + this.getObjectId());
log.trace("ShutDown connection requested. Connection=" + this.toString());
stopped = true;
sharedSpace.stop();

View File

@ -4,7 +4,6 @@ package io.bitsquare.p2p.network;
import io.bitsquare.p2p.Address;
public interface ConnectionListener {
enum Reason {
SOCKET_CLOSED,
RESET,

View File

@ -3,6 +3,5 @@ package io.bitsquare.p2p.network;
import io.bitsquare.p2p.Message;
public interface MessageListener {
void onMessage(Message message, Connection connection);
}

View File

@ -38,13 +38,15 @@ public class Server implements Runnable {
if (!stopped) {
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
log.info("\n\nServer created new inbound connection:"
+ "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort()
+ "\nsocket.getPort()=" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid()
+ "\n\n");
log.info("Server created new socket with port " + socket.getPort());
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Server created new inbound connection:"
+ "\nlocalPort/port=" + serverSocket.getLocalPort()
+ "/" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid());
result.append("\n############################################################\n");
log.info(result.toString());
if (!stopped)
connections.add(connection);
}

View File

@ -1,5 +0,0 @@
package io.bitsquare.p2p.network;
public interface ServerListener {
void onSocketHandler(Connection connection);
}

View File

@ -2,11 +2,9 @@ package io.bitsquare.p2p.network;
public interface SetupListener {
void onTorNodeReady();
void onHiddenServicePublished();
void onSetupFailed(Throwable throwable);
}

View File

@ -1,15 +0,0 @@
package io.bitsquare.p2p.network.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
public final class SelfTestMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final long nonce;
public SelfTestMessage(long nonce) {
this.nonce = nonce;
}
}

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer;
package io.bitsquare.p2p.peers;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
@ -9,10 +9,7 @@ import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peer.messages.ChallengeMessage;
import io.bitsquare.p2p.peer.messages.GetPeersMessage;
import io.bitsquare.p2p.peer.messages.PeersMessage;
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
import io.bitsquare.p2p.peers.messages.auth.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@ -54,11 +51,11 @@ public class AuthenticationHandshake {
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
}
@Override
@ -73,18 +70,15 @@ public class AuthenticationHandshake {
}
public SettableFuture<Connection> requestAuthentication(Set<Address> remainingAddresses, Address peerAddress) {
log.info("requestAuthentication " + this);
log.info("remainingAddresses " + remainingAddresses);
log.info("peerAddress " + peerAddress);
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
remainingAddresses.remove(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
}
@Override
@ -101,23 +95,25 @@ public class AuthenticationHandshake {
}
public SettableFuture<Connection> processAuthenticationRequest(RequestAuthenticationMessage requestAuthenticationMessage, Connection connection) {
public SettableFuture<Connection> processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) {
// Responding peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
Address peerAddress = requestAuthenticationMessage.address;
Address peerAddress = authenticationRequest.address;
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress);
connection.shutDown(() -> UserThread.runAfter(() -> {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(myAddress, requestAuthenticationMessage.nonce, getAndSetNonce()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.debug("onSuccess sending ChallengeMessage");
log.trace("onSuccess sending ChallengeMessage");
}
@Override
@ -135,18 +131,19 @@ public class AuthenticationHandshake {
private void setupMessageListener() {
networkNode.addMessageListener((message, connection) -> {
if (message instanceof ChallengeMessage) {
if (message instanceof AuthenticationMessage) {
if (message instanceof AuthenticationResponse) {
// Requesting peer
ChallengeMessage challengeMessage = (ChallengeMessage) message;
Address peerAddress = challengeMessage.address;
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
log.trace("challengeMessage" + challengeMessage);
log.trace("challengeMessage" + authenticationResponse);
// HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
boolean verified = nonce != 0 && nonce == challengeMessage.requesterNonce;
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
connection.setPeerAddress(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersMessage(myAddress, challengeMessage.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -160,23 +157,23 @@ public class AuthenticationHandshake {
}
});
} else {
log.warn("verify nonce failed. challengeMessage=" + challengeMessage + " / nonce=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + challengeMessage + " / nonceMap=" + nonce)));
log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce)));
}
} else if (message instanceof GetPeersMessage) {
} else if (message instanceof GetPeersAuthRequest) {
// Responding peer
GetPeersMessage getPeersMessage = (GetPeersMessage) message;
Address peerAddress = getPeersMessage.address;
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
Address peerAddress = getPeersAuthRequest.address;
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersMessage.challengerNonce;
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce;
if (verified) {
// we add the reported peers to our own set
HashSet<Address> peerAddresses = ((GetPeersMessage) message).peerAddresses;
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new PeersMessage(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
Futures.addCallback(future, new FutureCallback<Connection>() {
@ -192,21 +189,21 @@ public class AuthenticationHandshake {
}
});
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getObjectId() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
UserThread.execute(() -> resultFuture.set(connection));
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce)));
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce)));
}
} else if (message instanceof PeersMessage) {
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
PeersMessage peersMessage = (PeersMessage) message;
Address peerAddress = peersMessage.address;
GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
Address peerAddress = getPeersAuthResponse.address;
log.trace("PeersMessage from " + peerAddress + " at " + myAddress);
HashSet<Address> peerAddresses = peersMessage.peerAddresses;
HashSet<Address> peerAddresses = getPeersAuthResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
@ -218,6 +215,7 @@ public class AuthenticationHandshake {
UserThread.execute(() -> resultFuture.set(connection));
}
}
});
}

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer;
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer;
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;

View File

@ -1,18 +1,17 @@
package io.bitsquare.p2p.peer;
package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peer.messages.MaintenanceMessage;
import io.bitsquare.p2p.peer.messages.PingMessage;
import io.bitsquare.p2p.peer.messages.PongMessage;
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
import io.bitsquare.p2p.peers.messages.maintenance.*;
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -39,6 +38,7 @@ public class PeerGroup {
private static int MAX_CONNECTIONS = 8;
private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static int GET_PEERS_INTERVAL = 30000;//new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
public static void setMaxConnections(int maxConnections) {
@ -46,16 +46,16 @@ public class PeerGroup {
}
private final NetworkNode networkNode;
private final Set<Address> seedNodeAddresses;
private final CopyOnWriteArraySet<PeerListener> peerListeners = new CopyOnWriteArraySet<>();
private final ConcurrentHashMap<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
private final CopyOnWriteArraySet<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
;
private final Timer maintenanceTimer = new Timer();
private final Timer getPeersTimer = new Timer();
private volatile boolean shutDownInProgress;
private boolean firstPeerAdded = false;
///////////////////////////////////////////////////////////////////////////////////////////
@ -73,30 +73,10 @@ public class PeerGroup {
networkNode.addMessageListener((message, connection) -> {
if (message instanceof MaintenanceMessage)
processMaintenanceMessage((MaintenanceMessage) message, connection);
else if (message instanceof RequestAuthenticationMessage) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest((RequestAuthenticationMessage) message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
UserThread.execute(() -> {
setAuthenticated(connection, connection.getPeerAddress());
purgeReportedPeers();
});
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(connection.getPeerAddress()));
else if (message instanceof AuthenticationRequest) {
processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection);
}
});
}
});
networkNode.addConnectionListener(new ConnectionListener() {
@Override
@ -121,56 +101,7 @@ public class PeerGroup {
}
});
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000));
try {
UserThread.execute(() -> {
disconnectOldConnections();
pingPeers();
});
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL);
}
private void disconnectOldConnections() {
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
.filter(e -> e.isAuthenticated())
.collect(Collectors.toList());
if (authenticatedConnections.size() > MAX_CONNECTIONS) {
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
Connection connection = authenticatedConnections.remove(0);
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS));
}
}
private void pingPeers() {
log.trace("pingPeers");
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PingMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 5, 10));
setupMaintenanceTimer();
}
@ -193,7 +124,7 @@ public class PeerGroup {
public void broadcast(BroadcastMessage message, @Nullable Address sender) {
log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers.");
log.trace("message = " + message);
printConnectedPeersMap();
printAuthenticatedPeers();
// TODO add randomized timing?
authenticatedPeers.values().stream()
@ -221,10 +152,36 @@ public class PeerGroup {
// Authentication to seed node
///////////////////////////////////////////////////////////////////////////////////////////
private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
UserThread.execute(() -> {
setAuthenticated(connection, connection.getPeerAddress());
purgeReportedPeers();
});
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(connection.getPeerAddress()));
}
});
}
public void authenticateSeedNode(Address peerAddress) {
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true);
}
// First we try to connect to 1 seed node. If we fail we try to connect to any reported peer.
// After connection is authenticated, we try to connect to any reported peer as long we have not
// reached our max connection size.
public void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
@ -239,14 +196,14 @@ public class PeerGroup {
if (continueOnSuccess) {
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToAnyReportedPeer();
authenticateToRemainingReportedPeers();
} else {
log.info("We have already enough connections.");
}
} else {
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60);
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60);
}
}
}
@ -259,36 +216,33 @@ public class PeerGroup {
// If we fail we try again with the remaining set
remainingAddresses.remove(peerAddress);
List<Address> list = new ArrayList<>(remainingAddresses);
removeAuthenticatedPeersFromList(list);
if (!list.isEmpty()) {
Address item = getAndRemoveRandomItem(list);
log.info("We try to build an authenticated connection to a seed node. " + item);
authenticateToSeedNode(remainingAddresses, item, true);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(remainingAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToAnyReportedPeer();
authenticateToRemainingReportedPeers();
}
}
});
}
private void authenticateToAnyReportedPeer() {
// after we have at least one seed node we try to get reported peers connected
List<Address> list = new ArrayList<>(reportedPeerAddresses);
removeAuthenticatedPeersFromList(list);
if (!list.isEmpty()) {
Address item = getAndRemoveRandomItem(list);
log.info("We try to build an authenticated connection to a random peer. " + item + " / list=" + list);
authenticateToReportedPeer(new HashSet<>(list), item);
private void authenticateToRemainingReportedPeers() {
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random peer. " + tupleOptional.get().first);
authenticateToReportedPeer(tupleOptional.get().second, tupleOptional.get().first);
} else {
log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
}
}
public void authenticateToReportedPeer(Set<Address> remainingAddresses, Address peerAddress) {
// We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed.
// If we succeed we repeat until we are ut of addresses.
private void authenticateToReportedPeer(Set<Address> remainingAddresses, Address peerAddress) {
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
@ -300,8 +254,14 @@ public class PeerGroup {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
} else {
log.info("We still don't have enough connections. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
}
} else {
log.info("We have already enough connections.");
}
@ -313,101 +273,22 @@ public class PeerGroup {
throwable.printStackTrace();
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
removePeer(peerAddress);
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
}
});
}
private void authenticateToRemainingSeedNodes() {
// after we have at least one seed node we try to get reported peers connected
List<Address> list = new ArrayList<>(seedNodeAddresses);
removeAuthenticatedPeersFromList(list);
if (!list.isEmpty()) {
Address item = getAndRemoveRandomItem(list);
log.info("We try to build an authenticated connection to a random seed node. " + item + " / list=" + list);
authenticateToSeedNode(new HashSet<>(list), item, false);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(seedNodeAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, false);
} else {
log.info("We don't have any more seed nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60);
}
}
/*private void authenticateToAnyNode1(Set<Address> addresses, Address peerAddress, boolean prioritizeSeedNodes) {
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(addresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
setAuthenticated(connection, peerAddress);
authenticateToNextRandomPeer();
}
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
removePeer(peerAddress);
authenticateToNextRandomPeer();
}
});
}
private void authenticateToNextRandomPeer() {
UserThread.runAfterRandomDelay(() -> {
log.info("authenticateToNextRandomPeer");
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
Optional<Address> candidate = getRandomReportedPeerAddress();
if (candidate.isPresent()) {
log.info("We try to build an authenticated connection to a random peer. " + candidate.get());
authenticateToReportedPeer(candidate.get(), );
} else {
log.info("No more reportedPeerAddresses available for connecting. We try the remaining seed nodes");
candidate = getRandomSeedNodeAddress();
if (candidate.isPresent()) {
log.info("We try to build an authenticated connection to a random seed node. " + candidate.get());
authenticateToReportedPeer(candidate.get(), get);
} else {
log.info("No more seed nodes available for connecting.");
}
}
} else {
log.info("We have already enough connections.");
}
}, 200, 400, TimeUnit.MILLISECONDS);
}*/
private Optional<Address> getRandomSeedNodeAddress() {
List<Address> list = new ArrayList<>(seedNodeAddresses);
log.debug("### getRandomSeedNodeAddress list " + list);
removeAuthenticatedPeersFromList(list);
log.debug("### list post removeAuthenticatedPeersFromList " + list);
return getRandomEntry(list);
}
private Optional<Address> getRandomReportedPeerAddress() {
List<Address> list = new ArrayList<>(reportedPeerAddresses);
log.debug("### list reportedPeerAddresses " + reportedPeerAddresses);
log.debug("### list authenticatedPeers " + authenticatedPeers);
log.debug("### list pre " + list);
removeAuthenticatedPeersFromList(list);
log.debug("### list post " + list);
return getRandomEntry(list);
}
private void removeAuthenticatedPeersFromList(List<Address> list) {
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
}
private Optional<Address> getRandomEntry(List<Address> list) {
if (list.size() > 0) {
Collections.shuffle(list);
return Optional.of(list.get(0));
} else {
return Optional.empty();
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60);
}
}
@ -453,16 +334,15 @@ public class PeerGroup {
connection.setAuthenticated(peerAddress, connection);
Peer peer = new Peer(connection);
addAuthenticatedPeer(peerAddress, peer);
addAuthenticatedPeer(new Peer(connection));
peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
}
private void addAuthenticatedPeer(Address address, Peer peer) {
boolean firstPeerAdded;
authenticatedPeers.put(address, peer);
firstPeerAdded = authenticatedPeers.size() == 1;
private void addAuthenticatedPeer(Peer peer) {
authenticatedPeers.put(peer.address, peer);
reportedPeerAddresses.remove(peer.address);
firstPeerAdded = !firstPeerAdded && authenticatedPeers.size() == 1;
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer)));
@ -472,7 +352,154 @@ public class PeerGroup {
if (authenticatedPeers.size() > MAX_CONNECTIONS)
disconnectOldConnections();
printConnectedPeersMap();
printAuthenticatedPeers();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Maintenance
///////////////////////////////////////////////////////////////////////////////////////////
private void setupMaintenanceTimer() {
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000));
try {
UserThread.execute(() -> {
disconnectOldConnections();
pingPeers();
});
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL);
getPeersTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("GetPeersTimer-" + new Random().nextInt(1000));
try {
UserThread.execute(() -> sendAnnounceAndGetPeersMessage());
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}, GET_PEERS_INTERVAL, GET_PEERS_INTERVAL);
}
private void disconnectOldConnections() {
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
.filter(e -> e.isAuthenticated())
.collect(Collectors.toList());
if (authenticatedConnections.size() > MAX_CONNECTIONS) {
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
Connection connection = authenticatedConnections.remove(0);
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS));
}
}
private void pingPeers() {
log.trace("pingPeers");
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PingMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 5, 10));
}
private void sendAnnounceAndGetPeersMessage() {
log.trace("sendAnnounceAndGetPeersMessage");
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("AnnounceAndGetPeersMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("AnnounceAndGetPeersMessage sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 5, 10));
}
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) {
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PongMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
removePeer(connection.getPeerAddress());
}
});
} else if (message instanceof PongMessage) {
if (connection.getPeerAddress() != null) {
Peer peer = authenticatedPeers.get(connection.getPeerAddress());
if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) {
removePeer(peer.address);
log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress());
}
}
}
} else if (message instanceof GetPeersRequest) {
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
HashSet<Address> peerAddresses = getPeersRequestMessage.peerAddresses;
log.trace("Received peers: " + peerAddresses);
addToReportedPeers(peerAddresses, connection);
SettableFuture<Connection> future = networkNode.sendMessage(connection,
new GetPeersResponse(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersResponse sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersResponse sending failed " + throwable.getMessage());
removePeer(getPeersRequestMessage.address);
}
});
} else if (message instanceof GetPeersResponse) {
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
HashSet<Address> peerAddresses = getPeersResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
addToReportedPeers(peerAddresses, connection);
}
}
@ -555,38 +582,6 @@ public class PeerGroup {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Maintenance
///////////////////////////////////////////////////////////////////////////////////////////
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) {
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PongMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
removePeer(connection.getPeerAddress());
}
});
} else if (message instanceof PongMessage) {
Peer peer = authenticatedPeers.get(connection.getPeerAddress());
if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) {
removePeer(peer.address);
log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress());
}
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Peers
///////////////////////////////////////////////////////////////////////////////////////////
@ -599,8 +594,8 @@ public class PeerGroup {
if (disconnectedPeer != null)
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
printConnectedPeersMap();
printReportedPeersMap();
printAuthenticatedPeers();
printReportedPeers();
}
private Address getMyAddress() {
@ -616,21 +611,39 @@ public class PeerGroup {
return list.remove(new Random().nextInt(list.size()));
}
public void printConnectedPeersMap() {
StringBuilder result = new StringBuilder("\nConnected peers for node " + getMyAddress() + ":");
private Optional<Tuple2<Address, Set<Address>>> getRandomItemAndRemainingSet(Set<Address> remainingAddresses) {
List<Address> list = new ArrayList<>(remainingAddresses);
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
if (!list.isEmpty()) {
Address item = getAndRemoveRandomItem(list);
return Optional.of(new Tuple2<>(item, new HashSet<>(list)));
} else {
return Optional.empty();
}
}
public void printAllPeers() {
printAuthenticatedPeers();
printReportedPeers();
}
public void printAuthenticatedPeers() {
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Authenticated peers for node " + getMyAddress() + ":");
authenticatedPeers.values().stream().forEach(e -> {
result.append("\n\t" + e.address);
result.append("\n" + e.address);
});
result.append("\n");
result.append("\n############################################################\n");
log.info(result.toString());
}
public void printReportedPeersMap() {
StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getMyAddress() + ":");
public void printReportedPeers() {
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Reported peers for node " + getMyAddress() + ":");
reportedPeerAddresses.stream().forEach(e -> {
result.append("\n\t" + e);
result.append("\n" + e);
});
result.append("\n");
result.append("\n############################################################\n");
log.info(result.toString());
}
}

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer;
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.p2p.Message;

View File

@ -1,16 +1,16 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
public final class RequestAuthenticationMessage implements AuthenticationMessage {
public final class AuthenticationRequest implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final long nonce;
public RequestAuthenticationMessage(Address address, long nonce) {
public AuthenticationRequest(Address address, long nonce) {
this.address = address;
this.nonce = nonce;
}

View File

@ -1,9 +1,9 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
public final class ChallengeMessage implements AuthenticationMessage {
public final class AuthenticationResponse implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
@ -11,7 +11,7 @@ public final class ChallengeMessage implements AuthenticationMessage {
public final long requesterNonce;
public final long challengerNonce;
public ChallengeMessage(Address address, long requesterNonce, long challengerNonce) {
public AuthenticationResponse(Address address, long requesterNonce, long challengerNonce) {
this.address = address;
this.requesterNonce = requesterNonce;
this.challengerNonce = challengerNonce;

View File

@ -1,11 +1,11 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import java.util.HashSet;
public final class GetPeersMessage implements AuthenticationMessage {
public final class GetPeersAuthRequest implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
@ -13,7 +13,7 @@ public final class GetPeersMessage implements AuthenticationMessage {
public final long challengerNonce;
public final HashSet<Address> peerAddresses;
public GetPeersMessage(Address address, long challengerNonce, HashSet<Address> peerAddresses) {
public GetPeersAuthRequest(Address address, long challengerNonce, HashSet<Address> peerAddresses) {
this.address = address;
this.challengerNonce = challengerNonce;
this.peerAddresses = peerAddresses;

View File

@ -1,18 +1,18 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import java.util.HashSet;
public final class PeersMessage implements AuthenticationMessage {
public final class GetPeersAuthResponse implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public PeersMessage(Address address, HashSet<Address> peerAddresses) {
public GetPeersAuthResponse(Address address, HashSet<Address> peerAddresses) {
this.address = address;
this.peerAddresses = peerAddresses;
}

View File

@ -0,0 +1,27 @@
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import java.util.HashSet;
public final class GetPeersRequest implements MaintenanceMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public GetPeersRequest(Address address, HashSet<Address> peerAddresses) {
this.address = address;
this.peerAddresses = peerAddresses;
}
@Override
public String toString() {
return "GetPeersMessage{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
'}';
}
}

View File

@ -0,0 +1,27 @@
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import java.util.HashSet;
public final class GetPeersResponse implements MaintenanceMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public GetPeersResponse(Address address, HashSet<Address> peerAddresses) {
this.address = address;
this.peerAddresses = peerAddresses;
}
@Override
public String toString() {
return "GetPeersMessage{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
'}';
}
}

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.p2p.Message;

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peer.messages;
package io.bitsquare.p2p.peers.messages.maintenance;
import io.bitsquare.app.Version;

View File

@ -51,7 +51,7 @@ public class SeedNode {
checkArgument(arg1.equals("true") || arg1.equals("false"));
useLocalhost = ("true").equals(arg1);
if (args.length == 3) {
if (args.length > 2) {
String arg2 = args[2];
checkArgument(arg2.contains(":") && arg2.split(":").length > 1 && arg2.split(":")[1].length() > 3, "Wrong program argument");
List<String> list = Arrays.asList(arg2.split("|"));
@ -61,8 +61,8 @@ public class SeedNode {
seedNodes.add(new Address(e));
});
seedNodes.remove(mySeedNodeAddress);
} else {
log.error("Wrong number of program arguments." +
} else if (args.length > 3) {
log.error("Too many program arguments." +
"\nProgram arguments: myAddress useLocalhost seedNodes");
}
}

View File

@ -8,7 +8,7 @@ import io.bitsquare.common.crypto.Sig;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.IllegalRequest;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.peer.PeerGroup;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.storage.data.*;
import io.bitsquare.p2p.storage.messages.*;
import io.bitsquare.storage.Storage;
@ -130,10 +130,10 @@ public class ProtectedExpirableDataStorage {
log.trace("Data added to our map and it will be broadcasted to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)));
StringBuilder sb = new StringBuilder("\n\n----------------------------------------------------\n" +
StringBuilder sb = new StringBuilder("\n\n############################################################\n" +
"Data set after addProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n\n").append(e.toString()));
sb.append("\n----------------------------------------------------\n\n");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()));
sb.append("\n############################################################\n");
log.trace(sb.toString());
if (!containsKey)
@ -247,9 +247,10 @@ public class ProtectedExpirableDataStorage {
log.trace("Data removed from our map. We broadcast the message to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)));
StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n");
map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n"));
sb.append("\n\n");
StringBuilder sb = new StringBuilder("\n\n############################################################\n" +
"Data set after removeProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()));
sb.append("\n############################################################\n");
log.trace(sb.toString());
}

View File

@ -3,10 +3,10 @@ package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
public final class GetAllDataMessage implements Message {
public final class GetDataRequest implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public GetAllDataMessage() {
public GetDataRequest() {
}
}

View File

@ -6,22 +6,22 @@ import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet;
public final class AllDataMessage implements Message {
public final class GetDataResponse implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final HashSet<ProtectedData> set;
public AllDataMessage(HashSet<ProtectedData> set) {
public GetDataResponse(HashSet<ProtectedData> set) {
this.set = set;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AllDataMessage)) return false;
if (!(o instanceof GetDataResponse)) return false;
AllDataMessage that = (AllDataMessage) o;
GetDataResponse that = (GetDataResponse) o;
return !(set != null ? !set.equals(that.set) : that.set != null);

View File

@ -10,4 +10,7 @@
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="WARN"/>
</configuration>

View File

@ -8,7 +8,7 @@ import io.bitsquare.p2p.messaging.MailboxMessage;
import io.bitsquare.p2p.messaging.SendMailboxMessageListener;
import io.bitsquare.p2p.mocks.MockMailboxMessage;
import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.p2p.peer.PeerGroup;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.seed.SeedNode;
import io.bitsquare.p2p.storage.data.DataAndSeqNr;
import io.bitsquare.p2p.storage.data.ProtectedData;

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.network;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.Before;
import org.junit.Ignore;
@ -77,8 +77,8 @@ public class LocalhostNetworkNodeTest {
});
startupLatch.await();
node2.sendMessage(new Address("localhost", 9001), new RequestAuthenticationMessage(new Address("localhost", 9002), 1));
node1.sendMessage(new Address("localhost", 9002), new RequestAuthenticationMessage(new Address("localhost", 9001), 1));
node2.sendMessage(new Address("localhost", 9001), new AuthenticationRequest(new Address("localhost", 9002), 1));
node1.sendMessage(new Address("localhost", 9002), new AuthenticationRequest(new Address("localhost", 9001), 1));
msgLatch.await();
CountDownLatch shutDownLatch = new CountDownLatch(2);

View File

@ -6,8 +6,8 @@ import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.p2p.peer.AuthenticationListener;
import io.bitsquare.p2p.peer.PeerGroup;
import io.bitsquare.p2p.peers.AuthenticationListener;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.seed.SeedNode;
import org.junit.*;
import org.slf4j.Logger;
@ -370,8 +370,8 @@ public class PeerGroupTest {
// total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20
// total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46
for (int i = 0; i < length; i++) {
nodes[i].getP2PService().getPeerGroup().printConnectedPeersMap();
nodes[i].getP2PService().getPeerGroup().printReportedPeersMap();
nodes[i].getP2PService().getPeerGroup().printAuthenticatedPeers();
nodes[i].getP2PService().getPeerGroup().printReportedPeers();
}
CountDownLatch shutDownLatch = new CountDownLatch(length);

View File

@ -9,7 +9,7 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.TestUtils;
import io.bitsquare.p2p.mocks.MockMessage;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peer.PeerGroup;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.storage.data.DataAndSeqNr;
import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ProtectedData;

View File

@ -23,9 +23,16 @@
</encoder>
</appender>
<root level="TRACE">
<root level="INFO">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<!-- <logger name="io.bitsquare.p2p.peers.PeerGroup" level="INFO"/>
<logger name="io.bitsquare.p2p.P2PService" level="INFO"/>
<logger name="io.bitsquare.p2p.storage.ProtectedExpirableDataStorage" level="INFO"/>
<logger name="io.bitsquare.p2p.network.LocalhostNetworkNode" level="INFO"/>
<logger name="io.bitsquare.p2p.network.TorNetworkNode" level="TRACE"/>
<logger name="io.bitsquare.p2p.network.NetworkNode" level="TRACE"/>-->
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="WARN"/>