Orchid: better thread tracking

also, fix race condition in closing circuits on shutdown
This commit is contained in:
Devrandom 2014-08-24 14:15:50 -07:00 committed by Mike Hearn
parent de3665f734
commit f57c3a857c
6 changed files with 82 additions and 7 deletions

View File

@ -6,6 +6,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -52,7 +53,15 @@ public class CircuitCreationTask implements Runnable {
this.circuitManager = circuitManager;
this.initializationTracker = initializationTracker;
this.pathChooser = pathChooser;
this.executor = Executors.newCachedThreadPool();
this.executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("CircuitCreationTask worker");
t.setDaemon(true);
return t;
}
});
this.buildHandler = createCircuitBuildHandler();
this.internalBuildHandler = createInternalCircuitBuildHandler();
this.predictor = new CircuitPredictor();

View File

@ -11,6 +11,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@ -59,13 +60,23 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
private int pendingInternalCircuitCount = 0;
private final TorRandom random;
private final PendingExitStreams pendingExitStreams;
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("CircuitManager worker");
t.setDaemon(true);
return t;
}
});
private final CircuitCreationTask circuitCreationTask;
private final TorInitializationTracker initializationTracker;
private final CircuitPathChooser pathChooser;
private final HiddenServiceManager hiddenServiceManager;
private final ReentrantLock lock = Threading.lock("circuitManager");
private boolean isBuilding = false;
public CircuitManagerImpl(TorConfig config, DirectoryDownloaderImpl directoryDownloader, Directory directory, ConnectionCache connectionCache, TorInitializationTracker initializationTracker) {
this.config = config;
this.directory = directory;
@ -87,13 +98,20 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
}
public void startBuildingCircuits() {
scheduledExecutor.scheduleAtFixedRate(circuitCreationTask, 0, 1000, TimeUnit.MILLISECONDS);
lock.lock();
try {
isBuilding = true;
scheduledExecutor.scheduleAtFixedRate(circuitCreationTask, 0, 1000, TimeUnit.MILLISECONDS);
} finally {
lock.unlock();
}
}
public void stopBuildingCircuits(boolean killCircuits) {
lock.lock();
try {
isBuilding = false;
scheduledExecutor.shutdownNow();
if (killCircuits) {
List<CircuitImpl> circuits = new ArrayList<CircuitImpl>(activeCircuits);
@ -111,6 +129,16 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
}
void addActiveCircuit(CircuitImpl circuit) {
lock.lock();
try {
if (!isBuilding) {
circuit.destroyCircuit();
}
} finally {
lock.unlock();
}
synchronized (activeCircuits) {
activeCircuits.add(circuit);
activeCircuits.notifyAll();

View File

@ -7,6 +7,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@ -47,7 +49,15 @@ public class EntryGuards {
this.pendingProbes = new HashSet<GuardEntry>();
this.bridges = new Bridges(config, directoryDownloader);
this.lock = new Object();
this.executor = Executors.newCachedThreadPool();
this.executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("EntryGuards worker");
t.setDaemon(true);
return t;
}
});
}
public boolean isUsingBridges() {

View File

@ -8,6 +8,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;
import com.subgraph.orchid.data.IPv4Address;
@ -37,7 +38,15 @@ public class Dashboard implements DashboardRenderable, DashboardRenderer {
public Dashboard() {
renderables = new CopyOnWriteArrayList<DashboardRenderable>();
renderables.add(this);
executor = Executors.newCachedThreadPool();
executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Dashboard worker");
t.setDaemon(true);
return t;
}
});
listeningPort = chooseListeningPort();
}

View File

@ -6,6 +6,7 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@ -23,7 +24,16 @@ public abstract class DescriptorCache <T extends Descriptor> {
private final DescriptorCacheData<T> data;
private final DirectoryStore store;
private final ScheduledExecutorService rebuildExecutor = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService rebuildExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DescriptorCache rebuild worker");
t.setDaemon(true);
return t;
}
});
private final CacheFile cacheFile;
private final CacheFile journalFile;

View File

@ -7,6 +7,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@ -31,7 +32,15 @@ public class DirectoryDownloadTask implements Runnable {
private final TorRandom random;
private final DescriptorProcessor descriptorProcessor;
private final ExecutorService executor = Executors.newCachedThreadPool();
private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DirectoryDownloadTask worker");
t.setDaemon(true);
return t;
}
});
private volatile boolean isDownloadingCertificates;
private volatile boolean isDownloadingConsensus;