Properly close connecting peers.

Keep a collecting of pending peers.  Better socket closing.

Update issue 161.
This commit is contained in:
Miron Cuperman 2012-03-27 10:28:06 -07:00
parent 1e52a6eccc
commit 3bc999a032
3 changed files with 18 additions and 7 deletions

View File

@ -700,6 +700,7 @@ public class Peer {
* <p>This does not wait for the loop to terminate. * <p>This does not wait for the loop to terminate.
*/ */
public synchronized void disconnect() { public synchronized void disconnect() {
log.debug("Disconnecting peer");
running = false; running = false;
try { try {
// This is the correct way to stop an IO bound loop // This is the correct way to stop an IO bound loop

View File

@ -64,6 +64,8 @@ public class PeerGroup {
private ThreadPoolExecutor peerPool; private ThreadPoolExecutor peerPool;
// Currently active peers // Currently active peers
private Set<Peer> peers; private Set<Peer> peers;
// Currently connecting peers
private Set<Peer> pendingPeers;
// The peer we are currently downloading the chain from // The peer we are currently downloading the chain from
private Peer downloadPeer; private Peer downloadPeer;
// Callback for events related to chain download // Callback for events related to chain download
@ -111,6 +113,7 @@ public class PeerGroup {
inactives = new LinkedBlockingQueue<PeerAddress>(); inactives = new LinkedBlockingQueue<PeerAddress>();
// TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking. // TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking.
peers = Collections.synchronizedSet(new HashSet<Peer>()); peers = Collections.synchronizedSet(new HashSet<Peer>());
pendingPeers = Collections.synchronizedSet(new HashSet<Peer>());
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerPool = new ThreadPoolExecutor( peerPool = new ThreadPoolExecutor(
DEFAULT_CONNECTIONS, DEFAULT_CONNECTIONS,
@ -446,6 +449,12 @@ public class PeerGroup {
} }
} }
peers = null; // Fail quickly if someone tries to access peers while we are shutting down. peers = null; // Fail quickly if someone tries to access peers while we are shutting down.
synchronized (pendingPeers) {
for (Peer peer : pendingPeers) {
peer.disconnect();
}
}
pendingPeers = null;
} }
} }
@ -527,11 +536,18 @@ public class PeerGroup {
peerPool.execute(new Runnable() { peerPool.execute(new Runnable() {
public void run() { public void run() {
try { try {
if (shouldConnect) { synchronized (PeerGroup.this) {
// Add peer to pendingPeers so that we can shut it down if PeerGroup shuts down
pendingPeers.add(peer);
}
// Recheck if running, in case we shut down in the mean time
if (running && shouldConnect) {
log.info("Connecting to " + peer); log.info("Connecting to " + peer);
peer.connect(); peer.connect();
} }
synchronized (PeerGroup.this) { synchronized (PeerGroup.this) {
pendingPeers.remove(peer);
// We may have started shutting down the group since we started connecting. // We may have started shutting down the group since we started connecting.
// In this case, we must not add ourself to the list of peers because the controller // In this case, we must not add ourself to the list of peers because the controller
// thread already went through it. // thread already went through it.

View File

@ -165,12 +165,6 @@ public class TCPNetworkConnection implements NetworkConnection {
} }
public void shutdown() throws IOException { public void shutdown() throws IOException {
try {
socket.shutdownOutput();
socket.shutdownInput();
} catch (SocketException e) {
// ignore - might be still connecting
}
socket.close(); socket.close();
} }