- Add shutdown handling to Broadcaster.

It is important that we flush our queued requests
at shutdown and wait until the broadcast is completed, as a maker needs to
remove their offers at shutdown.

- Add handling for the case that there are very few connections (as in
dev setup).

- Make BundleOfEnvelopes extend BroadcastMessage

- Add a completion handler for Broadcaster shutdown in P2PService and
delay the shutdown of the other services until the broadcaster has completed.
- Remove case for repeated shutdown call on P2PService as it cannot
happen.
This commit is contained in:
chimp1984 2020-08-27 15:31:41 -05:00
parent bef470031e
commit b1702f7a6d
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
4 changed files with 126 additions and 85 deletions

View file

@ -17,6 +17,7 @@
package bisq.network.p2p;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities;
@ -36,7 +37,7 @@ import lombok.Value;
@EqualsAndHashCode(callSuper = true)
@Value
public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission, CapabilityRequiringPayload {
public final class BundleOfEnvelopes extends BroadcastMessage implements ExtendedDataSizePermission, CapabilityRequiringPayload {
private final List<NetworkEnvelope> envelopes;

View file

@ -121,7 +121,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty();
private final IntegerProperty numConnectedPeers = new SimpleIntegerProperty(0);
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
private final Subscription networkReadySubscription;
private boolean isBootstrapped;
@ -210,48 +209,51 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
public void shutDown(Runnable shutDownCompleteHandler) {
if (!shutDownInProgress) {
shutDownInProgress = true;
shutDownResultHandlers.add(shutDownCompleteHandler);
shutDownResultHandlers.add(shutDownCompleteHandler);
if (p2PDataStorage != null)
p2PDataStorage.shutDown();
if (peerManager != null)
peerManager.shutDown();
if (broadcaster != null)
broadcaster.shutDown();
if (requestDataManager != null)
requestDataManager.shutDown();
if (peerExchangeManager != null)
peerExchangeManager.shutDown();
if (keepAliveManager != null)
keepAliveManager.shutDown();
if (networkReadySubscription != null)
networkReadySubscription.unsubscribe();
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
});
} else {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
}
// We need to make sure queued up messages are flushed out before we continue shut down other network
// services
if (broadcaster != null) {
broadcaster.shutDown(this::doShutDown);
} else {
log.debug("shutDown already in progress");
if (shutDownComplete) {
shutDownCompleteHandler.run();
} else {
shutDownResultHandlers.add(shutDownCompleteHandler);
}
doShutDown();
}
}
private void doShutDown() {
log.error("doShutDown");
if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
}
if (peerManager != null) {
peerManager.shutDown();
}
if (requestDataManager != null) {
requestDataManager.shutDown();
}
if (peerExchangeManager != null) {
peerExchangeManager.shutDown();
}
if (keepAliveManager != null) {
keepAliveManager.shutDown();
}
if (networkReadySubscription != null) {
networkReadySubscription.unsubscribe();
}
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.forEach(Runnable::run);
shutDownComplete = true;
});
} else {
shutDownResultHandlers.forEach(Runnable::run);
shutDownComplete = true;
}
}
@ -670,6 +672,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
BroadcastHandler.Listener listener = new BroadcastHandler.Listener() {
@Override
public void onSufficientlyBroadcast(List<Broadcaster.BroadcastRequest> broadcastRequests) {
log.error("onSufficientlyBroadcast");
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getMessage() instanceof AddDataMessage)
.filter(broadcastRequest -> {
@ -681,6 +684,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast) {
log.error("onNotSufficientlyBroadcast");
sendMailboxMessageListener.onFault("Message was not sufficiently broadcast.\n" +
"numOfCompletedBroadcasts: " + numOfCompletedBroadcasts + ".\n" +
"numOfFailedBroadcast=" + numOfFailedBroadcast);

View file

@ -21,10 +21,10 @@ import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.proto.network.NetworkEnvelope;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -71,7 +71,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private final String uid;
private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeers;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private Timer timeoutTimer;
@ -93,25 +93,31 @@ public class BroadcastHandler implements PeerManager.Listener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests) {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested) {
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);
int delay;
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeers = confirmedConnections.size();
delay = 50;
if (shutDownRequested) {
delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size();
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeers = Math.min(7, confirmedConnections.size());
delay = 100;
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeersForBroadcast = confirmedConnections.size();
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
delay = 100;
}
}
setupTimeoutHandler(broadcastRequests, delay);
setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
int iterations = numPeers;
int iterations = numPeersForBroadcast;
for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay;
@ -129,8 +135,8 @@ public class BroadcastHandler implements PeerManager.Listener {
// Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct.
if (numPeers > 0) {
numPeers--;
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
}
checkForCompletion();
return;
@ -139,8 +145,8 @@ public class BroadcastHandler implements PeerManager.Listener {
if (connection.isStopped()) {
// Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct.
if (numPeers > 0) {
numPeers--;
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
}
checkForCompletion();
return;
@ -188,8 +194,12 @@ public class BroadcastHandler implements PeerManager.Listener {
return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender()));
}
private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests, int delay) {
long timeoutDelay = BASE_TIMEOUT_MS + delay * (numPeers + 1); // We added 1 in the loop
private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests,
int delay,
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> {
if (stopped) {
return;
@ -197,18 +207,20 @@ public class BroadcastHandler implements PeerManager.Listener {
timeoutTriggered = true;
String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec." + "\n" +
"numOfPeers=" + numPeers + "\n" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n" +
"numOfFailedBroadcasts=" + numOfFailedBroadcasts;
log.warn(errorMessage);
log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" +
"numOfCompletedBroadcasts={}\n" +
"numOfFailedBroadcasts={}",
timeoutDelay / 1000d,
numPeersForBroadcast,
numOfCompletedBroadcasts,
numOfFailedBroadcasts);
maybeNotifyListeners(broadcastRequests);
cleanup();
}, timeoutDelay);
}, timeoutDelay, TimeUnit.MILLISECONDS);
}
// We exclude the requests containing a message we received from that connection
@ -223,8 +235,9 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) {
NetworkEnvelope networkEnvelope = getNetworkEnvelope(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, networkEnvelope);
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage);
Futures.addCallback(future, new FutureCallback<>() {
@Override
@ -236,7 +249,6 @@ public class BroadcastHandler implements PeerManager.Listener {
}
maybeNotifyListeners(broadcastRequestsForConnection);
checkForCompletion();
}
@ -244,7 +256,6 @@ public class BroadcastHandler implements PeerManager.Listener {
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage());
numOfFailedBroadcasts++;
if (stopped) {
@ -252,13 +263,12 @@ public class BroadcastHandler implements PeerManager.Listener {
}
maybeNotifyListeners(broadcastRequestsForConnection);
checkForCompletion();
}
});
}
private NetworkEnvelope getNetworkEnvelope(List<Broadcaster.BroadcastRequest> broadcastRequests) {
private BroadcastMessage getMessage(List<Broadcaster.BroadcastRequest> broadcastRequests) {
if (broadcastRequests.size() == 1) {
// If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly
return broadcastRequests.get(0).getMessage();
@ -270,22 +280,26 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == 3) {
// We have heard back from 3 peers so we consider the message was sufficiently broadcast.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
} else {
int maxPossibleSuccessCases = numPeers - numOfFailedBroadcasts;
if (maxPossibleSuccessCases == 2) {
// We never can reach required resilience as too many numOfFailedBroadcasts occurred.
// Number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int openRequests = numPeersForBroadcast - numOfCompletedBroadcasts - numOfFailedBroadcasts;
int maxPossibleSuccessCases = openRequests + numOfCompletedBroadcasts;
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
if (maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
} else if (timeoutTriggered && numOfCompletedBroadcasts < 3) {
} else if (timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget) {
// We did not reach resilience level and timeout prevents to reach it later
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
@ -296,7 +310,7 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
cleanup();
}
}

View file

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@ -46,6 +47,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private final Set<BroadcastHandler> broadcastHandlers = new CopyOnWriteArraySet<>();
private final List<BroadcastRequest> broadcastRequests = new ArrayList<>();
private Timer timer;
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
///////////////////////////////////////////////////////////////////////////////////////////
@ -58,12 +61,24 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
this.peerManager = peerManager;
}
public void shutDown() {
broadcastHandlers.forEach(BroadcastHandler::cancel);
public void shutDown(Runnable resultHandler) {
shutDownRequested = true;
shutDownResultHandler = resultHandler;
if (broadcastRequests.isEmpty()) {
doShutDown();
} else {
// We set delay of broadcasts and timeout to very low values,
// so we can expect that we get onCompleted called very fast and trigger the doShutDown from there.
maybeBroadcastBundle();
}
}
private void doShutDown() {
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) {
timer.stop();
}
shutDownResultHandler.run();
}
@ -81,6 +96,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener) {
broadcastRequests.add(new BroadcastRequest(message, sender, listener));
// Keep that log on INFO for better debugging if the feature works as expected. Later it can
// be remove or set to DEBUG
log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.",
message.getClass().getSimpleName());
@ -91,10 +108,12 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private void maybeBroadcastBundle() {
if (!broadcastRequests.isEmpty()) {
log.info("Broadcast bundled requests of {} messages", broadcastRequests.size());
log.info("Broadcast bundled requests of {} messages. Message types: {}",
broadcastRequests.size(),
broadcastRequests.stream().map(e -> e.getClass().getSimpleName()).collect(Collectors.toList()));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests));
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested);
broadcastRequests.clear();
timer = null;
@ -109,6 +128,9 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
@Override
public void onCompleted(BroadcastHandler broadcastHandler) {
broadcastHandlers.remove(broadcastHandler);
if (shutDownRequested) {
doShutDown();
}
}