Refactor handshake out for data request

This commit is contained in:
Manfred Karrer 2016-01-27 15:08:13 +01:00
parent e90dc52f36
commit cd5ecbb168
9 changed files with 453 additions and 223 deletions

View file

@ -286,7 +286,7 @@ public class Connection implements MessageListener {
shutDown(true, null); shutDown(true, null);
} }
private void shutDown(boolean sendCloseConnectionMessage) { public void shutDown(boolean sendCloseConnectionMessage) {
shutDown(sendCloseConnectionMessage, null); shutDown(sendCloseConnectionMessage, null);
} }
@ -557,6 +557,11 @@ public class Connection implements MessageListener {
public void stop() { public void stop() {
Log.traceCall(); Log.traceCall();
stopped = true; stopped = true;
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
} }
@Override @Override

View file

@ -32,7 +32,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager; private final PeerManager peerManager;
private final Set<NodeAddress> seedNodeAddresses; private final Set<NodeAddress> seedNodeAddresses;
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
private Timer continueWithMorePeersTimer, timeoutTimer, maintainConnectionsTimer; private Timer connectToMorePeersTimer, timeoutTimer, maintainConnectionsTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -42,6 +42,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public PeerExchangeManager(NetworkNode networkNode, PeerManager peerManager, Set<NodeAddress> seedNodeAddresses) { public PeerExchangeManager(NetworkNode networkNode, PeerManager peerManager, Set<NodeAddress> seedNodeAddresses) {
this.networkNode = networkNode; this.networkNode = networkNode;
this.peerManager = peerManager; this.peerManager = peerManager;
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty");
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5); executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5);
@ -52,7 +53,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall(); Log.traceCall();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopContinueWithMorePeersTimer(); stopConnectToMorePeersTimer();
stopMaintainConnectionsTimer(); stopMaintainConnectionsTimer();
stopTimeoutTimer(); stopTimeoutTimer();
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
@ -64,7 +65,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void requestReportedPeers(NodeAddress nodeAddress) { public void requestReportedPeers(NodeAddress nodeAddress) {
requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses)); ArrayList<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
remainingNodeAddresses.remove(nodeAddress);
requestReportedPeers(nodeAddress, remainingNodeAddresses);
long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min. long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections), executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
@ -106,7 +109,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall(message.toString()); Log.traceCall(message.toString());
if (message instanceof GetPeersRequest) { if (message instanceof GetPeersRequest) {
HashSet<ReportedPeer> reportedPeers = ((GetPeersRequest) message).reportedPeers; HashSet<ReportedPeer> reportedPeers = ((GetPeersRequest) message).reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
StringBuilder result = new StringBuilder("Received peers:");
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
log.trace(result.toString());
checkArgument(connection.getPeersNodeAddressOptional().isPresent(), checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"The peers address must have been already set at the moment"); "The peers address must have been already set at the moment");
SettableFuture<Connection> future = networkNode.sendMessage(connection, SettableFuture<Connection> future = networkNode.sendMessage(connection,
@ -126,11 +133,16 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
peerManager.addToReportedPeers(reportedPeers, connection); peerManager.addToReportedPeers(reportedPeers, connection);
} else if (message instanceof GetPeersResponse) { } else if (message instanceof GetPeersResponse) {
stopTimeoutTimer(); stopTimeoutTimer();
HashSet<ReportedPeer> reportedPeers = ((GetPeersResponse) message).reportedPeers; HashSet<ReportedPeer> reportedPeers = ((GetPeersResponse) message).reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
StringBuilder result = new StringBuilder("Received peers:");
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
log.trace(result.toString());
peerManager.addToReportedPeers(reportedPeers, connection); peerManager.addToReportedPeers(reportedPeers, connection);
continueWithMorePeers(); connectToMorePeers();
} }
} }
} }
@ -144,14 +156,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers"); checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
stopContinueWithMorePeersTimer(); stopConnectToMorePeersTimer();
stopTimeoutTimer(); stopTimeoutTimer();
timeoutTimer = UserThread.runAfter(() -> { timeoutTimer = UserThread.runAfter(() -> {
log.info("timeoutTimer called"); log.info("timeoutTimer called");
handleError(nodeAddress, remainingNodeAddresses); handleError(nodeAddress, remainingNodeAddresses);
}, },
10, TimeUnit.SECONDS); 20, TimeUnit.SECONDS);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress))); new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
@ -171,35 +183,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
}); });
} }
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) { private void connectToMorePeers() {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer();
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
requestReportedPeersFromList(remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request peers from our seed nodes after a random pause.");
if (continueWithMorePeersTimer == null)
continueWithMorePeersTimer = UserThread.runAfter(this::continueWithMorePeers,
30, TimeUnit.SECONDS);
}
}
private void requestReportedPeersFromList(List<NodeAddress> remainingNodeAddresses) {
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
}
private void continueWithMorePeers() {
Log.traceCall(); Log.traceCall();
stopContinueWithMorePeersTimer();
stopConnectToMorePeersTimer();
if (!peerManager.hasSufficientConnections()) { if (!peerManager.hasSufficientConnections()) {
// We want to keep it sorted but avoid duplicates // We want to keep it sorted but avoid duplicates
List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>())); List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
@ -222,6 +210,67 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
} }
private void handleError(NodeAddress peersNodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("peersNodeAddress=" + peersNodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer();
// In case a shutdown was not triggered already by the error we close that connection
// if it is not a DIRECT_MSG_PEER
peerManager.shutDownConnection(peersNodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
requestReportedPeersFromRandomPeer(remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request peers from our seed nodes after a random pause.");
if (connectToMorePeersTimer == null)
connectToMorePeersTimer = UserThread.runAfterRandomDelay(this::connectToMorePeers, 20, 30);
}
}
// we check if we have at least one seed node connected
private void maintainConnections() {
Log.traceCall();
stopMaintainConnectionsTimer();
// we want at least 1 seed node connected
Set<Connection> confirmedConnections = networkNode.getConfirmedConnections();
long numberOfConnectedSeedNodes = confirmedConnections.stream()
.filter(peerManager::isSeedNode)
.count();
if (numberOfConnectedSeedNodes == 0)
requestReportedPeersFromRandomPeer(new ArrayList<>(seedNodeAddresses));
// We try to get sufficient connections by connecting to reported and persisted peers
if (numberOfConnectedSeedNodes == 0) {
// If we requested a seed node we delay a bit to not have too many requests simultaneously
if (connectToMorePeersTimer == null)
connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10);
} else {
connectToMorePeers();
}
// Use all outbound connections for updating reported peers and make sure we keep the connection alive
// Inbound connections should be maintained be the requesting peer
confirmedConnections.stream()
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
c instanceof OutboundConnection).
forEach(c -> UserThread.runAfterRandomDelay(() ->
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>())
, 3, 5));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
// sorted by most recent lastActivityDate // sorted by most recent lastActivityDate
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) { private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
return set.stream() return set.stream()
@ -236,34 +285,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private void requestReportedPeersFromRandomPeer(List<NodeAddress> remainingNodeAddresses) {
// we check if we have at least one seed node connected NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
private void maintainConnections() { remainingNodeAddresses.remove(nextCandidate);
Log.traceCall(); requestReportedPeers(nextCandidate, remainingNodeAddresses);
stopMaintainConnectionsTimer();
// we want at least 1 seed node connected
Set<Connection> allConnections = networkNode.getConfirmedConnections();
List<Connection> connectedSeedNodes = allConnections.stream()
.filter(peerManager::isSeedNode)
.collect(Collectors.toList());
if (connectedSeedNodes.size() == 0 && !seedNodeAddresses.isEmpty())
requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses));
// We try to get sufficient connections by using reported and persisted peers
if (continueWithMorePeersTimer == null)
continueWithMorePeersTimer = UserThread.runAfterRandomDelay(this::continueWithMorePeers, 10, 20);
// Use all outbound connections for updating reported peers and make sure we keep the connection alive
// Inbound connections should be maintained be the requesting peer
networkNode.getConfirmedConnections().stream()
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
c instanceof OutboundConnection).
forEach(c -> UserThread.runAfterRandomDelay(() ->
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>())
, 3, 5));
} }
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) { private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
@ -275,10 +300,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
} }
private void stopContinueWithMorePeersTimer() { private void stopConnectToMorePeersTimer() {
if (continueWithMorePeersTimer != null) { if (connectToMorePeersTimer != null) {
continueWithMorePeersTimer.cancel(); connectToMorePeersTimer.cancel();
continueWithMorePeersTimer = null; connectToMorePeersTimer = null;
} }
} }

