Match seed nodes ports with network id, improve logs

This commit is contained in:
Manfred Karrer 2015-11-10 17:29:41 +01:00
parent 5618f23654
commit 6b89f19927
30 changed files with 259 additions and 173 deletions

View file

@ -62,4 +62,13 @@ public final class SealedAndSigned implements Serializable {
return result;
}
@Override
public String toString() {
return "SealedAndSigned{" +
"encryptedSecretKey.hashCode()=" + Arrays.toString(encryptedSecretKey).hashCode() +
", encryptedPayloadWithHmac.hashCode()=" + Arrays.toString(encryptedPayloadWithHmac).hashCode() +
", signature.hashCode()=" + Arrays.toString(signature).hashCode() +
", sigPublicKey.hashCode()=" + sigPublicKey.hashCode() +
'}';
}
}

View file

@ -74,7 +74,7 @@
</GridPane.margin>
</TextField>
<Label text="Authenticated peers:" GridPane.rowIndex="4"/>
<Label fx:id="authenticatedPeersLabel" text="Authenticated peers:" GridPane.rowIndex="4"/>
<TextArea fx:id="authenticatedPeersTextArea" GridPane.rowIndex="4" GridPane.columnIndex="1"
mouseTransparent="true" focusTraversable="false"/>

View file

