Use AtomicBoolean for stopped and timeoutTriggered

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2023-01-04 12:58:22 -05:00
parent 803a58eab7
commit 41fb5e464c
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307

View File

@ -37,6 +37,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -72,8 +74,12 @@ public class BroadcastHandler implements PeerManager.Listener {
private final ResultHandler resultHandler; private final ResultHandler resultHandler;
private final String uid; private final String uid;
private boolean stopped, timeoutTriggered; private final AtomicBoolean stopped = new AtomicBoolean();
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast; private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
private Timer timeoutTimer; private Timer timeoutTimer;
@ -105,29 +111,29 @@ public class BroadcastHandler implements PeerManager.Listener {
if (shutDownRequested) { if (shutDownRequested) {
delay = 1; delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability // We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size(); numPeersForBroadcast.set(confirmedConnections.size());
} else { } else {
if (requestsContainOwnMessage(broadcastRequests)) { if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay // with shorter delay
numPeersForBroadcast = confirmedConnections.size(); numPeersForBroadcast.set(confirmedConnections.size());
delay = 50; delay = 50;
} else { } else {
// Relay nodes only send to max 7 peers and with longer delay // Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size()); numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
delay = 100; delay = 100;
} }
} }
setupTimeoutHandler(broadcastRequests, delay, shutDownRequested); setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
int iterations = numPeersForBroadcast; int iterations = numPeersForBroadcast.get();
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay; long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay; long maxDelay = (i + 2) * delay;
Connection connection = confirmedConnections.get(i); Connection connection = confirmedConnections.get(i);
UserThread.runAfterRandomDelay(() -> { UserThread.runAfterRandomDelay(() -> {
if (stopped) { if (stopped.get()) {
return; return;
} }
@ -139,8 +145,8 @@ public class BroadcastHandler implements PeerManager.Listener {
// Could be empty list... // Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) { if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct. // We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) { if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast--; numPeersForBroadcast.decrementAndGet();
} }
checkForCompletion(); checkForCompletion();
return; return;
@ -149,8 +155,8 @@ public class BroadcastHandler implements PeerManager.Listener {
if (connection.isStopped()) { if (connection.isStopped()) {
// Connection has died in the meantime. We skip it. // Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct. // We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) { if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast--; numPeersForBroadcast.decrementAndGet();
} }
checkForCompletion(); checkForCompletion();
return; return;
@ -162,7 +168,7 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
public void cancel() { public void cancel() {
stopped = true; stopped.set(true);
cleanup(); cleanup();
} }
@ -203,13 +209,13 @@ public class BroadcastHandler implements PeerManager.Listener {
boolean shutDownRequested) { boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout // In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS; long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> { timeoutTimer = UserThread.runAfter(() -> {
if (stopped) { if (stopped.get()) {
return; return;
} }
timeoutTriggered = true; timeoutTriggered.set(true);
log.warn("Broadcast did not complete after {} sec.\n" + log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" + "numPeersForBroadcast={}\n" +
@ -248,9 +254,9 @@ public class BroadcastHandler implements PeerManager.Listener {
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++; numOfCompletedBroadcasts.incrementAndGet();
if (stopped) { if (stopped.get()) {
return; return;
} }
@ -262,9 +268,9 @@ public class BroadcastHandler implements PeerManager.Listener {
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage()); throwable.getMessage());
numOfFailedBroadcasts++; numOfFailedBroadcasts.incrementAndGet();
if (stopped) { if (stopped.get()) {
return; return;
} }
@ -286,9 +292,9 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) { private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3)); int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) { if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast. // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream() broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null) .filter(broadcastRequest -> broadcastRequest.getListener() != null)
@ -297,28 +303,28 @@ public class BroadcastHandler implements PeerManager.Listener {
} else { } else {
// We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. // We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred. // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts; int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get();
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly. // We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1; boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
// We did not reach resilience level and timeout prevents to reach it later // We did not reach resilience level and timeout prevents to reach it later
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget; boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) { if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
broadcastRequests.stream() broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null) .filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener) .map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get()));
} }
} }
} }
private void checkForCompletion() { private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) { if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) {
cleanup(); cleanup();
} }
} }
private void cleanup() { private void cleanup() {
stopped = true; stopped.set(true);
if (timeoutTimer != null) { if (timeoutTimer != null) {
timeoutTimer.stop(); timeoutTimer.stop();
timeoutTimer = null; timeoutTimer = null;