Merge pull request #6340 from chimp1984/improve_initial_data_load

Improve initial data load
This commit is contained in:
Christoph Atteneder 2022-10-13 09:56:38 +02:00 committed by GitHub
commit 8386c411e2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 14 deletions

View file

@ -43,7 +43,7 @@ import org.jetbrains.annotations.NotNull;
public class GetDataRequestHandler {
private static final long TIMEOUT = 180;
private static final int MAX_ENTRIES = 10000;
private static final int MAX_ENTRIES = 5000;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener

View file

@ -73,7 +73,7 @@ class RequestDataHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onComplete(boolean wasTruncated);
@SuppressWarnings("UnusedParameters")
void onFault(String errorMessage, @SuppressWarnings("SameParameterValue") @Nullable Connection connection);
@ -201,8 +201,7 @@ class RequestDataHandler implements MessageListener {
connection.getPeersNodeAddressOptional().get());
cleanup();
listener.onComplete();
// firstRequest = false;
listener.onComplete(getDataResponse.isWasTruncated());
} else {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +

View file

@ -101,7 +101,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private Timer retryTimer;
private boolean dataUpdateRequested;
private boolean stopped;
private int numRepeatedRequests = 0;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -323,7 +323,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
RequestDataHandler requestDataHandler = new RequestDataHandler(networkNode, dataStorage, peerManager,
new RequestDataHandler.Listener() {
@Override
public void onComplete() {
public void onComplete(boolean wasTruncated) {
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
nodeAddress);
stopRetryTimer();
@ -347,6 +347,17 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
checkNotNull(listener).onDataReceived();
if (wasTruncated) {
if (numRepeatedRequests < 10) {
log.info("DataResponse did not contain all data, so we repeat request until we got all data");
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2);
} else {
log.info("DataResponse still did not contained all data but we requested already 10 times and stop now.");
}
} else {
log.info("DataResponse contained all data");
}
}
@Override
@ -387,6 +398,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
});
handlerMap.put(nodeAddress, requestDataHandler);
numRepeatedRequests++;
requestDataHandler.requestData(nodeAddress, isPreliminaryDataRequest);
} else {
log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress + "\n" +

View file

@ -56,14 +56,19 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
private final boolean isGetUpdatedDataResponse;
private final Capabilities supportedCapabilities;
// Added at v1.9.6
private final boolean wasTruncated;
public GetDataResponse(@NotNull Set<ProtectedStorageEntry> dataSet,
@NotNull Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse) {
boolean isGetUpdatedDataResponse,
boolean wasTruncated) {
this(dataSet,
persistableNetworkPayloadSet,
requestNonce,
isGetUpdatedDataResponse,
wasTruncated,
Capabilities.app,
Version.getP2PMessageVersion());
}
@ -76,6 +81,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
@NotNull Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse,
boolean wasTruncated,
@NotNull Capabilities supportedCapabilities,
int messageVersion) {
super(messageVersion);
@ -84,6 +90,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
this.persistableNetworkPayloadSet = persistableNetworkPayloadSet;
this.requestNonce = requestNonce;
this.isGetUpdatedDataResponse = isGetUpdatedDataResponse;
this.wasTruncated = wasTruncated;
this.supportedCapabilities = supportedCapabilities;
}
@ -105,6 +112,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
.collect(Collectors.toList()))
.setRequestNonce(requestNonce)
.setIsGetUpdatedDataResponse(isGetUpdatedDataResponse)
.setWasTruncated(wasTruncated)
.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities));
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
@ -126,6 +134,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
proto.getWasTruncated(),
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}

View file

@ -349,11 +349,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
.map(e -> get32ByteHashAsByteArray((e.getProtectedStoragePayload())))
.toArray());
boolean wasTruncated = wasPersistableNetworkPayloadsTruncated.get() || wasProtectedStorageEntriesTruncated.get();
return new GetDataResponse(
filteredProtectedStorageEntries,
filteredPersistableNetworkPayloads,
getDataRequest.getNonce(),
getDataRequest instanceof GetUpdatedDataRequest);
getDataRequest instanceof GetUpdatedDataRequest,
wasTruncated);
}
@ -428,6 +430,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
int fromIndex = dateSortedTruncatablePayloads.size() - maxItems;
int toIndex = dateSortedTruncatablePayloads.size();
dateSortedTruncatablePayloads = dateSortedTruncatablePayloads.subList(fromIndex, toIndex);
outTruncated.set(true);
log.info("Num truncated dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
}
}
@ -509,12 +512,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// performance issues.
// Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min).
// Usually we only get about a few hundred or max. a few 1000 items. 82645 is all
// trade stats stats and all account age witness data.
// trade stats and all account age witness data.
// We only apply it once from first response
if (!initialRequestApplied) {
if (!initialRequestApplied || getDataResponse.isWasTruncated()) {
addPersistableNetworkPayloadFromInitialRequest(e);
}
} else {
// We don't broadcast here as we are only connected to the seed node and would be pointless

View file

@ -22,8 +22,8 @@ import bisq.network.p2p.TestUtils;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.storage.mocks.PersistableNetworkPayloadStub;
import bisq.network.p2p.storage.mocks.ProtectedStoragePayloadStub;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
@ -34,6 +34,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.mockito.MockitoAnnotations;
import org.junit.Before;
import org.junit.Test;
@ -41,8 +43,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
public class P2PDataStorageProcessGetDataResponse {
private TestState testState;
@ -71,6 +71,7 @@ public class P2PDataStorageProcessGetDataResponse {
new HashSet<>(protectedStorageEntries),
new HashSet<>(persistableNetworkPayloads),
1,
false,
false);
}

View file

@ -118,6 +118,7 @@ message GetDataResponse {
repeated StorageEntryWrapper data_set = 3;
repeated int32 supported_capabilities = 4;
repeated PersistableNetworkPayload persistable_network_payload_items = 5;
bool was_truncated = 6;
}
message GetUpdatedDataRequest {