Ping, Pong: make immutable

This commit is contained in:
Andreas Schildbach 2023-03-22 17:54:32 +01:00
parent 1d58722630
commit c3d20cb9b4
8 changed files with 66 additions and 41 deletions

View file

@ -1254,7 +1254,7 @@ public class Peer extends PeerSocketHandler {
} }
if (pingAfterGetData) if (pingAfterGetData)
sendMessage(new Ping((long) (Math.random() * Long.MAX_VALUE))); sendMessage(Ping.random());
} }
/** /**
@ -1555,7 +1555,7 @@ public class Peer extends PeerSocketHandler {
} }
PendingPing pendingPing = new PendingPing(nonce); PendingPing pendingPing = new PendingPing(nonce);
pendingPings.add(pendingPing); pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce)); sendMessage(Ping.of(pendingPing.nonce));
return pendingPing.future; return pendingPing.future;
} }
@ -1603,13 +1603,13 @@ public class Peer extends PeerSocketHandler {
} }
private void processPing(Ping m) { private void processPing(Ping m) {
sendMessage(new Pong(m.getNonce())); sendMessage(m.pong());
} }
protected void processPong(Pong m) { protected void processPong(Pong m) {
// Iterates over a snapshot of the list, so we can run unlocked here. // Iterates over a snapshot of the list, so we can run unlocked here.
for (PendingPing ping : pendingPings) { for (PendingPing ping : pendingPings) {
if (m.getNonce() == ping.nonce) { if (m.nonce() == ping.nonce) {
pendingPings.remove(ping); pendingPings.remove(ping);
// This line may trigger an event listener that re-runs ping(). // This line may trigger an event listener that re-runs ping().
ping.complete(); ping.complete();
@ -1770,7 +1770,7 @@ public class Peer extends PeerSocketHandler {
// TODO: This bizarre ping-after-getdata hack probably isn't necessary. // TODO: This bizarre ping-after-getdata hack probably isn't necessary.
// It's to ensure we know when the end of a filtered block stream of txns is, but we should just be // It's to ensure we know when the end of a filtered block stream of txns is, but we should just be
// able to match txns with the merkleblock. Ask Matt why it's written this way. // able to match txns with the merkleblock. Ask Matt why it's written this way.
sendMessage(new Ping((long) (Math.random() * Long.MAX_VALUE))); sendMessage(Ping.random());
}, Threading.SAME_THREAD); }, Threading.SAME_THREAD);
} finally { } finally {
lock.unlock(); lock.unlock();

View file

@ -1,6 +1,5 @@
/* /*
* Copyright 2011 Noa Resare * Copyright by the original author or authors.
* Copyright 2015 Andreas Schildbach
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -26,12 +25,12 @@ import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
/** /**
* <p>See <a href="https://github.com/bitcoin/bips/blob/master/bip-0031.mediawiki">BIP31</a> for details.</p> * See <a href="https://github.com/bitcoin/bips/blob/master/bip-0031.mediawiki">BIP31</a> for details.
* * <p>
* <p>Instances of this class are not safe for use by multiple threads.</p> * Instances of this class are immutable.
*/ */
public class Ping extends BaseMessage { public class Ping extends BaseMessage {
private long nonce; private final long nonce;
/** /**
* Deserialize this message from a given payload. * Deserialize this message from a given payload.
@ -45,17 +44,29 @@ public class Ping extends BaseMessage {
} }
/** /**
* Create a Ping with a given nonce value. * Create a ping with a nonce value.
* Only use this if the remote node has a protocol version greater than 60000
*
* @param nonce nonce value
* @return ping message
*/ */
public Ping(long nonce) { public static Ping of(long nonce) {
this.nonce = nonce; return new Ping(nonce);
} }
/** /**
* Create a Ping with a random nonce value. * Create a ping with a random nonce value.
* Only use this if the remote node has a protocol version greater than 60000
*
* @return ping message
*/ */
public Ping() { public static Ping random() {
this.nonce = new Random().nextLong(); long nonce = new Random().nextLong();
return new Ping(nonce);
}
private Ping(long nonce) {
this.nonce = nonce;
} }
@Override @Override
@ -69,7 +80,16 @@ public class Ping extends BaseMessage {
return true; return true;
} }
public long getNonce() { public long nonce() {
return nonce; return nonce;
} }
/**
* Create a {@link Pong} reply to this ping.
*
* @return pong message
*/
public Pong pong() {
return Pong.of(nonce);
}
} }

View file