View file

@ -5,7 +5,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.data.DataRequest; import io.bitsquare.p2p.peers.messages.data.UpdateDataRequest;
import io.bitsquare.storage.Storage; import io.bitsquare.storage.Storage;
import javafx.beans.value.ChangeListener; import javafx.beans.value.ChangeListener;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -135,7 +135,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
// In case a seed node connects to another seed node we get his address at the DataRequest triggered from // In case a seed node connects to another seed node we get his address at the DataRequest triggered from
// RequestDataManager.updateDataFromConnectedSeedNode // RequestDataManager.updateDataFromConnectedSeedNode
if (message instanceof DataRequest) { if (message instanceof UpdateDataRequest) {
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent() && if (peersNodeAddressOptional.isPresent() &&
seedNodeAddresses.contains(peersNodeAddressOptional.get())) seedNodeAddresses.contains(peersNodeAddressOptional.get()))
@ -251,8 +251,9 @@ public class PeerManager implements ConnectionListener, MessageListener {
.collect(Collectors.toMap(e -> e, Function.identity())); .collect(Collectors.toMap(e -> e, Function.identity()));
HashSet<ReportedPeer> adjustedReportedPeers = new HashSet<>(); HashSet<ReportedPeer> adjustedReportedPeers = new HashSet<>();
reportedPeersToAdd.stream() reportedPeersToAdd.stream()
.filter(e -> !e.nodeAddress.equals(networkNode.getNodeAddress())) .filter(e -> e.nodeAddress != null &&
.filter(e -> !getConnectedPeers().contains(e)) !e.nodeAddress.equals(networkNode.getNodeAddress()) &&
!getConnectedPeers().contains(e))
.forEach(e -> { .forEach(e -> {
if (reportedPeersMap.containsKey(e)) { if (reportedPeersMap.containsKey(e)) {
if (e.lastActivityDate != null && reportedPeersMap.get(e).lastActivityDate != null) { if (e.lastActivityDate != null && reportedPeersMap.get(e).lastActivityDate != null) {
@ -377,6 +378,19 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress); return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
} }
public void shutDownConnection(Connection connection) {
if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
connection.shutDown();
}
public void shutDownConnection(NodeAddress peersNodeAddress) {
networkNode.getAllConnections().stream()
.filter(connection -> connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) &&
connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.findFirst()
.ifPresent(connection -> connection.shutDown(true));
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Private // Private

View file

@ -0,0 +1,183 @@
package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.data.DataRequest;
import io.bitsquare.p2p.peers.messages.data.DataResponse;
import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest;
import io.bitsquare.p2p.peers.messages.data.UpdateDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Random;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataHandshake.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onFault(String errorMessage);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final Listener listener;
private Timer timeoutTimer;
private long requestNonce;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataHandshake(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager,
Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.peerManager = peerManager;
this.listener = listener;
networkNode.addMessageListener(this);
}
public void shutDown() {
Log.traceCall();
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
stopTimeoutTimer();
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
log.info("timeoutTimer called");
peerManager.shutDownConnection(nodeAddress);
shutDown();
listener.onFault("A timeout occurred");
},
10, TimeUnit.SECONDS);
Message dataRequest;
requestNonce = new Random().nextLong();
if (networkNode.getNodeAddress() == null)
dataRequest = new PreliminaryDataRequest(requestNonce);
else
dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), requestNonce);
log.info("We send a {} to peer {}. ", dataRequest.getClass().getSimpleName(), nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, dataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("Send DataRequest to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending " + dataRequest.getClass().getSimpleName() + " to " + nodeAddress +
" failed. That is expected if the peer is offline. " +
"Exception:" + throwable.getMessage();
log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress);
shutDown();
listener.onFault(errorMessage);
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof DataRequest) {
Log.traceCall(message.toString());
DataRequest dataRequest = (DataRequest) message;
DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce());
SettableFuture<Connection> future = networkNode.sendMessage(connection, dataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send DataResponse to {} succeeded. dataResponse={}",
connection.getPeersNodeAddressOptional(), dataResponse);
shutDown();
listener.onComplete();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Send DataResponse to " + connection.getPeersNodeAddressOptional() + " failed. " +
"That is expected if the peer went offline. " +
"Exception:" + throwable.getMessage();
log.info(errorMessage);
peerManager.shutDownConnection(connection);
shutDown();
listener.onFault(errorMessage);
}
});
} else if (message instanceof DataResponse) {
DataResponse dataResponse = (DataResponse) message;
if (dataResponse.requestNonce == requestNonce) {
Log.traceCall(message.toString());
stopTimeoutTimer();
// connection.getPeersNodeAddressOptional() is not present at the first call
log.debug("connection.getPeersNodeAddressOptional() " + connection.getPeersNodeAddressOptional());
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> {
((DataResponse) message).dataSet.stream()
.forEach(e -> dataStorage.add(e, peersNodeAddress));
});
shutDown();
listener.onComplete();
}
}
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.cancel();
timeoutTimer = null;
}
}
}

