MessageWriteTarget: Return a future from writeBytes() that completes when all bytes have been written to the OS network buffer.

This commit is contained in:
Andreas Schildbach 2020-02-28 12:40:50 +01:00
parent 0a085fde6d
commit afb376fc07
5 changed files with 40 additions and 18 deletions

View file

@ -16,6 +16,8 @@
package org.bitcoinj.core;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.bitcoinj.net.AbstractTimeoutHandler;
import org.bitcoinj.net.MessageWriteTarget;
import org.bitcoinj.net.NioClient;
@ -76,7 +78,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
* the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer.
* TODO: Maybe use something other than the unchecked NotYetConnectedException here
*/
public void sendMessage(Message message) throws NotYetConnectedException {
public ListenableFuture sendMessage(Message message) throws NotYetConnectedException {
lock.lock();
try {
if (writeTarget == null)
@ -88,9 +90,10 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
serializer.serialize(message, out);
writeTarget.writeBytes(out.toByteArray());
return writeTarget.writeBytes(out.toByteArray());
} catch (IOException e) {
exceptionCaught(e);
return Futures.immediateFailedFuture(e);
}
}

View file

@ -143,11 +143,12 @@ public class BlockingClient implements MessageWriteTarget {
}
@Override
public synchronized void writeBytes(byte[] message) throws IOException {
public synchronized ListenableFuture writeBytes(byte[] message) throws IOException {
try {
OutputStream stream = socket.getOutputStream();
stream.write(message);
stream.flush();
return Futures.immediateFuture(null);
} catch (IOException e) {
log.error("Error writing message to connection, closing connection", e);
closeConnection();

View file

@ -16,9 +16,11 @@
package org.bitcoinj.net;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.core.Message;
import org.bitcoinj.utils.Threading;
import com.google.common.base.Throwables;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@ -61,7 +63,17 @@ class ConnectionHandler implements MessageWriteTarget {
@GuardedBy("lock") private boolean closeCalled = false;
@GuardedBy("lock") private long bytesToWriteRemaining = 0;
@GuardedBy("lock") private final LinkedList<ByteBuffer> bytesToWrite = new LinkedList<>();
@GuardedBy("lock") private final LinkedList<BytesAndFuture> bytesToWrite = new LinkedList<>();
private static class BytesAndFuture {
public final ByteBuffer bytes;
public final SettableFuture future;
public BytesAndFuture(ByteBuffer bytes, SettableFuture future) {
this.bytes = bytes;
this.future = future;
}
}
private Set<ConnectionHandler> connectedHandlers;
@ -113,13 +125,14 @@ class ConnectionHandler implements MessageWriteTarget {
lock.lock();
try {
// Iterate through the outbound ByteBuff queue, pushing as much as possible into the OS' network buffer.
Iterator<ByteBuffer> bytesIterator = bytesToWrite.iterator();
while (bytesIterator.hasNext()) {
ByteBuffer buff = bytesIterator.next();
bytesToWriteRemaining -= channel.write(buff);
if (!buff.hasRemaining())
bytesIterator.remove();
else {
Iterator<BytesAndFuture> iterator = bytesToWrite.iterator();
while (iterator.hasNext()) {
BytesAndFuture bytesAndFuture = iterator.next();
bytesToWriteRemaining -= channel.write(bytesAndFuture.bytes);
if (!bytesAndFuture.bytes.hasRemaining()) {
iterator.remove();
bytesAndFuture.future.set(null);
} else {
setWriteOps();
break;
}
@ -134,7 +147,7 @@ class ConnectionHandler implements MessageWriteTarget {
}
@Override
public void writeBytes(byte[] message) throws IOException {
public ListenableFuture writeBytes(byte[] message) throws IOException {
boolean andUnlock = true;
lock.lock();
try {
@ -147,9 +160,11 @@ class ConnectionHandler implements MessageWriteTarget {
throw new IOException("Outbound buffer overflowed");
// Just dump the message onto the write buffer and call tryWriteBytes
// TODO: Kill the needless message duplication when the write completes right away
bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length)));
final SettableFuture<Object> future = SettableFuture.create();
bytesToWrite.offer(new BytesAndFuture(ByteBuffer.wrap(Arrays.copyOf(message, message.length)), future));
bytesToWriteRemaining += message.length;
setWriteOps();
return future;
} catch (IOException e) {
lock.unlock();
andUnlock = false;

View file

@ -16,6 +16,8 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
/**
@ -23,9 +25,10 @@ import java.io.IOException;
*/
public interface MessageWriteTarget {
/**
* Writes the given bytes to the remote server.
* Writes the given bytes to the remote server. The returned future will complete when all bytes
* have been written to the OS network buffer.
*/
void writeBytes(byte[] message) throws IOException;
ListenableFuture writeBytes(byte[] message) throws IOException;
/**
* Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()}
* event on the network-handling thread where all callbacks occur.

View file

@ -120,7 +120,7 @@ public class NioClient implements MessageWriteTarget {
}
@Override
public synchronized void writeBytes(byte[] message) throws IOException {
handler.writeTarget.writeBytes(message);
public synchronized ListenableFuture writeBytes(byte[] message) throws IOException {
return handler.writeTarget.writeBytes(message);
}
}