Use PersistenceManager

This commit is contained in:
chimp1984 2020-10-01 19:17:50 -05:00
parent 406afa9dad
commit fae1cda701
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -59,11 +59,11 @@ import bisq.common.app.Capabilities;
import bisq.common.crypto.CryptoException;
import bisq.common.crypto.Hash;
import bisq.common.crypto.Sig;
import bisq.common.persistence.PersistenceManager;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.network.NetworkPayload;
import bisq.common.proto.persistable.PersistablePayload;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.storage.Storage;
import bisq.common.util.Hex;
import bisq.common.util.Tuple2;
import bisq.common.util.Utilities;
@ -131,7 +131,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private final Set<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private Timer removeExpiredEntriesTimer;
private final Storage<SequenceNumberMap> sequenceNumberMapStorage;
private final PersistenceManager<SequenceNumberMap> persistenceManager;
@VisibleForTesting
final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap();
@ -153,29 +153,28 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
AppendOnlyDataStoreService appendOnlyDataStoreService,
ProtectedDataStoreService protectedDataStoreService,
ResourceDataStoreService resourceDataStoreService,
Storage<SequenceNumberMap> sequenceNumberMapStorage,
PersistenceManager<SequenceNumberMap> persistenceManager,
Clock clock,
@Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) {
this.broadcaster = broadcaster;
this.appendOnlyDataStoreService = appendOnlyDataStoreService;
this.protectedDataStoreService = protectedDataStoreService;
this.resourceDataStoreService = resourceDataStoreService;
this.persistenceManager = persistenceManager;
this.clock = clock;
this.maxSequenceNumberMapSizeBeforePurge = maxSequenceNumberBeforePurge;
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
this.sequenceNumberMapStorage = sequenceNumberMapStorage;
sequenceNumberMapStorage.setNumMaxBackupFiles(5);
persistenceManager.initialize(sequenceNumberMap, PersistenceManager.Priority.LOW);
}
@Override
public void readPersisted() {
SequenceNumberMap persistedSequenceNumberMap = sequenceNumberMapStorage.initAndGetPersisted(sequenceNumberMap, 300);
if (persistedSequenceNumberMap != null)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persistedSequenceNumberMap.getMap()));
SequenceNumberMap persisted = persistenceManager.getPersisted();
if (persisted != null)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
}
// This method is called at startup in a non-user thread.
@ -445,8 +444,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
});
removeFromMapAndDataStore(toRemoveList);
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) {
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
requestPersistence();
}
}
public void onBootstrapComplete() {
@ -675,7 +676,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 2000);
requestPersistence();
// Optionally, broadcast the add/update depending on the calling environment
if (allowBroadcast)
@ -729,7 +730,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000);
requestPersistence();
// Always broadcast refreshes
broadcaster.broadcast(refreshTTLMessage, sender);
@ -765,7 +766,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
requestPersistence();
// Update that we have seen this AddOncePayload so the next time it is seen it fails verification
if (protectedStoragePayload instanceof AddOncePayload)
@ -879,7 +880,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
if (protectedStoragePayload instanceof PersistablePayload) {
ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry);
if (previous == null)
log.error("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist.");
log.warn("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist.");
}
});
@ -915,6 +916,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
}
private void requestPersistence() {
persistenceManager.requestPersistence();
}
public static ByteArray get32ByteHashAsByteArray(NetworkPayload data) {
return new ByteArray(P2PDataStorage.get32ByteHash(data));
}