From afb376fc0750c2d8e3b46b8ab2b54c266dc6f44e Mon Sep 17 00:00:00 2001 From: Andreas Schildbach Date: Fri, 28 Feb 2020 12:40:50 +0100 Subject: [PATCH] MessageWriteTarget: Return a future from writeBytes() that completes when all bytes have been written to the OS network buffer. --- .../org/bitcoinj/core/PeerSocketHandler.java | 7 +++- .../java/org/bitcoinj/net/BlockingClient.java | 3 +- .../org/bitcoinj/net/ConnectionHandler.java | 37 +++++++++++++------ .../org/bitcoinj/net/MessageWriteTarget.java | 7 +++- .../main/java/org/bitcoinj/net/NioClient.java | 4 +- 5 files changed, 40 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java index 94206d426..45da9420a 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java +++ b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java @@ -16,6 +16,8 @@ package org.bitcoinj.core; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.bitcoinj.net.AbstractTimeoutHandler; import org.bitcoinj.net.MessageWriteTarget; import org.bitcoinj.net.NioClient; @@ -76,7 +78,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement * the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer. * TODO: Maybe use something other than the unchecked NotYetConnectedException here */ - public void sendMessage(Message message) throws NotYetConnectedException { + public ListenableFuture sendMessage(Message message) throws NotYetConnectedException { lock.lock(); try { if (writeTarget == null) @@ -88,9 +90,10 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement ByteArrayOutputStream out = new ByteArrayOutputStream(); try { serializer.serialize(message, out); - writeTarget.writeBytes(out.toByteArray()); + return writeTarget.writeBytes(out.toByteArray()); } catch (IOException e) { exceptionCaught(e); + return Futures.immediateFailedFuture(e); } } diff --git a/core/src/main/java/org/bitcoinj/net/BlockingClient.java b/core/src/main/java/org/bitcoinj/net/BlockingClient.java index b4cfbcec2..a6b2f4d5e 100644 --- a/core/src/main/java/org/bitcoinj/net/BlockingClient.java +++ b/core/src/main/java/org/bitcoinj/net/BlockingClient.java @@ -143,11 +143,12 @@ public class BlockingClient implements MessageWriteTarget { } @Override - public synchronized void writeBytes(byte[] message) throws IOException { + public synchronized ListenableFuture writeBytes(byte[] message) throws IOException { try { OutputStream stream = socket.getOutputStream(); stream.write(message); stream.flush(); + return Futures.immediateFuture(null); } catch (IOException e) { log.error("Error writing message to connection, closing connection", e); closeConnection(); diff --git a/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java b/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java index 704ce5483..0f494e7fe 100644 --- a/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java +++ b/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java @@ -16,9 +16,11 @@ package org.bitcoinj.net; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.bitcoinj.core.Message; import org.bitcoinj.utils.Threading; -import com.google.common.base.Throwables; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -61,7 +63,17 @@ class ConnectionHandler implements MessageWriteTarget { @GuardedBy("lock") private boolean closeCalled = false; @GuardedBy("lock") private long bytesToWriteRemaining = 0; - @GuardedBy("lock") private final LinkedList bytesToWrite = new LinkedList<>(); + @GuardedBy("lock") private final LinkedList bytesToWrite = new LinkedList<>(); + + private static class BytesAndFuture { + public final ByteBuffer bytes; + public final SettableFuture future; + + public BytesAndFuture(ByteBuffer bytes, SettableFuture future) { + this.bytes = bytes; + this.future = future; + } + } private Set connectedHandlers; @@ -113,13 +125,14 @@ class ConnectionHandler implements MessageWriteTarget { lock.lock(); try { // Iterate through the outbound ByteBuff queue, pushing as much as possible into the OS' network buffer. - Iterator bytesIterator = bytesToWrite.iterator(); - while (bytesIterator.hasNext()) { - ByteBuffer buff = bytesIterator.next(); - bytesToWriteRemaining -= channel.write(buff); - if (!buff.hasRemaining()) - bytesIterator.remove(); - else { + Iterator iterator = bytesToWrite.iterator(); + while (iterator.hasNext()) { + BytesAndFuture bytesAndFuture = iterator.next(); + bytesToWriteRemaining -= channel.write(bytesAndFuture.bytes); + if (!bytesAndFuture.bytes.hasRemaining()) { + iterator.remove(); + bytesAndFuture.future.set(null); + } else { setWriteOps(); break; } @@ -134,7 +147,7 @@ class ConnectionHandler implements MessageWriteTarget { } @Override - public void writeBytes(byte[] message) throws IOException { + public ListenableFuture writeBytes(byte[] message) throws IOException { boolean andUnlock = true; lock.lock(); try { @@ -147,9 +160,11 @@ class ConnectionHandler implements MessageWriteTarget { throw new IOException("Outbound buffer overflowed"); // Just dump the message onto the write buffer and call tryWriteBytes // TODO: Kill the needless message duplication when the write completes right away - bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length))); + final SettableFuture future = SettableFuture.create(); + bytesToWrite.offer(new BytesAndFuture(ByteBuffer.wrap(Arrays.copyOf(message, message.length)), future)); bytesToWriteRemaining += message.length; setWriteOps(); + return future; } catch (IOException e) { lock.unlock(); andUnlock = false; diff --git a/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java b/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java index 7679b7304..a16a5c407 100644 --- a/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java +++ b/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java @@ -16,6 +16,8 @@ package org.bitcoinj.net; +import com.google.common.util.concurrent.ListenableFuture; + import java.io.IOException; /** @@ -23,9 +25,10 @@ import java.io.IOException; */ public interface MessageWriteTarget { /** - * Writes the given bytes to the remote server. + * Writes the given bytes to the remote server. The returned future will complete when all bytes + * have been written to the OS network buffer. */ - void writeBytes(byte[] message) throws IOException; + ListenableFuture writeBytes(byte[] message) throws IOException; /** * Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()} * event on the network-handling thread where all callbacks occur. diff --git a/core/src/main/java/org/bitcoinj/net/NioClient.java b/core/src/main/java/org/bitcoinj/net/NioClient.java index 1ab218e10..6c4698854 100644 --- a/core/src/main/java/org/bitcoinj/net/NioClient.java +++ b/core/src/main/java/org/bitcoinj/net/NioClient.java @@ -120,7 +120,7 @@ public class NioClient implements MessageWriteTarget { } @Override - public synchronized void writeBytes(byte[] message) throws IOException { - handler.writeTarget.writeBytes(message); + public synchronized ListenableFuture writeBytes(byte[] message) throws IOException { + return handler.writeTarget.writeBytes(message); } }