diff --git a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java index 7c8b591237..5d8c6105bc 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java @@ -10,17 +10,13 @@ import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.storage.messages.AddDataMessage; import io.bitsquare.p2p.storage.messages.BroadcastMessage; -import io.bitsquare.p2p.storage.payload.CapabilityRequiringPayload; -import io.bitsquare.p2p.storage.payload.StoragePayload; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -70,7 +66,6 @@ public class BroadcastHandler implements PeerManager.Listener { private Listener listener; private int numOfPeers; private Timer timeoutTimer; - private Set broadcastQueue = new CopyOnWriteArraySet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -127,9 +122,7 @@ public class BroadcastHandler implements PeerManager.Listener { "numOfPeers=" + numOfPeers + "\n\t" + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + - "numOfFailedBroadcasts=" + numOfFailedBroadcasts + "\n\t" + - "broadcastQueue.size()=" + broadcastQueue.size() + "\n\t" + - "broadcastQueue=" + broadcastQueue); + "numOfFailedBroadcasts=" + numOfFailedBroadcasts); onFault(errorMessage, false); }, timeoutDelay); @@ -153,16 +146,14 @@ public class BroadcastHandler implements PeerManager.Listener { "message = " + Utilities.toTruncatedString(message); if (!stopped) { if (!connection.isStopped()) { - if (!isCapabilityRequired(message) || isCapabilitySupported(connection, message)) { + if (!connection.isCapabilityRequired(message) || connection.isCapabilitySupported(message)) { NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); log.trace("Broadcast message to " + nodeAddress + "."); - broadcastQueue.add(nodeAddress.getFullAddress()); SettableFuture future = networkNode.sendMessage(connection, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { numOfCompletedBroadcasts++; - broadcastQueue.remove(nodeAddress.getFullAddress()); if (!stopped) { log.trace("Broadcast to " + nodeAddress + " succeeded."); @@ -188,7 +179,6 @@ public class BroadcastHandler implements PeerManager.Listener { @Override public void onFailure(@NotNull Throwable throwable) { numOfFailedBroadcasts++; - broadcastQueue.remove(nodeAddress.getFullAddress()); if (!stopped) { log.info("Broadcast to " + nodeAddress + " failed.\n\t" + "ErrorMessage=" + throwable.getMessage()); @@ -208,41 +198,6 @@ public class BroadcastHandler implements PeerManager.Listener { } } - private boolean isCapabilitySupported(Connection connection, BroadcastMessage message) { - if (message instanceof AddDataMessage) { - final StoragePayload storagePayload = (((AddDataMessage) message).protectedStorageEntry).getStoragePayload(); - if (storagePayload instanceof CapabilityRequiringPayload) { - final List requiredCapabilities = ((CapabilityRequiringPayload) storagePayload).getRequiredCapabilities(); - final List supportedCapabilities = connection.getSupportedCapabilities(); - if (supportedCapabilities != null) { - for (int messageCapability : requiredCapabilities) { - for (int connectionCapability : supportedCapabilities) { - if (messageCapability == connectionCapability) - return true; - } - } - log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" + - "Required capabilities is: " + requiredCapabilities.toString() + "\n" + - "Supported capabilities is: " + supportedCapabilities.toString() + "\n" + - "storagePayload is: " + Utilities.toTruncatedString(storagePayload)); - return false; - } else { - log.debug("We do not send the message to the peer because he uses an old version which does not support capabilities.\n" + - "Required capabilities is: " + requiredCapabilities.toString() + "\n" + - "storagePayload is: " + Utilities.toTruncatedString(storagePayload)); - return false; - } - } else { - return true; - } - } else { - return true; - } - } - - private boolean isCapabilityRequired(BroadcastMessage message) { - return message instanceof AddDataMessage && (((AddDataMessage) message).protectedStorageEntry).getStoragePayload() instanceof CapabilityRequiringPayload; - } /////////////////////////////////////////////////////////////////////////////////////////// // PeerManager.Listener implementation