diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index ac5749235..23dbf7f58 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -2036,7 +2036,7 @@ public class PeerGroup implements TransactionBroadcaster { * peers. Once all connected peers have announced the transaction, the future available via the * {@link TransactionBroadcast#future()} method will be completed. If anything goes * wrong the exception will be thrown when get() is called, or you can receive it via a callback on the - * {@link ListenableFuture}. This method returns immediately, so if you want it to block just call get() on the + * {@link ListenableCompletableFuture}. This method returns immediately, so if you want it to block just call get() on the * result.

* *

Optionally, peers will be dropped after they have been used for broadcasting the transaction and they have @@ -2064,9 +2064,8 @@ public class PeerGroup implements TransactionBroadcaster { broadcast.setMinConnections(minConnections); broadcast.setDropPeersAfterBroadcast(dropPeersAfterBroadcast && tx.getConfidence().numBroadcastPeers() == 0); // Send the TX to the wallet once we have a successful broadcast. - Futures.addCallback(broadcast.future(), new FutureCallback() { - @Override - public void onSuccess(Transaction transaction) { + broadcast.future().whenComplete((transaction, throwable) -> { + if (transaction != null) { runningBroadcasts.remove(broadcast); // OK, now tell the wallet about the transaction. If the wallet created the transaction then // it already knows and will ignore this. If it's a transaction we received from @@ -2083,14 +2082,11 @@ public class PeerGroup implements TransactionBroadcaster { throw new RuntimeException(e); // Cannot fail to verify a tx we created ourselves. } } - } - - @Override - public void onFailure(Throwable throwable) { + } else { // This can happen if we get a reject message from a peer. runningBroadcasts.remove(broadcast); } - }, MoreExecutors.directExecutor()); + }); // Keep a reference to the TransactionBroadcast object. This is important because otherwise, the entire tree // of objects we just created would become garbage if the user doesn't hold on to the returned future, and // eventually be collected. This in turn could result in the transaction not being committed to the wallet diff --git a/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java b/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java index efb6b872c..93df8a866 100644 --- a/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java +++ b/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java @@ -19,7 +19,6 @@ package org.bitcoinj.core; import com.google.common.annotations.*; import com.google.common.base.*; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import org.bitcoinj.utils.*; import org.bitcoinj.wallet.Wallet; @@ -42,7 +41,7 @@ import org.bitcoinj.core.listeners.PreMessageReceivedEventListener; public class TransactionBroadcast { private static final Logger log = LoggerFactory.getLogger(TransactionBroadcast.class); - private final SettableFuture future = SettableFuture.create(); + private final CompletableFuture future = new ListenableCompletableFuture<>(); private final PeerGroup peerGroup; private final Transaction tx; private int minConnections; @@ -69,22 +68,22 @@ public class TransactionBroadcast { } @VisibleForTesting - public static TransactionBroadcast createMockBroadcast(Transaction tx, final SettableFuture future) { + public static TransactionBroadcast createMockBroadcast(Transaction tx, final CompletableFuture future) { return new TransactionBroadcast(tx) { @Override - public ListenableFuture broadcast() { - return future; + public ListenableCompletableFuture broadcast() { + return ListenableCompletableFuture.of(future); } @Override - public ListenableFuture future() { - return future; + public ListenableCompletableFuture future() { + return ListenableCompletableFuture.of(future); } }; } - public ListenableFuture future() { - return future; + public ListenableCompletableFuture future() { + return ListenableCompletableFuture.of(future); } public void setMinConnections(int minConnections) { @@ -106,7 +105,7 @@ public class TransactionBroadcast { long threshold = Math.round(numWaitingFor / 2.0); if (size > threshold) { log.warn("Threshold for considering broadcast rejected has been reached ({}/{})", size, threshold); - future.setException(new RejectedTransactionException(tx, rejectMessage)); + future.completeExceptionally(new RejectedTransactionException(tx, rejectMessage)); peerGroup.removePreMessageReceivedEventListener(this); } } @@ -115,11 +114,11 @@ public class TransactionBroadcast { } }; - public ListenableFuture broadcast() { + public ListenableCompletableFuture broadcast() { peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener); log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size()); peerGroup.waitForPeers(minConnections).addListener(new EnoughAvailablePeers(), Threading.SAME_THREAD); - return future; + return ListenableCompletableFuture.of(future); } private class EnoughAvailablePeers implements Runnable { @@ -211,7 +210,7 @@ public class TransactionBroadcast { log.info("broadcastTransaction: {} complete", tx.getTxId()); peerGroup.removePreMessageReceivedEventListener(rejectionListener); conf.removeEventListener(this); - future.set(tx); // RE-ENTRANCY POINT + future.complete(tx); // RE-ENTRANCY POINT } } } diff --git a/core/src/main/java/org/bitcoinj/utils/FutureUtils.java b/core/src/main/java/org/bitcoinj/utils/FutureUtils.java new file mode 100644 index 000000000..2458e1924 --- /dev/null +++ b/core/src/main/java/org/bitcoinj/utils/FutureUtils.java @@ -0,0 +1,77 @@ +/* + * Copyright by the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bitcoinj.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * Utilities for {@link CompletableFuture}. + *

+ * Note: When the bitcoinj migration to {@code CompletableFuture} is finished this class will + * either be removed or its remaining methods changed to use generic {@code CompletableFuture}s. + */ +public class FutureUtils { + + /** + * Thank you Apache-licensed Spotify https://github.com/spotify/completable-futures + * @param stages A list of {@code CompletionStage}s all returning the same type + * @param the result type + * @return A generic CompletableFuture that returns a list of result type + */ + private static CompletableFuture> allAsCFList( + List> stages) { + @SuppressWarnings("unchecked") // generic array creation + final CompletableFuture[] all = new CompletableFuture[stages.size()]; + for (int i = 0; i < stages.size(); i++) { + all[i] = stages.get(i).toCompletableFuture(); + } + + CompletableFuture allOf = CompletableFuture.allOf(all); + + for (CompletableFuture completableFuture : all) { + completableFuture.exceptionally(throwable -> { + if (!allOf.isDone()) { + allOf.completeExceptionally(throwable); + } + return null; // intentionally unused + }); + } + + return allOf + .thenApply(ignored -> { + final List result = new ArrayList<>(all.length); + for (CompletableFuture completableFuture : all) { + result.add(completableFuture.join()); + } + return result; + }); + } + + /** + * Note: When the migration to {@code CompletableFuture} is complete this routine will + * either be removed or changed to return a generic {@code CompletableFuture}. + * @param stages A list of {@code CompletionStage}s all returning the same type + * @param the result type + * @return A ListenableCompletableFuture that returns a list of result type + */ + public static ListenableCompletableFuture> allAsList( + List> stages) { + return ListenableCompletableFuture.of(FutureUtils.allAsCFList(stages)); + } +} diff --git a/core/src/main/java/org/bitcoinj/utils/ListenableCompletableFuture.java b/core/src/main/java/org/bitcoinj/utils/ListenableCompletableFuture.java index 971494374..2d003519a 100644 --- a/core/src/main/java/org/bitcoinj/utils/ListenableCompletableFuture.java +++ b/core/src/main/java/org/bitcoinj/utils/ListenableCompletableFuture.java @@ -23,6 +23,23 @@ import java.util.concurrent.CompletableFuture; */ public class ListenableCompletableFuture extends CompletableFuture implements ListenableCompletionStage { + /** + * Returns a new {@link CompletableFuture} that is already completed with + * the given value. + *

+ * When the migration to {@link CompletableFuture} is finished use of this method + * can be replaced with {@link CompletableFuture#completedFuture(Object)}. + * + * @param value the value + * @param the type of the value + * @return the completed CompletableFuture + */ + public static ListenableCompletableFuture completedFuture(T value) { + ListenableCompletableFuture future = new ListenableCompletableFuture<>(); + future.complete(value); + return future; + } + /** * Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed * in future is already a {@code ListenableCompletableFuture} no conversion is performed. diff --git a/core/src/main/java/org/bitcoinj/wallet/Wallet.java b/core/src/main/java/org/bitcoinj/wallet/Wallet.java index 6099e368b..c83044d25 100644 --- a/core/src/main/java/org/bitcoinj/wallet/Wallet.java +++ b/core/src/main/java/org/bitcoinj/wallet/Wallet.java @@ -20,10 +20,7 @@ package org.bitcoinj.wallet; import com.google.common.annotations.*; import com.google.common.collect.*; import com.google.common.math.IntMath; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.*; import net.jcip.annotations.*; import org.bitcoinj.core.listeners.*; @@ -3882,7 +3879,7 @@ public class Wallet extends BaseTaggableObject /** The Bitcoin transaction message that moves the money. */ public final Transaction tx; /** A future that will complete once the tx message has been successfully broadcast to the network. This is just the result of calling broadcast.future() */ - public final ListenableFuture broadcastComplete; + public final ListenableCompletableFuture broadcastComplete; /** The broadcast object returned by the linked TransactionBroadcaster */ public final TransactionBroadcast broadcast; @@ -5319,7 +5316,7 @@ public class Wallet extends BaseTaggableObject * @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty. * @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password. */ - public ListenableFuture> doMaintenance(@Nullable KeyParameter aesKey, boolean signAndSend) + public ListenableCompletableFuture> doMaintenance(@Nullable KeyParameter aesKey, boolean signAndSend) throws DeterministicUpgradeRequiresPassword { return doMaintenance(KeyChainGroupStructure.DEFAULT, aesKey, signAndSend); } @@ -5339,7 +5336,7 @@ public class Wallet extends BaseTaggableObject * @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty. * @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password. */ - public ListenableFuture> doMaintenance(KeyChainGroupStructure structure, + public ListenableCompletableFuture> doMaintenance(KeyChainGroupStructure structure, @Nullable KeyParameter aesKey, boolean signAndSend) throws DeterministicUpgradeRequiresPassword { List txns; lock.lock(); @@ -5347,34 +5344,30 @@ public class Wallet extends BaseTaggableObject try { txns = maybeRotateKeys(structure, aesKey, signAndSend); if (!signAndSend) - return Futures.immediateFuture(txns); + return ListenableCompletableFuture.completedFuture(txns); } finally { keyChainGroupLock.unlock(); lock.unlock(); } checkState(!lock.isHeldByCurrentThread()); - ArrayList> futures = new ArrayList<>(txns.size()); + ArrayList> futures = new ArrayList<>(txns.size()); TransactionBroadcaster broadcaster = vTransactionBroadcaster; for (Transaction tx : txns) { try { - final ListenableFuture future = broadcaster.broadcastTransaction(tx).future(); + final CompletableFuture future = broadcaster.broadcastTransaction(tx).future(); futures.add(future); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Transaction transaction) { + future.whenComplete((transaction, throwable) -> { + if (transaction != null) { log.info("Successfully broadcast key rotation tx: {}", transaction); - } - - @Override - public void onFailure(Throwable throwable) { + } else { log.error("Failed to broadcast key rotation tx", throwable); } - }, MoreExecutors.directExecutor()); + }); } catch (Exception e) { log.error("Failed to broadcast rekey tx", e); } } - return Futures.allAsList(futures); + return FutureUtils.allAsList(futures); } // Checks to see if any coins are controlled by rotating keys and if so, spends them. diff --git a/core/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java b/core/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java index 4fbc0cfb0..fc4af613e 100644 --- a/core/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java +++ b/core/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java @@ -18,7 +18,6 @@ package org.bitcoinj.core; import com.google.common.util.concurrent.AtomicDouble; -import com.google.common.util.concurrent.ListenableFuture; import org.bitcoinj.core.listeners.TransactionConfidenceEventListener; import org.bitcoinj.testing.*; import org.bitcoinj.utils.*; @@ -72,7 +71,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup { TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx); final AtomicDouble lastProgress = new AtomicDouble(); broadcast.setProgressCallback(progress -> lastProgress.set(progress)); - ListenableFuture future = broadcast.broadcast(); + CompletableFuture future = broadcast.broadcast(); assertFalse(future.isDone()); assertEquals(0.0, lastProgress.get(), 0.0); // We expect two peers to receive a tx message, and at least one of the others must announce for the future to @@ -120,7 +119,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup { InboundMessageQueuer[] channels = { connectPeer(0), connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) }; Transaction tx = FakeTxBuilder.createFakeTx(UNITTEST); TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx); - ListenableFuture future = broadcast.broadcast(); + CompletableFuture future = broadcast.broadcast(); // 0 and 3 are randomly selected to receive the broadcast. assertEquals(tx, outbound(channels[1])); assertEquals(tx, outbound(channels[2])); diff --git a/core/src/test/java/org/bitcoinj/testing/MockTransactionBroadcaster.java b/core/src/test/java/org/bitcoinj/testing/MockTransactionBroadcaster.java index dcc8d2d9a..070fb19d4 100644 --- a/core/src/test/java/org/bitcoinj/testing/MockTransactionBroadcaster.java +++ b/core/src/test/java/org/bitcoinj/testing/MockTransactionBroadcaster.java @@ -20,11 +20,7 @@ import org.bitcoinj.core.*; import org.bitcoinj.utils.Threading; import org.bitcoinj.wallet.Wallet; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; @@ -40,16 +36,16 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster { public static class TxFuturePair { public final Transaction tx; - public final SettableFuture future; + public final CompletableFuture future; - public TxFuturePair(Transaction tx, SettableFuture future) { + public TxFuturePair(Transaction tx, CompletableFuture future) { this.tx = tx; this.future = future; } /** Tells the broadcasting code that the broadcast was a success, just does future.set(tx) */ public void succeed() { - future.set(tx); + future.complete(tx); } } @@ -74,22 +70,17 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster { // Use a lock just to catch lock ordering inversions e.g. wallet->broadcaster. lock.lock(); try { - SettableFuture result = SettableFuture.create(); + CompletableFuture result = new CompletableFuture<>(); broadcasts.put(new TxFuturePair(tx, result)); - Futures.addCallback(result, new FutureCallback() { - @Override - public void onSuccess(Transaction result) { + result.whenComplete((transaction, t) -> { + if (transaction != null) { try { - wallet.receivePending(result, null); + wallet.receivePending(transaction, null); } catch (VerificationException e) { throw new RuntimeException(e); } } - - @Override - public void onFailure(Throwable t) { - } - }, MoreExecutors.directExecutor()); + }); return TransactionBroadcast.createMockBroadcast(tx, result); } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/examples/src/main/java/org/bitcoinj/examples/ForwardingService.java b/examples/src/main/java/org/bitcoinj/examples/ForwardingService.java index 272f7df9d..a74323dc3 100644 --- a/examples/src/main/java/org/bitcoinj/examples/ForwardingService.java +++ b/examples/src/main/java/org/bitcoinj/examples/ForwardingService.java @@ -136,12 +136,10 @@ public class ForwardingService { checkNotNull(sendResult); // We should never try to send more coins than we have! System.out.println("Sending ..."); // Register a callback that is invoked when the transaction has propagated across the network. - // This shows a second style of registering ListenableFuture callbacks, it works when you don't - // need access to the object the future returns. - sendResult.broadcastComplete.addListener(() -> { + sendResult.broadcastComplete.thenAccept(transaction -> { // The wallet has changed now, it'll get auto saved shortly or when the app shuts down. - System.out.println("Sent coins onwards! Transaction hash is " + sendResult.tx.getTxId()); - }, MoreExecutors.directExecutor()); + System.out.println("Sent coins onwards! Transaction hash is " + transaction.getTxId()); + }); } catch (KeyCrypterException | InsufficientMoneyException e) { // We don't use encrypted wallets in this example - can never happen. throw new RuntimeException(e);