Merge pull request #2469 from freimair/refactor_capabilities

Refactor capabilities and other P2P-related stuff
This commit is contained in:
Manfred Karrer 2019-03-06 18:38:23 -05:00 committed by GitHub
commit 73eea7927a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 398 additions and 574 deletions

View File

@ -17,55 +17,87 @@
package bisq.common.app;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
/**
* hold a set of capabilities and offers appropriate comparison methods.
*
* @author Florian Reimair
*/
public class Capabilities {
// We can define here special features the client is supporting.
// Useful for updates to new versions where a new data type would break backwards compatibility or to
// limit a node to certain behaviour and roles like the seed nodes.
// We don't use the Enum in any serialized data, as changes in the enum would break backwards compatibility. We use the ordinal integer instead.
// Sequence in the enum must not be changed (append only).
public enum Capability {
TRADE_STATISTICS,
TRADE_STATISTICS_2,
ACCOUNT_AGE_WITNESS,
SEED_NODE,
DAO_FULL_NODE,
PROPOSAL,
BLIND_VOTE,
ACK_MSG,
BSQ_BLOCK
/**
* The global set of capabilities, i.e. the capabilities if the local app.
*/
public static final Capabilities app = new Capabilities();
protected final Set<Capability> capabilities = new HashSet<>();
public Capabilities(Capability... capabilities) {
this(Arrays.asList(capabilities));
}
// Application need to set supported capabilities at startup
@Getter
@Setter
private static List<Integer> supportedCapabilities = new ArrayList<>();
public static void addCapability(int capability) {
supportedCapabilities.add(capability);
public Capabilities(Capabilities capabilities) {
this(capabilities.capabilities);
}
public static boolean isCapabilitySupported(final List<Integer> requiredItems, final List<Integer> supportedItems) {
if (requiredItems != null && !requiredItems.isEmpty()) {
if (supportedItems != null && !supportedItems.isEmpty()) {
List<Integer> matches = new ArrayList<>();
for (int requiredItem : requiredItems) {
matches.addAll(supportedItems.stream()
.filter(supportedItem -> requiredItem == supportedItem)
.collect(Collectors.toList()));
}
return matches.size() == requiredItems.size();
} else {
return false;
}
} else {
return true;
}
public Capabilities(Collection<Capability> capabilities) {
this.capabilities.addAll(capabilities);
}
public void resetCapabilities(Capability... capabilities) {
resetCapabilities(Arrays.asList(capabilities));
}
public void resetCapabilities(Capabilities capabilities) {
resetCapabilities(capabilities.capabilities);
}
public void resetCapabilities(Collection<Capability> capabilities) {
this.capabilities.clear();
this.capabilities.addAll(capabilities);
}
public boolean isCapabilitySupported(final Set<Capability> requiredItems) {
return capabilities.containsAll(requiredItems);
}
public boolean isCapabilitySupported(final Capabilities capabilities) {
return isCapabilitySupported(capabilities.capabilities);
}
public boolean hasCapabilities() {
return !capabilities.isEmpty();
}
/**
* helper for protobuffer stuff
*
* @param capabilities
* @return int list of Capability ordinals
*/
public static List<Integer> toIntList(Capabilities capabilities) {
return capabilities.capabilities.stream().map(capability -> capability.ordinal()).sorted().collect(Collectors.toList());
}
/**
* helper for protobuffer stuff
*
* @param capabilities a list of Capability ordinals
* @return a {@link Capabilities} object
*/
public static Capabilities fromIntList(List<Integer> capabilities) {
return new Capabilities(capabilities.stream().map(integer -> Capability.values()[integer]).collect(Collectors.toSet()));
}
@Override
public String toString() {
return Arrays.toString(Capabilities.toIntList(this).toArray());
}
}

View File

@ -0,0 +1,35 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.common.app;
// We can define here special features the client is supporting.
// Useful for updates to new versions where a new data type would break backwards compatibility or to
// limit a node to certain behaviour and roles like the seed nodes.
// We don't use the Enum in any serialized data, as changes in the enum would break backwards compatibility. We use the ordinal integer instead.
// Sequence in the enum must not be changed (append only).
public enum Capability {
TRADE_STATISTICS,
TRADE_STATISTICS_2,
ACCOUNT_AGE_WITNESS,
SEED_NODE,
DAO_FULL_NODE,
PROPOSAL,
BLIND_VOTE,
ACK_MSG,
BSQ_BLOCK
}

View File

@ -17,39 +17,47 @@
package bisq.common.app;
import java.util.Arrays;
import java.util.HashSet;
import org.junit.Test;
import static bisq.common.app.Capability.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CapabilitiesTest {
@Test
public void testVersionNumber() {
// if required are null or empty its true
assertTrue(Capabilities.isCapabilitySupported(null, null));
assertTrue(Capabilities.isCapabilitySupported(null, Arrays.asList()));
assertTrue(Capabilities.isCapabilitySupported(null, Arrays.asList(0)));
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(), null));
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(), Arrays.asList()));
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(), Arrays.asList(0)));
public void testNoCapabilitiesAvailable() {
Capabilities DUT = new Capabilities();
// required are not null and not empty but supported is null or empty its false
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(0), null));
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(0), Arrays.asList()));
assertTrue(DUT.isCapabilitySupported(new HashSet<>()));
assertFalse(DUT.isCapabilitySupported(new Capabilities(SEED_NODE)));
}
@Test
public void testO() {
Capabilities DUT = new Capabilities(TRADE_STATISTICS);
assertTrue(DUT.isCapabilitySupported(new HashSet<>()));
}
@Test
public void testSingleMatch() {
Capabilities DUT = new Capabilities(TRADE_STATISTICS);
// single match
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(0), Arrays.asList(0)));
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(1), Arrays.asList(0)));
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(0), Arrays.asList(1)));
assertTrue(DUT.isCapabilitySupported(new Capabilities(TRADE_STATISTICS)));
assertFalse(DUT.isCapabilitySupported(new Capabilities(SEED_NODE)));
}
// multi match
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(0), Arrays.asList(0, 1)));
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(0), Arrays.asList(1, 2)));
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(0, 1), Arrays.asList(0, 1)));
assertTrue(Capabilities.isCapabilitySupported(Arrays.asList(0, 1), Arrays.asList(1, 0)));
assertFalse(Capabilities.isCapabilitySupported(Arrays.asList(0, 1), Arrays.asList(0)));
@Test
public void testMultiMatch() {
Capabilities DUT = new Capabilities(TRADE_STATISTICS, TRADE_STATISTICS_2);
assertTrue(DUT.isCapabilitySupported(new Capabilities(TRADE_STATISTICS)));
assertFalse(DUT.isCapabilitySupported(new Capabilities(SEED_NODE)));
assertTrue(DUT.isCapabilitySupported(new Capabilities(TRADE_STATISTICS, TRADE_STATISTICS_2)));
assertFalse(DUT.isCapabilitySupported(new Capabilities(SEED_NODE, TRADE_STATISTICS_2)));
}
}

