Merge pull request #4804 from chimp1984/improve-persistence-manager

Improve persistence manager
This commit is contained in:
sqrrm 2020-11-14 22:30:09 +01:00 committed by GitHub
commit c1287ac43a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -80,15 +81,29 @@ public class PersistenceManager<T extends PersistableEnvelope> {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public static final Map<String, PersistenceManager<?>> ALL_PERSISTENCE_MANAGERS = new HashMap<>(); public static final Map<String, PersistenceManager<?>> ALL_PERSISTENCE_MANAGERS = new HashMap<>();
public static boolean FLUSH_ALL_DATA_TO_DISK_CALLED = false;
// We don't know from which thread we are called so we map back to user thread
// We require being called only once from the global shutdown routine. As the shutdown routine has a timeout
// and error condition where we call the method as well beside the standard path and it could be that those
// alternative code paths call our method after it was called already, so it is a valid but rare case.
// We add a guard to prevent repeated calls.
public static void flushAllDataToDisk(ResultHandler completeHandler) { public static void flushAllDataToDisk(ResultHandler completeHandler) {
// We don't know from which thread we are called so we map to user thread
UserThread.execute(() -> {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We got flushAllDataToDisk called again. This can happen in some rare cases. We ignore the repeated call.");
return;
}
FLUSH_ALL_DATA_TO_DISK_CALLED = true;
log.info("Start flushAllDataToDisk at shutdown"); log.info("Start flushAllDataToDisk at shutdown");
AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size()); AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size());
if (openInstances.get() == 0) { if (openInstances.get() == 0) {
log.info("flushAllDataToDisk completed"); log.info("No PersistenceManager instances have been created yet.");
UserThread.execute(completeHandler::handleResult); completeHandler.handleResult();
} }
new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> { new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> {
@ -96,27 +111,29 @@ public class PersistenceManager<T extends PersistableEnvelope> {
// a requestPersistence call after an important state update. Those are usually rather small data stores. // a requestPersistence call after an important state update. Those are usually rather small data stores.
// Otherwise we only persist if requestPersistence was called since the last persist call. // Otherwise we only persist if requestPersistence was called since the last persist call.
if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) { if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) {
// We don't know from which thread we are called so we map back to user thread when calling persistNow
UserThread.execute(() -> {
// We always get our completeHandler called even if exceptions happen. In case a file write fails // We always get our completeHandler called even if exceptions happen. In case a file write fails
// we still call our shutdown and count down routine as the completeHandler is triggered in any case. // we still call our shutdown and count down routine as the completeHandler is triggered in any case.
// We get our result handler called from the write thread so we map back to user thread.
persistenceManager.persistNow(() -> persistenceManager.persistNow(() ->
onWriteCompleted(completeHandler, openInstances, persistenceManager)); UserThread.execute(() -> onWriteCompleted(completeHandler, openInstances, persistenceManager)));
});
} else { } else {
onWriteCompleted(completeHandler, openInstances, persistenceManager); onWriteCompleted(completeHandler, openInstances, persistenceManager);
} }
}); });
});
} }
// We get called always from user thread here.
private static void onWriteCompleted(ResultHandler completeHandler, private static void onWriteCompleted(ResultHandler completeHandler,
AtomicInteger openInstances, AtomicInteger openInstances,
PersistenceManager<?> persistenceManager) { PersistenceManager<?> persistenceManager) {
persistenceManager.shutdown(); persistenceManager.shutdown();
if (openInstances.decrementAndGet() == 0) { if (openInstances.decrementAndGet() == 0) {
log.info("flushAllDataToDisk completed"); log.info("flushAllDataToDisk completed");
UserThread.execute(completeHandler::handleResult); completeHandler.handleResult();
} }
} }
@ -166,6 +183,7 @@ public class PersistenceManager<T extends PersistableEnvelope> {
@Nullable @Nullable
private Timer timer; private Timer timer;
private ExecutorService writeToDiskExecutor; private ExecutorService writeToDiskExecutor;
public final AtomicBoolean initCalled = new AtomicBoolean(false);
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -190,6 +208,29 @@ public class PersistenceManager<T extends PersistableEnvelope> {
} }
public void initialize(T persistable, String fileName, Source source) { public void initialize(T persistable, String fileName, Source source) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that initialize call.");
return;
}
if (ALL_PERSISTENCE_MANAGERS.containsKey(fileName)) {
RuntimeException runtimeException = new RuntimeException("We must not create multiple " +
"PersistenceManager instances for file " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}
if (initCalled.get()) {
RuntimeException runtimeException = new RuntimeException("We must not call initialize multiple times. " +
"PersistenceManager for file: " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}
initCalled.set(true);
this.persistable = persistable; this.persistable = persistable;
this.fileName = fileName; this.fileName = fileName;
this.source = source; this.source = source;
@ -233,6 +274,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
* @param orElse Called if no file exists or reading of file failed. * @param orElse Called if no file exists or reading of file failed.
*/ */
public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) { public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that readPersisted call.");
return;
}
new Thread(() -> { new Thread(() -> {
T persisted = getPersisted(fileName); T persisted = getPersisted(fileName);
if (persisted != null) { if (persisted != null) {
@ -252,6 +298,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
@Nullable @Nullable
public T getPersisted(String fileName) { public T getPersisted(String fileName) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that getPersisted call.");
return null;
}
File storageFile = new File(dir, fileName); File storageFile = new File(dir, fileName);
if (!storageFile.exists()) { if (!storageFile.exists()) {
return null; return null;
@ -288,6 +339,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void requestPersistence() { public void requestPersistence() {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that requestPersistence call.");
return;
}
persistenceRequested = true; persistenceRequested = true;
// We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays // We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays