Fix removal of event listeners. Make PeerGroup also allow specification of arbitrary executors and run event listeners in those.

This commit is contained in:
Mike Hearn 2013-06-25 16:42:16 +02:00
parent c552c0cbdd
commit 50b71979bb
3 changed files with 72 additions and 19 deletions

View File

@ -20,6 +20,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
@ -87,7 +88,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// Callback for events related to chain download
@GuardedBy("lock") private PeerEventListener downloadListener;
// Callbacks for events related to peer connection/disconnection
private final CopyOnWriteArrayList<PeerEventListener> peerEventListeners;
private final CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>> peerEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers;
// The version message to use for new connections.
@ -228,7 +229,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
pendingPeers = new ArrayList<Peer>();
channels = new DefaultChannelGroup();
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new CopyOnWriteArrayList<PeerEventListener>();
peerEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>();
}
/**
@ -419,7 +420,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
/**
* <p>Adds a listener that will be notified on a library controlled thread when:</p>
* <p>Adds a listener that will be notified on the given executor when:</p>
* <ol>
* <li>New peers are connected to.</li>
* <li>Peers are disconnected from.</li>
@ -428,16 +429,22 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
* <li>Blocks are downloaded by the download peer.</li>
* </li>
* </ol>
* <p>The listener will be locked during callback execution, which in turn will cause network message processing
* to stop until the listener returns.</p>
*/
public void addEventListener(PeerEventListener listener, Executor executor) {
peerEventListeners.add(new ListenerRegistration<PeerEventListener>(checkNotNull(listener), executor));
}
/**
* Same as {@link PeerGroup#addEventListener(PeerEventListener, java.util.concurrent.Executor)} but defaults
* to running on the user thread.
*/
public void addEventListener(PeerEventListener listener) {
peerEventListeners.add(checkNotNull(listener));
addEventListener(listener, Threading.userCode);
}
/** The given event listener will no longer be called with events. */
public boolean removeEventListener(PeerEventListener listener) {
return peerEventListeners.remove(checkNotNull(listener));
return ListenerRegistration.removeFromList(listener, peerEventListeners);
}
/**
@ -816,15 +823,23 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks.
announcePendingWalletTransactions(wallets, Collections.singletonList(peer));
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (PeerEventListener listener : peerEventListeners) {
peer.addEventListener(listener);
for (ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
peer.addEventListener(registration.listener);
}
setupPingingForNewPeer(peer);
} finally {
lock.unlock();
}
for (PeerEventListener listener : peerEventListeners)
listener.onPeerConnected(peer, newSize);
final int fNewSize = newSize;
for (final ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeerConnected(peer, fNewSize);
}
});
}
}
private void setupPingingForNewPeer(final Peer peer) {
@ -1010,9 +1025,16 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
for (Wallet wallet : wallets) {
peer.removeWallet(wallet);
}
for (PeerEventListener listener : peerEventListeners) {
listener.onPeerDisconnected(peer, numConnectedPeers);
peer.removeEventListener(listener);
final int fNumConnectedPeers = numConnectedPeers;
for (final ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeerDisconnected(peer, fNumConnectedPeers);
}
});
peer.removeEventListener(registration.listener);
}
}

View File

@ -1367,7 +1367,7 @@ public class Wallet implements Serializable, BlockChainListener {
* like receiving money. The listener is executed by the given executor.
*/
public void addEventListener(WalletEventListener listener, Executor executor) {
eventListeners.add(new ListenerRegistration(listener, executor));
eventListeners.add(new ListenerRegistration<WalletEventListener>(listener, executor));
}
/**
@ -1375,7 +1375,7 @@ public class Wallet implements Serializable, BlockChainListener {
* was never added.
*/
public boolean removeEventListener(WalletEventListener listener) {
return eventListeners.remove(listener);
return ListenerRegistration.removeFromList(listener, eventListeners);
}
/**

View File

@ -1,11 +1,26 @@
/**
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.utils;
import com.google.bitcoin.core.WalletEventListener;
import java.util.List;
import java.util.concurrent.Executor;
/**
* A simple wrapper around a listener and an executor.
* A simple wrapper around a listener and an executor, with some utility methods.
*/
public class ListenerRegistration<T> {
public T listener;
@ -15,4 +30,20 @@ public class ListenerRegistration<T> {
this.listener = listener;
this.executor = executor;
}
public static <T> boolean removeFromList(T listener, List<ListenerRegistration<T>> list) {
ListenerRegistration<T> item = null;
for (ListenerRegistration<T> registration : list) {
if (registration.listener == listener) {
item = registration;
break;
}
}
if (item != null) {
list.remove(item);
return true;
} else {
return false;
}
}
}