@ -1,6 +1,5 @@
/* /*
* Copyright 2012 Matt Corallo * Copyright by the original author or authors.
* Copyright 2015 Andreas Schildbach
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,12 +24,12 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* <p>See <a href="https://github.com/bitcoin/bips/blob/master/bip-0031.mediawiki">BIP31</a> for details.</p> * See <a href="https://github.com/bitcoin/bips/blob/master/bip-0031.mediawiki">BIP31</a> for details.
* * <p>
* <p>Instances of this class are not safe for use by multiple threads.</p> * Instances of this class are immutable.
*/ */
public class Pong extends BaseMessage { public class Pong extends BaseMessage {
private long nonce; private final long nonce;
/** /**
* Deserialize this message from a given payload. * Deserialize this message from a given payload.
@ -42,12 +41,18 @@ public class Pong extends BaseMessage {
public static Pong read(ByteBuffer payload) throws BufferUnderflowException, ProtocolException { public static Pong read(ByteBuffer payload) throws BufferUnderflowException, ProtocolException {
return new Pong(ByteUtils.readInt64(payload)); return new Pong(ByteUtils.readInt64(payload));
} }
/** /**
* Create a Pong with a nonce value. * Create a pong with a nonce value.
* Only use this if the remote node has a protocol version greater than 60000 *
* @param nonce nonce value
* @return pong message
*/ */
public Pong(long nonce) { public static Pong of(long nonce) {
return new Pong(nonce);
}
private Pong(long nonce) {
this.nonce = nonce; this.nonce = nonce;
} }
@ -57,7 +62,7 @@ public class Pong extends BaseMessage {
} }
/** Returns the nonce sent by the remote peer. */ /** Returns the nonce sent by the remote peer. */
public long getNonce() { public long nonce() {
return nonce; return nonce;
} }
} }

View file

