From 295ce507827d077c33b335ee7b5db84c50a9444c Mon Sep 17 00:00:00 2001 From: Sean Gilligan Date: Fri, 29 Jul 2022 14:53:34 -0700 Subject: [PATCH] TransactionBroadcast: add broadcastOnly(), broadcastAndAwaitRelay() deprecate broadcast() --- .../java/org/bitcoinj/core/PeerGroup.java | 2 +- .../bitcoinj/core/TransactionBroadcast.java | 49 +++++++++++++++---- .../core/TransactionBroadcastTest.java | 4 +- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index 342feeb39..f75aca5c9 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -2122,7 +2122,7 @@ public class PeerGroup implements TransactionBroadcaster { // eventually be collected. This in turn could result in the transaction not being committed to the wallet // at all. runningBroadcasts.add(broadcast); - broadcast.broadcast(); + broadcast.broadcastOnly(); return broadcast; } diff --git a/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java b/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java index 8d28dfb92..2d9458e09 100644 --- a/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java +++ b/core/src/main/java/org/bitcoinj/core/TransactionBroadcast.java @@ -50,7 +50,6 @@ import static com.google.common.base.Preconditions.checkState; public class TransactionBroadcast { private static final Logger log = LoggerFactory.getLogger(TransactionBroadcast.class); - // TODO: Decide how to make sentFuture publicly available (and possibly use internally) // This future completes when all broadcast messages were sent (to a buffer) private final CompletableFuture sentFuture = new CompletableFuture<>(); @@ -138,27 +137,29 @@ public class TransactionBroadcast { // TODO: Should this method be moved into the PeerGroup? /** * Broadcast this transaction to the proper calculated number of peers. Returns a future that completes when the message - * has is "seen" by remote peers. The {@link Transaction} itself is the returned type/value for the future. + * has been "sent" to a set of remote peers. The {@link TransactionBroadcast} itself is the returned type/value for the future. *

- * The broadcast process includes the following steps: + * The complete broadcast process includes the following steps: *

    *
  1. Wait until enough {@link org.bitcoinj.core.Peer}s are connected.
  2. - *
  3. Broadcast the transaction by a determined number of {@link org.bitcoinj.core.Peer}s
  4. + *
  5. Broadcast the transaction to a determined number of {@link org.bitcoinj.core.Peer}s
  6. *
  7. Wait for confirmation from a determined number of remote peers that they have received the broadcast
  8. - *
  9. Mark {@link TransactionBroadcast#future()} ("sent future") as complete
  10. + *
  11. Mark {@link TransactionBroadcast#future()} ("seen future") as complete
  12. *
+ * The future returned from this method completes when Step 2 is completed. *

* It should further be noted that "broadcast" in this class means that * {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} has completed successfully which means the message has * been sent to the "OS network buffer" -- see {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} or its implementation. *

- * @return A future that completes when the message has been confirmed as seen by the appropriate number of remote peers + * @return A future that completes when the message has been sent (or at least buffered) to the correct number of remote Peers. The future + * will complete exceptionally if any of the peer broadcasts fails. */ - public ListenableCompletableFuture broadcast() { + public CompletableFuture broadcastOnly() { peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener); log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size()); final Context context = Context.get(); - peerGroup.waitForPeers(minConnections).thenComposeAsync( peerList /* not used */ -> { + return peerGroup.waitForPeers(minConnections).thenComposeAsync( peerList /* not used */ -> { Context.propagate(context); // We now have enough connected peers to send the transaction. // This can be called immediately if we already have enough. Otherwise it'll be called from a peer @@ -200,8 +201,36 @@ public class TransactionBroadcast { log.error("broadcast - one ore more peers failed to send", err); sentFuture.completeExceptionally(err); } - }); - return ListenableCompletableFuture.of(seenFuture); + }) + .thenCompose(v -> sentFuture); + } + + /** + * Broadcast the transaction and wait for confirmation that the transaction has been received by the appropriate + * number of Peers before completing. + * @return A future that completes when the message has been relayed by the appropriate number of remote peers + */ + public CompletableFuture broadcastAndAwaitRelay() { + return broadcastOnly() + .thenCompose(broadcast -> this.seenFuture) + .thenApply(tx -> this); + } + + /** + * If you migrate to {@link #broadcastAndAwaitRelay()} and need a {@link CompletableFuture} that returns + * {@link Transaction} you can use: + *

{@code
+     *  CompletableFuture seenFuture = broadcast
+     *              .broadcastAndAwaitRelay()
+     *              .thenApply(TransactionBroadcast::transaction);
+     * }
+ * @deprecated Use {@link #broadcastAndAwaitRelay()} or {@link #broadcastOnly()} as appropriate + */ + @Deprecated + public ListenableCompletableFuture broadcast() { + return ListenableCompletableFuture.of( + broadcastAndAwaitRelay().thenApply(TransactionBroadcast::transaction) + ); } private CompletableFuture broadcastOne(Peer peer) { diff --git a/integration-test/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java b/integration-test/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java index 0a77a9f5b..d4fb969d4 100644 --- a/integration-test/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java +++ b/integration-test/src/test/java/org/bitcoinj/core/TransactionBroadcastTest.java @@ -85,7 +85,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup { TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx); final AtomicDouble lastProgress = new AtomicDouble(); broadcast.setProgressCallback(lastProgress::set); - CompletableFuture future = broadcast.broadcast(); + CompletableFuture future = broadcast.broadcastAndAwaitRelay(); assertFalse(future.isDone()); assertEquals(0.0, lastProgress.get(), 0.0); // We expect two peers to receive a tx message, and at least one of the others must announce for the future to @@ -133,7 +133,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup { InboundMessageQueuer[] channels = { connectPeer(0), connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) }; Transaction tx = FakeTxBuilder.createFakeTx(UNITTEST); TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx); - CompletableFuture future = broadcast.broadcast(); + CompletableFuture future = broadcast.broadcastAndAwaitRelay(); // 0 and 3 are randomly selected to receive the broadcast. assertEquals(tx, outbound(channels[1])); assertEquals(tx, outbound(channels[2]));