Merge pull request #6562 from alvasw/move_get_single_thread_executor_to_own_utils

Move SingleThreadExecutors to its own Utils class
This commit is contained in:
Alejandro García 2023-02-05 22:42:02 +00:00 committed by GitHub
commit c61bd70ead
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 88 additions and 56 deletions

View file

@ -26,8 +26,8 @@ import bisq.common.file.FileUtil;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import bisq.common.proto.persistable.PersistableEnvelope; import bisq.common.proto.persistable.PersistableEnvelope;
import bisq.common.proto.persistable.PersistenceProtoResolver; import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.GcUtil; import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -517,7 +517,7 @@ public class PersistenceManager<T extends PersistableEnvelope> {
private ExecutorService getWriteToDiskExecutor() { private ExecutorService getWriteToDiskExecutor() {
if (writeToDiskExecutor == null) { if (writeToDiskExecutor == null) {
String name = "Write-" + fileName + "_to-disk"; String name = "Write-" + fileName + "_to-disk";
writeToDiskExecutor = Utilities.getSingleThreadExecutor(name); writeToDiskExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor(name);
} }
return writeToDiskExecutor; return writeToDiskExecutor;
} }

View file

@ -0,0 +1,62 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.common.util;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class SingleThreadExecutorUtils {
public static ExecutorService getSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name);
}
public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name, false);
}
public static ExecutorService getSingleThreadExecutor(String name) {
return getSingleThreadExecutor(name, true);
}
public static ListeningExecutorService getSingleThreadListeningExecutor(String name) {
return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name));
}
public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) {
final ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread);
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(isDaemonThread)
.build();
}
}

View file

@ -84,45 +84,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j @Slf4j
public class Utilities { public class Utilities {
public static ExecutorService getSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name);
}
public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) {
String name = aClass.getSimpleName();
return getSingleThreadExecutor(name, false);
}
public static ExecutorService getSingleThreadExecutor(String name) {
return getSingleThreadExecutor(name, true);
}
private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) {
final ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread);
return Executors.newSingleThreadExecutor(threadFactory);
}
private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(isDaemonThread)
.build();
}
public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}
public static ExecutorService getFixedThreadPoolExecutor(int nThreads, public static ExecutorService getFixedThreadPoolExecutor(int nThreads,
ThreadFactory threadFactory) { ThreadFactory threadFactory) {
return Executors.newFixedThreadPool(nThreads, threadFactory); return Executors.newFixedThreadPool(nThreads, threadFactory);
} }
public static ListeningExecutorService getSingleThreadListeningExecutor(String name) {
return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name));
}
public static ListeningExecutorService getListeningExecutorService(String name, public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize, int corePoolSize,
int maximumPoolSize, int maximumPoolSize,

View file

@ -49,7 +49,7 @@ import bisq.core.util.coin.CoinFormatter;
import bisq.common.Timer; import bisq.common.Timer;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Address; import org.bitcoinj.core.Address;
import org.bitcoinj.core.Coin; import org.bitcoinj.core.Coin;
@ -116,7 +116,7 @@ class CoreWalletsService {
@Nullable @Nullable
private KeyParameter tempAesKey; private KeyParameter tempAesKey;
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("CoreWalletsService"); private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("CoreWalletsService");
@Inject @Inject
public CoreWalletsService(AppStartupState appStartupState, public CoreWalletsService(AppStartupState appStartupState,

View file

@ -22,7 +22,7 @@ import bisq.core.payment.TradeLimits;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.app.AppModule; import bisq.common.app.AppModule;
import bisq.common.app.Version; import bisq.common.app.Version;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -60,7 +60,7 @@ public class BisqHeadlessAppMain extends BisqExecutable {
@Override @Override
protected void configUserThread() { protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass()); ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService); UserThread.setExecutor(executorService);
} }

View file

@ -39,8 +39,8 @@ import bisq.common.file.JsonFileManager;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import bisq.common.persistence.PersistenceManager; import bisq.common.persistence.PersistenceManager;
import bisq.common.setup.GracefulShutDownHandler; import bisq.common.setup.GracefulShutDownHandler;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Profiler; import bisq.common.util.Profiler;
import bisq.common.util.Utilities;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
@ -69,7 +69,7 @@ public abstract class ExecutableForAppWithP2p extends BisqExecutable {
@Override @Override
protected void configUserThread() { protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass()); ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService); UserThread.setExecutor(executorService);
} }

