PeerGroup: migrate start/stop to CompletableFuture

This will need to be rebased, paying special attention to merging
ListenableCompletableFuture.of()
This commit is contained in:
Sean Gilligan 2022-02-23 14:35:57 -08:00 committed by Andreas Schildbach
parent 974086636c
commit d3e7f854b9
3 changed files with 41 additions and 20 deletions

View File

@ -1024,7 +1024,7 @@ public class PeerGroup implements TransactionBroadcaster {
* Starts the PeerGroup and begins network activity.
* @return A future that completes when first connection activity has been triggered (note: not first connection made).
*/
public ListenableFuture startAsync() {
public ListenableCompletableFuture<Void> startAsync() {
// This is run in a background thread by the Service implementation.
if (chain == null) {
// Just try to help catch what might be a programming error.
@ -1035,7 +1035,7 @@ public class PeerGroup implements TransactionBroadcaster {
vUsedUp = true;
executorStartupLatch.countDown();
// We do blocking waits during startup, so run on the executor thread.
return executor.submit(() -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
log.info("Starting ...");
channels.startAsync();
@ -1045,18 +1045,19 @@ public class PeerGroup implements TransactionBroadcaster {
} catch (Throwable e) {
log.error("Exception when starting up", e); // The executor swallows exceptions :(
}
});
}, executor);
return ListenableCompletableFuture.of(future);
}
/** Does a blocking startup. */
public void start() {
Futures.getUnchecked(startAsync());
startAsync().join();
}
public ListenableFuture stopAsync() {
public ListenableCompletableFuture<Void> stopAsync() {
checkState(vRunning);
vRunning = false;
ListenableFuture future = executor.submit(() -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
log.info("Stopping ...");
Stopwatch watch = Stopwatch.createStarted();
@ -1073,9 +1074,9 @@ public class PeerGroup implements TransactionBroadcaster {
} catch (Throwable e) {
log.error("Exception when shutting down", e); // The executor swallows exceptions :(
}
});
}, executor);
executor.shutdown();
return future;
return ListenableCompletableFuture.of(future);
}
/** Does a blocking stop */

View File

@ -20,9 +20,6 @@ package org.bitcoinj.kits;
import com.google.common.collect.*;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import org.bitcoinj.core.listeners.*;
import org.bitcoinj.core.*;
import org.bitcoinj.crypto.DeterministicKey;
@ -359,19 +356,14 @@ public class WalletAppKit extends AbstractIdleService {
vPeerGroup.startBlockChainDownload(listener);
listener.await();
} else {
Futures.addCallback(vPeerGroup.startAsync(), new FutureCallback() {
@Override
public void onSuccess(@Nullable Object result) {
vPeerGroup.startAsync().whenComplete((result, t) -> {
if (t == null) {
final DownloadProgressTracker l = downloadListener == null ? new DownloadProgressTracker() : downloadListener;
vPeerGroup.startBlockChainDownload(l);
}
@Override
public void onFailure(Throwable t) {
} else {
throw new RuntimeException(t);
}
}, MoreExecutors.directExecutor());
});
}
} catch (BlockStoreException e) {
throw new IOException(e);

View File

@ -22,4 +22,32 @@ import java.util.concurrent.CompletableFuture;
* from Guava {@code ListenableFuture} to {@link CompletableFuture}.
*/
public class ListenableCompletableFuture<V> extends CompletableFuture<V> implements ListenableCompletionStage<V> {
/**
* Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed
* in future is already a {@code ListenableCompletableFuture} no conversion is performed.
* <p>
* When the migration to {@link CompletableFuture} is finished usages of this method
* can simply be removed as the conversion will no longer be required.
* @param future A CompletableFuture that may need to be converted
* @param <T> the type of the futures return value
* @return A ListenableCompletableFuture
*/
public static <T> ListenableCompletableFuture<T> of(CompletableFuture<T> future) {
ListenableCompletableFuture<T> listenable;
if (future instanceof ListenableCompletableFuture) {
listenable = (ListenableCompletableFuture<T>) future;
} else {
listenable = new ListenableCompletableFuture<>();
future.whenComplete((val, ex) -> {
// We can't test for a null val, because of the CompletableFuture<Void> special case.
if (ex == null) {
listenable.complete(val);
} else {
listenable.completeExceptionally(ex);
}
});
}
return listenable;
}
}