View file

@ -1,8 +1,5 @@
package io.bitsquare.p2p.peers; package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log; import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
@ -11,11 +8,7 @@ import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.data.DataRequest; import io.bitsquare.p2p.peers.messages.data.DataRequest;
import io.bitsquare.p2p.peers.messages.data.DataResponse;
import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage; import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,8 +48,10 @@ public class RequestDataManager implements MessageListener {
private final Collection<NodeAddress> seedNodeAddresses; private final Collection<NodeAddress> seedNodeAddresses;
private final Listener listener; private final Listener listener;
private final Map<NodeAddress, RequestDataHandshake> requestDataHandshakeMap = new HashMap<>();
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty(); private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
private Timer requestDataAfterDelayTimer, timeoutTimer; private Timer requestDataTimer;
private boolean dataUpdateRequested; private boolean dataUpdateRequested;
@ -69,6 +64,7 @@ public class RequestDataManager implements MessageListener {
this.networkNode = networkNode; this.networkNode = networkNode;
this.dataStorage = dataStorage; this.dataStorage = dataStorage;
this.peerManager = peerManager; this.peerManager = peerManager;
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
this.listener = listener; this.listener = listener;
@ -78,10 +74,11 @@ public class RequestDataManager implements MessageListener {
public void shutDown() { public void shutDown() {
Log.traceCall(); Log.traceCall();
stopRequestDataTimer();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopRequestDataTimer(); requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
stopTimeoutTimer();
} }
@ -91,8 +88,7 @@ public class RequestDataManager implements MessageListener {
public void requestPreliminaryData() { public void requestPreliminaryData() {
Log.traceCall(); Log.traceCall();
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty."); requestDataFromRandomPeer(new ArrayList<>(seedNodeAddresses));
requestDataFromList(new ArrayList<>(seedNodeAddresses));
} }
public void requestUpdatesData() { public void requestUpdatesData() {
@ -116,30 +112,24 @@ public class RequestDataManager implements MessageListener {
@Override @Override
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
if (message instanceof PreliminaryDataRequest || message instanceof DataRequest) { if (message instanceof DataRequest) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values()))); log.trace("Received {} at {}", message.getClass().getSimpleName(), connection);
} else if (message instanceof DataResponse) { RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
Log.traceCall(message.toString()); new RequestDataHandshake.Listener() {
stopTimeoutTimer(); @Override
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { public void onComplete() {
((DataResponse) message).dataSet.stream() log.trace("RequestDataHandshake of inbound connection complete. Connection= {}",
.forEach(e -> dataStorage.add(e, peersNodeAddress)); connection);
}
// 1. We get a response from requestPreliminaryData @Override
if (!nodeOfPreliminaryDataRequest.isPresent()) { public void onFault(String errorMessage) {
nodeOfPreliminaryDataRequest = Optional.of(peersNodeAddress); log.info("RequestDataHandshake of inbound connection failed. Connection= {}",
listener.onPreliminaryDataReceived(); errorMessage, connection);
} }
});
// 2. Later we get a response from requestUpdatesData requestDataHandshake.onMessage(message, connection);
if (dataUpdateRequested) {
dataUpdateRequested = false;
listener.onUpdatedDataReceived();
}
listener.onDataReceived();
});
} }
} }
@ -148,7 +138,7 @@ public class RequestDataManager implements MessageListener {
// Private // Private
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void requestDataFromList(List<NodeAddress> nodeAddresses) { private void requestDataFromRandomPeer(List<NodeAddress> nodeAddresses) {
Log.traceCall("remainingNodeAddresses=" + nodeAddresses); Log.traceCall("remainingNodeAddresses=" + nodeAddresses);
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
nodeAddresses.remove(nextCandidate); nodeAddresses.remove(nextCandidate);
@ -157,83 +147,79 @@ public class RequestDataManager implements MessageListener {
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) { private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
log.info("We try to send a DataRequest request to peer. " + nodeAddress);
stopTimeoutTimer(); if (!requestDataHandshakeMap.containsKey(nodeAddress)) {
stopRequestDataTimer(); RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
new RequestDataHandshake.Listener() {
@Override
public void onComplete() {
stopRequestDataTimer();
timeoutTimer = UserThread.runAfter(() -> { // need to remove before listeners are notified as they cause the update call
log.info("timeoutTimer called"); requestDataHandshakeMap.remove(nodeAddress);
handleError(nodeAddress, remainingNodeAddresses);
},
10, TimeUnit.SECONDS);
Message dataRequest; // 1. We get a response from requestPreliminaryData
if (networkNode.getNodeAddress() == null) if (!nodeOfPreliminaryDataRequest.isPresent()) {
dataRequest = new PreliminaryDataRequest(); nodeOfPreliminaryDataRequest = Optional.of(nodeAddress);
else listener.onPreliminaryDataReceived();
dataRequest = new DataRequest(networkNode.getNodeAddress()); }
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, dataRequest); // 2. Later we get a response from requestUpdatesData
Futures.addCallback(future, new FutureCallback<Connection>() { if (dataUpdateRequested) {
@Override dataUpdateRequested = false;
public void onSuccess(@Nullable Connection connection) { listener.onUpdatedDataReceived();
log.trace("Send DataRequest to " + nodeAddress + " succeeded."); }
}
@Override listener.onDataReceived();
public void onFailure(@NotNull Throwable throwable) { }
log.info("Send DataRequest to " + nodeAddress + " failed. " +
"That is expected if the peer is offline. " +
"Exception:" + throwable.getMessage());
handleError(nodeAddress, remainingNodeAddresses); @Override
} public void onFault(String errorMessage) {
}); if (!remainingNodeAddresses.isEmpty()) {
} log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) { requestDataFromRandomPeer(remainingNodeAddresses);
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer();
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
requestDataFromList(remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
requestDataAfterDelayTimer = UserThread.runAfterRandomDelay(() -> {
log.trace("requestDataAfterDelayTimer called");
if (!seedNodeAddresses.isEmpty()) {
Set<NodeAddress> nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections();
// We want to keep it sorted but avoid duplicates
// We don't filter out already established connections for seed nodes as it might be that
// we got from the other seed node contacted but we still have not requested the initial
// data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list));
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
log.trace("Sorted and filtered list: list=" + list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestData(nextCandidate, list);
} else { } else {
log.info("Neither seed nodes, reported peers nor persisted peers are available. " + log.info("There is no remaining node available for requesting data. " +
"At least seed nodes should be always available."); "That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
// try again after a pause
stopRequestDataTimer();
requestDataTimer = UserThread.runAfterRandomDelay(() -> {
log.trace("requestDataAfterDelayTimer called");
// We want to keep it sorted but avoid duplicates
// We don't filter out already established connections for seed nodes as it might be that
// we got from the other seed node contacted but we still have not requested the initial
// data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list));
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
log.trace("Sorted and filtered list: list=" + list);
checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty.");
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestData(nextCandidate, list);
},
10, 15, TimeUnit.SECONDS);
}
requestDataHandshakeMap.remove(nodeAddress);
// Notify listeners
if (!nodeOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
} }
} }
}, });
10, 15, TimeUnit.SECONDS); requestDataHandshakeMap.put(nodeAddress, requestDataHandshake);
requestDataHandshake.requestData(nodeAddress);
} else {
log.warn("We have started already a DataRequest request to peer. " + nodeAddress);
} }
} }
@ -242,7 +228,7 @@ public class RequestDataManager implements MessageListener {
return set.stream() return set.stream()
.filter(e -> !list.contains(e.nodeAddress) && .filter(e -> !list.contains(e.nodeAddress) &&
!peerManager.isSeedNode(e) && !peerManager.isSeedNode(e) &&
!peerManager.isSelf(e.nodeAddress)) !peerManager.isSelf(e))
.collect(Collectors.toList()) .collect(Collectors.toList())
.stream() .stream()
.sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate)) .sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate))
@ -251,16 +237,9 @@ public class RequestDataManager implements MessageListener {
} }
private void stopRequestDataTimer() { private void stopRequestDataTimer() {
if (requestDataAfterDelayTimer != null) { if (requestDataTimer != null) {
requestDataAfterDelayTimer.cancel(); requestDataTimer.cancel();
requestDataAfterDelayTimer = null; requestDataTimer = null;
}
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.cancel();
timeoutTimer = null;
} }
} }
} }