@ -35,7 +35,9 @@ import io.bitsquare.user.Preferences;
import javafx.beans.value.ChangeListener;
import javafx.collections.FXCollections;
import javafx.fxml.FXML;
import javafx.geometry.Insets;
import javafx.scene.control.ComboBox;
import javafx.scene.control.Label;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.GridPane;
@ -53,12 +55,16 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
private final Preferences preferences;
private final P2PService p2PService;
@FXML
TextField bitcoinNetwork, onionAddress, connectedPeersBTC;
@FXML
ComboBox<BitcoinNetwork> netWorkComboBox;
@FXML
TextArea authenticatedPeersTextArea;
@FXML
Label authenticatedPeersLabel;
private P2PServiceListener p2PServiceListener;
private ChangeListener<Number> numAuthenticatedPeersChangeListener;
private Set<Address> seedNodeAddresses;
@ -77,6 +83,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
}
public void initialize() {
GridPane.setMargin(authenticatedPeersLabel, new Insets(4, 0, 0, 0));
bitcoinNetwork.setText(bitcoinNetworkString);
connectedPeersBTC.textProperty().bind(createStringBinding(() -> String.valueOf(walletService.numPeersProperty().get()), walletService
.numPeersProperty()));

View file

@ -164,7 +164,7 @@ public class FormBuilder {
public static Tuple2<Label, TextArea> addLabelTextArea(GridPane gridPane, int rowIndex, String title, String prompt, double top) {
Label label = addLabel(gridPane, rowIndex, title, 0);
GridPane.setMargin(label, new Insets(top, 0, 0, 0));
GridPane.setMargin(label, new Insets(top + 4, 0, 0, 0));
GridPane.setValignment(label, VPos.TOP);
TextArea textArea = new TextArea();

View file

@ -86,7 +86,7 @@ public abstract class Connection implements Closeable {
}
protected void onMessage(Message msg) throws IOException {
log.debug("RXD: " + msg.toString());
log.debug("onMessage: " + msg.toString());
if (msg instanceof ContainerMessage) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners)

View file

@ -25,4 +25,12 @@ public final class SealedAndSignedMessage implements MailboxMessage {
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "SealedAndSignedMessage{" +
"networkId=" + networkId +
", sealedAndSigned=" + sealedAndSigned +
'}';
}
}

View file

@ -159,11 +159,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
if (message instanceof GetDataRequest) {
Log.traceCall(message.toString());
log.info("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
} else if (message instanceof GetDataResponse) {
Log.traceCall(message.toString());
GetDataResponse getDataResponse = (GetDataResponse) message;
log.info("Received GetDataResponse: " + message);
HashSet<ProtectedData> set = getDataResponse.set;
@ -176,6 +177,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
setRequestingDataCompleted();
} else if (message instanceof SealedAndSignedMessage) {
Log.traceCall(message.toString());
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;

View file

@ -61,8 +61,8 @@ public final class DecryptedMsgWithPubKey implements MailMessage {
@Override
public String toString() {
return "DecryptedMessageWithPubKey{" +
"hashCode=" + hashCode() +
return "DecryptedMsgWithPubKey{" +
"networkId=" + networkId +
", message=" + message +
", signaturePubKey.hashCode()=" + signaturePubKey.hashCode() +
'}';

View file

@ -47,7 +47,6 @@ public class Connection implements MessageListener {
private final String portInfo;
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
public final String objectId;
// set in init
private ObjectOutputStream objectOutputStream;
@ -75,8 +74,6 @@ public class Connection implements MessageListener {
this.messageListener = messageListener;
this.connectionListener = connectionListener;
objectId = super.toString().split("@")[1];
Log.traceCall();
uid = UUID.randomUUID().toString();
if (socket.getLocalPort() == 0)
@ -133,7 +130,10 @@ public class Connection implements MessageListener {
Log.traceCall();
if (!stopped) {
try {
log.info("writeObject " + message + " on connection with port " + portInfo);
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} ({objectId=})\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", getPeerAddress(), uid, message);
Object objectToWrite;
if (useCompression) {
byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message);
@ -189,27 +189,22 @@ public class Connection implements MessageListener {
@Nullable
public synchronized Address getPeerAddress() {
//Log.traceCall();
return peerAddress;
}
public Date getLastActivityDate() {
//Log.traceCall();
return sharedSpace.getLastActivityDate();
}
public boolean isAuthenticated() {
//Log.traceCall();
return isAuthenticated;
}
public String getUid() {
Log.traceCall();
return uid;
}
public boolean isStopped() {
Log.traceCall();
return stopped;
}
@ -219,17 +214,14 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown(Runnable completeHandler) {
// Log.traceCall();
shutDown(true, completeHandler);
}
public void shutDown() {
//Log.traceCall();
shutDown(true, null);
}
private void shutDown(boolean sendCloseConnectionMessage) {
//Log.traceCall();
shutDown(sendCloseConnectionMessage, null);
}
@ -241,7 +233,7 @@ public class Connection implements MessageListener {
+ "\npeerAddress=" + peerAddress
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nobjectId=" + objectId + " / uid=" + uid
+ "\nuid=" + uid
+ "\nisAuthenticated=" + isAuthenticated
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
@ -249,10 +241,15 @@ public class Connection implements MessageListener {
if (sendCloseConnectionMessage) {
new Thread(() -> {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.objectId);
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
Log.traceCall("sendCloseConnectionMessage");
try {
sendMessage(new CloseConnectionMessage());
stopped = true;
sharedSpace.stop();
if (inputHandler != null)
inputHandler.stop();
// TODO increase delay
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
@ -263,6 +260,10 @@ public class Connection implements MessageListener {
}
}).start();
} else {
stopped = true;
sharedSpace.stop();
if (inputHandler != null)
inputHandler.stop();
continueShutDown(shutDownCompleteHandler);
}
}
@ -270,11 +271,6 @@ public class Connection implements MessageListener {
private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall();
stopped = true;
sharedSpace.stop();
if (inputHandler != null)
inputHandler.stop();
ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason();
if (shutDownReason == null)
shutDownReason = ConnectionListener.Reason.SHUT_DOWN;
@ -326,7 +322,6 @@ public class Connection implements MessageListener {
return "Connection{" +
"portInfo=" + portInfo +
", uid='" + uid + '\'' +
", objectId='" + objectId + '\'' +
", sharedSpace=" + sharedSpace.toString() +
", peerAddress=" + peerAddress +
", isAuthenticated=" + isAuthenticated +
@ -398,7 +393,6 @@ public class Connection implements MessageListener {
public void handleConnectionException(Exception e) {
Log.traceCall(e.toString());
log.debug("Exception might be expected: " + e.toString());
if (e instanceof SocketException) {
if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@ -493,8 +487,11 @@ public class Connection implements MessageListener {
try {
log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionInfo());
Object rawInputObject = objectInputStream.readObject();
log.info("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionInfo()
+ " rawInputObject " + rawInputObject);
log.trace("New data arrived at inputHandler.Connection=" + sharedSpace.getConnectionInfo());
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler.\nReceived object={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", rawInputObject);
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size > getMaxMsgSize()) {

View file

@ -138,7 +138,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Log.traceCall();
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.objectId);
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
try {
connection.sendMessage(message);
return connection;
@ -239,7 +239,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
messageListeners.stream().forEach(e -> e.onMessage(message, connection));
}

View file

@ -62,8 +62,8 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
if (message instanceof AuthenticationMessage) {
Log.traceCall(message.toString());
if (message instanceof AuthenticationResponse) {
// Requesting peer
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
@ -120,7 +120,7 @@ public class AuthenticationHandshake implements MessageListener {
});
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
onSuccess(connection);
@ -140,7 +140,7 @@ public class AuthenticationHandshake implements MessageListener {
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
onSuccess(connection);

View file

@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -43,10 +44,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
MAX_CONNECTIONS = maxConnections;
}
private static final int SEND_PING_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
private static final int MAX_REPORTED_PEERS = 1000;
private static final int SEND_PING_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000 * 1000; // 2-4 min.
private static final int GET_PEERS_INTERVAL = 10000 * 1000;//new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000 * 1000;
private static final int MAX_REPORTED_PEERS = 1000 * 1000;
private static final int RETRY_FILL_AUTH_PEERS = 10000 * 1000;
private final NetworkNode networkNode;
private final Set<Address> seedNodeAddresses;
@ -54,6 +56,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final Set<PeerListener> peerListeners = new HashSet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<Address> reportedPeerAddresses = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new ConcurrentHashMap<>();
private final Timer sendPingTimer = new Timer();
private final Timer getPeersTimer = new Timer();
@ -84,12 +88,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
if (message instanceof MaintenanceMessage)
processMaintenanceMessage((MaintenanceMessage) message, connection);
else if (message instanceof AuthenticationRequest) {
else if (message instanceof AuthenticationRequest)
processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection);
}
}
@ -171,25 +173,34 @@ public class PeerGroup implements MessageListener, ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) {
Log.traceCall();
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, connection.getPeerAddress());
purgeReportedPeersIfExceeds();
Log.traceCall(message.toString());
Address peerAddress = message.address;
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null && peerAddress.equals(connection.getPeerAddress())) {
setAuthenticated(connection, peerAddress);
purgeReportedPeersIfExceeds();
} else {
log.error("Incorrect state at processAuthenticationRequest.onSuccess:\n" +
"peerAddress={}\nconnection=", peerAddress, connection);
}
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(connection.getPeerAddress());
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(connection.getPeerAddress());
}
});
} else {
log.warn("An authentication handshake is already created for that peerAddress ({})", peerAddress);
}
}
public void authenticateSeedNode(Address peerAddress) {
@ -204,55 +215,60 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (continueOnSuccess) {
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (continueOnSuccess) {
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
} else {
log.info("We have already enough connections.");
}
} else {
log.info("We have already enough connections.");
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
}
} else {
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60);
}
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
// If we fail we try again with the remaining set
remainingAddresses.remove(peerAddress);
// If we fail we try again with the remaining set
remainingAddresses.remove(peerAddress);
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingAddresses);
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingAddresses);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(remainingAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else if (reportedPeerAddresses.size() > 0) {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
} else {
log.info("We don't have any more seed nodes or reported nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(remainingAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else if (reportedPeerAddresses.size() > 0) {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToRemainingReportedPeers();
} else {
log.info("We don't have any more seed nodes or reported nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
}
}
}
});
});
} else {
log.warn("An authentication handshake is already created for that peerAddress ({})", peerAddress);
}
}
private void authenticateToRemainingReportedPeers() {
@ -273,39 +289,42 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
} else {
log.info("We still don't have enough connections. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
}
} else {
log.info("We still don't have enough connections. Lets try the remaining seed nodes.");
authenticateToRemainingSeedNodes();
log.info("We have already enough connections.");
}
} else {
log.info("We have already enough connections.");
}
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(peerAddress);
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(peerAddress);
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
}
});
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers();
}
});
} else {
log.warn("An authentication handshake is already created for that peerAddress ({})", peerAddress);
}
}
private void authenticateToRemainingSeedNodes() {
@ -317,7 +336,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} else {
log.info("We don't have any more seed nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60);
UserThread.runAfter(() -> authenticateToRemainingReportedPeers(),
RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS);
}
}
@ -330,32 +350,37 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that seed node already authenticated. That must never happen.");
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthenticationToPeer(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (authenticationCompleteHandler != null)
authenticationCompleteHandler.run();
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.requestAuthenticationToPeer(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (authenticationCompleteHandler != null)
authenticationCompleteHandler.run();
}
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(peerAddress);
if (faultHandler != null)
faultHandler.run();
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
removePeer(peerAddress);
if (faultHandler != null)
faultHandler.run();
}
});
} else {
log.warn("An authentication handshake is already created for that peerAddress ({})", peerAddress);
}
}
private void setAuthenticated(Connection connection, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
if (!authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
log.info("\n\n############################################################\n" +
"We are authenticated to:" +
"\nconnection=" + connection.getUid()
@ -487,12 +512,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
});
}, 5, 10));
} else {
log.debug("No peers available for requesting data.");
log.trace("No peers available for requesting data.");
}
}
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
Log.traceCall();
Log.traceCall(message.toString());
log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) {
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
@ -639,6 +664,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void removePeer(@Nullable Address peerAddress) {
Log.traceCall("peerAddress=" + peerAddress);
if (peerAddress != null) {
if (!authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
boolean contained = reportedPeerAddresses.remove(peerAddress);
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedPeer != null)

View file

@ -10,4 +10,11 @@ public abstract class AuthenticationMessage implements Message {
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "AuthenticationMessage{" +
"networkId=" + networkId +
'}';
}
}

View file

@ -20,6 +20,6 @@ public final class AuthenticationRequest extends AuthenticationMessage {
return "AuthenticationRequest{" +
"address=" + address +
", nonce=" + nonce +
'}';
"} " + super.toString();
}
}

View file

@ -23,6 +23,6 @@ public final class AuthenticationResponse extends AuthenticationMessage {
"address=" + address +
", requesterNonce=" + requesterNonce +
", challengerNonce=" + challengerNonce +
'}';
"} " + super.toString();
}
}

