Merge remote-tracking branch 'origin/Development' into Development

Manfred Karrer 2016-05-18 21:45:17 +02:00
commit 8e9793d4e7
3 changed files with 804 additions and 3 deletions


@@ -775,6 +775,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
         return peerManager;
     }
 
+    @VisibleForTesting
+    @Nullable
+    public KeyRing getKeyRing() {
+        return optionalKeyRing.isPresent() ? optionalKeyRing.get() : null;
+    }
+
     ///////////////////////////////////////////////////////////////////////////////////////////
     // Private
     ///////////////////////////////////////////////////////////////////////////////////////////
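The accessor added above is test-only plumbing: it lets a test read a peer's public key ring. A minimal sketch of the intended use, mirroring NetworkStressTest.setUp() in the new file below (getKeyRing() is @Nullable, so the dereference assumes a key ring was supplied):

    final P2PService peer = createPeerNode(p, peerPort);
    peerPKRings.add(peer.getKeyRing().getPubKeyRing()); // would throw NullPointerException without a key ring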


@@ -62,10 +62,11 @@ public class Connection implements MessageListener {
     // Static
     ///////////////////////////////////////////////////////////////////////////////////////////
 
-    private static final int MAX_MSG_SIZE = 500 * 1024; // 500 kb
+    // Leaving some constants package-private for tests to know limits.
+    static final int MAX_MSG_SIZE = 500 * 1024; // 500 kb
     //TODO decrease limits again after testing
-    private static final int MSG_THROTTLE_PER_SEC = 70; // With MAX_MSG_SIZE of 500kb results in bandwidth of 35 mbit/sec
-    private static final int MSG_THROTTLE_PER_10_SEC = 500; // With MAX_MSG_SIZE of 100kb results in bandwidth of 50 mbit/sec for 10 sec
+    static final int MSG_THROTTLE_PER_SEC = 70; // With MAX_MSG_SIZE of 500kb results in bandwidth of 35 mbit/sec
+    static final int MSG_THROTTLE_PER_10_SEC = 500; // With MAX_MSG_SIZE of 100kb results in bandwidth of 50 mbit/sec for 10 sec
     private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
 
     public static int getMaxMsgSize() {
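Relaxing these constants to package-private lets the stress test pace its traffic from the live throttle values instead of hard-coding them. A minimal sketch of that derivation, using the same expression as NetworkStressTest below:

    // Schedule direct messages no closer together than ~125% of the per-second throttle interval.
    long minDirectDelayMillis = Math.round(1.25 * (1.0 / Connection.MSG_THROTTLE_PER_SEC) * 1000);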


@@ -0,0 +1,794 @@
package io.bitsquare.p2p.network;
import io.bitsquare.app.Version;
import io.bitsquare.common.Clock;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.KeyStorage;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.common.util.Tuple3;
import io.bitsquare.crypto.DecryptedMsgWithPubKey;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.seed.SeedNode;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.JUnitCore;
import org.junit.runner.Request;
import org.junit.runner.Result;
import org.junit.runner.notification.Failure;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.Security;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Bitsquare network stress tests.
*
* You can invoke this class directly from the command line.
* If the name of a single test is given as an argument, only that test is run.
* Otherwise all tests in the class are run.
*
* You can also set some {@code STRESS_TEST_*} environment variables to
* customize the execution of tests.
* See the {@code *_ENVVAR} constants for the names of these variables.
*/
public class NetworkStressTest {
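// A hypothetical command-line invocation (the classpath placeholder and the numeric values
// are illustrative only): run just the direct-message test with 8 peers sending 50 direct
// messages each, keeping test data in a persistent directory:
//
//   STRESS_TEST_NPEERS=8 STRESS_TEST_NDIRECT=50 STRESS_TEST_DIR=/tmp/stress-test \
//       java -cp <test classpath> io.bitsquare.p2p.network.NetworkStressTest test_direct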
// Test parameters
/** Whether to log messages less important than warnings. */
private static final boolean USE_DETAILED_LOGGING = false;
// Constants
/** Environment variable to specify the number of peers in the test. */
private static final String NPEERS_ENVVAR = "STRESS_TEST_NPEERS";
/** Environment variable to specify a persistent test data directory. */
private static final String TEST_DIR_ENVVAR = "STRESS_TEST_DIR";
/** Environment variable to specify the number of direct messages sent per peer. */
private static final String DIRECT_COUNT_ENVVAR = "STRESS_TEST_NDIRECT";
/** Environment variable to specify the number of mailbox messages sent per peer. */
private static final String MAILBOX_COUNT_ENVVAR = "STRESS_TEST_NMAILBOX";
/** Numeric identifier of the regtest Bitcoin network. */
private static final int REGTEST_NETWORK_ID = 2;
/** Default number of peers in the test. */
private static final int NPEERS_DEFAULT = 4;
/** Minimum number of peers for the test to work (2 for direct messages, 3 for mailbox messages). */
private static final int NPEERS_MIN = 3;
/** Default number of direct messages to be sent by each peer. */
private static final int DIRECT_COUNT_DEFAULT = 100;
/** Default number of mailbox messages to be sent by each peer. */
private static final int MAILBOX_COUNT_DEFAULT = 100;
/** Maximum delay in seconds for a node to receive preliminary data. */
private static final long MAX_PRELIMINARY_DELAY_SECS = 5;
/** Maximum delay in seconds for a node to bootstrap after receiving preliminary data. */
private static final long MAX_BOOTSTRAP_DELAY_SECS = 5;
/** Maximum delay in seconds for a node to shut down. */
private static final long MAX_SHUTDOWN_DELAY_SECS = 2;
/** Minimum delay between direct messages in milliseconds, 25% larger than the throttle limit. */
private static final long MIN_DIRECT_DELAY_MILLIS = Math.round(1.25 * (1.0 / Connection.MSG_THROTTLE_PER_SEC) * 1000);
/** Maximum delay between direct messages in milliseconds, 10 times the minimum. */
private static final long MAX_DIRECT_DELAY_MILLIS = 10 * MIN_DIRECT_DELAY_MILLIS;
/** Estimated delay in seconds to send or receive a mailbox message. */
private static final long MAILBOX_DELAY_SECS = 2;
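// For example, with Connection.MSG_THROTTLE_PER_SEC currently at 70, this works out to
// MIN_DIRECT_DELAY_MILLIS = round(1.25 * 1000 / 70) = 18 ms and MAX_DIRECT_DELAY_MILLIS = 180 ms.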
// Instance fields
/** The last time a progress bar update was printed (to throttle message printing). */
private long lastProgressUpdateMillis = 0;
/** A directory to (temporarily) hold seed and normal nodes' configuration and state files. */
private Path testDataDir;
/** Whether to use localhost addresses instead of Tor hidden services. */
private boolean useLocalhost;
/** A single seed node that other nodes will contact to request initial data. */
private SeedNode seedNode;
/** The repository of seed nodes used in the test. */
private SeedNodesRepository seedNodesRepository = new SeedNodesRepository();
/** A list of peer nodes represented as P2P services. */
private List<P2PService> peerNodes = new ArrayList<>();
/** A list of the peer nodes' service ports. */
private List<Integer> peerPorts = new ArrayList<>();
/** A list of the peer nodes' public key rings. */
private List<PubKeyRing> peerPKRings = new ArrayList<>();
/** Number of direct messages to be sent by each peer. */
private int directCount = DIRECT_COUNT_DEFAULT;
/** Number of mailbox messages to be sent by each peer. */
private int mailboxCount = MAILBOX_COUNT_DEFAULT;
// # MAIN ENTRY POINT
// Inspired by <https://stackoverflow.com/a/9288513> by Marc Peters.
public static void main(String[] args) {
Request request = (args.length == 0)
? Request.aClass(NetworkStressTest.class)
: Request.method(NetworkStressTest.class, args[0]);
Result result = new JUnitCore().run(request);
for (Failure f : result.getFailures())
System.err.printf("\n%s\n%s", f, f.getTrace());
System.exit(result.wasSuccessful() ? 0 : 1);
}
// # COMMON UTILITIES
private void print(String message, Object... args) {
System.out.println(this.getClass().getSimpleName() + ": "
+ String.format(message, args));
}
/** Decrease latch count and print a progress indicator based on the given character. */
private void countDownAndPrint(CountDownLatch latch, char c) {
latch.countDown();
printProgress(c, (int)latch.getCount());
}
/** Print a progress indicator based on the given character. */
private void printProgress(char c, int n) {
if (n < 1)
return; // completed tasks are not shown
// Do not print the indicator if the last one was shown less than half a second ago.
long now = System.currentTimeMillis();
if ((now - lastProgressUpdateMillis) < 500)
return;
lastProgressUpdateMillis = now;
// Keep a fixed width so that a new indicator fully overwrites the previous one.
System.out.print(String.format("\r%s> %c*%-6d ", this.getClass().getSimpleName(), c, n));
System.out.flush();
}
private static void assertLatch(String message, CountDownLatch latch, long timeout, TimeUnit unit)
throws InterruptedException {
if (!latch.await(timeout, unit))
org.junit.Assert.fail(String.format("%s (%d pending in latch)", message, latch.getCount()));
}
private Tuple3<Long, Long, Long> minMaxAvg(List<Long> l) {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
long sum = 0;
for (long e : l) {
if (e < min)
min = e;
if (e > max)
max = e;
sum += e;
}
return new Tuple3<>(min, max, sum / l.size());
}
// # TEST SETUP
@Before
public void setUp() throws Exception {
// Parse test parameter environment variables.
/** Number of peer nodes to create. */
final int nPeers = parseEnvInt(NPEERS_ENVVAR, NPEERS_DEFAULT, NPEERS_MIN);
directCount = parseEnvInt(DIRECT_COUNT_ENVVAR, DIRECT_COUNT_DEFAULT, 0);
mailboxCount = parseEnvInt(MAILBOX_COUNT_ENVVAR, MAILBOX_COUNT_DEFAULT, 0);
/** A property where threads can indicate setup failure of local services (Tor node, hidden service). */
final BooleanProperty localServicesFailed = new SimpleBooleanProperty(false);
/** A barrier to wait for concurrent setup of local services (Tor node, hidden service). */
final CountDownLatch localServicesLatch = new CountDownLatch(1 /*seed node*/ + nPeers);
/* A barrier to wait for concurrent reception of preliminary data in peers. */
final CountDownLatch prelimDataLatch = new CountDownLatch(nPeers);
/* A barrier to wait for concurrent bootstrap of peers. */
final CountDownLatch bootstrapLatch = new CountDownLatch(nPeers);
// Set a security provider to allow key generation.
Security.addProvider(new BouncyCastleProvider());
// Create the test data directory.
testDataDir = createTestDataDirectory();
print("test data directory: " + testDataDir);
// Setting the executor seems to make tests more stable against ``ConcurrentModificationException``
// (see #443). However, it makes the test use more open files, so you may need to run ``ulimit -n NUMBER``
// or ``prlimit -nNUMBER -pPID`` (as root) on your shell's PID if you get "too many open files" errors.
// NUMBER=16384 seems to be enough for 100 peers on Debian GNU/Linux.
UserThread.setExecutor(Executors.newSingleThreadExecutor());
// Create and start the seed node.
seedNode = new SeedNode(testDataDir.toString());
final NodeAddress seedNodeAddress = newSeedNodeAddress();
useLocalhost = seedNodeAddress.hostName.equals("localhost");
final Set<NodeAddress> seedNodes = new HashSet<>(1);
seedNodes.add(seedNodeAddress); // the only seed node in tests
seedNode.createAndStartP2PService(seedNodeAddress, SeedNode.MAX_CONNECTIONS_DEFAULT, useLocalhost,
REGTEST_NETWORK_ID, USE_DETAILED_LOGGING, seedNodes,
new SeedServiceListener(localServicesLatch, localServicesFailed));
print("created seed node");
// Create and start peer nodes, all connecting to the seed node above.
if (useLocalhost) {
seedNodesRepository.setLocalhostSeedNodeAddresses(seedNodes);
} else {
seedNodesRepository.setTorSeedNodeAddresses(seedNodes);
}
for (int p = 0; p < nPeers; p++) {
// peer network port
final int peerPort = Utils.findFreeSystemPort();
peerPorts.add(peerPort);
// create, save and start peer
final P2PService peer = createPeerNode(p, peerPort);
//noinspection ConstantConditions
peerPKRings.add(peer.getKeyRing().getPubKeyRing());
peerNodes.add(peer);
peer.start(new PeerServiceListener(
localServicesLatch, localServicesFailed, prelimDataLatch, bootstrapLatch));
}
print("created peer nodes");
// Wait for concurrent tasks to finish.
localServicesLatch.await();
// Check if any node reported setup failure on start.
if (localServicesFailed.get()) {
throw new Exception("nodes failed to start");
}
print("all local nodes started");
// Wait for peers to get their preliminary data.
assertLatch("timed out while waiting for preliminary data",
prelimDataLatch, MAX_PRELIMINARY_DELAY_SECS * nPeers, TimeUnit.SECONDS);
print("preliminary data received");
// Wait for peers to complete their bootstrapping.
assertLatch("timed out while waiting for bootstrap",
bootstrapLatch, MAX_BOOTSTRAP_DELAY_SECS * nPeers, TimeUnit.SECONDS);
print("bootstrap complete");
}
/** Parse an integer value from the given environment variable, with default and minimum values. */
private int parseEnvInt(String envVar, int defValue, int minValue) {
int value = defValue;
final String envValue = System.getenv(envVar);
if (envValue != null && !envValue.equals(""))
value = Integer.parseInt(envValue);
if (value < minValue)
throw new IllegalArgumentException(
String.format("%s must be at least %d: %d", envVar, minValue, value)
);
return value;
}
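// For example (hypothetical values): STRESS_TEST_NPEERS=10 yields 10; an unset or empty variable
// falls back to the default; a value below the minimum (e.g. STRESS_TEST_NPEERS=1 with a minimum
// of 3) raises IllegalArgumentException.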
private Path createTestDataDirectory() throws IOException {
Path stressTestDirPath;
final String stressTestDir = System.getenv(TEST_DIR_ENVVAR);
if ((stressTestDir != null) && !stressTestDir.equals("")) {
// Test directory specified, use and create if missing.
stressTestDirPath = Paths.get(stressTestDir);
if (!Files.isDirectory(stressTestDirPath)) {
//noinspection ResultOfMethodCallIgnored
stressTestDirPath.toFile().mkdirs();
}
} else {
stressTestDirPath = Files.createTempDirectory("bsq" + this.getClass().getSimpleName());
}
return stressTestDirPath;
}
@NotNull
private static NodeAddress newSeedNodeAddress() {
// The address is only considered by ``SeedNodesRepository`` if
// its port ends in the digit matching the network identifier.
int port;
do {
port = Utils.findFreeSystemPort();
} while (port % 10 != REGTEST_NETWORK_ID);
return new NodeAddress("localhost", port);
}
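// For example, with REGTEST_NETWORK_ID = 2 a free port such as 54312 would be accepted,
// while 54317 would be discarded and another port drawn (port numbers are illustrative).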
@NotNull
private P2PService createPeerNode(int n, int port) {
// peer data directories
final File peerDir = new File(testDataDir.toFile(), String.format("peer-%06d", n));
final File peerTorDir = new File(peerDir, "tor");
final File peerStorageDir = new File(peerDir, "db");
final File peerKeysDir = new File(peerDir, "keys");
//noinspection ResultOfMethodCallIgnored
peerKeysDir.mkdirs(); // needed for creating the key ring
// peer keys
final KeyStorage peerKeyStorage = new KeyStorage(peerKeysDir);
final KeyRing peerKeyRing = new KeyRing(peerKeyStorage);
final EncryptionService peerEncryptionService = new EncryptionService(peerKeyRing);
return new P2PService(seedNodesRepository, port, peerTorDir, useLocalhost,
REGTEST_NETWORK_ID, peerStorageDir, new Clock(), peerEncryptionService, peerKeyRing);
}
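// For example, peer 0 would get <testDataDir>/peer-000000/tor, .../peer-000000/db and
// .../peer-000000/keys for its Tor state, storage and key ring, respectively.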
// ## TEST SETUP: P2P service listener classes
private class TestSetupListener implements SetupListener {
private final CountDownLatch localServicesLatch;
private final BooleanProperty localServicesFailed;
TestSetupListener(CountDownLatch localServicesLatch, BooleanProperty localServicesFailed) {
this.localServicesLatch = localServicesLatch;
this.localServicesFailed = localServicesFailed;
}
@Override
public void onTorNodeReady() {
// do nothing
}
@Override
public void onHiddenServicePublished() {
// successful result
localServicesLatch.countDown();
}
@Override
public void onSetupFailed(Throwable throwable) {
// failed result
localServicesFailed.set(true);
localServicesLatch.countDown();
}
}
private class SeedServiceListener extends TestSetupListener implements P2PServiceListener {
SeedServiceListener(CountDownLatch localServicesLatch, BooleanProperty localServicesFailed) {
super(localServicesLatch, localServicesFailed);
}
@Override
public void onRequestingDataCompleted() {
// preliminary data not used in single seed node
}
@Override
public void onNoSeedNodeAvailable() {
// expected in single seed node
}
@Override
public void onNoPeersAvailable() {
// expected in single seed node
}
@Override
public void onBootstrapComplete() {
// not used in single seed node
}
}
private class PeerServiceListener extends TestSetupListener implements P2PServiceListener {
private final CountDownLatch prelimDataLatch;
private final CountDownLatch bootstrapLatch;
PeerServiceListener(CountDownLatch localServicesLatch, BooleanProperty localServicesFailed,
CountDownLatch prelimDataLatch, CountDownLatch bootstrapLatch) {
super(localServicesLatch, localServicesFailed);
this.prelimDataLatch = prelimDataLatch;
this.bootstrapLatch = bootstrapLatch;
}
@Override
public void onRequestingDataCompleted() {
// preliminary data received
countDownAndPrint(prelimDataLatch, 'p');
}
@Override
public void onNoSeedNodeAvailable() {
// do nothing
}
@Override
public void onNoPeersAvailable() {
// do nothing
}
@Override
public void onBootstrapComplete() {
// peer bootstrapped
countDownAndPrint(bootstrapLatch, 'b');
}
}
// # TEST CLEANUP
@After
public void tearDown() throws InterruptedException, IOException {
final int nNodes = (seedNode != null ? 1 : 0) + peerNodes.size();
/** A barrier to wait for concurrent shutdown of services. */
final CountDownLatch shutdownLatch = new CountDownLatch(nNodes);
print("stopping all local nodes");
// Stop peer nodes.
for (P2PService peer : peerNodes) {
peer.shutDown(() -> countDownAndPrint(shutdownLatch, '.'));
}
// Stop the seed node.
if (seedNode != null) {
seedNode.shutDown(() -> countDownAndPrint(shutdownLatch, '.'));
}
// Wait for concurrent tasks to finish.
assertLatch("timed out while stopping nodes",
shutdownLatch, MAX_SHUTDOWN_DELAY_SECS * nNodes, TimeUnit.SECONDS);
print("all local nodes stopped");
// Cleanup test data directory.
print("cleaning up test data directory");
if (testDataDir != null) {
deleteTestDataDirectory();
}
}
/**
* Delete the test data directory recursively, unless <code>STRESS_TEST_DIR</code> is defined,
* in which case peer node keys are kept.
*
* @throws IOException
*/
private void deleteTestDataDirectory() throws IOException {
// Based on <https://stackoverflow.com/a/27917071/6239236> by Tomasz Dzięcielewski.
final String stressTestDir = System.getenv(TEST_DIR_ENVVAR);
final boolean keep = (stressTestDir != null) && !stressTestDir.equals("");
Files.walkFileTree(testDataDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
final String fileName = file.getFileName().toString();
if (!(keep && (fileName.matches("enc\\.key|sig\\.key|private_key")))) // peer and tor keys
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
// ``dir`` is always a directory, but ``listFiles()`` may return null on I/O errors and trigger a ``NullPointerException``.
//noinspection ConstantConditions
if (!(keep && dir.toFile().listFiles().length > 0))
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
// # DIRECT SENDING AND RECEIVING
/** Test each peer sending a direct message to another random peer. */
@Test
public void test_direct() throws InterruptedException {
final int nPeers = peerNodes.size();
BooleanProperty sentDirectFailed = new SimpleBooleanProperty(false);
final List<Long> sentDelays = new Vector<>(nPeers * directCount);
final CountDownLatch sentDirectLatch = new CountDownLatch(directCount * nPeers);
final CountDownLatch receivedDirectLatch = new CountDownLatch(directCount * nPeers);
final long sendStartMillis = System.currentTimeMillis();
for (final P2PService srcPeer : peerNodes) {
final NodeAddress srcPeerAddress = srcPeer.getAddress();
// Make the peer ready for receiving direct messages.
srcPeer.addDecryptedDirectMessageListener((decryptedMsgWithPubKey, peerNodeAddress) -> {
if (!(decryptedMsgWithPubKey.message instanceof StressTestDirectMessage))
return;
StressTestDirectMessage directMessage = (StressTestDirectMessage) (decryptedMsgWithPubKey.message);
if ((directMessage.getData().equals("test/" + srcPeerAddress)))
receivedDirectLatch.countDown();
});
long nextSendMillis = System.currentTimeMillis();
for (int i = 0; i < directCount; i++) {
// Select a random peer (different than source one) and send a direct message to it...
int peerIdx;
NodeAddress peerAddr;
do {
peerIdx = (int) (Math.random() * nPeers);
peerAddr = peerNodes.get(peerIdx).getAddress();
} while (srcPeerAddress.equals(peerAddr));
final int dstPeerIdx = peerIdx;
final NodeAddress dstPeerAddress = peerAddr;
// ...after a random delay not shorter than throttle limits.
nextSendMillis += Math.round(Math.random() * (MAX_DIRECT_DELAY_MILLIS - MIN_DIRECT_DELAY_MILLIS));
final long sendAfterMillis = nextSendMillis - System.currentTimeMillis();
/*print("sending direct message from peer %s to %s in %sms",
srcPeer.getAddress(), dstPeer.getAddress(), sendAfterMillis);*/
UserThread.runAfter(() -> {
final long sendMillis = System.currentTimeMillis();
srcPeer.sendEncryptedDirectMessage(
dstPeerAddress, peerPKRings.get(dstPeerIdx),
new StressTestDirectMessage("test/" + dstPeerAddress),
new SendDirectMessageListener() {
@Override
public void onArrived() {
sentDelays.add(System.currentTimeMillis() - sendMillis);
countDownAndPrint(sentDirectLatch, 'd');
}
@Override
public void onFault() {
sentDirectFailed.set(true);
countDownAndPrint(sentDirectLatch, 'd');
}
});
}, sendAfterMillis, TimeUnit.MILLISECONDS
);
}
}
print("%d direct messages scheduled to be sent by each of %d peers", directCount, nPeers);
// Since receiving is completed before sending is reported to be complete,
// all receiving checks should end before all sending checks to avoid deadlocking.
/** Time to transmit all messages in the worst random case, and with no computation delays. */
final long idealMaxDirectDelay = MAX_DIRECT_DELAY_MILLIS * directCount;
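// With the current constants and defaults (MAX_DIRECT_DELAY_MILLIS = 180 ms, directCount = 100)
// this is 18 s, so the 25x receive timeout below allows up to 450 s.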
// Wait for peers to complete receiving. We are generous here.
assertLatch("timed out while receiving direct messages",
receivedDirectLatch, 25 * idealMaxDirectDelay, TimeUnit.MILLISECONDS);
final long recvMillis = System.currentTimeMillis() - sendStartMillis;
print("receiving %d direct messages per peer took %ss (%.2f x ideal max)",
directCount, recvMillis / 1000.0, recvMillis / (float) idealMaxDirectDelay);
// Wait for peers to complete sending.
// This should be nearly instantaneous after waiting for reception is completed.
assertLatch("timed out while sending direct messages",
sentDirectLatch, idealMaxDirectDelay / 10, TimeUnit.MILLISECONDS);
Tuple3<Long, Long, Long> mma = minMaxAvg(sentDelays);
print("sending %d direct messages per peer took %ss (min/max/avg %s/%s/%s ms)",
directCount, (System.currentTimeMillis() - sendStartMillis) / 1000.0,
mma.first, mma.second, mma.third);
org.junit.Assert.assertFalse("some peer(s) failed to send a direct message", sentDirectFailed.get());
}
// # DIRECT + MAILBOX SENDING AND RECEIVING
/** Test sending and receiving mailbox messages. */
@Test
public void test_mailbox() throws InterruptedException {
// We start by putting the first half of the peers online and the second half offline.
// Then the first online peer sends a number of messages to random peers (regardless of their state),
// so that some messages are delivered directly and others go to a mailbox.
// Then the first online peer is put offline and the peer that has been offline the longest
// is put back online (so it can collect its mailbox messages),
// and the new first online peer sends messages.
// This is repeated until every peer has been both online and offline.
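// For example, with 4 peers: 0 and 1 start online, 2 and 3 offline. Iteration 0: peer 0 sends,
// then goes offline and peer 2 comes online; iteration 1: peer 1 sends, then peer 3 replaces it;
// iteration 2: peer 2 sends, then peer 0 replaces it; iteration 3: peer 3 sends, then peer 1
// replaces it. Returning peers collect any mailbox messages addressed to them on startup.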
final int nPeers = peerNodes.size();
// No sent latch here since the order of events is different
// depending on whether the message goes direct or via mailbox.
final CountDownLatch receivedMailboxLatch = new CountDownLatch(mailboxCount * nPeers);
// Configure the first half of peers to receive messages...
int firstPeerDown = (int)Math.ceil(nPeers / 2.0);
for (P2PService peer : peerNodes.subList(0, firstPeerDown)) {
addMailboxListeners(peer, receivedMailboxLatch);
}
// ...and put the second half offline.
final CountDownLatch halfShutDown = new CountDownLatch(nPeers / 2);
for (P2PService peer : peerNodes.subList(firstPeerDown, nPeers)) {
peer.shutDown(halfShutDown::countDown);
}
assertLatch("timed out while stopping half of the peers",
halfShutDown, MAX_SHUTDOWN_DELAY_SECS * nPeers, TimeUnit.SECONDS);
//print("stopped a half of the peers for mailbox test");
// Cycle through peers sending to others, stopping the peer
// and starting one of the stopped peers.
print("%d mailbox messages to be sent by each of %d peers", mailboxCount, nPeers);
BooleanProperty sentMailboxFailed = new SimpleBooleanProperty(false);
final long sendStartMillis = System.currentTimeMillis();
for (int firstOnline = 0, firstOffline = firstPeerDown;
firstOnline < nPeers;
firstOnline++, firstOffline = (firstOffline + 1) % nPeers) {
// The first online peer sends messages to random other peers.
final P2PService onlinePeer = peerNodes.get(firstOnline);
final NodeAddress onlinePeerAddress = onlinePeer.getAddress();
final CountDownLatch sendLatch = new CountDownLatch(mailboxCount);
for (int i = 0; i < mailboxCount; i++) {
// Select a random peer (different than source one)...
int peerIdx;
NodeAddress peerAddr;
do {
peerIdx = (int) (Math.random() * nPeers);
peerAddr = peerNodes.get(peerIdx).getAddress();
} while (onlinePeerAddress.equals(peerAddr));
final int dstPeerIdx = peerIdx;
final NodeAddress dstPeerAddress = peerAddr;
// ...and send a message to it.
onlinePeer.sendEncryptedMailboxMessage(dstPeerAddress, peerPKRings.get(dstPeerIdx),
new StressTestMailboxMessage(onlinePeerAddress, "test/" + dstPeerAddress),
new SendMailboxMessageListener() { // checked in receiver
@Override
public void onArrived() {
sendLatch.countDown();
}
@Override
public void onStoredInMailbox() {
sendLatch.countDown();
}
@Override
public void onFault(String errorMessage) {
sentMailboxFailed.set(true);
sendLatch.countDown();
}
});
}
assertLatch("timed out while sending from peer " + firstOnline,
sendLatch, MAILBOX_DELAY_SECS * mailboxCount, TimeUnit.SECONDS);
// When done, put first online peer offline.
final CountDownLatch stopLatch = new CountDownLatch(1);
onlinePeer.shutDown(stopLatch::countDown);
assertLatch("timed out while stopping peer " + firstOnline,
stopLatch, MAX_SHUTDOWN_DELAY_SECS, TimeUnit.SECONDS);
//print("put peer %d offline", firstOnline);
// When done, put the first offline peer online and set up its message listeners.
final CountDownLatch startLatch = new CountDownLatch(1);
final P2PService startedPeer = createPeerNode(firstOffline, peerPorts.get(firstOffline));
addMailboxListeners(startedPeer, receivedMailboxLatch);
peerNodes.set(firstOffline, startedPeer);
startedPeer.start(new MailboxStartListener(startLatch));
assertLatch("timed out while starting peer " + firstOffline,
startLatch,
// this assumes some delay per received mailbox message
(MAX_PRELIMINARY_DELAY_SECS + MAX_BOOTSTRAP_DELAY_SECS) + MAILBOX_DELAY_SECS * nPeers,
TimeUnit.SECONDS);
//print("put peer %d online", firstOffline);
}
/** Time to transmit all messages with the estimated per-message delay, with no computation delays. */
final long idealMaxMailboxDelay = 2 * MAILBOX_DELAY_SECS * 1000 * nPeers * mailboxCount;
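// With the defaults (MAILBOX_DELAY_SECS = 2, 4 peers, mailboxCount = 100) this allows up to
// 2 * 2 * 1000 * 4 * 100 ms = 1,600,000 ms, i.e. roughly 27 minutes.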
assertLatch("timed out while receiving mailbox messages",
receivedMailboxLatch, idealMaxMailboxDelay, TimeUnit.MILLISECONDS);
final long recvMillis = System.currentTimeMillis() - sendStartMillis;
print("receiving %d mailbox messages per peer took %ss (%.2f x ideal max)",
mailboxCount, recvMillis / 1000.0, recvMillis / (float) idealMaxMailboxDelay);
org.junit.Assert.assertFalse("some peer(s) failed to send a mailbox message", sentMailboxFailed.get());
}
/** Configure the peer to count down the latch on receipt of a mailbox-type message (delivered directly or via its mailbox). */
private void addMailboxListeners(P2PService peer, CountDownLatch receivedMailboxLatch) {
class MailboxMessageListener implements DecryptedDirectMessageListener, DecryptedMailboxListener {
private void handle(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
if (!(decryptedMsgWithPubKey.message instanceof StressTestMailboxMessage))
return;
StressTestMailboxMessage msg = (StressTestMailboxMessage) (decryptedMsgWithPubKey.message);
if ((msg.getData().equals("test/" + peer.getAddress())))
countDownAndPrint(receivedMailboxLatch, 'm');
}
@Override
public void onDirectMessage(
DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) {
handle(decryptedMsgWithPubKey);
}
@Override
public void onMailboxMessageAdded(
DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) {
handle(decryptedMsgWithPubKey);
}
}
final MailboxMessageListener listener = new MailboxMessageListener();
peer.addDecryptedDirectMessageListener(listener);
peer.addDecryptedMailboxListener(listener);
}
private class MailboxStartListener implements P2PServiceListener {
private final CountDownLatch startLatch;
MailboxStartListener(CountDownLatch startLatch) {
this.startLatch = startLatch;
}
@Override
public void onRequestingDataCompleted() {
}
@Override
public void onNoSeedNodeAvailable() {
}
@Override
public void onNoPeersAvailable() {
}
@Override
public void onBootstrapComplete() {
startLatch.countDown();
}
@Override
public void onTorNodeReady() {
}
@Override
public void onHiddenServicePublished() {
}
@Override
public void onSetupFailed(Throwable throwable) {
}
}
}
// # MESSAGE CLASSES
final class StressTestDirectMessage implements DirectMessage {
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final int messageVersion = Version.getP2PMessageVersion();
private String data;
StressTestDirectMessage(String data) {
this.data = data;
}
@Override
public int getMessageVersion() {
return messageVersion;
}
String getData() {
return data;
}
}
final class StressTestMailboxMessage implements MailboxMessage {
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final int messageVersion = Version.getP2PMessageVersion();
private final String uid = UUID.randomUUID().toString();
private NodeAddress senderNodeAddress;
private String data;
StressTestMailboxMessage(NodeAddress sender, String data) {
this.senderNodeAddress = sender;
this.data = data;
}
@Override
public int getMessageVersion() {
return messageVersion;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public String getUID() {
return uid;
}
String getData() {
return data;
}
}