From 96f8a6c2811f88127835da56dcda99c22ea0a796 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 16:47:12 +0100 Subject: [PATCH 01/17] Use @Ignore --- src/test/java/io/bitsquare/msg/TomP2PTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/bitsquare/msg/TomP2PTests.java b/src/test/java/io/bitsquare/msg/TomP2PTests.java index 044048af11..c2ff6dc12e 100644 --- a/src/test/java/io/bitsquare/msg/TomP2PTests.java +++ b/src/test/java/io/bitsquare/msg/TomP2PTests.java @@ -46,6 +46,7 @@ import net.tomp2p.storage.Data; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -63,7 +64,7 @@ import static org.junit.Assert.*; * In the configure method and the connectionType you can define your test scenario further. */ -//@Ignore +@Ignore public class TomP2PTests { private static final Logger log = LoggerFactory.getLogger(TomP2PTests.class); From bcaa8b99469308d0026a71d872092dbae0fb1083 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 17:02:26 +0100 Subject: [PATCH 02/17] Fix random ports --- .../java/io/bitsquare/msg/TomP2PTests.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/test/java/io/bitsquare/msg/TomP2PTests.java b/src/test/java/io/bitsquare/msg/TomP2PTests.java index c2ff6dc12e..a4803b73bb 100644 --- a/src/test/java/io/bitsquare/msg/TomP2PTests.java +++ b/src/test/java/io/bitsquare/msg/TomP2PTests.java @@ -82,10 +82,6 @@ public class TomP2PTests { private final static String SEED_IP_WAN_2 = "188.226.179.109"; private final static int SEED_PORT_WAN_2 = 5001; - // new Ports().tcpPort() returns a random port - private final static int CLIENT_1_PORT = new Ports().tcpPort(); - private final static int CLIENT_2_PORT = new Ports().tcpPort(); - // If you want to test in one specific connection mode define it directly, otherwise use UNKNOWN private final ConnectionType forcedConnectionType = ConnectionType.NAT; private ConnectionType resolvedConnectionType; @@ -108,6 +104,10 @@ public class TomP2PTests { // Only in NAT mode we have to deal with that bug. if (forcedConnectionType == ConnectionType.NAT || resolvedConnectionType == ConnectionType.NAT) ignoreSuccessTests = true; + + // new Ports().tcpPort() returns a random port + client1Port = new Ports().tcpPort(); + client2Port = new Ports().tcpPort(); } // In port forwarding mode the isSuccess returns false, but the DHT operations succeeded. @@ -136,6 +136,8 @@ public class TomP2PTests { private int seedPort; private PeerDHT peer1DHT; private PeerDHT peer2DHT; + private int client1Port; + private int client2Port; /////////////////////////////////////////////////////////////////////////////////////////// // Seed node @@ -194,30 +196,30 @@ public class TomP2PTests { @Test public void bootstrapInUnknownMode() throws Exception { if (forcedConnectionType == ConnectionType.UNKNOWN) - assertNotNull(bootstrapInUnknownMode("node_1", CLIENT_1_PORT)); + assertNotNull(bootstrapInUnknownMode("node_1", client1Port)); } @Test public void testBootstrapDirectConnection() throws Exception { if (forcedConnectionType == ConnectionType.DIRECT) - assertNotNull(bootstrapDirectConnection("node_1", CLIENT_1_PORT)); + assertNotNull(bootstrapDirectConnection("node_1", client1Port)); } @Test public void testBootstrapWithPortForwarding() throws Exception { if (forcedConnectionType == ConnectionType.NAT) - assertNotNull(bootstrapWithPortForwarding("node_1", CLIENT_1_PORT)); + assertNotNull(bootstrapWithPortForwarding("node_1", client1Port)); } @Test public void testBootstrapInRelayMode() throws Exception { if (forcedConnectionType == ConnectionType.RELAY) - assertNotNull(bootstrapInRelayMode("node_1", CLIENT_1_PORT)); + assertNotNull(bootstrapInRelayMode("node_1", client1Port)); } @Test public void testPut() throws Exception { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); futurePut.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -226,14 +228,14 @@ public class TomP2PTests { @Test public void testPutGet() throws Exception { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); futurePut.awaitUninterruptibly(); if (!ignoreSuccessTests) assertTrue(futurePut.isSuccess()); - peer2DHT = getDHTPeer("node_2", CLIENT_2_PORT); + peer2DHT = getDHTPeer("node_2", client2Port); FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start(); futureGet.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -243,7 +245,7 @@ public class TomP2PTests { @Test public void testAdd() throws Exception { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -257,7 +259,7 @@ public class TomP2PTests { @Test public void testAddGet() throws Exception { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -269,7 +271,7 @@ public class TomP2PTests { assertTrue(futurePut2.isSuccess()); - peer2DHT = getDHTPeer("node_2", CLIENT_2_PORT); + peer2DHT = getDHTPeer("node_2", client2Port); FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); futureGet.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -282,7 +284,7 @@ public class TomP2PTests { @Test public void testAddRemove() throws Exception { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); if (!ignoreSuccessTests) @@ -294,7 +296,7 @@ public class TomP2PTests { assertTrue(futurePut2.isSuccess()); - peer2DHT = getDHTPeer("node_2", CLIENT_2_PORT); + peer2DHT = getDHTPeer("node_2", client2Port); Number160 contentKey = new Data("hallo1").hash(); FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey).start(); futureRemove.awaitUninterruptibly(); @@ -320,8 +322,8 @@ public class TomP2PTests { @Test public void testSendDirectRelay() throws Exception { if (forcedConnectionType == ConnectionType.RELAY || resolvedConnectionType == ConnectionType.RELAY) { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); - peer2DHT = getDHTPeer("node_2", CLIENT_2_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); + peer2DHT = getDHTPeer("node_2", client2Port); final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -353,7 +355,7 @@ public class TomP2PTests { @Test public void testSendDirectPortForwarding() throws Exception { if (forcedConnectionType == ConnectionType.NAT || resolvedConnectionType == ConnectionType.NAT) { - peer1DHT = getDHTPeer("node_1", CLIENT_1_PORT); + peer1DHT = getDHTPeer("node_1", client1Port); PeerAddress reachablePeerAddress = new PeerAddress(Number160.createHash(seedId), seedIP, seedPort, seedPort); From 4fb8030a43e83900cd2f43c5e9c3399a7a25df31 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 20:41:34 +0100 Subject: [PATCH 03/17] Add loops, add extra SeedNode class, remove LanTest class --- .../bitsquare/msg/BasicUsecasesInLANTest.java | 202 ---------- .../io/bitsquare/msg/SeedNodeForTesting.java | 72 ++++ .../java/io/bitsquare/msg/TomP2PTests.java | 380 ++++++++++-------- 3 files changed, 278 insertions(+), 376 deletions(-) delete mode 100644 src/test/java/io/bitsquare/msg/BasicUsecasesInLANTest.java create mode 100644 src/test/java/io/bitsquare/msg/SeedNodeForTesting.java diff --git a/src/test/java/io/bitsquare/msg/BasicUsecasesInLANTest.java b/src/test/java/io/bitsquare/msg/BasicUsecasesInLANTest.java deleted file mode 100644 index 9e866d2527..0000000000 --- a/src/test/java/io/bitsquare/msg/BasicUsecasesInLANTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * This file is part of Bitsquare. - * - * Bitsquare is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bitsquare is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bitsquare. If not, see . - */ - -package io.bitsquare.msg; - -import java.io.IOException; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import net.tomp2p.dht.FutureGet; -import net.tomp2p.dht.FuturePut; -import net.tomp2p.dht.PeerBuilderDHT; -import net.tomp2p.dht.PeerDHT; -import net.tomp2p.dht.StorageLayer; -import net.tomp2p.dht.StorageMemory; -import net.tomp2p.futures.FutureDirect; -import net.tomp2p.futures.FutureDiscover; -import net.tomp2p.nat.FutureNAT; -import net.tomp2p.nat.FutureRelayNAT; -import net.tomp2p.nat.PeerBuilderNAT; -import net.tomp2p.nat.PeerNAT; -import net.tomp2p.p2p.Peer; -import net.tomp2p.p2p.PeerBuilder; -import net.tomp2p.peers.Number160; -import net.tomp2p.peers.PeerAddress; -import net.tomp2p.storage.Data; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -public class BasicUsecasesInLANTest { - private static final Logger log = LoggerFactory.getLogger(BasicUsecasesInLANTest.class); - - private final static String SERVER_ID = "localhost"; - private final static String SERVER_IP = "127.0.0.1"; - private final static int SERVER_PORT = 5000; - - private final static String CLIENT_1_ID = "alice"; - private final static String CLIENT_2_ID = "bob"; - private final static int CLIENT_1_PORT = 6510; - private final static int CLIENT_2_PORT = 6511; - - private Thread serverThread; - - @Before - public void startServer() throws Exception { - serverThread = new Thread(() -> { - Peer peer = null; - try { - peer = new PeerBuilder(Number160.createHash(SERVER_ID)).ports(SERVER_PORT).start(); - log.debug("peer started."); - while (true) { - for (PeerAddress pa : peer.peerBean().peerMap().all()) { - log.debug("peer online (TCP):" + pa); - } - Thread.sleep(2000); - } - } catch (InterruptedException e) { - if (peer != null) - peer.shutdown().awaitUninterruptibly(); - } catch (IOException e2) { - e2.printStackTrace(); - } - }); - serverThread.start(); - } - - @After - public void stopServer() throws Exception { - serverThread.interrupt(); - } - - @Test - @Ignore - public void testBootstrap() throws Exception { - PeerDHT peerDHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); - assertEquals(CLIENT_1_PORT, peerDHT.peerAddress().tcpPort()); - assertEquals(CLIENT_1_PORT, peerDHT.peerAddress().udpPort()); - peerDHT.shutdown().awaitUninterruptibly(); - } - - @Test - @Ignore - public void testDHT() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); - - FuturePut futurePut1 = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo1")).start(); - futurePut1.awaitUninterruptibly(); - // why fails that? - //assertTrue(futurePut1.isSuccess()); - - FutureGet futureGet2 = peer1DHT.get(Number160.createHash("key")).start(); - futureGet2.awaitUninterruptibly(); - assertTrue(futureGet2.isSuccess()); - assertNotNull(futureGet2.data()); - assertEquals("hallo1", futureGet2.data().object()); - - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); - } - - @Test - @Ignore - public void testSendDirect() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - final StringBuilder result = new StringBuilder(); - peer2DHT.peer().objectDataReply((sender, request) -> { - countDownLatch.countDown(); - result.append(String.valueOf(request)); - return null; - }); - - FutureDirect futureDirect = peer1DHT.peer().sendDirect(peer2DHT.peerAddress()).object("hallo").start(); - futureDirect.awaitUninterruptibly(); - countDownLatch.await(1, TimeUnit.SECONDS); - if (countDownLatch.getCount() > 0) - Assert.fail("The test method did not complete successfully!"); - - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); - - assertEquals("hallo", result.toString()); - } - - - private PeerDHT startClient(String clientId, int clientPort) throws Exception { - try { - Peer peer = new PeerBuilder(Number160.createHash(clientId)).ports(clientPort).behindFirewall().start(); - PeerDHT peerDHT = new PeerBuilderDHT(peer).storageLayer(new StorageLayer(new StorageMemory())).start(); - - PeerAddress masterNodeAddress = new PeerAddress(Number160.createHash(SERVER_ID), SERVER_IP, SERVER_PORT, - SERVER_PORT); - FutureDiscover futureDiscover = peer.discover().peerAddress(masterNodeAddress).start(); - futureDiscover.awaitUninterruptibly(); - if (futureDiscover.isSuccess()) { - log.info("Discover with direct connection successful. Address = " + futureDiscover.peerAddress()); - return peerDHT; - } - else { - PeerNAT peerNAT = new PeerBuilderNAT(peer).start(); - FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); - futureNAT.awaitUninterruptibly(); - if (futureNAT.isSuccess()) { - log.info("Automatic port forwarding is setup. Address = " + - futureNAT.peerAddress()); - return peerDHT; - } - else { - FutureRelayNAT futureRelayNAT = peerNAT.startRelay(futureDiscover, futureNAT); - futureRelayNAT.awaitUninterruptibly(); - if (futureRelayNAT.isSuccess()) { - log.info("Bootstrap using relay successful. Address = " + - peer.peerAddress()); - futureRelayNAT.shutdown(); - peer.shutdown().awaitUninterruptibly(); - return peerDHT; - } - else { - log.error("Bootstrap using relay failed " + futureRelayNAT.failedReason()); - Assert.fail("Bootstrap using relay failed " + futureRelayNAT.failedReason()); - futureRelayNAT.shutdown(); - peer.shutdown().awaitUninterruptibly(); - return null; - } - } - } - } catch (IOException e) { - log.error("Bootstrap in relay mode failed " + e.getMessage()); - e.printStackTrace(); - Assert.fail("Bootstrap in relay mode failed " + e.getMessage()); - return null; - } - } - -} diff --git a/src/test/java/io/bitsquare/msg/SeedNodeForTesting.java b/src/test/java/io/bitsquare/msg/SeedNodeForTesting.java new file mode 100644 index 0000000000..cee0df632f --- /dev/null +++ b/src/test/java/io/bitsquare/msg/SeedNodeForTesting.java @@ -0,0 +1,72 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.msg; + +import net.tomp2p.dht.PeerBuilderDHT; +import net.tomp2p.dht.PeerDHT; +import net.tomp2p.nat.PeerBuilderNAT; +import net.tomp2p.p2p.Peer; +import net.tomp2p.p2p.PeerBuilder; +import net.tomp2p.peers.Number160; +import net.tomp2p.peers.PeerAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used for testing with {@link TomP2PTests} + */ +public class SeedNodeForTesting { + private static final Logger log = LoggerFactory.getLogger(SeedNodeForTesting.class); + + public static void main(String[] args) throws Exception { + // Define your seed node IP and port + // "127.0.0.1" for localhost or SEED_ID_WAN_1 + new SeedNodeForTesting().startSeedNode("localhost", 5000); + } + + public Thread startSeedNode(String seedNodeId, int seedNodePort) { + Thread thread = new Thread(() -> { + Peer peer = null; + try { + peer = new PeerBuilder(Number160.createHash(seedNodeId)).ports(seedNodePort).start(); + PeerDHT peerDHT = new PeerBuilderDHT(peer).start(); + peerDHT.peer().objectDataReply((sender, request) -> { + log.trace("received request: ", request.toString()); + return "pong"; + }); + + new PeerBuilderNAT(peer).start(); + + log.debug("SeedNode started."); + for (; ; ) { + for (PeerAddress pa : peer.peerBean().peerMap().all()) { + log.debug("peer online:" + pa); + } + Thread.sleep(2000); + } + } catch (Exception e) { + if (peer != null) + peer.shutdown().awaitUninterruptibly(); + } + }); + thread.start(); + return thread; + } + +} diff --git a/src/test/java/io/bitsquare/msg/TomP2PTests.java b/src/test/java/io/bitsquare/msg/TomP2PTests.java index a4803b73bb..d347af6e76 100644 --- a/src/test/java/io/bitsquare/msg/TomP2PTests.java +++ b/src/test/java/io/bitsquare/msg/TomP2PTests.java @@ -58,16 +58,41 @@ import static org.junit.Assert.*; * Test bootstrapping, DHT operations like put/get/add/remove and sendDirect in both LAN and WAN environment * Test scenarios in direct connection, auto port forwarding or relay mode. *

