Remove unused broadcastQueue, move methods for ap. check to connection

This commit is contained in:
Manfred Karrer 2016-07-23 16:31:01 +02:00
parent 7d2e2c73d9
commit 780f0233e9

View File

@ -10,17 +10,13 @@ import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import io.bitsquare.p2p.storage.messages.BroadcastMessage; import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import io.bitsquare.p2p.storage.payload.CapabilityRequiringPayload;
import io.bitsquare.p2p.storage.payload.StoragePayload;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -70,7 +66,6 @@ public class BroadcastHandler implements PeerManager.Listener {
private Listener listener; private Listener listener;
private int numOfPeers; private int numOfPeers;
private Timer timeoutTimer; private Timer timeoutTimer;
private Set<String> broadcastQueue = new CopyOnWriteArraySet<>();
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -127,9 +122,7 @@ public class BroadcastHandler implements PeerManager.Listener {
"numOfPeers=" + numOfPeers + "\n\t" + "numOfPeers=" + numOfPeers + "\n\t" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" +
"numOfFailedBroadcasts=" + numOfFailedBroadcasts + "\n\t" + "numOfFailedBroadcasts=" + numOfFailedBroadcasts);
"broadcastQueue.size()=" + broadcastQueue.size() + "\n\t" +
"broadcastQueue=" + broadcastQueue);
onFault(errorMessage, false); onFault(errorMessage, false);
}, timeoutDelay); }, timeoutDelay);
@ -153,16 +146,14 @@ public class BroadcastHandler implements PeerManager.Listener {
"message = " + Utilities.toTruncatedString(message); "message = " + Utilities.toTruncatedString(message);
if (!stopped) { if (!stopped) {
if (!connection.isStopped()) { if (!connection.isStopped()) {
if (!isCapabilityRequired(message) || isCapabilitySupported(connection, message)) { if (!connection.isCapabilityRequired(message) || connection.isCapabilitySupported(message)) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " + nodeAddress + "."); log.trace("Broadcast message to " + nodeAddress + ".");
broadcastQueue.add(nodeAddress.getFullAddress());
SettableFuture<Connection> future = networkNode.sendMessage(connection, message); SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++; numOfCompletedBroadcasts++;
broadcastQueue.remove(nodeAddress.getFullAddress());
if (!stopped) { if (!stopped) {
log.trace("Broadcast to " + nodeAddress + " succeeded."); log.trace("Broadcast to " + nodeAddress + " succeeded.");
@ -188,7 +179,6 @@ public class BroadcastHandler implements PeerManager.Listener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
numOfFailedBroadcasts++; numOfFailedBroadcasts++;
broadcastQueue.remove(nodeAddress.getFullAddress());
if (!stopped) { if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" + log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage()); "ErrorMessage=" + throwable.getMessage());
@ -208,41 +198,6 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
} }
private boolean isCapabilitySupported(Connection connection, BroadcastMessage message) {
if (message instanceof AddDataMessage) {
final StoragePayload storagePayload = (((AddDataMessage) message).protectedStorageEntry).getStoragePayload();
if (storagePayload instanceof CapabilityRequiringPayload) {
final List<Integer> requiredCapabilities = ((CapabilityRequiringPayload) storagePayload).getRequiredCapabilities();
final List<Integer> supportedCapabilities = connection.getSupportedCapabilities();
if (supportedCapabilities != null) {
for (int messageCapability : requiredCapabilities) {
for (int connectionCapability : supportedCapabilities) {
if (messageCapability == connectionCapability)
return true;
}
}
log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"Supported capabilities is: " + supportedCapabilities.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(storagePayload));
return false;
} else {
log.debug("We do not send the message to the peer because he uses an old version which does not support capabilities.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(storagePayload));
return false;
}
} else {
return true;
}
} else {
return true;
}
}
private boolean isCapabilityRequired(BroadcastMessage message) {
return message instanceof AddDataMessage && (((AddDataMessage) message).protectedStorageEntry).getStoragePayload() instanceof CapabilityRequiringPayload;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation // PeerManager.Listener implementation