View File

@ -29,6 +29,7 @@ import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import javax.inject.Inject;
@ -164,22 +165,20 @@ public final class RepublishGovernanceDataHandler {
}
private void connectToAnyFullNode() {
List<Integer> required = new ArrayList<>(Collections.singletonList(
Capabilities.Capability.DAO_FULL_NODE.ordinal()
));
Capabilities required = new Capabilities(Capability.DAO_FULL_NODE);
List<Peer> list = peerManager.getLivePeers(null).stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.filter(peer -> peer.isCapabilitySupported(required))
.collect(Collectors.toList());
if (list.isEmpty())
list = peerManager.getReportedPeers().stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.filter(peer -> peer.isCapabilitySupported(required))
.collect(Collectors.toList());
if (list.isEmpty())
list = peerManager.getPersistedPeers().stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.filter(peer -> peer.isCapabilitySupported(required))
.collect(Collectors.toList());
if (!list.isEmpty()) {

View File

@ -21,15 +21,12 @@ import bisq.network.p2p.DirectMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ -62,10 +59,8 @@ public final class RepublishGovernanceDataRequest extends NetworkEnvelope implem
}
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.DAO_FULL_NODE.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.DAO_FULL_NODE);
}
@Override

View File

@ -25,6 +25,7 @@ import bisq.network.p2p.storage.payload.DateTolerantPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.crypto.Hash;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.util.Utilities;
@ -33,10 +34,7 @@ import io.bisq.generated.protobuffer.PB;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.EqualsAndHashCode;
@ -134,10 +132,8 @@ public final class BlindVotePayload implements PersistableNetworkPayload, Persis
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.BLIND_VOTE.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.BLIND_VOTE);
}
@Override

View File

@ -24,6 +24,7 @@ import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.crypto.Hash;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.util.Utilities;
@ -32,10 +33,6 @@ import io.bisq.generated.protobuffer.PB;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@ -111,10 +108,8 @@ public class ProposalPayload implements PersistableNetworkPayload, PersistableEn
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.PROPOSAL.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.PROPOSAL);
}
@Override

View File

@ -25,6 +25,7 @@ import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.crypto.Sig;
import bisq.common.proto.persistable.PersistablePayload;
@ -36,9 +37,6 @@ import org.springframework.util.CollectionUtils;
import java.security.PublicKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -130,10 +128,8 @@ public class TempProposalPayload implements LazyProcessedPayload, ProtectedStora
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.PROPOSAL.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.PROPOSAL);
}

View File

@ -21,15 +21,12 @@ import bisq.network.p2p.DirectMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ -68,10 +65,8 @@ public final class GetBlocksRequest extends NetworkEnvelope implements DirectMes
}
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.DAO_FULL_NODE.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.DAO_FULL_NODE);
}

View File

@ -23,15 +23,12 @@ import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ -70,9 +67,7 @@ public final class NewBlockBroadcastMessage extends BroadcastMessage implements
}
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.BSQ_BLOCK.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.BSQ_BLOCK);
}
}

View File

@ -25,7 +25,6 @@ import bisq.common.crypto.PubKeyRing;
import io.bisq.generated.protobuffer.PB;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -42,7 +41,7 @@ public final class OfferAvailabilityRequest extends OfferMessage implements Supp
private final PubKeyRing pubKeyRing;
private final long takersTradePrice;
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
public OfferAvailabilityRequest(String offerId,
PubKeyRing pubKeyRing,
@ -50,7 +49,7 @@ public final class OfferAvailabilityRequest extends OfferMessage implements Supp
this(offerId,
pubKeyRing,
takersTradePrice,
Capabilities.getSupportedCapabilities(),
Capabilities.app,
Version.getP2PMessageVersion(),
UUID.randomUUID().toString());
}
@ -63,7 +62,7 @@ public final class OfferAvailabilityRequest extends OfferMessage implements Supp
private OfferAvailabilityRequest(String offerId,
PubKeyRing pubKeyRing,
long takersTradePrice,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion,
@Nullable String uid) {
super(messageVersion, offerId, uid);
@ -79,7 +78,7 @@ public final class OfferAvailabilityRequest extends OfferMessage implements Supp
.setPubKeyRing(pubKeyRing.toProtoMessage())
.setTakersTradePrice(takersTradePrice);
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
Optional.ofNullable(uid).ifPresent(e -> builder.setUid(uid));
return getNetworkEnvelopeBuilder()
@ -91,7 +90,7 @@ public final class OfferAvailabilityRequest extends OfferMessage implements Supp
return new OfferAvailabilityRequest(proto.getOfferId(),
PubKeyRing.fromProto(proto.getPubKeyRing()),
proto.getTakersTradePrice(),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion,
proto.getUid().isEmpty() ? null : proto.getUid());
}

View File

