Refactor checkMaxConnections (#3126)

* Change access level for checkMaxConnections to be tested

* Refactor checkMaxConnections

Fix connection limit checks so as to prevent the following warning:

> WARN  b.n.p2p.peers.PeerManager: No candidates found to remove (That
case should not be possible as we use in the last case all
connections).

* Add MockNode that allows for simulating connections

* Add PeerManagerTest

The old PeerManagerTest was located under network/p2p/routing, which is
no longer the correct location. Additionally, it was outdated so I
just removed it and added a new file under network/p2p/peers containing
tests for checkMaxConnections.

* Add testCompile dependency to core

This is necessary because bisq.network.p2p.MockNode imports
bisq.core.network.p2p.seed.DefaultSeedNodeRepository.

* Update based on review feedback

Mock the SeedNodeRepository superclass, thus eliminating the dependency
to core.
This commit is contained in:
Christoph Atteneder 2019-11-26 15:16:07 +01:00 committed by GitHub
commit 9c3c6182c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 337 additions and 529 deletions

View File

@ -40,6 +40,8 @@ import javax.inject.Named;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
@ -300,65 +302,83 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
}
private boolean checkMaxConnections() {
@VisibleForTesting
boolean checkMaxConnections() {
Set<Connection> allConnections = new HashSet<>(networkNode.getAllConnections());
int size = allConnections.size();
log.info("We have {} connections open. Our limit is {}", size, maxConnections);
if (size > maxConnections) {
log.info("We have too many connections open. " +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
.filter(e -> e instanceof InboundConnection)
if (size <= maxConnections) {
log.debug("We have not exceeded the maxConnections limit of {} " +
"so don't need to close any connections.", size);
return false;
}
log.info("We have too many connections open. " +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
.filter(e -> e instanceof InboundConnection)
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size <= maxConnectionsPeer) {
log.info("We have not exceeded maxConnectionsPeer limit of {} " +
"so don't need to close any connections", maxConnectionsPeer);
return false;
}
log.info("We have exceeded maxConnectionsPeer limit of {}. " +
"Lets try to remove ANY connection of type PEER.", maxConnectionsPeer);
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size > maxConnectionsPeer) {
log.info("Lets try to remove ANY connection of type PEER.");
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
"maxConnectionsNonDirect limit of {}", maxConnectionsNonDirect);
if (size <= maxConnectionsNonDirect) {
log.info("We have not exceeded maxConnectionsNonDirect limit of {} " +
"so don't need to close any connections", maxConnectionsNonDirect);
return false;
}
if (candidates.isEmpty()) {
log.debug("No candidates found. We check if we exceed our " +
"maxConnectionsNonDirect limit of {}", maxConnectionsNonDirect);
if (size > maxConnectionsNonDirect) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER or INITIAL_DATA_REQUEST.");
candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER && e.getPeerType() != Connection.PeerType.INITIAL_DATA_REQUEST)
.collect(Collectors.toList());
log.info("We have exceeded maxConnectionsNonDirect limit of {} " +
"Lets try to remove any connection which is not " +
"of type DIRECT_MSG_PEER or INITIAL_DATA_REQUEST.", maxConnectionsNonDirect);
candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER &&
e.getPeerType() != Connection.PeerType.INITIAL_DATA_REQUEST)
.collect(Collectors.toList());
if (candidates.isEmpty()) {
log.debug("No candidates found. We check if we exceed our " +
"maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size > maxConnectionsAbsolute) {
log.info("We reached abs. max. connections. Lets try to remove ANY connection.");
candidates = new ArrayList<>(allConnections);
}
}
}
if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size <= maxConnectionsAbsolute) {
log.info("We have not exceeded maxConnectionsAbsolute limit of {} " +
"so don't need to close any connections", maxConnectionsAbsolute);
return false;
}
log.info("We reached abs. max. connections. Lets try to remove ANY connection.");
candidates = new ArrayList<>(allConnections);
}
}
}
if (!candidates.isEmpty()) {
candidates.sort(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()));
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection: {}", candidates.size(), connection);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
if (!connection.isStopped())
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> UserThread.runAfter(this::checkMaxConnections, 100, TimeUnit.MILLISECONDS));
return true;
} else {
log.debug("No candidates found to remove.\n\t" +
"size={}, allConnections={}", size, allConnections);
return false;
}
if (!candidates.isEmpty()) {
candidates.sort(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()));
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection: {}", candidates.size(), connection);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
if (!connection.isStopped())
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> UserThread.runAfter(this::checkMaxConnections, 100, TimeUnit.MILLISECONDS));
return true;
} else {
log.trace("We only have {} connections open and don't need to close any.", size);
log.info("No candidates found to remove.\n\t" +
"size={}, allConnections={}", size, allConnections);
return false;
}
}

