Peer: deprecate ping() and replace with sendPing()

1. Returns Duration rather than long
2. Uses CompletableFuture rather than ListenableCompletableFuture

I've also changed the error-handling a little. Instead of throwing
a ProtocolException (RuntimeException) if PingPong isn't supported
the exception is set in the CompletableFuture. We don't have anything
catching this exception, so wrapping it in the CompletableFuture seems
like a safe move.
This commit is contained in:
Sean Gilligan 2023-03-02 09:27:31 -08:00 committed by Andreas Schildbach
parent 916989a7cd
commit 17abffa04b
6 changed files with 38 additions and 29 deletions

View File

@ -1465,7 +1465,7 @@ public class Peer extends PeerSocketHandler {
private class PendingPing {
// The future that will be invoked when the pong is heard back.
public final CompletableFuture<Long> future;
public final CompletableFuture<Duration> future;
// The random nonce that lets us tell apart overlapping pings/pongs.
public final long nonce;
// Measurement of the time elapsed.
@ -1483,7 +1483,7 @@ public class Peer extends PeerSocketHandler {
Peer.this.addPingTimeData(elapsed.toMillis());
if (log.isDebugEnabled())
log.debug("{}: ping time is {} ms", Peer.this.toString(), elapsed.toMillis());
future.complete(elapsed.toMillis());
future.complete(elapsed);
}
}
}
@ -1508,20 +1508,20 @@ public class Peer extends PeerSocketHandler {
}
/**
* Sends the peer a ping message and returns a future that will be invoked when the pong is received back.
* The future provides a number which is the number of milliseconds elapsed between the ping and the pong.
* Once the pong is received the value returned by {@link Peer#getLastPingTime()} is
* updated.
* @throws ProtocolException if the peer version is too low to support measurable pings.
* Sends the peer a ping message and returns a future that will be completed when the pong is received back.
* The future provides a {@link Duration} which contains the time elapsed between the ping and the pong.
* Once the pong is received the value returned by {@link Peer#getLastPingTime()} is updated.
* The future completes exceptionally with a {@link ProtocolException} if the peer version is too low to support measurable pings.
* @return A future for the duration representing elapsed time
*/
public ListenableCompletableFuture<Long> ping() throws ProtocolException {
return ping((long) (Math.random() * Long.MAX_VALUE));
public CompletableFuture<Duration> sendPing() {
return sendPing((long) (Math.random() * Long.MAX_VALUE));
}
protected ListenableCompletableFuture<Long> ping(long nonce) throws ProtocolException {
protected CompletableFuture<Duration> sendPing(long nonce) {
final VersionMessage ver = vPeerVersionMessage;
if (!ver.isPingPongSupported())
throw new ProtocolException("Peer version is too low for measurable pings: " + ver);
return FutureUtils.failedFuture(new ProtocolException("Peer version is too low for measurable pings: " + ver));
if (pendingPings.size() > PENDING_PINGS_LIMIT) {
log.info("{}: Too many pending pings, disconnecting", this);
close();
@ -1529,11 +1529,19 @@ public class Peer extends PeerSocketHandler {
PendingPing pendingPing = new PendingPing(nonce);
pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce));
return ListenableCompletableFuture.of(pendingPing.future);
return pendingPing.future;
}
/**
* Returns the elapsed time of the last ping/pong cycle. If {@link Peer#ping()} has never
* @deprecated Use {@link #sendPing()}
*/
@Deprecated
public ListenableCompletableFuture<Long> ping() {
return ListenableCompletableFuture.of(sendPing().thenApply(Duration::toMillis));
}
/**
* Returns the elapsed time of the last ping/pong cycle. If {@link Peer#sendPing()} has never
* been called or we did not hear back the "pong" message yet, returns {@link Long#MAX_VALUE}.
*/
public long getLastPingTime() {
@ -1548,7 +1556,7 @@ public class Peer extends PeerSocketHandler {
}
/**
* Returns a moving average of the last N ping/pong cycles. If {@link Peer#ping()} has never
* Returns a moving average of the last N ping/pong cycles. If {@link Peer#sendPing()} has never
* been called or we did not hear back the "pong" message yet, returns {@link Long#MAX_VALUE}. The moving average
* window is 5 buckets.
*/
@ -1719,7 +1727,7 @@ public class Peer extends PeerSocketHandler {
}
// Ping/pong to wait for blocks that are still being streamed to us to finish being downloaded and
// discarded.
ping().thenRunAsync(() -> {
sendPing().thenRunAsync(() -> {
lock.lock();
checkNotNull(awaitingFreshFilter);
GetDataMessage getdata = new GetDataMessage(params);

View File

@ -1672,7 +1672,7 @@ public class PeerGroup implements TransactionBroadcaster {
for (Peer peer : getConnectedPeers()) {
if (peer.getPeerVersionMessage().clientVersion < params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.PONG))
continue;
peer.ping();
peer.sendPing();
}
} catch (Throwable e) {
log.error("Exception in ping loop", e); // The executor swallows exceptions :(
@ -2231,7 +2231,7 @@ public class PeerGroup implements TransactionBroadcaster {
* times are available via {@link Peer#getLastPingTime()} but it increases load on the
* remote node. It defaults to {@link PeerGroup#DEFAULT_PING_INTERVAL_MSEC}.
* Setting the value to be smaller or equals 0 disables pinging entirely, although you can still request one yourself
* using {@link Peer#ping()}.
* using {@link Peer#sendPing()}.
*/
public void setPingIntervalMsec(long pingIntervalMsec) {
lock.lock();

View File

@ -270,7 +270,7 @@ public class VersionMessage extends Message {
/**
* Returns true if the clientVersion field is {@link NetworkParameters.ProtocolVersion#PONG} or higher.
* If it is then {@link Peer#ping()} is usable.
* If it is then {@link Peer#sendPing()} is usable.
*/
public boolean isPingPongSupported() {
return clientVersion >= params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.PONG);

View File

@ -302,7 +302,7 @@ public class BitcoindComparisonTool {
locator = new BlockLocator();
locator = locator.add(bitcoindChainHead);
bitcoind.sendMessage(new GetHeadersMessage(PARAMS, locator, hashTo));
bitcoind.ping().get();
bitcoind.sendPing().get();
if (!chain.getChainHead().getHeader().getHash().equals(bitcoindChainHead)) {
rulesSinceFirstFail++;
log.error("ERROR: bitcoind and bitcoinj acceptance differs on block \"" + block.ruleName + "\"");
@ -313,7 +313,7 @@ public class BitcoindComparisonTool {
} else if (rule instanceof MemoryPoolState) {
MemoryPoolMessage message = new MemoryPoolMessage();
bitcoind.sendMessage(message);
bitcoind.ping().get();
bitcoind.sendPing().get();
if (mostRecentInv == null && !((MemoryPoolState) rule).mempool.isEmpty()) {
log.error("ERROR: bitcoind had an empty mempool, but we expected some transactions on rule " + rule.ruleName);
rulesSinceFirstFail++;

View File

@ -45,6 +45,7 @@ import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.CancelledKeyException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -504,7 +505,7 @@ public class PeerTest extends TestWithNetworkConnections {
// No ping pong happened yet.
assertEquals(Long.MAX_VALUE, peer.getLastPingTime());
assertEquals(Long.MAX_VALUE, peer.getPingTime());
CompletableFuture<Long> future = peer.ping();
CompletableFuture<Duration> future = peer.sendPing();
assertEquals(Long.MAX_VALUE, peer.getLastPingTime());
assertEquals(Long.MAX_VALUE, peer.getPingTime());
assertFalse(future.isDone());
@ -514,17 +515,17 @@ public class PeerTest extends TestWithNetworkConnections {
inbound(writeTarget, new Pong(pingMsg.getNonce()));
pingAndWait(writeTarget);
assertTrue(future.isDone());
long elapsed = future.get();
assertTrue("" + elapsed, elapsed > 1000);
assertEquals(elapsed, peer.getLastPingTime());
assertEquals(elapsed, peer.getPingTime());
Duration elapsed = future.get();
assertTrue(elapsed.toMillis() + " ms", elapsed.toMillis() > 1000);
assertEquals(elapsed.toMillis(), peer.getLastPingTime());
assertEquals(elapsed.toMillis(), peer.getPingTime());
// Do it again and make sure it affects the average.
future = peer.ping();
CompletableFuture<Duration> future2 = peer.sendPing();
pingMsg = (Ping) outbound(writeTarget);
TimeUtils.rollMockClock(50);
inbound(writeTarget, new Pong(pingMsg.getNonce()));
elapsed = future.get();
assertEquals(elapsed, peer.getLastPingTime());
Duration elapsed2 = future2.get();
assertEquals(elapsed2.toMillis(), peer.getLastPingTime());
assertEquals(7250, peer.getPingTime());
}

View File

@ -729,7 +729,7 @@ public class WalletTool implements Callable<Integer> {
// Hack for regtest/single peer mode, as we're about to shut down and won't get an ACK from the remote end.
List<Peer> peerList = peerGroup.getConnectedPeers();
if (peerList.size() == 1)
peerList.get(0).ping().get();
peerList.get(0).sendPing().get();
} catch (BlockStoreException | ExecutionException | InterruptedException | KeyCrypterException e) {
throw new RuntimeException(e);
} catch (InsufficientMoneyException e) {