Maintain pending futures and cancel them at cleanup.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2023-01-04 13:55:20 -05:00
parent 3e48956227
commit 5e29bfe4c2
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307

View File

@ -35,7 +35,10 @@ import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -44,6 +47,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Slf4j @Slf4j
public class BroadcastHandler implements PeerManager.Listener { public class BroadcastHandler implements PeerManager.Listener {
@ -79,8 +83,9 @@ public class BroadcastHandler implements PeerManager.Listener {
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger(); private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger(); private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger(); private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
@Nullable
private Timer timeoutTimer; private Timer timeoutTimer;
private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<>();
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -104,6 +109,10 @@ public class BroadcastHandler implements PeerManager.Listener {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested, boolean shutDownRequested,
ListeningExecutorService executor) { ListeningExecutorService executor) {
if (broadcastRequests.isEmpty()) {
return;
}
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections); Collections.shuffle(confirmedConnections);
@ -162,7 +171,12 @@ public class BroadcastHandler implements PeerManager.Listener {
return; return;
} }
try {
sendToPeer(connection, broadcastRequestsForConnection, executor); sendToPeer(connection, broadcastRequestsForConnection, executor);
} catch (RejectedExecutionException e) {
log.error("RejectedExecutionException at broadcast ", e);
cleanup();
}
}, minDelay, maxDelay, TimeUnit.MILLISECONDS); }, minDelay, maxDelay, TimeUnit.MILLISECONDS);
} }
} }
@ -250,7 +264,7 @@ public class BroadcastHandler implements PeerManager.Listener {
// Can be BundleOfEnvelopes or a single BroadcastMessage // Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor); SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);
sendMessageFutures.add(future);
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
@ -324,11 +338,22 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void cleanup() { private void cleanup() {
if (stopped.get()) {
return;
}
stopped.set(true); stopped.set(true);
if (timeoutTimer != null) { if (timeoutTimer != null) {
timeoutTimer.stop(); timeoutTimer.stop();
timeoutTimer = null; timeoutTimer = null;
} }
sendMessageFutures.stream()
.filter(future -> !future.isCancelled() && !future.isDone())
.forEach(future -> future.cancel(true));
sendMessageFutures.clear();
peerManager.removeListener(this); peerManager.removeListener(this);
resultHandler.onCompleted(this); resultHandler.onCompleted(this);
} }