diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java
index 43a2e558a..6d32174e2 100644
--- a/core/src/main/java/com/google/bitcoin/core/Peer.java
+++ b/core/src/main/java/com/google/bitcoin/core/Peer.java
@@ -768,11 +768,13 @@ public class Peer {
/**
* Sends the peer a ping message and returns a future that will be invoked when the pong is received back.
* The future provides a number which is the number of milliseconds elapsed between the ping and the pong.
+ * Once the pong is received the value returned by {@link com.google.bitcoin.core.Peer#getLastPingTime()} is
+ * updated.
* @throws ProtocolException if the peer version is too low to support measurable pings.
*/
public ListenableFuture A ClientBootstrap creates raw (TCP) connections to other nodes on the network. Normally you won't need to
* provide one - use the other constructors. Providing your own bootstrap is useful if you want to control
* details like how many network threads are used, the connection timeout value and so on. To do this, you can
- * use {@link PeerGroup.createClientBootstrap()} method and then customize the resulting object. Example:
* ClientBootstrap bootstrap = PeerGroup.createClientBootstrap(); @@ -175,9 +184,12 @@ public class PeerGroup { // Configure Netty. The "ClientBootstrap" creates connections to other nodes. It can be configured in various // ways to control the network. - this.bootstrap = bootstrap != null ? bootstrap : createClientBootstrap(); - if (this.bootstrap.getPipelineFactory() == null) + if (bootstrap == null) { + this.bootstrap = createClientBootstrap(); this.bootstrap.setPipelineFactory(makePipelineFactory(params, chain)); + } else { + this.bootstrap = bootstrap; + } inactives = Collections.synchronizedList(new ArrayList()); peers = Collections.synchronizedList(new ArrayList ()); @@ -442,9 +454,13 @@ public class PeerGroup { /** * Starts the PeerGroup. This may block whilst peer discovery takes place. + * @throws IllegalStateException if the PeerGroup is already running. */ public void start() { synchronized (this) { + if (running) + throw new IllegalStateException("PeerGroup is already running."); + pingTimer = new Timer("Peer pinging thread", true); running = true; } // Bring up the requested number of connections. If a connect attempt fails, new peers will be tried until @@ -459,29 +475,32 @@ public class PeerGroup { } /** - * Stop this PeerGroup. + * Stop this PeerGroup.
* - *The peer group will be asynchronously shut down. Some time after it is shut down all peers - * will be disconnected and no threads will be running. - * - *
It is an error to call any other method on PeerGroup after calling this one. + *
The peer group will be shut down and all background threads and resources terminated. After a PeerGroup is + * stopped it can't be restarted again, create a new one instead.
+ * + * @throws IllegalStateException if the PeerGroup wasn't started. */ public synchronized void stop() { - if (running) { - running = false; - for (PeerDiscovery peerDiscovery : peerDiscoverers) { - peerDiscovery.shutdown(); - } - LinkedListfutures; - synchronized (channelFutures) { - // Copy the list here because the act of closing the channel modifies the channelFutures map. - futures = new LinkedList (channelFutures.values()); - } - for (ChannelFuture future : futures) { - future.getChannel().close(); - } - bootstrap.releaseExternalResources(); + if (!running) + throw new IllegalStateException("PeerGroup not started"); + + running = false; + pingTimer.cancel(); + for (PeerDiscovery peerDiscovery : peerDiscoverers) { + peerDiscovery.shutdown(); } + // TODO: Make this shutdown process use a ChannelGroup. + LinkedList futures; + synchronized (channelFutures) { + // Copy the list here because the act of closing the channel modifies the channelFutures map. + futures = new LinkedList (channelFutures.values()); + } + for (ChannelFuture future : futures) { + future.getChannel().close(); + } + bootstrap.releaseExternalResources(); } /** @@ -560,6 +579,11 @@ public class PeerGroup { // Internal version. Do not call whilst holding the PeerGroup lock. protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) { ChannelFuture future = bootstrap.connect(address); + // When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on + // a worker thread. + + // Set up the address on the TCPNetworkConnection handler object. + // TODO: This is stupid and racy, get rid of it. TCPNetworkConnection.NetworkHandler networkHandler = (TCPNetworkConnection.NetworkHandler) future.getChannel().getPipeline().get("codec"); if (networkHandler != null) { @@ -623,11 +647,12 @@ public class PeerGroup { } protected synchronized void handleNewPeer(final Peer peer) { - // Runs on a peer thread for every peer that is newly connected. + // Runs on a netty worker thread for every peer that is newly connected. log.info("{}: New peer", peer); // Link the peer to the memory pool so broadcast transactions have their confidence levels updated. peer.setMemoryPool(memoryPool); // If we want to download the chain, and we aren't currently doing so, do so now. + // TODO: Change this so we automatically switch the download peer based on ping times. if (downloadListener != null && downloadPeer == null) { log.info(" starting block chain download"); startBlockChainDownloadFromPeer(peer); @@ -652,6 +677,7 @@ public class PeerGroup { for (PeerEventListener listener : peerEventListeners) { peer.addEventListener(listener); } + setupPingingForNewPeer(peer); EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker () { @Override public void invoke(PeerEventListener listener) { @@ -660,6 +686,47 @@ public class PeerGroup { }); } + private void setupPingingForNewPeer(final Peer peer) { + if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) + return; + if (getPingIntervalMsec() <= 0) + return; // Disabled. + // Start the process of pinging the peer. Do a ping right now and then ensure there's a fixed delay between + // each ping. If the peer is taken out of the peers list then the cycle will stop. + new Runnable() { + private boolean firstRun = true; + public void run() { + // Ensure that the first ping happens immediately and later pings after the requested delay. + if (firstRun) { + firstRun = false; + try { + peer.ping().addListener(this, MoreExecutors.sameThreadExecutor()); + } catch (Exception e) { + log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); + return; + } + return; + } + + final long interval = getPingIntervalMsec(); + if (interval <= 0) + return; // Disabled. + pingTimer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (!peers.contains(peer)) + return; // Peer was removed/shut down. + peer.ping().addListener(this, MoreExecutors.sameThreadExecutor()); + } catch (Exception e) { + log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); + } + } + }, interval); + } + }.run(); + } + /** Returns true if at least one peer received an inv. */ private synchronized boolean announcePendingWalletTransactions(List announceWallets, List announceToPeers) { @@ -962,6 +1029,26 @@ public class PeerGroup { return future; } + /** + * Returns the period between pings for an individual peer. Setting this lower means more accurate and timely ping + * times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the + * remote node. It defaults to 5000. + */ + public synchronized long getPingIntervalMsec() { + return pingIntervalMsec; + } + + /** + * Sets the period between pings for an individual peer. Setting this lower means more accurate and timely ping + * times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the + * remote node. It defaults to {@link PeerGroup#DEFAULT_PING_INTERVAL_MSEC}. + * Setting the value to be <= 0 disables pinging entirely, although you can still request one yourself + * using {@link com.google.bitcoin.core.Peer#ping()}. + */ + public synchronized void setPingIntervalMsec(long pingIntervalMsec) { + this.pingIntervalMsec = pingIntervalMsec; + } + private static class PeerGroupThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; diff --git a/core/src/main/java/com/google/bitcoin/core/Pong.java b/core/src/main/java/com/google/bitcoin/core/Pong.java index 012b5dedd..f19490ea9 100644 --- a/core/src/main/java/com/google/bitcoin/core/Pong.java +++ b/core/src/main/java/com/google/bitcoin/core/Pong.java @@ -20,8 +20,11 @@ import java.io.IOException; import java.io.OutputStream; public class Pong extends Message { + /** The smallest protocol version that supports the pong response (BIP 31). Anything beyond version 60000. */ + public static final int MIN_PROTOCOL_VERSION = 60001; + private long nonce; - + public Pong(NetworkParameters params, byte[] payloadBytes) throws ProtocolException { super(params, payloadBytes, 0); } @@ -46,7 +49,6 @@ public class Pong extends Message { @Override protected void parseLite() { - } long getNonce() { diff --git a/core/src/test/java/com/google/bitcoin/core/FakeChannel.java b/core/src/test/java/com/google/bitcoin/core/FakeChannel.java index 1d0604200..f40a8c8b4 100644 --- a/core/src/test/java/com/google/bitcoin/core/FakeChannel.java +++ b/core/src/test/java/com/google/bitcoin/core/FakeChannel.java @@ -49,4 +49,8 @@ public class FakeChannel extends AbstractChannel { public ChannelEvent nextEvent() { return events.poll(); } + + public ChannelEvent nextEventBlocking() throws InterruptedException { + return events.take(); + } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 4f9fbea91..5325fc6b6 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -67,6 +67,7 @@ public class PeerGroupTest extends TestWithNetworkConnections { }); peerGroup = new PeerGroup(params, blockChain, bootstrap); peerGroup.addWallet(wallet); + peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests. } @Test @@ -346,7 +347,6 @@ public class PeerGroupTest extends TestWithNetworkConnections { FakeChannel p3 = connectPeer(3); assertTrue(outbound(p3) instanceof InventoryMessage); - peerGroup.stop(); control.verify(); } @@ -368,12 +368,30 @@ public class PeerGroupTest extends TestWithNetworkConnections { w2.addKey(key2); assertEquals(peerGroup.getFastCatchupTimeSecs(), time - 100000); } - + @Test - public void testSetMaximumConnections() { - peerGroup.setMaxConnections(1); - peerGroup.setMaxConnections(4); - peerGroup.setMaxConnections(10); - peerGroup.setMaxConnections(1); + public void noPings() throws Exception { + peerGroup.start(); + peerGroup.setPingIntervalMsec(0); + VersionMessage versionMessage = new VersionMessage(params, 2); + versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION; + connectPeer(1, versionMessage); + assertFalse(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE); + } + + @Test + public void pings() throws Exception { + peerGroup.start(); + peerGroup.setPingIntervalMsec(100); + VersionMessage versionMessage = new VersionMessage(params, 2); + versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION; + FakeChannel p1 = connectPeer(1, versionMessage); + Ping ping = (Ping) outbound(p1); + inbound(p1, new Pong(ping.getNonce())); + assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE); + // The call to outbound should block until a ping arrives. + ping = (Ping) waitForOutbound(p1); + inbound(p1, new Pong(ping.getNonce())); + assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE); } } diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java index fea480194..3101702c7 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java @@ -16,19 +16,19 @@ package com.google.bitcoin.core; -import static org.easymock.EasyMock.*; +import com.google.bitcoin.store.MemoryBlockStore; +import com.google.bitcoin.utils.BriefLogFormatter; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.jboss.netty.channel.*; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import org.easymock.EasyMock; -import org.easymock.IMocksControl; -import org.jboss.netty.channel.*; - -import com.google.bitcoin.store.MemoryBlockStore; -import com.google.bitcoin.utils.BriefLogFormatter; +import static org.easymock.EasyMock.createStrictControl; +import static org.easymock.EasyMock.expect; /** * Utility class that makes it easy to work with mock NetworkConnections. @@ -120,6 +120,10 @@ public class TestWithNetworkConnections { return nextEvent.getMessage(); } + protected Object waitForOutbound(FakeChannel ch) throws InterruptedException { + return ((MessageEvent)ch.nextEventBlocking()).getMessage(); + } + protected Peer peerOf(Channel ch) { return PeerGroup.peerFromChannel(ch); } diff --git a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java index 655f932ac..03d6cbbae 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java @@ -16,20 +16,22 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.*; +import com.google.bitcoin.core.AbstractPeerEventListener; +import com.google.bitcoin.core.NetworkParameters; +import com.google.bitcoin.core.Peer; +import com.google.bitcoin.core.PeerGroup; import com.google.bitcoin.discovery.DnsDiscovery; import com.google.bitcoin.utils.BriefLogFormatter; -import com.google.common.util.concurrent.MoreExecutors; import javax.swing.*; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; import javax.swing.table.AbstractTableModel; import java.awt.*; -import java.io.IOException; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Shows connected peers in a table view, so you can watch as they come and go. @@ -57,17 +59,10 @@ public class PeerMonitor { peerGroup.setUserAgent("PeerMonitor", "1.0"); peerGroup.setMaxConnections(4); peerGroup.addPeerDiscovery(new DnsDiscovery(params)); - pingService = new ScheduledThreadPoolExecutor(1); peerGroup.addEventListener(new AbstractPeerEventListener() { @Override public void onPeerConnected(final Peer peer, int peerCount) { refreshUI(); - // Ping the peer with a 1 second delay between pings. - pingService.scheduleWithFixedDelay(new Runnable() { - public void run() { - pingPeer(peer); - } - }, 0, 1, TimeUnit.SECONDS); } @Override @@ -77,23 +72,6 @@ public class PeerMonitor { }); } - private void pingPeer(final Peer peer) { - try { - // Annoyingly, java.awt.EventQueue is not an executor, so we can't - // dispatch the listener directly to the right thread. - peer.ping().addListener(new Runnable() { - public void run() { - // When we get the pong message back, refresh the table. - refreshUI(); - } - }, MoreExecutors.sameThreadExecutor()); - } catch (IOException e) { - e.printStackTrace(); - } catch (ProtocolException e) { - // Peer is too old to support pinging, so just ignore this here. - } - } - private void refreshUI() { // Tell the Swing UI thread to redraw the peers table. SwingUtilities.invokeLater(new Runnable() { @@ -127,6 +105,13 @@ public class PeerMonitor { window.pack(); window.setSize(640, 480); window.setVisible(true); + + // Refresh the UI every half second to get the latest ping times. The event handler runs in the UI thread. + new Timer(1000, new ActionListener() { + public void actionPerformed(ActionEvent actionEvent) { + peerTableModel.fireTableDataChanged(); + } + }).start(); } private class PeerTableModel extends AbstractTableModel {