Add custom thread pool to broadcaster

The broadcasting consumes most of the threads but has lower priority than other messages being sent.
By separating that thread pool from the common sendMessage executor we can reduce the risk that a burst of
broadcasts exhausts the thread pool and might drop send message tasks.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-12-27 17:33:39 -05:00
parent d5b65fe239
commit a8a0c0e725
No known key found for this signature in database
GPG key ID: 02AA2BAE387C8307
2 changed files with 28 additions and 6 deletions

View file

@ -28,6 +28,7 @@ import bisq.common.UserThread;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
@ -94,7 +95,9 @@ public class BroadcastHandler implements PeerManager.Listener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested) {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);
@ -153,7 +156,7 @@ public class BroadcastHandler implements PeerManager.Listener {
return;
}
sendToPeer(connection, broadcastRequestsForConnection);
sendToPeer(connection, broadcastRequestsForConnection, executor);
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}
@ -235,10 +238,12 @@ public class BroadcastHandler implements PeerManager.Listener {
.collect(Collectors.toList());
}
private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) {
private void sendToPeer(Connection connection,
List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection,
ListeningExecutorService executor) {
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);
Futures.addCallback(future, new FutureCallback<>() {
@Override

View file

@ -23,13 +23,20 @@ import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.util.Utilities;
import javax.inject.Inject;
import javax.inject.Named;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -49,6 +56,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private Timer timer;
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
private final ListeningExecutorService executor;
///////////////////////////////////////////////////////////////////////////////////////////
@ -56,9 +64,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public Broadcaster(NetworkNode networkNode, PeerManager peerManager) {
public Broadcaster(NetworkNode networkNode,
PeerManager peerManager,
@Named(Config.MAX_CONNECTIONS) int maxConnections) {
this.networkNode = networkNode;
this.peerManager = peerManager;
ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster",
maxConnections,
maxConnections * 2,
30,
30);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
}
public void shutDown(Runnable resultHandler) {
@ -119,7 +136,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList()));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor);
broadcastRequests.clear();
if (timer != null) {