Merge pull request #4610 from chimp1984/remove-address-prefix

Remove address prefix
This commit is contained in:
Christoph Atteneder 2020-10-09 06:43:33 +02:00 committed by GitHub
commit 352d954489
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 714 additions and 467 deletions

View File

@ -98,6 +98,10 @@ public class Capabilities {
return this.capabilities.containsAll(Arrays.asList(capabilities));
}
public boolean contains(Capability capability) {
return this.capabilities.contains(capability);
}
public boolean isEmpty() {
return capabilities.isEmpty();
}
@ -180,4 +184,18 @@ public class Capabilities {
public int size() {
return capabilities.size();
}
// We return true if our capabilities have less capabilities than the parameter value
public boolean hasLess(Capabilities other) {
return findHighestCapability(this) < findHighestCapability(other);
}
// We use the sum of all capabilities. Alternatively we could use the highest entry.
// Neither would support removal of past capabilities, a use case we never had so far and which might have
// backward compatibility issues, so we should treat capabilities as an append-only data structure.
public int findHighestCapability(Capabilities capabilities) {
return (int) capabilities.capabilities.stream()
.mapToLong(e -> (long) e.ordinal())
.sum();
}
}

View File

@ -40,5 +40,6 @@ public enum Capability {
SIGNED_ACCOUNT_AGE_WITNESS, // Supports the signed account age witness feature
MEDIATION, // Supports mediation feature
REFUND_AGENT, // Supports refund agents
TRADE_STATISTICS_HASH_UPDATE // We changed the hash method in 1.2.0 and that requires update to 1.2.2 for handling it correctly, otherwise the seed nodes have to process too much data.
TRADE_STATISTICS_HASH_UPDATE, // We changed the hash method in 1.2.0 and that requires update to 1.2.2 for handling it correctly, otherwise the seed nodes have to process too much data.
NO_ADDRESS_PRE_FIX // At 1.4.0 we removed the prefix filter for mailbox messages. If a peer has that capability we do not sent the prefix.
}

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import org.junit.Test;
import static bisq.common.app.Capability.DAO_FULL_NODE;
import static bisq.common.app.Capability.SEED_NODE;
import static bisq.common.app.Capability.TRADE_STATISTICS;
import static bisq.common.app.Capability.TRADE_STATISTICS_2;
@ -40,6 +41,47 @@ public class CapabilitiesTest {
assertFalse(DUT.containsAll(new Capabilities(SEED_NODE)));
}
@Test
public void testHasLess() {
assertTrue(new Capabilities().hasLess(new Capabilities(SEED_NODE)));
assertFalse(new Capabilities().hasLess(new Capabilities()));
assertFalse(new Capabilities(SEED_NODE).hasLess(new Capabilities()));
assertTrue(new Capabilities(SEED_NODE).hasLess(new Capabilities(DAO_FULL_NODE)));
assertFalse(new Capabilities(DAO_FULL_NODE).hasLess(new Capabilities(SEED_NODE)));
Capabilities all = new Capabilities(
TRADE_STATISTICS,
TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE,
Capability.BUNDLE_OF_ENVELOPES,
Capability.MEDIATION,
Capability.SIGNED_ACCOUNT_AGE_WITNESS,
Capability.REFUND_AGENT,
Capability.TRADE_STATISTICS_HASH_UPDATE
);
Capabilities other = new Capabilities(
TRADE_STATISTICS,
TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE,
Capability.BUNDLE_OF_ENVELOPES,
Capability.MEDIATION,
Capability.SIGNED_ACCOUNT_AGE_WITNESS,
Capability.REFUND_AGENT,
Capability.TRADE_STATISTICS_HASH_UPDATE,
Capability.NO_ADDRESS_PRE_FIX
);
assertTrue(all.hasLess(other));
}
@Test
public void testO() {
Capabilities DUT = new Capabilities(TRADE_STATISTICS);

View File

@ -170,7 +170,7 @@ public final class RepublishGovernanceDataHandler {
private void connectToAnyFullNode() {
Capabilities required = new Capabilities(Capability.DAO_FULL_NODE);
List<Peer> list = peerManager.getLivePeers(null).stream()
List<Peer> list = peerManager.getLivePeers().stream()
.filter(peer -> peer.getCapabilities().containsAll(required))
.collect(Collectors.toList());

View File

@ -180,7 +180,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
closeHandler(connection);
if (peerManager.isNodeBanned(closeConnectionReason, connection)) {
if (peerManager.isPeerBanned(closeConnectionReason, connection)) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
seedNodeAddresses.remove(nodeAddress);
removeFromRequestBlocksHandlerMap(nodeAddress);

View File

@ -39,7 +39,8 @@ public class CoreNetworkCapabilities {
Capability.MEDIATION,
Capability.SIGNED_ACCOUNT_AGE_WITNESS,
Capability.REFUND_AGENT,
Capability.TRADE_STATISTICS_HASH_UPDATE
Capability.TRADE_STATISTICS_HASH_UPDATE,
Capability.NO_ADDRESS_PRE_FIX
);
if (config.daoActivated) {

View File

@ -214,8 +214,6 @@ public abstract class DisputeAgentManager<T extends DisputeAgent> {
observableMap.putAll(filtered);
observableMap.values().forEach(this::addAcceptedDisputeAgentToUser);
log.info("Available disputeAgents: {}", observableMap.keySet());
}

View File

@ -30,6 +30,7 @@ import bisq.network.p2p.DecryptedDirectMessageListener;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.MailboxMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.SendMailboxMessageListener;
import bisq.network.p2p.messaging.DecryptedMailboxListener;
@ -77,8 +78,8 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
processModel.getP2PService().addDecryptedDirectMessageListener(this);
}
processModel.getP2PService().addDecryptedMailboxListener(this);
processModel.getP2PService().getMailboxMap().values()
.stream().map(e -> e.second)
processModel.getP2PService().getMailboxItemsByUid().values()
.stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.forEach(this::handleDecryptedMessageWithPubKey);
}

View File

@ -133,7 +133,7 @@ public class P2PNetworkLoad extends Metric implements MessageListener, SetupList
networkProtoResolver);
DefaultSeedNodeRepository seedNodeRepository = new DefaultSeedNodeRepository(config);
PeerManager peerManager = new PeerManager(networkNode, seedNodeRepository, new ClockWatcher(),
maxConnections, new PersistenceManager<>(storageDir, persistenceProtoResolver, corruptedStorageFileHandler));
new PersistenceManager<>(storageDir, persistenceProtoResolver, corruptedStorageFileHandler), maxConnections);
// init file storage
peerManager.readPersisted();

View File

@ -19,5 +19,5 @@ package bisq.network.p2p;
public interface DecryptedDirectMessageListener {
void onDirectMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, @SuppressWarnings("UnusedParameters") NodeAddress peerNodeAddress);
void onDirectMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress peerNodeAddress);
}

View File