View File

@ -0,0 +1,86 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p;
import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.InboundConnection;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.OutboundConnection;
import bisq.network.p2p.network.Statistic;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.peerexchange.PeerList;
import bisq.common.ClockWatcher;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.storage.CorruptedDatabaseFilesHandler;
import bisq.common.storage.Storage;
import java.io.File;
import java.util.HashSet;
import java.util.Set;
import lombok.Getter;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockNode {
@Getter
public NetworkNode networkNode;
@Getter
public PeerManager peerManager;
@Getter
public Set<Connection> connections;
@Getter
public int maxConnections;
public MockNode(int maxConnections) {
this.maxConnections = maxConnections;
networkNode = mock(NetworkNode.class);
Storage<PeerList> storage = new Storage<>(mock(File.class), mock(PersistenceProtoResolver.class), mock(CorruptedDatabaseFilesHandler.class));
peerManager = new PeerManager(networkNode, mock(SeedNodeRepository.class), new ClockWatcher(), maxConnections, storage);
connections = new HashSet<>();
when(networkNode.getAllConnections()).thenReturn(connections);
}
public void addInboundConnection(Connection.PeerType peerType) {
InboundConnection inboundConnection = mock(InboundConnection.class);
when(inboundConnection.getPeerType()).thenReturn(peerType);
Statistic statistic = mock(Statistic.class);
long lastActivityTimestamp = System.currentTimeMillis();
when(statistic.getLastActivityTimestamp()).thenReturn(lastActivityTimestamp);
when(inboundConnection.getStatistic()).thenReturn(statistic);
doNothing().when(inboundConnection).run();
connections.add(inboundConnection);
}
public void addOutboundConnection(Connection.PeerType peerType) {
OutboundConnection outboundConnection = mock(OutboundConnection.class);
when(outboundConnection.getPeerType()).thenReturn(peerType);
Statistic statistic = mock(Statistic.class);
long lastActivityTimestamp = System.currentTimeMillis();
when(statistic.getLastActivityTimestamp()).thenReturn(lastActivityTimestamp);
when(outboundConnection.getStatistic()).thenReturn(statistic);
doNothing().when(outboundConnection).run();
connections.add(outboundConnection);
}
}

View File

