Merge remote-tracking branch 'origin/bundle-msg-at-broadcast' into bundle-msg-at-broadcast

# Conflicts:
#	p2p/src/main/java/bisq/network/p2p/P2PService.java
This commit is contained in:
chimp1984 2020-08-29 13:13:31 -05:00
commit 366fdb3a09
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
4 changed files with 122 additions and 85 deletions

View file

@ -17,6 +17,7 @@
package bisq.network.p2p; package bisq.network.p2p;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities; import bisq.common.app.Capabilities;
@ -36,7 +37,7 @@ import lombok.Value;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Value @Value
public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission, CapabilityRequiringPayload { public final class BundleOfEnvelopes extends BroadcastMessage implements ExtendedDataSizePermission, CapabilityRequiringPayload {
private final List<NetworkEnvelope> envelopes; private final List<NetworkEnvelope> envelopes;

View file

@ -211,48 +211,48 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
public void shutDown(Runnable shutDownCompleteHandler) { public void shutDown(Runnable shutDownCompleteHandler) {
if (!shutDownInProgress) { shutDownResultHandlers.add(shutDownCompleteHandler);
shutDownInProgress = true;
shutDownResultHandlers.add(shutDownCompleteHandler); // We need to make sure queued up messages are flushed out before we continue shut down other network
// services
if (p2PDataStorage != null) if (broadcaster != null) {
p2PDataStorage.shutDown(); broadcaster.shutDown(this::doShutDown);
if (peerManager != null)
peerManager.shutDown();
if (broadcaster != null)
broadcaster.shutDown();
if (requestDataManager != null)
requestDataManager.shutDown();
if (peerExchangeManager != null)
peerExchangeManager.shutDown();
if (keepAliveManager != null)
keepAliveManager.shutDown();
if (networkReadySubscription != null)
networkReadySubscription.unsubscribe();
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
});
} else {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
}
} else { } else {
log.debug("shutDown already in progress"); doShutDown();
if (shutDownComplete) { }
shutDownCompleteHandler.run(); }
} else {
shutDownResultHandlers.add(shutDownCompleteHandler); private void doShutDown() {
} if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
}
if (peerManager != null) {
peerManager.shutDown();
}
if (requestDataManager != null) {
requestDataManager.shutDown();
}
if (peerExchangeManager != null) {
peerExchangeManager.shutDown();
}
if (keepAliveManager != null) {
keepAliveManager.shutDown();
}
if (networkReadySubscription != null) {
networkReadySubscription.unsubscribe();
}
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.forEach(Runnable::run);
});
} else {
shutDownResultHandlers.forEach(Runnable::run);
} }
} }
@ -447,7 +447,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override @Override
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) { public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// not handled // not used
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -21,10 +21,10 @@ import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.NodeAddress; import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection; import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.common.Timer; import bisq.common.Timer;
import bisq.common.UserThread; import bisq.common.UserThread;
import bisq.common.proto.network.NetworkEnvelope;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -71,7 +71,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private final String uid; private final String uid;
private boolean stopped, timeoutTriggered; private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeers; private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private Timer timeoutTimer; private Timer timeoutTimer;
@ -93,25 +93,31 @@ public class BroadcastHandler implements PeerManager.Listener {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests) { public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested) {
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections); Collections.shuffle(confirmedConnections);
int delay; int delay;
if (requestsContainOwnMessage(broadcastRequests)) { if (shutDownRequested) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and delay = 1;
// with shorter delay // We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeers = confirmedConnections.size(); numPeersForBroadcast = confirmedConnections.size();
delay = 50;
} else { } else {
// Relay nodes only send to max 7 peers and with longer delay if (requestsContainOwnMessage(broadcastRequests)) {
numPeers = Math.min(7, confirmedConnections.size()); // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
delay = 100; // with shorter delay
numPeersForBroadcast = confirmedConnections.size();
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
delay = 100;
}
} }
setupTimeoutHandler(broadcastRequests, delay); setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
int iterations = numPeers; int iterations = numPeersForBroadcast;
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay; long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay; long maxDelay = (i + 2) * delay;
@ -129,8 +135,8 @@ public class BroadcastHandler implements PeerManager.Listener {
// Could be empty list... // Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) { if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct. // We decrease numPeers in that case for making completion checks correct.
if (numPeers > 0) { if (numPeersForBroadcast > 0) {
numPeers--; numPeersForBroadcast--;
} }
checkForCompletion(); checkForCompletion();
return; return;
@ -139,8 +145,8 @@ public class BroadcastHandler implements PeerManager.Listener {
if (connection.isStopped()) { if (connection.isStopped()) {
// Connection has died in the meantime. We skip it. // Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct. // We decrease numPeers in that case for making completion checks correct.
if (numPeers > 0) { if (numPeersForBroadcast > 0) {
numPeers--; numPeersForBroadcast--;
} }
checkForCompletion(); checkForCompletion();
return; return;
@ -188,8 +194,12 @@ public class BroadcastHandler implements PeerManager.Listener {
return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender())); return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender()));
} }
private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests, int delay) { private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests,
long timeoutDelay = BASE_TIMEOUT_MS + delay * (numPeers + 1); // We added 1 in the loop int delay,
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> { timeoutTimer = UserThread.runAfter(() -> {
if (stopped) { if (stopped) {
return; return;
@ -197,18 +207,20 @@ public class BroadcastHandler implements PeerManager.Listener {
timeoutTriggered = true; timeoutTriggered = true;
String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec." + "\n" + log.warn("Broadcast did not complete after {} sec.\n" +
"numOfPeers=" + numPeers + "\n" + "numPeersForBroadcast={}\n" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n" + "numOfCompletedBroadcasts={}\n" +
"numOfFailedBroadcasts=" + numOfFailedBroadcasts; "numOfFailedBroadcasts={}",
timeoutDelay / 1000d,
log.warn(errorMessage); numPeersForBroadcast,
numOfCompletedBroadcasts,
numOfFailedBroadcasts);
maybeNotifyListeners(broadcastRequests); maybeNotifyListeners(broadcastRequests);
cleanup(); cleanup();
}, timeoutDelay); }, timeoutDelay, TimeUnit.MILLISECONDS);
} }
// We exclude the requests containing a message we received from that connection // We exclude the requests containing a message we received from that connection
@ -223,8 +235,9 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) { private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) {
NetworkEnvelope networkEnvelope = getNetworkEnvelope(broadcastRequestsForConnection); // Can be BundleOfEnvelopes or a single BroadcastMessage
SettableFuture<Connection> future = networkNode.sendMessage(connection, networkEnvelope); BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage);
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
@ -236,7 +249,6 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
maybeNotifyListeners(broadcastRequestsForConnection); maybeNotifyListeners(broadcastRequestsForConnection);
checkForCompletion(); checkForCompletion();
} }
@ -244,7 +256,6 @@ public class BroadcastHandler implements PeerManager.Listener {
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage()); throwable.getMessage());
numOfFailedBroadcasts++; numOfFailedBroadcasts++;
if (stopped) { if (stopped) {
@ -252,13 +263,12 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
maybeNotifyListeners(broadcastRequestsForConnection); maybeNotifyListeners(broadcastRequestsForConnection);
checkForCompletion(); checkForCompletion();
} }
}); });
} }
private NetworkEnvelope getNetworkEnvelope(List<Broadcaster.BroadcastRequest> broadcastRequests) { private BroadcastMessage getMessage(List<Broadcaster.BroadcastRequest> broadcastRequests) {
if (broadcastRequests.size() == 1) { if (broadcastRequests.size() == 1) {
// If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly // If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly
return broadcastRequests.get(0).getMessage(); return broadcastRequests.get(0).getMessage();
@ -270,22 +280,26 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) { private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == 3) { if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers so we consider the message was sufficiently broadcast. // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream() broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null) .filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener) .map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
} else { } else {
int maxPossibleSuccessCases = numPeers - numOfFailedBroadcasts; // Number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
if (maxPossibleSuccessCases == 2) { // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
// We never can reach required resilience as too many numOfFailedBroadcasts occurred. int openRequests = numPeersForBroadcast - numOfCompletedBroadcasts - numOfFailedBroadcasts;
int maxPossibleSuccessCases = openRequests + numOfCompletedBroadcasts;
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
if (maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1) {
broadcastRequests.stream() broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null) .filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener) .map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
} else if (timeoutTriggered && numOfCompletedBroadcasts < 3) { } else if (timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget) {
// We did not reach resilience level and timeout prevents to reach it later // We did not reach resilience level and timeout prevents to reach it later
broadcastRequests.stream() broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null) .filter(broadcastRequest -> broadcastRequest.getListener() != null)
@ -296,7 +310,7 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void checkForCompletion() { private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
cleanup(); cleanup();
} }
} }