View file

@ -31,7 +31,7 @@ import bisq.core.user.Preferences;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.config.Config; import bisq.common.config.Config;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import org.bitcoinj.core.Utils; import org.bitcoinj.core.Utils;
@ -91,7 +91,7 @@ public class RpcService {
// We could use multiple threads, but then we need to support ordering of results in a queue // We could use multiple threads, but then we need to support ordering of results in a queue
// Keep that for optimization after measuring performance differences // Keep that for optimization after measuring performance differences
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("RpcService"); private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("RpcService");
private volatile boolean shutdownInProgress; private volatile boolean shutdownInProgress;
private final Set<ResultHandler> setupResultHandlers = new CopyOnWriteArraySet<>(); private final Set<ResultHandler> setupResultHandlers = new CopyOnWriteArraySet<>();
private final Set<Consumer<Throwable>> setupErrorHandlers = new CopyOnWriteArraySet<>(); private final Set<Consumer<Throwable>> setupErrorHandlers = new CopyOnWriteArraySet<>();

View file

@ -17,6 +17,7 @@
package bisq.core.dao.node.full.rpc; package bisq.core.dao.node.full.rpc;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities; import bisq.common.util.Utilities;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -43,7 +44,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class BitcoindDaemon { public class BitcoindDaemon {
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("block-notification-server"); private final ListeningExecutorService executor = SingleThreadExecutorUtils.getSingleThreadListeningExecutor("block-notification-server");
private final ListeningExecutorService workerPool = Utilities.getListeningExecutorService("block-notification-worker", private final ListeningExecutorService workerPool = Utilities.getListeningExecutorService("block-notification-worker",
1, 10, 60, new ArrayBlockingQueue<>(100)); 1, 10, 60, new ArrayBlockingQueue<>(100));
private final ServerSocket serverSocket; private final ServerSocket serverSocket;

View file

@ -28,8 +28,8 @@ import bisq.common.UserThread;
import bisq.common.config.Config; import bisq.common.config.Config;
import bisq.common.file.FileUtil; import bisq.common.file.FileUtil;
import bisq.common.persistence.PersistenceManager; import bisq.common.persistence.PersistenceManager;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.GcUtil; import bisq.common.util.GcUtil;
import bisq.common.util.Utilities;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
@ -55,7 +55,7 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
private final BsqBlocksStorageService bsqBlocksStorageService; private final BsqBlocksStorageService bsqBlocksStorageService;
private final File storageDir; private final File storageDir;
private final LinkedList<Block> blocks = new LinkedList<>(); private final LinkedList<Block> blocks = new LinkedList<>();
private final ExecutorService executorService = Utilities.getNonDaemonSingleThreadExecutor(this.getClass()); private final ExecutorService executorService = SingleThreadExecutorUtils.getNonDaemonSingleThreadExecutor(this.getClass());
private Optional<Future<?>> future = Optional.empty(); private Optional<Future<?>> future = Optional.empty();

View file

@ -28,7 +28,7 @@ import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.config.Config; import bisq.common.config.Config;
import bisq.common.file.FileUtil; import bisq.common.file.FileUtil;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -71,7 +71,7 @@ public class TradeStatisticsConverter {
if (!tradeStatistics2Store.exists()) { if (!tradeStatistics2Store.exists()) {
return; return;
} }
executor = Utilities.getSingleThreadExecutor("TradeStatisticsConverter"); executor = SingleThreadExecutorUtils.getSingleThreadExecutor("TradeStatisticsConverter");
executor.submit(() -> { executor.submit(() -> {
// We convert early once tor is initialized but still not ready to receive data // We convert early once tor is initialized but still not ready to receive data
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> tempMap = new HashMap<>(); Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> tempMap = new HashMap<>();

View file

@ -24,7 +24,7 @@ import bisq.core.app.CoreModule;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.app.AppModule; import bisq.common.app.AppModule;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -49,7 +49,7 @@ public class BisqDaemonMain extends BisqHeadlessAppMain implements BisqSetup.Bis
@Override @Override
protected void configUserThread() { protected void configUserThread() {
ExecutorService executorService = Utilities.getSingleThreadExecutor(this.getClass()); ExecutorService executorService = SingleThreadExecutorUtils.getSingleThreadExecutor(this.getClass());
UserThread.setExecutor(executorService); UserThread.setExecutor(executorService);
} }

View file

@ -17,6 +17,7 @@
package bisq.network; package bisq.network;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities; import bisq.common.util.Utilities;
import org.bitcoinj.core.NetworkParameters; import org.bitcoinj.core.NetworkParameters;
@ -86,7 +87,7 @@ public class Socks5DnsDiscovery extends MultiplexingDiscovery {
// Attempted workaround for reported bugs on Linux in which gethostbyname does not appear to be properly // Attempted workaround for reported bugs on Linux in which gethostbyname does not appear to be properly
// thread safe and can cause segfaults on some libc versions. // thread safe and can cause segfaults on some libc versions.
if (Utilities.isLinux()) if (Utilities.isLinux())
return Utilities.getSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups")); return SingleThreadExecutorUtils.getSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups"));
else else
return Utilities.getFixedThreadPoolExecutor(seeds.size(), new DaemonThreadFactory("DNS seed lookups")); return Utilities.getFixedThreadPoolExecutor(seeds.size(), new DaemonThreadFactory("DNS seed lookups"));
} }

View file

@ -40,6 +40,7 @@ import bisq.common.config.Config;
import bisq.common.proto.ProtobufferException; import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver; import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities; import bisq.common.util.Utilities;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
@ -133,7 +134,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final NetworkFilter networkFilter; private final NetworkFilter networkFilter;
@Getter @Getter
private final String uid; private final String uid;
private final ExecutorService singleThreadExecutor = Utilities.getSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service")); private final ExecutorService singleThreadExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service"));
@Getter @Getter
private final Statistic statistic; private final Statistic statistic;
@Getter @Getter

View file

@ -24,6 +24,7 @@ import bisq.common.UserThread;
import bisq.common.app.Capabilities; import bisq.common.app.Capabilities;
import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkProtoResolver; import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.SingleThreadExecutorUtils;
import bisq.common.util.Utilities; import bisq.common.util.Utilities;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
@ -111,7 +112,7 @@ public abstract class NetworkNode implements MessageListener {
maxConnections * 3, maxConnections * 3,
30, 30,
30); 30);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort); serverExecutor = SingleThreadExecutorUtils.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -23,7 +23,7 @@ import bisq.network.utils.Utils;
import bisq.common.Timer; import bisq.common.Timer;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.proto.network.NetworkProtoResolver; import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.Utilities; import bisq.common.util.SingleThreadExecutorUtils;
import org.berndpruenster.netlayer.tor.HiddenServiceSocket; import org.berndpruenster.netlayer.tor.HiddenServiceSocket;
import org.berndpruenster.netlayer.tor.Tor; import org.berndpruenster.netlayer.tor.Tor;
@ -75,7 +75,7 @@ public class TorNetworkNode extends NetworkNode {
this.torMode = torMode; this.torMode = torMode;
this.streamIsolation = useStreamIsolation; this.streamIsolation = useStreamIsolation;
executor = Utilities.getSingleThreadExecutor("StartTor"); executor = SingleThreadExecutorUtils.getSingleThreadExecutor("StartTor");
} }