Make PeerGroup use the Guava services framework. Makes start/stop optionally non-blocking. Resolves issue 258.

This commit is contained in:
Mike Hearn 2012-12-21 22:37:12 +01:00
parent 826aafd3e0
commit 93893e10ad
3 changed files with 49 additions and 55 deletions

View File

@ -56,7 +56,8 @@ public interface PeerEventListener {
public void onPeerConnected(Peer peer, int peerCount);
/**
* Called when a peer is disconnected
* Called when a peer is disconnected. Note that this won't be called if the listener is registered on a
* {@link PeerGroup} and the group is in the process of shutting down.
*
* @param peer
* @param peerCount the total number of connected peers

View File

@ -22,10 +22,7 @@ import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.*;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@ -43,26 +40,31 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Maintain a number of connections to peers.<p>
* <p>Maintain a number of connections to peers.</p>
*
* PeerGroup tries to maintain a constant number of connections to a set of distinct peers.
* <p>PeerGroup tries to maintain a constant number of connections to a set of distinct peers.
* Each peer runs a network listener in its own thread. When a connection is lost, a new peer
* will be tried after a delay as long as the number of connections less than the maximum.<p>
* will be tried after a delay as long as the number of connections less than the maximum.</p>
*
* Connections are made to addresses from a provided list. When that list is exhausted,
* we start again from the head of the list.<p>
* <p>Connections are made to addresses from a provided list. When that list is exhausted,
* we start again from the head of the list.</p>
*
* The PeerGroup can broadcast a transaction to the currently connected set of peers. It can
* also handle download of the blockchain from peers, restarting the process when peers die.
* <p>The PeerGroup can broadcast a transaction to the currently connected set of peers. It can
* also handle download of the blockchain from peers, restarting the process when peers die.</p>
*
* <p>PeerGroup implements the {@link Service} interface. This means before it will do anything,
* you must call the {@link com.google.common.util.concurrent.Service#start()} method (which returns
* a future) or {@link com.google.common.util.concurrent.Service#startAndWait()} method, which will block
* until peer discovery is completed and some outbound connections have been initiated (it will return
* before handshaking is done, however). You should call {@link com.google.common.util.concurrent.Service#stop()}
* when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do
* network IO, but starting and stopping the service should be fine.</p>
*/
public class PeerGroup {
public class PeerGroup extends AbstractIdleService {
private static final int DEFAULT_CONNECTIONS = 4;
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
// True if start() has been called.
private boolean running;
// These lists are all thread-safe so do not have to be accessed under the PeerGroup lock.
// Addresses to try to connect to, excluding active peers.
private List<PeerAddress> inactives;
@ -250,7 +252,7 @@ public class PeerGroup {
int adjustment;
synchronized (this) {
this.maxConnections = maxConnections;
if (!running) return;
if (!isRunning()) return;
}
// We may now have too many or too few open connections. Adding the sizes together here is a race condition.
adjustment = maxConnections - (peers.size() + pendingPeers.size());
@ -428,7 +430,7 @@ public class PeerGroup {
protected void connectToAnyPeer() throws PeerDiscoveryException {
// Do not call this method whilst synchronized on the PeerGroup lock.
final PeerAddress addr;
if (!isRunning()) return;
if (!(state() == State.STARTING || state() == State.RUNNING)) return;
synchronized (inactives) {
if (inactives.size() == 0) {
discoverPeers();
@ -449,44 +451,30 @@ public class PeerGroup {
connectTo(addr.toSocketAddress(), false);
}
/**
* Starts the PeerGroup. This may block whilst peer discovery takes place.
* @throws IllegalStateException if the PeerGroup is already running.
*/
public void start() {
@Override
protected void startUp() throws Exception {
// This is run in a background thread by the AbstractIdleService implementation.
synchronized (this) {
if (running)
throw new IllegalStateException("PeerGroup is already running.");
pingTimer = new Timer("Peer pinging thread", true);
running = true;
}
// Bring up the requested number of connections. If a connect attempt fails, new peers will be tried until
// there is a success, so just calling connectToAnyPeer for the wanted number of peers is sufficient.
// Bring up the requested number of connections. If a connect attempt fails,
// new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number
// of peers is sufficient.
for (int i = 0; i < getMaxConnections(); i++) {
try {
connectToAnyPeer();
} catch (PeerDiscoveryException e) {
if (e.getCause() instanceof InterruptedException) return;
log.error(e.getMessage());
}
}
}
/**
* <p>Stop this PeerGroup.</p>
*
* <p>The peer group will be shut down and all background threads and resources terminated. After a PeerGroup is
* stopped it can't be restarted again, create a new one instead.</p>
*
* @throws IllegalStateException if the PeerGroup wasn't started.
*/
public synchronized void stop() {
if (!running)
throw new IllegalStateException("PeerGroup not started");
running = false;
pingTimer.cancel();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
@Override
protected void shutDown() throws Exception {
// This is run on a separate thread by the AbstractIdleService implementation.
synchronized (this) {
pingTimer.cancel();
}
// TODO: Make this shutdown process use a ChannelGroup.
LinkedList<ChannelFuture> futures;
@ -498,6 +486,9 @@ public class PeerGroup {
future.getChannel().close();
}
bootstrap.releaseExternalResources();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
}
/**
@ -558,10 +549,6 @@ public class PeerGroup {
return peers.size();
}
public synchronized boolean isRunning() {
return running;
}
/**
* Connect to a peer by creating a Netty channel to the destination address.
*
@ -804,10 +791,7 @@ public class PeerGroup {
// This can run on any Netty worker thread. Because connectToAnyPeer() must run unlocked to avoid circular
// deadlock, this method must run largely unlocked too. Some members are thread-safe and others aren't, so
// we synchronize only the parts that need it.
if (!isRunning()) {
log.info("Peer death while shutting down");
return;
}
if (!isRunning()) return;
checkArgument(!peers.contains(peer));
final Peer downloadPeer;
final PeerEventListener downloadListener;

View File

@ -30,8 +30,9 @@ import javax.swing.table.AbstractTableModel;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Shows connected peers in a table view, so you can watch as they come and go.
@ -40,7 +41,6 @@ public class PeerMonitor {
private NetworkParameters params;
private PeerGroup peerGroup;
private PeerTableModel peerTableModel;
private ScheduledThreadPoolExecutor pingService;
public static void main(String[] args) throws Exception {
BriefLogFormatter.init();
@ -83,7 +83,16 @@ public class PeerMonitor {
private void setupGUI() {
JFrame window = new JFrame("Network monitor");
window.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
window.setDefaultCloseOperation(JFrame.DO_NOTHING_ON_CLOSE);
window.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent windowEvent) {
System.out.println("Shutting down ...");
peerGroup.stopAndWait();
System.out.println("Shutdown complete.");
System.exit(0);
}
});
JPanel panel = new JPanel();
JLabel instructions = new JLabel("Number of peers to connect to: ");