TransactionBroadcast: internally create/complete sentFuture

* Add a private member called `sentFuture` that completes when the broadcast
  is "sent" (written to buffer) to the broadcast peers
* Update the broadcast() method to track success/failure of the individual
  broadcast messages and update sentFuture when the individual messages complete.
  (This is done by using `thenComposeAsync`/`whenComplete`)
* Add comments and TODOs
This commit is contained in:
Sean Gilligan 2022-07-29 13:22:00 -07:00
parent 30595af6e4
commit 32c58ad723

View file

@ -49,6 +49,10 @@ import static com.google.common.base.Preconditions.checkState;
public class TransactionBroadcast {
private static final Logger log = LoggerFactory.getLogger(TransactionBroadcast.class);
// TODO: Decide how to make sentFuture publicly available (and possibly use internally)
// This future completes when all broadcast messages were sent (to a buffer)
private final CompletableFuture<TransactionBroadcast> sentFuture = new CompletableFuture<>();
// This future completes when we have verified that more than numWaitingFor Peers have seen the broadcast
private final CompletableFuture<Transaction> seenFuture = new ListenableCompletableFuture<>();
private final PeerGroup peerGroup;
@ -130,11 +134,30 @@ public class TransactionBroadcast {
}
};
// TODO: Should this method be moved into the PeerGroup?
/**
* Broadcast this transaction to the proper calculated number of peers. Returns a future that completes when the message
* has is "seen" by remote peers. The {@link Transaction} itself is the returned type/value for the future.
* <p>
* The broadcast process includes the following steps:
* <ol>
* <li>Wait until enough {@link org.bitcoinj.core.Peer}s are connected.</li>
* <li>Broadcast the transaction by a determined number of {@link org.bitcoinj.core.Peer}s</li>
* <li>Wait for confirmation from a determined number of remote peers that they have received the broadcast</li>
* <li>Mark {@link TransactionBroadcast#future()} ("sent future") as complete</li>
* </ol>
* <p>
* It should further be noted that "broadcast" in this class means that
* {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} has completed successfully which means the message has
* been sent to the "OS network buffer" -- see {@link org.bitcoinj.net.MessageWriteTarget#writeBytes} or its implementation.
* <p>
* @return A future that completes when the message has been confirmed as seen by the appropriate number of remote peers
*/
public ListenableCompletableFuture<Transaction> broadcast() {
peerGroup.addPreMessageReceivedEventListener(Threading.SAME_THREAD, rejectionListener);
log.info("Waiting for {} peers required for broadcast, we have {} ...", minConnections, peerGroup.getConnectedPeers().size());
final Context context = Context.get();
peerGroup.waitForPeers(minConnections).thenRunAsync(() -> {
peerGroup.waitForPeers(minConnections).thenComposeAsync( peerList /* not used */ -> {
Context.propagate(context);
// We now have enough connected peers to send the transaction.
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer
@ -161,6 +184,8 @@ public class TransactionBroadcast {
numWaitingFor = (int) Math.ceil((peers.size() - numToBroadcastTo) / 2.0);
log.info("broadcastTransaction: We have {} peers, adding {} to the memory pool", peers.size(), tx.getTxId());
log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, InternalUtils.joiner(",").join(peers));
CompletableFuture[] sentFutures = new CompletableFuture[broadcastPeers.size()];
int i = 0;
for (final Peer peer : broadcastPeers) {
try {
CompletableFuture<Void> future = peer.sendMessage(tx);
@ -172,13 +197,28 @@ public class TransactionBroadcast {
peer.close();
}, Threading.THREAD_POOL);
}
sentFutures[i] = future;
i++;
// We don't record the peer as having seen the tx in the memory pool because we want to track only
// how many peers announced to us.
} catch (Exception e) {
// TODO: Put this exception into our returned future
log.error("Caught exception sending to {}", peer, e);
}
}
}, Threading.SAME_THREAD);
// Complete successfully if ALL peer.sendMessage complete successfully, fail otherwise
return CompletableFuture.allOf(sentFutures);
}, Threading.SAME_THREAD)
.whenComplete((v, err) -> {
// Complete `sentFuture` (even though it is currently unused)
if (err == null) {
log.info("broadcast has been written to correct number of peers with peer.sendMessage(tx)");
sentFuture.complete(this);
} else {
log.error("broadcast - one ore more peers failed to send", err);
sentFuture.completeExceptionally(err);
}
});
return ListenableCompletableFuture.of(seenFuture);
}