Send direct messages from all peers to random ones in network stress test

This commit is contained in:
Ivan Vilata-i-Balaguer 2016-04-29 13:12:18 +02:00
parent 97722a6ecd
commit b8d49e44b6

View file

@ -29,10 +29,7 @@ import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.Security;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -182,43 +179,46 @@ public class NetworkStressTest {
org.junit.Assert.assertTrue("timed out while waiting for bootstrap",
bootstrapLatch.await(30, TimeUnit.SECONDS));
// Test sending a direct message from peer #0 to peer #1.
// Test each peer sending a direct message to another random peer.
final int nPeers = peerNodes.size();
BooleanProperty sentDirectFailed = new SimpleBooleanProperty(false);
final CountDownLatch sentDirectLatch = new CountDownLatch(1);
final CountDownLatch receivedDirectLatch = new CountDownLatch(1);
final int srcPeerIdx = 0;
final int dstPeerIdx = 1;
final P2PService srcPeer = peerNodes.get(srcPeerIdx);
final P2PService dstPeer = peerNodes.get(dstPeerIdx);
srcPeer.sendEncryptedDirectMessage(dstPeer.getAddress(), peerPKRings.get(dstPeerIdx),
new StressTestDirectMessage("test/" + dstPeer.getAddress()), new SendDirectMessageListener() {
@Override
public void onArrived() {
sentDirectLatch.countDown();
}
final CountDownLatch sentDirectLatch = new CountDownLatch(nPeers);
final CountDownLatch receivedDirectLatch = new CountDownLatch(nPeers);
for (final P2PService srcPeer : peerNodes) {
final int dstPeerIdx = (int) (Math.random() * nPeers);
final P2PService dstPeer = peerNodes.get(dstPeerIdx);
/*System.out.println(
"Sending direct message from peer " + srcPeer.getAddress() + " to " + dstPeer.getAddress());*/
srcPeer.sendEncryptedDirectMessage(dstPeer.getAddress(), peerPKRings.get(dstPeerIdx),
new StressTestDirectMessage("test/" + dstPeer.getAddress()), new SendDirectMessageListener() {
@Override
public void onArrived() {
sentDirectLatch.countDown();
}
@Override
public void onFault() {
sentDirectFailed.set(true);
sentDirectLatch.countDown();
}
});
dstPeer.addDecryptedDirectMessageListener((decryptedMsgWithPubKey, peerNodeAddress) -> {
if (!(decryptedMsgWithPubKey.message instanceof StressTestDirectMessage))
return;
StressTestDirectMessage directMessage = (StressTestDirectMessage)(decryptedMsgWithPubKey.message);
if ((directMessage.getData().equals("test/" + dstPeer.getAddress())))
receivedDirectLatch.countDown();
});
@Override
public void onFault() {
sentDirectFailed.set(true);
sentDirectLatch.countDown();
}
});
dstPeer.addDecryptedDirectMessageListener((decryptedMsgWithPubKey, peerNodeAddress) -> {
if (!(decryptedMsgWithPubKey.message instanceof StressTestDirectMessage))
return;
StressTestDirectMessage directMessage = (StressTestDirectMessage) (decryptedMsgWithPubKey.message);
if ((directMessage.getData().equals("test/" + dstPeer.getAddress())))
receivedDirectLatch.countDown();
});
}
// Since receiving is completed before sending is reported to be complete,
// all receiving checks should end before all sending checks to avoid deadlocking.
// Wait for peer #1 to complete receiving.
org.junit.Assert.assertTrue("timed out while receiving direct message",
// Wait for peers to complete receiving.
org.junit.Assert.assertTrue("timed out while receiving direct messages",
receivedDirectLatch.await(30, TimeUnit.SECONDS));
// Wait for peer #0 to complete sending.
org.junit.Assert.assertTrue("timed out while sending direct message",
// Wait for peers to complete sending.
org.junit.Assert.assertTrue("timed out while sending direct messages",
sentDirectLatch.await(30, TimeUnit.SECONDS));
org.junit.Assert.assertFalse("peer failed to send message", sentDirectFailed.get());
org.junit.Assert.assertFalse("some peer(s) failed to send a direct message", sentDirectFailed.get());
}
private Path createTestDataDirectory() throws IOException {