TransactionBroadcast: add awaitRelayed(), deprecate future()

* Add `awaitRelayed()` method that returns a CF that completes when relaying is confirmed.
* Deprecate `future()` and reimplement it using `awaitRelayed()`.
This commit is contained in:
Sean Gilligan 2023-03-27 06:49:22 -07:00 committed by Andreas Schildbach
parent 504b67c186
commit b24964d056
6 changed files with 31 additions and 19 deletions

View File

@ -2184,7 +2184,7 @@ public class PeerGroup implements TransactionBroadcaster {
/**
* <p>Given a transaction, sends it un-announced to one peer and then waits for it to be received back from other
* peers. Once all connected peers have announced the transaction, the future available via the
* {@link TransactionBroadcast#future()} method will be completed. If anything goes
* {@link TransactionBroadcast#awaitRelayed()} ()} method will be completed. If anything goes
* wrong the exception will be thrown when get() is called, or you can receive it via a callback on the
* {@link ListenableCompletableFuture}. This method returns immediately, so if you want it to block just call get() on the
* result.</p>
@ -2214,9 +2214,9 @@ public class PeerGroup implements TransactionBroadcaster {
broadcast.setMinConnections(minConnections);
broadcast.setDropPeersAfterBroadcast(dropPeersAfterBroadcast && tx.getConfidence().numBroadcastPeers() == 0);
// Send the TX to the wallet once we have a successful broadcast.
broadcast.future().whenComplete((transaction, throwable) -> {
if (transaction != null) {
runningBroadcasts.remove(broadcast);
broadcast.awaitRelayed().whenComplete((bcast, throwable) -> {
if (bcast != null) {
runningBroadcasts.remove(bcast);
// OK, now tell the wallet about the transaction. If the wallet created the transaction then
// it already knows and will ignore this. If it's a transaction we received from
// somebody else via a side channel and are now broadcasting, this will put it into the
@ -2227,14 +2227,14 @@ public class PeerGroup implements TransactionBroadcaster {
// We may end up with two threads trying to do this in parallel - the wallet will
// ignore whichever one loses the race.
try {
wallet.receivePending(transaction, null);
wallet.receivePending(bcast.transaction(), null);
} catch (VerificationException e) {
throw new RuntimeException(e); // Cannot fail to verify a tx we created ourselves.
}
}
} else {
// This can happen if we get a reject message from a peer.
runningBroadcasts.remove(broadcast);
runningBroadcasts.remove(bcast);
}
});
// Keep a reference to the TransactionBroadcast object. This is important because otherwise, the entire tree

View File

@ -38,6 +38,7 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.bitcoinj.base.internal.Preconditions.checkState;
@ -55,7 +56,7 @@ public class TransactionBroadcast {
private final CompletableFuture<TransactionBroadcast> sentFuture = new CompletableFuture<>();
// This future completes when we have verified that more than numWaitingFor Peers have seen the broadcast
private final CompletableFuture<Transaction> seenFuture = new ListenableCompletableFuture<>();
private final CompletableFuture<TransactionBroadcast> seenFuture = new CompletableFuture<>();
private final PeerGroup peerGroup;
private final Transaction tx;
private int minConnections;
@ -102,9 +103,11 @@ public class TransactionBroadcast {
/**
* @return future that completes when some number of remote peers has rebroadcast the transaction
* @deprecated Use {@link #awaitRelayed()} (and maybe {@link CompletableFuture#thenApply(Function)})
*/
@Deprecated
public ListenableCompletableFuture<Transaction> future() {
return ListenableCompletableFuture.of(seenFuture);
return ListenableCompletableFuture.of(awaitRelayed().thenApply(TransactionBroadcast::transaction));
}
public void setMinConnections(int minConnections) {
@ -145,7 +148,7 @@ public class TransactionBroadcast {
* <li>Wait until enough {@link org.bitcoinj.core.Peer}s are connected.</li>
* <li>Broadcast the transaction to a determined number of {@link org.bitcoinj.core.Peer}s</li>
* <li>Wait for confirmation from a determined number of remote peers that they have received the broadcast</li>
* <li>Mark {@link TransactionBroadcast#future()} ("seen future") as complete</li>
* <li>Mark {@link TransactionBroadcast#awaitRelayed()} ()} ("seen future") as complete</li>
* </ol>
* The future returned from this method completes when Step 2 is completed.
* <p>
@ -213,8 +216,15 @@ public class TransactionBroadcast {
*/
public CompletableFuture<TransactionBroadcast> broadcastAndAwaitRelay() {
return broadcastOnly()
.thenCompose(broadcast -> this.seenFuture)
.thenApply(tx -> this);
.thenCompose(broadcast -> this.seenFuture);
}
/**
* Wait for confirmation the transaction has been relayed.
* @return A future that completes when the message has been relayed by the appropriate number of remote peers
*/
public CompletableFuture<TransactionBroadcast> awaitRelayed() {
return seenFuture;
}
/**
@ -311,7 +321,7 @@ public class TransactionBroadcast {
log.info("broadcastTransaction: {} complete", tx.getTxId());
peerGroup.removePreMessageReceivedEventListener(rejectionListener);
conf.removeEventListener(this);
seenFuture.complete(tx); // RE-ENTRANCY POINT
seenFuture.complete(TransactionBroadcast.this); // RE-ENTRANCY POINT
}
}
}

View File

@ -4030,7 +4030,7 @@ public class Wallet extends BaseTaggableObject
public SendResult(TransactionBroadcast broadcast) {
this.tx = broadcast.transaction();
this.broadcast = broadcast;
this.broadcastComplete = broadcast.future();
this.broadcastComplete = ListenableCompletableFuture.of(broadcast.awaitRelayed().thenApply(TransactionBroadcast::transaction));
}
}
@ -5496,7 +5496,9 @@ public class Wallet extends BaseTaggableObject
TransactionBroadcaster broadcaster = vTransactionBroadcaster;
for (Transaction tx : txns) {
try {
final CompletableFuture<Transaction> future = broadcaster.broadcastTransaction(tx).future();
final CompletableFuture<Transaction> future = broadcaster.broadcastTransaction(tx)
.awaitRelayed()
.thenApply(TransactionBroadcast::transaction);
futures.add(future);
future.whenComplete((transaction, throwable) -> {
if (transaction != null) {

View File

@ -187,7 +187,7 @@ public class ForwardingService implements Closeable {
})
.thenCompose(broadcast -> {
System.out.printf("Transaction %s is signed and is being delivered to %s...\n", broadcast.transaction().getTxId(), network);
return broadcast.future(); // Return a future that completes when Peers report they have seen the transaction
return broadcast.awaitRelayed().thenApply(TransactionBroadcast::transaction); // Wait until peers report they have seen the transaction
})
.thenAccept(tx ->
System.out.printf("Sent %s onwards and acknowledged by peers, via transaction %s\n", tx.getOutputSum().toFriendlyString(), tx.getTxId())
@ -220,7 +220,7 @@ public class ForwardingService implements Closeable {
* <li>Wait until enough {@link org.bitcoinj.core.Peer}s are connected.</li>
* <li>Broadcast the transaction by a determined number of {@link org.bitcoinj.core.Peer}s</li>
* <li>Wait for confirmation from a determined number of remote peers that they have received the broadcast</li>
* <li>Mark {@link TransactionBroadcast#future()} as complete</li>
* <li>Mark {@link TransactionBroadcast#awaitRelayed()} as complete</li>
* </ol>
* Note: There is a pending PR (#2548) which will make available an additional {@link CompletableFuture} that will complete
* after step 3 above. When and if that PR is merged, this method should probably return that future.

View File

@ -93,7 +93,7 @@ public class TestFeeLevel {
" peers connected"));
kit.peerGroup().addConnectedEventListener((peer, peerCount) -> System.out.println(peerCount +
" peers connected"));
kit.peerGroup().broadcastTransaction(request.tx).future().get();
kit.peerGroup().broadcastTransaction(request.tx).awaitRelayed().get();
System.out.println("Send complete, waiting for confirmation");
request.tx.getConfidence().getDepthFuture(1).get();

View File

@ -726,7 +726,7 @@ public class WalletTool implements Callable<Integer> {
// Wait for peers to connect, the tx to be sent to one of them and for it to be propagated across the
// network. Once propagation is complete and we heard the transaction back from all our peers, it will
// be committed to the wallet.
peerGroup.broadcastTransaction(tx).future().get();
peerGroup.broadcastTransaction(tx).awaitRelayed().get();
// Hack for regtest/single peer mode, as we're about to shut down and won't get an ACK from the remote end.
List<Peer> peerList = peerGroup.getConnectedPeers();
if (peerList.size() == 1)
@ -909,7 +909,7 @@ public class WalletTool implements Callable<Integer> {
TransactionBroadcast broadcast = peerGroup.broadcastTransaction(req.tx);
try {
// Wait for broadcast to be sent
broadcast.future().get();
broadcast.awaitRelayed().get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Failed to broadcast payment " + e.getMessage());
System.exit(1);