PeerGroup: migrate vConnectTimeout field to java.time API

This commit is contained in:
Andreas Schildbach 2023-03-10 18:33:24 +01:00
parent 034277fae0
commit 7e70541658
11 changed files with 81 additions and 53 deletions

View file

@ -355,9 +355,9 @@ public class PeerGroup implements TransactionBroadcaster {
private final FilterMerger bloomFilterMerger;
/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
private volatile int vConnectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;
public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5);
private volatile Duration vConnectTimeout = DEFAULT_CONNECT_TIMEOUT;
/** Whether bloom filter support is enabled when using a non FullPrunedBlockchain*/
private volatile boolean vBloomFilteringEnabled = true;
@ -617,7 +617,7 @@ public class PeerGroup implements TransactionBroadcaster {
executor.schedule(this, delay.toMillis(), TimeUnit.MILLISECONDS);
return;
}
connectTo(addrToTry, false, vConnectTimeoutMillis);
connectTo(addrToTry, false, vConnectTimeout);
} finally {
lock.unlock();
}
@ -1128,7 +1128,7 @@ public class PeerGroup implements TransactionBroadcaster {
// Do a fast blocking connect to see if anything is listening.
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), params.getPort()),
vConnectTimeoutMillis);
Math.toIntExact(vConnectTimeout.toMillis()));
localhostCheckState = LocalhostCheckState.FOUND;
return true;
} catch (IOException e) {
@ -1454,7 +1454,7 @@ public class PeerGroup implements TransactionBroadcaster {
try {
PeerAddress peerAddress = new PeerAddress(params, address);
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
return connectTo(peerAddress, true, vConnectTimeoutMillis);
return connectTo(peerAddress, true, vConnectTimeout);
} finally {
lock.unlock();
}
@ -1469,7 +1469,7 @@ public class PeerGroup implements TransactionBroadcaster {
try {
final PeerAddress localhost = PeerAddress.localhost(params);
backoffMap.put(localhost, new ExponentialBackoff(peerBackoffParams));
return connectTo(localhost, true, vConnectTimeoutMillis);
return connectTo(localhost, true, vConnectTimeout);
} finally {
lock.unlock();
}
@ -1481,10 +1481,11 @@ public class PeerGroup implements TransactionBroadcaster {
* @param address Remote network address
* @param incrementMaxConnections Whether to consider this connection an attempt to fill our quota, or something
* explicitly requested.
* @param connectTimeout timeout for establishing the connection to peers
* @return Peer or null.
*/
@Nullable @GuardedBy("lock")
protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, int connectTimeoutMillis) {
protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, Duration connectTimeout) {
checkState(lock.isHeldByCurrentThread());
VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
@ -1510,7 +1511,7 @@ public class PeerGroup implements TransactionBroadcaster {
handlePeerDeath(peer, cause);
return null;
}
peer.setSocketTimeout(connectTimeoutMillis);
peer.setSocketTimeout(connectTimeout);
// When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on
// a worker thread.
if (incrementMaxConnections) {
@ -1530,9 +1531,16 @@ public class PeerGroup implements TransactionBroadcaster {
/**
* Sets the timeout between when a connection attempt to a peer begins and when the version message exchange
* completes. This does not apply to currently pending peers.
* @param connectTimeout timeout for estiablishing the connection to peers
*/
public void setConnectTimeout(Duration connectTimeout) {
this.vConnectTimeout = connectTimeout;
}
/** @deprecated use {@link #setConnectTimeout(Duration)} */
@Deprecated
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.vConnectTimeoutMillis = connectTimeoutMillis;
setConnectTimeout(Duration.ofMillis(connectTimeoutMillis));
}
/**

View file

@ -36,6 +36,7 @@ import java.nio.Buffer;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.time.Duration;
import java.util.concurrent.locks.Lock;
import static com.google.common.base.Preconditions.checkArgument;
@ -82,8 +83,8 @@ public abstract class PeerSocketHandler implements TimeoutHandler, StreamConnect
}
@Override
public void setSocketTimeout(int timeoutMillis) {
timeoutTask.setSocketTimeout(timeoutMillis);
public void setSocketTimeout(Duration timeout) {
timeoutTask.setSocketTimeout(timeout);
}
/**

View file

@ -16,6 +16,8 @@
package org.bitcoinj.net;
import java.time.Duration;
/**
* A base class which provides basic support for socket timeouts. It is used instead of integrating timeouts into the
* NIO select thread both for simplicity and to keep code shared between NIO and blocking sockets as much as possible.
@ -34,7 +36,7 @@ public abstract class AbstractTimeoutHandler implements TimeoutHandler {
* <p>Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish
* to temporarily disable/enable timeouts.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to {@link Duration#ZERO} (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
@ -44,18 +46,18 @@ public abstract class AbstractTimeoutHandler implements TimeoutHandler {
}
/**
* <p>Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no
* <p>Sets the receive timeout, automatically killing the connection if no
* messages are received for this long</p>
*
* <p>A timeout of 0 is interpreted as no timeout.</p>
* <p>A timeout of 0{@link Duration#ZERO} is interpreted as no timeout.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to {@link Duration#ZERO} (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
@Override
public synchronized final void setSocketTimeout(int timeoutMillis) {
timeoutTask.setSocketTimeout(timeoutMillis);
public synchronized final void setSocketTimeout(Duration timeout) {
timeoutTask.setSocketTimeout(timeout);
}
/**

View file

@ -30,6 +30,7 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -58,14 +59,13 @@ public class BlockingClient implements MessageWriteTarget {
* open, but will call either the {@link StreamConnection#connectionOpened()} or
* {@link StreamConnection#connectionClosed()} callback on the created network event processing thread.</p>
*
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
* timeout.
* @param connectTimeout The connect timeout set on the connection. ZERO is interpreted as no timeout.
* @param socketFactory An object that creates {@link Socket} objects on demand, which may be customised to control
* how this client connects to the internet. If not sure, use SocketFactory.getDefault()
* @param clientSet A set which this object will add itself to after initialization, and then remove itself from
*/
public BlockingClient(final SocketAddress serverAddress, final StreamConnection connection,
final int connectTimeoutMillis, final SocketFactory socketFactory,
final Duration connectTimeout, final SocketFactory socketFactory,
@Nullable final Set<BlockingClient> clientSet) throws IOException {
connectFuture = new CompletableFuture<>();
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
@ -78,7 +78,7 @@ public class BlockingClient implements MessageWriteTarget {
if (clientSet != null)
clientSet.add(BlockingClient.this);
try {
socket.connect(serverAddress, connectTimeoutMillis);
socket.connect(serverAddress, Math.toIntExact(connectTimeout.toMillis()));
connection.connectionOpened();
connectFuture.complete(serverAddress);
InputStream stream = socket.getInputStream();

View file

@ -22,6 +22,7 @@ import org.bitcoinj.utils.ListenableCompletableFuture;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -40,7 +41,7 @@ public class BlockingClientManager extends AbstractIdleService implements Client
private final SocketFactory socketFactory;
private final Set<BlockingClient> clients = Collections.synchronizedSet(new HashSet<BlockingClient>());
private int connectTimeoutMillis = 1000;
private Duration connectTimeout = Duration.ofSeconds(1);
public BlockingClientManager() {
socketFactory = SocketFactory.getDefault();
@ -59,15 +60,24 @@ public class BlockingClientManager extends AbstractIdleService implements Client
try {
if (!isRunning())
throw new IllegalStateException();
return new BlockingClient(serverAddress, connection, connectTimeoutMillis, socketFactory, clients).getConnectFuture();
return new BlockingClient(serverAddress, connection, connectTimeout, socketFactory, clients).getConnectFuture();
} catch (IOException e) {
throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
}
}
/** Sets the number of milliseconds to wait before giving up on a connect attempt */
/**
* Sets the number of milliseconds to wait before giving up on a connect attempt
* @param timeout timeout for establishing a connection to the client
*/
public void setConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
}
/** @deprecated use {@link #setConnectTimeout(Duration)} */
@Deprecated
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
setConnectTimeout(Duration.ofMillis(connectTimeoutMillis));
}
@Override

View file

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
/**
* Creates a simple connection to a server using a {@link StreamConnection} to process data.
@ -42,10 +43,10 @@ public class NioClient implements MessageWriteTarget {
private boolean closeOnOpen = false;
private boolean closeCalled = false;
Handler(StreamConnection upstreamConnection, int connectTimeoutMillis) {
Handler(StreamConnection upstreamConnection, Duration connectTimeout) {
this.upstreamConnection = upstreamConnection;
this.timeoutTask = new SocketTimeoutTask(this::timeoutOccurred);
setSocketTimeout(connectTimeoutMillis);
setSocketTimeout(connectTimeout);
setTimeoutEnabled(true);
}
@ -60,8 +61,8 @@ public class NioClient implements MessageWriteTarget {
}
@Override
public void setSocketTimeout(int timeoutMillis) {
timeoutTask.setSocketTimeout(timeoutMillis);
public void setSocketTimeout(Duration timeout) {
timeoutTask.setSocketTimeout(timeout);
}
@Override
@ -111,10 +112,10 @@ public class NioClient implements MessageWriteTarget {
* timeout.
*/
public NioClient(final SocketAddress serverAddress, final StreamConnection parser,
final int connectTimeoutMillis) throws IOException {
final Duration connectTimeout) throws IOException {
manager.startAsync();
manager.awaitRunning();
handler = new Handler(parser, connectTimeoutMillis);
handler = new Handler(parser, connectTimeout);
manager.openConnection(serverAddress, handler).whenComplete((result, t) -> {
if (t != null) {
log.error("Connect to {} failed: {}", serverAddress, Throwables.getRootCause(t));

View file

@ -16,6 +16,7 @@
package org.bitcoinj.net;
import java.time.Duration;
import java.util.Timer;
import java.util.TimerTask;
@ -26,7 +27,7 @@ public class SocketTimeoutTask implements TimeoutHandler {
// TimerTask and timeout value which are added to a timer to kill the connection on timeout
private final Runnable actualTask;
private TimerTask timeoutTask;
private long timeoutMillis = 0;
private Duration timeout = Duration.ZERO;
private boolean timeoutEnabled = true;
// A timer which manages expiring channels as their timeouts occur (if configured).
@ -40,7 +41,7 @@ public class SocketTimeoutTask implements TimeoutHandler {
* <p>Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish
* to temporarily disable/enable timeouts.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to {@link Duration#ZERO} (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
@ -51,18 +52,18 @@ public class SocketTimeoutTask implements TimeoutHandler {
}
/**
* <p>Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no
* <p>Sets the receive timeout, automatically killing the connection if no
* messages are received for this long</p>
*
* <p>A timeout of 0 is interpreted as no timeout.</p>
* <p>A timeout of {@link Duration#ZERO} is interpreted as no timeout.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to {@link Duration#ZERO} (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
@Override
public synchronized final void setSocketTimeout(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
public synchronized final void setSocketTimeout(Duration timeout) {
this.timeout = timeout;
resetTimeout();
}
@ -74,11 +75,11 @@ public class SocketTimeoutTask implements TimeoutHandler {
synchronized void resetTimeout() {
if (timeoutTask != null)
timeoutTask.cancel();
if (timeoutMillis == 0 || !timeoutEnabled)
if (timeout.isZero() || !timeoutEnabled)
return;
// TimerTasks are not reusable, so we create a new one each time
timeoutTask = timerTask(actualTask);
timeoutTimer.schedule(timeoutTask, timeoutMillis);
timeoutTimer.schedule(timeoutTask, timeout.toMillis());
}
// Create TimerTask from Runnable

View file

@ -16,6 +16,8 @@
package org.bitcoinj.net;
import java.time.Duration;
/**
* Provides basic support for socket timeouts. It is used instead of integrating timeouts into the
* NIO select thread both for simplicity and to keep code shared between NIO and blocking sockets as much as possible.
@ -25,21 +27,21 @@ public interface TimeoutHandler {
* <p>Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish
* to temporarily disable/enable timeouts.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to 0 (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
void setTimeoutEnabled(boolean timeoutEnabled);
/**
* <p>Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no
* <p>Sets the receive timeout, automatically killing the connection if no
* messages are received for this long</p>
*
* <p>A timeout of 0 is interpreted as no timeout.</p>
* <p>A timeout of {@link Duration#ZERO} is interpreted as no timeout.</p>
*
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
* <p>The default is for timeoutEnabled to be true but timeout to be set to {@link Duration#ZERO} (ie disabled).</p>
*
* <p>This call will reset the current progress towards the timeout.</p>
*/
void setSocketTimeout(int timeoutMillis);
void setSocketTimeout(Duration timeout);
}

View file

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -207,7 +208,7 @@ public class BitcoindComparisonTool {
bitcoindChainHead = PARAMS.getGenesisBlock().getHash();
// bitcoind MUST be on localhost or we will get banned as a DoSer
new NioClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), args.length > 2 ? Integer.parseInt(args[2]) : PARAMS.getPort()), bitcoind, 1000);
new NioClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), args.length > 2 ? Integer.parseInt(args[2]) : PARAMS.getPort()), bitcoind, Duration.ofSeconds(1));
connectedFuture.get();

View file

@ -525,9 +525,9 @@ public class PeerGroupTest extends TestWithPeerGroup {
@Test
public void peerTimeoutTest() throws Exception {
final int timeout = 100;
final Duration timeout = Duration.ofMillis(100);
peerGroup.start();
peerGroup.setConnectTimeoutMillis(timeout);
peerGroup.setConnectTimeout(timeout);
final CompletableFuture<Void> peerConnectedFuture = new CompletableFuture<>();
final CompletableFuture<Void> peerDisconnectedFuture = new CompletableFuture<>();
@ -538,13 +538,14 @@ public class PeerGroupTest extends TestWithPeerGroup {
connectPeerWithoutVersionExchange(0);
// wait for disconnect (plus a bit more, in case test server is overloaded)
try {
peerDisconnectedFuture.get(timeout + 200, TimeUnit.MILLISECONDS);
peerDisconnectedFuture.get(timeout.plusMillis(200).toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// the checks below suffice for this case too
}
// check things after disconnect
assertFalse(peerConnectedFuture.isDone()); // should never have connected
assertTrue(TimeUtils.elapsedTime(start).toMillis() >= timeout); // should not disconnect before timeout
Duration elapsed = TimeUtils.elapsedTime(start);
assertTrue(elapsed.toMillis() + " ms",elapsed.compareTo(timeout) >= 0); // should not disconnect before timeout
assertTrue(peerDisconnectedFuture.isDone()); // but should disconnect eventually
}

View file

@ -53,6 +53,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@ -176,9 +177,9 @@ public class TestWithNetworkConnections {
if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER)
channels.openConnection(new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), peer);
else if (clientType == ClientType.NIO_CLIENT)
new NioClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), peer, 100);
new NioClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), peer, Duration.ofMillis(100));
else if (clientType == ClientType.BLOCKING_CLIENT)
new BlockingClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), peer, 100, SocketFactory.getDefault(), null);
new BlockingClient(new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), peer, Duration.ofMillis(100), SocketFactory.getDefault(), null);
else
throw new RuntimeException();
// Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work