Mirror of https://github.com/bisq-network/bisq.git

Fix issue with message listener added too late and wrong handling of PreliminaryDataRequest vs. UpdateDataRequest

This commit is contained in:
parent 48e6cc34a8
commit fd1efe7353
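The first part of the fix concerns listener ordering: the handler previously registered itself as a message listener on the connection only inside the sendMessage success callback, so a fast response could arrive before the handler was listening. The commit registers the listener on the networkNode before sending. Below is a minimal, self-contained sketch of that ordering; Node, Listener, and sendMessage here are hypothetical stand-ins for illustration, not Bisq's actual networking classes.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins, not Bisq APIs: demonstrates why the listener
// must be registered before the request is sent.
public class ListenerOrderingSketch {
    interface Listener {
        void onMessage(String message);
    }

    static class Node {
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void addMessageListener(Listener listener) {
            listeners.add(listener);
        }

        // Sending triggers a response on another thread. If no listener is
        // registered by the time the response arrives, it is silently dropped.
        CompletableFuture<Void> sendMessage(String request) {
            return CompletableFuture.runAsync(() ->
                    listeners.forEach(l -> l.onMessage("response to " + request)));
        }
    }

    public static void main(String[] args) {
        Node node = new Node();

        // Fixed ordering (what the commit does): register the listener on the
        // node first, then send, so a response can never arrive unobserved.
        node.addMessageListener(msg -> System.out.println("received: " + msg));
        node.sendMessage("GetDataRequest").join();

        // The buggy ordering registered the listener only in the send-success
        // callback, so a fast response could arrive before registration.
    }
}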
@@ -37,6 +37,7 @@ public class RequestDataHandler implements MessageListener {
     private static final Logger log = LoggerFactory.getLogger(RequestDataHandler.class);

     private static final long TIME_OUT_SEC = 20;
+    private NodeAddress peersNodeAddress;


     ///////////////////////////////////////////////////////////////////////////////////////////
@@ -85,8 +86,9 @@ public class RequestDataHandler implements MessageListener {
     // API
     ///////////////////////////////////////////////////////////////////////////////////////////

-    public void requestData(NodeAddress nodeAddress) {
+    public void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) {
         Log.traceCall("nodeAddress=" + nodeAddress);
+        peersNodeAddress = nodeAddress;
         if (!stopped) {
             GetDataRequest getDataRequest;
@@ -99,7 +101,7 @@ public class RequestDataHandler implements MessageListener {
                     .map(e -> e.getKey().bytes)
                     .collect(Collectors.toSet());

-            if (networkNode.getNodeAddress() == null)
+            if (isPreliminaryDataRequest)
                 getDataRequest = new PreliminaryGetDataRequest(nonce, excludedKeys);
             else
                 getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce, excludedKeys);
@@ -120,13 +122,13 @@ public class RequestDataHandler implements MessageListener {
            }

            log.debug("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
+           networkNode.addMessageListener(this);
            SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
            Futures.addCallback(future, new FutureCallback<Connection>() {
                @Override
                public void onSuccess(Connection connection) {
                    if (!stopped) {
                        RequestDataHandler.this.connection = connection;
-                       connection.addMessageListener(RequestDataHandler.this);
                        log.trace("Send " + getDataRequest + " to " + nodeAddress + " succeeded.");
                    } else {
                        log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
@@ -161,84 +163,87 @@ public class RequestDataHandler implements MessageListener {

     @Override
     public void onMessage(Message message, Connection connection) {
+        if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress)) {
             if (message instanceof GetDataResponse) {
                 Log.traceCall(message.toString() + "\n\tconnection=" + connection);
                 if (!stopped) {
                     GetDataResponse getDataResponse = (GetDataResponse) message;
                     Map<String, Set<StoragePayload>> payloadByClassName = new HashMap<>();
                     final HashSet<ProtectedStorageEntry> dataSet = getDataResponse.dataSet;
                     dataSet.stream().forEach(e -> {
                         final StoragePayload storagePayload = e.getStoragePayload();
                         String className = storagePayload.getClass().getSimpleName();
                         if (!payloadByClassName.containsKey(className))
                             payloadByClassName.put(className, new HashSet<>());

                         payloadByClassName.get(className).add(storagePayload);
                     });
                     StringBuilder sb = new StringBuilder("Received data size: ").append(dataSet.size()).append(", data items: ");
                     payloadByClassName.entrySet().stream().forEach(e -> sb.append(e.getValue().size()).append(" items of ").append(e.getKey()).append("; "));
                     log.info(sb.toString());

                     if (getDataResponse.requestNonce == nonce) {
                         stopTimeoutTimer();
                         checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
                                 "RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
                                         "at that moment");

                         final NodeAddress sender = connection.getPeersNodeAddressOptional().get();

                         List<ProtectedStorageEntry> processDelayedItems = new ArrayList<>();
                         dataSet.stream().forEach(e -> {
                             if (e.getStoragePayload() instanceof LazyProcessedStoragePayload)
                                 processDelayedItems.add(e);
                             else {
                                 // We don't broadcast here (last param) as we are only connected to the seed node and it would be pointless
                                 dataStorage.add(e, sender, null, false, false);
                             }
                         });

                         // We process the LazyProcessedStoragePayload items (TradeStatistics) in batches with a delay in between.
                         // We want to avoid that the UI gets stuck when processing many entries.
                         // The dataStorage.add call is a bit expensive as sig checks are done there.

                         // Using a background thread might be an alternative but it would require much more effort and
                         // it would also decrease the user experience if the app gets under heavy load (like at startup with wallet sync).
                         // Besides that we mitigated the problem already as we will not get the whole TradeStatistics as we
                         // pass the excludeKeys and we pack the latest data dump
                         // into the resources, so a new user does not need to request all data.

                         // In future we will probably limit by date or load on demand from user intent to not get too much data.

                         // We split the list into sub lists with max 50 items and delay each batch with 200 ms.
                         int size = processDelayedItems.size();
                         int chunkSize = 50;
                         int chunks = 1 + size / chunkSize;
                         int startIndex = 0;
                         for (int i = 0; i < chunks && startIndex < size; i++, startIndex += chunkSize) {
                             long delay = (i + 1) * 200;
                             int endIndex = Math.min(size, startIndex + chunkSize);
                             List<ProtectedStorageEntry> subList = processDelayedItems.subList(startIndex, endIndex);
                             UserThread.runAfter(() -> {
                                 subList.stream().forEach(protectedStorageEntry -> dataStorage.add(protectedStorageEntry, sender, null, false, false));
                             }, delay, TimeUnit.MILLISECONDS);
                         }

                         cleanup();
                         listener.onComplete();
                     } else {
                         log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " +
                                         "handshake (timeout causes connection close but peer might have sent a msg before " +
                                         "connection was closed).\n\t" +
                                         "We drop that message. nonce={} / requestNonce={}",
                                 nonce, getDataResponse.requestNonce);
                     }
                 } else {
                     log.warn("We have stopped already. We ignore that onDataRequest call.");
                 }
             }
+        } else {
+            log.trace("We got a message from another connection and ignore it.");
+        }
     }


     public void stop() {
         cleanup();
     }
@@ -258,8 +263,7 @@ public class RequestDataHandler implements MessageListener {
     private void cleanup() {
         Log.traceCall();
         stopped = true;
-        if (connection != null)
-            connection.removeMessageListener(this);
+        networkNode.removeMessageListener(this);
         stopTimeoutTimer();
     }
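The chunked processing in onMessage above splits the LazyProcessedStoragePayload items into batches of at most 50 and schedules each batch with a growing delay so the UI thread stays responsive. Below is a standalone sketch of the same pattern under the assumption that a ScheduledExecutorService can stand in for Bisq's UserThread.runAfter.

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch of the batch-with-delay pattern used in onMessage above;
// ScheduledExecutorService stands in for Bisq's UserThread.runAfter.
public class BatchProcessingSketch {
    public static void main(String[] args) {
        List<Integer> items = IntStream.rangeClosed(1, 120).boxed().collect(Collectors.toList());
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        int size = items.size();
        int chunkSize = 50;                 // max items per batch
        int chunks = 1 + size / chunkSize;
        int startIndex = 0;
        for (int i = 0; i < chunks && startIndex < size; i++, startIndex += chunkSize) {
            long delay = (i + 1) * 200L;    // 200 ms between batches keeps the UI thread free
            int endIndex = Math.min(size, startIndex + chunkSize);
            List<Integer> subList = items.subList(startIndex, endIndex);
            scheduler.schedule(() -> subList.forEach(BatchProcessingSketch::process),
                    delay, TimeUnit.MILLISECONDS);
        }
        // Shut the scheduler down once all batches have had time to run.
        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.SECONDS);
    }

    private static void process(Integer item) {
        System.out.println("processed item " + item);
    }
}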
@@ -24,6 +24,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,

     private static final long RETRY_DELAY_SEC = 10;
     private static final long CLEANUP_TIMER = 120;
+    private boolean isPreliminaryDataRequest = true;


     ///////////////////////////////////////////////////////////////////////////////////////////
@@ -101,6 +102,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
             Collections.shuffle(nodeAddresses);
             NodeAddress nextCandidate = nodeAddresses.get(0);
             nodeAddresses.remove(nextCandidate);
+            isPreliminaryDataRequest = true;
             requestData(nextCandidate, nodeAddresses);
         }
     }
@@ -114,6 +116,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
             Collections.shuffle(remainingNodeAddresses);
             NodeAddress candidate = nodeAddressOfPreliminaryDataRequest.get();
             remainingNodeAddresses.remove(candidate);
+            isPreliminaryDataRequest = false;
             requestData(candidate, remainingNodeAddresses);
         }
     }
@@ -301,7 +304,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
                 }
             });
             handlerMap.put(nodeAddress, requestDataHandler);
-            requestDataHandler.requestData(nodeAddress);
+            requestDataHandler.requestData(nodeAddress, isPreliminaryDataRequest);
         } else {
             log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress + "\n" +
                     "We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");