Send direct messages in network stress test in a loop

Actually each peer runs a loop to setup messages to be sent in the future with a random delay.

This is an actual stress test for the first time. Maybe the timeouts should be adjusted accordingly.
This commit is contained in:
Ivan Vilata-i-Balaguer 2016-05-03 12:07:39 +02:00
parent c8e3c6d11a
commit 6c188e35fa

View file

@ -54,6 +54,8 @@ public class NetworkStressTest {
/** Environment variable to specify a persistent test data directory. */
private static final String TEST_DIR_ENVVAR = "STRESS_TEST_DIR";
/** Number of direct messages to be sent by each peer. */
private static int DIRECT_COUNT = 100;
/** Minimum delay between direct messages in milliseconds, 25% larger than throttle limit. */
private static long MIN_DIRECT_DELAY_MILLIS = Math.round(1.25 * (1.0 / Connection.MSG_THROTTLE_PER_SEC) * 1000);
/** Maximum delay between direct messages in milliseconds, 10 times larger than minimum. */
@ -189,8 +191,8 @@ public class NetworkStressTest {
// Test each peer sending a direct message to another random peer.
final int nPeers = peerNodes.size();
BooleanProperty sentDirectFailed = new SimpleBooleanProperty(false);
final CountDownLatch sentDirectLatch = new CountDownLatch(nPeers);
final CountDownLatch receivedDirectLatch = new CountDownLatch(nPeers);
final CountDownLatch sentDirectLatch = new CountDownLatch(DIRECT_COUNT * nPeers);
final CountDownLatch receivedDirectLatch = new CountDownLatch(DIRECT_COUNT * nPeers);
final Instant sendStart = Instant.now();
for (final P2PService srcPeer : peerNodes) {
// Make the peer ready for receiving direct messages.
@ -202,39 +204,41 @@ public class NetworkStressTest {
receivedDirectLatch.countDown();
});
// Select a random peer and send a direct message to it after a random delay
// not shorter than throttle limits.
final int dstPeerIdx = (int) (Math.random() * nPeers);
final P2PService dstPeer = peerNodes.get(dstPeerIdx);
final NodeAddress dstPeerAddress = dstPeer.getAddress();
//print("sending direct message from peer %s to %s", srcPeer.getAddress(), dstPeer.getAddress());
UserThread.runAfterRandomDelay(() -> srcPeer.sendEncryptedDirectMessage(
dstPeerAddress, peerPKRings.get(dstPeerIdx),
new StressTestDirectMessage("test/" + dstPeerAddress), new SendDirectMessageListener() {
@Override
public void onArrived() {
sentDirectLatch.countDown();
}
for (int i = 0; i < DIRECT_COUNT; i++) {
// Select a random peer and send a direct message to it after a random delay
// not shorter than throttle limits.
final int dstPeerIdx = (int) (Math.random() * nPeers);
final P2PService dstPeer = peerNodes.get(dstPeerIdx);
final NodeAddress dstPeerAddress = dstPeer.getAddress();
//print("sending direct message from peer %s to %s", srcPeer.getAddress(), dstPeer.getAddress());
UserThread.runAfterRandomDelay(() -> srcPeer.sendEncryptedDirectMessage(
dstPeerAddress, peerPKRings.get(dstPeerIdx),
new StressTestDirectMessage("test/" + dstPeerAddress), new SendDirectMessageListener() {
@Override
public void onArrived() {
sentDirectLatch.countDown();
}
@Override
public void onFault() {
sentDirectFailed.set(true);
sentDirectLatch.countDown();
}
}), MIN_DIRECT_DELAY_MILLIS, MAX_DIRECT_DELAY_MILLIS, TimeUnit.MILLISECONDS
);
@Override
public void onFault() {
sentDirectFailed.set(true);
sentDirectLatch.countDown();
}
}), MIN_DIRECT_DELAY_MILLIS, MAX_DIRECT_DELAY_MILLIS, TimeUnit.MILLISECONDS
);
}
}
// Since receiving is completed before sending is reported to be complete,
// all receiving checks should end before all sending checks to avoid deadlocking.
// Wait for peers to complete receiving.
org.junit.Assert.assertTrue("timed out while receiving direct messages",
receivedDirectLatch.await(30, TimeUnit.SECONDS));
print("receiving 1 direct message per peer took %ss",
print("receiving %d direct messages per peer took %ss", DIRECT_COUNT,
Duration.between(sendStart, Instant.now()).toMillis()/1000.0);
// Wait for peers to complete sending.
org.junit.Assert.assertTrue("timed out while sending direct messages",
sentDirectLatch.await(30, TimeUnit.SECONDS));
print("sending 1 direct message per peer took %ss",
print("sending %d direct messages per peer took %ss", DIRECT_COUNT,
Duration.between(sendStart, Instant.now()).toMillis()/1000.0);
org.junit.Assert.assertFalse("some peer(s) failed to send a direct message", sentDirectFailed.get());
}