[REFACTOR] Introduce buildGetDataResponse

This is just a strict move of code to reduce errors.
This commit is contained in:
Julian Knutsen 2019-11-20 17:55:41 -08:00
parent daffe6dc38
commit 944b3fffbc
No known key found for this signature in database
GPG key ID: D85F536DB3615B2D
2 changed files with 92 additions and 92 deletions

View file

@ -22,28 +22,16 @@ import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.getdata.messages.GetDataRequest;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.Utilities;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@ -51,7 +39,6 @@ import org.jetbrains.annotations.NotNull;
@Slf4j
public class GetDataRequestHandler {
private static final long TIMEOUT = 90;
private static final int MAX_ENTRIES = 10000;
///////////////////////////////////////////////////////////////////////////////////////////
@ -93,10 +80,7 @@ public class GetDataRequestHandler {
public void handle(GetDataRequest getDataRequest, final Connection connection) {
long ts = System.currentTimeMillis();
GetDataResponse getDataResponse = new GetDataResponse(getFilteredProtectedStorageEntries(getDataRequest, connection),
getFilteredPersistableNetworkPayload(getDataRequest, connection),
getDataRequest.getNonce(),
getDataRequest instanceof GetUpdatedDataRequest);
GetDataResponse getDataResponse = dataStorage.buildGetDataResponse(getDataRequest, connection);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
@ -136,81 +120,6 @@ public class GetDataRequestHandler {
log.info("handle GetDataRequest took {} ms", System.currentTimeMillis() - ts);
}
private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<PersistableNetworkPayload> result = dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(connection::noCapabilityRequiredOrCapabilityIsSupported)
.filter(payload -> {
boolean notContained = tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()));
return notContained;
})
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("The getData request from peer with {} contains {} PersistableNetworkPayload entries ",
connectionInfo, result.size());
return result;
}
private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
Set<Integer> lookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<ProtectedStorageEntry> filteredSet = dataStorage.getMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("getFilteredProtectedStorageEntries " + filteredSet.size());
for (ProtectedStorageEntry protectedStorageEntry : filteredSet) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
boolean doAdd = false;
if (protectedStoragePayload instanceof CapabilityRequiringPayload) {
if (connection.getCapabilities().containsAll(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities()))
doAdd = true;
else
log.debug("We do not send the message to the peer because they do not support the required capability for that message type.\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
} else {
doAdd = true;
}
if (doAdd) {
boolean notContained = lookupSet.add(protectedStoragePayload.hashCode());
if (notContained)
filteredDataSet.add(protectedStorageEntry);
}
}
log.info("The getData request from peer with {} contains {} ProtectedStorageEntry entries ",
connectionInfo, filteredDataSet.size());
return filteredDataSet;
}
public void stop() {
cleanup();
}

View file

@ -25,6 +25,8 @@ import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.BroadcastHandler;
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.getdata.messages.GetDataRequest;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import bisq.network.p2p.storage.messages.AddDataMessage;
@ -34,6 +36,7 @@ import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.messages.RefreshOfferMessage;
import bisq.network.p2p.storage.messages.RemoveDataMessage;
import bisq.network.p2p.storage.messages.RemoveMailboxDataMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.DateTolerantPayload;
import bisq.network.p2p.storage.payload.ExpirablePayload;
import bisq.network.p2p.storage.payload.MailboxStoragePayload;
@ -90,6 +93,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
@ -110,6 +114,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = 60;
private static final int MAX_ENTRIES = 10000;
private final Broadcaster broadcaster;
private final AppendOnlyDataStoreService appendOnlyDataStoreService;
private final ProtectedDataStoreService protectedDataStoreService;
@ -219,6 +225,91 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return excludedKeys;
}
/**
* Returns a GetDataResponse object that contains the Payloads known locally, but not remotely.
*/
public GetDataResponse buildGetDataResponse(GetDataRequest getDataRequest, Connection connection) {
return new GetDataResponse(getFilteredProtectedStorageEntries(getDataRequest, connection),
getFilteredPersistableNetworkPayload(getDataRequest, connection),
getDataRequest.getNonce(),
getDataRequest instanceof GetUpdatedDataRequest);
}
private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<PersistableNetworkPayload> result = this.appendOnlyDataStoreService.getMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(connection::noCapabilityRequiredOrCapabilityIsSupported)
.filter(payload -> {
boolean notContained = tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()));
return notContained;
})
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("The getData request from peer with {} contains {} PersistableNetworkPayload entries ",
connectionInfo, result.size());
return result;
}
private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
Set<Integer> lookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<ProtectedStorageEntry> filteredSet = this.map.entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("getFilteredProtectedStorageEntries " + filteredSet.size());
for (ProtectedStorageEntry protectedStorageEntry : filteredSet) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
boolean doAdd = false;
if (protectedStoragePayload instanceof CapabilityRequiringPayload) {
if (connection.getCapabilities().containsAll(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities()))
doAdd = true;
else
log.debug("We do not send the message to the peer because they do not support the required capability for that message type.\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
} else {
doAdd = true;
}
if (doAdd) {
boolean notContained = lookupSet.add(protectedStoragePayload.hashCode());
if (notContained)
filteredDataSet.add(protectedStorageEntry);
}
}
log.info("The getData request from peer with {} contains {} ProtectedStorageEntry entries ",
connectionInfo, filteredDataSet.size());
return filteredDataSet;
}
///////////////////////////////////////////////////////////////////////////////////////////
// API