Add additional guards to ensure that the shut down routine is not called multiple times

(which can happen in rare cases) and add guards that we never create multiple instances
for a given file as well not call initialize or other API methods after shutdown was started.
This commit is contained in:
chimp1984 2020-11-13 14:08:01 -05:00
parent b9f49f75d3
commit a64c11ebe4
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3

View File

@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -80,15 +81,29 @@ public class PersistenceManager<T extends PersistableEnvelope> {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public static final Map<String, PersistenceManager<?>> ALL_PERSISTENCE_MANAGERS = new HashMap<>(); public static final Map<String, PersistenceManager<?>> ALL_PERSISTENCE_MANAGERS = new HashMap<>();
public static boolean FLUSH_ALL_DATA_TO_DISK_CALLED = false;
// We don't know from which thread we are called so we map back to user thread
// We require being called only once from the global shutdown routine. As the shutdown routine has a timeout
// and error condition where we call the method as well beside the standard path and it could be that those
// alternative code paths call our method after it was called already, so it is a valid but rare case.
// We add a guard to prevent repeated calls.
public static void flushAllDataToDisk(ResultHandler completeHandler) { public static void flushAllDataToDisk(ResultHandler completeHandler) {
// We don't know from which thread we are called so we map to user thread
UserThread.execute(() -> {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We got flushAllDataToDisk called again. This can happen in some rare cases. We ignore the repeated call.");
return;
}
FLUSH_ALL_DATA_TO_DISK_CALLED = true;
log.info("Start flushAllDataToDisk at shutdown"); log.info("Start flushAllDataToDisk at shutdown");
AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size()); AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size());
if (openInstances.get() == 0) { if (openInstances.get() == 0) {
log.info("flushAllDataToDisk completed"); log.info("No PersistenceManager instances have been created yet.");
UserThread.execute(completeHandler::handleResult); completeHandler.handleResult();
} }
new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> { new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> {
@ -96,27 +111,29 @@ public class PersistenceManager<T extends PersistableEnvelope> {
// a requestPersistence call after an important state update. Those are usually rather small data stores. // a requestPersistence call after an important state update. Those are usually rather small data stores.
// Otherwise we only persist if requestPersistence was called since the last persist call. // Otherwise we only persist if requestPersistence was called since the last persist call.
if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) { if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) {
// We don't know from which thread we are called so we map back to user thread when calling persistNow
UserThread.execute(() -> {
// We always get our completeHandler called even if exceptions happen. In case a file write fails // We always get our completeHandler called even if exceptions happen. In case a file write fails
// we still call our shutdown and count down routine as the completeHandler is triggered in any case. // we still call our shutdown and count down routine as the completeHandler is triggered in any case.
// We get our result handler called from the write thread so we map back to user thread.
persistenceManager.persistNow(() -> persistenceManager.persistNow(() ->
onWriteCompleted(completeHandler, openInstances, persistenceManager)); UserThread.execute(() -> onWriteCompleted(completeHandler, openInstances, persistenceManager)));
});
} else { } else {
onWriteCompleted(completeHandler, openInstances, persistenceManager); onWriteCompleted(completeHandler, openInstances, persistenceManager);
} }
}); });
});
} }
// We get called always from user thread here.
private static void onWriteCompleted(ResultHandler completeHandler, private static void onWriteCompleted(ResultHandler completeHandler,
AtomicInteger openInstances, AtomicInteger openInstances,
PersistenceManager<?> persistenceManager) { PersistenceManager<?> persistenceManager) {
persistenceManager.shutdown(); persistenceManager.shutdown();
if (openInstances.decrementAndGet() == 0) { if (openInstances.decrementAndGet() == 0) {
log.info("flushAllDataToDisk completed"); log.info("flushAllDataToDisk completed");
UserThread.execute(completeHandler::handleResult); completeHandler.handleResult();
} }
} }
@ -166,6 +183,7 @@ public class PersistenceManager<T extends PersistableEnvelope> {
@Nullable @Nullable
private Timer timer; private Timer timer;
private ExecutorService writeToDiskExecutor; private ExecutorService writeToDiskExecutor;
public final AtomicBoolean initCalled = new AtomicBoolean(false);
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -190,6 +208,27 @@ public class PersistenceManager<T extends PersistableEnvelope> {
} }
public void initialize(T persistable, String fileName, Source source) { public void initialize(T persistable, String fileName, Source source) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that initialize call.");
return;
}
if (ALL_PERSISTENCE_MANAGERS.containsKey(fileName)) {
RuntimeException runtimeException = new RuntimeException("We must not create multiple " +
"PersistenceManager instances for file " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}
if (initCalled.get()) {
RuntimeException runtimeException = new RuntimeException("We must not call initialize multiple times. " +
"PersistenceManager for file: " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}
this.persistable = persistable; this.persistable = persistable;
this.fileName = fileName; this.fileName = fileName;
this.source = source; this.source = source;
@ -233,6 +272,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
* @param orElse Called if no file exists or reading of file failed. * @param orElse Called if no file exists or reading of file failed.
*/ */
public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) { public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that readPersisted call.");
return;
}
new Thread(() -> { new Thread(() -> {
T persisted = getPersisted(fileName); T persisted = getPersisted(fileName);
if (persisted != null) { if (persisted != null) {
@ -252,6 +296,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
@Nullable @Nullable
public T getPersisted(String fileName) { public T getPersisted(String fileName) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that getPersisted call.");
return null;
}
File storageFile = new File(dir, fileName); File storageFile = new File(dir, fileName);
if (!storageFile.exists()) { if (!storageFile.exists()) {
return null; return null;
@ -288,6 +337,11 @@ public class PersistenceManager<T extends PersistableEnvelope> {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void requestPersistence() { public void requestPersistence() {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that requestPersistence call.");
return;
}
persistenceRequested = true; persistenceRequested = true;
// We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays // We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays