ListenableCompletableFuture, ListenableCompletionStage: complete migration to CompletableFuture

Release 0.17 was the transition release to allow users
to migrate from ListenableFuture to CompletableFuture.
In Release 0.18 we are removing ListenableCompletableFuture and
ListenableCompletionStage and changing method signatures to
return CompletableFuture.

This is technically a breaking change and, of necessity, we weren't
able to mark everything that is going away as "deprecated", but
we did put a special notice in the 0.17 release notes and the migration
should not be too difficult.
This commit is contained in:
Sean Gilligan 2024-09-01 21:34:01 -07:00 committed by Andreas Schildbach
parent d245256b96
commit 32cdb9fa8b
19 changed files with 101 additions and 239 deletions

View File

@ -27,7 +27,6 @@ import org.bitcoinj.script.ScriptException;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.BlockStoreException;
import org.bitcoinj.store.SPVBlockStore;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.utils.VersionTally;
@ -51,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
@ -1071,8 +1071,8 @@ public abstract class AbstractBlockChain {
* @param height desired height
* @return future that will complete when height is reached
*/
public ListenableCompletableFuture<StoredBlock> getHeightFuture(final int height) {
final ListenableCompletableFuture<StoredBlock> result = new ListenableCompletableFuture<>();
public CompletableFuture<StoredBlock> getHeightFuture(final int height) {
final CompletableFuture<StoredBlock> result = new CompletableFuture<>();
addNewBestBlockListener(Threading.SAME_THREAD, new NewBestBlockListener() {
@Override
public void notifyNewBestBlock(StoredBlock block) throws VerificationException {

View File

@ -37,7 +37,6 @@ import org.bitcoinj.net.StreamConnection;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.BlockStoreException;
import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
@ -424,15 +423,15 @@ public class Peer extends PeerSocketHandler {
}
/**
* Provides a ListenableCompletableFuture that can be used to wait for the socket to connect. A socket connection does not
* Provides a CompletableFuture that can be used to wait for the socket to connect. A socket connection does not
* mean that protocol handshake has occurred.
*/
public ListenableCompletableFuture<Peer> getConnectionOpenFuture() {
return ListenableCompletableFuture.of(connectionOpenFuture);
public CompletableFuture<Peer> getConnectionOpenFuture() {
return connectionOpenFuture;
}
public ListenableCompletableFuture<Peer> getVersionHandshakeFuture() {
return ListenableCompletableFuture.of(versionHandshakeFuture);
public CompletableFuture<Peer> getVersionHandshakeFuture() {
return versionHandshakeFuture;
}
@Override
@ -816,12 +815,12 @@ public class Peer extends PeerSocketHandler {
* @param tx The transaction
* @return A Future for a list of dependent transactions
*/
public ListenableCompletableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
public CompletableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
log.info("{}: Downloading dependencies of {}", getAddress(), tx.getTxId());
// future will be invoked when the entire dependency tree has been walked and the results compiled.
return ListenableCompletableFuture.of(downloadDependenciesInternal(tx, vDownloadTxDependencyDepth, 0));
return downloadDependenciesInternal(tx, vDownloadTxDependencyDepth, 0);
}
/**
@ -1275,11 +1274,11 @@ public class Peer extends PeerSocketHandler {
* If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread
* will block until the peer answers.
*/
public ListenableCompletableFuture<Block> getBlock(Sha256Hash blockHash) {
public CompletableFuture<Block> getBlock(Sha256Hash blockHash) {
// This does not need to be locked.
log.info("Request to fetch block {}", blockHash);
GetDataMessage getdata = GetDataMessage.ofBlock(blockHash, true);
return ListenableCompletableFuture.of(sendSingleGetData(getdata));
return sendSingleGetData(getdata);
}
/**
@ -1287,12 +1286,12 @@ public class Peer extends PeerSocketHandler {
* retrieved this way because peers don't have a transaction ID to transaction-pos-on-disk index, and besides,
* in future many peers will delete old transaction data they don't need.
*/
public ListenableCompletableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) {
public CompletableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) {
// This does not need to be locked.
// TODO: Unit test this method.
log.info("Request to fetch peer mempool tx {}", hash);
GetDataMessage getdata = GetDataMessage.ofTransaction(hash, vPeerVersionMessage.services().has(Services.NODE_WITNESS));
return ListenableCompletableFuture.of(sendSingleGetData(getdata));
return sendSingleGetData(getdata);
}
/** Sends a getdata with a single item in it. */
@ -1306,8 +1305,8 @@ public class Peer extends PeerSocketHandler {
}
/** Sends a getaddr request to the peer and returns a future that completes with the answer once the peer has replied. */
public ListenableCompletableFuture<AddressMessage> getAddr() {
ListenableCompletableFuture<AddressMessage> future = new ListenableCompletableFuture<>();
public CompletableFuture<AddressMessage> getAddr() {
CompletableFuture<AddressMessage> future = new CompletableFuture<>();
synchronized (getAddrFutures) {
getAddrFutures.add(future);
}
@ -1558,8 +1557,8 @@ public class Peer extends PeerSocketHandler {
* @deprecated Use {@link #sendPing()}
*/
@Deprecated
public ListenableCompletableFuture<Long> ping() {
return ListenableCompletableFuture.of(sendPing().thenApply(Duration::toMillis));
public CompletableFuture<Long> ping() {
return sendPing().thenApply(Duration::toMillis);
}
/**

View File

@ -50,7 +50,6 @@ import org.bitcoinj.script.Script;
import org.bitcoinj.script.ScriptPattern;
import org.bitcoinj.utils.ContextPropagatingThreadFactory;
import org.bitcoinj.utils.ExponentialBackoff;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
@ -1137,7 +1136,7 @@ public class PeerGroup implements TransactionBroadcaster {
* Starts the PeerGroup and begins network activity.
* @return A future that completes when first connection activity has been triggered (note: not first connection made).
*/
public ListenableCompletableFuture<Void> startAsync() {
public CompletableFuture<Void> startAsync() {
// This is run in a background thread by the Service implementation.
if (chain == null) {
// Just try to help catch what might be a programming error.
@ -1160,7 +1159,7 @@ public class PeerGroup implements TransactionBroadcaster {
log.error("Exception when starting up", e); // The executor swallows exceptions :(
}
}, executor);
return ListenableCompletableFuture.of(future);
return future;
}
/** Does a blocking startup. */
@ -1168,7 +1167,7 @@ public class PeerGroup implements TransactionBroadcaster {
startAsync().join();
}
public ListenableCompletableFuture<Void> stopAsync() {
public CompletableFuture<Void> stopAsync() {
checkState(vRunning);
vRunning = false;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
@ -1190,7 +1189,7 @@ public class PeerGroup implements TransactionBroadcaster {
}
}, executor);
executor.shutdown();
return ListenableCompletableFuture.of(future);
return future;
}
/** Does a blocking stop */
@ -1263,12 +1262,12 @@ public class PeerGroup implements TransactionBroadcaster {
* than the current chain head, the relevant parts of the chain won't be redownloaded for you.</p>
*
* <p>This method invokes {@link PeerGroup#recalculateFastCatchupAndFilter(FilterRecalculateMode)}.
* The return value of this method is the {@code ListenableCompletableFuture} returned by that invocation.</p>
* The return value of this method is the {@code CompletableFuture} returned by that invocation.</p>
*
* @return a future that completes once each {@code Peer} in this group has had its
* {@code BloomFilter} (re)set.
*/
public ListenableCompletableFuture<BloomFilter> addPeerFilterProvider(PeerFilterProvider provider) {
public CompletableFuture<BloomFilter> addPeerFilterProvider(PeerFilterProvider provider) {
lock.lock();
try {
Objects.requireNonNull(provider);
@ -1287,7 +1286,7 @@ public class PeerGroup implements TransactionBroadcaster {
// if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would
// automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys,
// all transparently and in the background. But we are a long way from that yet.
ListenableCompletableFuture<BloomFilter> future = recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
CompletableFuture<BloomFilter> future = recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
updateVersionMessageRelayTxesBeforeFilter(getVersionMessage());
return future;
} finally {
@ -1331,7 +1330,7 @@ public class PeerGroup implements TransactionBroadcaster {
DONT_SEND,
}
private final Map<FilterRecalculateMode, ListenableCompletableFuture<BloomFilter>> inFlightRecalculations = Maps.newHashMap();
private final Map<FilterRecalculateMode, CompletableFuture<BloomFilter>> inFlightRecalculations = Maps.newHashMap();
/**
* Recalculates the bloom filter given to peers as well as the timestamp after which full blocks are downloaded
@ -1341,8 +1340,8 @@ public class PeerGroup implements TransactionBroadcaster {
* @param mode In what situations to send the filter to connected peers.
* @return a future that completes once the filter has been calculated (note: this does not mean acknowledged by remote peers).
*/
public ListenableCompletableFuture<BloomFilter> recalculateFastCatchupAndFilter(final FilterRecalculateMode mode) {
final ListenableCompletableFuture<BloomFilter> future = new ListenableCompletableFuture<>();
public CompletableFuture<BloomFilter> recalculateFastCatchupAndFilter(final FilterRecalculateMode mode) {
final CompletableFuture<BloomFilter> future = new CompletableFuture<>();
synchronized (inFlightRecalculations) {
if (inFlightRecalculations.get(mode) != null)
return inFlightRecalculations.get(mode);
@ -2016,7 +2015,7 @@ public class PeerGroup implements TransactionBroadcaster {
* @param numPeers How many peers to wait for.
* @return a future that will be triggered when the number of connected peers is greater than or equals numPeers
*/
public ListenableCompletableFuture<List<Peer>> waitForPeers(final int numPeers) {
public CompletableFuture<List<Peer>> waitForPeers(final int numPeers) {
return waitForPeersOfVersion(numPeers, 0);
}
@ -2028,14 +2027,14 @@ public class PeerGroup implements TransactionBroadcaster {
* @param protocolVersion The protocol version the awaited peers must implement (or better).
* @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher is greater than or equals numPeers
*/
public ListenableCompletableFuture<List<Peer>> waitForPeersOfVersion(final int numPeers, final long protocolVersion) {
public CompletableFuture<List<Peer>> waitForPeersOfVersion(final int numPeers, final long protocolVersion) {
List<Peer> foundPeers = findPeersOfAtLeastVersion(protocolVersion);
if (foundPeers.size() >= numPeers) {
ListenableCompletableFuture<List<Peer>> f = new ListenableCompletableFuture<>();
CompletableFuture<List<Peer>> f = new CompletableFuture<>();
f.complete(foundPeers);
return f;
}
final ListenableCompletableFuture<List<Peer>> future = new ListenableCompletableFuture<List<Peer>>();
final CompletableFuture<List<Peer>> future = new CompletableFuture<List<Peer>>();
addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
@ -2073,16 +2072,16 @@ public class PeerGroup implements TransactionBroadcaster {
* @param mask An integer representing a bit mask that will be ANDed with the peers advertised service masks.
* @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher is greater than or equals numPeers
*/
public ListenableCompletableFuture<List<Peer>> waitForPeersWithServiceMask(final int numPeers, final int mask) {
public CompletableFuture<List<Peer>> waitForPeersWithServiceMask(final int numPeers, final int mask) {
lock.lock();
try {
List<Peer> foundPeers = findPeersWithServiceMask(mask);
if (foundPeers.size() >= numPeers) {
ListenableCompletableFuture<List<Peer>> f = new ListenableCompletableFuture<>();
CompletableFuture<List<Peer>> f = new CompletableFuture<>();
f.complete(foundPeers);
return f;
}
final ListenableCompletableFuture<List<Peer>> future = new ListenableCompletableFuture<>();
final CompletableFuture<List<Peer>> future = new CompletableFuture<>();
addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
@ -2165,7 +2164,7 @@ public class PeerGroup implements TransactionBroadcaster {
* peers. Once all connected peers have announced the transaction, the future available via the
* {@link TransactionBroadcast#awaitRelayed()} ()} method will be completed. If anything goes
* wrong the exception will be thrown when get() is called, or you can receive it via a callback on the
* {@link ListenableCompletableFuture}. This method returns immediately, so if you want it to block just call get() on the
* {@link CompletableFuture}. This method returns immediately, so if you want it to block just call get() on the
* result.</p>
*
* <p>Optionally, peers will be dropped after they have been used for broadcasting the transaction and they have

View File

@ -17,13 +17,13 @@
package org.bitcoinj.core;
import com.google.common.annotations.VisibleForTesting;
import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.net.MessageWriteTarget;
import org.bitcoinj.net.NioClient;
import org.bitcoinj.net.NioClientManager;
import org.bitcoinj.net.SocketTimeoutTask;
import org.bitcoinj.net.StreamConnection;
import org.bitcoinj.net.TimeoutHandler;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import static org.bitcoinj.base.internal.Preconditions.checkArgument;
@ -92,7 +93,7 @@ public abstract class PeerSocketHandler implements TimeoutHandler, StreamConnect
* the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer.
* TODO: Maybe use something other than the unchecked NotYetConnectedException here
*/
public ListenableCompletableFuture<Void> sendMessage(Message message) throws NotYetConnectedException {
public CompletableFuture<Void> sendMessage(Message message) throws NotYetConnectedException {
lock.lock();
try {
if (writeTarget == null)
@ -107,7 +108,7 @@ public abstract class PeerSocketHandler implements TimeoutHandler, StreamConnect
return writeTarget.writeBytes(out.toByteArray());
} catch (IOException e) {
exceptionCaught(e);
return ListenableCompletableFuture.failedFuture(e);
return FutureUtils.failedFuture(e);
}
}

View File

@ -21,7 +21,6 @@ import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.base.internal.StreamUtils;
import org.bitcoinj.base.internal.InternalUtils;
import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
import org.slf4j.Logger;
@ -89,13 +88,13 @@ public class TransactionBroadcast {
public static TransactionBroadcast createMockBroadcast(Transaction tx, final CompletableFuture<Transaction> future) {
return new TransactionBroadcast(tx) {
@Override
public ListenableCompletableFuture<Transaction> broadcast() {
return ListenableCompletableFuture.of(future);
public CompletableFuture<Transaction> broadcast() {
return future;
}
@Override
public ListenableCompletableFuture<Transaction> future() {
return ListenableCompletableFuture.of(future);
public CompletableFuture<Transaction> future() {
return future;
}
};
}
@ -105,8 +104,8 @@ public class TransactionBroadcast {
* @deprecated Use {@link #awaitRelayed()} (and maybe {@link CompletableFuture#thenApply(Function)})
*/
@Deprecated
public ListenableCompletableFuture<Transaction> future() {
return ListenableCompletableFuture.of(awaitRelayed().thenApply(TransactionBroadcast::transaction));
public CompletableFuture<Transaction> future() {
return awaitRelayed().thenApply(TransactionBroadcast::transaction);
}
public void setMinConnections(int minConnections) {
@ -246,10 +245,8 @@ public class TransactionBroadcast {
* @deprecated Use {@link #broadcastAndAwaitRelay()} or {@link #broadcastOnly()} as appropriate
*/
@Deprecated
public ListenableCompletableFuture<Transaction> broadcast() {
return ListenableCompletableFuture.of(
broadcastAndAwaitRelay().thenApply(TransactionBroadcast::transaction)
);
public CompletableFuture<Transaction> broadcast() {
return broadcastAndAwaitRelay().thenApply(TransactionBroadcast::transaction);
}
private CompletableFuture<Void> broadcastOne(Peer peer) {

View File

@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.bitcoinj.base.Sha256Hash;
import org.bitcoinj.base.internal.TimeUtils;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.CoinSelector;
@ -35,6 +34,7 @@ import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@ -535,8 +535,8 @@ public class TransactionConfidence {
* depth to one will wait until it appears in a block on the best chain, and zero will wait until it has been seen
* on the network.
*/
private synchronized ListenableCompletableFuture<TransactionConfidence> getDepthFuture(final int depth, Executor executor) {
final ListenableCompletableFuture<TransactionConfidence> result = new ListenableCompletableFuture<>();
private synchronized CompletableFuture<TransactionConfidence> getDepthFuture(final int depth, Executor executor) {
final CompletableFuture<TransactionConfidence> result = new CompletableFuture<>();
if (getDepthInBlocks() >= depth) {
result.complete(this);
}
@ -551,7 +551,7 @@ public class TransactionConfidence {
return result;
}
public synchronized ListenableCompletableFuture<TransactionConfidence> getDepthFuture(final int depth) {
public synchronized CompletableFuture<TransactionConfidence> getDepthFuture(final int depth) {
return getDepthFuture(depth, Threading.USER_THREAD);
}

View File

@ -21,7 +21,6 @@ import org.bitcoinj.base.internal.TimeUtils;
import org.bitcoinj.core.Block;
import org.bitcoinj.core.FilteredBlock;
import org.bitcoinj.core.Peer;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -127,7 +126,7 @@ public class DownloadProgressTracker implements BlockchainDownloadEventListener
* Returns a listenable future that completes with the height of the best chain (as reported by the peer) once chain
* download seems to be finished.
*/
public ListenableCompletableFuture<Long> getFuture() {
return ListenableCompletableFuture.of(future);
public CompletableFuture<Long> getFuture() {
return future;
}
}

View File

@ -18,7 +18,6 @@ package org.bitcoinj.net;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.Peer;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -147,12 +146,12 @@ public class BlockingClient implements MessageWriteTarget {
}
@Override
public synchronized ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
public synchronized CompletableFuture<Void> writeBytes(byte[] message) throws IOException {
try {
OutputStream stream = socket.getOutputStream();
stream.write(message);
stream.flush();
return ListenableCompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null);
} catch (IOException e) {
log.error("Error writing message to connection, closing connection", e);
closeConnection();
@ -161,7 +160,7 @@ public class BlockingClient implements MessageWriteTarget {
}
/** Returns a future that completes once connection has occurred at the socket level or with an exception if failed to connect. */
public ListenableCompletableFuture<SocketAddress> getConnectFuture() {
return ListenableCompletableFuture.of(connectFuture);
public CompletableFuture<SocketAddress> getConnectFuture() {
return connectFuture;
}
}

View File

@ -17,7 +17,6 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.AbstractIdleService;
import org.bitcoinj.utils.ListenableCompletableFuture;
import javax.net.SocketFactory;
import java.io.IOException;
@ -28,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* <p>A thin wrapper around a set of {@link BlockingClient}s.</p>
@ -55,7 +55,7 @@ public class BlockingClientManager extends AbstractIdleService implements Client
}
@Override
public ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
public CompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
try {
if (!isRunning())
throw new IllegalStateException();

View File

@ -17,9 +17,9 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.Service;
import org.bitcoinj.utils.ListenableCompletableFuture;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
/**
* <p>A generic interface for an object which keeps track of a set of open client connections, creates new ones and
@ -33,7 +33,7 @@ public interface ClientConnectionManager extends Service {
* Creates a new connection to the given address, with the given connection used to handle incoming data. Any errors
* that occur during connection will be returned in the given future, including errors that can occur immediately.
*/
ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection);
CompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection);
/** Gets the number of connected peers */
int getConnectedClientCount();

View File

@ -18,7 +18,6 @@ package org.bitcoinj.net;
import com.google.common.base.Throwables;
import org.bitcoinj.core.Message;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +35,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import static org.bitcoinj.base.internal.Preconditions.checkState;
@ -68,9 +68,9 @@ class ConnectionHandler implements MessageWriteTarget {
private static class BytesAndFuture {
public final ByteBuffer bytes;
public final ListenableCompletableFuture<Void> future;
public final CompletableFuture<Void> future;
public BytesAndFuture(ByteBuffer bytes, ListenableCompletableFuture<Void> future) {
public BytesAndFuture(ByteBuffer bytes, CompletableFuture<Void> future) {
this.bytes = bytes;
this.future = future;
}
@ -148,7 +148,7 @@ class ConnectionHandler implements MessageWriteTarget {
}
@Override
public ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
public CompletableFuture<Void> writeBytes(byte[] message) throws IOException {
boolean andUnlock = true;
lock.lock();
try {
@ -161,7 +161,7 @@ class ConnectionHandler implements MessageWriteTarget {
throw new IOException("Outbound buffer overflowed");
// Just dump the message onto the write buffer and call tryWriteBytes
// TODO: Kill the needless message duplication when the write completes right away
final ListenableCompletableFuture<Void> future = new ListenableCompletableFuture<>();
final CompletableFuture<Void> future = new CompletableFuture<>();
bytesToWrite.offer(new BytesAndFuture(ByteBuffer.wrap(Arrays.copyOf(message, message.length)), future));
bytesToWriteRemaining += message.length;
setWriteOps();

View File

@ -16,9 +16,8 @@
package org.bitcoinj.net;
import org.bitcoinj.utils.ListenableCompletableFuture;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
* A target to which messages can be written/connection can be closed
@ -28,7 +27,7 @@ public interface MessageWriteTarget {
* Writes the given bytes to the remote server. The returned future will complete when all bytes
* have been written to the OS network buffer.
*/
ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException;
CompletableFuture<Void> writeBytes(byte[] message) throws IOException;
/**
* Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()}
* event on the network-handling thread where all callbacks occur.

View File

@ -18,7 +18,6 @@
package org.bitcoinj.net;
import com.google.common.base.Throwables;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,6 +25,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
/**
* Creates a simple connection to a server using a {@link StreamConnection} to process data.
@ -129,7 +129,7 @@ public class NioClient implements MessageWriteTarget {
}
@Override
public synchronized ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
public synchronized CompletableFuture<Void> writeBytes(byte[] message) throws IOException {
return handler.writeTarget.writeBytes(message);
}
}

View File

@ -18,8 +18,8 @@ package org.bitcoinj.net;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.utils.ContextPropagatingThreadFactory;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -156,7 +156,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements
}
@Override
public ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
public CompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
if (!isRunning())
throw new IllegalStateException();
// Create a new connection, give it a connection as an attachment
@ -167,9 +167,9 @@ public class NioClientManager extends AbstractExecutionThreadService implements
PendingConnect data = new PendingConnect(sc, connection, serverAddress);
newConnectionChannels.offer(data);
selector.wakeup();
return ListenableCompletableFuture.of(data.future);
return data.future;
} catch (Throwable e) {
return ListenableCompletableFuture.failedFuture(e);
return FutureUtils.failedFuture(e);
}
}

View File

@ -30,7 +30,6 @@ import org.bitcoinj.params.MainNetParams;
import org.bitcoinj.protocols.payments.PaymentProtocol.PkiVerificationData;
import org.bitcoinj.uri.BitcoinURI;
import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.SendRequest;
@ -62,7 +61,7 @@ import java.util.concurrent.ExecutorService;
* </ul>
*
* <p>If initialized with a BitcoinURI or a url, a network request is made for the payment request object and a
* {@link ListenableCompletableFuture} is returned that will be notified with the PaymentSession object after it is downloaded.</p>
* {@link CompletableFuture} is returned that will be notified with the PaymentSession object after it is downloaded.</p>
*
* <p>Once the PaymentSession is initialized, typically a wallet application will prompt the user to confirm that the
* amount and recipient are correct, perform any additional steps, and then construct a list of transactions to pass to
@ -70,7 +69,7 @@ import java.util.concurrent.ExecutorService;
*
* <p>Call sendPayment with a list of transactions that will be broadcast. A {@link Protos.Payment} message will be sent
* to the merchant if a payment url is provided in the PaymentRequest. NOTE: sendPayment does NOT broadcast the
* transactions to the bitcoin network. Instead it returns a ListenableCompletableFuture that will be notified when a
* transactions to the bitcoin network. Instead it returns a CompletableFuture that will be notified when a
* {@link Protos.PaymentACK} is received from the merchant. Typically a wallet will show the message to the user
* as a confirmation message that the payment is now "processing" or that an error occurred, and then broadcast the
* tx itself later if needed.</p>
@ -99,7 +98,7 @@ public class PaymentSession {
* the signature provided by the payment request. An exception is thrown by the future if the signature cannot
* be verified.</p>
*/
public static ListenableCompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri) throws PaymentProtocolException {
public static CompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri) throws PaymentProtocolException {
return createFromBitcoinUri(uri, true, null);
}
@ -111,7 +110,7 @@ public class PaymentSession {
* be used to verify the signature provided by the payment request. An exception is thrown by the future if the
* signature cannot be verified.
*/
public static ListenableCompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri, final boolean verifyPki)
public static CompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri, final boolean verifyPki)
throws PaymentProtocolException {
return createFromBitcoinUri(uri, verifyPki, null);
}
@ -125,13 +124,13 @@ public class PaymentSession {
* signature cannot be verified.
* If trustStoreLoader is null, the system default trust store is used.
*/
public static ListenableCompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri, final boolean verifyPki, @Nullable final TrustStoreLoader trustStoreLoader)
public static CompletableFuture<PaymentSession> createFromBitcoinUri(final BitcoinURI uri, final boolean verifyPki, @Nullable final TrustStoreLoader trustStoreLoader)
throws PaymentProtocolException {
String url = uri.getPaymentRequestUrl();
if (url == null)
throw new PaymentProtocolException.InvalidPaymentRequestURL("No payment request URL (r= parameter) in BitcoinURI " + uri);
try {
return ListenableCompletableFuture.of(fetchPaymentRequest(new URI(url), verifyPki, trustStoreLoader));
return fetchPaymentRequest(new URI(url), verifyPki, trustStoreLoader);
} catch (URISyntaxException e) {
throw new PaymentProtocolException.InvalidPaymentRequestURL(e);
}
@ -144,7 +143,7 @@ public class PaymentSession {
* be used to verify the signature provided by the payment request. An exception is thrown by the future if the
* signature cannot be verified.
*/
public static ListenableCompletableFuture<PaymentSession> createFromUrl(final String url) throws PaymentProtocolException {
public static CompletableFuture<PaymentSession> createFromUrl(final String url) throws PaymentProtocolException {
return createFromUrl(url, true, null);
}
@ -155,7 +154,7 @@ public class PaymentSession {
* be used to verify the signature provided by the payment request. An exception is thrown by the future if the
* signature cannot be verified.
*/
public static ListenableCompletableFuture<PaymentSession> createFromUrl(final String url, final boolean verifyPki)
public static CompletableFuture<PaymentSession> createFromUrl(final String url, final boolean verifyPki)
throws PaymentProtocolException {
return createFromUrl(url, verifyPki, null);
}
@ -168,12 +167,12 @@ public class PaymentSession {
* signature cannot be verified.
* If trustStoreLoader is null, the system default trust store is used.
*/
public static ListenableCompletableFuture<PaymentSession> createFromUrl(final String url, final boolean verifyPki, @Nullable final TrustStoreLoader trustStoreLoader)
public static CompletableFuture<PaymentSession> createFromUrl(final String url, final boolean verifyPki, @Nullable final TrustStoreLoader trustStoreLoader)
throws PaymentProtocolException {
if (url == null)
throw new PaymentProtocolException.InvalidPaymentRequestURL("null paymentRequestUrl");
try {
return ListenableCompletableFuture.of(fetchPaymentRequest(new URI(url), verifyPki, trustStoreLoader));
return fetchPaymentRequest(new URI(url), verifyPki, trustStoreLoader);
} catch(URISyntaxException e) {
throw new PaymentProtocolException.InvalidPaymentRequestURL(e);
}
@ -322,24 +321,24 @@ public class PaymentSession {
* @param memo is a message to include in the payment message sent to the merchant.
* @return a future for the PaymentACK
*/
public ListenableCompletableFuture<PaymentProtocol.Ack> sendPayment(List<Transaction> txns, @Nullable Address refundAddr, @Nullable String memo) {
public CompletableFuture<PaymentProtocol.Ack> sendPayment(List<Transaction> txns, @Nullable Address refundAddr, @Nullable String memo) {
Protos.Payment payment = null;
try {
payment = getPayment(txns, refundAddr, memo);
} catch (IOException e) {
return ListenableCompletableFuture.failedFuture(e);
return FutureUtils.failedFuture(e);
}
if (payment == null)
return ListenableCompletableFuture.failedFuture(new PaymentProtocolException.InvalidPaymentRequestURL("Missing Payment URL"));
return FutureUtils.failedFuture(new PaymentProtocolException.InvalidPaymentRequestURL("Missing Payment URL"));
if (isExpired())
return ListenableCompletableFuture.failedFuture(new PaymentProtocolException.Expired("PaymentRequest is expired"));
return FutureUtils.failedFuture(new PaymentProtocolException.Expired("PaymentRequest is expired"));
URL url;
try {
url = new URL(paymentDetails.getPaymentUrl());
} catch (MalformedURLException e) {
return ListenableCompletableFuture.failedFuture(new PaymentProtocolException.InvalidPaymentURL(e));
return FutureUtils.failedFuture(new PaymentProtocolException.InvalidPaymentURL(e));
}
return ListenableCompletableFuture.of(sendPayment(url, payment));
return sendPayment(url, payment);
}
/**

View File

@ -1,90 +0,0 @@
/*
* Copyright by the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.utils;
import org.bitcoinj.base.internal.FutureUtils;
import java.util.concurrent.CompletableFuture;
/**
* A {@link CompletableFuture} that is also a {@link com.google.common.util.concurrent.ListenableFuture} for migration
* from <b>Guava</b> {@code ListenableFuture} to {@link CompletableFuture}. This allows clients of <b>bitcoinj</b> to change the type
* of variables receiving {@code Future}s from bitcoinj methods. You must switch from Guava's
* {@link com.google.common.util.concurrent.ListenableFuture} (and related types) to Java 8's {@link CompletableFuture}.
* Release 0.18 of bitcoinj will <b>remove</b> this class,
* and the type of returned futures from bitcoinj, will be changed to {@link CompletableFuture}.
* <p>
* <b>WARNING: This class should be considered Deprecated for Removal, as it will be removed in Release 0.18.</b> See above for details.
*/
public class ListenableCompletableFuture<V> extends CompletableFuture<V> implements ListenableCompletionStage<V> {
/**
* Returns a new {@link CompletableFuture} that is already completed with
* the given value.
*
* @param value the value
* @param <T> the type of the value
* @return the completed CompletableFuture
* @deprecated Use {@link CompletableFuture#completedFuture(Object)}
*/
@Deprecated
public static <T> ListenableCompletableFuture<T> completedFuture(T value) {
ListenableCompletableFuture<T> future = new ListenableCompletableFuture<>();
future.complete(value);
return future;
}
/**
* Returns a new {@link ListenableCompletableFuture} that is already completed exceptionally
* with the given throwable.
*
* @param throwable the exceptions
* @param <T> the type of the expected value
* @return the completed CompletableFuture
* @deprecated Use {@code new CompletableFuture() + CompletableFuture.completeExceptionally()} or if JDK 9+ use {@code CompletableFuture.failedFuture()}
*/
@Deprecated
public static <T> ListenableCompletableFuture<T> failedFuture(Throwable throwable) {
return ListenableCompletableFuture.of(FutureUtils.failedFuture(throwable));
}
/**
* Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed
* in future is already a {@code ListenableCompletableFuture} no conversion is performed.
* @param future A CompletableFuture that may need to be converted
* @param <T> the type of the futures return value
* @return A ListenableCompletableFuture
* @deprecated Don't convert to {@link ListenableCompletableFuture}, use {@link CompletableFuture} directly.
*/
@Deprecated
public static <T> ListenableCompletableFuture<T> of(CompletableFuture<T> future) {
ListenableCompletableFuture<T> listenable;
if (future instanceof ListenableCompletableFuture) {
listenable = (ListenableCompletableFuture<T>) future;
} else {
listenable = new ListenableCompletableFuture<>();
future.whenComplete((value, ex) -> {
// We can't test for a not-null T `value`, because of the CompletableFuture<Void> case,
// so we test for a null Throwable `ex` instead.
if (ex == null) {
listenable.complete(value);
} else {
listenable.completeExceptionally(ex);
}
});
}
return listenable;
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright by the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.utils;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
/**
* A {@link CompletionStage} with a {@link ListenableFuture}-compatible interface to smooth migration
* from Guava {@code ListenableFuture} to {@link java.util.concurrent.CompletableFuture}/{@code CompletionStage}.
* <p>
* <b>WARNING: This interface should be considered Deprecated for Removal, It will be removed in Release 0.18</b>. See {@link ListenableCompletableFuture} for details.
*/
public interface ListenableCompletionStage<V> extends CompletionStage<V>, ListenableFuture<V> {
/**
* @deprecated Use {@link java.util.concurrent.CompletableFuture} and {@link java.util.concurrent.CompletableFuture#thenRunAsync(Runnable, Executor)}
*/
@Override
@Deprecated
default void addListener(Runnable listener, Executor executor) {
this.thenRunAsync(listener, executor);
}
}

View File

@ -85,7 +85,6 @@ import org.bitcoinj.signers.MissingSigResolutionSigner;
import org.bitcoinj.signers.TransactionSigner;
import org.bitcoinj.utils.BaseTaggableObject;
import org.bitcoinj.base.internal.FutureUtils;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.protobuf.wallet.Protos;
@ -3850,7 +3849,7 @@ public class Wallet extends BaseTaggableObject
* you can use {@link Threading#waitForUserCode()} to block until the future had a
* chance to be updated.</p>
*/
public ListenableCompletableFuture<Coin> getBalanceFuture(final Coin value, final BalanceType type) {
public CompletableFuture<Coin> getBalanceFuture(final Coin value, final BalanceType type) {
lock.lock();
try {
final CompletableFuture<Coin> future = new CompletableFuture<>();
@ -3864,7 +3863,7 @@ public class Wallet extends BaseTaggableObject
// avoid giving the user back futures that require the user code thread to be free.
balanceFutureRequests.add(new BalanceFutureRequest(future, value, type));
}
return ListenableCompletableFuture.of(future);
return future;
} finally {
lock.unlock();
}
@ -3980,7 +3979,7 @@ public class Wallet extends BaseTaggableObject
* @deprecated Use {@link #awaitRelayed()}
*/
@Deprecated
public final ListenableCompletableFuture<Transaction> broadcastComplete;
public final CompletableFuture<Transaction> broadcastComplete;
/**
* The broadcast object returned by the linked TransactionBroadcaster
* @deprecated Use {@link #getBroadcast()}
@ -3999,7 +3998,7 @@ public class Wallet extends BaseTaggableObject
public SendResult(TransactionBroadcast broadcast) {
this.tx = broadcast.transaction();
this.broadcast = broadcast;
this.broadcastComplete = ListenableCompletableFuture.of(broadcast.awaitRelayed().thenApply(TransactionBroadcast::transaction));
this.broadcastComplete = broadcast.awaitRelayed().thenApply(TransactionBroadcast::transaction);
}
public Transaction transaction() {
@ -5502,7 +5501,7 @@ public class Wallet extends BaseTaggableObject
* @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty.
* @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password.
*/
public ListenableCompletableFuture<List<Transaction>> doMaintenance(@Nullable AesKey aesKey, boolean signAndSend)
public CompletableFuture<List<Transaction>> doMaintenance(@Nullable AesKey aesKey, boolean signAndSend)
throws DeterministicUpgradeRequiresPassword {
return doMaintenance(KeyChainGroupStructure.BIP32, aesKey, signAndSend);
}
@ -5522,7 +5521,7 @@ public class Wallet extends BaseTaggableObject
* @return A list of transactions that the wallet just made/will make for internal maintenance. Might be empty.
* @throws org.bitcoinj.wallet.DeterministicUpgradeRequiresPassword if key rotation requires the users password.
*/
public ListenableCompletableFuture<List<Transaction>> doMaintenance(KeyChainGroupStructure structure,
public CompletableFuture<List<Transaction>> doMaintenance(KeyChainGroupStructure structure,
@Nullable AesKey aesKey, boolean signAndSend) throws DeterministicUpgradeRequiresPassword {
List<Transaction> txns;
lock.lock();
@ -5530,7 +5529,7 @@ public class Wallet extends BaseTaggableObject
try {
txns = maybeRotateKeys(structure, aesKey, signAndSend);
if (!signAndSend)
return ListenableCompletableFuture.completedFuture(txns);
return CompletableFuture.completedFuture(txns);
} finally {
keyChainGroupLock.unlock();
lock.unlock();
@ -5555,7 +5554,7 @@ public class Wallet extends BaseTaggableObject
log.error("Failed to broadcast rekey tx", e);
}
}
return ListenableCompletableFuture.of(FutureUtils.allAsList(futures));
return FutureUtils.allAsList(futures);
}
// Checks to see if any coins are controlled by rotating keys and if so, spends them.

View File

@ -34,7 +34,6 @@ import org.bitcoinj.core.TransactionOutput;
import org.bitcoinj.crypto.TrustStoreLoader;
import org.bitcoinj.params.MainNetParams;
import org.bitcoinj.params.TestNet3Params;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -212,10 +211,10 @@ public class PaymentSessionTest {
}
@Override
protected ListenableCompletableFuture<PaymentProtocol.Ack> sendPayment(final URL url, final Protos.Payment payment) {
protected CompletableFuture<PaymentProtocol.Ack> sendPayment(final URL url, final Protos.Payment payment) {
paymentLog.add(new PaymentLogItem(url, payment));
// Return a completed future that has a `null` value. This will satisfy the current tests.
return ListenableCompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null);
}
public static class PaymentLogItem {