Convert TransactionBroadcast, Wallet.doMaintenance() to CompletableFuture

* Use ListenableCompletableFuture on all public APIs
* Use CompletableFuture internally, in example, and in tests
* Add new FutureUtils class
* Add 2 new static methods to ListenableCompletableFuture
This commit is contained in:
Sean Gilligan 2022-02-23 14:01:26 -08:00 committed by Andreas Schildbach
parent 7caa1aee9a
commit 9db8b49c50
8 changed files with 136 additions and 66 deletions

View File

@ -2036,7 +2036,7 @@ public class PeerGroup implements TransactionBroadcaster {
* peers. Once all connected peers have announced the transaction, the future available via the
* {@link TransactionBroadcast#future()} method will be completed. If anything goes
* wrong the exception will be thrown when get() is called, or you can receive it via a callback on the
* {@link ListenableFuture}. This method returns immediately, so if you want it to block just call get() on the
* {@link ListenableCompletableFuture}. This method returns immediately, so if you want it to block just call get() on the
* result.</p>
*
* <p>Optionally, peers will be dropped after they have been used for broadcasting the transaction and they have
@ -2064,9 +2064,8 @@ public class PeerGroup implements TransactionBroadcaster {
broadcast.setMinConnections(minConnections);
broadcast.setDropPeersAfterBroadcast(dropPeersAfterBroadcast && tx.getConfidence().numBroadcastPeers() == 0);
// Send the TX to the wallet once we have a successful broadcast.
Futures.addCallback(broadcast.future(), new FutureCallback<Transaction>() {
@Override
public void onSuccess(Transaction transaction) {
broadcast.future().whenComplete((transaction, throwable) -> {
if (transaction != null) {
runningBroadcasts.remove(broadcast);
// OK, now tell the wallet about the transaction. If the wallet created the transaction then
// it already knows and will ignore this. If it's a transaction we received from
@ -2083,14 +2082,11 @@ public class PeerGroup implements TransactionBroadcaster {
throw new RuntimeException(e); // Cannot fail to verify a tx we created ourselves.
}
}
}
@Override
public void onFailure(Throwable throwable) {
} else {
// This can happen if we get a reject message from a peer.
runningBroadcasts.remove(broadcast);
}
}, MoreExecutors.directExecutor());
});
// Keep a reference to the TransactionBroadcast object. This is important because otherwise, the entire tree
// of objects we just created would become garbage if the user doesn't hold on to the returned future, and
// eventually be collected. This in turn could result in the transaction not being committed to the wallet

View File

@ -19,7 +19,6 @@ package org.bitcoinj.core;
import com.google.common.annotations.*;
import com.google.common.base.*;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.bitcoinj.utils.*;
import org.bitcoinj.wallet.Wallet;
@ -42,7 +41,7 @@ import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;
public class TransactionBroadcast {
private static final Logger log = LoggerFactory.getLogger(TransactionBroadcast.class);
private final SettableFuture<Transaction> future = SettableFuture.create();
private final CompletableFuture<Transaction> future = new ListenableCompletableFuture<>();
private final PeerGroup peerGroup;
private final Transaction tx;
private int minConnections;
@ -69,22 +68,22 @@ public class TransactionBroadcast {
}
@VisibleForTesting
public static TransactionBroadcast createMockBroadcast(Transaction tx, final SettableFuture<Transaction> future) {
public static TransactionBroadcast createMockBroadcast(Transaction tx, final CompletableFuture<Transaction> future) {
return new TransactionBroadcast(tx) {
@Override
public ListenableFuture<Transaction> broadcast() {
return future;
public ListenableCompletableFuture<Transaction> broadcast() {
return ListenableCompletableFuture.of(future);
}
@Override
public ListenableFuture<Transaction> future() {
return future;
public ListenableCompletableFuture<Transaction> future() {
return ListenableCompletableFuture.of(future);
}
};
}
public ListenableFuture<Transaction> future() {
return future;
public ListenableCompletableFuture<Transaction> future() {
return ListenableCompletableFuture.of(future);
}
public void setMinConnections(int minConnections) {
@ -106,7 +105,7 @@ public class TransactionBroadcast {
long threshold = Math.round(numWaitingFor / 2.0);
if (size > threshold) {
log.warn("Threshold for considering broadcast rejected has been reached ({}/{})", size, threshold);
future.setException(new RejectedTransactionException(tx, rejectMessage));
future.completeExceptionally(new RejectedTransactionException(tx, rejectMessage));
peerGroup.removePreMessageReceivedEventListener(this);
}
}
@ -115,11 +114,11 @@ public class TransactionBroadcast {
}
};
public ListenableFuture<Transaction> broadcast() {
public ListenableCompletableFuture<Transaction> broadcast() {
peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener);
log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size());
peerGroup.waitForPeers(minConnections).addListener(new EnoughAvailablePeers(), Threading.SAME_THREAD);
return future;
return ListenableCompletableFuture.of(future);
}
private class EnoughAvailablePeers implements Runnable {
@ -211,7 +210,7 @@ public class TransactionBroadcast {
log.info("broadcastTransaction: {} complete", tx.getTxId());
peerGroup.removePreMessageReceivedEventListener(rejectionListener);
conf.removeEventListener(this);
future.set(tx); // RE-ENTRANCY POINT
future.complete(tx); // RE-ENTRANCY POINT
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright by the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
* Utilities for {@link CompletableFuture}.
* <p>
* Note: When the <b>bitcoinj</b> migration to {@code CompletableFuture} is finished this class will
* either be removed or its remaining methods changed to use generic {@code CompletableFuture}s.
*/
public class FutureUtils {
/**
* Thank you Apache-licensed Spotify https://github.com/spotify/completable-futures
* @param stages A list of {@code CompletionStage}s all returning the same type
* @param <T> the result type
* @return A generic CompletableFuture that returns a list of result type
*/
private static <T> CompletableFuture<List<T>> allAsCFList(
List<? extends CompletionStage<? extends T>> stages) {
@SuppressWarnings("unchecked") // generic array creation
final CompletableFuture<? extends T>[] all = new CompletableFuture[stages.size()];
for (int i = 0; i < stages.size(); i++) {
all[i] = stages.get(i).toCompletableFuture();
}
CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
for (CompletableFuture<? extends T> completableFuture : all) {
completableFuture.exceptionally(throwable -> {
if (!allOf.isDone()) {
allOf.completeExceptionally(throwable);
}
return null; // intentionally unused
});
}
return allOf
.thenApply(ignored -> {
final List<T> result = new ArrayList<>(all.length);
for (CompletableFuture<? extends T> completableFuture : all) {
result.add(completableFuture.join());
}
return result;
});
}
/**
* Note: When the migration to {@code CompletableFuture} is complete this routine will
* either be removed or changed to return a generic {@code CompletableFuture}.
* @param stages A list of {@code CompletionStage}s all returning the same type
* @param <T> the result type
* @return A ListenableCompletableFuture that returns a list of result type
*/
public static <T> ListenableCompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {
return ListenableCompletableFuture.of(FutureUtils.allAsCFList(stages));
}
}

View File

@ -23,6 +23,23 @@ import java.util.concurrent.CompletableFuture;
*/
public class ListenableCompletableFuture<V> extends CompletableFuture<V> implements ListenableCompletionStage<V> {
/**
* Returns a new {@link CompletableFuture} that is already completed with
* the given value.
* <p>
* When the migration to {@link CompletableFuture} is finished use of this method
* can be replaced with {@link CompletableFuture#completedFuture(Object)}.
*
* @param value the value
* @param <T> the type of the value
* @return the completed CompletableFuture
*/
public static <T> ListenableCompletableFuture<T> completedFuture(T value) {
ListenableCompletableFuture<T> future = new ListenableCompletableFuture<>();
future.complete(value);
return future;
}
/**
* Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed
* in future is already a {@code ListenableCompletableFuture} no conversion is performed.

View File

@ -20,10 +20,7 @@ package org.bitcoinj.wallet;
import com.google.common.annotations.*;
import com.google.common.collect.*;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.*;
import net.jcip.annotations.*;
import org.bitcoinj.core.listeners.*;
@ -3882,7 +3879,7 @@ public class Wallet extends BaseTaggableObject
/** The Bitcoin transaction message that moves the money. */
public final Transaction tx;
/** A future that will complete once the tx message has been successfully broadcast to the network. This is just the result of calling broadcast.future() */
public final ListenableFuture<Transaction> broadcastComplete;
public final ListenableCompletableFuture<Transaction> broadcastComplete;
/** The broadcast object returned by the linked TransactionBroadcaster */
public final TransactionBroadcast broadcast;
@ -5319,7 +5316,7 @@ public class Wallet extends BaseTaggableObject
* @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty.
* @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password.
*/
public ListenableFuture<List<Transaction>> doMaintenance(@Nullable KeyParameter aesKey, boolean signAndSend)
public ListenableCompletableFuture<List<Transaction>> doMaintenance(@Nullable KeyParameter aesKey, boolean signAndSend)
throws DeterministicUpgradeRequiresPassword {
return doMaintenance(KeyChainGroupStructure.DEFAULT, aesKey, signAndSend);
}
@ -5339,7 +5336,7 @@ public class Wallet extends BaseTaggableObject
* @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty.
* @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password.
*/
public ListenableFuture<List<Transaction>> doMaintenance(KeyChainGroupStructure structure,
public ListenableCompletableFuture<List<Transaction>> doMaintenance(KeyChainGroupStructure structure,
@Nullable KeyParameter aesKey, boolean signAndSend) throws DeterministicUpgradeRequiresPassword {
List<Transaction> txns;
lock.lock();
@ -5347,34 +5344,30 @@ public class Wallet extends BaseTaggableObject
try {
txns = maybeRotateKeys(structure, aesKey, signAndSend);
if (!signAndSend)
return Futures.immediateFuture(txns);
return ListenableCompletableFuture.completedFuture(txns);
} finally {
keyChainGroupLock.unlock();
lock.unlock();
}
checkState(!lock.isHeldByCurrentThread());
ArrayList<ListenableFuture<Transaction>> futures = new ArrayList<>(txns.size());
ArrayList<CompletableFuture<Transaction>> futures = new ArrayList<>(txns.size());
TransactionBroadcaster broadcaster = vTransactionBroadcaster;
for (Transaction tx : txns) {
try {
final ListenableFuture<Transaction> future = broadcaster.broadcastTransaction(tx).future();
final CompletableFuture<Transaction> future = broadcaster.broadcastTransaction(tx).future();
futures.add(future);
Futures.addCallback(future, new FutureCallback<Transaction>() {
@Override
public void onSuccess(Transaction transaction) {
future.whenComplete((transaction, throwable) -> {
if (transaction != null) {
log.info("Successfully broadcast key rotation tx: {}", transaction);
}
@Override
public void onFailure(Throwable throwable) {
} else {
log.error("Failed to broadcast key rotation tx", throwable);
}
}, MoreExecutors.directExecutor());
});
} catch (Exception e) {
log.error("Failed to broadcast rekey tx", e);
}
}
return Futures.allAsList(futures);
return FutureUtils.allAsList(futures);
}
// Checks to see if any coins are controlled by rotating keys and if so, spends them.

View File

@ -18,7 +18,6 @@
package org.bitcoinj.core;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.ListenableFuture;
import org.bitcoinj.core.listeners.TransactionConfidenceEventListener;
import org.bitcoinj.testing.*;
import org.bitcoinj.utils.*;
@ -72,7 +71,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx);
final AtomicDouble lastProgress = new AtomicDouble();
broadcast.setProgressCallback(progress -> lastProgress.set(progress));
ListenableFuture<Transaction> future = broadcast.broadcast();
CompletableFuture<Transaction> future = broadcast.broadcast();
assertFalse(future.isDone());
assertEquals(0.0, lastProgress.get(), 0.0);
// We expect two peers to receive a tx message, and at least one of the others must announce for the future to
@ -120,7 +119,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
InboundMessageQueuer[] channels = { connectPeer(0), connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) };
Transaction tx = FakeTxBuilder.createFakeTx(UNITTEST);
TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx);
ListenableFuture<Transaction> future = broadcast.broadcast();
CompletableFuture<Transaction> future = broadcast.broadcast();
// 0 and 3 are randomly selected to receive the broadcast.
assertEquals(tx, outbound(channels[1]));
assertEquals(tx, outbound(channels[2]));

View File

@ -20,11 +20,7 @@ import org.bitcoinj.core.*;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
@ -40,16 +36,16 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster {
public static class TxFuturePair {
public final Transaction tx;
public final SettableFuture<Transaction> future;
public final CompletableFuture<Transaction> future;
public TxFuturePair(Transaction tx, SettableFuture<Transaction> future) {
public TxFuturePair(Transaction tx, CompletableFuture<Transaction> future) {
this.tx = tx;
this.future = future;
}
/** Tells the broadcasting code that the broadcast was a success, just does future.set(tx) */
public void succeed() {
future.set(tx);
future.complete(tx);
}
}
@ -74,22 +70,17 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster {
// Use a lock just to catch lock ordering inversions e.g. wallet->broadcaster.
lock.lock();
try {
SettableFuture<Transaction> result = SettableFuture.create();
CompletableFuture<Transaction> result = new CompletableFuture<>();
broadcasts.put(new TxFuturePair(tx, result));
Futures.addCallback(result, new FutureCallback<Transaction>() {
@Override
public void onSuccess(Transaction result) {
result.whenComplete((transaction, t) -> {
if (transaction != null) {
try {
wallet.receivePending(result, null);
wallet.receivePending(transaction, null);
} catch (VerificationException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(Throwable t) {
}
}, MoreExecutors.directExecutor());
});
return TransactionBroadcast.createMockBroadcast(tx, result);
} catch (InterruptedException e) {
throw new RuntimeException(e);

View File

@ -136,12 +136,10 @@ public class ForwardingService {
checkNotNull(sendResult); // We should never try to send more coins than we have!
System.out.println("Sending ...");
// Register a callback that is invoked when the transaction has propagated across the network.
// This shows a second style of registering ListenableFuture callbacks, it works when you don't
// need access to the object the future returns.
sendResult.broadcastComplete.addListener(() -> {
sendResult.broadcastComplete.thenAccept(transaction -> {
// The wallet has changed now, it'll get auto saved shortly or when the app shuts down.
System.out.println("Sent coins onwards! Transaction hash is " + sendResult.tx.getTxId());
}, MoreExecutors.directExecutor());
System.out.println("Sent coins onwards! Transaction hash is " + transaction.getTxId());
});
} catch (KeyCrypterException | InsufficientMoneyException e) {
// We don't use encrypted wallets in this example - can never happen.
throw new RuntimeException(e);