TransactionBroadcast: add broadcastOnly(), broadcastAndAwaitRelay() deprecate broadcast()

This commit is contained in:
Sean Gilligan 2022-07-29 14:53:34 -07:00 committed by Andreas Schildbach
parent 2bd3a8bcde
commit 295ce50782
3 changed files with 42 additions and 13 deletions

View file

@ -2122,7 +2122,7 @@ public class PeerGroup implements TransactionBroadcaster {
// eventually be collected. This in turn could result in the transaction not being committed to the wallet
// at all.
runningBroadcasts.add(broadcast);
broadcast.broadcast();
broadcast.broadcastOnly();
return broadcast;
}

View file

@ -50,7 +50,6 @@ import static com.google.common.base.Preconditions.checkState;
public class TransactionBroadcast {
private static final Logger log = LoggerFactory.getLogger(TransactionBroadcast.class);
// TODO: Decide how to make sentFuture publicly available (and possibly use internally)
// This future completes when all broadcast messages were sent (to a buffer)
private final CompletableFuture<TransactionBroadcast> sentFuture = new CompletableFuture<>();
@ -138,27 +137,29 @@ public class TransactionBroadcast {
// TODO: Should this method be moved into the PeerGroup?
/**
* Broadcast this transaction to the proper calculated number of peers. Returns a future that completes when the message
* has is "seen" by remote peers. The {@link Transaction} itself is the returned type/value for the future.
* has been "sent" to a set of remote peers. The {@link TransactionBroadcast} itself is the returned type/value for the future.
* <p>
* The broadcast process includes the following steps:
* The complete broadcast process includes the following steps:
* <ol>
* <li>Wait until enough {@link org.bitcoinj.core.Peer}s are connected.</li>
* <li>Broadcast the transaction by a determined number of {@link org.bitcoinj.core.Peer}s</li>
* <li>Broadcast the transaction to a determined number of {@link org.bitcoinj.core.Peer}s</li>
* <li>Wait for confirmation from a determined number of remote peers that they have received the broadcast</li>
* <li>Mark {@link TransactionBroadcast#future()} ("sent future") as complete</li>
* <li>Mark {@link TransactionBroadcast#future()} ("seen future") as complete</li>
* </ol>
* The future returned from this method completes when Step 2 is completed.
* <p>
* It should further be noted that "broadcast" in this class means that
* {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} has completed successfully which means the message has
* been sent to the "OS network buffer" -- see {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} or its implementation.
* <p>
* @return A future that completes when the message has been confirmed as seen by the appropriate number of remote peers
* @return A future that completes when the message has been sent (or at least buffered) to the correct number of remote Peers. The future
* will complete exceptionally if <i>any</i> of the peer broadcasts fails.
*/
public ListenableCompletableFuture<Transaction> broadcast() {
public CompletableFuture<TransactionBroadcast> broadcastOnly() {
peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener);
log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size());
final Context context = Context.get();
peerGroup.waitForPeers(minConnections).thenComposeAsync( peerList /* not used */ -> {
return peerGroup.waitForPeers(minConnections).thenComposeAsync( peerList /* not used */ -> {
Context.propagate(context);
// We now have enough connected peers to send the transaction.
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer
@ -200,8 +201,36 @@ public class TransactionBroadcast {
log.error("broadcast - one ore more peers failed to send", err);
sentFuture.completeExceptionally(err);
}
});
return ListenableCompletableFuture.of(seenFuture);
})
.thenCompose(v -> sentFuture);
}
/**
* Broadcast the transaction and wait for confirmation that the transaction has been received by the appropriate
* number of Peers before completing.
* @return A future that completes when the message has been relayed by the appropriate number of remote peers
*/
public CompletableFuture<TransactionBroadcast> broadcastAndAwaitRelay() {
return broadcastOnly()
.thenCompose(broadcast -> this.seenFuture)
.thenApply(tx -> this);
}
/**
* If you migrate to {@link #broadcastAndAwaitRelay()} and need a {@link CompletableFuture} that returns
* {@link Transaction} you can use:
* <pre>{@code
* CompletableFuture<Transaction> seenFuture = broadcast
* .broadcastAndAwaitRelay()
* .thenApply(TransactionBroadcast::transaction);
* }</pre>
* @deprecated Use {@link #broadcastAndAwaitRelay()} or {@link #broadcastOnly()} as appropriate
*/
@Deprecated
public ListenableCompletableFuture<Transaction> broadcast() {
return ListenableCompletableFuture.of(
broadcastAndAwaitRelay().thenApply(TransactionBroadcast::transaction)
);
}
private CompletableFuture<Void> broadcastOne(Peer peer) {

View file

@ -85,7 +85,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx);
final AtomicDouble lastProgress = new AtomicDouble();
broadcast.setProgressCallback(lastProgress::set);
CompletableFuture<Transaction> future = broadcast.broadcast();
CompletableFuture<TransactionBroadcast> future = broadcast.broadcastAndAwaitRelay();
assertFalse(future.isDone());
assertEquals(0.0, lastProgress.get(), 0.0);
// We expect two peers to receive a tx message, and at least one of the others must announce for the future to
@ -133,7 +133,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
InboundMessageQueuer[] channels = { connectPeer(0), connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) };
Transaction tx = FakeTxBuilder.createFakeTx(UNITTEST);
TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx);
CompletableFuture<Transaction> future = broadcast.broadcast();
CompletableFuture<TransactionBroadcast> future = broadcast.broadcastAndAwaitRelay();
// 0 and 3 are randomly selected to receive the broadcast.
assertEquals(tx, outbound(channels[1]));
assertEquals(tx, outbound(channels[2]));