Move SingleThreadExecutors to its own Utils class

This commit is contained in:
Alva Swanson 2023-02-05 14:07:11 +01:00
parent f15a5ded64
commit 53837bf00b
No known key found for this signature in database
GPG Key ID: 004760E77F753090
15 changed files with 88 additions and 56 deletions

View File

@ -26,8 +26,8 @@ import bisq.common.file.FileUtil;
import bisq.common.handlers.ResultHandler;
import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import com.google.inject.Inject;
@ -517,7 +517,7 @@ public class PersistenceManager<T extends PersistableEnvelope> {
private ExecutorService getWriteToDiskExecutor() {
if (writeToDiskExecutor == null) {
String name = "Write-" + fileName + "_to-disk";
writeToDiskExecutor = Utilities.getSingleThreadExecutor(name);
writeToDiskExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor(name);
}
return writeToDiskExecutor;
}

View File

@ -0,0 +1,62 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.common.util;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class SingleThreadExecutorUtils {
public static ExecutorService getSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name);
}
public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name, false);
}
public static ExecutorService getSingleThreadExecutor(String name) {
return getSingleThreadExecutor(name, true);
}
public static ListeningExecutorService getSingleThreadListeningExecutor(String name) {
return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name));
}
public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) {
final ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread);
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(isDaemonThread)
.build();
}
}

View File

@ -84,45 +84,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class Utilities {
public static ExecutorService getSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name);
}
public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name, false);
}
public static ExecutorService getSingleThreadExecutor(String name) {
return getSingleThreadExecutor(name, true);
}
private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) {
final ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread);
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(isDaemonThread)
.build();
}
public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}
public static ExecutorService getFixedThreadPoolExecutor(int nThreads,
ThreadFactory threadFactory) {
return Executors.newFixedThreadPool(nThreads, threadFactory);
}
public static ListeningExecutorService getSingleThreadListeningExecutor(String name) {
return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name));
}
public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,

View File

@ -49,7 +49,7 @@ import bisq.core.util.coin.CoinFormatter;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Address;
import org.bitcoinj.core.Coin;
@ -116,7 +116,7 @@ class CoreWalletsService {
@Nullable
private KeyParameter tempAesKey;
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("CoreWalletsService");
private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("CoreWalletsService");
@Inject
public CoreWalletsService(AppStartupState appStartupState,

View File

@ -22,7 +22,7 @@ import bisq.core.payment.TradeLimits;
import bisq.common.UserThread;
import bisq.common.app.AppModule;
import bisq.common.app.Version;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import java.util.concurrent.ExecutorService;
@ -60,7 +60,7 @@ public class BisqHeadlessAppMain extends BisqExecutable {
@Override
protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass());
ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService);
}

View File

@ -39,8 +39,8 @@ import bisq.common.file.JsonFileManager;
import bisq.common.handlers.ResultHandler;
import bisq.common.persistence.PersistenceManager;
import bisq.common.setup.GracefulShutDownHandler;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Profiler;
import bisq.common.util.Utilities;
import java.time.Instant;
import java.time.ZoneId;
@ -69,7 +69,7 @@ public abstract class ExecutableForAppWithP2p extends BisqExecutable {
@Override
protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass());
ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService);
}

View File

@ -31,7 +31,7 @@ import bisq.core.user.Preferences;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Utils;
@ -91,7 +91,7 @@ public class RpcService {
// We could use multiple threads, but then we need to support ordering of results in a queue
// Keep that for optimization after measuring performance differences
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("RpcService");
private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("RpcService");
private volatile boolean shutdownInProgress;
private final Set<ResultHandler> setupResultHandlers = new CopyOnWriteArraySet<>();
private final Set<Consumer<Throwable>> setupErrorHandlers = new CopyOnWriteArraySet<>();

View File

@ -17,6 +17,7 @@
package bisq.core.dao.node.full.rpc;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities;
import com.google.common.annotations.VisibleForTesting;
@ -43,7 +44,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class BitcoindDaemon {
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("block-notification-server");
private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("block-notification-server");
private final ListeningExecutorService workerPool = Utilities.getListeningExecutorService("block-notification-worker",
1, 10, 60, new ArrayBlockingQueue<>(100));
private final ServerSocket serverSocket;

View File

@ -28,8 +28,8 @@ import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.persistence.PersistenceManager;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import javax.inject.Inject;
import javax.inject.Named;
@ -55,7 +55,7 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
private final BsqBlocksStorageService bsqBlocksStorageService;
private final File storageDir;
private final LinkedList<Block> blocks = new LinkedList<>();
private final ExecutorService executorService = Utilities.getNonDaemonSingleThreadExecutor(this.getClass());
private final ExecutorService executorService = SingleThreadExecutorUtils.getNonDaemonSingleThreadExecutor(this.getClass());
private Optional<Future<?>> future = Optional.empty();

View File

@ -28,7 +28,7 @@ import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import com.google.inject.Inject;
@ -71,7 +71,7 @@ public class TradeStatisticsConverter {
if (!tradeStatistics2Store.exists()) {
return;
}
executor = Utilities.getSingleThreadExecutor("TradeStatisticsConverter");
executor = SingleThreadExecutorUtils.getSingleThreadExecutor("TradeStatisticsConverter");
executor.submit(() -> {
// We convert early once tor is initialized but still not ready to receive data
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> tempMap = new HashMap<>();

View File

@ -24,7 +24,7 @@ import bisq.core.app.CoreModule;
import bisq.common.UserThread;
import bisq.common.app.AppModule;
import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import java.util.concurrent.ExecutorService;
@ -49,7 +49,7 @@ public class BisqDaemonMain extends BisqHeadlessAppMain implements BisqSetup.Bis
@Override
protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass());
ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService);
}

View File

@ -17,6 +17,7 @@
package bisq.network;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities;
import org.bitcoinj.core.NetworkParameters;
@ -86,7 +87,7 @@ public class Socks5DnsDiscovery extends MultiplexingDiscovery {
// Attempted workaround for reported bugs on Linux in which gethostbyname does not appear to be properly
// thread safe and can cause segfaults on some libc versions.
if (Utilities.isLinux())
return Utilities.getSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups"));
return SingleThreadExecutorUtils.getSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups"));
else
return Utilities.getFixedThreadPoolExecutor(seeds.size(), new DaemonThreadFactory("DNS seed lookups"));
}

View File

@ -40,6 +40,7 @@ import bisq.common.config.Config;
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities;
import com.google.protobuf.InvalidProtocolBufferException;
@ -133,7 +134,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final NetworkFilter networkFilter;
@Getter
private final String uid;
private final ExecutorService singleThreadExecutor = Utilities.getSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service"));
private final ExecutorService singleThreadExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service"));
@Getter
private final Statistic statistic;
@Getter

View File

@ -24,6 +24,7 @@ import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
@ -111,7 +112,7 @@ public abstract class NetworkNode implements MessageListener {
maxConnections * 3,
30,
30);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
serverExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
}
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -23,7 +23,7 @@ import bisq.network.utils.Utils;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.Utilities;
import bisq.common.util.SingleThreadExecutorUtils;
import org.berndpruenster.netlayer.tor.HiddenServiceSocket;
import org.berndpruenster.netlayer.tor.Tor;
@ -75,7 +75,7 @@ public class TorNetworkNode extends NetworkNode {
this.torMode = torMode;
this.streamIsolation = useStreamIsolation;
executor = Utilities.getSingleThreadExecutor("StartTor");
executor = SingleThreadExecutorUtils.getSingleThreadExecutor("StartTor");
}