mirror of
https://github.com/bisq-network/bisq.git
synced 2025-01-19 05:44:05 +01:00
Persist map for removed mailbox messages (AddOncePayload more generally)
We add the date when we add the hash so that we can remove expired data.
This commit is contained in:
parent
6146362d71
commit
32f887478b
@ -46,6 +46,7 @@ import bisq.core.user.UserPayload;
|
||||
import bisq.network.p2p.mailbox.IgnoredMailboxMap;
|
||||
import bisq.network.p2p.mailbox.MailboxMessageList;
|
||||
import bisq.network.p2p.peers.peerexchange.PeerList;
|
||||
import bisq.network.p2p.storage.RemovedPayloadsMap;
|
||||
import bisq.network.p2p.storage.persistence.SequenceNumberMap;
|
||||
|
||||
import bisq.common.proto.ProtobufferRuntimeException;
|
||||
@ -135,6 +136,8 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
|
||||
return MailboxMessageList.fromProto(proto.getMailboxMessageList(), networkProtoResolver);
|
||||
case IGNORED_MAILBOX_MAP:
|
||||
return IgnoredMailboxMap.fromProto(proto.getIgnoredMailboxMap());
|
||||
case REMOVED_PAYLOADS_MAP:
|
||||
return RemovedPayloadsMap.fromProto(proto.getRemovedPayloadsMap());
|
||||
default:
|
||||
throw new ProtobufferRuntimeException("Unknown proto message case(PB.PersistableEnvelope). " +
|
||||
"messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString());
|
||||
|
@ -39,6 +39,7 @@ import bisq.network.p2p.mailbox.IgnoredMailboxService;
|
||||
import bisq.network.p2p.mailbox.MailboxMessageService;
|
||||
import bisq.network.p2p.peers.PeerManager;
|
||||
import bisq.network.p2p.storage.P2PDataStorage;
|
||||
import bisq.network.p2p.storage.RemovedPayloadsStorageService;
|
||||
|
||||
import bisq.common.config.Config;
|
||||
import bisq.common.proto.persistable.PersistedDataHost;
|
||||
@ -70,6 +71,7 @@ public class CorePersistedDataHost {
|
||||
persistedDataHosts.add(injector.getInstance(PeerManager.class));
|
||||
persistedDataHosts.add(injector.getInstance(MailboxMessageService.class));
|
||||
persistedDataHosts.add(injector.getInstance(IgnoredMailboxService.class));
|
||||
persistedDataHosts.add(injector.getInstance(RemovedPayloadsStorageService.class));
|
||||
|
||||
if (injector.getInstance(Config.class).daoActivated) {
|
||||
persistedDataHosts.add(injector.getInstance(BallotListService.class));
|
||||
|
@ -133,8 +133,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
|
||||
@Getter
|
||||
private final Map<ByteArray, ProtectedStorageEntry> map = new ConcurrentHashMap<>();
|
||||
//todo
|
||||
private final Set<ByteArray> removedAddOncePayloads = new HashSet<>();
|
||||
private final Set<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
|
||||
private Timer removeExpiredEntriesTimer;
|
||||
|
||||
@ -144,6 +142,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap();
|
||||
|
||||
private final Set<AppendOnlyDataStoreListener> appendOnlyDataStoreListeners = new CopyOnWriteArraySet<>();
|
||||
private final RemovedPayloadsStorageService removedPayloadsStorageService;
|
||||
private final Clock clock;
|
||||
|
||||
/// The maximum number of items that must exist in the SequenceNumberMap before it is scheduled for a purge
|
||||
@ -165,6 +164,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
ProtectedDataStoreService protectedDataStoreService,
|
||||
ResourceDataStoreService resourceDataStoreService,
|
||||
PersistenceManager<SequenceNumberMap> persistenceManager,
|
||||
RemovedPayloadsStorageService removedPayloadsStorageService,
|
||||
Clock clock,
|
||||
@Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) {
|
||||
this.broadcaster = broadcaster;
|
||||
@ -172,6 +172,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
this.protectedDataStoreService = protectedDataStoreService;
|
||||
this.resourceDataStoreService = resourceDataStoreService;
|
||||
this.persistenceManager = persistenceManager;
|
||||
this.removedPayloadsStorageService = removedPayloadsStorageService;
|
||||
this.clock = clock;
|
||||
this.maxSequenceNumberMapSizeBeforePurge = maxSequenceNumberBeforePurge;
|
||||
|
||||
@ -770,7 +771,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
|
||||
public boolean addOncePayloadGotAlreadyRemoved(ProtectedStoragePayload protectedStoragePayload,
|
||||
ByteArray hashOfPayload) {
|
||||
return protectedStoragePayload instanceof AddOncePayload && removedAddOncePayloads.contains(hashOfPayload);
|
||||
return protectedStoragePayload instanceof AddOncePayload && removedPayloadsStorageService.wasRemoved(hashOfPayload);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -853,8 +854,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
requestPersistence();
|
||||
|
||||
// Update that we have seen this AddOncePayload so the next time it is seen it fails verification
|
||||
if (protectedStoragePayload instanceof AddOncePayload)
|
||||
removedAddOncePayloads.add(hashOfPayload);
|
||||
if (protectedStoragePayload instanceof AddOncePayload) {
|
||||
removedPayloadsStorageService.addHash(hashOfPayload);
|
||||
}
|
||||
|
||||
if (storedEntry != null) {
|
||||
// Valid remove entry, do the remove and signal listeners
|
||||
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.network.p2p.storage;
|
||||
|
||||
|
||||
import bisq.common.proto.persistable.PersistableEnvelope;
|
||||
import bisq.common.util.Utilities;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class RemovedPayloadsMap implements PersistableEnvelope {
|
||||
@Getter
|
||||
private final Map<P2PDataStorage.ByteArray, Long> dateByHashes;
|
||||
|
||||
public RemovedPayloadsMap() {
|
||||
this.dateByHashes = new HashMap<>();
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// PROTO BUFFER
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private RemovedPayloadsMap(Map<P2PDataStorage.ByteArray, Long> dateByHashes) {
|
||||
this.dateByHashes = dateByHashes;
|
||||
}
|
||||
|
||||
// Protobuf map only supports strings or integers as key, but no bytes or complex object so we convert the
|
||||
// bytes to a hex string, otherwise we would need to make a extra value object to wrap it.
|
||||
@Override
|
||||
public protobuf.PersistableEnvelope toProtoMessage() {
|
||||
protobuf.RemovedPayloadsMap.Builder builder = protobuf.RemovedPayloadsMap.newBuilder()
|
||||
.putAllDateByHashes(dateByHashes.entrySet().stream()
|
||||
.collect(Collectors.toMap(e -> Utilities.encodeToHex(e.getKey().bytes),
|
||||
Map.Entry::getValue)));
|
||||
return protobuf.PersistableEnvelope.newBuilder()
|
||||
.setRemovedPayloadsMap(builder)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static RemovedPayloadsMap fromProto(protobuf.RemovedPayloadsMap proto) {
|
||||
Map<P2PDataStorage.ByteArray, Long> dateByHashes = proto.getDateByHashesMap().entrySet().stream()
|
||||
.collect(Collectors.toMap(e -> new P2PDataStorage.ByteArray(Utilities.decodeFromHex(e.getKey())),
|
||||
Map.Entry::getValue));
|
||||
return new RemovedPayloadsMap(dateByHashes);
|
||||
}
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.network.p2p.storage;
|
||||
|
||||
import bisq.network.p2p.storage.payload.MailboxStoragePayload;
|
||||
|
||||
import bisq.common.persistence.PersistenceManager;
|
||||
import bisq.common.proto.persistable.PersistedDataHost;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class RemovedPayloadsStorageService implements PersistedDataHost {
|
||||
private final PersistenceManager<RemovedPayloadsMap> persistenceManager;
|
||||
private final RemovedPayloadsMap removedPayloadsMap = new RemovedPayloadsMap();
|
||||
|
||||
@Inject
|
||||
public RemovedPayloadsStorageService(PersistenceManager<RemovedPayloadsMap> persistenceManager) {
|
||||
this.persistenceManager = persistenceManager;
|
||||
|
||||
this.persistenceManager.initialize(removedPayloadsMap, PersistenceManager.Source.PRIVATE_LOW_PRIO);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// PersistedDataHost
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void readPersisted(Runnable completeHandler) {
|
||||
long cutOffDate = System.currentTimeMillis() - MailboxStoragePayload.TTL;
|
||||
persistenceManager.readPersisted(persisted -> {
|
||||
persisted.getDateByHashes().entrySet().stream()
|
||||
.filter(e -> e.getValue() < cutOffDate)
|
||||
.forEach(e -> removedPayloadsMap.getDateByHashes().put(e.getKey(), e.getValue()));
|
||||
persistenceManager.requestPersistence();
|
||||
completeHandler.run();
|
||||
},
|
||||
completeHandler);
|
||||
}
|
||||
|
||||
public boolean wasRemoved(P2PDataStorage.ByteArray hashOfPayload) {
|
||||
return removedPayloadsMap.getDateByHashes().containsKey(hashOfPayload);
|
||||
}
|
||||
|
||||
public void addHash(P2PDataStorage.ByteArray hashOfPayload) {
|
||||
removedPayloadsMap.getDateByHashes().put(hashOfPayload, System.currentTimeMillis());
|
||||
persistenceManager.requestPersistence();
|
||||
}
|
||||
}
|
@ -577,6 +577,10 @@ message MailboxMessageList {
|
||||
repeated MailboxItem mailbox_item = 1;
|
||||
}
|
||||
|
||||
message RemovedPayloadsMap {
|
||||
map<string, uint64> date_by_hashes = 1;
|
||||
}
|
||||
|
||||
message IgnoredMailboxMap {
|
||||
map<string, uint64> data = 1;
|
||||
}
|
||||
@ -1220,6 +1224,7 @@ message PersistableEnvelope {
|
||||
TradeStatistics3Store trade_statistics3_store = 31;
|
||||
MailboxMessageList mailbox_message_list = 32;
|
||||
IgnoredMailboxMap ignored_mailbox_map = 33;
|
||||
RemovedPayloadsMap removed_payloads_map = 34;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user