diff --git a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java index c57e0d530e..61756c732d 100644 --- a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java +++ b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java @@ -17,6 +17,7 @@ package bisq.network.p2p; +import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.common.app.Capabilities; @@ -36,7 +37,7 @@ import lombok.Value; @EqualsAndHashCode(callSuper = true) @Value -public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission, CapabilityRequiringPayload { +public final class BundleOfEnvelopes extends BroadcastMessage implements ExtendedDataSizePermission, CapabilityRequiringPayload { private final List envelopes; diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 1849fcb3da..571ddd666e 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -211,48 +211,48 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } public void shutDown(Runnable shutDownCompleteHandler) { - if (!shutDownInProgress) { - shutDownInProgress = true; + shutDownResultHandlers.add(shutDownCompleteHandler); - shutDownResultHandlers.add(shutDownCompleteHandler); - - if (p2PDataStorage != null) - p2PDataStorage.shutDown(); - - if (peerManager != null) - peerManager.shutDown(); - - if (broadcaster != null) - broadcaster.shutDown(); - - if (requestDataManager != null) - requestDataManager.shutDown(); - - if (peerExchangeManager != null) - peerExchangeManager.shutDown(); - - if (keepAliveManager != null) - keepAliveManager.shutDown(); - - if (networkReadySubscription != null) - networkReadySubscription.unsubscribe(); - - if (networkNode != null) { - networkNode.shutDown(() -> { - shutDownResultHandlers.stream().forEach(Runnable::run); - shutDownComplete = true; - }); - } else { - shutDownResultHandlers.stream().forEach(Runnable::run); - shutDownComplete = true; - } + // We need to make sure queued up messages are flushed out before we continue shut down other network + // services + if (broadcaster != null) { + broadcaster.shutDown(this::doShutDown); } else { - log.debug("shutDown already in progress"); - if (shutDownComplete) { - shutDownCompleteHandler.run(); - } else { - shutDownResultHandlers.add(shutDownCompleteHandler); - } + doShutDown(); + } + } + + private void doShutDown() { + if (p2PDataStorage != null) { + p2PDataStorage.shutDown(); + } + + if (peerManager != null) { + peerManager.shutDown(); + } + + if (requestDataManager != null) { + requestDataManager.shutDown(); + } + + if (peerExchangeManager != null) { + peerExchangeManager.shutDown(); + } + + if (keepAliveManager != null) { + keepAliveManager.shutDown(); + } + + if (networkReadySubscription != null) { + networkReadySubscription.unsubscribe(); + } + + if (networkNode != null) { + networkNode.shutDown(() -> { + shutDownResultHandlers.forEach(Runnable::run); + }); + } else { + shutDownResultHandlers.forEach(Runnable::run); } } @@ -447,7 +447,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onRemoved(Collection protectedStorageEntries) { - // not handled + // not used } /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 7ffe569360..cc5b45bc52 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -21,10 +21,10 @@ import bisq.network.p2p.BundleOfEnvelopes; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.common.Timer; import bisq.common.UserThread; -import bisq.common.proto.network.NetworkEnvelope; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -71,7 +71,7 @@ public class BroadcastHandler implements PeerManager.Listener { private final String uid; private boolean stopped, timeoutTriggered; - private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeers; + private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast; private Timer timeoutTimer; @@ -93,25 +93,31 @@ public class BroadcastHandler implements PeerManager.Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests) { + public void broadcast(List broadcastRequests, boolean shutDownRequested) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); int delay; - if (requestsContainOwnMessage(broadcastRequests)) { - // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and - // with shorter delay - numPeers = confirmedConnections.size(); - delay = 50; + if (shutDownRequested) { + delay = 1; + // We sent to all peers as in case we had offers we want that it gets removed with higher reliability + numPeersForBroadcast = confirmedConnections.size(); } else { - // Relay nodes only send to max 7 peers and with longer delay - numPeers = Math.min(7, confirmedConnections.size()); - delay = 100; + if (requestsContainOwnMessage(broadcastRequests)) { + // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and + // with shorter delay + numPeersForBroadcast = confirmedConnections.size(); + delay = 50; + } else { + // Relay nodes only send to max 7 peers and with longer delay + numPeersForBroadcast = Math.min(7, confirmedConnections.size()); + delay = 100; + } } - setupTimeoutHandler(broadcastRequests, delay); + setupTimeoutHandler(broadcastRequests, delay, shutDownRequested); - int iterations = numPeers; + int iterations = numPeersForBroadcast; for (int i = 0; i < iterations; i++) { long minDelay = (i + 1) * delay; long maxDelay = (i + 2) * delay; @@ -129,8 +135,8 @@ public class BroadcastHandler implements PeerManager.Listener { // Could be empty list... if (broadcastRequestsForConnection.isEmpty()) { // We decrease numPeers in that case for making completion checks correct. - if (numPeers > 0) { - numPeers--; + if (numPeersForBroadcast > 0) { + numPeersForBroadcast--; } checkForCompletion(); return; @@ -139,8 +145,8 @@ public class BroadcastHandler implements PeerManager.Listener { if (connection.isStopped()) { // Connection has died in the meantime. We skip it. // We decrease numPeers in that case for making completion checks correct. - if (numPeers > 0) { - numPeers--; + if (numPeersForBroadcast > 0) { + numPeersForBroadcast--; } checkForCompletion(); return; @@ -188,8 +194,12 @@ public class BroadcastHandler implements PeerManager.Listener { return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender())); } - private void setupTimeoutHandler(List broadcastRequests, int delay) { - long timeoutDelay = BASE_TIMEOUT_MS + delay * (numPeers + 1); // We added 1 in the loop + private void setupTimeoutHandler(List broadcastRequests, + int delay, + boolean shutDownRequested) { + // In case of shutdown we try to complete fast and set a short 1 second timeout + long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS; + long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop timeoutTimer = UserThread.runAfter(() -> { if (stopped) { return; @@ -197,18 +207,20 @@ public class BroadcastHandler implements PeerManager.Listener { timeoutTriggered = true; - String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec." + "\n" + - "numOfPeers=" + numPeers + "\n" + - "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n" + - "numOfFailedBroadcasts=" + numOfFailedBroadcasts; - - log.warn(errorMessage); + log.warn("Broadcast did not complete after {} sec.\n" + + "numPeersForBroadcast={}\n" + + "numOfCompletedBroadcasts={}\n" + + "numOfFailedBroadcasts={}", + timeoutDelay / 1000d, + numPeersForBroadcast, + numOfCompletedBroadcasts, + numOfFailedBroadcasts); maybeNotifyListeners(broadcastRequests); cleanup(); - }, timeoutDelay); + }, timeoutDelay, TimeUnit.MILLISECONDS); } // We exclude the requests containing a message we received from that connection @@ -223,8 +235,9 @@ public class BroadcastHandler implements PeerManager.Listener { } private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { - NetworkEnvelope networkEnvelope = getNetworkEnvelope(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, networkEnvelope); + // Can be BundleOfEnvelopes or a single BroadcastMessage + BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); Futures.addCallback(future, new FutureCallback<>() { @Override @@ -236,7 +249,6 @@ public class BroadcastHandler implements PeerManager.Listener { } maybeNotifyListeners(broadcastRequestsForConnection); - checkForCompletion(); } @@ -244,7 +256,6 @@ public class BroadcastHandler implements PeerManager.Listener { public void onFailure(@NotNull Throwable throwable) { log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), throwable.getMessage()); - numOfFailedBroadcasts++; if (stopped) { @@ -252,13 +263,12 @@ public class BroadcastHandler implements PeerManager.Listener { } maybeNotifyListeners(broadcastRequestsForConnection); - checkForCompletion(); } }); } - private NetworkEnvelope getNetworkEnvelope(List broadcastRequests) { + private BroadcastMessage getMessage(List broadcastRequests) { if (broadcastRequests.size() == 1) { // If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly return broadcastRequests.get(0).getMessage(); @@ -270,22 +280,26 @@ public class BroadcastHandler implements PeerManager.Listener { } private void maybeNotifyListeners(List broadcastRequests) { + int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3)); // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. - if (numOfCompletedBroadcasts == 3) { - // We have heard back from 3 peers so we consider the message was sufficiently broadcast. + if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) { + // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast. broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); } else { - int maxPossibleSuccessCases = numPeers - numOfFailedBroadcasts; - if (maxPossibleSuccessCases == 2) { - // We never can reach required resilience as too many numOfFailedBroadcasts occurred. + // Number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. + // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred. + int openRequests = numPeersForBroadcast - numOfCompletedBroadcasts - numOfFailedBroadcasts; + int maxPossibleSuccessCases = openRequests + numOfCompletedBroadcasts; + // We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly. + if (maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1) { broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); - } else if (timeoutTriggered && numOfCompletedBroadcasts < 3) { + } else if (timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget) { // We did not reach resilience level and timeout prevents to reach it later broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) @@ -296,7 +310,7 @@ public class BroadcastHandler implements PeerManager.Listener { } private void checkForCompletion() { - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) { cleanup(); } } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index a0ef75257a..d1288ffe94 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -46,6 +47,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private final Set broadcastHandlers = new CopyOnWriteArraySet<>(); private final List broadcastRequests = new ArrayList<>(); private Timer timer; + private boolean shutDownRequested; + private Runnable shutDownResultHandler; /////////////////////////////////////////////////////////////////////////////////////////// @@ -58,12 +61,24 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { this.peerManager = peerManager; } - public void shutDown() { - broadcastHandlers.forEach(BroadcastHandler::cancel); + public void shutDown(Runnable resultHandler) { + shutDownRequested = true; + shutDownResultHandler = resultHandler; + if (broadcastRequests.isEmpty()) { + doShutDown(); + } else { + // We set delay of broadcasts and timeout to very low values, + // so we can expect that we get onCompleted called very fast and trigger the doShutDown from there. + maybeBroadcastBundle(); + } + } + private void doShutDown() { + broadcastHandlers.forEach(BroadcastHandler::cancel); if (timer != null) { timer.stop(); } + shutDownResultHandler.run(); } @@ -81,6 +96,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { broadcastRequests.add(new BroadcastRequest(message, sender, listener)); + // Keep that log on INFO for better debugging if the feature works as expected. Later it can + // be remove or set to DEBUG log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.", message.getClass().getSimpleName()); @@ -91,10 +108,12 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private void maybeBroadcastBundle() { if (!broadcastRequests.isEmpty()) { - log.info("Broadcast bundled requests of {} messages", broadcastRequests.size()); + log.info("Broadcast bundled requests of {} messages. Message types: {}", + broadcastRequests.size(), + broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); broadcastRequests.clear(); timer = null; @@ -109,6 +128,9 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { @Override public void onCompleted(BroadcastHandler broadcastHandler) { broadcastHandlers.remove(broadcastHandler); + if (shutDownRequested) { + doShutDown(); + } }