View file

@ -1,36 +1,5 @@
package io.bitsquare.p2p.peers.messages.data; package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version; public interface DataRequest {
import io.bitsquare.p2p.NodeAddress; long getNonce();
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
public final class DataRequest implements SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
private final NodeAddress senderNodeAddress;
public DataRequest(NodeAddress senderNodeAddress) {
this.senderNodeAddress = senderNodeAddress;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "DataRequest{" +
"senderNodeAddress=" + senderNodeAddress +
", networkId=" + networkId +
'}';
}
} }

View file

@ -12,9 +12,11 @@ public final class DataResponse implements Message {
private final int networkId = Version.getNetworkId(); private final int networkId = Version.getNetworkId();
public final HashSet<ProtectedData> dataSet; public final HashSet<ProtectedData> dataSet;
public final long requestNonce;
public DataResponse(HashSet<ProtectedData> dataSet) { public DataResponse(HashSet<ProtectedData> dataSet, long requestNonce) {
this.dataSet = dataSet; this.dataSet = dataSet;
this.requestNonce = requestNonce;
} }
@Override @Override
@ -43,6 +45,7 @@ public final class DataResponse implements Message {
return "DataResponse{" + return "DataResponse{" +
"networkId=" + networkId + "networkId=" + networkId +
", dataSet=" + dataSet + ", dataSet=" + dataSet +
", requestNonce=" + requestNonce +
'}'; '}';
} }
} }