View file

@ -25,6 +25,6 @@ public final class GetPeersAuthRequest extends AuthenticationMessage {
"address=" + address +
", challengerNonce=" + challengerNonce +
", peerAddresses=" + peerAddresses +
'}';
"} " + super.toString();
}
}

View file

@ -19,7 +19,9 @@ public final class GetPeersAuthResponse extends AuthenticationMessage {
@Override
public String toString() {
return "GetPeersAuthResponse{" + "peerAddresses=" + peerAddresses + '}';
return "GetPeersAuthResponse{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
"} " + super.toString();
}
}

View file

@ -22,6 +22,6 @@ public final class GetPeersRequest extends MaintenanceMessage {
return "GetPeersRequest{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
'}';
"} " + super.toString();
}
}

View file

@ -18,7 +18,7 @@ public final class GetPeersResponse extends MaintenanceMessage {
@Override
public String toString() {
return "GetPeersResponse{" +
", peerAddresses=" + peerAddresses +
'}';
"peerAddresses=" + peerAddresses +
"} " + super.toString();
}
}

View file

@ -10,4 +10,11 @@ public abstract class MaintenanceMessage implements Message {
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "MaintenanceMessage{" +
"networkId=" + networkId +
'}';
}
}

View file

@ -16,6 +16,6 @@ public final class PingMessage extends MaintenanceMessage {
public String toString() {
return "PingMessage{" +
"nonce=" + nonce +
'}';
"} " + super.toString();
}
}