@ -29,7 +29,6 @@ import bisq.common.proto.ProtoUtil;
import io.bisq.generated.protobuffer.PB;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -45,7 +44,7 @@ import javax.annotation.Nullable;
public final class OfferAvailabilityResponse extends OfferMessage implements SupportedCapabilitiesMessage {
private final AvailabilityResult availabilityResult;
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
// Was introduced in v 0.9.0. Might be null if msg received from node with old version
@Nullable
@ -54,7 +53,7 @@ public final class OfferAvailabilityResponse extends OfferMessage implements Sup
public OfferAvailabilityResponse(String offerId, AvailabilityResult availabilityResult, NodeAddress arbitrator) {
this(offerId,
availabilityResult,
Capabilities.getSupportedCapabilities(),
Capabilities.app,
Version.getP2PMessageVersion(),
UUID.randomUUID().toString(),
arbitrator);
@ -67,7 +66,7 @@ public final class OfferAvailabilityResponse extends OfferMessage implements Sup
private OfferAvailabilityResponse(String offerId,
AvailabilityResult availabilityResult,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion,
@Nullable String uid,
@Nullable NodeAddress arbitrator) {
@ -83,7 +82,7 @@ public final class OfferAvailabilityResponse extends OfferMessage implements Sup
.setOfferId(offerId)
.setAvailabilityResult(PB.AvailabilityResult.valueOf(availabilityResult.name()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
Optional.ofNullable(uid).ifPresent(e -> builder.setUid(uid));
Optional.ofNullable(arbitrator).ifPresent(e -> builder.setArbitrator(arbitrator.toProtoMessage()));
@ -95,7 +94,7 @@ public final class OfferAvailabilityResponse extends OfferMessage implements Sup
public static OfferAvailabilityResponse fromProto(PB.OfferAvailabilityResponse proto, int messageVersion) {
return new OfferAvailabilityResponse(proto.getOfferId(),
ProtoUtil.enumFromProto(AvailabilityResult.class, proto.getAvailabilityResult().name()),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion,
proto.getUid().isEmpty() ? null : proto.getUid(),
proto.hasArbitrator() ? NodeAddress.fromProto(proto.getArbitrator()) : null);

View File

@ -24,6 +24,7 @@ import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.util.Utilities;
@ -31,10 +32,7 @@ import io.bisq.generated.protobuffer.PB;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Value;
@ -104,10 +102,8 @@ public class AccountAgeWitness implements LazyProcessedPayload, PersistableNetwo
// Pre 0.6 version don't know the new message type and throw an error which leads to disconnecting the peer.
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.ACCOUNT_AGE_WITNESS.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.ACCOUNT_AGE_WITNESS);
}
@Override

View File

@ -21,29 +21,30 @@ import bisq.core.app.BisqEnvironment;
import bisq.core.dao.DaoOptionKeys;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import java.util.ArrayList;
import java.util.Arrays;
public class CoreNetworkCapabilities {
public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) {
final ArrayList<Integer> supportedCapabilities = new ArrayList<>(Arrays.asList(
Capabilities.Capability.TRADE_STATISTICS.ordinal(),
Capabilities.Capability.TRADE_STATISTICS_2.ordinal(),
Capabilities.Capability.ACCOUNT_AGE_WITNESS.ordinal(),
Capabilities.Capability.ACK_MSG.ordinal()
final ArrayList<Capability> supportedCapabilities = new ArrayList<>(Arrays.asList(
Capability.TRADE_STATISTICS,
Capability.TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG
));
if (BisqEnvironment.isDaoActivated(bisqEnvironment)) {
supportedCapabilities.add(Capabilities.Capability.PROPOSAL.ordinal());
supportedCapabilities.add(Capabilities.Capability.BLIND_VOTE.ordinal());
supportedCapabilities.add(Capabilities.Capability.BSQ_BLOCK.ordinal());
supportedCapabilities.add(Capability.PROPOSAL);
supportedCapabilities.add(Capability.BLIND_VOTE);
supportedCapabilities.add(Capability.BSQ_BLOCK);
String isFullDaoNode = bisqEnvironment.getProperty(DaoOptionKeys.FULL_DAO_NODE, String.class, "");
if (isFullDaoNode != null && !isFullDaoNode.isEmpty())
supportedCapabilities.add(Capabilities.Capability.DAO_FULL_NODE.ordinal());
supportedCapabilities.add(Capability.DAO_FULL_NODE);
}
Capabilities.setSupportedCapabilities(supportedCapabilities);
Capabilities.app.resetCapabilities(supportedCapabilities);
}
}

View File

@ -29,6 +29,7 @@ import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.crypto.Hash;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.util.JsonExclude;
@ -44,10 +45,7 @@ import org.bitcoinj.utils.Fiat;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -215,10 +213,8 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.TRADE_STATISTICS_2.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.TRADE_STATISTICS_2);
}
@Override

View File

@ -65,7 +65,7 @@ public class P2pNetworkListItem {
e -> sentBytes.set(formatter.formatBytes((long) e)));
receivedBytesSubscription = EasyBind.subscribe(statistic.receivedBytesProperty(),
e -> receivedBytes.set(formatter.formatBytes((long) e)));
onionAddressSubscription = EasyBind.subscribe(connection.peersNodeAddressProperty(),
onionAddressSubscription = EasyBind.subscribe(connection.getPeersNodeAddressProperty(),
nodeAddress -> onionAddress.set(nodeAddress != null ? nodeAddress.getFullAddress() : Res.get("settings.net.notKnownYet")));
roundTripTimeSubscription = EasyBind.subscribe(statistic.roundTripTimeProperty(),
roundTripTime -> this.roundTripTime.set((int) roundTripTime == 0 ? "-" : roundTripTime + " ms"));

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -32,6 +33,7 @@ import org.springframework.core.env.PropertySource;
import bisq.common.Clock;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
@ -139,8 +141,11 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
history = Collections.synchronizedMap(new FixedSizeHistoryTracker(Integer.parseInt(configuration.getProperty(HISTORY_SIZE, "200"))));
// add all capabilities
if(!Capabilities.getSupportedCapabilities().contains(Capabilities.Capability.DAO_FULL_NODE.ordinal()))
Capabilities.addCapability(Capabilities.Capability.DAO_FULL_NODE.ordinal());
List<Integer> capabilityOrdinals = Capabilities.toIntList(Capabilities.app);
if(!capabilityOrdinals.contains(Capability.DAO_FULL_NODE.ordinal())) {
capabilityOrdinals.add(Capability.DAO_FULL_NODE.ordinal());
Capabilities.app.resetCapabilities(Capabilities.fromIntList(capabilityOrdinals));
}
}
@Override

View File

@ -308,9 +308,9 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener, Setu
});
}
checkNotNull(connection.peersNodeAddressProperty(),
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
bucketsPerHost.put(connection.peersNodeAddressProperty().getValue(), result);
bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
gate.proceed();

View File

@ -21,6 +21,7 @@ import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.ExpirablePayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.app.Version;
import bisq.common.proto.ProtoUtil;
import bisq.common.proto.network.NetworkEnvelope;
@ -28,9 +29,6 @@ import bisq.common.proto.persistable.PersistablePayload;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -153,10 +151,8 @@ public final class AckMessage extends NetworkEnvelope implements MailboxMessage,
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.ACK_MSG.ordinal()
));
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.ACK_MSG);
}
@Override

View File