View file

@ -3,13 +3,20 @@ package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.network.messages.AnonymousMessage; import io.bitsquare.p2p.network.messages.AnonymousMessage;
public final class PreliminaryDataRequest implements AnonymousMessage { public final class PreliminaryDataRequest implements AnonymousMessage, DataRequest {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId(); private final int networkId = Version.getNetworkId();
private final long nonce;
public PreliminaryDataRequest() { public PreliminaryDataRequest(long nonce) {
this.nonce = nonce;
}
@Override
public long getNonce() {
return nonce;
} }
@Override @Override
@ -21,6 +28,7 @@ public final class PreliminaryDataRequest implements AnonymousMessage {
public String toString() { public String toString() {
return "PreliminaryDataRequest{" + return "PreliminaryDataRequest{" +
"networkId=" + networkId + "networkId=" + networkId +
", nonce=" + nonce +
'}'; '}';
} }
} }

View file

@ -0,0 +1,44 @@
package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
public final class UpdateDataRequest implements SendersNodeAddressMessage, DataRequest {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
private final NodeAddress senderNodeAddress;
private final long nonce;
public UpdateDataRequest(NodeAddress senderNodeAddress, long nonce) {
this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce;
}
@Override
public long getNonce() {
return nonce;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "DataRequest{" +
"senderNodeAddress=" + senderNodeAddress +
", networkId=" + networkId +
", nonce=" + nonce +
'}';
}
}