bitcoinj/net: migrate from ListenableFuture to CompletableFuture

This commit is contained in:
Sean Gilligan 2022-03-20 14:09:11 -07:00 committed by Andreas Schildbach
parent d212eb7185
commit 2d60caeadf
11 changed files with 57 additions and 57 deletions

View file

@ -20,9 +20,7 @@ package org.bitcoinj.core;
import com.google.common.annotations.*; import com.google.common.annotations.*;
import com.google.common.base.*; import com.google.common.base.*;
import com.google.common.collect.*; import com.google.common.collect.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -1381,7 +1379,7 @@ public class PeerGroup implements TransactionBroadcaster {
try { try {
log.info("Attempting connection to {} ({} connected, {} pending, {} max)", address, log.info("Attempting connection to {} ({} connected, {} pending, {} max)", address,
peers.size(), pendingPeers.size(), maxConnections); peers.size(), pendingPeers.size(), maxConnections);
ListenableFuture<SocketAddress> future = channels.openConnection(address.toSocketAddress(), peer); CompletableFuture<SocketAddress> future = channels.openConnection(address.toSocketAddress(), peer);
if (future.isDone()) if (future.isDone())
Uninterruptibles.getUninterruptibly(future); Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException e) { } catch (ExecutionException e) {

View file

@ -16,13 +16,12 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.bitcoinj.net.AbstractTimeoutHandler; import org.bitcoinj.net.AbstractTimeoutHandler;
import org.bitcoinj.net.MessageWriteTarget; import org.bitcoinj.net.MessageWriteTarget;
import org.bitcoinj.net.NioClient; import org.bitcoinj.net.NioClient;
import org.bitcoinj.net.NioClientManager; import org.bitcoinj.net.NioClientManager;
import org.bitcoinj.net.StreamConnection; import org.bitcoinj.net.StreamConnection;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.Threading;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -79,7 +78,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
* the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer. * the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer.
* TODO: Maybe use something other than the unchecked NotYetConnectedException here * TODO: Maybe use something other than the unchecked NotYetConnectedException here
*/ */
public ListenableFuture sendMessage(Message message) throws NotYetConnectedException { public ListenableCompletableFuture<Void> sendMessage(Message message) throws NotYetConnectedException {
lock.lock(); lock.lock();
try { try {
if (writeTarget == null) if (writeTarget == null)
@ -94,7 +93,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
return writeTarget.writeBytes(out.toByteArray()); return writeTarget.writeBytes(out.toByteArray());
} catch (IOException e) { } catch (IOException e) {
exceptionCaught(e); exceptionCaught(e);
return Futures.immediateFailedFuture(e); return ListenableCompletableFuture.failedFuture(e);
} }
} }

View file

@ -18,7 +18,6 @@ package org.bitcoinj.core;
import com.google.common.annotations.*; import com.google.common.annotations.*;
import com.google.common.base.*; import com.google.common.base.*;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import org.bitcoinj.utils.*; import org.bitcoinj.utils.*;
import org.bitcoinj.wallet.Wallet; import org.bitcoinj.wallet.Wallet;
@ -160,11 +159,11 @@ public class TransactionBroadcast {
log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers)); log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers));
for (final Peer peer : peers) { for (final Peer peer : peers) {
try { try {
ListenableFuture future = peer.sendMessage(tx); CompletableFuture<Void> future = peer.sendMessage(tx);
if (dropPeersAfterBroadcast) { if (dropPeersAfterBroadcast) {
// We drop the peer shortly after the transaction has been sent, because this peer will not // We drop the peer shortly after the transaction has been sent, because this peer will not
// send us back useful broadcast confirmations. // send us back useful broadcast confirmations.
future.addListener(() -> { future.thenRunAsync(() -> {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
peer.close(); peer.close();
}, Threading.THREAD_POOL); }, Threading.THREAD_POOL);

View file

@ -16,10 +16,8 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.core.*; import org.bitcoinj.core.*;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.slf4j.*; import org.slf4j.*;
import javax.annotation.*; import javax.annotation.*;
@ -28,6 +26,7 @@ import java.io.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.*; import static com.google.common.base.Preconditions.*;
@ -46,7 +45,7 @@ public class BlockingClient implements MessageWriteTarget {
private Socket socket; private Socket socket;
private volatile boolean vCloseRequested = false; private volatile boolean vCloseRequested = false;
private SettableFuture<SocketAddress> connectFuture; private CompletableFuture<SocketAddress> connectFuture;
/** /**
* <p>Creates a new client to the given server address using the given {@link StreamConnection} to decode the data. * <p>Creates a new client to the given server address using the given {@link StreamConnection} to decode the data.
@ -63,7 +62,7 @@ public class BlockingClient implements MessageWriteTarget {
public BlockingClient(final SocketAddress serverAddress, final StreamConnection connection, public BlockingClient(final SocketAddress serverAddress, final StreamConnection connection,
final int connectTimeoutMillis, final SocketFactory socketFactory, final int connectTimeoutMillis, final SocketFactory socketFactory,
@Nullable final Set<BlockingClient> clientSet) throws IOException { @Nullable final Set<BlockingClient> clientSet) throws IOException {
connectFuture = SettableFuture.create(); connectFuture = new CompletableFuture<>();
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesn't get too large or have to call read too often. // sure it doesn't get too large or have to call read too often.
connection.setWriteTarget(this); connection.setWriteTarget(this);
@ -76,13 +75,13 @@ public class BlockingClient implements MessageWriteTarget {
try { try {
socket.connect(serverAddress, connectTimeoutMillis); socket.connect(serverAddress, connectTimeoutMillis);
connection.connectionOpened(); connection.connectionOpened();
connectFuture.set(serverAddress); connectFuture.complete(serverAddress);
InputStream stream = socket.getInputStream(); InputStream stream = socket.getInputStream();
runReadLoop(stream, connection); runReadLoop(stream, connection);
} catch (Exception e) { } catch (Exception e) {
if (!vCloseRequested) { if (!vCloseRequested) {
log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage()); log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage());
connectFuture.setException(e); connectFuture.completeExceptionally(e);
} }
} finally { } finally {
try { try {
@ -142,12 +141,12 @@ public class BlockingClient implements MessageWriteTarget {
} }
@Override @Override
public synchronized ListenableFuture writeBytes(byte[] message) throws IOException { public synchronized ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
try { try {
OutputStream stream = socket.getOutputStream(); OutputStream stream = socket.getOutputStream();
stream.write(message); stream.write(message);
stream.flush(); stream.flush();
return Futures.immediateFuture(null); return ListenableCompletableFuture.completedFuture(null);
} catch (IOException e) { } catch (IOException e) {
log.error("Error writing message to connection, closing connection", e); log.error("Error writing message to connection, closing connection", e);
closeConnection(); closeConnection();
@ -156,7 +155,7 @@ public class BlockingClient implements MessageWriteTarget {
} }
/** Returns a future that completes once connection has occurred at the socket level or with an exception if failed to connect. */ /** Returns a future that completes once connection has occurred at the socket level or with an exception if failed to connect. */
public ListenableFuture<SocketAddress> getConnectFuture() { public ListenableCompletableFuture<SocketAddress> getConnectFuture() {
return connectFuture; return ListenableCompletableFuture.of(connectFuture);
} }
} }

View file

@ -17,7 +17,7 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture; import org.bitcoinj.utils.ListenableCompletableFuture;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
@ -55,7 +55,7 @@ public class BlockingClientManager extends AbstractIdleService implements Client
} }
@Override @Override
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) { public ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
try { try {
if (!isRunning()) if (!isRunning())
throw new IllegalStateException(); throw new IllegalStateException();

View file

@ -16,8 +16,8 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Service;
import org.bitcoinj.utils.ListenableCompletableFuture;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -33,7 +33,7 @@ public interface ClientConnectionManager extends Service {
* Creates a new connection to the given address, with the given connection used to handle incoming data. Any errors * Creates a new connection to the given address, with the given connection used to handle incoming data. Any errors
* that occur during connection will be returned in the given future, including errors that can occur immediately. * that occur during connection will be returned in the given future, including errors that can occur immediately.
*/ */
ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection); ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection);
/** Gets the number of connected peers */ /** Gets the number of connected peers */
int getConnectedClientCount(); int getConnectedClientCount();

View file

@ -17,9 +17,8 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.core.Message; import org.bitcoinj.core.Message;
import org.bitcoinj.utils.ListenableCompletableFuture;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.Threading;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,9 +67,9 @@ class ConnectionHandler implements MessageWriteTarget {
private static class BytesAndFuture { private static class BytesAndFuture {
public final ByteBuffer bytes; public final ByteBuffer bytes;
public final SettableFuture future; public final ListenableCompletableFuture<Void> future;
public BytesAndFuture(ByteBuffer bytes, SettableFuture future) { public BytesAndFuture(ByteBuffer bytes, ListenableCompletableFuture<Void> future) {
this.bytes = bytes; this.bytes = bytes;
this.future = future; this.future = future;
} }
@ -132,7 +131,7 @@ class ConnectionHandler implements MessageWriteTarget {
bytesToWriteRemaining -= channel.write(bytesAndFuture.bytes); bytesToWriteRemaining -= channel.write(bytesAndFuture.bytes);
if (!bytesAndFuture.bytes.hasRemaining()) { if (!bytesAndFuture.bytes.hasRemaining()) {
iterator.remove(); iterator.remove();
bytesAndFuture.future.set(null); bytesAndFuture.future.complete(null);
} else { } else {
setWriteOps(); setWriteOps();
break; break;
@ -148,7 +147,7 @@ class ConnectionHandler implements MessageWriteTarget {
} }
@Override @Override
public ListenableFuture writeBytes(byte[] message) throws IOException { public ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
boolean andUnlock = true; boolean andUnlock = true;
lock.lock(); lock.lock();
try { try {
@ -161,7 +160,7 @@ class ConnectionHandler implements MessageWriteTarget {
throw new IOException("Outbound buffer overflowed"); throw new IOException("Outbound buffer overflowed");
// Just dump the message onto the write buffer and call tryWriteBytes // Just dump the message onto the write buffer and call tryWriteBytes
// TODO: Kill the needless message duplication when the write completes right away // TODO: Kill the needless message duplication when the write completes right away
final SettableFuture<Object> future = SettableFuture.create(); final ListenableCompletableFuture<Void> future = new ListenableCompletableFuture<>();
bytesToWrite.offer(new BytesAndFuture(ByteBuffer.wrap(Arrays.copyOf(message, message.length)), future)); bytesToWrite.offer(new BytesAndFuture(ByteBuffer.wrap(Arrays.copyOf(message, message.length)), future));
bytesToWriteRemaining += message.length; bytesToWriteRemaining += message.length;
setWriteOps(); setWriteOps();

View file

@ -16,7 +16,7 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.util.concurrent.ListenableFuture; import org.bitcoinj.utils.ListenableCompletableFuture;
import java.io.IOException; import java.io.IOException;
@ -28,7 +28,7 @@ public interface MessageWriteTarget {
* Writes the given bytes to the remote server. The returned future will complete when all bytes * Writes the given bytes to the remote server. The returned future will complete when all bytes
* have been written to the OS network buffer. * have been written to the OS network buffer.
*/ */
ListenableFuture writeBytes(byte[] message) throws IOException; ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException;
/** /**
* Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()} * Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()}
* event on the network-handling thread where all callbacks occur. * event on the network-handling thread where all callbacks occur.

View file

@ -18,10 +18,7 @@
package org.bitcoinj.net; package org.bitcoinj.net;
import com.google.common.base.*; import com.google.common.base.*;
import com.google.common.util.concurrent.FutureCallback; import org.bitcoinj.utils.ListenableCompletableFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.*; import org.slf4j.*;
import java.io.*; import java.io.*;
@ -105,16 +102,11 @@ public class NioClient implements MessageWriteTarget {
manager.startAsync(); manager.startAsync();
manager.awaitRunning(); manager.awaitRunning();
handler = new Handler(parser, connectTimeoutMillis); handler = new Handler(parser, connectTimeoutMillis);
Futures.addCallback(manager.openConnection(serverAddress, handler), new FutureCallback<SocketAddress>() { manager.openConnection(serverAddress, handler).whenComplete((result, t) -> {
@Override if (t != null) {
public void onSuccess(SocketAddress result) {
}
@Override
public void onFailure(Throwable t) {
log.error("Connect to {} failed: {}", serverAddress, Throwables.getRootCause(t)); log.error("Connect to {} failed: {}", serverAddress, Throwables.getRootCause(t));
} }
}, MoreExecutors.directExecutor()); });
} }
@Override @Override
@ -123,7 +115,7 @@ public class NioClient implements MessageWriteTarget {
} }
@Override @Override
public synchronized ListenableFuture writeBytes(byte[] message) throws IOException { public synchronized ListenableCompletableFuture<Void> writeBytes(byte[] message) throws IOException {
return handler.writeTarget.writeBytes(message); return handler.writeTarget.writeBytes(message);
} }
} }

View file

@ -18,9 +18,6 @@ package org.bitcoinj.net;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.utils.*; import org.bitcoinj.utils.*;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,7 +42,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements
SocketChannel sc; SocketChannel sc;
StreamConnection connection; StreamConnection connection;
SocketAddress address; SocketAddress address;
SettableFuture<SocketAddress> future = SettableFuture.create(); CompletableFuture<SocketAddress> future = new CompletableFuture<>();
PendingConnect(SocketChannel sc, StreamConnection connection, SocketAddress address) { this.sc = sc; this.connection = connection; this.address = address; } PendingConnect(SocketChannel sc, StreamConnection connection, SocketAddress address) { this.sc = sc; this.connection = connection; this.address = address; }
} }
@ -68,11 +65,11 @@ public class NioClientManager extends AbstractExecutionThreadService implements
log.info("Connected to {}", sc.socket().getRemoteSocketAddress()); log.info("Connected to {}", sc.socket().getRemoteSocketAddress());
key.interestOps((key.interestOps() | SelectionKey.OP_READ) & ~SelectionKey.OP_CONNECT).attach(handler); key.interestOps((key.interestOps() | SelectionKey.OP_READ) & ~SelectionKey.OP_CONNECT).attach(handler);
connection.connectionOpened(); connection.connectionOpened();
data.future.set(data.address); data.future.complete(data.address);
} else { } else {
log.warn("Failed to connect to {}", sc.socket().getRemoteSocketAddress()); log.warn("Failed to connect to {}", sc.socket().getRemoteSocketAddress());
handler.closeConnection(); // Failed to connect for some reason handler.closeConnection(); // Failed to connect for some reason
data.future.setException(new ConnectException("Unknown reason")); data.future.completeExceptionally(new ConnectException("Unknown reason"));
data.future = null; data.future = null;
} }
} catch (Exception e) { } catch (Exception e) {
@ -82,7 +79,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements
Throwable cause = Throwables.getRootCause(e); Throwable cause = Throwables.getRootCause(e);
log.warn("Failed to connect with exception: {}: {}", cause.getClass().getName(), cause.getMessage(), e); log.warn("Failed to connect with exception: {}: {}", cause.getClass().getName(), cause.getMessage(), e);
handler.closeConnection(); handler.closeConnection();
data.future.setException(cause); data.future.completeExceptionally(cause);
data.future = null; data.future = null;
} }
} else // Process bytes read } else // Process bytes read
@ -148,7 +145,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements
} }
@Override @Override
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) { public ListenableCompletableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
if (!isRunning()) if (!isRunning())
throw new IllegalStateException(); throw new IllegalStateException();
// Create a new connection, give it a connection as an attachment // Create a new connection, give it a connection as an attachment
@ -159,9 +156,9 @@ public class NioClientManager extends AbstractExecutionThreadService implements
PendingConnect data = new PendingConnect(sc, connection, serverAddress); PendingConnect data = new PendingConnect(sc, connection, serverAddress);
newConnectionChannels.offer(data); newConnectionChannels.offer(data);
selector.wakeup(); selector.wakeup();
return data.future; return ListenableCompletableFuture.of(data.future);
} catch (Throwable e) { } catch (Throwable e) {
return Futures.immediateFailedFuture(e); return ListenableCompletableFuture.failedFuture(e);
} }
} }

View file

@ -42,6 +42,23 @@ public class ListenableCompletableFuture<V> extends CompletableFuture<V> impleme
return future; return future;
} }
/**
* Returns a new {@link CompletableFuture} that is already completed exceptionally
* the given throwable.
* <p>
* When the migration to {@link CompletableFuture} is finished we'll probably move this
* method to FutureUtils as the {@code failedFuture()} is not available until Java 9.
*
* @param throwable the exceptions
* @param <T> the type of the expected value
* @return the completed CompletableFuture
*/
public static <T> ListenableCompletableFuture<T> failedFuture(Throwable throwable) {
ListenableCompletableFuture<T> future = new ListenableCompletableFuture<>();
future.completeExceptionally(throwable);
return future;
}
/** /**
* Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed * Converts a generic {@link CompletableFuture} to a {@code ListenableCompletableFuture}. If the passed
* in future is already a {@code ListenableCompletableFuture} no conversion is performed. * in future is already a {@code ListenableCompletableFuture} no conversion is performed.