Add tests for add, remove

This commit is contained in:
Manfred Karrer 2014-10-24 21:18:18 +02:00
parent aeb7730147
commit 39febbac4b

View file

@ -19,11 +19,10 @@ package io.bitsquare.msg;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.tomp2p.connection.Ports;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.dht.PeerBuilderDHT;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.dht.StorageLayer;
@ -63,56 +62,36 @@ public class BasicUsecasesInWANTest {
private final static String SERVER_ID_1 = "digitalocean1.bitsquare.io"; // Manfreds server
private final static String SERVER_IP_1 = "188.226.179.109"; // Manfreds server
private final static int SERVER_PORT_1 = 5000;
private final static String SERVER_ID_2 = "digitalocean2.bitsquare.io"; // Steve's server
private final static String SERVER_IP_2 = "128.199.251.106"; // Steve's server
//private final static String SERVER_IP_2 = "128.199.251.106"; // Steve's server
private final static String SERVER_IP_2 = "188.226.179.109"; // Manfreds server
private final static int SERVER_PORT_2 = 5001;
private final static String SERVER_ID = SERVER_ID_1;
private final static String SERVER_IP = SERVER_IP_1;
private final static int SERVER_PORT = SERVER_PORT_1;
private final static int SERVER_PORT = 5000;
private final static String CLIENT_1_ID = "alice";
private final static String CLIENT_2_ID = "bob";
private final static int CLIENT_1_PORT = 6505;
private final static int CLIENT_2_PORT = 6506;
@Test
@Ignore
public void testBootstrap() throws Exception {
PeerDHT peerDHT = startClient(CLIENT_1_ID, CLIENT_1_PORT);
public void testPutGet() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort());
PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort());
// external ports cannot be tested as they come random
log.debug("############# tcpPort = " + peerDHT.peerAddress().tcpPort());
log.debug("############# udpPort = " + peerDHT.peerAddress().udpPort());
FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start();
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
// in case of port forwarding use that:
assertEquals(CLIENT_IP, peerDHT.peerAddress().inetAddress().getHostAddress());
// in case of relay use that:
//assertEquals("192.168.1.33", peerDHT.peerAddress().inetAddress().getHostAddress());
peerDHT.shutdown().awaitUninterruptibly();
}
@Test
@Ignore
public void testDHT() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT);
PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT);
FuturePut futurePut1 = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo1")).start();
futurePut1.awaitUninterruptibly();
log.debug("futurePut1.isSuccess() = " + futurePut1.isSuccess());
// why fails that?
//assertTrue(futurePut1.isSuccess());
FutureGet futureGet2 = peer1DHT.get(Number160.createHash("key")).start();
futureGet2.awaitUninterruptibly();
assertTrue(futureGet2.isSuccess());
assertNotNull(futureGet2.data());
assertEquals("hallo1", futureGet2.data().object());
FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start();
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("hallo", futureGet.data().object());
peer1DHT.shutdown().awaitUninterruptibly();
peer2DHT.shutdown().awaitUninterruptibly();
@ -120,75 +99,96 @@ public class BasicUsecasesInWANTest {
@Test
@Ignore
public void testAddGet() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort());
PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort());
FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start();
futurePut1.awaitUninterruptibly();
assertTrue(futurePut1.isSuccess());
FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start();
futurePut2.awaitUninterruptibly();
assertTrue(futurePut2.isSuccess());
FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start();
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertTrue(futureGet.dataMap().values().contains(new Data("hallo1")));
assertTrue(futureGet.dataMap().values().contains(new Data("hallo2")));
assertTrue(futureGet.dataMap().values().size() == 2);
peer1DHT.shutdown().awaitUninterruptibly();
peer2DHT.shutdown().awaitUninterruptibly();
}
@Test
@Ignore
public void testRemove() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort());
PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort());
FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start();
futurePut1.awaitUninterruptibly();
assertTrue(futurePut1.isSuccess());
FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start();
futurePut2.awaitUninterruptibly();
assertTrue(futurePut2.isSuccess());
Number160 contentKey = new Data("hallo1").hash();
FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey).start();
futureRemove.awaitUninterruptibly();
// why is futureRemove.isSuccess() = false ?
log.debug(futureRemove.failedReason());// Future (compl/canc):true/false, OK, Minimun number of results reached
// assertTrue(futureRemove.isSuccess());
FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start();
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertTrue(futureGet.dataMap().values().contains(new Data("hallo2")));
assertTrue(futureGet.dataMap().values().size() == 1);
peer1DHT.shutdown().awaitUninterruptibly();
peer2DHT.shutdown().awaitUninterruptibly();
}
@Test
//@Ignore
public void testDHT2Servers() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT, SERVER_ID_1, SERVER_IP_1, SERVER_PORT);
PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT, SERVER_ID_2, SERVER_IP_2, SERVER_PORT);
PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort(), SERVER_ID_1, SERVER_IP_1, SERVER_PORT_1);
PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort(), SERVER_ID_2, SERVER_IP_2, SERVER_PORT_2);
FuturePut futurePut1 = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo1")).start();
futurePut1.awaitUninterruptibly();
log.debug("futurePut1.isSuccess() = " + futurePut1.isSuccess());
// why fails that?
//assertTrue(futurePut1.isSuccess());
FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start();
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
FutureGet futureGet2 = peer1DHT.get(Number160.createHash("key")).start();
futureGet2.awaitUninterruptibly();
assertTrue(futureGet2.isSuccess());
assertNotNull(futureGet2.data());
assertEquals("hallo1", futureGet2.data().object());
FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start();
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("hallo", futureGet.data().object());
peer1DHT.shutdown().awaitUninterruptibly();
peer2DHT.shutdown().awaitUninterruptibly();
}
// That test is failing because of timeouts
/*
server:
15:52:23.749 [NETTY-TOMP2P - worker-client/server - -1-1] WARN net.tomp2p.connection.TimeoutFactory - channel
timeout for channel Sender [id: 0xc4f02816, /0:0:0:0:0:0:0:0:49351]
15:52:23.752 [NETTY-TOMP2P - worker-client/server - -1-1] WARN net.tomp2p.connection.TimeoutFactory - Request status
is msgid=1812373319,t=REQUEST_1,c=PING,tcp,s=paddr[0x7728a3c6e3351739891ca100b5fb774ec5af4ddf[/188.226.179.109,
5000]]/relay(false,0)=[],r=paddr[0x48181acd22b3edaebc8a447868a7df7ce629920a[/83.36.8.117,t:64506,
u:62960]]/relay(false,0)=[]
15:52:23.767 [NETTY-TOMP2P - worker-client/server - -1-1] DEBUG net.tomp2p.peers.PeerMap - peer
paddr[0x48181acd22b3edaebc8a447868a7df7ce629920a[/83.36.8.117,t:64506,u:62960]]/relay(false,
0)=[] is offline with reason {} net.tomp2p.connection.PeerException: Future (compl/canc):true/false, FAILED,
No future set beforehand, probably an early shutdown / timeout, or use setFailedLater() or setResponseLater()
client:
21:52:15.559 [NETTY-TOMP2P - worker-client/server - -1-8] DEBUG net.tomp2p.peers.PeerMap - peer
paddr[0x48181acd22b3edaebc8a447868a7df7ce629920a[/83.36.8.117,t:64506,u:62960]]/relay(false,
0)=[] is offline with reason {} net.tomp2p.connection.PeerException: Future (compl/canc):true/false, FAILED,
No future set beforehand, probably an early shutdown / timeout, or use setFailedLater() or setResponseLater()
*/
@Test
@Ignore
public void testSendDirect() throws Exception {
PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT);
PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT);
PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort());
PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort());
final CountDownLatch countDownLatch = new CountDownLatch(1);
final StringBuilder result = new StringBuilder();
peer2DHT.peer().objectDataReply((sender, request) -> {
countDownLatch.countDown();
result.append(String.valueOf(request));
return request;
});
log.debug("peer1DHT " + peer1DHT.peerAddress());
log.debug("peer2DHT " + peer2DHT.peerAddress());
// FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer()
// .peerAddress(),
// PeerConnection.HEART_BEAT_MILLIS);
FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer().peerAddress(),
500);
FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start();
//FutureDirect futureDirect2 = peer1DHT.peer().sendDirect(peer2DHT.peer().peerAddress()).object("hallo")
// .start();
futureDirect.addListener(new BaseFutureAdapter<FutureDirect>() {
@Override
public void operationComplete(FutureDirect future) throws Exception {
@ -197,32 +197,28 @@ No future set beforehand, probably an early shutdown / timeout, or use setFailed
}
else {
log.debug("Failed");
// future response state:,type:FAILED,msg:7,reason:Channel creation failed [id: 0xc163a536]/io
// .netty.channel.ConnectTimeoutException:
// connection timed out: /83.36.8.117:54458
}
}
});
futureDirect.awaitUninterruptibly();
countDownLatch.await(3, TimeUnit.SECONDS);
if (countDownLatch.getCount() > 0)
Assert.fail("The test method did not complete successfully!");
// futureDirect.object() causes a null pointer as buffer() in futureDirect is null and
// buffer().object(); is called.
//assertEquals("hallo", futureDirect.object());
assertEquals("hallo", result.toString());
peer1DHT.shutdown().awaitUninterruptibly();
peer2DHT.shutdown().awaitUninterruptibly();
assertEquals("hallo", futureDirect.object());
assertEquals("hallo", result.toString());
}
private PeerDHT startClient(String clientId, int clientPort) throws Exception {
return startClient(clientId, clientPort, SERVER_ID, SERVER_IP, SERVER_PORT);
}
private PeerDHT startClient(String clientId, int clientPort, String serverId, String serverIP,
int serverPort) throws Exception {
private PeerDHT startClient(String clientId, int clientPort, String serverId,
String serverIP, int serverPort) throws Exception {
Peer peer = null;
try {
peer = new PeerBuilder(Number160.createHash(clientId)).ports(clientPort).behindFirewall().start();
@ -265,6 +261,7 @@ No future set beforehand, probably an early shutdown / timeout, or use setFailed
if (futureRelayNAT.isSuccess()) {
log.info("Bootstrap using relay successful. Address = " + peer.peerAddress());
return peerDHT;
}
else {
log.error("Bootstrap using relay failed " + futureRelayNAT.failedReason());
@ -279,13 +276,14 @@ No future set beforehand, probably an early shutdown / timeout, or use setFailed
log.error("Bootstrap in relay mode failed " + e.getMessage());
e.printStackTrace();
Assert.fail("Bootstrap in relay mode failed " + e.getMessage());
peer.shutdown().awaitUninterruptibly();
if (peer != null)
peer.shutdown().awaitUninterruptibly();
return null;
}
}
// just for documentation
public void startBootstrappingSeedNode() {
public void startServer() {
Peer peer = null;
try {
peer = new PeerBuilder(Number160.createHash("digitalocean1.bitsquare.io")).ports(5000).start();
@ -300,6 +298,8 @@ No future set beforehand, probably an early shutdown / timeout, or use setFailed
Thread.sleep(2000);
}
} catch (Exception e) {
if (peer != null)
peer.shutdown().awaitUninterruptibly();
e.printStackTrace();
}
}