Use AtomicBoolean for stopped and timeoutTriggered

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2023-01-04 12:58:22 -05:00 committed by Alejandro García
parent 19629ab088
commit cb635809e0
No known key found for this signature in database
GPG Key ID: F806F422E222AA02

View File

@ -37,6 +37,8 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -72,8 +74,12 @@ public class BroadcastHandler implements PeerManager.Listener {
private final ResultHandler resultHandler;
private final String uid;
private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private final AtomicBoolean stopped = new AtomicBoolean();
private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
private Timer timeoutTimer;
@ -105,29 +111,29 @@ public class BroadcastHandler implements PeerManager.Listener {
if (shutDownRequested) {
delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
} else {
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
delay = 100;
}
}
setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
int iterations = numPeersForBroadcast;
int iterations = numPeersForBroadcast.get();
for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay;
Connection connection = confirmedConnections.get(i);
UserThread.runAfterRandomDelay(() -> {
if (stopped) {
if (stopped.get()) {
return;
}
@ -139,8 +145,8 @@ public class BroadcastHandler implements PeerManager.Listener {
// Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
@ -149,8 +155,8 @@ public class BroadcastHandler implements PeerManager.Listener {
if (connection.isStopped()) {
// Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
@ -162,7 +168,7 @@ public class BroadcastHandler implements PeerManager.Listener {
}
public void cancel() {
stopped = true;
stopped.set(true);
cleanup();
}
@ -203,13 +209,13 @@ public class BroadcastHandler implements PeerManager.Listener {
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> {
if (stopped) {
if (stopped.get()) {
return;
}
timeoutTriggered = true;
timeoutTriggered.set(true);
log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" +
@ -248,9 +254,9 @@ public class BroadcastHandler implements PeerManager.Listener {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
numOfCompletedBroadcasts.incrementAndGet();
if (stopped) {
if (stopped.get()) {
return;
}
@ -262,9 +268,9 @@ public class BroadcastHandler implements PeerManager.Listener {
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage());
numOfFailedBroadcasts++;
numOfFailedBroadcasts.incrementAndGet();
if (stopped) {
if (stopped.get()) {
return;
}
@ -286,9 +292,9 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
@ -297,28 +303,28 @@ public class BroadcastHandler implements PeerManager.Listener {
} else {
// We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts;
int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get();
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
// We did not reach resilience level and timeout prevents to reach it later
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget;
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get()));
}
}
}
private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) {
cleanup();
}
}
private void cleanup() {
stopped = true;
stopped.set(true);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;