- * The seed node code is in startSeedNode. + * The start a seed node code use the {@link SeedNodeForTesting} class. *

* To configure your test environment edit the static fields for id, IP and port. - * In the configure method and the connectionType you can define your test scenario further. + * In the configure method and the connectionType you can define your test scenario. */ @Ignore public class TomP2PTests { private static final Logger log = LoggerFactory.getLogger(TomP2PTests.class); + /** + * Use UNKNOWN when you want to test the strategy to try first direct, then nat and lat relay + * Use on eof the others when you want to connect only with that mode. Be sure that you can really succeed in that + * mode (e.g. for NAT you need to have a UPnP or NAT PMP enabled router). + */ + private enum ConnectionType { + UNKNOWN, + DIRECT, + NAT, + RELAY + } + + // need to be static to keep them during tests + private final static Map cachedPeers = new HashMap<>(); + + private String seedId; + private String seedIP; + private int seedPort; + private PeerDHT peer1DHT; + private PeerDHT peer2DHT; + private int client1Port; + private int client2Port; + private ConnectionType resolvedConnectionType; + + /////////////////////////////////////////////////////////////////////////////////////////// // Configure /////////////////////////////////////////////////////////////////////////////////////////// @@ -83,8 +108,18 @@ public class TomP2PTests { private final static int SEED_PORT_WAN_2 = 5001; // If you want to test in one specific connection mode define it directly, otherwise use UNKNOWN - private final ConnectionType forcedConnectionType = ConnectionType.NAT; - private ConnectionType resolvedConnectionType; + private final ConnectionType forcedConnectionType = ConnectionType.DIRECT; + + // In port forwarding mode the isSuccess returns false, but the DHT operations succeeded. + // Needs investigation why. Will be removed as far its fixed. + private boolean ignoreSuccessTests = false; + + // If cache is used tests get faster as it doesn't create and bootstrap a new node at every test. + // Need to observe if it can have some side effects. + private boolean cacheClients = true; + + // Use that to stress test with repeated run of the test method body + private int stressTestLoopCount = 1; @Before public void configure() { @@ -105,77 +140,17 @@ public class TomP2PTests { if (forcedConnectionType == ConnectionType.NAT || resolvedConnectionType == ConnectionType.NAT) ignoreSuccessTests = true; + client1Port = getNewRandomPort(); + client2Port = getNewRandomPort(); + } + + private int getNewRandomPort() { // new Ports().tcpPort() returns a random port - client1Port = new Ports().tcpPort(); - client2Port = new Ports().tcpPort(); - } + int newPort = new Ports().tcpPort(); + while (newPort == client1Port || newPort == client2Port) + newPort = new Ports().tcpPort(); - // In port forwarding mode the isSuccess returns false, but the DHT operations succeeded. - // Needs investigation why. Will be removed as far its fixed. - private boolean ignoreSuccessTests = false; - - // If cache is used tests get faster as it doesn't create and bootstrap a new node at every test. - // Need to observe if it can have some side effects. - private boolean cacheClients = true; - - - /////////////////////////////////////////////////////////////////////////////////////////// - // Private fields - /////////////////////////////////////////////////////////////////////////////////////////// - - private enum ConnectionType { - UNKNOWN, - DIRECT, - NAT, - RELAY - } - - private final static Map cachedPeers = new HashMap<>(); - private String seedId; - private String seedIP; - private int seedPort; - private PeerDHT peer1DHT; - private PeerDHT peer2DHT; - private int client1Port; - private int client2Port; - - /////////////////////////////////////////////////////////////////////////////////////////// - // Seed node - /////////////////////////////////////////////////////////////////////////////////////////// - - public static void main(String[] args) throws Exception { - // Define your seed node IP and port - // "127.0.0.1" for localhost or SEED_ID_WAN_1 - new TomP2PTests().startSeedNode("127.0.0.1", 5000); - } - - public Thread startSeedNode(String seedNodeId, int seedNodePort) { - Thread thread = new Thread(() -> { - Peer peer = null; - try { - peer = new PeerBuilder(Number160.createHash(seedNodeId)).ports(seedNodePort).start(); - PeerDHT peerDHT = new PeerBuilderDHT(peer).start(); - peerDHT.peer().objectDataReply((sender, request) -> { - log.trace("received request: ", request.toString()); - return "pong"; - }); - - new PeerBuilderNAT(peer).start(); - - log.debug("peer started."); - for (; ; ) { - for (PeerAddress pa : peer.peerBean().peerMap().all()) { - log.debug("peer online (TCP):" + pa); - } - Thread.sleep(2000); - } - } catch (Exception e) { - if (peer != null) - peer.shutdown().awaitUninterruptibly(); - } - }); - thread.start(); - return thread; + return newPort; } @@ -195,175 +170,230 @@ public class TomP2PTests { @Test public void bootstrapInUnknownMode() throws Exception { - if (forcedConnectionType == ConnectionType.UNKNOWN) - assertNotNull(bootstrapInUnknownMode("node_1", client1Port)); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + if (forcedConnectionType == ConnectionType.UNKNOWN) + assertNotNull(bootstrapInUnknownMode("node_1", client1Port)); + + shutdown(); + } } @Test public void testBootstrapDirectConnection() throws Exception { - if (forcedConnectionType == ConnectionType.DIRECT) - assertNotNull(bootstrapDirectConnection("node_1", client1Port)); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + if (forcedConnectionType == ConnectionType.DIRECT) + assertNotNull(bootstrapDirectConnection("node_1", client1Port)); + + shutdown(); + } } @Test public void testBootstrapWithPortForwarding() throws Exception { - if (forcedConnectionType == ConnectionType.NAT) - assertNotNull(bootstrapWithPortForwarding("node_1", client1Port)); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + if (forcedConnectionType == ConnectionType.NAT) + assertNotNull(bootstrapWithPortForwarding("node_1", client1Port)); + + shutdown(); + } } @Test public void testBootstrapInRelayMode() throws Exception { - if (forcedConnectionType == ConnectionType.RELAY) - assertNotNull(bootstrapInRelayMode("node_1", client1Port)); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + if (forcedConnectionType == ConnectionType.RELAY) + assertNotNull(bootstrapInRelayMode("node_1", client1Port)); + + shutdown(); + } } @Test public void testPut() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); - futurePut.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut.isSuccess()); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + + peer1DHT = getDHTPeer("node_1", client1Port); + FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); + futurePut.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut.isSuccess()); + + shutdown(); + } } @Test public void testPutGet() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); - futurePut.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut.isSuccess()); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + peer1DHT = getDHTPeer("node_1", client1Port); + FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); + futurePut.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut.isSuccess()); - peer2DHT = getDHTPeer("node_2", client2Port); - FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start(); - futureGet.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futureGet.isSuccess()); - assertEquals("hallo", futureGet.data().object()); + peer2DHT = getDHTPeer("node_2", client2Port); + FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start(); + futureGet.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); + assertEquals("hallo", futureGet.data().object()); + + shutdown(); + } } @Test public void testAdd() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); - futurePut1.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut1.isSuccess()); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + peer1DHT = getDHTPeer("node_1", client1Port); + FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); + futurePut1.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut1.isSuccess()); - FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); - futurePut2.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut2.isSuccess()); + FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); + futurePut2.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut2.isSuccess()); + + shutdown(); + } } @Test public void testAddGet() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); - futurePut1.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut1.isSuccess()); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + peer1DHT = getDHTPeer("node_1", client1Port); + FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); + futurePut1.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut1.isSuccess()); - FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); - futurePut2.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut2.isSuccess()); + FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); + futurePut2.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut2.isSuccess()); - peer2DHT = getDHTPeer("node_2", client2Port); - FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); - futureGet.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futureGet.isSuccess()); + peer2DHT = getDHTPeer("node_2", client2Port); + FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); + futureGet.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); - assertTrue(futureGet.dataMap().values().contains(new Data("hallo1"))); - assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); - assertTrue(futureGet.dataMap().values().size() == 2); + assertTrue(futureGet.dataMap().values().contains(new Data("hallo1"))); + assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); + assertTrue(futureGet.dataMap().values().size() == 2); + + shutdown(); + } } @Test public void testAddRemove() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); - futurePut1.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut1.isSuccess()); + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + peer1DHT = getDHTPeer("node_1", client1Port); + FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); + futurePut1.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut1.isSuccess()); - FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); - futurePut2.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futurePut2.isSuccess()); + FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); + futurePut2.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut2.isSuccess()); - peer2DHT = getDHTPeer("node_2", client2Port); - Number160 contentKey = new Data("hallo1").hash(); - FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey).start(); - futureRemove.awaitUninterruptibly(); + peer2DHT = getDHTPeer("node_2", client2Port); + Number160 contentKey = new Data("hallo1").hash(); + FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey) + .start(); + futureRemove.awaitUninterruptibly(); - // TODO: That fails always also with localhost seed node - /*if (!ignoreSuccessTests) - assertTrue(futureRemove.isSuccess());*/ + // That fails sometimes in direct mode and NAT + if (!ignoreSuccessTests) + assertTrue(futureRemove.isSuccess()); - FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); - futureGet.awaitUninterruptibly(); - if (!ignoreSuccessTests) - assertTrue(futureGet.isSuccess()); + FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); + futureGet.awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); - assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); - assertTrue(futureGet.dataMap().values().size() == 1); + assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); + assertTrue(futureGet.dataMap().values().size() == 1); + + shutdown(); + } } // The sendDirect operation fails in port forwarding mode because most routers does not support NAT reflections. // So if both clients are behind NAT they cannot send direct message to each other. // That will probably be fixed in a future version of TomP2P - // In relay mode the test succeeds @Test - public void testSendDirectRelay() throws Exception { - if (forcedConnectionType == ConnectionType.RELAY || resolvedConnectionType == ConnectionType.RELAY) { - peer1DHT = getDHTPeer("node_1", client1Port); - peer2DHT = getDHTPeer("node_2", client2Port); + public void testSendDirectBetweenLocalPeers() throws Exception { + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); + if (forcedConnectionType != ConnectionType.NAT && resolvedConnectionType != ConnectionType.NAT) { + peer1DHT = getDHTPeer("node_1", client1Port); + peer2DHT = getDHTPeer("node_2", client2Port); - final CountDownLatch countDownLatch = new CountDownLatch(1); + final CountDownLatch countDownLatch = new CountDownLatch(1); - final StringBuilder result = new StringBuilder(); - peer2DHT.peer().objectDataReply((sender, request) -> { - countDownLatch.countDown(); - result.append(String.valueOf(request)); - return "pong"; - }); - FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer() - .peerAddress(), - 500); - FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); - futureDirect.awaitUninterruptibly(); + final StringBuilder result = new StringBuilder(); + peer2DHT.peer().objectDataReply((sender, request) -> { + countDownLatch.countDown(); + result.append(String.valueOf(request)); + return "pong"; + }); + FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer() + .peerAddress(), 500); + FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); + futureDirect.awaitUninterruptibly(); - countDownLatch.await(3, TimeUnit.SECONDS); - if (countDownLatch.getCount() > 0) - Assert.fail("The test method did not complete successfully!"); + countDownLatch.await(3, TimeUnit.SECONDS); + if (countDownLatch.getCount() > 0) + Assert.fail("The test method did not complete successfully!"); - assertEquals("hallo", result.toString()); - assertTrue(futureDirect.isSuccess()); - assertEquals("pong", futureDirect.object()); + assertEquals("hallo", result.toString()); + assertTrue(futureDirect.isSuccess()); + log.debug(futureDirect.object().toString()); + assertEquals("pong", futureDirect.object()); + } + + shutdown(); } } - // That test should succeed in port forwarding as we use the server seed node as receiver. + // That test should always succeed as we use the server seed node as receiver. // A node can send a message to another peer which is not in the same LAN. @Test - public void testSendDirectPortForwarding() throws Exception { - if (forcedConnectionType == ConnectionType.NAT || resolvedConnectionType == ConnectionType.NAT) { + public void testSendDirectToSeedNode() throws Exception { + for (int i = 0; i < stressTestLoopCount; i++) { + configure(); peer1DHT = getDHTPeer("node_1", client1Port); PeerAddress reachablePeerAddress = new PeerAddress(Number160.createHash(seedId), seedIP, seedPort, seedPort); - FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(reachablePeerAddress, 500); + FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection + (reachablePeerAddress, 500); FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); futureDirect.awaitUninterruptibly(); assertTrue(futureDirect.isSuccess()); assertEquals("pong", futureDirect.object()); + + shutdown(); } } @@ -556,8 +586,10 @@ public class TomP2PTests { } if (peer == null) - Assert.fail("Bootstrapping in all modes failed. Check if the seed node is running. " + - "seedNodeId= " + seedNodeId + + Assert.fail("Bootstrapping failed. Check if the seed node is running. " + + "forcedConnectionType= " + forcedConnectionType + + " resolvedConnectionType= " + resolvedConnectionType + + " seedNodeId= " + seedNodeId + " seedNodeIP= " + seedNodeIP + " seedNodePort= " + seedNodePort); From df3a50706639085ccd12d0257f3f222465737e8f Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 22:00:55 +0100 Subject: [PATCH 04/17] Change polling from 1 sec to 3 sec, refactorings --- .../java/io/bitsquare/app/ArgumentParser.java | 13 ++++-- .../java/io/bitsquare/app/cli/SeedNode.java | 16 +++---- .../gui/main/trade/offerbook/OfferBook.java | 2 +- .../msg/BootstrappedPeerFactory.java | 46 ++++++++++--------- .../io/bitsquare/msg/actor/DHTManager.java | 8 ++-- 5 files changed, 45 insertions(+), 40 deletions(-) diff --git a/src/main/java/io/bitsquare/app/ArgumentParser.java b/src/main/java/io/bitsquare/app/ArgumentParser.java index 2507017c4c..2d784655b1 100644 --- a/src/main/java/io/bitsquare/app/ArgumentParser.java +++ b/src/main/java/io/bitsquare/app/ArgumentParser.java @@ -17,6 +17,8 @@ package io.bitsquare.app; +import io.bitsquare.network.BootstrapNode; + import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; @@ -24,7 +26,7 @@ import net.sourceforge.argparse4j.inf.Namespace; /* optional arguments: -h, --help show this help message and exit - -d PEERID, --peerid PEERID Seed peer ID. (default: digitalocean1.bitsquare.io) + -d PEER_ID, --peerid PEER Seed peer ID (default: "digitalocean1.bitsquare.io") -p PORT, --port PORT IP port to listen on. (default: 5000) -i INTERFACE, --interface INTERFACE Network interface to listen on. -n NAME, --name NAME Append name to application name. @@ -33,10 +35,12 @@ public class ArgumentParser { public static String PEER_ID_FLAG = "peerid"; public static String PORT_FLAG = "port"; - public static Integer PORT_DEFAULT = 5000; - public static String INFHINT_FLAG = "interface"; + public static String INTERFACE_HINT_FLAG = "interface"; public static String NAME_FLAG = "name"; + public static Integer PORT_DEFAULT = 5000; + public static String PEER_ID_DEFAULT = BootstrapNode.DIGITAL_OCEAN1.getId(); + private final net.sourceforge.argparse4j.inf.ArgumentParser parser; public ArgumentParser() { @@ -44,11 +48,12 @@ public class ArgumentParser { .defaultHelp(true) .description("Bitsquare - The decentralized bitcoin exchange."); parser.addArgument("-d", "--" + PEER_ID_FLAG) + .setDefault(PEER_ID_DEFAULT) .help("Seed peer ID."); parser.addArgument("-p", "--" + PORT_FLAG) .setDefault(PORT_DEFAULT) .help("IP port to listen on."); - parser.addArgument("-i", "--" + INFHINT_FLAG) + parser.addArgument("-i", "--" + INTERFACE_HINT_FLAG) .help("Network interface to listen on."); parser.addArgument("-n", "--" + NAME_FLAG) .help("Append name to application name."); diff --git a/src/main/java/io/bitsquare/app/cli/SeedNode.java b/src/main/java/io/bitsquare/app/cli/SeedNode.java index 01517108e6..8c20e06289 100644 --- a/src/main/java/io/bitsquare/app/cli/SeedNode.java +++ b/src/main/java/io/bitsquare/app/cli/SeedNode.java @@ -52,16 +52,11 @@ public class SeedNode { ArgumentParser parser = new ArgumentParser(); Namespace namespace = parser.parseArgs(args); - if (namespace.getString(ArgumentParser.INFHINT_FLAG) != null) { - interfaceHint = namespace.getString(ArgumentParser.INFHINT_FLAG); - } + if (namespace.getString(ArgumentParser.INTERFACE_HINT_FLAG) != null) + interfaceHint = namespace.getString(ArgumentParser.INTERFACE_HINT_FLAG); int serverPort = Integer.valueOf(namespace.getString(ArgumentParser.PORT_FLAG)); - - String seedID = BootstrapNode.LOCAL_HOST.getId(); - if (namespace.getString(ArgumentParser.PEER_ID_FLAG) != null) { - seedID = namespace.getString(ArgumentParser.PEER_ID_FLAG); - } + String seedID = namespace.getString(ArgumentParser.PEER_ID_FLAG); final Set peerAddresses = new HashSet<>(); for (Node node : BootstrapNode.values()) { @@ -81,14 +76,15 @@ public class SeedNode { inbox.send(seedNode, new InitializePeer(Number160.createHash(seedID), serverPort, interfaceHint, peerAddresses)); + final String _seedID = seedID; Thread seedNodeThread = new Thread(() -> { Boolean quit = false; while (!quit) { try { Object m = inbox.receive(FiniteDuration.create(5L, "seconds")); if (m instanceof PeerInitialized) { - log.debug("Seed Peer Initialized on port " + ((PeerInitialized) m).getPort - ()); + log.debug("Seed Peer with ID " + _seedID + + " initialized on port " + ((PeerInitialized) m).getPort()); } } catch (Exception e) { if (!(e instanceof TimeoutException)) { diff --git a/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java b/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java index e962e6b451..f9296cd0ee 100644 --- a/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java +++ b/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java @@ -173,7 +173,7 @@ public class OfferBook { private void startPolling() { addListeners(); setBankAccount(user.getCurrentBankAccount()); - pollingTimer = Utilities.setInterval(1000, (animationTimer) -> { + pollingTimer = Utilities.setInterval(3000, (animationTimer) -> { messageFacade.requestInvalidationTimeStampFromDHT(fiatCode); return null; }); diff --git a/src/main/java/io/bitsquare/msg/BootstrappedPeerFactory.java b/src/main/java/io/bitsquare/msg/BootstrappedPeerFactory.java index 0a3481fba0..199f022faa 100644 --- a/src/main/java/io/bitsquare/msg/BootstrappedPeerFactory.java +++ b/src/main/java/io/bitsquare/msg/BootstrappedPeerFactory.java @@ -77,6 +77,8 @@ public class BootstrappedPeerFactory { private final SettableFuture settableFuture = SettableFuture.create(); public final StringProperty connectionState = new SimpleStringProperty(); + private Peer peer; + private PeerDHT peerDHT; /////////////////////////////////////////////////////////////////////////////////////////// @@ -109,8 +111,8 @@ public class BootstrappedPeerFactory { public ListenableFuture start(int port) { try { - Peer peer = new PeerBuilder(keyPair).ports(port).behindFirewall().start(); - PeerDHT peerDHT = new PeerBuilderDHT(peer).storageLayer(new StorageLayer(storage)).start(); + peer = new PeerBuilder(keyPair).ports(port).start(); + peerDHT = new PeerBuilderDHT(peer).storageLayer(new StorageLayer(storage)).start(); peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() { @Override @@ -154,18 +156,14 @@ public class BootstrappedPeerFactory { switch (lastSuccessfulBootstrap) { case "relay": - futureDiscover = peerDHT.peer().discover().peerAddress(getBootstrapAddress()).start(); - PeerNAT peerNAT = new PeerBuilderNAT(peerDHT.peer()).start(); - FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); - bootstrapWithRelay(peerDHT, peerNAT, futureDiscover, futureNAT); + bootstrapWithRelay(); break; case "portForwarding": - futureDiscover = peerDHT.peer().discover().peerAddress(getBootstrapAddress()).start(); - tryPortForwarding(peerDHT, futureDiscover); + tryPortForwarding(); break; case "default": default: - discover(peerDHT); + discover(); break; } } catch (IOException e) { @@ -177,13 +175,13 @@ public class BootstrappedPeerFactory { } // 1. Attempt: Try to discover our outside visible address - private void discover(PeerDHT peerDHT) { - FutureDiscover futureDiscover = peerDHT.peer().discover().peerAddress(getBootstrapAddress()).start(); + private void discover() { + FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start(); futureDiscover.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { if (future.isSuccess()) { - setState("We are visible to other peers: My address visible to " + + setState("We are directly connected and visible to other peers: My address visible to " + "the outside is " + futureDiscover.peerAddress()); persistence.write(BootstrappedPeerFactory.this, "lastSuccessfulBootstrap", "default"); settableFuture.set(peerDHT); @@ -192,7 +190,7 @@ public class BootstrappedPeerFactory { setState("We are probably behind a NAT and not reachable to other peers. " + "We try port forwarding as next step."); - tryPortForwarding(peerDHT, futureDiscover); + tryPortForwarding(); } } @@ -206,20 +204,22 @@ public class BootstrappedPeerFactory { } // 2. Attempt: Try to set up port forwarding with UPNP and NAT-PMP - private void tryPortForwarding(PeerDHT peerDHT, FutureDiscover futureDiscover) { - PeerNAT peerNAT = new PeerBuilderNAT(peerDHT.peer()).start(); + private void tryPortForwarding() { + FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start(); + PeerNAT peerNAT = new PeerBuilderNAT(peer).start(); FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); futureNAT.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { if (future.isSuccess()) { - setState("Automatic port forwarding is setup. Address = " + futureNAT.peerAddress()); + setState("Automatic port forwarding is setup. We need to run a discover again. Address = " + + futureNAT.peerAddress()); // we need a second discover process - discoverAfterPortForwarding(peerDHT); + discoverAfterPortForwarding(); } else { setState("Port forwarding has failed. We try to use a relay as next step."); - bootstrapWithRelay(peerDHT, peerNAT, futureDiscover, futureNAT); + bootstrapWithRelay(); } } @@ -233,8 +233,8 @@ public class BootstrappedPeerFactory { } // Try to determine our outside visible address after port forwarding is setup - private void discoverAfterPortForwarding(PeerDHT peerDHT) { - FutureDiscover futureDiscover = peerDHT.peer().discover().peerAddress(getBootstrapAddress()).start(); + private void discoverAfterPortForwarding() { + FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start(); futureDiscover.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -265,8 +265,10 @@ public class BootstrappedPeerFactory { } // 3. Attempt: We try to use another peer as relay - private void bootstrapWithRelay(PeerDHT peerDHT, PeerNAT peerNAT, FutureDiscover futureDiscover, - FutureNAT futureNAT) { + private void bootstrapWithRelay() { + FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start(); + PeerNAT peerNAT = new PeerBuilderNAT(peer).start(); + FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); FutureRelayNAT futureRelayNAT = peerNAT.startRelay(futureDiscover, futureNAT); futureRelayNAT.addListener(new BaseFutureListener() { @Override diff --git a/src/main/java/io/bitsquare/msg/actor/DHTManager.java b/src/main/java/io/bitsquare/msg/actor/DHTManager.java index dd26040666..f024f74e73 100644 --- a/src/main/java/io/bitsquare/msg/actor/DHTManager.java +++ b/src/main/java/io/bitsquare/msg/actor/DHTManager.java @@ -79,6 +79,10 @@ public class DHTManager extends AbstractActor { peer = new PeerBuilder(initializePeer.getPeerId()).ports(initializePeer.getPort()).bindings(bindings) .start(); + peer.objectDataReply((sender, request) -> { + log.debug("received request: ", request.toString()); + return "pong"; + }); // For the moment we want not to bootstrap to other seed nodes to keep test scenarios // simple @@ -110,9 +114,7 @@ public class DHTManager extends AbstractActor { sender().tell(new PeerInitialized(peer.peerID(), initializePeer.getPort()), self()); } catch (Throwable t) { - log.info("The second instance has been started. If that happens at the first instance" + - " we are in trouble... " + t.getMessage()); - sender().tell(new PeerInitialized(null, null), self()); + log.error(t.getMessage()); } } From 399842d8e277c311faf1239612be1a6368f9cf67 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 22:12:48 +0100 Subject: [PATCH 05/17] Extract isSuccess to a method and return always true to ignore a bug from TomP2P (isSuccess returns always false) --- .../io/bitsquare/msg/TomP2PMessageFacade.java | 124 ++++++++++-------- 1 file changed, 67 insertions(+), 57 deletions(-) diff --git a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java index b974144bf6..af245bbd28 100644 --- a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java +++ b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java @@ -140,7 +140,7 @@ class TomP2PMessageFacade implements MessageFacade { futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture baseFuture) throws Exception { - if (baseFuture.isSuccess() && futureGet.data() != null) { + if (isSuccess(baseFuture) && futureGet.data() != null) { final Peer peer = (Peer) futureGet.data().object(); Platform.runLater(() -> listener.onResult(peer)); } @@ -151,7 +151,6 @@ class TomP2PMessageFacade implements MessageFacade { }); } - /////////////////////////////////////////////////////////////////////////////////////////// // Offer /////////////////////////////////////////////////////////////////////////////////////////// @@ -168,48 +167,54 @@ class TomP2PMessageFacade implements MessageFacade { ", hash: " + offerData.hash().toString() + "]"); FuturePut futurePut = p2pNode.addProtectedData(locationKey, offerData); futurePut.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - // deactivate it for the moment until the port forwarding bug is fixed - // if (future.isSuccess()) { - Platform.runLater(() -> { - addOfferListener.onComplete(); - offerBookListeners.stream().forEach(listener -> { - try { - Object offerDataObject = offerData.object(); - if (offerDataObject instanceof Offer) { - log.error("Added offer to DHT with ID: " + ((Offer) offerDataObject).getId()); - listener.onOfferAdded((Offer) offerDataObject); - } - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - log.error("Add offer to DHT failed: " + e.getMessage()); - } - }); + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (isSuccess(future)) { + Platform.runLater(() -> { + addOfferListener.onComplete(); + offerBookListeners.stream().forEach(listener -> { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + log.error("Added offer to DHT with ID: " + ((Offer) + offerDataObject).getId()); + listener.onOfferAdded((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + log.error("Add offer to DHT failed: " + e.getMessage()); + } + }); - // TODO will be removed when we don't use polling anymore - writeInvalidationTimestampToDHT(locationKey); - log.trace("Add offer to DHT was successful. Added data: [locationKey: " + locationKey + - ", value: " + offerData + "]"); - }); - /* } - else { - Platform.runLater(() -> { - addOfferListener.onFailed("Add offer to DHT failed.", - new Exception("Add offer to DHT failed. Reason: " + future.failedReason())); - log.error("Add offer to DHT failed. Reason: " + future.failedReason()); - }); - }*/ - } + // TODO will be removed when we don't use polling anymore + writeInvalidationTimestampToDHT(locationKey); + log.trace("Add offer to DHT was successful. Added data: " + + "[locationKey: " + locationKey + + ", value: " + offerData + "]"); + }); + } + else { + Platform.runLater(() -> { + addOfferListener.onFailed("Add offer to DHT failed.", + new Exception("Add offer to DHT failed. Reason: " + future + .failedReason())); + log.error("Add offer to DHT failed. Reason: " + future.failedReason + ()); + }); + } + } - @Override - public void exceptionCaught(Throwable t) throws Exception { - Platform.runLater(() -> { - addOfferListener.onFailed("Add offer to DHT failed with an exception.", t); - log.error("Add offer to DHT failed with an exception: " + t.getMessage()); - }); - } - }); + @Override + public void exceptionCaught(Throwable t) throws Exception { + Platform.runLater(() -> { + addOfferListener.onFailed("Add offer to DHT failed with an exception.", + t); + log.error("Add offer to DHT failed with an exception: " + t.getMessage()); + }); + } + } + + ); } catch (IOException e) { Platform.runLater(() -> { addOfferListener.onFailed("Add offer to DHT failed with an exception.", e); @@ -220,6 +225,7 @@ class TomP2PMessageFacade implements MessageFacade { //TODO remove is failing, probably due Coin or Fiat class (was working before) // objects are identical but returned object form network might have some problem with serialisation? + public void removeOffer(Offer offer) { Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); try { @@ -230,8 +236,7 @@ class TomP2PMessageFacade implements MessageFacade { futureRemove.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - // deactivate it for the moment until the port forwarding bug is fixed - // if (future.isSuccess()) { + if (isSuccess(future)) { Platform.runLater(() -> { offerBookListeners.stream().forEach(offerBookListener -> { try { @@ -249,11 +254,11 @@ class TomP2PMessageFacade implements MessageFacade { }); writeInvalidationTimestampToDHT(locationKey); }); - /* } + } else { log.error("Remove offer from DHT failed. Cause: future.isSuccess() = false, locationKey: " + locationKey + ", Reason: " + future.failedReason()); - }*/ + } } @Override @@ -274,7 +279,7 @@ class TomP2PMessageFacade implements MessageFacade { futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture baseFuture) throws Exception { - if (baseFuture.isSuccess()) { + if (isSuccess(baseFuture)) { final Map dataMap = futureGet.dataMap(); final List offers = new ArrayList<>(); if (dataMap != null) { @@ -319,13 +324,13 @@ class TomP2PMessageFacade implements MessageFacade { public void sendMessage(Peer peer, Message message, OutgoingMessageListener listener) { if (!(peer instanceof TomP2PPeer)) { - throw new IllegalArgumentException("peer must be of type TomP2PPeer") ; + throw new IllegalArgumentException("peer must be of type TomP2PPeer"); } - FutureDirect futureDirect = p2pNode.sendData(((TomP2PPeer)peer).getPeerAddress(), message); + FutureDirect futureDirect = p2pNode.sendData(((TomP2PPeer) peer).getPeerAddress(), message); futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (futureDirect.isSuccess()) { + if (isSuccess(futureDirect)) { Platform.runLater(listener::onResult); } else { @@ -367,7 +372,7 @@ class TomP2PMessageFacade implements MessageFacade { } })); - if (addFuture.isSuccess()) { + if (isSuccess(addFuture)) { log.trace("Add arbitrator to DHT was successful. Stored data: [key: " + locationKey + ", " + "values: " + arbitratorData + "]"); } @@ -402,7 +407,7 @@ class TomP2PMessageFacade implements MessageFacade { } } })); - if (removeFuture.isSuccess()) { + if (isSuccess(removeFuture)) { log.trace("Remove arbitrator from DHT was successful. Stored data: [key: " + locationKey + ", " + "values: " + arbitratorData + "]"); } @@ -436,7 +441,7 @@ class TomP2PMessageFacade implements MessageFacade { listener.onArbitratorsReceived(arbitrators); })); - if (baseFuture.isSuccess()) { + if (isSuccess(baseFuture)) { log.trace("Get arbitrators from DHT was successful. Stored data: [key: " + locationKey + ", " + "values: " + futureGet.dataMap() + "]"); } @@ -495,7 +500,7 @@ class TomP2PMessageFacade implements MessageFacade { putFuture.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (putFuture.isSuccess()) + if (isSuccess(putFuture)) log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" + invalidationTimestamp.get()); else @@ -522,7 +527,7 @@ class TomP2PMessageFacade implements MessageFacade { getFuture.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (getFuture.isSuccess()) { + if (isSuccess(getFuture)) { Data data = getFuture.data(); if (data != null && data.object() instanceof Long) { final Object object = data.object(); @@ -533,12 +538,12 @@ class TomP2PMessageFacade implements MessageFacade { }); } else { - //log.error("Get invalidationTimestamp from DHT failed. Data = " + data); + log.error("Get invalidationTimestamp from DHT failed. Data = " + data); } } else if (getFuture.data() == null) { // OK as nothing is set at the moment - // log.trace("Get invalidationTimestamp from DHT returns null. That is ok for the startup."); + log.trace("Get invalidationTimestamp from DHT returns null. That is ok for the startup."); } else { log.error("Get invalidationTimestamp from DHT failed with reason:" + getFuture.failedReason()); @@ -557,6 +562,11 @@ class TomP2PMessageFacade implements MessageFacade { return Number160.createHash(locationKey + "invalidated"); } + // Isolate the success handling as there is bug in port forwarding mode + private boolean isSuccess(BaseFuture baseFuture) { + // return baseFuture.isSuccess(); + return true; + } /////////////////////////////////////////////////////////////////////////////////////////// // Incoming message handler From dbbe7cec493811e2872f68a46dfed67f464b7039 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 5 Nov 2014 23:15:36 +0100 Subject: [PATCH 06/17] Extract isSuccess in P2PNode, set custom log levels --- src/main/java/io/bitsquare/msg/P2PNode.java | 10 ++++++++-- .../java/io/bitsquare/msg/TomP2PMessageFacade.java | 2 +- src/main/resources/logback.xml | 12 +++++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/bitsquare/msg/P2PNode.java b/src/main/java/io/bitsquare/msg/P2PNode.java index d7f42bb4ff..232bbd09e8 100644 --- a/src/main/java/io/bitsquare/msg/P2PNode.java +++ b/src/main/java/io/bitsquare/msg/P2PNode.java @@ -187,7 +187,7 @@ public class P2PNode { futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (futureDirect.isSuccess()) { + if (isSuccess(futureDirect)) { log.debug("sendMessage completed"); } else { @@ -297,7 +297,7 @@ public class P2PNode { futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (future.isSuccess()) { + if (isSuccess(futurePut)) { storedPeerAddress = peerDHT.peerAddress(); log.debug("storedPeerAddress = " + storedPeerAddress); } @@ -382,4 +382,10 @@ public class P2PNode { storage = new StorageMemory(); } } + + // Isolate the success handling as there is bug in port forwarding mode + private boolean isSuccess(BaseFuture baseFuture) { + // return baseFuture.isSuccess(); + return true; + } } diff --git a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java index af245bbd28..fb4d60b97b 100644 --- a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java +++ b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java @@ -538,7 +538,7 @@ class TomP2PMessageFacade implements MessageFacade { }); } else { - log.error("Get invalidationTimestamp from DHT failed. Data = " + data); + //log.error("Get invalidationTimestamp from DHT failed. Data = " + data); } } else if (getFuture.data() == null) { diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 6b4d2431bf..dcf292b153 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -26,7 +26,13 @@ - + + + + + + + @@ -37,8 +43,8 @@ - - + +