Merge pull request #6468 from HenrikJannsen/change_threadpool

Use ThreadPoolExecutor with custom set queueCapacity instead of CachedThreadPool
This commit is contained in:
sqrrm 2022-12-19 16:42:47 +01:00 committed by GitHub
commit c76ee16a2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 22 deletions

View file

@ -64,7 +64,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -101,7 +100,15 @@ public class Utilities {
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec));
return getListeningExecutorService(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}
public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, queueCapacity, keepAliveTimeInSec));
}
public static ListeningExecutorService getListeningExecutorService(String name,
@ -116,8 +123,17 @@ public class Utilities {
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}
public static ThreadPoolExecutor getThreadPoolExecutor(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec,
new ArrayBlockingQueue<>(maximumPoolSize));
new ArrayBlockingQueue<>(queueCapacity));
}
private static ThreadPoolExecutor getThreadPoolExecutor(String name,
@ -135,22 +151,6 @@ public class Utilities {
return executor;
}
public static ExecutorService newCachedThreadPool(String name,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-%d")
.setDaemon(true)
.build();
return new ThreadPoolExecutor(0,
maximumPoolSize,
keepAliveTime,
timeUnit,
new SynchronousQueue<>(),
threadFactory);
}
@SuppressWarnings("SameParameterValue")
public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(String name,
int corePoolSize,

View file

@ -100,8 +100,16 @@ public abstract class NetworkNode implements MessageListener {
this.networkProtoResolver = networkProtoResolver;
this.networkFilter = networkFilter;
connectionExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.connection", maxConnections * 2, 1, TimeUnit.MINUTES));
sendMessageExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.sendMessage", maxConnections * 2, 3, TimeUnit.MINUTES));
connectionExecutor = Utilities.getListeningExecutorService("NetworkNode.connection",
maxConnections * 2,
maxConnections * 3,
10,
60);
sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage",
maxConnections * 2,
maxConnections * 3,
10,
60);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
}

View file

@ -118,7 +118,7 @@ public class SeedNodeReportingService {
// The pool size must be larger as the expected parallel sends because HttpClient use it
// internally for asynchronous and dependent tasks.
executor = Utilities.newCachedThreadPool("SeedNodeReportingService", 20, 8, TimeUnit.MINUTES);
executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 20, 40, 100, 8 * 60);
httpClient = HttpClient.newBuilder().executor(executor).build();
heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);