Use ThreadPoolExecutor with custom set queueCapacity instead of CachedThreadPool

The previously used newCachedThreadPool carries higher risk for execution exceptions if exceeded.

Originally we had only one executor with a corePoolSize of 15 and a maximumPoolSize of 30 and queueCapacity was set to maximumPoolSize.
This was risky when the 15 corePool threads have been busy and new messages or connection creation threads are
queued up with potentially significant delay until getting served leading to timeouts.
Now we use (if maxConnections is 12) corePoolSize of 24, maximumPoolSize 36 and queueCapacity 10. This gives
considerable headroom. We also have split up the executors in 2 distinct ones.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-12-16 17:38:23 -05:00 committed by Christoph Atteneder
parent 9f8b3df1ac
commit 30afccb2d6
No known key found for this signature in database
GPG Key ID: CD5DC1C529CDFD3B
3 changed files with 30 additions and 22 deletions

View File

@ -64,7 +64,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -101,7 +100,15 @@ public class Utilities {
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec));
return getListeningExecutorService(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}
public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, queueCapacity, keepAliveTimeInSec));
}
public static ListeningExecutorService getListeningExecutorService(String name,
@ -116,8 +123,17 @@ public class Utilities {
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}
public static ThreadPoolExecutor getThreadPoolExecutor(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec,
new ArrayBlockingQueue<>(maximumPoolSize));
new ArrayBlockingQueue<>(queueCapacity));
}
private static ThreadPoolExecutor getThreadPoolExecutor(String name,
@ -135,22 +151,6 @@ public class Utilities {
return executor;
}
public static ExecutorService newCachedThreadPool(String name,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-%d")
.setDaemon(true)
.build();
return new ThreadPoolExecutor(0,
maximumPoolSize,
keepAliveTime,
timeUnit,
new SynchronousQueue<>(),
threadFactory);
}
@SuppressWarnings("SameParameterValue")
public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(String name,
int corePoolSize,

View File

@ -100,8 +100,16 @@ public abstract class NetworkNode implements MessageListener {
this.networkProtoResolver = networkProtoResolver;
this.networkFilter = networkFilter;
connectionExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.connection", maxConnections * 2, 1, TimeUnit.MINUTES));
sendMessageExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.sendMessage", maxConnections * 2, 3, TimeUnit.MINUTES));
connectionExecutor = Utilities.getListeningExecutorService("NetworkNode.connection",
maxConnections * 2,
maxConnections * 3,
10,
60);
sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage",
maxConnections * 2,
maxConnections * 3,
10,
60);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
}

View File

@ -118,7 +118,7 @@ public class SeedNodeReportingService {
// The pool size must be larger as the expected parallel sends because HttpClient use it
// internally for asynchronous and dependent tasks.
executor = Utilities.newCachedThreadPool("SeedNodeReportingService", 20, 8, TimeUnit.MINUTES);
executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 20, 40, 100, 8 * 60);
httpClient = HttpClient.newBuilder().executor(executor).build();
heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);