mirror of
https://github.com/bisq-network/bisq.git
synced 2025-02-24 15:10:44 +01:00
Merge pull request #4586 from chimp1984/reduce-initial-date-request
Reduce initial data request
This commit is contained in:
commit
30f44919fc
25 changed files with 485 additions and 201 deletions
|
@ -17,6 +17,9 @@
|
|||
|
||||
package bisq.common.app;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
@ -29,6 +32,11 @@ public class Version {
|
|||
// We use semantic versioning with major, minor and patch
|
||||
public static final String VERSION = "1.3.9";
|
||||
|
||||
/**
|
||||
* Holds a list of the versions of tagged resource files for optimizing the getData requests.
|
||||
*/
|
||||
public static final List<String> HISTORY = Arrays.asList("1.4.0");
|
||||
|
||||
public static int getMajorVersion(String version) {
|
||||
return getSubVersion(version, 0);
|
||||
}
|
||||
|
@ -45,7 +53,7 @@ public class Version {
|
|||
return isNewVersion(newVersion, VERSION);
|
||||
}
|
||||
|
||||
static boolean isNewVersion(String newVersion, String currentVersion) {
|
||||
public static boolean isNewVersion(String newVersion, String currentVersion) {
|
||||
if (newVersion.equals(currentVersion))
|
||||
return false;
|
||||
else if (getMajorVersion(newVersion) > getMajorVersion(currentVersion))
|
||||
|
|
|
@ -77,12 +77,17 @@ public class Storage<T extends PersistableEnvelope> {
|
|||
this.corruptedDatabaseFilesHandler = corruptedDatabaseFilesHandler;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public T getPersisted(String fileName) {
|
||||
return getPersisted(new File(dir, fileName));
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public T initAndGetPersistedWithFileName(String fileName, long delay) {
|
||||
this.fileName = fileName;
|
||||
storageFile = new File(dir, fileName);
|
||||
fileManager = new FileManager<>(dir, storageFile, delay, persistenceProtoResolver);
|
||||
return getPersisted();
|
||||
return getPersisted(storageFile);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -96,7 +101,7 @@ public class Storage<T extends PersistableEnvelope> {
|
|||
this.fileName = fileName;
|
||||
storageFile = new File(dir, fileName);
|
||||
fileManager = new FileManager<>(dir, storageFile, delay, persistenceProtoResolver);
|
||||
return getPersisted();
|
||||
return getPersisted(storageFile);
|
||||
}
|
||||
|
||||
public void queueUpForSave() {
|
||||
|
@ -144,7 +149,7 @@ public class Storage<T extends PersistableEnvelope> {
|
|||
// We do the file read on the UI thread to avoid problems from multi threading.
|
||||
// Data are small and read is done only at startup, so it is no performance issue.
|
||||
@Nullable
|
||||
private T getPersisted() {
|
||||
private T getPersisted(File storageFile) {
|
||||
if (storageFile.exists()) {
|
||||
long now = System.currentTimeMillis();
|
||||
try {
|
||||
|
|
|
@ -19,18 +19,13 @@ package bisq.core.account.sign;
|
|||
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
|
@ -40,9 +35,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* definition and provide a hashMap for the domain access.
|
||||
*/
|
||||
@Slf4j
|
||||
public class SignedWitnessStore implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
public class SignedWitnessStore extends PersistableNetworkPayloadStore {
|
||||
|
||||
SignedWitnessStore() {
|
||||
}
|
||||
|
|
|
@ -18,18 +18,13 @@
|
|||
package bisq.core.account.witness;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
|
@ -39,9 +34,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* definition and provide a hashMap for the domain access.
|
||||
*/
|
||||
@Slf4j
|
||||
public class AccountAgeWitnessStore implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
public class AccountAgeWitnessStore extends PersistableNetworkPayloadStore {
|
||||
|
||||
AccountAgeWitnessStore() {
|
||||
}
|
||||
|
|
|
@ -18,18 +18,13 @@
|
|||
package bisq.core.dao.governance.blindvote.storage;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
|
@ -39,9 +34,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* definition and provide a hashMap for the domain access.
|
||||
*/
|
||||
@Slf4j
|
||||
public class BlindVoteStore implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
public class BlindVoteStore extends PersistableNetworkPayloadStore {
|
||||
|
||||
BlindVoteStore() {
|
||||
}
|
||||
|
|
|
@ -18,18 +18,13 @@
|
|||
package bisq.core.dao.governance.proposal.storage.appendonly;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
|
@ -39,9 +34,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* definition and provide a hashMap for the domain access.
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProposalStore implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
public class ProposalStore extends PersistableNetworkPayloadStore {
|
||||
|
||||
ProposalStore() {
|
||||
}
|
||||
|
|
|
@ -79,11 +79,12 @@ public class ProvidersRepository {
|
|||
fillProviderList();
|
||||
selectNextProviderBaseUrl();
|
||||
|
||||
if (bannedNodes == null)
|
||||
if (bannedNodes == null) {
|
||||
log.info("Selected provider baseUrl={}, providerList={}", baseUrl, providerList);
|
||||
else
|
||||
} else if (!bannedNodes.isEmpty()) {
|
||||
log.warn("We have banned provider nodes: bannedNodes={}, selected provider baseUrl={}, providerList={}",
|
||||
bannedNodes, baseUrl, providerList);
|
||||
}
|
||||
}
|
||||
|
||||
public void selectNextProviderBaseUrl() {
|
||||
|
|
|
@ -100,8 +100,11 @@ public abstract class DisputeAgentService<T extends DisputeAgent> {
|
|||
} else {
|
||||
bannedDisputeAgents = null;
|
||||
}
|
||||
if (bannedDisputeAgents != null)
|
||||
|
||||
if (bannedDisputeAgents != null && !bannedDisputeAgents.isEmpty()) {
|
||||
log.warn("bannedDisputeAgents=" + bannedDisputeAgents);
|
||||
}
|
||||
|
||||
Set<T> disputeAgentSet = getDisputeAgentSet(bannedDisputeAgents);
|
||||
|
||||
Map<NodeAddress, T> map = new HashMap<>();
|
||||
|
|
|
@ -24,9 +24,8 @@ import bisq.network.p2p.storage.persistence.MapStoreService;
|
|||
import bisq.common.config.Config;
|
||||
import bisq.common.storage.Storage;
|
||||
|
||||
import javax.inject.Named;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
|
@ -64,6 +63,10 @@ public class TradeStatistics2StorageService extends MapStoreService<TradeStatist
|
|||
return store.getMap();
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMapOfAllData() {
|
||||
return getMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandle(PersistableNetworkPayload payload) {
|
||||
return payload instanceof TradeStatistics2;
|
||||
|
@ -78,9 +81,4 @@ public class TradeStatistics2StorageService extends MapStoreService<TradeStatist
|
|||
protected TradeStatistics2Store createStore() {
|
||||
return new TradeStatistics2Store();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readStore() {
|
||||
super.readStore();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,18 +18,13 @@
|
|||
package bisq.core.trade.statistics;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -38,9 +33,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* definition and provide a hashMap for the domain access.
|
||||
*/
|
||||
@Slf4j
|
||||
public class TradeStatistics2Store implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
private final Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
public class TradeStatistics2Store extends PersistableNetworkPayloadStore {
|
||||
|
||||
TradeStatistics2Store() {
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class TradeStatisticsManager {
|
|||
addToSet((TradeStatistics2) payload);
|
||||
});
|
||||
|
||||
Set<TradeStatistics2> collect = tradeStatistics2StorageService.getMap().values().stream()
|
||||
Set<TradeStatistics2> set = tradeStatistics2StorageService.getMapOfAllData().values().stream()
|
||||
.filter(e -> e instanceof TradeStatistics2)
|
||||
.map(e -> (TradeStatistics2) e)
|
||||
.map(WrapperTradeStatistics2::new)
|
||||
|
@ -87,7 +87,7 @@ public class TradeStatisticsManager {
|
|||
.map(WrapperTradeStatistics2::unwrap)
|
||||
.filter(TradeStatistics2::isValid)
|
||||
.collect(Collectors.toSet());
|
||||
observableTradeStatisticsSet.addAll(collect);
|
||||
observableTradeStatisticsSet.addAll(set);
|
||||
|
||||
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
|
||||
|
||||
|
@ -99,7 +99,6 @@ public class TradeStatisticsManager {
|
|||
}
|
||||
|
||||
private void addToSet(TradeStatistics2 tradeStatistics) {
|
||||
|
||||
if (!observableTradeStatisticsSet.contains(tradeStatistics)) {
|
||||
Optional<TradeStatistics2> duplicate = observableTradeStatisticsSet.stream().filter(
|
||||
e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny();
|
||||
|
|
|
@ -88,22 +88,22 @@ public class GetDataRequestHandler {
|
|||
.map(e -> "node address " + e.getFullAddress())
|
||||
.orElseGet(() -> "connection UID " + connection.getUid());
|
||||
|
||||
AtomicBoolean outPersistableNetworkPayloadOutputTruncated = new AtomicBoolean(false);
|
||||
AtomicBoolean outProtectedStoragePayloadOutputTruncated = new AtomicBoolean(false);
|
||||
AtomicBoolean wasPersistableNetworkPayloadsTruncated = new AtomicBoolean(false);
|
||||
AtomicBoolean wasProtectedStorageEntriesTruncated = new AtomicBoolean(false);
|
||||
GetDataResponse getDataResponse = dataStorage.buildGetDataResponse(
|
||||
getDataRequest,
|
||||
MAX_ENTRIES,
|
||||
outPersistableNetworkPayloadOutputTruncated,
|
||||
outProtectedStoragePayloadOutputTruncated,
|
||||
wasPersistableNetworkPayloadsTruncated,
|
||||
wasProtectedStorageEntriesTruncated,
|
||||
connection.getCapabilities());
|
||||
|
||||
if (outPersistableNetworkPayloadOutputTruncated.get()) {
|
||||
if (wasPersistableNetworkPayloadsTruncated.get()) {
|
||||
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
|
||||
"entries to get delivered. We limited the entries for the response to {} entries",
|
||||
connectionInfo, MAX_ENTRIES);
|
||||
}
|
||||
|
||||
if (outProtectedStoragePayloadOutputTruncated.get()) {
|
||||
if (wasProtectedStorageEntriesTruncated.get()) {
|
||||
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
|
||||
"entries to get delivered. We limited the entries for the response to {} entries",
|
||||
connectionInfo, MAX_ENTRIES);
|
||||
|
|
|
@ -260,15 +260,16 @@ class RequestDataHandler implements MessageListener {
|
|||
|
||||
// Log different data types
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\n#################################################################\n");
|
||||
sb.append("Connected to node: " + peersNodeAddress.getFullAddress() + "\n");
|
||||
String sep = System.lineSeparator();
|
||||
sb.append(sep).append("#################################################################").append(sep);
|
||||
sb.append("Connected to node: ").append(peersNodeAddress.getFullAddress()).append(sep);
|
||||
int items = dataSet.size() + persistableNetworkPayloadSet.size();
|
||||
sb.append("Received ").append(items).append(" instances from a ")
|
||||
.append(getDataRequestType).append("\n");
|
||||
.append(getDataRequestType).append(sep);
|
||||
payloadByClassName.forEach((key, value) -> sb.append(key)
|
||||
.append(": ")
|
||||
.append(value.size())
|
||||
.append("\n"));
|
||||
.append(sep));
|
||||
sb.append("#################################################################");
|
||||
log.info(sb.toString());
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import lombok.EqualsAndHashCode;
|
|||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Getter
|
||||
@ToString
|
||||
|
@ -35,11 +37,18 @@ public abstract class GetDataRequest extends NetworkEnvelope implements Extended
|
|||
// Keys for ProtectedStorageEntry items to be excluded from the request because the peer has them already
|
||||
protected final Set<byte[]> excludedKeys;
|
||||
|
||||
// Added at v1.4.0
|
||||
// The version of the requester. Used for response to send potentially missing historical data
|
||||
@Nullable
|
||||
protected final String version;
|
||||
|
||||
public GetDataRequest(int messageVersion,
|
||||
int nonce,
|
||||
Set<byte[]> excludedKeys) {
|
||||
Set<byte[]> excludedKeys,
|
||||
@Nullable String version) {
|
||||
super(messageVersion);
|
||||
this.nonce = nonce;
|
||||
this.excludedKeys = excludedKeys;
|
||||
this.version = version;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import protobuf.NetworkEnvelope;
|
|||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -34,7 +35,7 @@ import lombok.EqualsAndHashCode;
|
|||
import lombok.Value;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@Slf4j
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
|
@ -48,6 +49,7 @@ public final class GetUpdatedDataRequest extends GetDataRequest implements Sende
|
|||
this(senderNodeAddress,
|
||||
nonce,
|
||||
excludedKeys,
|
||||
Version.VERSION,
|
||||
Version.getP2PMessageVersion());
|
||||
}
|
||||
|
||||
|
@ -59,35 +61,41 @@ public final class GetUpdatedDataRequest extends GetDataRequest implements Sende
|
|||
private GetUpdatedDataRequest(NodeAddress senderNodeAddress,
|
||||
int nonce,
|
||||
Set<byte[]> excludedKeys,
|
||||
@Nullable String version,
|
||||
int messageVersion) {
|
||||
super(messageVersion,
|
||||
nonce,
|
||||
excludedKeys);
|
||||
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetUpdatedDataRequest");
|
||||
excludedKeys,
|
||||
version);
|
||||
this.senderNodeAddress = senderNodeAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
|
||||
final protobuf.GetUpdatedDataRequest.Builder builder = protobuf.GetUpdatedDataRequest.newBuilder()
|
||||
protobuf.GetUpdatedDataRequest.Builder builder = protobuf.GetUpdatedDataRequest.newBuilder()
|
||||
.setSenderNodeAddress(senderNodeAddress.toProtoMessage())
|
||||
.setNonce(nonce)
|
||||
.addAllExcludedKeys(excludedKeys.stream()
|
||||
.map(ByteString::copyFrom)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
Optional.ofNullable(version).ifPresent(builder::setVersion);
|
||||
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
|
||||
.setGetUpdatedDataRequest(builder)
|
||||
.build();
|
||||
log.info("Sending a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
|
||||
log.info("Sending a GetUpdatedDataRequest with {} kB and {} excluded key entries. Requesters version={}",
|
||||
proto.getSerializedSize() / 1000d, excludedKeys.size(), version);
|
||||
return proto;
|
||||
}
|
||||
|
||||
public static GetUpdatedDataRequest fromProto(protobuf.GetUpdatedDataRequest proto, int messageVersion) {
|
||||
log.info("Received a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
|
||||
Set<byte[]> excludedKeys = ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList());
|
||||
String requestersVersion = ProtoUtil.stringOrNullFromProto(proto.getVersion());
|
||||
log.info("Received a GetUpdatedDataRequest with {} kB and {} excluded key entries. Requesters version={}",
|
||||
proto.getSerializedSize() / 1000d, excludedKeys.size(), requestersVersion);
|
||||
return new GetUpdatedDataRequest(NodeAddress.fromProto(proto.getSenderNodeAddress()),
|
||||
proto.getNonce(),
|
||||
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
|
||||
excludedKeys,
|
||||
requestersVersion,
|
||||
messageVersion);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import protobuf.NetworkEnvelope;
|
|||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -35,7 +36,7 @@ import lombok.EqualsAndHashCode;
|
|||
import lombok.Value;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@Slf4j
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
|
@ -43,9 +44,12 @@ import org.jetbrains.annotations.NotNull;
|
|||
public final class PreliminaryGetDataRequest extends GetDataRequest implements AnonymousMessage, SupportedCapabilitiesMessage {
|
||||
private final Capabilities supportedCapabilities;
|
||||
|
||||
public PreliminaryGetDataRequest(int nonce,
|
||||
@NotNull Set<byte[]> excludedKeys) {
|
||||
this(nonce, excludedKeys, Capabilities.app, Version.getP2PMessageVersion());
|
||||
public PreliminaryGetDataRequest(int nonce, Set<byte[]> excludedKeys) {
|
||||
this(nonce,
|
||||
excludedKeys,
|
||||
Version.VERSION,
|
||||
Capabilities.app,
|
||||
Version.getP2PMessageVersion());
|
||||
}
|
||||
|
||||
|
||||
|
@ -54,34 +58,40 @@ public final class PreliminaryGetDataRequest extends GetDataRequest implements A
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private PreliminaryGetDataRequest(int nonce,
|
||||
@NotNull Set<byte[]> excludedKeys,
|
||||
@NotNull Capabilities supportedCapabilities,
|
||||
Set<byte[]> excludedKeys,
|
||||
@Nullable String version,
|
||||
Capabilities supportedCapabilities,
|
||||
int messageVersion) {
|
||||
super(messageVersion, nonce, excludedKeys);
|
||||
super(messageVersion, nonce, excludedKeys, version);
|
||||
|
||||
this.supportedCapabilities = supportedCapabilities;
|
||||
}
|
||||
|
||||
@Override
|
||||
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
|
||||
final protobuf.PreliminaryGetDataRequest.Builder builder = protobuf.PreliminaryGetDataRequest.newBuilder()
|
||||
protobuf.PreliminaryGetDataRequest.Builder builder = protobuf.PreliminaryGetDataRequest.newBuilder()
|
||||
.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities))
|
||||
.setNonce(nonce)
|
||||
.addAllExcludedKeys(excludedKeys.stream()
|
||||
.map(ByteString::copyFrom)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
Optional.ofNullable(version).ifPresent(builder::setVersion);
|
||||
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
|
||||
.setPreliminaryGetDataRequest(builder)
|
||||
.build();
|
||||
log.info("Sending a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
|
||||
log.info("Sending a PreliminaryGetDataRequest with {} kB and {} excluded key entries. Requesters version={}",
|
||||
proto.getSerializedSize() / 1000d, excludedKeys.size(), version);
|
||||
return proto;
|
||||
}
|
||||
|
||||
public static PreliminaryGetDataRequest fromProto(protobuf.PreliminaryGetDataRequest proto, int messageVersion) {
|
||||
log.info("Received a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
|
||||
Set<byte[]> excludedKeys = ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList());
|
||||
String requestersVersion = ProtoUtil.stringOrNullFromProto(proto.getVersion());
|
||||
log.info("Received a PreliminaryGetDataRequest with {} kB and {} excluded key entries. Requesters version={}",
|
||||
proto.getSerializedSize() / 1000d, excludedKeys.size(), requestersVersion);
|
||||
return new PreliminaryGetDataRequest(proto.getNonce(),
|
||||
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
|
||||
excludedKeys,
|
||||
requestersVersion,
|
||||
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
|
||||
messageVersion);
|
||||
}
|
||||
|
|
|
@ -47,6 +47,8 @@ import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
|
|||
import bisq.network.p2p.storage.payload.RequiresOwnerIsOnlinePayload;
|
||||
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener;
|
||||
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
|
||||
import bisq.network.p2p.storage.persistence.HistoricalDataStoreService;
|
||||
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;
|
||||
import bisq.network.p2p.storage.persistence.ProtectedDataStoreService;
|
||||
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
|
||||
import bisq.network.p2p.storage.persistence.SequenceNumberMap;
|
||||
|
@ -195,14 +197,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
* Returns a PreliminaryGetDataRequest that can be sent to a peer node to request missing Payload data.
|
||||
*/
|
||||
public PreliminaryGetDataRequest buildPreliminaryGetDataRequest(int nonce) {
|
||||
return new PreliminaryGetDataRequest(nonce, this.getKnownPayloadHashes());
|
||||
return new PreliminaryGetDataRequest(nonce, getKnownPayloadHashes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a GetUpdatedDataRequest that can be sent to a peer node to request missing Payload data.
|
||||
*/
|
||||
public GetUpdatedDataRequest buildGetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce) {
|
||||
return new GetUpdatedDataRequest(senderNodeAddress, nonce, this.getKnownPayloadHashes());
|
||||
return new GetUpdatedDataRequest(senderNodeAddress, nonce, getKnownPayloadHashes());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -213,78 +215,53 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
// PersistedStoragePayload items don't get removed, so we don't have an issue with the case that
|
||||
// an object gets removed in between PreliminaryGetDataRequest and the GetUpdatedDataRequest and we would
|
||||
// miss that event if we do not load the full set or use some delta handling.
|
||||
Set<byte[]> excludedKeys = this.appendOnlyDataStoreService.getMap().keySet().stream()
|
||||
.map(e -> e.bytes)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Set<byte[]> excludedKeysFromPersistedEntryMap = this.map.keySet()
|
||||
.stream()
|
||||
.map(e -> e.bytes)
|
||||
.collect(Collectors.toSet());
|
||||
Set<byte[]> excludedKeys = getKeysAsByteSet(getMapForDataRequest());
|
||||
Set<byte[]> excludedKeysFromPersistedEntryMap = getKeysAsByteSet(map);
|
||||
|
||||
excludedKeys.addAll(excludedKeysFromPersistedEntryMap);
|
||||
|
||||
return excludedKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic function that can be used to filter a Map<ByteArray, ProtectedStorageEntry || PersistableNetworkPayload>
|
||||
* by a given set of keys and peer capabilities.
|
||||
*/
|
||||
static private <T extends NetworkPayload> Set<T> filterKnownHashes(
|
||||
Map<ByteArray, T> toFilter,
|
||||
Function<T, ? extends NetworkPayload> objToPayload,
|
||||
Set<ByteArray> knownHashes,
|
||||
Capabilities peerCapabilities,
|
||||
int maxEntries,
|
||||
AtomicBoolean outTruncated) {
|
||||
|
||||
AtomicInteger limit = new AtomicInteger(maxEntries);
|
||||
|
||||
Set<T> filteredResults = toFilter.entrySet().stream()
|
||||
.filter(e -> !knownHashes.contains(e.getKey()))
|
||||
.filter(e -> limit.decrementAndGet() >= 0)
|
||||
.map(Map.Entry::getValue)
|
||||
.filter(networkPayload -> shouldTransmitPayloadToPeer(peerCapabilities,
|
||||
objToPayload.apply(networkPayload)))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (limit.get() < 0)
|
||||
outTruncated.set(true);
|
||||
|
||||
return filteredResults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a GetDataResponse object that contains the Payloads known locally, but not remotely.
|
||||
*/
|
||||
public GetDataResponse buildGetDataResponse(
|
||||
GetDataRequest getDataRequest,
|
||||
int maxEntriesPerType,
|
||||
AtomicBoolean outPersistableNetworkPayloadOutputTruncated,
|
||||
AtomicBoolean outProtectedStorageEntryOutputTruncated,
|
||||
AtomicBoolean wasPersistableNetworkPayloadsTruncated,
|
||||
AtomicBoolean wasProtectedStorageEntriesTruncated,
|
||||
Capabilities peerCapabilities) {
|
||||
|
||||
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray =
|
||||
P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
|
||||
|
||||
// Pre v 1.4.0 requests do not have set the requesters version field so it is null.
|
||||
// The methods in HistoricalDataStoreService will return all historical data in that case.
|
||||
// mapForDataResponse contains the filtered by version data from HistoricalDataStoreService as well as all other
|
||||
// maps of the remaining appendOnlyDataStoreServices.
|
||||
Map<ByteArray, PersistableNetworkPayload> mapForDataResponse = getMapForDataResponse(getDataRequest.getVersion());
|
||||
Set<PersistableNetworkPayload> filteredPersistableNetworkPayloads =
|
||||
filterKnownHashes(
|
||||
this.appendOnlyDataStoreService.getMap(),
|
||||
mapForDataResponse,
|
||||
Function.identity(),
|
||||
excludedKeysAsByteArray,
|
||||
peerCapabilities,
|
||||
maxEntriesPerType,
|
||||
outPersistableNetworkPayloadOutputTruncated);
|
||||
wasPersistableNetworkPayloadsTruncated);
|
||||
log.info("{} PersistableNetworkPayload entries remained after filtered by excluded keys. Original map had {} entries.",
|
||||
filteredPersistableNetworkPayloads.size(), mapForDataResponse.size());
|
||||
|
||||
Set<ProtectedStorageEntry> filteredProtectedStorageEntries =
|
||||
filterKnownHashes(
|
||||
this.map,
|
||||
map,
|
||||
ProtectedStorageEntry::getProtectedStoragePayload,
|
||||
excludedKeysAsByteArray,
|
||||
peerCapabilities,
|
||||
maxEntriesPerType,
|
||||
outProtectedStorageEntryOutputTruncated);
|
||||
wasProtectedStorageEntriesTruncated);
|
||||
log.info("{} ProtectedStorageEntry entries remained after filtered by excluded keys. Original map had {} entries.",
|
||||
filteredProtectedStorageEntries.size(), map.size());
|
||||
|
||||
return new GetDataResponse(
|
||||
filteredProtectedStorageEntries,
|
||||
|
@ -293,6 +270,84 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
getDataRequest instanceof GetUpdatedDataRequest);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Utils for collecting the exclude hashes
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private Map<ByteArray, PersistableNetworkPayload> getMapForDataRequest() {
|
||||
Map<ByteArray, PersistableNetworkPayload> map = new HashMap<>();
|
||||
appendOnlyDataStoreService.getServices()
|
||||
.forEach(service -> {
|
||||
Map<ByteArray, PersistableNetworkPayload> serviceMap;
|
||||
if (service instanceof HistoricalDataStoreService) {
|
||||
var historicalDataStoreService = (HistoricalDataStoreService<? extends PersistableNetworkPayloadStore>) service;
|
||||
// As we add the version to our request we only use the live data. Eventually missing data will be
|
||||
// derived from the version.
|
||||
serviceMap = historicalDataStoreService.getMapOfLiveData();
|
||||
} else {
|
||||
serviceMap = service.getMap();
|
||||
}
|
||||
map.putAll(serviceMap);
|
||||
log.info("We added {} entries from {} to the excluded key set of our request",
|
||||
serviceMap.size(), service.getClass().getSimpleName());
|
||||
});
|
||||
return map;
|
||||
}
|
||||
|
||||
private Map<ByteArray, PersistableNetworkPayload> getMapForDataResponse(String requestersVersion) {
|
||||
Map<ByteArray, PersistableNetworkPayload> map = new HashMap<>();
|
||||
appendOnlyDataStoreService.getServices()
|
||||
.forEach(service -> {
|
||||
Map<ByteArray, PersistableNetworkPayload> serviceMap;
|
||||
if (service instanceof HistoricalDataStoreService) {
|
||||
var historicalDataStoreService = (HistoricalDataStoreService<? extends PersistableNetworkPayloadStore>) service;
|
||||
serviceMap = historicalDataStoreService.getMapSinceVersion(requestersVersion);
|
||||
} else {
|
||||
serviceMap = service.getMap();
|
||||
}
|
||||
map.putAll(serviceMap);
|
||||
log.info("We added {} entries from {} to be filtered by excluded keys",
|
||||
serviceMap.size(), service.getClass().getSimpleName());
|
||||
});
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic function that can be used to filter a Map<ByteArray, ProtectedStorageEntry || PersistableNetworkPayload>
|
||||
* by a given set of keys and peer capabilities.
|
||||
*/
|
||||
static private <T extends NetworkPayload> Set<T> filterKnownHashes(
|
||||
Map<ByteArray, T> mapToFilter,
|
||||
Function<T, ? extends NetworkPayload> objToPayloadFunction,
|
||||
Set<ByteArray> knownHashes,
|
||||
Capabilities peerCapabilities,
|
||||
int maxEntries,
|
||||
AtomicBoolean wasTruncated) {
|
||||
|
||||
AtomicInteger limit = new AtomicInteger(maxEntries);
|
||||
|
||||
Set<T> filteredResults = mapToFilter.entrySet().stream()
|
||||
.filter(e -> !knownHashes.contains(e.getKey()))
|
||||
.filter(e -> limit.decrementAndGet() >= 0)
|
||||
.map(Map.Entry::getValue)
|
||||
.filter(networkPayload -> shouldTransmitPayloadToPeer(peerCapabilities,
|
||||
objToPayloadFunction.apply(networkPayload)))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (limit.get() < 0) {
|
||||
wasTruncated.set(true);
|
||||
}
|
||||
|
||||
return filteredResults;
|
||||
}
|
||||
|
||||
private Set<byte[]> getKeysAsByteSet(Map<ByteArray, ? extends PersistablePayload> map) {
|
||||
return map.keySet().stream()
|
||||
.map(e -> e.bytes)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a Payload should be transmit to a peer given the peer's supported capabilities.
|
||||
*/
|
||||
|
@ -402,6 +457,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
return appendOnlyDataStoreService.getMap();
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -579,10 +635,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
|
||||
// To avoid that expired data get stored and broadcast we check early for expire date.
|
||||
if (protectedStorageEntry.isExpired(clock)) {
|
||||
log.warn("We received an expired protectedStorageEntry from peer {}",
|
||||
sender != null ? sender.getFullAddress() : "sender is null");
|
||||
String peer = sender != null ? sender.getFullAddress() : "sender is null";
|
||||
log.warn("We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}",
|
||||
peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName());
|
||||
log.debug("Expired protectedStorageEntry from peer {}. getCreationTimeStamp={}, protectedStorageEntry={}",
|
||||
sender != null ? sender.getFullAddress() : "sender is null",
|
||||
peer,
|
||||
new Date(protectedStorageEntry.getCreationTimeStamp()),
|
||||
protectedStorageEntry);
|
||||
return false;
|
||||
|
@ -624,7 +681,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
if (allowBroadcast)
|
||||
broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener);
|
||||
|
||||
// Persist ProtectedStorageEntrys carrying PersistablePayload payloads
|
||||
// Persist ProtectedStorageEntries carrying PersistablePayload payloads
|
||||
if (protectedStoragePayload instanceof PersistablePayload)
|
||||
protectedDataStoreService.put(hashOfPayload, protectedStorageEntry);
|
||||
|
||||
|
@ -733,37 +790,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method must be called only from client code not from network messages! We omit the ownership checks
|
||||
* so we must apply it only if it comes from our trusted application code. It is used from client code which detects
|
||||
* that the domain object violates specific domain rules.
|
||||
* We could make it more generic by adding an Interface with a generic validation method.
|
||||
*
|
||||
* @param protectedStorageEntry The entry to be removed
|
||||
*/
|
||||
public void removeInvalidProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry) {
|
||||
log.warn("We remove an invalid protectedStorageEntry: {}", protectedStorageEntry);
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
|
||||
if (!map.containsKey(hashOfPayload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
|
||||
|
||||
// We do not update the sequence number as that method is only called if we have received an invalid
|
||||
// protectedStorageEntry from a previous add operation.
|
||||
|
||||
// We do not call maybeAddToRemoveAddOncePayloads to avoid that an invalid object might block a valid object
|
||||
// which we might receive in future (could be potential attack).
|
||||
|
||||
// We do not broadcast as this is a local operation only to avoid our maps get polluted with invalid objects
|
||||
// and as we do not check for ownership a node would not accept such a procedure if it would come from untrusted
|
||||
// source (network).
|
||||
}
|
||||
|
||||
public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload,
|
||||
KeyPair ownerStoragePubKey)
|
||||
throws CryptoException {
|
||||
|
@ -823,7 +849,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
appendOnlyDataStoreListeners.add(listener);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public void removeAppendOnlyDataStoreListener(AppendOnlyDataStoreListener listener) {
|
||||
appendOnlyDataStoreListeners.remove(listener);
|
||||
}
|
||||
|
@ -1018,7 +1043,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
|||
// Util
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public String getHex() {
|
||||
return Utilities.encodeToHex(bytes);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -36,7 +37,8 @@ import lombok.extern.slf4j.Slf4j;
|
|||
*/
|
||||
@Slf4j
|
||||
public class AppendOnlyDataStoreService {
|
||||
private List<MapStoreService<? extends PersistableEnvelope, PersistableNetworkPayload>> services = new ArrayList<>();
|
||||
@Getter
|
||||
private final List<MapStoreService<? extends PersistableEnvelope, PersistableNetworkPayload>> services = new ArrayList<>();
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.network.p2p.storage.persistence;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.app.Version;
|
||||
import bisq.common.storage.Storage;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Manages historical data stores tagged with the release versions.
|
||||
* New data is added to the default map in the store (live data). Historical data is created from resource files.
|
||||
* For initial data requests we only use the live data as the users version is sent with the
|
||||
* request so the responding (seed)node can figure out if we miss any of the historical data.
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class HistoricalDataStoreService<T extends PersistableNetworkPayloadStore> extends MapStoreService<T, PersistableNetworkPayload> {
|
||||
private ImmutableMap<String, PersistableNetworkPayloadStore> storesByVersion;
|
||||
// Cache to avoid that we have to recreate the historical data at each request
|
||||
private ImmutableMap<P2PDataStorage.ByteArray, PersistableNetworkPayload> allHistoricalPayloads;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public HistoricalDataStoreService(File storageDir, Storage<T> storage) {
|
||||
super(storageDir, storage);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// We give back a map of our live map and all historical maps newer than the requested version.
|
||||
// If requestersVersion is null we return all historical data.
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMapSinceVersion(String requestersVersion) {
|
||||
// We add all our live data
|
||||
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> result = new HashMap<>(store.getMap());
|
||||
|
||||
// If we have a store with a newer version than the requesters version we will add those as well.
|
||||
storesByVersion.entrySet().stream()
|
||||
.filter(entry -> {
|
||||
// Old nodes not sending the version will get delivered all data
|
||||
if (requestersVersion == null) {
|
||||
log.info("The requester did not send a version. This is expected for not updated nodes.");
|
||||
return true;
|
||||
}
|
||||
|
||||
// Otherwise we only add data if the requesters version is older then
|
||||
// the version of the particular store.
|
||||
String storeVersion = entry.getKey();
|
||||
boolean newVersion = Version.isNewVersion(storeVersion, requestersVersion);
|
||||
String details = newVersion ?
|
||||
"As our historical store is a newer version we add the data to our result map." :
|
||||
"As the requester version is not older as our historical store we do not " +
|
||||
"add the data to the result map.";
|
||||
log.info("The requester had version {}. Our historical data store has version {}.\n{}",
|
||||
requestersVersion, storeVersion, details);
|
||||
return newVersion;
|
||||
})
|
||||
.map(e -> e.getValue().getMap())
|
||||
.forEach(result::putAll);
|
||||
|
||||
log.info("We found {} entries since requesters version {}",
|
||||
result.size(), requestersVersion);
|
||||
return result;
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMapOfLiveData() {
|
||||
return store.getMap();
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMapOfAllData() {
|
||||
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> result = new HashMap<>(getMapOfLiveData());
|
||||
result.putAll(allHistoricalPayloads);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MapStoreService
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// TODO optimize so that callers to AppendOnlyDataStoreService are not invoking that often getMap
|
||||
// ProposalService is one of the main callers and could avoid it by using the ProposalStoreService directly
|
||||
// instead of AppendOnlyDataStoreService
|
||||
|
||||
// By default we return the live data only. This method should not be used by domain clients but rather the
|
||||
// custom methods getMapOfAllData, getMapOfLiveData or getMapSinceVersion
|
||||
@Override
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap() {
|
||||
return store.getMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void put(P2PDataStorage.ByteArray hash, PersistableNetworkPayload payload) {
|
||||
if (anyMapContainsKey(hash)) {
|
||||
return;
|
||||
}
|
||||
|
||||
getMapOfLiveData().put(hash, payload);
|
||||
persist();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PersistableNetworkPayload putIfAbsent(P2PDataStorage.ByteArray hash, PersistableNetworkPayload payload) {
|
||||
if (anyMapContainsKey(hash)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// We do not return the value from getMapOfLiveData().put as we checked before that it does not contain any value.
|
||||
// So it will be always null. We still keep the return type as we override the method from MapStoreService which
|
||||
// follow the Map.putIfAbsent signature.
|
||||
getMapOfLiveData().put(hash, payload);
|
||||
persist();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void readFromResources(String postFix) {
|
||||
readStore();
|
||||
log.info("We have created the {} store for the live data and filled it with {} entries from the persisted data.",
|
||||
getFileName(), getMapOfLiveData().size());
|
||||
|
||||
// Now we add our historical data stores. As they are immutable after created we use an ImmutableMap
|
||||
ImmutableMap.Builder<P2PDataStorage.ByteArray, PersistableNetworkPayload> allHistoricalPayloadsBuilder = ImmutableMap.builder();
|
||||
ImmutableMap.Builder<String, PersistableNetworkPayloadStore> storesByVersionBuilder = ImmutableMap.builder();
|
||||
|
||||
Version.HISTORY.forEach(version -> readHistoricalStoreFromResources(version, postFix, allHistoricalPayloadsBuilder, storesByVersionBuilder));
|
||||
|
||||
allHistoricalPayloads = allHistoricalPayloadsBuilder.build();
|
||||
storesByVersion = storesByVersionBuilder.build();
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Private
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void readHistoricalStoreFromResources(String version,
|
||||
String postFix,
|
||||
ImmutableMap.Builder<P2PDataStorage.ByteArray, PersistableNetworkPayload> allHistoricalDataBuilder,
|
||||
ImmutableMap.Builder<String, PersistableNetworkPayloadStore> storesByVersionBuilder) {
|
||||
String fileName = getFileName() + "_" + version;
|
||||
boolean wasCreatedFromResources = makeFileFromResourceFile(fileName, postFix);
|
||||
|
||||
// If resource file does not exist we return null. We do not create a new store as it would never get filled.
|
||||
PersistableNetworkPayloadStore historicalStore = storage.getPersisted(fileName);
|
||||
if (historicalStore == null) {
|
||||
log.warn("Resource file with file name {} does not exits.", fileName);
|
||||
return;
|
||||
}
|
||||
|
||||
storesByVersionBuilder.put(version, historicalStore);
|
||||
allHistoricalDataBuilder.putAll(historicalStore.getMap());
|
||||
|
||||
if (wasCreatedFromResources) {
|
||||
pruneStore(historicalStore, version);
|
||||
}
|
||||
}
|
||||
|
||||
private void pruneStore(PersistableNetworkPayloadStore historicalStore, String version) {
|
||||
int preLive = getMapOfLiveData().size();
|
||||
getMapOfLiveData().keySet().removeAll(historicalStore.getMap().keySet());
|
||||
int postLive = getMapOfLiveData().size();
|
||||
if (preLive > postLive) {
|
||||
log.info("We pruned data from our live data store which are already contained in the historical data store with version {}. " +
|
||||
"The live map had {} entries before pruning and has {} entries afterwards.",
|
||||
version, preLive, postLive);
|
||||
} else {
|
||||
log.info("No pruning from historical data store with version {} was applied", version);
|
||||
}
|
||||
storage.queueUpForSave(store);
|
||||
}
|
||||
|
||||
private boolean anyMapContainsKey(P2PDataStorage.ByteArray hash) {
|
||||
return getMapOfLiveData().containsKey(hash) || allHistoricalPayloads.containsKey(hash);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.network.p2p.storage.persistence;
|
||||
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
|
||||
import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* Base class for store implementations using a map with a PersistableNetworkPayload
|
||||
* as the type of the map value.
|
||||
*/
|
||||
public abstract class PersistableNetworkPayloadStore implements ThreadedPersistableEnvelope {
|
||||
@Getter
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
|
||||
}
|
|
@ -81,33 +81,33 @@ public abstract class StoreService<T extends PersistableEnvelope> {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected void readFromResources(String postFix) {
|
||||
makeFileFromResourceFile(postFix);
|
||||
String fileName = getFileName();
|
||||
makeFileFromResourceFile(fileName, postFix);
|
||||
try {
|
||||
readStore();
|
||||
} catch (Throwable t) {
|
||||
try {
|
||||
String fileName = getFileName();
|
||||
storage.removeAndBackupFile(fileName);
|
||||
} catch (IOException e) {
|
||||
log.error(e.toString());
|
||||
}
|
||||
makeFileFromResourceFile(postFix);
|
||||
makeFileFromResourceFile(fileName, postFix);
|
||||
readStore();
|
||||
}
|
||||
}
|
||||
|
||||
protected void makeFileFromResourceFile(String postFix) {
|
||||
final String fileName = getFileName();
|
||||
protected boolean makeFileFromResourceFile(String fileName, String postFix) {
|
||||
String resourceFileName = fileName + postFix;
|
||||
File dbDir = new File(absolutePathOfStorageDir);
|
||||
if (!dbDir.exists() && !dbDir.mkdir())
|
||||
log.warn("make dir failed.\ndbDir=" + dbDir.getAbsolutePath());
|
||||
|
||||
final File destinationFile = new File(Paths.get(absolutePathOfStorageDir, fileName).toString());
|
||||
File destinationFile = new File(Paths.get(absolutePathOfStorageDir, fileName).toString());
|
||||
if (!destinationFile.exists()) {
|
||||
try {
|
||||
log.info("We copy resource to file: resourceFileName={}, destinationFile={}", resourceFileName, destinationFile);
|
||||
FileUtil.resourceToFile(resourceFileName, destinationFile);
|
||||
return true;
|
||||
} catch (ResourceNotFoundException e) {
|
||||
log.info("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
|
||||
} catch (Throwable e) {
|
||||
|
@ -116,14 +116,13 @@ public abstract class StoreService<T extends PersistableEnvelope> {
|
|||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
log.debug(fileName + " file exists already.");
|
||||
log.info("No resource file was copied. {} exists already.", fileName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
protected void readStore() {
|
||||
final String fileName = getFileName();
|
||||
store = storage.initAndGetPersistedWithFileName(fileName, 100);
|
||||
protected T getStore(String fileName) {
|
||||
T store = storage.initAndGetPersistedWithFileName(fileName, 100);
|
||||
if (store != null) {
|
||||
log.info("{}: size of {}: {} MB", this.getClass().getSimpleName(),
|
||||
storage.getClass().getSimpleName(),
|
||||
|
@ -131,6 +130,11 @@ public abstract class StoreService<T extends PersistableEnvelope> {
|
|||
} else {
|
||||
store = createStore();
|
||||
}
|
||||
return store;
|
||||
}
|
||||
|
||||
protected void readStore() {
|
||||
store = getStore(getFileName());
|
||||
}
|
||||
|
||||
protected abstract T createStore();
|
||||
|
|
|
@ -44,6 +44,9 @@ import java.util.HashSet;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -53,11 +56,6 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.withSettings;
|
||||
|
||||
|
||||
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
public class P2PDataStorageBuildGetDataResponseTest {
|
||||
abstract static class P2PDataStorageBuildGetDataResponseTestBase {
|
||||
// GIVEN null & non-null supportedCapabilities
|
||||
|
@ -240,7 +238,10 @@ public class P2PDataStorageBuildGetDataResponseTest {
|
|||
Assert.assertEquals(getDataRequest instanceof GetUpdatedDataRequest, getDataResponse.isGetUpdatedDataResponse());
|
||||
Assert.assertEquals(getDataResponse.getSupportedCapabilities(), Capabilities.app);
|
||||
Assert.assertEquals(1, getDataResponse.getPersistableNetworkPayloadSet().size());
|
||||
Assert.assertTrue(getDataResponse.getPersistableNetworkPayloadSet().contains(onlyLocal1));
|
||||
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet();
|
||||
|
||||
// We use a set at the filter so it is not deterministic which item get truncated
|
||||
Assert.assertEquals(1, persistableNetworkPayloadSet.size());
|
||||
Assert.assertTrue(getDataResponse.getDataSet().isEmpty());
|
||||
}
|
||||
|
||||
|
|
|
@ -35,13 +35,15 @@ import java.security.NoSuchAlgorithmException;
|
|||
|
||||
import java.util.Set;
|
||||
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class P2PDataStorageRequestDataTest {
|
||||
private TestState testState;
|
||||
|
|
|
@ -21,7 +21,6 @@ import bisq.network.p2p.storage.P2PDataStorage;
|
|||
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
|
||||
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -31,17 +30,16 @@ import java.util.Map;
|
|||
* @see <a href="https://martinfowler.com/articles/mocksArentStubs.html#TheDifferenceBetweenMocksAndStubs">Reference</a>
|
||||
*/
|
||||
public class AppendOnlyDataStoreServiceFake extends AppendOnlyDataStoreService {
|
||||
private final Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map;
|
||||
|
||||
public AppendOnlyDataStoreServiceFake() {
|
||||
map = new HashMap<>();
|
||||
addService(new MapStoreServiceFake());
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap() {
|
||||
return map;
|
||||
return super.getMap();
|
||||
}
|
||||
|
||||
public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) {
|
||||
map.put(hashAsByteArray, payload);
|
||||
super.put(hashAsByteArray, payload);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,9 +92,10 @@ message BundleOfEnvelopes {
|
|||
// get data
|
||||
|
||||
message PreliminaryGetDataRequest {
|
||||
int32 nonce = 21;
|
||||
int32 nonce = 21; // This was set to 21 instead of 1 in some old commit so we cannot change it.
|
||||
repeated bytes excluded_keys = 2;
|
||||
repeated int32 supported_capabilities = 3;
|
||||
string version = 4;
|
||||
}
|
||||
|
||||
message GetDataResponse {
|
||||
|
@ -109,6 +110,7 @@ message GetUpdatedDataRequest {
|
|||
NodeAddress sender_node_address = 1;
|
||||
int32 nonce = 2;
|
||||
repeated bytes excluded_keys = 3;
|
||||
string version = 4;
|
||||
}
|
||||
|
||||
// peers
|
||||
|
|
Loading…
Add table
Reference in a new issue