@ -31,7 +31,6 @@ import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.getdata.RequestDataManager;
import bisq.network.p2p.peers.keepalive.KeepAliveManager;
import bisq.network.p2p.peers.peerexchange.Peer;
import bisq.network.p2p.peers.peerexchange.PeerExchangeManager;
import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.network.p2p.storage.HashMapChangedListener;
@ -46,19 +45,23 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.crypto.CryptoException;
import bisq.common.crypto.KeyRing;
import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Tuple2;
import bisq.common.util.Utilities;
import com.google.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
@ -74,21 +77,26 @@ import javafx.beans.property.SimpleIntegerProperty;
import java.security.PublicKey;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -118,7 +126,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final Set<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
private final Set<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
@Getter
private final Map<String, Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>> mailboxMap = new HashMap<>();
private final Map<String, MailboxItem> mailboxItemsByUid = new HashMap<>();
private final Set<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty();
@ -164,7 +172,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.networkNode.addConnectionListener(this);
this.networkNode.addMessageListener(this);
this.p2PDataStorage.addHashMapChangedListener(this);
this.requestDataManager.addListener(this);
// We need to have both the initial data delivered and the hidden service published
@ -197,13 +204,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onAllServicesInitialized() {
if (networkNode.getNodeAddress() != null) {
maybeProcessAllMailboxEntries();
myNodeAddress = networkNode.getNodeAddress();
} else {
// If our HS is still not published
networkNode.nodeAddressProperty().addListener((observable, oldValue, newValue) -> {
if (newValue != null) {
maybeProcessAllMailboxEntries();
myNodeAddress = networkNode.getNodeAddress();
}
});
@ -280,12 +285,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
boolean seedNodesAvailable = requestDataManager.requestPreliminaryData();
keepAliveManager.start();
p2pServiceListeners.stream().forEach(SetupListener::onTorNodeReady);
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
if (!seedNodesAvailable) {
isBootstrapped = true;
maybeProcessAllMailboxEntries();
p2pServiceListeners.stream().forEach(P2PServiceListener::onNoSeedNodeAvailable);
// As we do not expect a updated data request response we start here with addHashMapChangedListenerAndApply
addHashMapChangedListenerAndApply();
p2pServiceListeners.forEach(P2PServiceListener::onNoSeedNodeAvailable);
}
}
@ -295,17 +301,17 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
hiddenServicePublished.set(true);
p2pServiceListeners.stream().forEach(SetupListener::onHiddenServicePublished);
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
}
@Override
public void onSetupFailed(Throwable throwable) {
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
}
@Override
public void onRequestCustomBridges() {
p2pServiceListeners.stream().forEach(SetupListener::onRequestCustomBridges);
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
}
// Called from networkReadyBinding
@ -317,8 +323,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
"seedNodeOfPreliminaryDataRequest must be present");
requestDataManager.requestUpdateData();
/*if (Capabilities.app.containsAll(Capability.SEED_NODE))
UserThread.runPeriodically(() -> requestDataManager.requestUpdateData(), 1, TimeUnit.HOURS);*/
// If we start up first time we don't have any peers so we need to request from seed node.
// As well it can be that the persisted peer list is outdated with dead peers.
@ -346,25 +350,27 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onUpdatedDataReceived() {
if (!isBootstrapped) {
isBootstrapped = true;
maybeProcessAllMailboxEntries();
p2pServiceListeners.stream().forEach(P2PServiceListener::onUpdatedDataReceived);
// Only now we start listening and processing. The p2PDataStorage is our cache for data we have received
// after the hidden service was ready.
addHashMapChangedListenerAndApply();
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
p2PDataStorage.onBootstrapComplete();
}
}
@Override
public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(P2PServiceListener::onNoSeedNodeAvailable);
p2pServiceListeners.forEach(P2PServiceListener::onNoSeedNodeAvailable);
}
@Override
public void onNoPeersAvailable() {
p2pServiceListeners.stream().forEach(P2PServiceListener::onNoPeersAvailable);
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
}
@Override
public void onDataReceived() {
p2pServiceListeners.stream().forEach(P2PServiceListener::onDataReceived);
p2pServiceListeners.forEach(P2PServiceListener::onDataReceived);
}
@ -398,58 +404,121 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage) {
// Seed nodes don't have set the encryptionService
PrefixedSealedAndSignedMessage sealedMsg = (PrefixedSealedAndSignedMessage) networkEnvelope;
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
try {
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = (PrefixedSealedAndSignedMessage) networkEnvelope;
if (verifyAddressPrefixHash(prefixedSealedAndSignedMessage)) {
// We set connectionType to that connection to avoid that is get closed when
// we get too many connection attempts.
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.debug("Try to decrypt...");
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(
prefixedSealedAndSignedMessage.getSealedAndSigned());
log.debug("\n\nDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD\n" +
"Decrypted SealedAndSignedMessage:\ndecryptedMsgWithPubKey={}"
+ "\nDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD\n", decryptedMessageWithPubKey);
if (connection.getPeersNodeAddressOptional().isPresent())
decryptedDirectMessageListeners.forEach(
e -> e.onDirectMessage(decryptedMessageWithPubKey, connection.getPeersNodeAddressOptional().get()));
else
log.error("peersNodeAddress is not available at onMessage.");
} else {
log.debug("Wrong receiverAddressMaskHash. The message is not intended for us.");
}
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
() -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
} catch (CryptoException e) {
log.debug(networkEnvelope.toString());
log.debug(e.toString());
log.debug("Decryption of prefixedSealedAndSignedMessage.sealedAndSigned failed. " +
"That is expected if the message is not intended for us.");
log.warn("Decryption of a direct message failed. This is not expected as the " +
"direct message was sent to our node.");
} catch (ProtobufferException e) {
log.error("Protobuffer data could not be processed: {}", e.toString());
log.error("ProtobufferException at decryptAndVerify: {}", e.toString());
e.getStackTrace();
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener implementation
// HashMapChangedListener implementation for ProtectedStorageEntry items
///////////////////////////////////////////////////////////////////////////////////////////
private void addHashMapChangedListenerAndApply() {
p2PDataStorage.addHashMapChangedListener(this);
onAdded(p2PDataStorage.getMap().values());
}
@Override
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
Collection<ProtectedMailboxStorageEntry> entries = protectedStorageEntries.stream()
.filter(e -> e instanceof ProtectedMailboxStorageEntry)
.map(e -> (ProtectedMailboxStorageEntry) e)
.filter(e -> networkNode.getNodeAddress() != null)
.filter(e -> !seedNodeRepository.isSeedNode(networkNode.getNodeAddress())) // Seed nodes don't expect mailbox messages
.collect(Collectors.toSet());
if (entries.size() > 1) {
threadedBatchProcessMailboxEntries(entries);
} else if (entries.size() == 1) {
processSingleMailboxEntry(entries);
}
}
@Override
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// not used
private void processSingleMailboxEntry(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
checkArgument(protectedMailboxStorageEntries.size() == 1);
var decryptedEntries = new ArrayList<>(getDecryptedEntries(protectedMailboxStorageEntries));
if (decryptedEntries.size() == 1) {
storeMailboxDataAndNotifyListeners(decryptedEntries.get(0));
}
}
// We run the batch processing of all mailbox messages we have received at startup in a thread to not block the UI.
// For about 1000 messages decryption takes about 1 sec.
private void threadedBatchProcessMailboxEntries(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("processMailboxEntry-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis();
ListenableFuture<Set<MailboxItem>> future = executor.submit(() -> {
var decryptedEntries = getDecryptedEntries(protectedMailboxStorageEntries);
log.info("Batch processing of {} mailbox entries took {} ms",
protectedMailboxStorageEntries.size(),
System.currentTimeMillis() - ts);
return decryptedEntries;
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) {
UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> storeMailboxDataAndNotifyListeners(e)));
}
public void onFailure(@NotNull Throwable throwable) {
log.error(throwable.toString());
}
}, MoreExecutors.directExecutor());
}
private Set<MailboxItem> getDecryptedEntries(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
Set<MailboxItem> decryptedMailboxMessageWithEntries = new HashSet<>();
protectedMailboxStorageEntries.stream()
.map(this::decryptProtectedMailboxStorageEntry)
.filter(Objects::nonNull)
.forEach(decryptedMailboxMessageWithEntries::add);
return decryptedMailboxMessageWithEntries;
}
@Nullable
private MailboxItem decryptProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry protectedMailboxStorageEntry) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(protectedMailboxStorageEntry
.getMailboxStoragePayload()
.getPrefixedSealedAndSignedMessage()
.getSealedAndSigned());
checkArgument(decryptedMessageWithPubKey.getNetworkEnvelope() instanceof MailboxMessage);
return new MailboxItem(protectedMailboxStorageEntry, decryptedMessageWithPubKey);
} catch (CryptoException ignore) {
// Expected if message was not intended for us
} catch (ProtobufferException e) {
log.error(e.toString());
e.getStackTrace();
}
return null;
}
private void storeMailboxDataAndNotifyListeners(MailboxItem mailboxItem) {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = mailboxItem.getDecryptedMessageWithPubKey();
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
NodeAddress sender = mailboxMessage.getSenderNodeAddress();
mailboxItemsByUid.put(mailboxMessage.getUid(), mailboxItem);
log.info("Received a {} mailbox message with uid {} and senderAddress {}",
mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), sender);
decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender));
}
///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages
///////////////////////////////////////////////////////////////////////////////////////////
@ -486,13 +555,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
log.debug("\n\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n" +
"Encrypt message:\nmessage={}"
+ "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message);
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(pubKeyRing, message),
peersNodeAddress.getAddressPrefixHash(),
UUID.randomUUID().toString());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
// Prefix is not needed for direct messages but as old code is doing the verification we still need to
// send it if peer has not updated.
PrefixedSealedAndSignedMessage sealedMsg = getPrefixedSealedAndSignedMessage(peersNodeAddress,
pubKeyRing,
message);
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedMsg);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendDirectMessageListener.onArrived();
@ -513,48 +584,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private PrefixedSealedAndSignedMessage getPrefixedSealedAndSignedMessage(NodeAddress peersNodeAddress,
PubKeyRing pubKeyRing,
NetworkEnvelope message) throws CryptoException {
byte[] addressPrefixHash;
if (peerManager.peerHasCapability(peersNodeAddress, Capability.NO_ADDRESS_PRE_FIX)) {
// The peer has an updated version so we do not need to send the prefix.
// We cannot use null as not updated nodes would get a nullPointer at protobuf serialisation.
addressPrefixHash = new byte[0];
} else {
addressPrefixHash = peersNodeAddress.getAddressPrefixHash();
}
return new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(pubKeyRing, message),
addressPrefixHash,
UUID.randomUUID().toString());
}
///////////////////////////////////////////////////////////////////////////////////////////
// MailboxMessages
///////////////////////////////////////////////////////////////////////////////////////////
private void processMailboxEntry(ProtectedMailboxStorageEntry protectedMailboxStorageEntry) {
NodeAddress nodeAddress = networkNode.getNodeAddress();
// Seed nodes don't receive mailbox network_messages
if (nodeAddress != null && !seedNodeRepository.isSeedNode(nodeAddress)) {
MailboxStoragePayload mailboxStoragePayload = protectedMailboxStorageEntry.getMailboxStoragePayload();
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = mailboxStoragePayload.getPrefixedSealedAndSignedMessage();
if (verifyAddressPrefixHash(prefixedSealedAndSignedMessage)) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(
prefixedSealedAndSignedMessage.getSealedAndSigned());
if (decryptedMessageWithPubKey.getNetworkEnvelope() instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress();
checkNotNull(senderNodeAddress, "senderAddress must not be null for mailbox network_messages");
mailboxMap.put(mailboxMessage.getUid(), new Tuple2<>(protectedMailboxStorageEntry, decryptedMessageWithPubKey));
log.info("Received a {} mailbox message with messageUid {} and senderAddress {}", mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), senderNodeAddress);
decryptedMailboxListeners.forEach(
e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderNodeAddress));
} else {
log.warn("tryDecryptMailboxData: Expected MailboxMessage but got other type. " +
"decryptedMsgWithPubKey.message={}", decryptedMessageWithPubKey.getNetworkEnvelope());
}
} catch (CryptoException e) {
log.debug(e.toString());
log.debug("Decryption of prefixedSealedAndSignedMessage.sealedAndSigned failed. " +
"That is expected if the message is not intended for us.");
} catch (ProtobufferException e) {
log.error("Protobuffer data could not be processed: {}", e.toString());
}
} else {
log.trace("Wrong blurredAddressHash. The message is not intended for us.");
}
}
}
public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing,
public void sendEncryptedMailboxMessage(NodeAddress peer, PubKeyRing peersPubKeyRing,
NetworkEnvelope message,
SendMailboxMessageListener sendMailboxMessageListener) {
if (peersPubKeyRing == null) {
@ -562,12 +615,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return;
}
checkNotNull(peersNodeAddress,
"PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkNotNull(peer, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkNotNull(networkNode.getNodeAddress(),
"My node address must not be null at sendEncryptedMailboxMessage");
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing),
"We got own keyring instead of that from peer");
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
if (!isBootstrapped())
throw new NetworkNotReadyException();
@ -578,7 +629,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return;
}
if (capabilityRequiredAndCapabilityNotSupported(peersNodeAddress, message)) {
if (capabilityRequiredAndCapabilityNotSupported(peer, message)) {
sendMailboxMessageListener.onFault("We did not send the EncryptedMailboxMessage " +
"because the peer does not support the capability.");
return;
@ -589,15 +640,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
"Encrypt message:\nmessage={}"
+ "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message);
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(peersPubKeyRing, message),
peersNodeAddress.getAddressPrefixHash(),
UUID.randomUUID().toString());
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = getPrefixedSealedAndSignedMessage(peer,
peersPubKeyRing, message);
log.debug("sendEncryptedMailboxMessage msg={}, peersNodeAddress={}", message, peersNodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
log.debug("sendEncryptedMailboxMessage msg={}, peersNodeAddress={}", message, peer);
SettableFuture<Connection> future = networkNode.sendMessage(peer, prefixedSealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendMailboxMessageListener.onArrived();
@ -624,22 +672,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (!(message instanceof CapabilityRequiringPayload))
return false;
// We only expect AckMessage so far
if (!(message instanceof AckMessage))
log.warn("We got a CapabilityRequiringPayload for the mailbox message which is not a AckMessage. " +
"peersNodeAddress={}", peersNodeAddress);
Set<Peer> allPeers = peerManager.getPersistedPeers();
allPeers.addAll(peerManager.getReportedPeers());
allPeers.addAll(peerManager.getLivePeers(null));
// We might have multiple entries of the same peer without the supportedCapabilities field set if we received
// it from old versions, so we filter those.
Optional<Peer> optionalPeer = allPeers.stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.filter(peer -> !peer.getCapabilities().isEmpty())
.findAny();
if (optionalPeer.isPresent()) {
boolean result = optionalPeer.get().getCapabilities().containsAll(((CapabilityRequiringPayload) message).getRequiredCapabilities());
Optional<Capabilities> optionalCapabilities = peerManager.findPeersCapabilities(peersNodeAddress);
if (optionalCapabilities.isPresent()) {
boolean result = optionalCapabilities.get().containsAll(((CapabilityRequiringPayload) message).getRequiredCapabilities());
if (!result)
log.warn("We don't send the message because the peer does not support the required capability. " +
@ -654,15 +691,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private void maybeProcessAllMailboxEntries() {
if (isBootstrapped) {
p2PDataStorage.getMap().values().forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
}
}
private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload,
PublicKey receiversPublicKey,
SendMailboxMessageListener sendMailboxMessageListener) {
@ -742,8 +770,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
String uid = mailboxMessage.getUid();
if (mailboxMap.containsKey(uid)) {
ProtectedMailboxStorageEntry mailboxData = mailboxMap.get(uid).first;
if (mailboxItemsByUid.containsKey(uid)) {
ProtectedMailboxStorageEntry mailboxData = mailboxItemsByUid.get(uid).getProtectedMailboxStorageEntry();
if (mailboxData != null && mailboxData.getProtectedStoragePayload() instanceof MailboxStoragePayload) {
MailboxStoragePayload expirableMailboxStoragePayload = (MailboxStoragePayload) mailboxData.getProtectedStoragePayload();
PublicKey receiversPubKey = mailboxData.getReceiversPubKey();
@ -759,7 +787,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
mailboxMap.remove(uid);
mailboxItemsByUid.remove(uid);
log.info("Removed successfully decryptedMsgWithPubKey. uid={}", uid);
}
} else {
@ -840,8 +868,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
public void removeP2PServiceListener(P2PServiceListener listener) {
if (p2pServiceListeners.contains(listener))
p2pServiceListeners.remove(listener);
p2pServiceListeners.remove(listener);
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
@ -893,18 +920,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return keyRing;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private boolean verifyAddressPrefixHash(PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage) {
if (networkNode.getNodeAddress() != null) {
byte[] blurredAddressHash = networkNode.getNodeAddress().getAddressPrefixHash();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, prefixedSealedAndSignedMessage.getAddressPrefixHash());
} else {
log.debug("myOnionAddress is null at verifyAddressPrefixHash. That is expected at startup.");
return false;
@Value
public class MailboxItem {
private final ProtectedMailboxStorageEntry protectedMailboxStorageEntry;
private final DecryptedMessageWithPubKey decryptedMessageWithPubKey;
public MailboxItem(ProtectedMailboxStorageEntry protectedMailboxStorageEntry,
DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
this.protectedMailboxStorageEntry = protectedMailboxStorageEntry;
this.decryptedMessageWithPubKey = decryptedMessageWithPubKey;
}
}
}

View File

@ -33,7 +33,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
public final class PrefixedSealedAndSignedMessage extends NetworkEnvelope implements MailboxMessage, SendersNodeAddressMessage {
private final NodeAddress senderNodeAddress;
private final SealedAndSigned sealedAndSigned;
// From v1.4.0 on addressPrefixHash can be an empty byte array.
// We cannot make it nullable as not updated nodes would get a nullPointer exception at protobuf serialisation.
private final byte[] addressPrefixHash;
private final String uid;
public PrefixedSealedAndSignedMessage(NodeAddress senderNodeAddress,
@ -71,7 +75,8 @@ public final class PrefixedSealedAndSignedMessage extends NetworkEnvelope implem
.build();
}
public static PrefixedSealedAndSignedMessage fromProto(protobuf.PrefixedSealedAndSignedMessage proto, int messageVersion) {
public static PrefixedSealedAndSignedMessage fromProto(protobuf.PrefixedSealedAndSignedMessage proto,
int messageVersion) {
return new PrefixedSealedAndSignedMessage(NodeAddress.fromProto(proto.getNodeAddress()),
SealedAndSigned.fromProto(proto.getSealedAndSigned()),
proto.getAddressPrefixHash().toByteArray(),

View File

@ -159,6 +159,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final List<Long> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private volatile long lastSendTimeStamp = 0;
// We use a weak reference here to ensure that no connection causes a memory leak in case it get closed without
// the shutDown being called.
private final CopyOnWriteArraySet<WeakReference<SupportedCapabilitiesListener>> capabilitiesListeners = new CopyOnWriteArraySet<>();
@Getter
@ -514,6 +516,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} finally {
protoOutputStream.onConnectionShutdown();
capabilitiesListeners.clear();
try {
protoInputStream.close();
} catch (IOException e) {
@ -559,7 +563,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
'}';
}
@SuppressWarnings("unused")
public String printDetails() {
String portInfo;
if (socket.getLocalPort() == 0)
@ -783,30 +786,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
if (networkEnvelope instanceof SupportedCapabilitiesMessage) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
if (!capabilities.equals(supportedCapabilities)) {
capabilities.set(supportedCapabilities);
// Capabilities can be empty. We only check for mandatory if we get some capabilities.
if (!capabilities.isEmpty() && !Capabilities.hasMandatoryCapability(capabilities)) {
String senderNodeAddress = networkEnvelope instanceof SendersNodeAddressMessage ?
((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress().getFullAddress() :
"[unknown address]";
log.info("We close a connection to old node {}. " +
"Capabilities of old node: {}, networkEnvelope class name={}",
senderNodeAddress, capabilities.prettyPrint(), networkEnvelope.getClass().getSimpleName());
shutDown(CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED);
return;
}
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
}
boolean causedShutDown = handleSupportedCapabilitiesMessage(networkEnvelope);
if (causedShutDown) {
return;
}
}
@ -882,4 +864,50 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
handleException(t);
}
}
protected boolean handleSupportedCapabilitiesMessage(NetworkEnvelope networkEnvelope) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities == null || supportedCapabilities.isEmpty()) {
return false;
}
if (this.capabilities.equals(supportedCapabilities)) {
return false;
}
if (!Capabilities.hasMandatoryCapability(supportedCapabilities)) {
log.info("We close a connection because of " +
"CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED " +
"to node {}. Capabilities of old node: {}, " +
"networkEnvelope class name={}",
getSenderNodeAddressAsString(networkEnvelope),
supportedCapabilities.prettyPrint(),
networkEnvelope.getClass().getSimpleName());
shutDown(CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED);
return true;
}
this.capabilities.set(supportedCapabilities);
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
return false;
}
@Nullable
private NodeAddress getSenderNodeAddress(NetworkEnvelope networkEnvelope) {
return getPeersNodeAddressOptional().orElse(
networkEnvelope instanceof SendersNodeAddressMessage ?
((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress() :
null);
}
private String getSenderNodeAddressAsString(NetworkEnvelope networkEnvelope) {
NodeAddress nodeAddress = getSenderNodeAddress(networkEnvelope);
return nodeAddress == null ? "null" : nodeAddress.getFullAddress();
}
}

View File

@ -21,6 +21,7 @@ import bisq.network.p2p.NodeAddress;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.Utilities;
@ -496,4 +497,12 @@ public abstract class NetworkNode implements MessageListener {
public NodeAddress getNodeAddress() {
return nodeAddressProperty.get();
}
public Optional<Capabilities> findPeersCapabilities(NodeAddress nodeAddress) {
return getConfirmedConnections().stream()
.filter(c -> c.getPeersNodeAddressProperty().get() != null)
.filter(c -> c.getPeersNodeAddressProperty().get().equals(nodeAddress))
.map(Connection::getCapabilities)
.findAny();
}
}

View File

@ -50,11 +50,11 @@ class SynchronizedProtoOutputStream extends ProtoOutputStream {
} catch (InterruptedException e) {
Thread currentThread = Thread.currentThread();
currentThread.interrupt();
final String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e;
String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e;
log.error(msg);
throw new BisqRuntimeException(msg, e);
} catch (ExecutionException e) {
final String msg = "Failed to write envelope. ExecutionException " + e;
String msg = "Failed to write envelope. ExecutionException " + e;
log.error(msg);
throw new BisqRuntimeException(msg, e);
}
@ -65,7 +65,7 @@ class SynchronizedProtoOutputStream extends ProtoOutputStream {
executorService.shutdownNow();
super.onConnectionShutdown();
} catch (Throwable t) {
log.error("Failed to handle connection shutdown. Throwable={}", t);
log.error("Failed to handle connection shutdown. Throwable={}", t.toString());
}
}
}

View File

@ -32,6 +32,7 @@ import bisq.common.ClockWatcher;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
import bisq.common.config.Config;
import bisq.common.persistence.PersistenceManager;
import bisq.common.proto.persistable.PersistedDataHost;
@ -42,6 +43,7 @@ import javax.inject.Named;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
@ -59,8 +61,10 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class PeerManager implements ConnectionListener, PersistedDataHost {
public final class PeerManager implements ConnectionListener, PersistedDataHost {
///////////////////////////////////////////////////////////////////////////////////////////
// Static
@ -77,9 +81,6 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
// Age of what we consider connected peers still as live peers
private static final long MAX_AGE_LIVE_PEERS = TimeUnit.MINUTES.toMillis(30);
private static final boolean PRINT_REPORTED_PEERS_DETAILS = true;
@Setter
private boolean allowDisconnectSeedNodes;
private Set<Peer> latestLivePeers = new HashSet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -101,19 +102,23 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
private final NetworkNode networkNode;
private final ClockWatcher clockWatcher;
private int maxConnections;
private final Set<NodeAddress> seedNodeAddresses;
private final PersistenceManager<PeerList> persistenceManager;
private final PeerList peerList = new PeerList();
private final HashSet<Peer> persistedPeers = new HashSet<>();
private final Set<Peer> reportedPeers = new HashSet<>();
private final ClockWatcher.Listener listener;
private final ClockWatcher.Listener clockWatcherListener;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
// Persistable peerList
private final PeerList peerList = new PeerList();
// Peers we got reported from other peers
@Getter
private final Set<Peer> reportedPeers = new HashSet<>();
// Most recent peers with activity date of last 30 min.
private final Set<Peer> latestLivePeers = new HashSet<>();
private Timer checkMaxConnectionsTimer;
private boolean stopped;
private boolean lostAllConnections;
private int maxConnections;
@Getter
private int minConnections;
@ -121,6 +126,8 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
private int maxConnectionsPeer;
private int maxConnectionsNonDirect;
private int maxConnectionsAbsolute;
@Setter
private boolean allowDisconnectSeedNodes;
///////////////////////////////////////////////////////////////////////////////////////////
@ -131,8 +138,8 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
public PeerManager(NetworkNode networkNode,
SeedNodeRepository seedNodeRepository,
ClockWatcher clockWatcher,
@Named(Config.MAX_CONNECTIONS) int maxConnections,
PersistenceManager<PeerList> persistenceManager) {
PersistenceManager<PeerList> persistenceManager,
@Named(Config.MAX_CONNECTIONS) int maxConnections) {
this.networkNode = networkNode;
this.seedNodeAddresses = new HashSet<>(seedNodeRepository.getSeedNodeAddresses());
this.clockWatcher = clockWatcher;
@ -144,7 +151,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
setConnectionLimits(maxConnections);
// we check if app was idle for more then 5 sec.
listener = new ClockWatcher.Listener() {
clockWatcherListener = new ClockWatcher.Listener() {
@Override
public void onSecondTick() {
}
@ -155,56 +162,34 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
@Override
public void onAwakeFromStandby(long missedMs) {
// TODO is "stopped = false;" correct?
// We got probably stopped set to true when we got a longer interruption (e.g. lost all connections),
// now we get awake again, so set stopped to false.
stopped = false;
listeners.forEach(Listener::onAwakeFromStandby);
}
};
clockWatcher.addListener(listener);
clockWatcher.addListener(clockWatcherListener);
}
public void shutDown() {
networkNode.removeConnectionListener(this);
clockWatcher.removeListener(listener);
clockWatcher.removeListener(clockWatcherListener);
stopCheckMaxConnectionsTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
// PersistedDataHost implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void readPersisted() {
PeerList persisted = persistenceManager.getPersisted();
if (persisted != null) {
this.persistedPeers.addAll(persisted.getList());
peerList.setAll(persisted.getSet());
}
}
public int getMaxConnections() {
return maxConnectionsAbsolute;
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
// Modify this to change the relationships between connection limits.
// maxConnections default 12
private void setConnectionLimits(int maxConnections) {
this.maxConnections = maxConnections; // app node 12; seedNode 30
minConnections = Math.max(1, (int) Math.round(maxConnections * 0.7)); // app node 1-8; seedNode 21
disconnectFromSeedNode = maxConnections; // app node 12; seedNode 30
maxConnectionsPeer = Math.max(4, (int) Math.round(maxConnections * 1.3)); // app node 16; seedNode 39
maxConnectionsNonDirect = Math.max(8, (int) Math.round(maxConnections * 1.7)); // app node 20; seedNode 51
maxConnectionsAbsolute = Math.max(12, (int) Math.round(maxConnections * 2.5)); // app node 30; seedNode 66
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
@ -212,64 +197,238 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
@Override
public void onConnection(Connection connection) {
boolean seedNode = isSeedNode(connection);
Optional<NodeAddress> addressOptional = connection.getPeersNodeAddressOptional();
if (log.isDebugEnabled()) {
String peer = addressOptional.map(NodeAddress::getFullAddress).orElseGet(() ->
"not known yet (connection id=" + connection.getUid() + ")");
log.debug("onConnection: peer = {}{}",
peer,
seedNode ? " (SeedNode)" : "");
}
if (seedNode)
if (isSeedNode(connection)) {
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
doHouseKeeping();
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost);
listeners.forEach(Listener::onNewConnectionAfterAllConnectionsLost);
}
connection.getPeersNodeAddressOptional()
.flatMap(this::findPeer)
.ifPresent(Peer::onConnection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.info("onDisconnect called: nodeAddress={}, closeConnectionReason={}", connection.getPeersNodeAddressOptional(), closeConnectionReason);
final Optional<NodeAddress> addressOptional = connection.getPeersNodeAddressOptional();
log.debug("onDisconnect: peer = {}{} / closeConnectionReason: {}",
(addressOptional.isPresent() ? addressOptional.get().getFullAddress() : "not known yet (connection id=" + connection.getUid() + ")"),
isSeedNode(connection) ? " (SeedNode)" : "",
closeConnectionReason);
log.info("onDisconnect called: nodeAddress={}, closeConnectionReason={}",
connection.getPeersNodeAddressOptional(), closeConnectionReason);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections) {
stopped = true;
log.warn("\n------------------------------------------------------------\n" +
"All connections lost\n" +
"------------------------------------------------------------");
listeners.stream().forEach(Listener::onAllConnectionsLost);
listeners.forEach(Listener::onAllConnectionsLost);
}
maybeRemoveBannedPeer(closeConnectionReason, connection);
}
if (connection.getPeersNodeAddressOptional().isPresent() && isNodeBanned(closeConnectionReason, connection)) {
final NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Connection
///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() {
return networkNode.getConfirmedConnections().size() >= minConnections;
}
// Checks if that connection has the peers node address
public boolean isConfirmed(NodeAddress nodeAddress) {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress) {
handleConnectionFault(nodeAddress, null);
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
boolean doRemovePersistedPeer = false;
removeReportedPeer(nodeAddress);
Optional<Peer> persistedPeerOptional = findPersistedPeer(nodeAddress);
if (persistedPeerOptional.isPresent()) {
Peer persistedPeer = persistedPeerOptional.get();
persistedPeer.onDisconnect();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
}
boolean ruleViolation = connection != null && connection.getRuleViolation() != null;
doRemovePersistedPeer = doRemovePersistedPeer || ruleViolation;
if (doRemovePersistedPeer)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();
}
public boolean isSeedNode(Connection connection) {
return connection.getPeersNodeAddressOptional().isPresent() &&
seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
}
public boolean isSelf(NodeAddress nodeAddress) {
return nodeAddress.equals(networkNode.getNodeAddress());
}
private boolean isSeedNode(Peer peer) {
return seedNodeAddresses.contains(peer.getNodeAddress());
}
public boolean isSeedNode(NodeAddress nodeAddress) {
return seedNodeAddresses.contains(nodeAddress);
}
public boolean isPeerBanned(CloseConnectionReason closeConnectionReason, Connection connection) {
return closeConnectionReason == CloseConnectionReason.PEER_BANNED &&
connection.getPeersNodeAddressOptional().isPresent();
}
private void maybeRemoveBannedPeer(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent() && isPeerBanned(closeConnectionReason, connection)) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
seedNodeAddresses.remove(nodeAddress);
removePersistedPeer(nodeAddress);
removeReportedPeer(nodeAddress);
}
}
public boolean isNodeBanned(CloseConnectionReason closeConnectionReason, Connection connection) {
return closeConnectionReason == CloseConnectionReason.PEER_BANNED &&
connection.getPeersNodeAddressOptional().isPresent();
///////////////////////////////////////////////////////////////////////////////////////////
// Peer
///////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("unused")
public Optional<Peer> findPeer(NodeAddress peersNodeAddress) {
return getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.findAny();
}
@Override
public void onError(Throwable throwable) {
public Set<Peer> getAllPeers() {
Set<Peer> allPeers = new HashSet<>(getLivePeers());
allPeers.addAll(getPersistedPeers());
allPeers.addAll(reportedPeers);
return allPeers;
}
public Collection<Peer> getPersistedPeers() {
return peerList.getSet();
}
public void addToReportedPeers(Set<Peer> reportedPeersToAdd,
Connection connection,
Capabilities capabilities) {
applyCapabilities(connection, capabilities);
Set<Peer> peers = reportedPeersToAdd.stream()
.filter(peer -> !isSelf(peer.getNodeAddress()))
.collect(Collectors.toSet());
printNewReportedPeers(peers);
// We check if the reported msg is not violating our rules
if (peers.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(peers);
purgeReportedPeersIfExceeds();
getPersistedPeers().addAll(peers);
purgePersistedPeersIfExceeds();
requestPersistence();
printReportedPeers();
} else {
// If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
}
}
// Delivers the live peers from the last 30 min (MAX_AGE_LIVE_PEERS)
// We include older peers to avoid risks for network partitioning
public Set<Peer> getLivePeers() {
return getLivePeers(null);
}
public Set<Peer> getLivePeers(@Nullable NodeAddress excludedNodeAddress) {
int oldNumLatestLivePeers = latestLivePeers.size();
Set<Peer> peers = new HashSet<>(latestLivePeers);
Set<Peer> currentLivePeers = getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e))
.filter(e -> !e.getNodeAddress().equals(excludedNodeAddress))
.collect(Collectors.toSet());
peers.addAll(currentLivePeers);
long maxAge = new Date().getTime() - MAX_AGE_LIVE_PEERS;
latestLivePeers.clear();
Set<Peer> recentPeers = peers.stream()
.filter(peer -> peer.getDateAsLong() > maxAge)
.collect(Collectors.toSet());
latestLivePeers.addAll(recentPeers);
if (oldNumLatestLivePeers != latestLivePeers.size())
log.info("Num of latestLivePeers={}", latestLivePeers.size());
return latestLivePeers;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Capabilities
///////////////////////////////////////////////////////////////////////////////////////////
public boolean peerHasCapability(NodeAddress peersNodeAddress, Capability capability) {
return findPeersCapabilities(peersNodeAddress)
.map(capabilities -> capabilities.contains(capability))
.orElse(false);
}
public Optional<Capabilities> findPeersCapabilities(NodeAddress nodeAddress) {
// We look up first our connections as that is our own data. If not found there we look up the peers which
// include reported peers.
Optional<Capabilities> optionalCapabilities = networkNode.findPeersCapabilities(nodeAddress);
if (optionalCapabilities.isPresent() && !optionalCapabilities.get().isEmpty()) {
return optionalCapabilities;
}
// Reported peers are not trusted data. We could get capabilities which miss the
// peers real capability or we could get maliciously altered capabilities telling us the peer supports a
// capability which is in fact not supported. This could lead to connection loss as we might send data not
// recognized by the peer. As we register a listener on connection if we don't have set the capability from our
// own sources we would get it fixed as soon we have a connection with that peer, rendering such an attack
// inefficient.
// Also this risk is only for not updated peers, so in case that would be abused for an
// attack all users have a strong incentive to update ;-).
return getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(nodeAddress))
.findAny()
.map(Peer::getCapabilities);
}
private void applyCapabilities(Connection connection, Capabilities newCapabilities) {
if (newCapabilities == null || newCapabilities.isEmpty()) {
return;
}
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(nodeAddress))
.filter(peer -> peer.getCapabilities().hasLess(newCapabilities))
.forEach(peer -> peer.setCapabilities(newCapabilities));
});
requestPersistence();
}
@ -410,30 +569,21 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Reported peers
///////////////////////////////////////////////////////////////////////////////////////////
private boolean removeReportedPeer(Peer reportedPeer) {
boolean contained = reportedPeers.remove(reportedPeer);
private void removeReportedPeer(Peer reportedPeer) {
reportedPeers.remove(reportedPeer);
printReportedPeers();
return contained;
}
@SuppressWarnings("UnusedReturnValue")
@Nullable
private Peer removeReportedPeer(NodeAddress nodeAddress) {
private void removeReportedPeer(NodeAddress nodeAddress) {
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
Optional<Peer> reportedPeerOptional = reportedPeersClone.stream()
.filter(e -> e.getNodeAddress().equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) {
Peer reportedPeer = reportedPeerOptional.get();
removeReportedPeer(reportedPeer);
return reportedPeer;
} else {
return null;
}
reportedPeersClone.stream()
.filter(e -> e.getNodeAddress().equals(nodeAddress))
.findAny()
.ifPresent(this::removeReportedPeer);
}
private void removeTooOldReportedPeers() {
@ -444,31 +594,6 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
reportedPeersToRemove.forEach(this::removeReportedPeer);
}
public Set<Peer> getReportedPeers() {
return reportedPeers;
}
public void addToReportedPeers(Set<Peer> reportedPeersToAdd, Connection connection) {
printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(reportedPeersToAdd);
purgeReportedPeersIfExceeds();
persistedPeers.addAll(reportedPeersToAdd);
purgePersistedPeersIfExceeds();
peerList.setAll(persistedPeers);
persistenceManager.requestPersistence();
printReportedPeers();
} else {
// If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
}
}
private void purgeReportedPeersIfExceeds() {
int size = reportedPeers.size();
@ -477,7 +602,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
"We remove random peers from the reported peers list.", size, MAX_REPORTED_PEERS);
int diff = size - MAX_REPORTED_PEERS;
List<Peer> list = new ArrayList<>(reportedPeers);
// we dont use sorting by lastActivityDate to keep it more random
// we don't use sorting by lastActivityDate to keep it more random
for (int i = 0; i < diff; i++) {
if (!list.isEmpty()) {
Peer toRemove = list.remove(new Random().nextInt(list.size()));
@ -491,12 +616,11 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
private void printReportedPeers() {
if (!reportedPeers.isEmpty()) {
//noinspection ConstantConditions
if (PRINT_REPORTED_PEERS_DETAILS) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Collected reported peers:");
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
reportedPeersClone.stream().forEach(e -> result.append("\n").append(e));
reportedPeersClone.forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.trace(result.toString());
}
@ -505,11 +629,10 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
private void printNewReportedPeers(Set<Peer> reportedPeers) {
//noinspection ConstantConditions
if (PRINT_REPORTED_PEERS_DETAILS) {
StringBuilder result = new StringBuilder("We received new reportedPeers:");
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
reportedPeersClone.stream().forEach(e -> result.append("\n\t").append(e));
reportedPeersClone.forEach(e -> result.append("\n\t").append(e));
log.trace(result.toString());
}
log.debug("Number of new arrived reported peers: {}", reportedPeers.size());
@ -517,47 +640,51 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
///////////////////////////////////////////////////////////////////////////////////////////
// Persisted list
// Persisted peers
///////////////////////////////////////////////////////////////////////////////////////////
private boolean removePersistedPeer(Peer persistedPeer) {
if (persistedPeers.contains(persistedPeer)) {
persistedPeers.remove(persistedPeer);
peerList.setAll(persistedPeers);
persistenceManager.requestPersistence();
if (getPersistedPeers().contains(persistedPeer)) {
getPersistedPeers().remove(persistedPeer);
requestPersistence();
return true;
} else {
return false;
}
}
@SuppressWarnings("UnusedReturnValue")
private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<Peer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get());
private void requestPersistence() {
persistenceManager.requestPersistence();
}
private Optional<Peer> getPersistedPeerOptional(NodeAddress nodeAddress) {
return persistedPeers.stream()
.filter(e -> e.getNodeAddress().equals(nodeAddress)).findAny();
@SuppressWarnings("UnusedReturnValue")
private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<Peer> optionalPersistedPeer = findPersistedPeer(nodeAddress);
return optionalPersistedPeer.isPresent() && removePersistedPeer(optionalPersistedPeer.get());
}
private Optional<Peer> findPersistedPeer(NodeAddress nodeAddress) {
return getPersistedPeers().stream()
.filter(e -> e.getNodeAddress().equals(nodeAddress))
.findAny();
}
private void removeTooOldPersistedPeers() {
Set<Peer> persistedPeersToRemove = persistedPeers.stream()
Set<Peer> persistedPeersToRemove = getPersistedPeers().stream()
.filter(reportedPeer -> new Date().getTime() - reportedPeer.getDate().getTime() > MAX_AGE)
.collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removePersistedPeer);
}
private void purgePersistedPeersIfExceeds() {
int size = persistedPeers.size();
int size = getPersistedPeers().size();
int limit = MAX_PERSISTED_PEERS;
if (size > limit) {
log.trace("We have already {} persisted peers which exceeds our limit of {}." +
"We remove random peers from the persisted peers list.", size, limit);
int diff = size - limit;
List<Peer> list = new ArrayList<>(persistedPeers);
// we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random
List<Peer> list = new ArrayList<>(getPersistedPeers());
// we don't use sorting by lastActivityDate to avoid attack vectors and keep it more random
for (int i = 0; i < diff; i++) {
if (!list.isEmpty()) {
Peer toRemove = list.remove(new Random().nextInt(list.size()));
@ -569,110 +696,44 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
}
public Set<Peer> getPersistedPeers() {
return persistedPeers;
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public int getMaxConnections() {
return maxConnectionsAbsolute;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Misc
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= minConnections;
public void addListener(Listener listener) {
listeners.add(listener);
}
private boolean isSeedNode(Peer reportedPeer) {
return seedNodeAddresses.contains(reportedPeer.getNodeAddress());
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public boolean isSeedNode(NodeAddress nodeAddress) {
return seedNodeAddresses.contains(nodeAddress);
}
public boolean isSeedNode(Connection connection) {
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
}
public boolean isSelf(Peer reportedPeer) {
return isSelf(reportedPeer.getNodeAddress());
}
public boolean isSelf(NodeAddress nodeAddress) {
return nodeAddress.equals(networkNode.getNodeAddress());
}
public boolean isConfirmed(Peer reportedPeer) {
return isConfirmed(reportedPeer.getNodeAddress());
}
// Checks if that connection has the peers node address
public boolean isConfirmed(NodeAddress nodeAddress) {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress) {
handleConnectionFault(nodeAddress, null);
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
log.debug("handleConnectionFault called: nodeAddress=" + nodeAddress);
boolean doRemovePersistedPeer = false;
removeReportedPeer(nodeAddress);
Optional<Peer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
if (persistedPeerOptional.isPresent()) {
Peer persistedPeer = persistedPeerOptional.get();
persistedPeer.increaseFailedConnectionAttempts();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
}
doRemovePersistedPeer = doRemovePersistedPeer || (connection != null && connection.getRuleViolation() != null);
if (doRemovePersistedPeer)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();
}
public void shutDownConnection(Connection connection, CloseConnectionReason closeConnectionReason) {
if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
connection.shutDown(closeConnectionReason);
}
public void shutDownConnection(NodeAddress peersNodeAddress, CloseConnectionReason closeConnectionReason) {
networkNode.getAllConnections().stream()
.filter(connection -> connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) &&
connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.findAny()
.ifPresent(connection -> connection.shutDown(closeConnectionReason));
}
// Delivers the live peers from the last 30 min (MAX_AGE_LIVE_PEERS)
// We include older peers to avoid risks for network partitioning
public Set<Peer> getLivePeers(NodeAddress excludedNodeAddress) {
int oldNumLatestLivePeers = latestLivePeers.size();
Set<Peer> currentLivePeers = new HashSet<>(getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e))
.filter(e -> !e.getNodeAddress().equals(excludedNodeAddress))
.collect(Collectors.toSet()));
latestLivePeers.addAll(currentLivePeers);
long maxAge = new Date().getTime() - MAX_AGE_LIVE_PEERS;
latestLivePeers = latestLivePeers.stream()
.filter(peer -> peer.getDate().getTime() > maxAge)
.collect(Collectors.toSet());
if (oldNumLatestLivePeers != latestLivePeers.size())
log.info("Num of latestLivePeers={}", latestLivePeers.size());
return latestLivePeers;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
// Private misc
///////////////////////////////////////////////////////////////////////////////////////////
// Modify this to change the relationships between connection limits.
// maxConnections default 12
private void setConnectionLimits(int maxConnections) {
this.maxConnections = maxConnections; // app node 12; seedNode 30
minConnections = Math.max(1, (int) Math.round(maxConnections * 0.7)); // app node 1-8; seedNode 21
disconnectFromSeedNode = maxConnections; // app node 12; seedNode 30
maxConnectionsPeer = Math.max(4, (int) Math.round(maxConnections * 1.3)); // app node 16; seedNode 39
maxConnectionsNonDirect = Math.max(8, (int) Math.round(maxConnections * 1.7)); // app node 20; seedNode 51
maxConnectionsAbsolute = Math.max(12, (int) Math.round(maxConnections * 2.5)); // app node 30; seedNode 66
}
private Set<Peer> getConnectedReportedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
@ -682,17 +743,31 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
// If we have a new connection the supportedCapabilities is empty.
// We lookup if we have already stored the supportedCapabilities at the persisted or reported peers
// and if so we use that.
if (supportedCapabilities.isEmpty()) {
Set<Peer> allPeers = new HashSet<>(getPersistedPeers());
allPeers.addAll(getReportedPeers());
Optional<Peer> ourPeer = allPeers.stream().filter(peer -> peer.getNodeAddress().equals(connection.getPeersNodeAddressOptional().get()))
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
checkArgument(peersNodeAddressOptional.isPresent()); // getConfirmedConnections delivers only connections where we know the address
NodeAddress peersNodeAddress = peersNodeAddressOptional.get();
boolean capabilitiesNotFoundInConnection = supportedCapabilities.isEmpty();
if (capabilitiesNotFoundInConnection) {
// If not found in connection we look up if we got the Capabilities set from any of the
// reported or persisted peers
Set<Peer> persistedAndReported = new HashSet<>(getPersistedPeers());
persistedAndReported.addAll(getReportedPeers());
Optional<Peer> candidate = persistedAndReported.stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.filter(peer -> !peer.getCapabilities().isEmpty())
.findAny();
if (ourPeer.isPresent())
supportedCapabilities = new Capabilities(ourPeer.get().getCapabilities());
if (candidate.isPresent()) {
supportedCapabilities = new Capabilities(candidate.get().getCapabilities());
}
}
Peer peer = new Peer(peersNodeAddress, supportedCapabilities);
// If we did not found the capability from our own connection we add a listener,
// so once we get a connection with that peer and exchange a message containing the capabilities
// we get set the capabilities.
if (capabilitiesNotFoundInConnection) {
connection.addWeakCapabilitiesListener(peer);
}
Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), supportedCapabilities);
connection.addWeakCapabilitiesListener(peer);
return peer;
})
.collect(Collectors.toSet());
@ -709,8 +784,8 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
if (!networkNode.getConfirmedConnections().isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Connected peers for node " + networkNode.getNodeAddress() + ":");
networkNode.getConfirmedConnections().stream().forEach(e -> result.append("\n")
.append(e.getPeersNodeAddressOptional().get()).append(" ").append(e.getPeerType()));
networkNode.getConfirmedConnections().forEach(e -> result.append("\n")
.append(e.getPeersNodeAddressOptional()).append(" ").append(e.getPeerType()));
result.append("\n------------------------------------------------------------\n");
log.debug(result.toString());
}

View File

@ -210,8 +210,8 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
closeHandler(connection);
if (peerManager.isNodeBanned(closeConnectionReason, connection) && connection.getPeersNodeAddressOptional().isPresent()) {
final NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
if (peerManager.isPeerBanned(closeConnectionReason, connection) && connection.getPeersNodeAddressOptional().isPresent()) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
seedNodeAddresses.remove(nodeAddress);
handlerMap.remove(nodeAddress);
}

View File

@ -32,6 +32,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -83,11 +84,11 @@ class GetPeersRequestHandler {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void handle(GetPeersRequest getPeersRequest, final Connection connection) {
public void handle(GetPeersRequest getPeersRequest, Connection connection) {
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"The peers address must have been already set at the moment");
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.getNonce(),
peerManager.getLivePeers(connection.getPeersNodeAddressOptional().get()));
new HashSet<>(peerManager.getLivePeers(connection.getPeersNodeAddressOptional().get())));
checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
@ -130,8 +131,9 @@ class GetPeersRequestHandler {
}
}
}, MoreExecutors.directExecutor());
peerManager.addToReportedPeers(getPeersRequest.getReportedPeers(), connection);
peerManager.addToReportedPeers(getPeersRequest.getReportedPeers(),
connection,
getPeersRequest.getSupportedCapabilities());
}

View File

@ -27,7 +27,6 @@ import bisq.common.proto.persistable.PersistablePayload;
import java.util.Date;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -35,17 +34,15 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Getter
@EqualsAndHashCode(exclude = {"date"}) // failedConnectionAttempts is transient and therefore excluded anyway
@Slf4j
public final class Peer implements HasCapabilities, NetworkPayload, PersistablePayload, SupportedCapabilitiesListener {
private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5;
private final NodeAddress nodeAddress;
private final long date;
// Added in v. 0.7.1
@Setter
transient private int failedConnectionAttempts = 0;
@Setter
private Capabilities capabilities = new Capabilities();
public Peer(NodeAddress nodeAddress, @Nullable Capabilities supportedCapabilities) {
@ -83,10 +80,14 @@ public final class Peer implements HasCapabilities, NetworkPayload, PersistableP
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void increaseFailedConnectionAttempts() {
public void onDisconnect() {
this.failedConnectionAttempts++;
}
public void onConnection() {
this.failedConnectionAttempts--;
}
public boolean tooManyFailedConnectionAttempts() {
return failedConnectionAttempts >= MAX_FAILED_CONNECTION_ATTEMPTS;
}
@ -95,6 +96,10 @@ public final class Peer implements HasCapabilities, NetworkPayload, PersistableP
return new Date(date);
}
public long getDateAsLong() {
return date;
}
@Override
public void onChanged(Capabilities supportedCapabilities) {
if (!supportedCapabilities.isEmpty()) {
@ -102,14 +107,29 @@ public final class Peer implements HasCapabilities, NetworkPayload, PersistableP
}
}
// We use only node address for equals and hashcode
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Peer)) return false;
Peer peer = (Peer) o;
return nodeAddress != null ? nodeAddress.equals(peer.nodeAddress) : peer.nodeAddress == null;
}
@Override
public int hashCode() {
return nodeAddress != null ? nodeAddress.hashCode() : 0;
}
@Override
public String toString() {
return "Peer{" +
"\n nodeAddress=" + nodeAddress +
",\n supportedCapabilities=" + capabilities +
",\n failedConnectionAttempts=" + failedConnectionAttempts +
",\n date=" + date +
",\n failedConnectionAttempts=" + failedConnectionAttempts +
",\n capabilities=" + capabilities +
"\n}";
}
}

View File

@ -35,6 +35,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -104,7 +105,9 @@ class PeerExchangeHandler implements MessageListener {
log.debug("sendGetPeersRequest to nodeAddress={}", nodeAddress);
if (!stopped) {
if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getLivePeers(nodeAddress));
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(),
nonce,
new HashSet<>(peerManager.getLivePeers(nodeAddress)));
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
@ -168,7 +171,9 @@ class PeerExchangeHandler implements MessageListener {
// Check if the response is for our request
if (getPeersResponse.getRequestNonce() == nonce) {
peerManager.addToReportedPeers(getPeersResponse.getReportedPeers(), connection);
peerManager.addToReportedPeers(getPeersResponse.getReportedPeers(),
connection,
getPeersResponse.getSupportedCapabilities());
cleanup();
listener.onComplete();
} else {

View File

@ -147,8 +147,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
}, RETRY_DELAY_SEC);
}
if (peerManager.isNodeBanned(closeConnectionReason, connection))
seedNodeAddresses.remove(connection.getPeersNodeAddressOptional().get());
if (peerManager.isPeerBanned(closeConnectionReason, connection)) {
connection.getPeersNodeAddressOptional().ifPresent(seedNodeAddresses::remove);
}
}
@Override
@ -224,7 +225,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
///////////////////////////////////////////////////////////////////////////////////////////
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
log.debug("requestReportedPeers nodeAddress={}; remainingNodeAddresses.size={}", nodeAddress, remainingNodeAddresses.size());
log.debug("requestReportedPeers nodeAddress={}; remainingNodeAddresses.size={}",
nodeAddress, remainingNodeAddresses.size());
if (!stopped) {
if (!handlerMap.containsKey(nodeAddress)) {
PeerExchangeHandler peerExchangeHandler = new PeerExchangeHandler(networkNode,

View File

@ -21,46 +21,55 @@ import bisq.common.proto.persistable.PersistableEnvelope;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@EqualsAndHashCode
public class PeerList implements PersistableEnvelope {
@Getter
private final List<Peer> list = new ArrayList<>();
private final Set<Peer> set = new HashSet<>();
public PeerList() {
}
public PeerList(List<Peer> list) {
setAll(list);
public PeerList(Set<Peer> set) {
setAll(set);
}
public int size() {
return list.size();
return set.size();
}
@Override
public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setPeerList(protobuf.PeerList.newBuilder()
.addAllPeer(list.stream().map(Peer::toProtoMessage).collect(Collectors.toList())))
.addAllPeer(set.stream().map(Peer::toProtoMessage).collect(Collectors.toList())))
.build();
}
public static PeerList fromProto(protobuf.PeerList proto) {
return new PeerList(new ArrayList<>(proto.getPeerList().stream()
return new PeerList(proto.getPeerList().stream()
.map(Peer::fromProto)
.collect(Collectors.toList())));
.collect(Collectors.toSet()));
}
public void setAll(Collection<Peer> collection) {
this.list.clear();
this.list.addAll(collection);
this.set.clear();
this.set.addAll(collection);
}
@Override
public String toString() {
return "PeerList{" +
"\n set=" + set +
"\n}";
}
}

View File

@ -47,8 +47,14 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
@Nullable
private final Capabilities supportedCapabilities;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, Set<Peer> reportedPeers) {
this(senderNodeAddress, nonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion());
public GetPeersRequest(NodeAddress senderNodeAddress,
int nonce,
Set<Peer> reportedPeers) {
this(senderNodeAddress,
nonce,
reportedPeers,
Capabilities.app,
Version.getP2PMessageVersion());
}

View File

@ -43,8 +43,12 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
@Nullable
private final Capabilities supportedCapabilities;
public GetPeersResponse(int requestNonce, Set<Peer> reportedPeers) {
this(requestNonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion());
public GetPeersResponse(int requestNonce,
Set<Peer> reportedPeers) {
this(requestNonce,
reportedPeers,
Capabilities.app,
Version.getP2PMessageVersion());
}

View File

@ -24,6 +24,7 @@ import java.util.Collection;
public interface HashMapChangedListener {
void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries);
@SuppressWarnings("UnusedParameters")
void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries);
default void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// Often we are only interested in added data as there is no use case for remove
}
}

View File

@ -637,12 +637,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// To avoid that expired data get stored and broadcast we check early for expire date.
if (protectedStorageEntry.isExpired(clock)) {
String peer = sender != null ? sender.getFullAddress() : "sender is null";
log.warn("We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}",
log.debug("We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}",
peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName());
log.debug("Expired protectedStorageEntry from peer {}. getCreationTimeStamp={}, protectedStorageEntry={}",
peer,
new Date(protectedStorageEntry.getCreationTimeStamp()),
protectedStorageEntry);
return false;
}

View File

@ -117,12 +117,11 @@ public abstract class StoreService<T extends PersistableEnvelope> {
T persisted = persistenceManager.getPersisted(fileName);
if (persisted != null) {
store = persisted;
int length = store.toProtoMessage().toByteArray().length;
/* int length = store.toProtoMessage().getSerializedSize();
double size = length > 1_000_000D ? length / 1_000_000D : length / 1_000D;
String unit = length > 1_000_000D ? "MB" : "KB";
log.info("{}: size of {}: {} {}", this.getClass().getSimpleName(),
persisted.getClass().getSimpleName(), size, unit);
persisted.getClass().getSimpleName(), size, unit);*/
} else {
store = createStore();
}

View File

@ -60,7 +60,7 @@ public class MockNode {
networkNode = mock(NetworkNode.class);
File storageDir = Files.createTempDirectory("storage").toFile();
PersistenceManager<PeerList> persistenceManager = new PersistenceManager<>(storageDir, mock(PersistenceProtoResolver.class), mock(CorruptedStorageFileHandler.class));
peerManager = new PeerManager(networkNode, mock(SeedNodeRepository.class), new ClockWatcher(), maxConnections, persistenceManager);
peerManager = new PeerManager(networkNode, mock(SeedNodeRepository.class), new ClockWatcher(), persistenceManager, maxConnections);
connections = new HashSet<>();
when(networkNode.getAllConnections()).thenReturn(connections);
}