@ -619,15 +619,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// it from old versions, so we filter those.
Optional<Peer> optionalPeer = allPeers.stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.filter(peer -> peer.getSupportedCapabilities() != null)
.filter(peer -> !peer.getSupportedCapabilities().isEmpty())
.filter(peer -> peer.hasCapabilities())
.findAny();
if (optionalPeer.isPresent()) {
Peer peer = optionalPeer.get();
boolean result = Connection.isCapabilityRequired(message) &&
!Connection.isCapabilitySupported((CapabilityRequiringPayload) message, peer.getSupportedCapabilities());
boolean result = optionalPeer.get().isCapabilitySupported(((CapabilityRequiringPayload) message).getRequiredCapabilities());
if (result)
if (!result)
log.warn("We don't send the message because the peer does not support the required capability. " +
"peersNodeAddress={}", peersNodeAddress);

View File

@ -17,11 +17,11 @@
package bisq.network.p2p;
import java.util.List;
import bisq.common.app.Capabilities;
import javax.annotation.Nullable;
public interface SupportedCapabilitiesMessage {
@Nullable
List<Integer> getSupportedCapabilities();
Capabilities getSupportedCapabilities();
}

View File

@ -53,7 +53,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import java.net.Socket;
@ -81,9 +80,7 @@ import java.util.stream.Collectors;
import java.lang.ref.WeakReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
@ -96,7 +93,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
* All handlers are called on User thread.
*/
@Slf4j
public class Connection implements MessageListener {
public class Connection extends Capabilities implements Runnable, MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Enums
@ -135,11 +132,11 @@ public class Connection implements MessageListener {
private final Socket socket;
// private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final String portInfo;
@Getter
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// holder of state shared between InputHandler and Connection
private final SharedModel sharedModel;
@Getter
private final Statistic statistic;
private final int msgThrottlePer10Sec;
private final int msgThrottlePerSec;
@ -147,19 +144,28 @@ public class Connection implements MessageListener {
private final int sendMsgThrottleSleep;
// set in init
private InputHandler inputHandler;
private SynchronizedProtoOutputStream protoOutputStream;
// mutable data, set from other threads but not changed internally.
@Getter
private Optional<NodeAddress> peersNodeAddressOptional = Optional.<NodeAddress>empty();
@Getter
private volatile boolean stopped;
private PeerType peerType;
// Use Peer as default, in case of other types they will set it as soon as possible.
@Getter
private PeerType peerType = PeerType.PEER;
@Getter
private final ObjectProperty<NodeAddress> peersNodeAddressProperty = new SimpleObjectProperty<>();
private final List<Tuple2<Long, String>> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private volatile long lastSendTimeStamp = 0;
private final CopyOnWriteArraySet<WeakReference<SupportedCapabilitiesListener>> capabilitiesListeners = new CopyOnWriteArraySet<>();
@Getter
private RuleViolation ruleViolation;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -182,17 +188,11 @@ public class Connection implements MessageListener {
addMessageListener(messageListener);
sharedModel = new SharedModel(this, socket);
if (socket.getLocalPort() == 0)
portInfo = "port=" + socket.getPort();
else
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
init(peersNodeAddress, networkProtoResolver);
this.networkProtoResolver = networkProtoResolver;
init(peersNodeAddress);
}
private void init(@Nullable NodeAddress peersNodeAddress, NetworkProtoResolver networkProtoResolver) {
private void init(@Nullable NodeAddress peersNodeAddress) {
try {
socket.setSoTimeout(SOCKET_TIMEOUT);
// Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block
@ -201,13 +201,9 @@ public class Connection implements MessageListener {
// the associated ObjectOutputStream on the other end of the connection has written.
// It will not return until that header has been read.
protoOutputStream = new SynchronizedProtoOutputStream(socket.getOutputStream(), statistic);
InputStream protoInputStream = socket.getInputStream();
protoInputStream = socket.getInputStream();
// We create a thread for handling inputStream data
inputHandler = new InputHandler(sharedModel, protoInputStream, portInfo, this, networkProtoResolver);
singleThreadExecutor.submit(inputHandler);
// Use Peer as default, in case of other types they will set it as soon as possible.
peerType = PeerType.PEER;
singleThreadExecutor.submit(this);
if (peersNodeAddress != null)
setPeersNodeAddress(peersNodeAddress);
@ -218,12 +214,6 @@ public class Connection implements MessageListener {
}
}
private void handleException(Throwable e) {
if (sharedModel != null)
sharedModel.handleConnectionException(e);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@ -282,8 +272,6 @@ public class Connection implements MessageListener {
} catch (Throwable t) {
handleException(t);
}
} else {
log.info("We did not send the message because the peer does not support our required capabilities. message={}, peers supportedCapabilities={}", networkEnvelope, sharedModel.getSupportedCapabilities());
}
} else {
log.debug("called sendMessage but was already stopped");
@ -291,46 +279,27 @@ public class Connection implements MessageListener {
}
public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
return !isCapabilityRequired(msg) || isCapabilitySupported(msg);
}
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isCapabilityRequired(Proto msg) {
boolean result;
if (msg instanceof AddDataMessage) {
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) msg).getProtectedStorageEntry()).getProtectedStoragePayload();
return protectedStoragePayload instanceof CapabilityRequiringPayload;
result = !(protectedStoragePayload instanceof CapabilityRequiringPayload);
if(!result)
result = isCapabilitySupported(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities());
} else if (msg instanceof AddPersistableNetworkPayloadMessage) {
final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) msg).getPersistableNetworkPayload();
return persistableNetworkPayload instanceof CapabilityRequiringPayload;
result = !(persistableNetworkPayload instanceof CapabilityRequiringPayload);
if(!result)
result = isCapabilitySupported(((CapabilityRequiringPayload) persistableNetworkPayload).getRequiredCapabilities());
} else if(msg instanceof CapabilityRequiringPayload) {
result = isCapabilitySupported(((CapabilityRequiringPayload) msg).getRequiredCapabilities());
} else {
return msg instanceof CapabilityRequiringPayload;
result = true;
}
}
private boolean isCapabilitySupported(Proto msg) {
if (msg instanceof AddDataMessage) {
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) msg).getProtectedStorageEntry()).getProtectedStoragePayload();
return protectedStoragePayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload);
} else if (msg instanceof AddPersistableNetworkPayloadMessage) {
final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) msg).getPersistableNetworkPayload();
return persistableNetworkPayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload);
} else {
return msg instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) msg);
}
}
if (!result)
log.info("We did not send the message because the peer does not support our required capabilities. message={}, peers supportedCapabilities={}", msg, capabilities);
private boolean isCapabilitySupported(CapabilityRequiringPayload payload) {
return isCapabilitySupported(payload, sharedModel.getSupportedCapabilities());
}
public static boolean isCapabilitySupported(CapabilityRequiringPayload payload, List<Integer> supportedCapabilities) {
final List<Integer> requiredCapabilities = payload.getRequiredCapabilities();
return Capabilities.isCapabilitySupported(requiredCapabilities, supportedCapabilities);
}
@Nullable
public List<Integer> getSupportedCapabilities() {
return sharedModel.getSupportedCapabilities();
return result;
}
public void addMessageListener(MessageListener messageListener) {
@ -350,11 +319,6 @@ public class Connection implements MessageListener {
capabilitiesListeners.add(new WeakReference<>(listener));
}
@SuppressWarnings({"unused", "UnusedReturnValue"})
public boolean reportIllegalRequest(RuleViolation ruleViolation) {
return sharedModel.reportInvalidRequest(ruleViolation);
}
// TODO either use the argument or delete it
private boolean violatesThrottleLimit(NetworkEnvelope networkEnvelope) {
long now = System.currentTimeMillis();
@ -436,48 +400,18 @@ public class Connection implements MessageListener {
if (BanList.isBanned(peerNodeAddress)) {
log.warn("We detected a connection to a banned peer. We will close that connection. (setPeersNodeAddress)");
sharedModel.reportInvalidRequest(RuleViolation.PEER_BANNED);
reportInvalidRequest(RuleViolation.PEER_BANNED);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public Optional<NodeAddress> getPeersNodeAddressOptional() {
return peersNodeAddressOptional;
}
public String getUid() {
return uid;
}
public boolean hasPeersNodeAddress() {
return peersNodeAddressOptional.isPresent();
}
public boolean isStopped() {
return stopped;
}
public PeerType getPeerType() {
return peerType;
}
public ReadOnlyObjectProperty<NodeAddress> peersNodeAddressProperty() {
return peersNodeAddressProperty;
}
public RuleViolation getRuleViolation() {
return sharedModel.getRuleViolation();
}
public Statistic getStatistic() {
return statistic;
}
///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown
///////////////////////////////////////////////////////////////////////////////////////////
@ -502,22 +436,22 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
try {
String reason = closeConnectionReason == CloseConnectionReason.RULE_VIOLATION ?
sharedModel.getRuleViolation().name() : closeConnectionReason.name();
getRuleViolation().name() : closeConnectionReason.name();
sendMessage(new CloseConnectionMessage(reason));
setStopFlags();
stopped = true;
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
t.printStackTrace();
} finally {
setStopFlags();
stopped = true;
UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
}
}).start();
} else {
setStopFlags();
stopped = true;
doShutDown(closeConnectionReason, shutDownCompleteHandler);
}
} else {
@ -527,18 +461,11 @@ public class Connection implements MessageListener {
}
}
private void setStopFlags() {
stopped = true;
sharedModel.stop();
if (inputHandler != null)
inputHandler.stop();
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
// Use UserThread.execute as its not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
sharedModel.getSocket().close();
socket.close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) {
@ -546,6 +473,14 @@ public class Connection implements MessageListener {
e.printStackTrace();
} finally {
protoOutputStream.onConnectionShutdown();
try {
protoInputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
e.printStackTrace();
}
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete " + this.toString());
@ -582,12 +517,20 @@ public class Connection implements MessageListener {
@SuppressWarnings("unused")
public String printDetails() {
String portInfo;
if (socket.getLocalPort() == 0)
portInfo = "port=" + socket.getPort();
else
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", portInfo=" + portInfo +
", uid='" + uid + '\'' +
", sharedSpace=" + sharedModel.toString() +
", ruleViolation=" + ruleViolation +
", ruleViolations=" + ruleViolations +
", supportedCapabilities=" + capabilities +
", stopped=" + stopped +
'}';
}
@ -596,142 +539,76 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
// SharedSpace
///////////////////////////////////////////////////////////////////////////////////////////
/**
* Holds all shared data between Connection and InputHandler
* Runs in same thread as Connection
*/
private static class SharedModel {
private static final Logger log = LoggerFactory.getLogger(SharedModel.class);
private final Connection connection;
private final Socket socket;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();
// mutable
private volatile boolean stopped;
private CloseConnectionReason closeConnectionReason;
private RuleViolation ruleViolation;
@Nullable
private List<Integer> supportedCapabilities;
SharedModel(Connection connection, Socket socket) {
this.connection = connection;
this.socket = socket;
}
public boolean reportInvalidRequest(RuleViolation ruleViolation) {
log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, this);
int numRuleViolations;
numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0);
public boolean reportInvalidRequest(RuleViolation ruleViolation) {
log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, connection);
int numRuleViolations;
numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0);
numRuleViolations++;
ruleViolations.put(ruleViolation, numRuleViolations);
numRuleViolations++;
ruleViolations.put(ruleViolation, numRuleViolations);
if (numRuleViolations >= ruleViolation.maxTolerance) {
log.warn("We close connection as we received too many corrupt requests.\n" +
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), connection);
this.ruleViolation = ruleViolation;
if (ruleViolation == RuleViolation.PEER_BANNED) {
log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", connection.getPeersNodeAddressOptional());
shutDown(CloseConnectionReason.PEER_BANNED);
} else if (ruleViolation == RuleViolation.INVALID_CLASS) {
log.warn("We close connection due RuleViolation.INVALID_CLASS");
shutDown(CloseConnectionReason.INVALID_CLASS_RECEIVED);
} else {
log.warn("We close connection due RuleViolation.RULE_VIOLATION");
shutDown(CloseConnectionReason.RULE_VIOLATION);
}
return true;
if (numRuleViolations >= ruleViolation.maxTolerance) {
log.warn("We close connection as we received too many corrupt requests.\n" +
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), this);
this.ruleViolation = ruleViolation;
if (ruleViolation == RuleViolation.PEER_BANNED) {
log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", getPeersNodeAddressOptional());
shutDown(CloseConnectionReason.PEER_BANNED);
} else if (ruleViolation == RuleViolation.INVALID_CLASS) {
log.warn("We close connection due RuleViolation.INVALID_CLASS");
shutDown(CloseConnectionReason.INVALID_CLASS_RECEIVED);
} else {
return false;
log.warn("We close connection due RuleViolation.RULE_VIOLATION");
shutDown(CloseConnectionReason.RULE_VIOLATION);
}
}
@Nullable
public List<Integer> getSupportedCapabilities() {
return supportedCapabilities;
}
@SuppressWarnings("NullableProblems")
public void setSupportedCapabilities(List<Integer> supportedCapabilities) {
this.supportedCapabilities = supportedCapabilities;
connection.capabilitiesListeners.forEach(l -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = l.get();
if (supportedCapabilitiesListener != null)
supportedCapabilitiesListener.onChanged(supportedCapabilities);
});
}
public void handleConnectionException(Throwable e) {
if (e instanceof SocketException) {
if (socket.isClosed())
closeConnectionReason = CloseConnectionReason.SOCKET_CLOSED;
else
closeConnectionReason = CloseConnectionReason.RESET;
log.info("SocketException (expected if connection lost). closeConnectionReason={}; connection={}", closeConnectionReason, connection);
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.info("Shut down caused by exception {} on connection={}", e.toString(), connection);
} else if (e instanceof EOFException) {
closeConnectionReason = CloseConnectionReason.TERMINATED;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), connection);
} else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), connection);
} else {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket: {}\n\t" +
"peer={}\n\t" +
"Exception={}",
socket.toString(),
connection.peersNodeAddressOptional,
e.toString());
e.printStackTrace();
}
shutDown(closeConnectionReason);
}
public void shutDown(CloseConnectionReason closeConnectionReason) {
if (!stopped) {
stopped = true;
connection.shutDown(closeConnectionReason);
}
}
public Socket getSocket() {
return socket;
}
public void stop() {
stopped = true;
}
RuleViolation getRuleViolation() {
return ruleViolation;
}
@Override
public String toString() {
return "SharedModel{" +
"\n connection=" + connection +
",\n socket=" + socket +
",\n ruleViolations=" + ruleViolations +
",\n stopped=" + stopped +
",\n closeConnectionReason=" + closeConnectionReason +
",\n ruleViolation=" + ruleViolation +
",\n supportedCapabilities=" + supportedCapabilities +
"\n}";
return true;
} else {
return false;
}
}
private void handleException(Throwable e) {
CloseConnectionReason closeConnectionReason;
if (e instanceof SocketException) {
if (socket.isClosed())
closeConnectionReason = CloseConnectionReason.SOCKET_CLOSED;
else
closeConnectionReason = CloseConnectionReason.RESET;
log.info("SocketException (expected if connection lost). closeConnectionReason={}; connection={}", closeConnectionReason, this);
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.info("Shut down caused by exception {} on connection={}", e.toString(), this);
} else if (e instanceof EOFException) {
closeConnectionReason = CloseConnectionReason.TERMINATED;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
} else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
} else {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket: {}\n\t" +
"peer={}\n\t" +
"Exception={}",
socket.toString(),
this.peersNodeAddressOptional,
e.toString());
e.printStackTrace();
}
shutDown(closeConnectionReason);
}
///////////////////////////////////////////////////////////////////////////////////////////
// InputHandler
@ -740,65 +617,29 @@ public class Connection implements MessageListener {
// Runs in same thread as Connection, receives a message, performs several checks on it
// (including throttling limits, validity and statistics)
// and delivers it to the message listener given in the constructor.
private static class InputHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
private final SharedModel sharedModel;
private final InputStream protoInputStream;
private final String portInfo;
private final MessageListener messageListener;
private InputStream protoInputStream;
private final NetworkProtoResolver networkProtoResolver;
private volatile boolean stopped;
private long lastReadTimeStamp;
private boolean threadNameSet;
InputHandler(SharedModel sharedModel,
InputStream protoInputStream,
String portInfo,
MessageListener messageListener,
NetworkProtoResolver networkProtoResolver) {
this.sharedModel = sharedModel;
this.protoInputStream = protoInputStream;
this.portInfo = portInfo;
this.messageListener = messageListener;
this.networkProtoResolver = networkProtoResolver;
}
public void stop() {
if (!stopped) {
try {
protoInputStream.close();
} catch (IOException e) {
log.error("IOException at InputHandler.stop\n" + e.getMessage());
e.printStackTrace();
} finally {
stopped = true;
}
}
}
@Override
public void run() {
try {
Thread.currentThread().setName("InputHandler");
while (!stopped && !Thread.currentThread().isInterrupted()) {
if (!threadNameSet && sharedModel.connection != null &&
sharedModel.connection.getPeersNodeAddressOptional().isPresent()) {
Thread.currentThread().setName("InputHandler-" + sharedModel.connection.getPeersNodeAddressOptional().get().getFullAddress());
if (!threadNameSet && getPeersNodeAddressOptional().isPresent()) {
Thread.currentThread().setName("InputHandler-" + getPeersNodeAddressOptional().get().getFullAddress());
threadNameSet = true;
}
try {
if (sharedModel.getSocket() != null &&
sharedModel.getSocket().isClosed()) {
log.warn("Socket is null or closed socket={}", sharedModel.getSocket());
stopAndShutDown(CloseConnectionReason.SOCKET_CLOSED);
if (socket != null &&
socket.isClosed()) {
log.warn("Socket is null or closed socket={}", socket);
shutDown(CloseConnectionReason.SOCKET_CLOSED);
return;
}
Connection connection = checkNotNull(sharedModel.connection, "connection must not be null");
log.trace("InputHandler waiting for incoming network_messages.\n\tConnection={}", connection);
// Throttle inbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastReadTimeStamp;
@ -817,7 +658,7 @@ public class Connection implements MessageListener {
log.info("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
else
log.warn("proto is null. protoInputStream.read()=" + protoInputStream.read());
stopAndShutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
return;
}
@ -851,10 +692,10 @@ public class Connection implements MessageListener {
}*/
// We want to track the size of each object even if it is invalid data
connection.statistic.addReceivedBytes(size);
statistic.addReceivedBytes(size);
// We want to track the network_messages also before the checks, so do it early...
connection.statistic.addReceivedMessage(networkEnvelope);
statistic.addReceivedMessage(networkEnvelope);
// First we check the size
boolean exceeds;
@ -881,7 +722,7 @@ public class Connection implements MessageListener {
return;
}
if (connection.violatesThrottleLimit(networkEnvelope)
if (violatesThrottleLimit(networkEnvelope)
&& reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
return;
@ -895,27 +736,27 @@ public class Connection implements MessageListener {
return;
}
if (sharedModel.getSupportedCapabilities() == null && networkEnvelope instanceof SupportedCapabilitiesMessage)
sharedModel.setSupportedCapabilities(((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities());
if (networkEnvelope instanceof SupportedCapabilitiesMessage)
resetCapabilities(((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities());
if (networkEnvelope instanceof CloseConnectionMessage) {
// If we get a CloseConnectionMessage we shut down
log.info("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", proto.getCloseConnectionMessage().getReason(), connection);
"connection={}", proto.getCloseConnectionMessage().getReason(), this);
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
log.warn("We got shut down because we are banned by the other peer. (InputHandler.run CloseConnectionMessage)");
stopAndShutDown(CloseConnectionReason.PEER_BANNED);
shutDown(CloseConnectionReason.PEER_BANNED);
} else {
stopAndShutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
}
return;
} else if (!stopped) {
// We don't want to get the activity ts updated by ping/pong msg
if (!(networkEnvelope instanceof KeepAliveMessage))
connection.statistic.updateLastActivityTimestamp();
statistic.updateLastActivityTimestamp();
if (networkEnvelope instanceof GetDataRequest)
connection.setPeerType(PeerType.INITIAL_DATA_REQUEST);
setPeerType(PeerType.INITIAL_DATA_REQUEST);
// First a seed node gets a message from a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not have its hidden service
@ -932,7 +773,7 @@ public class Connection implements MessageListener {
// 4. DirectMessage (implements SendersNodeAddressMessage)
if (networkEnvelope instanceof SendersNodeAddressMessage) {
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress();
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
Optional<NodeAddress> peersNodeAddressOptional = getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
// If we have already the peers address we check again if it matches our stored one
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
@ -942,14 +783,14 @@ public class Connection implements MessageListener {
// We must not shut down a banned peer at that moment as it would trigger a connection termination
// and we could not send the CloseConnectionMessage.
// We check for a banned peer inside setPeersNodeAddress() and shut down if banned.
connection.setPeersNodeAddress(senderNodeAddress);
setPeersNodeAddress(senderNodeAddress);
}
}
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage)
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
messageListener.onMessage(networkEnvelope, connection);
onMessage(networkEnvelope, this);
}
} catch (InvalidClassException e) {
log.error(e.getMessage());
@ -967,33 +808,4 @@ public class Connection implements MessageListener {
handleException(t);
}
}
private void stopAndShutDown(CloseConnectionReason reason) {
stop();
sharedModel.shutDown(reason);
}
private void handleException(Throwable e) {
stop();
if (sharedModel != null)
sharedModel.handleConnectionException(e);
}
private boolean reportInvalidRequest(RuleViolation ruleViolation) {
boolean causedShutDown = sharedModel.reportInvalidRequest(ruleViolation);
if (causedShutDown)
stop();
return causedShutDown;
}
@Override
public String toString() {
return "InputHandler{" +
"sharedSpace=" + sharedModel +
", port=" + portInfo +
", stopped=" + stopped +
'}';
}
}
}

View File

@ -17,8 +17,8 @@
package bisq.network.p2p.network;
import java.util.List;
import bisq.common.app.Capabilities;
public interface SupportedCapabilitiesListener {
void onChanged(List<Integer> supportedCapabilities);
void onChanged(Capabilities supportedCapabilities);
}

View File

@ -205,8 +205,6 @@ public class BroadcastHandler implements PeerManager.Listener {
}
}
});
} else {
log.debug("We did not send the message because the peer does not support our required capabilities. message={}, peers supportedCapabilities={}", Utilities.toTruncatedString(message, 200), connection.getSupportedCapabilities());
}
} else {
onFault("Connection stopped already", false);

View File

@ -32,6 +32,7 @@ import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.common.Clock;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.storage.Storage;
@ -175,7 +176,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
PeerList persistedPeerList = storage.initAndGetPersistedWithFileName("PeerList", 1000);
if (persistedPeerList != null) {
long peersWithNoCapabilitiesSet = persistedPeerList.getList().stream()
.filter(e -> e.getSupportedCapabilities().isEmpty())
.filter(e -> e.hasCapabilities())
.mapToInt(e -> 1)
.count();
if (peersWithNoCapabilitiesSet > 100) {
@ -448,7 +449,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
// If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much
connection.reportIllegalRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
}
}
@ -659,19 +660,18 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream()
.map((Connection connection) -> {
List<Integer> supportedCapabilities = connection.getSupportedCapabilities();
Capabilities supportedCapabilities = new Capabilities(connection);
// If we have a new connection the supportedCapabilities is empty.
// We lookup if we have already stored the supportedCapabilities at the persisted or reported peers
// and if so we use that.
if (supportedCapabilities == null || supportedCapabilities.isEmpty()) {
if (!supportedCapabilities.hasCapabilities()) {
Set<Peer> allPeers = new HashSet<>(getPersistedPeers());
allPeers.addAll(getReportedPeers());
supportedCapabilities = allPeers.stream().filter(peer -> peer.getNodeAddress().equals(connection.getPeersNodeAddressOptional().get()))
.filter(peer -> !peer.getSupportedCapabilities().isEmpty())
.findAny()
.map(Peer::getSupportedCapabilities)
.filter(list -> !list.isEmpty())
.orElse(new ArrayList<>());
Optional<Peer> ourPeer = allPeers.stream().filter(peer -> peer.getNodeAddress().equals(connection.getPeersNodeAddressOptional().get()))
.filter(peer -> !peer.hasCapabilities())
.findAny();
if(ourPeer.isPresent())
supportedCapabilities = new Capabilities(ourPeer.get());
}
Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), supportedCapabilities);
connection.addWeakCapabilitiesListener(peer);

View File

@ -38,7 +38,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -159,27 +158,11 @@ public class GetDataRequestHandler {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
boolean doAdd = false;
if (protectedStoragePayload instanceof CapabilityRequiringPayload) {
final List<Integer> requiredCapabilities = ((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities();
final List<Integer> supportedCapabilities = connection.getSupportedCapabilities();
if (supportedCapabilities != null) {
for (int messageCapability : requiredCapabilities) {
for (int connectionCapability : supportedCapabilities) {
if (messageCapability == connectionCapability) {
doAdd = true;
break;
}
}
}
if (!doAdd)
log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
"Supported capabilities is: " + supportedCapabilities.toString() + "\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
} else {
log.debug("We do not send the message to the peer because he uses an old version which does not support capabilities.\n" +
"Required capabilities is: " + requiredCapabilities.toString() + "\n" +
if (connection.isCapabilitySupported(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities()))
doAdd = true;
else
log.debug("We do not send the message to the peer because he does not support the required capability for that message type.\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
}
} else {
doAdd = true;
}

View File

@ -31,7 +31,6 @@ import bisq.common.proto.network.NetworkProtoResolver;
import io.bisq.generated.protobuffer.PB;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -57,7 +56,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
private final int requestNonce;
private final boolean isGetUpdatedDataResponse;
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
public GetDataResponse(Set<ProtectedStorageEntry> dataSet,
@Nullable Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
@ -67,7 +66,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
requestNonce,
isGetUpdatedDataResponse,
Capabilities.getSupportedCapabilities(),
Capabilities.app,
Version.getP2PMessageVersion());
}
@ -79,7 +78,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
@Nullable Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion) {
super(messageVersion);
@ -106,7 +105,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
.setRequestNonce(requestNonce)
.setIsGetUpdatedDataResponse(isGetUpdatedDataResponse);
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
Optional.ofNullable(persistableNetworkPayloadSet).ifPresent(set -> builder.addAllPersistableNetworkPayloadItems(set.stream()
.map(PersistableNetworkPayload::toProtoMessage)
.collect(Collectors.toList())));
@ -134,7 +133,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
}

View File

@ -28,7 +28,6 @@ import io.bisq.generated.protobuffer.PB;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -45,11 +44,11 @@ import javax.annotation.Nullable;
public final class PreliminaryGetDataRequest extends GetDataRequest implements AnonymousMessage, SupportedCapabilitiesMessage {
// ordinals of enum
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
public PreliminaryGetDataRequest(int nonce,
Set<byte[]> excludedKeys) {
this(nonce, excludedKeys, Capabilities.getSupportedCapabilities(), Version.getP2PMessageVersion());
this(nonce, excludedKeys, Capabilities.app, Version.getP2PMessageVersion());
}
@ -59,7 +58,7 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
private PreliminaryGetDataRequest(int nonce,
Set<byte[]> excludedKeys,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion) {
super(messageVersion, nonce, excludedKeys);
@ -74,7 +73,7 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
.map(ByteString::copyFrom)
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
return getNetworkEnvelopeBuilder()
.setPreliminaryGetDataRequest(builder)
@ -84,7 +83,7 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
public static PreliminaryGetDataRequest fromProto(PB.PreliminaryGetDataRequest proto, int messageVersion) {
return new PreliminaryGetDataRequest(proto.getNonce(),
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
}

View File

@ -20,14 +20,13 @@ package bisq.network.p2p.peers.peerexchange;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.SupportedCapabilitiesListener;
import bisq.common.app.Capabilities;
import bisq.common.proto.network.NetworkPayload;
import bisq.common.proto.persistable.PersistablePayload;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ -39,31 +38,28 @@ import javax.annotation.Nullable;
@Getter
@EqualsAndHashCode(exclude = {"date"}) // failedConnectionAttempts is transient and therefore excluded anyway
@Slf4j
public final class Peer implements NetworkPayload, PersistablePayload, SupportedCapabilitiesListener {
public final class Peer extends Capabilities implements NetworkPayload, PersistablePayload, SupportedCapabilitiesListener {
private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5;
private final NodeAddress nodeAddress;
private final long date;
// Added in v. 0.7.1
@Setter
private List<Integer> supportedCapabilities = new ArrayList<>();
@Setter
transient private int failedConnectionAttempts = 0;
public Peer(NodeAddress nodeAddress, @Nullable List<Integer> supportedCapabilities) {
this(nodeAddress, new Date().getTime(),
supportedCapabilities == null ? new ArrayList<>() : supportedCapabilities);
public Peer(NodeAddress nodeAddress, @Nullable Capabilities supportedCapabilities) {
this(nodeAddress, new Date().getTime(), supportedCapabilities);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private Peer(NodeAddress nodeAddress, long date, List<Integer> supportedCapabilities) {
private Peer(NodeAddress nodeAddress, long date, Capabilities supportedCapabilities) {
super(supportedCapabilities);
this.nodeAddress = nodeAddress;
this.date = date;
this.supportedCapabilities = supportedCapabilities;
}
@Override
@ -71,15 +67,14 @@ public final class Peer implements NetworkPayload, PersistablePayload, Supported
return PB.Peer.newBuilder()
.setNodeAddress(nodeAddress.toProtoMessage())
.setDate(date)
.addAllSupportedCapabilities(supportedCapabilities)
.addAllSupportedCapabilities(Capabilities.toIntList(this))
.build();
}
public static Peer fromProto(PB.Peer proto) {
return new Peer(NodeAddress.fromProto(proto.getNodeAddress()),
proto.getDate(),
proto.getSupportedCapabilitiesList().isEmpty() ?
new ArrayList<>() : new ArrayList<>(proto.getSupportedCapabilitiesList()));
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()));
}
@ -100,9 +95,9 @@ public final class Peer implements NetworkPayload, PersistablePayload, Supported
}
@Override
public void onChanged(List<Integer> supportedCapabilities) {
if (supportedCapabilities != null && !supportedCapabilities.isEmpty())
this.supportedCapabilities = supportedCapabilities;
public void onChanged(Capabilities supportedCapabilities) {
if (supportedCapabilities.hasCapabilities())
resetCapabilities(supportedCapabilities);
}
@ -110,7 +105,7 @@ public final class Peer implements NetworkPayload, PersistablePayload, Supported
public String toString() {
return "Peer{" +
"\n nodeAddress=" + nodeAddress +
",\n supportedCapabilities=" + supportedCapabilities +
",\n supportedCapabilities=" + capabilities +
",\n failedConnectionAttempts=" + failedConnectionAttempts +
",\n date=" + date +
"\n}";

View File

@ -29,7 +29,6 @@ import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -48,10 +47,10 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
private final int nonce;
private final Set<Peer> reportedPeers;
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, Set<Peer> reportedPeers) {
this(senderNodeAddress, nonce, reportedPeers, Capabilities.getSupportedCapabilities(), Version.getP2PMessageVersion());
this(senderNodeAddress, nonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion());
}
@ -62,7 +61,7 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
private GetPeersRequest(NodeAddress senderNodeAddress,
int nonce,
Set<Peer> reportedPeers,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion) {
super(messageVersion);
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetPeersRequest");
@ -81,7 +80,7 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
.map(Peer::toProtoMessage)
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
return getNetworkEnvelopeBuilder()
.setGetPeersRequest(builder)
@ -94,7 +93,7 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
new HashSet<>(proto.getReportedPeersList().stream()
.map(Peer::fromProto)
.collect(Collectors.toSet())),
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
}

View File

@ -28,7 +28,6 @@ import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -44,10 +43,10 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
private final int requestNonce;
private final Set<Peer> reportedPeers;
@Nullable
private final List<Integer> supportedCapabilities;
private final Capabilities supportedCapabilities;
public GetPeersResponse(int requestNonce, Set<Peer> reportedPeers) {
this(requestNonce, reportedPeers, Capabilities.getSupportedCapabilities(), Version.getP2PMessageVersion());
this(requestNonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion());
}
@ -57,7 +56,7 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
private GetPeersResponse(int requestNonce,
Set<Peer> reportedPeers,
@Nullable List<Integer> supportedCapabilities,
@Nullable Capabilities supportedCapabilities,
int messageVersion) {
super(messageVersion);
this.requestNonce = requestNonce;
@ -73,7 +72,7 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
.map(Peer::toProtoMessage)
.collect(Collectors.toList()));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(supportedCapabilities));
Optional.ofNullable(supportedCapabilities).ifPresent(e -> builder.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities)));
return getNetworkEnvelopeBuilder()
.setGetPeersResponse(builder)
@ -86,12 +85,12 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
.map(peer -> {
NodeAddress nodeAddress = new NodeAddress(peer.getNodeAddress().getHostName(),
peer.getNodeAddress().getPort());
return new Peer(nodeAddress, peer.getSupportedCapabilitiesList());
return new Peer(nodeAddress, Capabilities.fromIntList(peer.getSupportedCapabilitiesList()));
})
.collect(Collectors.toCollection(HashSet::new));
return new GetPeersResponse(proto.getRequestNonce(),
reportedPeers,
proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
}

View File

@ -17,10 +17,9 @@
package bisq.network.p2p.storage.payload;
import bisq.common.app.Capabilities;
import bisq.common.proto.network.NetworkPayload;
import java.util.List;
/**
* Used for payloads which requires certain capability.
* <p/>
@ -31,5 +30,5 @@ public interface CapabilityRequiringPayload extends NetworkPayload {
/**
* @return Capabilities the other node need to support to receive that message
*/
List<Integer> getRequiredCapabilities();
Capabilities getRequiredCapabilities();
}

View File

@ -25,10 +25,13 @@ import bisq.core.app.misc.ModuleForAppWithP2p;
import bisq.common.UserThread;
import bisq.common.app.AppModule;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.setup.CommonSetup;
import joptsimple.OptionSet;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -60,7 +63,10 @@ public class SeedNodeMain extends ExecutableForAppWithP2p {
@Override
protected void addCapabilities() {
Capabilities.addCapability(Capabilities.Capability.SEED_NODE.ordinal());
// TODO got even worse after refactoring
List<Integer> current = Capabilities.toIntList(Capabilities.app);
current.add(Capability.SEED_NODE.ordinal());
Capabilities.app.resetCapabilities(Capabilities.fromIntList(current));
}
@Override