View file

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Value; import lombok.Value;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -46,6 +47,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private final Set<BroadcastHandler> broadcastHandlers = new CopyOnWriteArraySet<>(); private final Set<BroadcastHandler> broadcastHandlers = new CopyOnWriteArraySet<>();
private final List<BroadcastRequest> broadcastRequests = new ArrayList<>(); private final List<BroadcastRequest> broadcastRequests = new ArrayList<>();
private Timer timer; private Timer timer;
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -58,12 +61,24 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
this.peerManager = peerManager; this.peerManager = peerManager;
} }
public void shutDown() { public void shutDown(Runnable resultHandler) {
broadcastHandlers.forEach(BroadcastHandler::cancel); shutDownRequested = true;
shutDownResultHandler = resultHandler;
if (broadcastRequests.isEmpty()) {
doShutDown();
} else {
// We set delay of broadcasts and timeout to very low values,
// so we can expect that we get onCompleted called very fast and trigger the doShutDown from there.
maybeBroadcastBundle();
}
}
private void doShutDown() {
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) { if (timer != null) {
timer.stop(); timer.stop();
} }
shutDownResultHandler.run();
} }
@ -81,6 +96,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
@Nullable NodeAddress sender, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener) { @Nullable BroadcastHandler.Listener listener) {
broadcastRequests.add(new BroadcastRequest(message, sender, listener)); broadcastRequests.add(new BroadcastRequest(message, sender, listener));
// Keep that log on INFO for better debugging if the feature works as expected. Later it can
// be remove or set to DEBUG
log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.", log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.",
message.getClass().getSimpleName()); message.getClass().getSimpleName());
@ -91,10 +108,12 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private void maybeBroadcastBundle() { private void maybeBroadcastBundle() {
if (!broadcastRequests.isEmpty()) { if (!broadcastRequests.isEmpty()) {
log.info("Broadcast bundled requests of {} messages", broadcastRequests.size()); log.info("Broadcast bundled requests of {} messages. Message types: {}",
broadcastRequests.size(),
broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList()));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler); broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested);
broadcastRequests.clear(); broadcastRequests.clear();
timer = null; timer = null;
@ -109,6 +128,9 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
@Override @Override
public void onCompleted(BroadcastHandler broadcastHandler) { public void onCompleted(BroadcastHandler broadcastHandler) {
broadcastHandlers.remove(broadcastHandler); broadcastHandlers.remove(broadcastHandler);
if (shutDownRequested) {
doShutDown();
}
} }