Have PeerGroup ping peers that support BIP 31 ping/pong responses, by default every 5 seconds. The last time ping is available in Peer.getLastPingTime(). This will shortly be used for ordering peers by responsiveness so we don't try and download from overloaded peers.

This commit is contained in:
Mike Hearn 2012-12-07 12:47:38 +01:00
parent f440913c1d
commit 5cc9710e1f
7 changed files with 171 additions and 69 deletions

View File

@ -768,11 +768,13 @@ public class Peer {
/**
* Sends the peer a ping message and returns a future that will be invoked when the pong is received back.
* The future provides a number which is the number of milliseconds elapsed between the ping and the pong.
* Once the pong is received the value returned by {@link com.google.bitcoin.core.Peer#getLastPingTime()} is
* updated.
* @throws ProtocolException if the peer version is too low to support measurable pings.
*/
public ListenableFuture<Long> ping() throws IOException, ProtocolException {
int peerVersion = getPeerVersionMessage().clientVersion;
if (peerVersion < 60000)
if (peerVersion < Pong.MIN_PROTOCOL_VERSION)
throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion);
PendingPing pendingPing = new PendingPing();
pendingPings.add(pendingPing);

View File

@ -90,14 +90,23 @@ public class PeerGroup {
// A class that tracks recent transactions that have been broadcast across the network, counts how many
// peers announced them and updates the transaction confidence data. It is passed to each Peer.
private final MemoryPool memoryPool;
// How many connections we want to have open at the current time. If we lose connections, we'll try opening more
// until we reach this count.
private int maxConnections;
// Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance
// and network latency. We ping peers every pingIntervalMsec milliseconds.
private Timer pingTimer;
/** How many milliseconds to wait after receiving a pong before sending another ping. */
public static final long DEFAULT_PING_INTERVAL_MSEC = 5000;
private long pingIntervalMsec = DEFAULT_PING_INTERVAL_MSEC;
private final NetworkParameters params;
private final AbstractBlockChain chain;
private long fastCatchupTimeSecs;
private ArrayList<Wallet> wallets;
private AbstractPeerEventListener getDataListener;
private AbstractPeerEventListener getDataListener;
private ClientBootstrap bootstrap;
private int minBroadcastConnections = 0;
@ -146,7 +155,7 @@ public class PeerGroup {
* <p>A ClientBootstrap creates raw (TCP) connections to other nodes on the network. Normally you won't need to
* provide one - use the other constructors. Providing your own bootstrap is useful if you want to control
* details like how many network threads are used, the connection timeout value and so on. To do this, you can
* use {@link PeerGroup.createClientBootstrap()} method and then customize the resulting object. Example:</p>
* use {@link PeerGroup#createClientBootstrap()} method and then customize the resulting object. Example:</p>
*
* <pre>
* ClientBootstrap bootstrap = PeerGroup.createClientBootstrap();
@ -175,9 +184,12 @@ public class PeerGroup {
// Configure Netty. The "ClientBootstrap" creates connections to other nodes. It can be configured in various
// ways to control the network.
this.bootstrap = bootstrap != null ? bootstrap : createClientBootstrap();
if (this.bootstrap.getPipelineFactory() == null)
if (bootstrap == null) {
this.bootstrap = createClientBootstrap();
this.bootstrap.setPipelineFactory(makePipelineFactory(params, chain));
} else {
this.bootstrap = bootstrap;
}
inactives = Collections.synchronizedList(new ArrayList<PeerAddress>());
peers = Collections.synchronizedList(new ArrayList<Peer>());
@ -442,9 +454,13 @@ public class PeerGroup {
/**
* Starts the PeerGroup. This may block whilst peer discovery takes place.
* @throws IllegalStateException if the PeerGroup is already running.
*/
public void start() {
synchronized (this) {
if (running)
throw new IllegalStateException("PeerGroup is already running.");
pingTimer = new Timer("Peer pinging thread", true);
running = true;
}
// Bring up the requested number of connections. If a connect attempt fails, new peers will be tried until
@ -459,29 +475,32 @@ public class PeerGroup {
}
/**
* Stop this PeerGroup.
* <p>Stop this PeerGroup.</p>
*
* <p>The peer group will be asynchronously shut down. Some time after it is shut down all peers
* will be disconnected and no threads will be running.
*
* <p>It is an error to call any other method on PeerGroup after calling this one.
* <p>The peer group will be shut down and all background threads and resources terminated. After a PeerGroup is
* stopped it can't be restarted again, create a new one instead.</p>
*
* @throws IllegalStateException if the PeerGroup wasn't started.
*/
public synchronized void stop() {
if (running) {
running = false;
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
LinkedList<ChannelFuture> futures;
synchronized (channelFutures) {
// Copy the list here because the act of closing the channel modifies the channelFutures map.
futures = new LinkedList<ChannelFuture>(channelFutures.values());
}
for (ChannelFuture future : futures) {
future.getChannel().close();
}
bootstrap.releaseExternalResources();
if (!running)
throw new IllegalStateException("PeerGroup not started");
running = false;
pingTimer.cancel();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
// TODO: Make this shutdown process use a ChannelGroup.
LinkedList<ChannelFuture> futures;
synchronized (channelFutures) {
// Copy the list here because the act of closing the channel modifies the channelFutures map.
futures = new LinkedList<ChannelFuture>(channelFutures.values());
}
for (ChannelFuture future : futures) {
future.getChannel().close();
}
bootstrap.releaseExternalResources();
}
/**
@ -560,6 +579,11 @@ public class PeerGroup {
// Internal version. Do not call whilst holding the PeerGroup lock.
protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) {
ChannelFuture future = bootstrap.connect(address);
// When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on
// a worker thread.
// Set up the address on the TCPNetworkConnection handler object.
// TODO: This is stupid and racy, get rid of it.
TCPNetworkConnection.NetworkHandler networkHandler =
(TCPNetworkConnection.NetworkHandler) future.getChannel().getPipeline().get("codec");
if (networkHandler != null) {
@ -623,11 +647,12 @@ public class PeerGroup {
}
protected synchronized void handleNewPeer(final Peer peer) {
// Runs on a peer thread for every peer that is newly connected.
// Runs on a netty worker thread for every peer that is newly connected.
log.info("{}: New peer", peer);
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setMemoryPool(memoryPool);
// If we want to download the chain, and we aren't currently doing so, do so now.
// TODO: Change this so we automatically switch the download peer based on ping times.
if (downloadListener != null && downloadPeer == null) {
log.info(" starting block chain download");
startBlockChainDownloadFromPeer(peer);
@ -652,6 +677,7 @@ public class PeerGroup {
for (PeerEventListener listener : peerEventListeners) {
peer.addEventListener(listener);
}
setupPingingForNewPeer(peer);
EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
public void invoke(PeerEventListener listener) {
@ -660,6 +686,47 @@ public class PeerGroup {
});
}
private void setupPingingForNewPeer(final Peer peer) {
if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION)
return;
if (getPingIntervalMsec() <= 0)
return; // Disabled.
// Start the process of pinging the peer. Do a ping right now and then ensure there's a fixed delay between
// each ping. If the peer is taken out of the peers list then the cycle will stop.
new Runnable() {
private boolean firstRun = true;
public void run() {
// Ensure that the first ping happens immediately and later pings after the requested delay.
if (firstRun) {
firstRun = false;
try {
peer.ping().addListener(this, MoreExecutors.sameThreadExecutor());
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
return;
}
return;
}
final long interval = getPingIntervalMsec();
if (interval <= 0)
return; // Disabled.
pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (!peers.contains(peer))
return; // Peer was removed/shut down.
peer.ping().addListener(this, MoreExecutors.sameThreadExecutor());
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
}
}
}, interval);
}
}.run();
}
/** Returns true if at least one peer received an inv. */
private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets,
List<Peer> announceToPeers) {
@ -962,6 +1029,26 @@ public class PeerGroup {
return future;
}
/**
* Returns the period between pings for an individual peer. Setting this lower means more accurate and timely ping
* times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the
* remote node. It defaults to 5000.
*/
public synchronized long getPingIntervalMsec() {
return pingIntervalMsec;
}
/**
* Sets the period between pings for an individual peer. Setting this lower means more accurate and timely ping
* times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the
* remote node. It defaults to {@link PeerGroup#DEFAULT_PING_INTERVAL_MSEC}.
* Setting the value to be <= 0 disables pinging entirely, although you can still request one yourself
* using {@link com.google.bitcoin.core.Peer#ping()}.
*/
public synchronized void setPingIntervalMsec(long pingIntervalMsec) {
this.pingIntervalMsec = pingIntervalMsec;
}
private static class PeerGroupThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;

View File

@ -20,8 +20,11 @@ import java.io.IOException;
import java.io.OutputStream;
public class Pong extends Message {
/** The smallest protocol version that supports the pong response (BIP 31). Anything beyond version 60000. */
public static final int MIN_PROTOCOL_VERSION = 60001;
private long nonce;
public Pong(NetworkParameters params, byte[] payloadBytes) throws ProtocolException {
super(params, payloadBytes, 0);
}
@ -46,7 +49,6 @@ public class Pong extends Message {
@Override
protected void parseLite() {
}
long getNonce() {

View File

@ -49,4 +49,8 @@ public class FakeChannel extends AbstractChannel {
public ChannelEvent nextEvent() {
return events.poll();
}
public ChannelEvent nextEventBlocking() throws InterruptedException {
return events.take();
}
}

View File

@ -67,6 +67,7 @@ public class PeerGroupTest extends TestWithNetworkConnections {
});
peerGroup = new PeerGroup(params, blockChain, bootstrap);
peerGroup.addWallet(wallet);
peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests.
}
@Test
@ -346,7 +347,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
FakeChannel p3 = connectPeer(3);
assertTrue(outbound(p3) instanceof InventoryMessage);
peerGroup.stop();
control.verify();
}
@ -368,12 +368,30 @@ public class PeerGroupTest extends TestWithNetworkConnections {
w2.addKey(key2);
assertEquals(peerGroup.getFastCatchupTimeSecs(), time - 100000);
}
@Test
public void testSetMaximumConnections() {
peerGroup.setMaxConnections(1);
peerGroup.setMaxConnections(4);
peerGroup.setMaxConnections(10);
peerGroup.setMaxConnections(1);
public void noPings() throws Exception {
peerGroup.start();
peerGroup.setPingIntervalMsec(0);
VersionMessage versionMessage = new VersionMessage(params, 2);
versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION;
connectPeer(1, versionMessage);
assertFalse(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);
}
@Test
public void pings() throws Exception {
peerGroup.start();
peerGroup.setPingIntervalMsec(100);
VersionMessage versionMessage = new VersionMessage(params, 2);
versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION;
FakeChannel p1 = connectPeer(1, versionMessage);
Ping ping = (Ping) outbound(p1);
inbound(p1, new Pong(ping.getNonce()));
assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);
// The call to outbound should block until a ping arrives.
ping = (Ping) waitForOutbound(p1);
inbound(p1, new Pong(ping.getNonce()));
assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);
}
}

View File

@ -16,19 +16,19 @@
package com.google.bitcoin.core;
import static org.easymock.EasyMock.*;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.jboss.netty.channel.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.jboss.netty.channel.*;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
import static org.easymock.EasyMock.createStrictControl;
import static org.easymock.EasyMock.expect;
/**
* Utility class that makes it easy to work with mock NetworkConnections.
@ -120,6 +120,10 @@ public class TestWithNetworkConnections {
return nextEvent.getMessage();
}
protected Object waitForOutbound(FakeChannel ch) throws InterruptedException {
return ((MessageEvent)ch.nextEventBlocking()).getMessage();
}
protected Peer peerOf(Channel ch) {
return PeerGroup.peerFromChannel(ch);
}

View File

@ -16,20 +16,22 @@
package com.google.bitcoin.examples;
import com.google.bitcoin.core.*;
import com.google.bitcoin.core.AbstractPeerEventListener;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.Peer;
import com.google.bitcoin.core.PeerGroup;
import com.google.bitcoin.discovery.DnsDiscovery;
import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.common.util.concurrent.MoreExecutors;
import javax.swing.*;
import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;
import javax.swing.table.AbstractTableModel;
import java.awt.*;
import java.io.IOException;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Shows connected peers in a table view, so you can watch as they come and go.
@ -57,17 +59,10 @@ public class PeerMonitor {
peerGroup.setUserAgent("PeerMonitor", "1.0");
peerGroup.setMaxConnections(4);
peerGroup.addPeerDiscovery(new DnsDiscovery(params));
pingService = new ScheduledThreadPoolExecutor(1);
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerConnected(final Peer peer, int peerCount) {
refreshUI();
// Ping the peer with a 1 second delay between pings.
pingService.scheduleWithFixedDelay(new Runnable() {
public void run() {
pingPeer(peer);
}
}, 0, 1, TimeUnit.SECONDS);
}
@Override
@ -77,23 +72,6 @@ public class PeerMonitor {
});
}
private void pingPeer(final Peer peer) {
try {
// Annoyingly, java.awt.EventQueue is not an executor, so we can't
// dispatch the listener directly to the right thread.
peer.ping().addListener(new Runnable() {
public void run() {
// When we get the pong message back, refresh the table.
refreshUI();
}
}, MoreExecutors.sameThreadExecutor());
} catch (IOException e) {
e.printStackTrace();
} catch (ProtocolException e) {
// Peer is too old to support pinging, so just ignore this here.
}
}
private void refreshUI() {
// Tell the Swing UI thread to redraw the peers table.
SwingUtilities.invokeLater(new Runnable() {
@ -127,6 +105,13 @@ public class PeerMonitor {
window.pack();
window.setSize(640, 480);
window.setVisible(true);
// Refresh the UI every half second to get the latest ping times. The event handler runs in the UI thread.
new Timer(1000, new ActionListener() {
public void actionPerformed(ActionEvent actionEvent) {
peerTableModel.fireTableDataChanged();
}
}).start();
}
private class PeerTableModel extends AbstractTableModel {