View file

@ -16,6 +16,6 @@ public final class PongMessage extends MaintenanceMessage {
public String toString() {
return "PongMessage{" +
"nonce=" + nonce +
'}';
"} " + super.toString();
}
}

View file

@ -2,30 +2,33 @@ package io.bitsquare.p2p.seed;
import com.google.common.collect.Sets;
import io.bitsquare.p2p.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.stream.Collectors;
public class SeedNodesRepository {
private static final Logger log = LoggerFactory.getLogger(SeedNodesRepository.class);
// mainnet use port 8000
// testnet use port 8001
// regtest use port 8002
private Set<Address> torSeedNodeAddresses = Sets.newHashSet(
// mainnet
new Address("lmvdenjkyvx2ovga.onion:8000"),
/*new Address("lmvdenjkyvx2ovga.onion:8000"),
new Address("eo5ay2lyzrfvx2nr.onion:8000"),
new Address("si3uu56adkyqkldl.onion:8000"),
new Address("si3uu56adkyqkldl.onion:8000"),*/
// testnet
new Address("lmvdenjkyvx2ovga.onion:8001"),
new Address("eo5ay2lyzrfvx2nr.onion:8001"),
new Address("si3uu56adkyqkldl.onion:8001"),
new Address("znmy44wcstn2rkva.onion:8001"),
new Address("zvn7umikgxml6x6h.onion:8001"),
new Address("wnfxmrmsyeeos2dy.onion:8001"),
// regtest
new Address("lmvdenjkyvx2ovga.onion:8002"),
new Address("eo5ay2lyzrfvx2nr.onion:8002"),
new Address("si3uu56adkyqkldl.onion:8002")
new Address("rxdkppp3vicnbgqt.onion:8002"),
new Address("brmbf6mf67d2hlm4.onion:8002"),
new Address("mfla72c4igh5ta2t.onion:8002")
);
@ -49,8 +52,10 @@ public class SeedNodesRepository {
public Set<Address> geSeedNodeAddresses(boolean useLocalhost, int networkId) {
String networkIdAsString = String.valueOf(networkId);
Set<Address> addresses = useLocalhost ? localhostSeedNodeAddresses : torSeedNodeAddresses;
return addresses.stream()
Set<Address> filtered = addresses.stream()
.filter(e -> String.valueOf(e.port).endsWith(networkIdAsString)).collect(Collectors.toSet());
log.info("SeedNodeAddresses (useLocalhost={}) for networkId {}:\nnetworkId={}", useLocalhost, networkId, filtered);
return filtered;
}
public void setTorSeedNodeAddresses(Set<Address> torSeedNodeAddresses) {

View file

@ -101,8 +101,8 @@ public class ProtectedExpirableDataStorage implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall("Message=" + message);
if (message instanceof DataBroadcastMessage) {
Log.traceCall(message.toString());
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
if (message instanceof AddDataMessage) {

View file

@ -32,6 +32,6 @@ public final class AddDataMessage extends DataBroadcastMessage {
public String toString() {
return "AddDataMessage{" +
"data=" + data +
'}';
"} " + super.toString();
}
}

View file

@ -10,4 +10,11 @@ public abstract class DataBroadcastMessage implements Message {
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "DataBroadcastMessage{" +
"networkId=" + networkId +
'}';
}
}

View file

@ -16,4 +16,11 @@ public final class GetDataRequest implements Message {
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "GetDataRequest{" +
"networkId=" + networkId +
'}';
}
}

View file

@ -40,8 +40,9 @@ public final class GetDataResponse implements Message {
@Override
public String toString() {
return "AllDataMessage{" +
"set=" + set +
return "GetDataResponse{" +
"networkId=" + networkId +
", set=" + set +
'}';
}
}

View file

@ -33,6 +33,6 @@ public final class RemoveDataMessage extends DataBroadcastMessage {
public String toString() {
return "RemoveDataMessage{" +
"data=" + data +
'}';
"} " + super.toString();
}
}

View file

@ -33,6 +33,6 @@ public final class RemoveMailboxDataMessage extends DataBroadcastMessage {
public String toString() {
return "RemoveMailboxDataMessage{" +
"data=" + data +
'}';
"} " + super.toString();
}
}