mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-19 09:52:23 +01:00
Update guava, cleanup threading
This commit is contained in:
parent
1a66d3cef5
commit
def492a22a
@ -2,19 +2,34 @@ package io.bitsquare.p2p.seed;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.Security;
|
||||
import java.util.Random;
|
||||
import java.util.Scanner;
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
public class SeedNodeMain {
|
||||
private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class);
|
||||
private static SeedNodeMain seedNodeMain;
|
||||
private SeedNode seedNode;
|
||||
|
||||
private boolean stopped;
|
||||
|
||||
// args: port useLocalhost seedNodes
|
||||
// eg. 4444 true localhost:7777 localhost:8888
|
||||
// To stop enter: q
|
||||
public static void main(String[] args) throws NoSuchAlgorithmException {
|
||||
seedNodeMain = new SeedNodeMain(args);
|
||||
}
|
||||
|
||||
public SeedNodeMain(String[] args) {
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
@ -22,10 +37,41 @@ public class SeedNodeMain {
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));
|
||||
UserThread.execute(() -> {
|
||||
try {
|
||||
seedNode = new SeedNode();
|
||||
seedNode.processArgs(args);
|
||||
seedNode.createAndStartP2PService();
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
});
|
||||
listenForExitCommand();
|
||||
}
|
||||
|
||||
SeedNode seedNode = new SeedNode();
|
||||
seedNode.processArgs(args);
|
||||
seedNode.createAndStartP2PService();
|
||||
seedNode.listenForExitCommand();
|
||||
public void listenForExitCommand() {
|
||||
Scanner scan = new Scanner(System.in);
|
||||
String line;
|
||||
while (!stopped && ((line = scan.nextLine()) != null)) {
|
||||
if (line.equals("q")) {
|
||||
if (!stopped) {
|
||||
stopped = true;
|
||||
Timer timeout = Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("ShutdownTimeout-" + new Random().nextInt(1000));
|
||||
log.error("Timeout occurred at shutDown request");
|
||||
System.exit(1);
|
||||
}, 10);
|
||||
|
||||
if (seedNode != null) {
|
||||
seedNode.shutDown(() -> {
|
||||
timeout.cancel();
|
||||
log.debug("Shutdown seed node complete.");
|
||||
System.exit(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,9 @@
|
||||
|
||||
package io.bitsquare.common;
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class UserThread {
|
||||
|
||||
@ -30,7 +31,12 @@ public class UserThread {
|
||||
UserThread.executor = executor;
|
||||
}
|
||||
|
||||
public static Executor executor = Executors.newSingleThreadExecutor();
|
||||
static {
|
||||
// If not defined we use same thread as caller thread
|
||||
executor = MoreExecutors.directExecutor();
|
||||
}
|
||||
|
||||
private static Executor executor;
|
||||
|
||||
public static void execute(Runnable command) {
|
||||
UserThread.executor.execute(command);
|
||||
|
@ -19,6 +19,9 @@ package io.bitsquare.common.util;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.io.CharStreams;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.gson.*;
|
||||
import javafx.scene.input.Clipboard;
|
||||
import javafx.scene.input.ClipboardContent;
|
||||
@ -32,6 +35,13 @@ import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLConnection;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.Random;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
@ -51,6 +61,58 @@ public class Utilities {
|
||||
return gson.toJson(object);
|
||||
}
|
||||
|
||||
public static ListeningExecutorService getListeningExecutorService(String name,
|
||||
int corePoolSize,
|
||||
int maximumPoolSize,
|
||||
long keepAliveTime) {
|
||||
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTime));
|
||||
}
|
||||
|
||||
public static ThreadPoolExecutor getThreadPoolExecutor(String name,
|
||||
int corePoolSize,
|
||||
int maximumPoolSize,
|
||||
long keepAliveTime) {
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat(name)
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
|
||||
TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory);
|
||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||
threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> log.warn("RejectedExecutionHandler called"));
|
||||
return threadPoolExecutor;
|
||||
}
|
||||
|
||||
public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay) {
|
||||
return runTimerTaskWithRandomDelay(runnable, minDelay, maxDelay, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) {
|
||||
return runTimerTask(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit);
|
||||
}
|
||||
|
||||
public static Timer runTimerTask(Runnable runnable, long delay) {
|
||||
return runTimerTask(runnable, delay, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static Timer runTimerTask(Runnable runnable, long delay, TimeUnit timeUnit) {
|
||||
Timer timer = new Timer();
|
||||
timer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing timerTask failed. " + t.getMessage());
|
||||
}
|
||||
}
|
||||
}, timeUnit.convert(delay, timeUnit));
|
||||
return timer;
|
||||
}
|
||||
|
||||
|
||||
public static boolean isUnix() {
|
||||
return isOSX() || isLinux() || getOSName().contains("freebsd");
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -67,7 +68,7 @@ public class FileManager<T> {
|
||||
private final AtomicBoolean savePending;
|
||||
private final long delay;
|
||||
private final TimeUnit delayTimeUnit;
|
||||
private final Callable<Void> saver;
|
||||
private final Callable<Void> saveFileTask;
|
||||
private T serializable;
|
||||
|
||||
|
||||
@ -88,6 +89,7 @@ public class FileManager<T> {
|
||||
executor = new ScheduledThreadPoolExecutor(1, builder.build());
|
||||
executor.setKeepAliveTime(5, TimeUnit.SECONDS);
|
||||
executor.allowCoreThreadTimeOut(true);
|
||||
executor.setMaximumPoolSize(10);
|
||||
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
|
||||
// File must only be accessed from the auto-save executor from now on, to avoid simultaneous access.
|
||||
@ -95,7 +97,8 @@ public class FileManager<T> {
|
||||
this.delay = delay;
|
||||
this.delayTimeUnit = checkNotNull(delayTimeUnit);
|
||||
|
||||
saver = () -> {
|
||||
saveFileTask = () -> {
|
||||
Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000));
|
||||
// Runs in an auto save thread.
|
||||
if (!savePending.getAndSet(false)) {
|
||||
// Some other scheduled request already beat us to it.
|
||||
@ -137,7 +140,7 @@ public class FileManager<T> {
|
||||
|
||||
if (savePending.getAndSet(true))
|
||||
return; // Already pending.
|
||||
executor.schedule(saver, delay, delayTimeUnit);
|
||||
executor.schedule(saveFileTask, delay, delayTimeUnit);
|
||||
}
|
||||
|
||||
public synchronized T read(File file) {
|
||||
|
@ -121,7 +121,10 @@ public class WalletService {
|
||||
|
||||
Timer timeoutTimer = FxTimer.runLater(
|
||||
Duration.ofMillis(STARTUP_TIMEOUT),
|
||||
() -> exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds."))
|
||||
() -> {
|
||||
Thread.currentThread().setName("WalletService:StartupTimeout-" + new Random().nextInt(1000));
|
||||
exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds."));
|
||||
}
|
||||
);
|
||||
|
||||
// If seed is non-null it means we are restoring from backup.
|
||||
|
@ -1,16 +1,14 @@
|
||||
package io.bitsquare.crypto;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import org.bitcoinj.crypto.KeyCrypterScrypt;
|
||||
import org.bitcoinj.wallet.Protos;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.spongycastle.crypto.params.KeyParameter;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
//TODO: Borrowed form BitcoinJ/Lighthouse. Remove Protos dependency, check complete code logic.
|
||||
public class ScryptUtil {
|
||||
private static final Logger log = LoggerFactory.getLogger(ScryptUtil.class);
|
||||
@ -30,13 +28,7 @@ public class ScryptUtil {
|
||||
}
|
||||
|
||||
public static void deriveKeyWithScrypt(KeyCrypterScrypt keyCrypterScrypt, String password, DeriveKeyResultHandler resultHandler) {
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("Routing-%d")
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
|
||||
ExecutorService executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
|
||||
executorService.submit(() -> {
|
||||
Utilities.getThreadPoolExecutor("ScryptUtil:deriveKeyWithScrypt-%d", 1, 2, 5L).submit(() -> {
|
||||
try {
|
||||
log.info("Doing key derivation");
|
||||
long start = System.currentTimeMillis();
|
||||
|
@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.Serializable;
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.Random;
|
||||
|
||||
public class OpenOffer implements Tradable, Serializable {
|
||||
// That object is saved to disc. We need to take care of changes to not break deserialization.
|
||||
@ -102,6 +103,7 @@ public class OpenOffer implements Tradable, Serializable {
|
||||
timeoutTimer = FxTimer.runLater(
|
||||
Duration.ofMillis(TIMEOUT),
|
||||
() -> {
|
||||
Thread.currentThread().setName("OpenOffer:Timeout-" + new Random().nextInt(1000));
|
||||
log.debug("Timeout reached");
|
||||
if (state == State.RESERVED)
|
||||
setState(State.AVAILABLE);
|
||||
|
@ -50,8 +50,6 @@ import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.google.inject.internal.util.$Preconditions.checkNotNull;
|
||||
import static io.bitsquare.util.Validator.nonEmptyStringOf;
|
||||
@ -70,7 +68,6 @@ public class OpenOfferManager {
|
||||
private final TradableList<OpenOffer> openOffers;
|
||||
private final Storage<TradableList<OpenOffer>> openOffersStorage;
|
||||
private boolean shutDownRequested;
|
||||
private ScheduledThreadPoolExecutor executor;
|
||||
private P2PServiceListener p2PServiceListener;
|
||||
private final Timer timer = new Timer();
|
||||
|
||||
@ -181,7 +178,7 @@ public class OpenOfferManager {
|
||||
}
|
||||
|
||||
private void rePublishOffers() {
|
||||
log.trace("rePublishOffers");
|
||||
if (!openOffers.isEmpty()) log.trace("rePublishOffers");
|
||||
for (OpenOffer openOffer : openOffers) {
|
||||
offerBookService.addOffer(openOffer.getOffer(),
|
||||
() -> log.debug("Successful added offer to P2P network"),
|
||||
@ -196,14 +193,8 @@ public class OpenOfferManager {
|
||||
}
|
||||
|
||||
public void shutDown(Runnable completeHandler) {
|
||||
if (executor != null) {
|
||||
executor.shutdown();
|
||||
try {
|
||||
executor.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
if (timer != null)
|
||||
timer.cancel();
|
||||
|
||||
if (!shutDownRequested) {
|
||||
log.debug("shutDown");
|
||||
|
@ -34,6 +34,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Random;
|
||||
|
||||
import static io.bitsquare.util.Validator.nonEmptyStringOf;
|
||||
|
||||
@ -144,6 +145,7 @@ public class OfferAvailabilityProtocol {
|
||||
stopTimeout();
|
||||
|
||||
timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> {
|
||||
Thread.currentThread().setName("OfferAvailabilityProtocol:Timeout-" + new Random().nextInt(1000));
|
||||
log.warn("Timeout reached");
|
||||
errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
|
||||
});
|
||||
|
@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.security.PublicKey;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
|
||||
import static io.bitsquare.util.Validator.nonEmptyStringOf;
|
||||
|
||||
@ -126,6 +127,7 @@ public abstract class TradeProtocol {
|
||||
stopTimeout();
|
||||
|
||||
timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> {
|
||||
Thread.currentThread().setName("TradeProtocol:Timeout-" + new Random().nextInt(1000));
|
||||
log.error("Timeout reached");
|
||||
trade.setErrorMessage("A timeout occurred.");
|
||||
cleanupTradable();
|
||||
|
@ -3,7 +3,6 @@ package io.bitsquare.p2p;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.name.Named;
|
||||
import io.bitsquare.app.ProgramArguments;
|
||||
@ -12,6 +11,7 @@ import io.bitsquare.common.crypto.CryptoException;
|
||||
import io.bitsquare.common.crypto.KeyRing;
|
||||
import io.bitsquare.common.crypto.PubKeyRing;
|
||||
import io.bitsquare.common.crypto.SealedAndSigned;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.crypto.EncryptionService;
|
||||
import io.bitsquare.crypto.SealedAndSignedMessage;
|
||||
import io.bitsquare.p2p.messaging.*;
|
||||
@ -36,7 +36,8 @@ import java.io.File;
|
||||
import java.math.BigInteger;
|
||||
import java.security.PublicKey;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
@ -78,7 +79,6 @@ public class P2PService {
|
||||
private boolean allSeedNodesRequested;
|
||||
private Timer sendGetAllDataMessageTimer;
|
||||
private volatile boolean hiddenServiceReady;
|
||||
private final ExecutorService executorService;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Constructor
|
||||
@ -102,13 +102,6 @@ public class P2PService {
|
||||
|
||||
networkStatistics = new NetworkStatistics();
|
||||
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("P2PService-%d")
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
|
||||
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
@ -204,7 +197,7 @@ public class P2PService {
|
||||
|
||||
networkNode.addMessageListener((message, connection) -> {
|
||||
if (message instanceof GetDataSetMessage) {
|
||||
log.trace("Received GetAllDataMessage: " + message);
|
||||
log.trace("Received GetDataSetMessage: " + message);
|
||||
|
||||
// we only reply if we did not get the message form ourselves (in case we are a seed node)
|
||||
if (!getDataSetMessageNonceList.contains(((GetDataSetMessage) message).nonce)) {
|
||||
@ -595,19 +588,17 @@ public class P2PService {
|
||||
// we try to connect to 2 seed nodes
|
||||
if (connectedSeedNodes.size() < 2 && !remainingSeedNodeAddresses.isEmpty()) {
|
||||
// give a random pause of 1-3 sec. before using the next
|
||||
sendGetAllDataMessageTimer = new Timer();
|
||||
sendGetAllDataMessageTimer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000));
|
||||
try {
|
||||
UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses));
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
|
||||
if (sendGetAllDataMessageTimer != null) sendGetAllDataMessageTimer.cancel();
|
||||
sendGetAllDataMessageTimer = Utilities.runTimerTaskWithRandomDelay(() -> {
|
||||
Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000));
|
||||
try {
|
||||
UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses));
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
}, new Random().nextInt(2000) + 1000);
|
||||
}, 1, 3);
|
||||
} else {
|
||||
allSeedNodesRequested = true;
|
||||
}
|
||||
@ -617,7 +608,7 @@ public class P2PService {
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " failed. Exception:" + throwable.getMessage());
|
||||
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
|
||||
sendGetAllDataMessage(remainingSeedNodeAddresses);
|
||||
UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
@ -8,10 +8,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.zip.DataFormatException;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.Inflater;
|
||||
@ -31,22 +28,6 @@ public class Utils {
|
||||
}
|
||||
}
|
||||
|
||||
public static void shutDownExecutorService(ExecutorService executorService) {
|
||||
shutDownExecutorService(executorService, 200);
|
||||
}
|
||||
|
||||
public static void shutDownExecutorService(ExecutorService executorService, long waitBeforeShutDown) {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
boolean done = executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS);
|
||||
if (!done) log.trace("Not all tasks completed at shutdown.");
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
final List<Runnable> rejected = executorService.shutdownNow();
|
||||
log.debug("Rejected tasks: {}", rejected.size());
|
||||
}
|
||||
|
||||
public static byte[] compress(Serializable input) {
|
||||
return compress(ByteArrayUtils.objectToByteArray(input));
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package io.bitsquare.p2p.network;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import io.bitsquare.common.ByteArrayUtils;
|
||||
import io.bitsquare.common.UserThread;
|
||||
@ -20,7 +20,10 @@ import java.net.SocketTimeoutException;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Connection is created by the server thread or by send message from NetworkNode.
|
||||
@ -33,12 +36,13 @@ public class Connection {
|
||||
private static final int MAX_ILLEGAL_REQUESTS = 5;
|
||||
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
|
||||
private InputHandler inputHandler;
|
||||
private boolean isAuthenticated;
|
||||
|
||||
public static int getMaxMsgSize() {
|
||||
return MAX_MSG_SIZE;
|
||||
}
|
||||
|
||||
private final int port;
|
||||
private final String portInfo;
|
||||
private final String uid;
|
||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
|
||||
@ -65,7 +69,7 @@ public class Connection {
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
|
||||
port = socket.getLocalPort();
|
||||
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
|
||||
uid = UUID.randomUUID().toString();
|
||||
|
||||
init(socket, messageListener, connectionListener);
|
||||
@ -84,7 +88,7 @@ public class Connection {
|
||||
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
|
||||
|
||||
// We create a thread for handling inputStream data
|
||||
inputHandler = new InputHandler(sharedSpace, objectInputStream, port);
|
||||
inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo);
|
||||
executorService.submit(inputHandler);
|
||||
} catch (IOException e) {
|
||||
sharedSpace.handleConnectionException(e);
|
||||
@ -103,14 +107,14 @@ public class Connection {
|
||||
|
||||
public synchronized void setAuthenticated(Address peerAddress, Connection connection) {
|
||||
this.peerAddress = peerAddress;
|
||||
isAuthenticated = true;
|
||||
UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection));
|
||||
}
|
||||
|
||||
public void sendMessage(Message message) {
|
||||
// That method we get called form user thread
|
||||
if (!stopped) {
|
||||
try {
|
||||
log.trace("writeObject " + message + " on connection with port " + port);
|
||||
log.trace("writeObject " + message + " on connection with port " + portInfo);
|
||||
if (!stopped) {
|
||||
Object objectToWrite;
|
||||
if (useCompression) {
|
||||
@ -156,7 +160,7 @@ public class Connection {
|
||||
}
|
||||
|
||||
public synchronized boolean isAuthenticated() {
|
||||
return peerAddress != null;
|
||||
return isAuthenticated;
|
||||
}
|
||||
|
||||
public String getUid() {
|
||||
@ -204,30 +208,42 @@ public class Connection {
|
||||
UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.SHUT_DOWN, this));
|
||||
|
||||
if (sendCloseConnectionMessage) {
|
||||
sendMessage(new CloseConnectionMessage());
|
||||
// give a bit of time for closing gracefully
|
||||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
executorService.submit(() -> {
|
||||
Thread.currentThread().setName("Connection:Send-CloseConnectionMessage-" + this.getObjectId());
|
||||
try {
|
||||
sendMessage(new CloseConnectionMessage());
|
||||
// give a bit of time for closing gracefully
|
||||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
try {
|
||||
sharedSpace.getSocket().close();
|
||||
} catch (SocketException e) {
|
||||
log.trace("SocketException at shutdown might be expected " + e.getMessage());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
Utils.shutDownExecutorService(executorService);
|
||||
|
||||
log.debug("Connection shutdown complete " + this.toString());
|
||||
// dont use executorService as its shut down but call handler on own thread
|
||||
// to not get interrupted by caller
|
||||
if (shutDownCompleteHandler != null)
|
||||
new Thread(shutDownCompleteHandler).start();
|
||||
UserThread.execute(() -> continueShutDown(shutDownCompleteHandler));
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
continueShutDown(shutDownCompleteHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) {
|
||||
try {
|
||||
sharedSpace.getSocket().close();
|
||||
} catch (SocketException e) {
|
||||
log.trace("SocketException at shutdown might be expected " + e.getMessage());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
|
||||
|
||||
log.debug("Connection shutdown complete " + this.toString());
|
||||
// dont use executorService as its shut down but call handler on own thread
|
||||
// to not get interrupted by caller
|
||||
if (shutDownCompleteHandler != null)
|
||||
UserThread.execute(shutDownCompleteHandler);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
@ -236,14 +252,14 @@ public class Connection {
|
||||
|
||||
Connection that = (Connection) o;
|
||||
|
||||
if (port != that.port) return false;
|
||||
if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false;
|
||||
return !(uid != null ? !uid.equals(that.uid) : that.uid != null);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = port;
|
||||
int result = portInfo != null ? portInfo.hashCode() : 0;
|
||||
result = 31 * result + (uid != null ? uid.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
@ -251,7 +267,7 @@ public class Connection {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Connection{" +
|
||||
"port=" + port +
|
||||
"portInfo=" + portInfo +
|
||||
", uid='" + uid + '\'' +
|
||||
", objectId='" + getObjectId() + '\'' +
|
||||
", sharedSpace=" + sharedSpace.toString() +
|
||||
@ -266,6 +282,10 @@ public class Connection {
|
||||
return super.toString().split("@")[1].toString();
|
||||
}
|
||||
|
||||
public void setPeerAddress(@Nullable Address peerAddress) {
|
||||
this.peerAddress = peerAddress;
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SharedSpace
|
||||
@ -379,31 +399,23 @@ public class Connection {
|
||||
|
||||
private final SharedSpace sharedSpace;
|
||||
private final ObjectInputStream objectInputStream;
|
||||
private final int port;
|
||||
private final ExecutorService executorService;
|
||||
private final String portInfo;
|
||||
private volatile boolean stopped;
|
||||
|
||||
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, int port) {
|
||||
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) {
|
||||
this.sharedSpace = sharedSpace;
|
||||
this.objectInputStream = objectInputStream;
|
||||
this.port = port;
|
||||
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("InputHandler-onMessage-" + port)
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
|
||||
this.portInfo = portInfo;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
stopped = true;
|
||||
Utils.shutDownExecutorService(executorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().setName("InputHandler-" + port);
|
||||
Thread.currentThread().setName("InputHandler-" + portInfo);
|
||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId());
|
||||
@ -447,14 +459,7 @@ public class Connection {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
sharedSpace.onMessage(message);
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
});
|
||||
sharedSpace.onMessage(message);
|
||||
}
|
||||
} else {
|
||||
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
|
||||
@ -482,7 +487,7 @@ public class Connection {
|
||||
public String toString() {
|
||||
return "InputHandler{" +
|
||||
"sharedSpace=" + sharedSpace +
|
||||
", port=" + port +
|
||||
", port=" + portInfo +
|
||||
", stopped=" + stopped +
|
||||
'}';
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
package io.bitsquare.p2p.network;
|
||||
|
||||
import com.google.common.util.concurrent.*;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
|
||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
|
||||
import io.bitsquare.common.UserThread;
|
||||
@ -16,7 +19,7 @@ import java.net.BindException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class LocalhostNetworkNode extends NetworkNode {
|
||||
@ -47,11 +50,7 @@ public class LocalhostNetworkNode extends NetworkNode {
|
||||
public void start(@Nullable SetupListener setupListener) {
|
||||
if (setupListener != null) addSetupListener(setupListener);
|
||||
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("NetworkNode-" + port)
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory));
|
||||
createExecutor();
|
||||
|
||||
//Tor delay simulation
|
||||
createTorNode(torNode -> {
|
||||
@ -92,19 +91,22 @@ public class LocalhostNetworkNode extends NetworkNode {
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void createTorNode(final Consumer<TorNode> resultHandler) {
|
||||
Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> {
|
||||
Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000));
|
||||
long ts = System.currentTimeMillis();
|
||||
if (simulateTorDelayTorNode > 0)
|
||||
Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS);
|
||||
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000));
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
if (simulateTorDelayTorNode > 0)
|
||||
Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS);
|
||||
|
||||
log.info("\n\n############################################################\n" +
|
||||
"TorNode created [simulation]:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return null;
|
||||
};
|
||||
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(task);
|
||||
log.info("\n\n############################################################\n" +
|
||||
"TorNode created [simulation]:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return null;
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
|
||||
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
|
||||
UserThread.execute(() -> resultHandler.accept(torNode));
|
||||
@ -117,19 +119,22 @@ public class LocalhostNetworkNode extends NetworkNode {
|
||||
}
|
||||
|
||||
private void createHiddenService(final Consumer<HiddenServiceDescriptor> resultHandler) {
|
||||
Callable<HiddenServiceDescriptor> task = () -> {
|
||||
Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000));
|
||||
long ts = System.currentTimeMillis();
|
||||
if (simulateTorDelayHiddenService > 0)
|
||||
Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS);
|
||||
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000));
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
if (simulateTorDelayHiddenService > 0)
|
||||
Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS);
|
||||
|
||||
log.info("\n\n############################################################\n" +
|
||||
"Hidden service created [simulation]:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return null;
|
||||
};
|
||||
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(task);
|
||||
log.info("\n\n############################################################\n" +
|
||||
"Hidden service created [simulation]:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return null;
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() {
|
||||
public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) {
|
||||
UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor));
|
||||
|
@ -2,6 +2,7 @@ package io.bitsquare.p2p.network;
|
||||
|
||||
import com.google.common.util.concurrent.*;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@ -10,11 +11,15 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
@ -22,8 +27,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
|
||||
|
||||
protected final int port;
|
||||
private final Set<Connection> outBoundConnections = Collections.synchronizedSet(new HashSet<>());
|
||||
private final Set<Connection> inBoundConnections = Collections.synchronizedSet(new HashSet<>());
|
||||
private final Set<Connection> outBoundConnections = new CopyOnWriteArraySet<>();
|
||||
private final Set<Connection> inBoundConnections = new CopyOnWriteArraySet<>();
|
||||
private final List<MessageListener> messageListeners = new CopyOnWriteArrayList<>();
|
||||
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();
|
||||
protected final List<SetupListener> setupListeners = new CopyOnWriteArrayList<>();
|
||||
@ -76,15 +81,16 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||
return sendMessage(connection, message);
|
||||
} else {
|
||||
final SettableFuture<Connection> resultFuture = SettableFuture.create();
|
||||
Callable<Connection> task = () -> {
|
||||
Connection newConnection;
|
||||
ListenableFuture<Connection> future = executorService.submit(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:SendMessage-create-new-outbound-connection-to-" + peerAddress);
|
||||
try {
|
||||
Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress);
|
||||
Connection newConnection;
|
||||
log.trace("We have not found any connection for that peerAddress. " +
|
||||
"We will create a new outbound connection.");
|
||||
try {
|
||||
Socket socket = getSocket(peerAddress); // can take a while when using tor
|
||||
newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this);
|
||||
newConnection.setPeerAddress(peerAddress);
|
||||
outBoundConnections.add(newConnection);
|
||||
|
||||
log.info("\n\nNetworkNode created new outbound connection:"
|
||||
@ -93,28 +99,26 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||
+ "\nmessage=" + message
|
||||
+ "\n\n");
|
||||
} catch (Throwable t) {
|
||||
UserThread.execute(() -> resultFuture.setException(t));
|
||||
return null;
|
||||
throw t;
|
||||
}
|
||||
|
||||
newConnection.sendMessage(message);
|
||||
|
||||
return newConnection;
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
UserThread.execute(() -> resultFuture.setException(t));
|
||||
throw t;
|
||||
}
|
||||
};
|
||||
|
||||
ListenableFuture<Connection> future = executorService.submit(task);
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
public void onSuccess(Connection connection) {
|
||||
UserThread.execute(() -> resultFuture.set(connection));
|
||||
}
|
||||
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
if (!(throwable instanceof ConnectException)) {
|
||||
throwable.printStackTrace();
|
||||
log.error("Executing task failed. " + throwable.getMessage());
|
||||
}
|
||||
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||
}
|
||||
});
|
||||
@ -123,9 +127,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||
}
|
||||
|
||||
public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
|
||||
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
|
||||
ListenableFuture<Connection> future = executorService.submit(() -> {
|
||||
connection.sendMessage(message);
|
||||
return connection;
|
||||
Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId());
|
||||
try {
|
||||
connection.sendMessage(message);
|
||||
return connection;
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
final SettableFuture<Connection> resultFuture = SettableFuture.create();
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@ -234,6 +244,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||
// Protected
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected void createExecutor() {
|
||||
executorService = Utilities.getListeningExecutorService("NetworkNode-" + port, 20, 50, 120L);
|
||||
}
|
||||
|
||||
protected void startServer(ServerSocket serverSocket) {
|
||||
server = new Server(serverSocket,
|
||||
(message, connection) -> NetworkNode.this.onMessage(message, connection),
|
||||
|
@ -29,13 +29,14 @@ public class Server implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
|
||||
// Thread created by NetworkNode
|
||||
Thread.currentThread().setName("NetworkNode:Server-" + serverSocket.getLocalPort());
|
||||
try {
|
||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
log.info("Ready to accept new clients on port " + serverSocket.getLocalPort());
|
||||
final Socket socket = serverSocket.accept();
|
||||
if (!stopped) {
|
||||
log.info("Accepted new client on port " + socket.getLocalPort());
|
||||
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
|
||||
Connection connection = new Connection(socket, messageListener, connectionListener);
|
||||
log.info("\n\nServer created new inbound connection:"
|
||||
+ "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort()
|
||||
|
@ -1,11 +1,14 @@
|
||||
package io.bitsquare.p2p.network;
|
||||
|
||||
import com.google.common.util.concurrent.*;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
|
||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.Utils;
|
||||
import io.nucleo.net.HiddenServiceDescriptor;
|
||||
import io.nucleo.net.TorNode;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -17,12 +20,10 @@ import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.util.Random;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
public class TorNetworkNode extends NetworkNode {
|
||||
private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class);
|
||||
@ -65,12 +66,7 @@ public class TorNetworkNode extends NetworkNode {
|
||||
if (setupListener != null)
|
||||
addSetupListener(setupListener);
|
||||
|
||||
// executorService might have been shutdown before a restart, so we create a new one
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("NetworkNode-" + port)
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory));
|
||||
createExecutor();
|
||||
|
||||
// Create the tor node (takes about 6 sec.)
|
||||
createTorNode(torDir, torNode -> {
|
||||
@ -83,9 +79,12 @@ public class TorNetworkNode extends NetworkNode {
|
||||
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
|
||||
|
||||
startServer(hiddenServiceDescriptor.getServerSocket());
|
||||
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
|
||||
|
||||
setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady()));
|
||||
Runnable task = () -> {
|
||||
Thread.currentThread().setName("DelayNotifySetupListenersTimer-" + new Random().nextInt(1000));
|
||||
setupListeners.stream()
|
||||
.forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady()));
|
||||
};
|
||||
Utilities.runTimerTask(task, 500, TimeUnit.MILLISECONDS);
|
||||
});
|
||||
});
|
||||
}
|
||||
@ -102,32 +101,32 @@ public class TorNetworkNode extends NetworkNode {
|
||||
public void shutDown(Runnable shutDownCompleteHandler) {
|
||||
log.info("Shutdown TorNetworkNode");
|
||||
this.shutDownCompleteHandler = shutDownCompleteHandler;
|
||||
checkNotNull(executorService, "executorService must not be null");
|
||||
|
||||
shutDownTimeoutTimer = new Timer();
|
||||
shutDownTimeoutTimer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000));
|
||||
log.error("A timeout occurred at shutDown");
|
||||
shutDownExecutorService();
|
||||
}
|
||||
}, SHUT_DOWN_TIMEOUT);
|
||||
shutDownTimeoutTimer = Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000));
|
||||
log.error("A timeout occurred at shutDown");
|
||||
shutDownExecutorService();
|
||||
}, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS);
|
||||
|
||||
executorService.submit(() -> super.shutDown(() -> {
|
||||
networkNodeShutDownDoneComplete = true;
|
||||
if (torShutDownComplete)
|
||||
shutDownExecutorService();
|
||||
}));
|
||||
if (executorService != null) {
|
||||
executorService.submit(() -> super.shutDown(() -> {
|
||||
networkNodeShutDownDoneComplete = true;
|
||||
if (torShutDownComplete)
|
||||
shutDownExecutorService();
|
||||
}
|
||||
));
|
||||
} else {
|
||||
log.error("executorService must not be null at shutDown");
|
||||
}
|
||||
|
||||
ListenableFuture<?> future2 = executorService.submit(() -> {
|
||||
long ts = System.currentTimeMillis();
|
||||
log.info("Shutdown torNode");
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
log.info("Shutdown torNode");
|
||||
if (torNode != null)
|
||||
torNode.shutdown();
|
||||
log.info("Shutdown torNode done after " + (System.currentTimeMillis() - ts) + " ms.");
|
||||
} catch (IOException e) {
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
log.error("Shutdown torNode failed with exception: " + e.getMessage());
|
||||
shutDownExecutorService();
|
||||
@ -156,36 +155,32 @@ public class TorNetworkNode extends NetworkNode {
|
||||
|
||||
private void shutDownExecutorService() {
|
||||
shutDownTimeoutTimer.cancel();
|
||||
ListenableFuture<?> future = executorService.submit(() -> {
|
||||
long ts = System.currentTimeMillis();
|
||||
log.info("Shutdown executorService");
|
||||
Utils.shutDownExecutorService(executorService);
|
||||
log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<Object>() {
|
||||
@Override
|
||||
public void onSuccess(Object o) {
|
||||
new Thread(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:shutDownExecutorService-" + new Random().nextInt(1000));
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
log.info("Shutdown executorService");
|
||||
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
|
||||
log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
|
||||
|
||||
log.info("Shutdown completed");
|
||||
UserThread.execute(() -> shutDownCompleteHandler.run());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
throwable.printStackTrace();
|
||||
log.error("Shutdown executorService failed with exception: " + throwable.getMessage());
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Shutdown executorService failed with exception: " + t.getMessage());
|
||||
UserThread.execute(() -> shutDownCompleteHandler.run());
|
||||
}
|
||||
});
|
||||
}).start();
|
||||
}
|
||||
|
||||
private void restartTor() {
|
||||
restartCounter++;
|
||||
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
|
||||
shutDown(() -> {
|
||||
Uninterruptibles.sleepUninterruptibly(WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS);
|
||||
log.warn("We restart tor as too many self tests failed.");
|
||||
shutDown(() -> Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("RestartTorTimer-" + new Random().nextInt(1000));
|
||||
log.warn("We restart tor as starting tor failed.");
|
||||
start(null);
|
||||
});
|
||||
}, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS));
|
||||
} else {
|
||||
log.error("We tried to restart tor " + restartCounter
|
||||
+ " times, but we failed to get tor running. We give up now.");
|
||||
@ -197,23 +192,26 @@ public class TorNetworkNode extends NetworkNode {
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void createTorNode(final File torDir, final Consumer<TorNode> resultHandler) {
|
||||
Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> {
|
||||
Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000));
|
||||
long ts = System.currentTimeMillis();
|
||||
if (torDir.mkdirs())
|
||||
log.trace("Created directory for tor");
|
||||
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000));
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
if (torDir.mkdirs())
|
||||
log.trace("Created directory for tor");
|
||||
|
||||
log.trace("Create TorNode");
|
||||
TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode1 = new TorNode<JavaOnionProxyManager, JavaOnionProxyContext>(
|
||||
torDir) {
|
||||
};
|
||||
log.info("\n\n############################################################\n" +
|
||||
"TorNode created:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return torNode1;
|
||||
};
|
||||
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(task);
|
||||
log.trace("Create TorNode");
|
||||
TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode1 = new TorNode<JavaOnionProxyManager, JavaOnionProxyContext>(
|
||||
torDir) {
|
||||
};
|
||||
log.info("\n\n############################################################\n" +
|
||||
"TorNode created:" +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
return torNode1;
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
|
||||
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
|
||||
resultHandler.accept(torNode);
|
||||
@ -228,20 +226,23 @@ public class TorNetworkNode extends NetworkNode {
|
||||
|
||||
private void createHiddenService(final TorNode torNode, final int port,
|
||||
final Consumer<HiddenServiceDescriptor> resultHandler) {
|
||||
Callable<HiddenServiceDescriptor> task = () -> {
|
||||
Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000));
|
||||
long ts = System.currentTimeMillis();
|
||||
log.debug("Create hidden service");
|
||||
HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port);
|
||||
log.info("\n\n############################################################\n" +
|
||||
"Hidden service created:" +
|
||||
"\nAddress=" + hiddenServiceDescriptor.getFullAddress() +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(() -> {
|
||||
Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000));
|
||||
try {
|
||||
long ts = System.currentTimeMillis();
|
||||
log.debug("Create hidden service");
|
||||
HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port);
|
||||
log.info("\n\n############################################################\n" +
|
||||
"Hidden service created:" +
|
||||
"\nAddress=" + hiddenServiceDescriptor.getFullAddress() +
|
||||
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
|
||||
+ "\n############################################################\n");
|
||||
|
||||
return hiddenServiceDescriptor;
|
||||
};
|
||||
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(task);
|
||||
return hiddenServiceDescriptor;
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() {
|
||||
public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) {
|
||||
resultHandler.accept(hiddenServiceDescriptor);
|
||||
|
@ -1,9 +1,11 @@
|
||||
package io.bitsquare.p2p.routing;
|
||||
|
||||
import com.google.common.util.concurrent.*;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.Utils;
|
||||
import io.bitsquare.p2p.network.*;
|
||||
import io.bitsquare.p2p.routing.messages.*;
|
||||
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
|
||||
@ -13,7 +15,10 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Routing {
|
||||
@ -39,10 +44,9 @@ public class Routing {
|
||||
private final Map<Address, Long> nonceMap = new ConcurrentHashMap<>();
|
||||
private final List<RoutingListener> routingListeners = new CopyOnWriteArrayList<>();
|
||||
private final Map<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
|
||||
private final Set<Address> reportedPeerAddresses = Collections.synchronizedSet(new HashSet<>());
|
||||
private final Set<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
|
||||
private final Map<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>();
|
||||
private final Timer maintenanceTimer = new Timer();
|
||||
private final ExecutorService executorService;
|
||||
private volatile boolean shutDownInProgress;
|
||||
|
||||
|
||||
@ -56,13 +60,6 @@ public class Routing {
|
||||
// We copy it as we remove ourselves later from the list if we are a seed node
|
||||
this.seedNodes = new CopyOnWriteArrayList<>(seeds);
|
||||
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("Routing-%d")
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
|
||||
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
|
||||
|
||||
init(networkNode);
|
||||
}
|
||||
|
||||
@ -116,7 +113,7 @@ public class Routing {
|
||||
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("RoutingMaintenanceTimer-" + new Random().nextInt(1000));
|
||||
Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000));
|
||||
try {
|
||||
UserThread.execute(() -> {
|
||||
disconnectOldConnections();
|
||||
@ -139,8 +136,11 @@ public class Routing {
|
||||
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
|
||||
Connection connection = authenticatedConnections.remove(0);
|
||||
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
|
||||
connection.shutDown(() -> disconnectOldConnections());
|
||||
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
|
||||
|
||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000));
|
||||
disconnectOldConnections();
|
||||
}, 1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,7 +149,8 @@ public class Routing {
|
||||
List<Peer> connectedPeersList = new ArrayList<>(authenticatedPeers.values());
|
||||
connectedPeersList.stream()
|
||||
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
|
||||
.forEach(e -> {
|
||||
.forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> {
|
||||
Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000));
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
@ -163,8 +164,7 @@ public class Routing {
|
||||
removePeer(e.address);
|
||||
}
|
||||
});
|
||||
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS);
|
||||
});
|
||||
}, 5, 10));
|
||||
}
|
||||
|
||||
|
||||
@ -177,8 +177,6 @@ public class Routing {
|
||||
shutDownInProgress = true;
|
||||
if (maintenanceTimer != null)
|
||||
maintenanceTimer.cancel();
|
||||
|
||||
Utils.shutDownExecutorService(executorService);
|
||||
}
|
||||
}
|
||||
|
||||
@ -253,16 +251,7 @@ public class Routing {
|
||||
|
||||
public void startAuthentication(Set<Address> connectedSeedNodes) {
|
||||
connectedSeedNodes.forEach(connectedSeedNode -> {
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
|
||||
// give a random pause of 3-5 sec. before using the next
|
||||
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(2000) + 3000, TimeUnit.MILLISECONDS);
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
});
|
||||
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
|
||||
});
|
||||
}
|
||||
|
||||
@ -319,44 +308,40 @@ public class Routing {
|
||||
RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message;
|
||||
Address peerAddress = requestAuthenticationMessage.address;
|
||||
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
||||
connection.shutDown(() -> {
|
||||
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
|
||||
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
|
||||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (simulateAuthTorNode > 0)
|
||||
Uninterruptibles.sleepUninterruptibly(simulateAuthTorNode, TimeUnit.MILLISECONDS);
|
||||
|
||||
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
||||
long nonce = addToMapAndGetNonce(peerAddress);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.debug("onSuccess ");
|
||||
|
||||
// TODO check nr. of connections, remove older connections (?)
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.debug("onFailure ");
|
||||
// TODO skip to next node or retry?
|
||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000));
|
||||
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
|
||||
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
|
||||
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
||||
long nonce = addToMapAndGetNonce(peerAddress);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.debug("onSuccess ");
|
||||
log.debug("onSuccess sending ChallengeMessage");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.debug("onFailure ");
|
||||
log.warn("onFailure sending ChallengeMessage. We try again.");
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.debug("onSuccess sending 2. ChallengeMessage");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.warn("onFailure sending ChallengeMessage. We give up.");
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
100 + simulateAuthTorNode,
|
||||
TimeUnit.MILLISECONDS));
|
||||
} else if (message instanceof ChallengeMessage) {
|
||||
ChallengeMessage challengeMessage = (ChallengeMessage) message;
|
||||
Address peerAddress = challengeMessage.address;
|
||||
@ -364,16 +349,13 @@ public class Routing {
|
||||
HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
|
||||
boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress);
|
||||
if (verified) {
|
||||
connection.setPeerAddress(peerAddress);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
||||
new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses()));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
|
||||
|
||||
/* // we wait to get the success to reduce the time span of the moment of
|
||||
// authentication at both sides of the connection
|
||||
setAuthenticated(connection, peerAddress);*/
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -480,25 +462,20 @@ public class Routing {
|
||||
}
|
||||
|
||||
private void authenticateToNextRandomPeer() {
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS);
|
||||
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||
Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress();
|
||||
if (randomNotConnectedPeerAddress != null) {
|
||||
log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress);
|
||||
authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer());
|
||||
} else {
|
||||
log.info("No more peers available for connecting.");
|
||||
}
|
||||
Utilities.runTimerTaskWithRandomDelay(() -> {
|
||||
Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000));
|
||||
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||
Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress();
|
||||
if (randomNotConnectedPeerAddress != null) {
|
||||
log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress);
|
||||
authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer());
|
||||
} else {
|
||||
log.info("We have already enough connections.");
|
||||
log.info("No more peers available for connecting.");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
} else {
|
||||
log.info("We have already enough connections.");
|
||||
}
|
||||
});
|
||||
}, 200, 400, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package io.bitsquare.p2p.seed;
|
||||
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.crypto.KeyRing;
|
||||
import io.bitsquare.crypto.EncryptionService;
|
||||
import io.bitsquare.p2p.Address;
|
||||
@ -10,7 +11,8 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class SeedNode {
|
||||
private static final Logger log = LoggerFactory.getLogger(SeedNode.class);
|
||||
@ -31,7 +33,6 @@ public class SeedNode {
|
||||
|
||||
// args: port useLocalhost seedNodes
|
||||
// eg. 4444 true localhost:7777 localhost:8888
|
||||
// To stop enter: q
|
||||
public void processArgs(String[] args) {
|
||||
if (args.length > 0) {
|
||||
port = Integer.parseInt(args[0]);
|
||||
@ -49,29 +50,6 @@ public class SeedNode {
|
||||
}
|
||||
}
|
||||
|
||||
public void listenForExitCommand() {
|
||||
Scanner scan = new Scanner(System.in);
|
||||
String line;
|
||||
while (!stopped && ((line = scan.nextLine()) != null)) {
|
||||
if (line.equals("q")) {
|
||||
Timer timeout = new Timer();
|
||||
timeout.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
log.error("Timeout occurred at shutDown request");
|
||||
System.exit(1);
|
||||
}
|
||||
}, 10 * 1000);
|
||||
|
||||
shutDown(() -> {
|
||||
timeout.cancel();
|
||||
log.debug("Shutdown seed node complete.");
|
||||
System.exit(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void createAndStartP2PService() {
|
||||
createAndStartP2PService(null, null, port, useLocalhost, seedNodes, null);
|
||||
}
|
||||
@ -103,7 +81,7 @@ public class SeedNode {
|
||||
stopped = true;
|
||||
|
||||
p2PService.shutDown(() -> {
|
||||
if (shutDownCompleteHandler != null) new Thread(shutDownCompleteHandler).start();
|
||||
if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ public class ProtectedDataStorageTest {
|
||||
dir2.delete();
|
||||
dir2.mkdir();
|
||||
|
||||
UserThread.executor = Executors.newSingleThreadExecutor();
|
||||
UserThread.setExecutor(Executors.newSingleThreadExecutor());
|
||||
ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10 * 60 * 1000;
|
||||
|
||||
keyRing1 = new KeyRing(new KeyStorage(dir1));
|
||||
|
Loading…
Reference in New Issue
Block a user