@ -0,0 +1,188 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.peers;
import bisq.network.p2p.MockNode;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.InboundConnection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.*;
public class PeerManagerTest {
private MockNode node;
private int maxConnectionsPeer;
private int maxConnectionsNonDirect;
@Before
public void Setup() {
node = new MockNode(2);
maxConnectionsPeer = Math.max(4, (int) Math.round(node.getMaxConnections() * 1.3));
maxConnectionsNonDirect = Math.max(8, (int) Math.round(node.getMaxConnections() * 1.7));
}
@Test
public void testCheckMaxConnectionsNotExceeded() {
for (int i = 0; i < 2; i++) {
node.addInboundConnection(Connection.PeerType.PEER);
}
assertEquals(2, node.getNetworkNode().getAllConnections().size());
assertFalse(node.getPeerManager().checkMaxConnections());
node.getNetworkNode().getAllConnections().forEach(connection -> {
verify(connection, never()).shutDown(eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN), isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsExceededWithInboundPeers() throws InterruptedException {
for (int i = 0; i < 3; i++) {
node.addInboundConnection(Connection.PeerType.PEER);
}
assertEquals(3, node.getNetworkNode().getAllConnections().size());
List<Connection> inboundSortedPeerConnections = node.getNetworkNode().getAllConnections().stream()
.filter(e -> e instanceof InboundConnection)
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.sorted(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()))
.collect(Collectors.toList());
Connection oldestConnection = inboundSortedPeerConnections.remove(0);
assertTrue(node.getPeerManager().checkMaxConnections());
// Need to wait because the shutDownCompleteHandler calls
// checkMaxConnections on the user thread after a delay
Thread.sleep(500);
verify(oldestConnection, times(1)).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
inboundSortedPeerConnections.forEach(connection -> {
verify(connection, never()).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsPeerLimitNotExceeded() {
for (int i = 0; i < maxConnectionsPeer; i++) {
node.addOutboundConnection(Connection.PeerType.PEER);
}
assertEquals(maxConnectionsPeer, node.getNetworkNode().getAllConnections().size());
assertFalse(node.getPeerManager().checkMaxConnections());
node.getNetworkNode().getAllConnections().forEach(connection -> {
verify(connection, never()).shutDown(eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN), isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsPeerLimitExceeded() throws InterruptedException {
for (int i = 0; i < maxConnectionsPeer + 1; i++) {
node.addOutboundConnection(Connection.PeerType.PEER);
}
assertEquals(maxConnectionsPeer + 1, node.getNetworkNode().getAllConnections().size());
List<Connection> sortedPeerConnections = node.getNetworkNode().getAllConnections().stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.sorted(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()))
.collect(Collectors.toList());
Connection oldestConnection = sortedPeerConnections.remove(0);
assertTrue(node.getPeerManager().checkMaxConnections());
// Need to wait because the shutDownCompleteHandler calls
// checkMaxConnections on the user thread after a delay
Thread.sleep(500);
verify(oldestConnection, times(1)).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
sortedPeerConnections.forEach(connection -> {
verify(connection, never()).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsNonDirectLimitNotExceeded() {
for (int i = 0; i < maxConnectionsNonDirect; i++) {
node.addOutboundConnection(Connection.PeerType.SEED_NODE);
}
assertEquals(maxConnectionsNonDirect, node.getNetworkNode().getAllConnections().size());
assertFalse(node.getPeerManager().checkMaxConnections());
node.getNetworkNode().getAllConnections().forEach(connection -> {
verify(connection, never()).shutDown(eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN), isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsNonDirectLimitExceeded() throws InterruptedException {
for (int i = 0; i < maxConnectionsNonDirect + 1; i++) {
node.addOutboundConnection(Connection.PeerType.PEER);
}
assertEquals(maxConnectionsNonDirect + 1, node.getNetworkNode().getAllConnections().size());
List<Connection> sortedPeerConnections = node.getNetworkNode().getAllConnections().stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER &&
e.getPeerType() != Connection.PeerType.INITIAL_DATA_REQUEST)
.sorted(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()))
.collect(Collectors.toList());
Connection oldestConnection = sortedPeerConnections.remove(0);
assertTrue(node.getPeerManager().checkMaxConnections());
// Need to wait because the shutDownCompleteHandler calls
// checkMaxConnections on the user thread after a delay
Thread.sleep(500);
verify(oldestConnection, times(1)).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
sortedPeerConnections.forEach(connection -> {
verify(connection, never()).shutDown(
eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN),
isA(Runnable.class));
});
}
@Test
public void testCheckMaxConnectionsExceededWithOutboundSeeds() {
for (int i = 0; i < 3; i++) {
node.addOutboundConnection(Connection.PeerType.SEED_NODE);
}
assertEquals(3, node.getNetworkNode().getAllConnections().size());
assertFalse(node.getPeerManager().checkMaxConnections());
node.getNetworkNode().getAllConnections().forEach(connection -> {
verify(connection, never()).shutDown(eq(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN), isA(Runnable.class));
});
}
}

View File

@ -1,486 +0,0 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.routing;
import bisq.network.p2p.DummySeedNode;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.P2PServiceListener;
import bisq.network.p2p.network.LocalhostNetworkNode;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
// TorNode created. Took 6 sec.
// Hidden service created. Took 40-50 sec.
// Connection establishment takes about 4 sec.
// need to define seed node addresses first before using tor version
//TODO P2P network tests are outdated
@SuppressWarnings({"UnusedAssignment", "EmptyMethod"})
@Ignore
public class PeerManagerTest {
private static final Logger log = LoggerFactory.getLogger(PeerManagerTest.class);
private static final int MAX_CONNECTIONS = 100;
final boolean useLocalhostForP2P = true;
private CountDownLatch latch;
private Set<NodeAddress> seedNodes;
private int sleepTime;
private DummySeedNode seedNode1, seedNode2, seedNode3;
@Before
public void setup() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
seedNodes = new HashSet<>();
//noinspection ConstantConditions
if (useLocalhostForP2P) {
//seedNodes.add(new NodeAddress("localhost:8001"));
// seedNodes.add(new NodeAddress("localhost:8002"));
seedNodes.add(new NodeAddress("localhost:8003"));
sleepTime = 100;
} else {
seedNodes.add(new NodeAddress("3omjuxn7z73pxoee.onion:8001"));
seedNodes.add(new NodeAddress("j24fxqyghjetgpdx.onion:8002"));
seedNodes.add(new NodeAddress("45367tl6unwec6kw.onion:8003"));
sleepTime = 1000;
}
}
@After
public void tearDown() throws InterruptedException {
Thread.sleep(sleepTime);
if (seedNode1 != null) {
CountDownLatch shutDownLatch = new CountDownLatch(1);
seedNode1.shutDown(shutDownLatch::countDown);
shutDownLatch.await();
}
if (seedNode2 != null) {
CountDownLatch shutDownLatch = new CountDownLatch(1);
seedNode2.shutDown(shutDownLatch::countDown);
shutDownLatch.await();
}
if (seedNode3 != null) {
CountDownLatch shutDownLatch = new CountDownLatch(1);
seedNode3.shutDown(shutDownLatch::countDown);
shutDownLatch.await();
}
}
// @Test
public void testSingleSeedNode() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
seedNodes = new HashSet<>();
NodeAddress nodeAddress = new NodeAddress("localhost:8001");
seedNodes.add(nodeAddress);
seedNode1 = new DummySeedNode("test_dummy_dir");
latch = new CountDownLatch(2);
seedNode1.createAndStartP2PService(nodeAddress, MAX_CONNECTIONS, useLocalhostForP2P, 2, true,
seedNodes, new P2PServiceListener() {
@Override
public void onDataReceived() {
latch.countDown();
}
@Override
public void onTorNodeReady() {
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onNoPeersAvailable() {
}
@Override
public void onUpdatedDataReceived() {
}
@Override
public void onHiddenServicePublished() {
latch.countDown();
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
}
});
P2PService p2PService1 = seedNode1.getSeedNodeP2PService();
latch.await();
Thread.sleep(500);
//Assert.assertEquals(0, p2PService1.getPeerManager().getAuthenticatedAndReportedPeers().size());
}
@Test
public void test2SeedNodes() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
seedNodes = new HashSet<>();
NodeAddress nodeAddress1 = new NodeAddress("localhost:8001");
seedNodes.add(nodeAddress1);
NodeAddress nodeAddress2 = new NodeAddress("localhost:8002");
seedNodes.add(nodeAddress2);
latch = new CountDownLatch(6);
seedNode1 = new DummySeedNode("test_dummy_dir");
seedNode1.createAndStartP2PService(nodeAddress1, MAX_CONNECTIONS, useLocalhostForP2P, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onDataReceived() {
latch.countDown();
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onTorNodeReady() {
}
@Override
public void onNoPeersAvailable() {
}
@Override
public void onUpdatedDataReceived() {
latch.countDown();
}
@Override
public void onHiddenServicePublished() {
latch.countDown();
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
}
});
P2PService p2PService1 = seedNode1.getSeedNodeP2PService();
Thread.sleep(500);
seedNode2 = new DummySeedNode("test_dummy_dir");
seedNode2.createAndStartP2PService(nodeAddress2, MAX_CONNECTIONS, useLocalhostForP2P, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onDataReceived() {
latch.countDown();
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onTorNodeReady() {
}
@Override
public void onNoPeersAvailable() {
}
@Override
public void onUpdatedDataReceived() {
latch.countDown();
}
@Override
public void onHiddenServicePublished() {
latch.countDown();
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
}
});
P2PService p2PService2 = seedNode2.getSeedNodeP2PService();
latch.await();
// Assert.assertEquals(1, p2PService1.getPeerManager().getAuthenticatedAndReportedPeers().size());
// Assert.assertEquals(1, p2PService2.getPeerManager().getAuthenticatedAndReportedPeers().size());
}
// @Test
public void testAuthentication() throws InterruptedException {
log.debug("### start");
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
DummySeedNode seedNode1 = getAndStartSeedNode(8001);
log.debug("### seedNode1");
Thread.sleep(100);
log.debug("### seedNode1 100");
Thread.sleep(1000);
DummySeedNode seedNode2 = getAndStartSeedNode(8002);
// authentication:
// node2 -> node1 RequestAuthenticationMessage
// node1: close connection
// node1 -> node2 ChallengeMessage on new connection
// node2: authentication to node1 done if nonce ok
// node2 -> node1 GetPeersMessage
// node1: authentication to node2 done if nonce ok
// node1 -> node2 PeersMessage
// first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2
//TODO
/* CountDownLatch latch1 = new CountDownLatch(2);
AuthenticationListener routingListener1 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch1.countDown();
}
};
seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1);
AuthenticationListener routingListener2 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch1.countDown();
}
};
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
latch1.await();
seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1);
seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2);
// wait until Peers msg finished
Thread.sleep(sleepTime);
// authentication:
// authentication from seedNode3 to seedNode1, then from seedNode1 to seedNode3
// authentication from seedNode3 to seedNode2, then from seedNode2 to seedNode3
SeedNode seedNode3 = getAndStartSeedNode(8003);
CountDownLatch latch2 = new CountDownLatch(3);
seedNode1.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch2.countDown();
}
});
seedNode2.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch2.countDown();
}
});
seedNode3.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch2.countDown();
}
});
latch2.await();
// wait until Peers msg finished
Thread.sleep(sleepTime);
CountDownLatch shutDownLatch = new CountDownLatch(3);
seedNode1.shutDown(() -> shutDownLatch.countDown());
seedNode2.shutDown(() -> shutDownLatch.countDown());
seedNode3.shutDown(() -> shutDownLatch.countDown());
shutDownLatch.await();*/
}
//@Test
public void testAuthenticationWithDisconnect() throws InterruptedException {
//TODO
/* LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
SeedNode seedNode1 = getAndStartSeedNode(8001);
SeedNode seedNode2 = getAndStartSeedNode(8002);
// authentication:
// node2 -> node1 RequestAuthenticationMessage
// node1: close connection
// node1 -> node2 ChallengeMessage on new connection
// node2: authentication to node1 done if nonce ok
// node2 -> node1 GetPeersMessage
// node1: authentication to node2 done if nonce ok
// node1 -> node2 PeersMessage
// first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2
CountDownLatch latch1 = new CountDownLatch(2);
AuthenticationListener routingListener1 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch1.countDown();
}
};
seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1);
AuthenticationListener routingListener2 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch1.countDown();
}
};
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
latch1.await();
// shut down node 2
Thread.sleep(sleepTime);
seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1);
seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2);
CountDownLatch shutDownLatch1 = new CountDownLatch(1);
seedNode2.shutDown(() -> shutDownLatch1.countDown());
shutDownLatch1.await();
// restart node 2
seedNode2 = getAndStartSeedNode(8002);
CountDownLatch latch3 = new CountDownLatch(1);
routingListener2 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch3.countDown();
}
};
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
latch3.await();
Thread.sleep(sleepTime);
CountDownLatch shutDownLatch = new CountDownLatch(2);
seedNode1.shutDown(() -> shutDownLatch.countDown());
seedNode2.shutDown(() -> shutDownLatch.countDown());
shutDownLatch.await();*/
}
//@Test
public void testAuthenticationWithManyNodes() throws InterruptedException {
//TODO
/* int authentications = 0;
int length = 3;
SeedNode[] nodes = new SeedNode[length];
for (int i = 0; i < length; i++) {
SeedNode node = getAndStartSeedNode(8001 + i);
nodes[i] = node;
latch = new CountDownLatch(i * 2);
authentications += (i * 2);
node.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
log.debug("onConnectionAuthenticated " + connection);
latch.countDown();
}
});
latch.await();
Thread.sleep(sleepTime);
}
log.debug("total authentications " + authentications);
Profiler.printSystemLoad(log);
// total authentications at 8 nodes = 56
// total authentications at com nodes = 90, System load (no. threads/used memory (MB)): 170/20
// total authentications at 20 nodes = 380, System load (no. threads/used memory (MB)): 525/46
for (int i = 0; i < length; i++) {
nodes[i].getP2PService().getPeerGroup().printAuthenticatedPeers();
nodes[i].getP2PService().getPeerGroup().printReportedPeers();
}
CountDownLatch shutDownLatch = new CountDownLatch(length);
for (int i = 0; i < length; i++) {
nodes[i].shutDown(() -> shutDownLatch.countDown());
}
shutDownLatch.await();*/
}
private DummySeedNode getAndStartSeedNode(int port) throws InterruptedException {
DummySeedNode seedNode = new DummySeedNode("test_dummy_dir");
latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new NodeAddress("localhost", port), MAX_CONNECTIONS, useLocalhostForP2P, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onDataReceived() {
latch.countDown();
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onTorNodeReady() {
}
@Override
public void onNoPeersAvailable() {
}
@Override
public void onUpdatedDataReceived() {
}
@Override
public void onHiddenServicePublished() {
}
@Override
public void onSetupFailed(Throwable throwable) {
}
@Override
public void onRequestCustomBridges() {
}
});
latch.await();
Thread.sleep(sleepTime);
return seedNode;
}
}