@ -209,7 +209,7 @@ public class FilteredBlockAndPartialMerkleTreeTest extends TestWithPeerGroup {
inbound(p1, tx1); inbound(p1, tx1);
inbound(p1, tx2); inbound(p1, tx2);
inbound(p1, tx3); inbound(p1, tx3);
inbound(p1, new Pong(((Ping)ping).getNonce())); inbound(p1, ((Ping) ping).pong());
pingAndWait(p1); pingAndWait(p1);

View file

@ -473,12 +473,12 @@ public class PeerGroupTest extends TestWithPeerGroup {
versionMessage.localServices = Services.of(Services.NODE_NETWORK); versionMessage.localServices = Services.of(Services.NODE_NETWORK);
InboundMessageQueuer p1 = connectPeer(1, versionMessage); InboundMessageQueuer p1 = connectPeer(1, versionMessage);
Ping ping = (Ping) waitForOutbound(p1); Ping ping = (Ping) waitForOutbound(p1);
inbound(p1, new Pong(ping.getNonce())); inbound(p1, ping.pong());
pingAndWait(p1); pingAndWait(p1);
assertTrue(peerGroup.getConnectedPeers().get(0).lastPingInterval().isPresent()); assertTrue(peerGroup.getConnectedPeers().get(0).lastPingInterval().isPresent());
// The call to outbound should block until a ping arrives. // The call to outbound should block until a ping arrives.
ping = (Ping) waitForOutbound(p1); ping = (Ping) waitForOutbound(p1);
inbound(p1, new Pong(ping.getNonce())); inbound(p1, ping.pong());
assertTrue(peerGroup.getConnectedPeers().get(0).lastPingInterval().isPresent()); assertTrue(peerGroup.getConnectedPeers().get(0).lastPingInterval().isPresent());
} }
@ -844,7 +844,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertNotEquals(filter, newFilter); assertNotEquals(filter, newFilter);
assertNextMessageIs(p1, MemoryPoolMessage.class); assertNextMessageIs(p1, MemoryPoolMessage.class);
Ping ping = assertNextMessageIs(p1, Ping.class); Ping ping = assertNextMessageIs(p1, Ping.class);
inbound(p1, new Pong(ping.getNonce())); inbound(p1, ping.pong());
// Await restart of the chain download. // Await restart of the chain download.
GetDataMessage getdata = assertNextMessageIs(p1, GetDataMessage.class); GetDataMessage getdata = assertNextMessageIs(p1, GetDataMessage.class);
@ -858,12 +858,12 @@ public class PeerGroupTest extends TestWithPeerGroup {
peerGroup.waitForJobQueue(); peerGroup.waitForJobQueue();
newFilter = assertNextMessageIs(p1, BloomFilter.class); newFilter = assertNextMessageIs(p1, BloomFilter.class);
assertNextMessageIs(p1, MemoryPoolMessage.class); assertNextMessageIs(p1, MemoryPoolMessage.class);
inbound(p1, new Pong(assertNextMessageIs(p1, Ping.class).getNonce())); inbound(p1, assertNextMessageIs(p1, Ping.class).pong());
assertNextMessageIs(p1, GetDataMessage.class); assertNextMessageIs(p1, GetDataMessage.class);
newBlocks = blocks.subList(6, blocks.size()); newBlocks = blocks.subList(6, blocks.size());
filterAndSend(p1, newBlocks, newFilter); filterAndSend(p1, newBlocks, newFilter);
// Send a non-tx message so the peer knows the filtered block is over and force processing. // Send a non-tx message so the peer knows the filtered block is over and force processing.
inbound(p1, new Ping()); inbound(p1, Ping.random());
pingAndWait(p1); pingAndWait(p1);
assertEquals(expectedBalance, wallet.getBalance()); assertEquals(expectedBalance, wallet.getBalance());

View file

@ -515,7 +515,7 @@ public class PeerTest extends TestWithNetworkConnections {
Ping pingMsg = (Ping) outbound(writeTarget); Ping pingMsg = (Ping) outbound(writeTarget);
TimeUtils.rollMockClock(Duration.ofSeconds(5)); TimeUtils.rollMockClock(Duration.ofSeconds(5));
// The pong is returned. // The pong is returned.
inbound(writeTarget, new Pong(pingMsg.getNonce())); inbound(writeTarget, pingMsg.pong());
pingAndWait(writeTarget); pingAndWait(writeTarget);
assertTrue(future.isDone()); assertTrue(future.isDone());
Duration elapsed = future.get(); Duration elapsed = future.get();
@ -526,7 +526,7 @@ public class PeerTest extends TestWithNetworkConnections {
CompletableFuture<Duration> future2 = peer.sendPing(); CompletableFuture<Duration> future2 = peer.sendPing();
pingMsg = (Ping) outbound(writeTarget); pingMsg = (Ping) outbound(writeTarget);
TimeUtils.rollMockClock(Duration.ofSeconds(50)); TimeUtils.rollMockClock(Duration.ofSeconds(50));
inbound(writeTarget, new Pong(pingMsg.getNonce())); inbound(writeTarget, pingMsg.pong());
Duration elapsed2 = future2.get(); Duration elapsed2 = future2.get();
assertEquals(elapsed2, peer.lastPingInterval().get()); assertEquals(elapsed2, peer.lastPingInterval().get());
assertEquals(Duration.ofMillis(7250), peer.pingInterval().get()); assertEquals(Duration.ofMillis(7250), peer.pingInterval().get());

View file

@ -56,7 +56,7 @@ public abstract class InboundMessageQueuer extends PeerSocketHandler {
@Override @Override
protected void processMessage(Message m) throws Exception { protected void processMessage(Message m) throws Exception {
if (m instanceof Ping) { if (m instanceof Ping) {
CompletableFuture<Void> future = mapPingFutures.get(((Ping) m).getNonce()); CompletableFuture<Void> future = mapPingFutures.get(((Ping) m).nonce());
if (future != null) { if (future != null) {
future.complete(null); future.complete(null);
return; return;

View file

@ -219,7 +219,7 @@ public class TestWithNetworkConnections {
// Send a ping and wait for it to get to the other side // Send a ping and wait for it to get to the other side
CompletableFuture<Void> pingReceivedFuture = new CompletableFuture<>(); CompletableFuture<Void> pingReceivedFuture = new CompletableFuture<>();
p.mapPingFutures.put(nonce, pingReceivedFuture); p.mapPingFutures.put(nonce, pingReceivedFuture);
p.peer.sendMessage(new Ping(nonce)); p.peer.sendMessage(Ping.of(nonce));
pingReceivedFuture.get(); pingReceivedFuture.get();
p.mapPingFutures.remove(nonce); p.mapPingFutures.remove(nonce);
} }
@ -228,14 +228,14 @@ public class TestWithNetworkConnections {
// Receive a ping (that the Peer doesn't see) and wait for it to get through the socket // Receive a ping (that the Peer doesn't see) and wait for it to get through the socket
final CompletableFuture<Void> pongReceivedFuture = new CompletableFuture<>(); final CompletableFuture<Void> pongReceivedFuture = new CompletableFuture<>();
PreMessageReceivedEventListener listener = (p1, m) -> { PreMessageReceivedEventListener listener = (p1, m) -> {
if (m instanceof Pong && ((Pong) m).getNonce() == nonce) { if (m instanceof Pong && ((Pong) m).nonce() == nonce) {
pongReceivedFuture.complete(null); pongReceivedFuture.complete(null);
return null; return null;
} }
return m; return m;
}; };
p.peer.addPreMessageReceivedEventListener(Threading.SAME_THREAD, listener); p.peer.addPreMessageReceivedEventListener(Threading.SAME_THREAD, listener);
inbound(p, new Pong(nonce)); inbound(p, Pong.of(nonce));
pongReceivedFuture.get(); pongReceivedFuture.get();
p.peer.removePreMessageReceivedEventListener(listener); p.peer.removePreMessageReceivedEventListener(listener);
} }