Add support for multiple clients calling the setup method.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-11-21 18:57:00 -05:00
parent e2f7c4e103
commit a551a41e5a
No known key found for this signature in database
GPG key ID: 02AA2BAE387C8307

View file

@ -56,6 +56,8 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -91,6 +93,9 @@ public class RpcService {
// Keep that for optimization after measuring performance differences
private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("RpcService");
private volatile boolean isShutDown;
private final Set<ResultHandler> setupResultHandlers = new CopyOnWriteArraySet<>();
private final Set<Consumer<Throwable>> setupErrorHandlers = new CopyOnWriteArraySet<>();
private volatile boolean setupComplete;
///////////////////////////////////////////////////////////////////////////////////////////
@ -139,7 +144,20 @@ public class RpcService {
executor.shutdown();
}
void setup(ResultHandler resultHandler, Consumer<Throwable> errorHandler) {
public void setup(ResultHandler resultHandler, Consumer<Throwable> errorHandler) {
if (setupComplete) {
// Setup got already called and has finished.
resultHandler.handleResult();
return;
} else {
setupResultHandlers.add(resultHandler);
setupErrorHandlers.add(errorHandler);
if (setupResultHandlers.size() > 1) {
// Setup got already called but has not finished yet.
return;
}
}
try {
ListenableFuture<Void> future = executor.submit(() -> {
try {
@ -159,7 +177,11 @@ public class RpcService {
daemon = new BitcoindDaemon(rpcBlockHost, rpcBlockPort, throwable -> {
log.error(throwable.toString());
throwable.printStackTrace();
UserThread.execute(() -> errorHandler.accept(new RpcException(throwable)));
UserThread.execute(() -> {
setupErrorHandlers.forEach(handler -> handler.accept(new RpcException(throwable)));
setupErrorHandlers.clear();
setupResultHandlers.clear();
});
});
log.info("Setup took {} ms", System.currentTimeMillis() - startTs);
@ -173,11 +195,20 @@ public class RpcService {
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Void ignore) {
UserThread.execute(resultHandler::handleResult);
setupComplete = true;
UserThread.execute(() -> {
setupResultHandlers.forEach(ResultHandler::handleResult);
setupResultHandlers.clear();
setupErrorHandlers.clear();
});
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> errorHandler.accept(throwable));
UserThread.execute(() -> {
setupErrorHandlers.forEach(handler -> handler.accept(throwable));
setupErrorHandlers.clear();
setupResultHandlers.clear();
});
}
}, MoreExecutors.directExecutor());
} catch (Exception e) {