Split PeerConnectionEventListener

Split PeerConnectionEventListener into individual connect, disconnect
and discovery listeners.
Remove custom listener registration from Peer, as now it's possible to
register a connect listener only, without a disconnect listener.
This commit is contained in:
Ross Nicoll 2016-02-13 15:39:57 +00:00
parent aa33c0e843
commit ea3713ec85
13 changed files with 312 additions and 161 deletions

View file

@ -62,26 +62,14 @@ public class Peer extends PeerSocketHandler {
private final AbstractBlockChain blockChain;
private final Context context;
// onPeerDisconnected should not be called directly by Peers when a PeerGroup is involved (we don't know the total
// number of connected peers), thus we use a wrapper that PeerGroup can use to register listeners that wont get
// onPeerDisconnected calls
static class PeerConnectionListenerRegistration extends ListenerRegistration<PeerConnectionEventListener> {
boolean callOnDisconnect = true;
public PeerConnectionListenerRegistration(PeerConnectionEventListener listener, Executor executor) {
super(listener, executor);
}
public PeerConnectionListenerRegistration(PeerConnectionEventListener listener, Executor executor, boolean callOnDisconnect) {
this(listener, executor);
this.callOnDisconnect = callOnDisconnect;
}
}
private final CopyOnWriteArrayList<ListenerRegistration<BlocksDownloadedEventListener>> blocksDownloadedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<BlocksDownloadedEventListener>>();
private final CopyOnWriteArrayList<ListenerRegistration<ChainDownloadStartedEventListener>> chainDownloadStartedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<ChainDownloadStartedEventListener>>();
private final CopyOnWriteArrayList<PeerConnectionListenerRegistration> connectionEventListeners
= new CopyOnWriteArrayList<PeerConnectionListenerRegistration>();
private final CopyOnWriteArrayList<ListenerRegistration<PeerConnectedEventListener>> connectedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerConnectedEventListener>>();
private final CopyOnWriteArrayList<ListenerRegistration<PeerDisconnectedEventListener>> disconnectedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerDisconnectedEventListener>>();
private final CopyOnWriteArrayList<ListenerRegistration<GetDataEventListener>> getDataEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<GetDataEventListener>>();
private final CopyOnWriteArrayList<ListenerRegistration<PreMessageReceivedEventListener>> preMessageReceivedEventListeners
@ -262,7 +250,8 @@ public class Peer extends PeerSocketHandler {
public void addEventListener(AbstractPeerEventListener listener) {
addBlocksDownloadedEventListener(Threading.USER_THREAD, listener);
addChainDownloadStartedEventListener(Threading.USER_THREAD, listener);
addConnectionEventListener(Threading.USER_THREAD, listener);
addConnectedEventListener(Threading.USER_THREAD, listener);
addDisconnectedEventListener(Threading.USER_THREAD, listener);
addGetDataEventListener(Threading.USER_THREAD, listener);
addOnTransactionBroadcastListener(Threading.USER_THREAD, listener);
addPreMessageReceivedEventListener(Threading.USER_THREAD, listener);
@ -273,7 +262,8 @@ public class Peer extends PeerSocketHandler {
public void addEventListener(AbstractPeerEventListener listener, Executor executor) {
addBlocksDownloadedEventListener(executor, listener);
addChainDownloadStartedEventListener(executor, listener);
addConnectionEventListener(executor, listener);
addConnectedEventListener(executor, listener);
addDisconnectedEventListener(executor, listener);
addGetDataEventListener(executor, listener);
addOnTransactionBroadcastListener(executor, listener);
addPreMessageReceivedEventListener(executor, listener);
@ -284,7 +274,8 @@ public class Peer extends PeerSocketHandler {
public void removeEventListener(AbstractPeerEventListener listener) {
removeBlocksDownloadedEventListener(listener);
removeChainDownloadStartedEventListener(listener);
removeConnectionEventListener(listener);
removeConnectedEventListener(listener);
removeDisconnectedEventListener(listener);
removeGetDataEventListener(listener);
removeOnTransactionBroadcastListener(listener);
removePreMessageReceivedEventListener(listener);
@ -310,14 +301,24 @@ public class Peer extends PeerSocketHandler {
chainDownloadStartedEventListeners.add(new ListenerRegistration(listener, executor));
}
/** Registers a listener that is invoked when a peer is connected or disconnected. */
public void addConnectionEventListener(PeerConnectionEventListener listener) {
addConnectionEventListener(Threading.USER_THREAD, listener);
/** Registers a listener that is invoked when a peer is connected. */
public void addConnectedEventListener(PeerConnectedEventListener listener) {
addConnectedEventListener(Threading.USER_THREAD, listener);
}
/** Registers a listener that is invoked when a peer is connected or disconnected. */
public void addConnectionEventListener(Executor executor, PeerConnectionEventListener listener) {
connectionEventListeners.add(new PeerConnectionListenerRegistration(listener, executor));
/** Registers a listener that is invoked when a peer is connected. */
public void addConnectedEventListener(Executor executor, PeerConnectedEventListener listener) {
connectedEventListeners.add(new ListenerRegistration(listener, executor));
}
/** Registers a listener that is invoked when a peer is disconnected. */
public void addDisconnectedEventListener(PeerDisconnectedEventListener listener) {
addDisconnectedEventListener(Threading.USER_THREAD, listener);
}
/** Registers a listener that is invoked when a peer is disconnected. */
public void addDisconnectedEventListener(Executor executor, PeerDisconnectedEventListener listener) {
disconnectedEventListeners.add(new ListenerRegistration(listener, executor));
}
/** Registers a listener that is called when messages are received. */
@ -350,11 +351,6 @@ public class Peer extends PeerSocketHandler {
preMessageReceivedEventListeners.add(new ListenerRegistration<PreMessageReceivedEventListener>(listener, executor));
}
// Package-local version for PeerGroup
void addConnectionEventListenerWithoutOnDisconnect(Executor executor, PeerConnectionEventListener listener) {
connectionEventListeners.add(new PeerConnectionListenerRegistration(listener, executor, false));
}
public boolean removeBlocksDownloadedEventListener(BlocksDownloadedEventListener listener) {
return ListenerRegistration.removeFromList(listener, blocksDownloadedEventListeners);
}
@ -363,8 +359,12 @@ public class Peer extends PeerSocketHandler {
return ListenerRegistration.removeFromList(listener, chainDownloadStartedEventListeners);
}
public boolean removeConnectionEventListener(PeerConnectionEventListener listener) {
return ListenerRegistration.removeFromList(listener, connectionEventListeners);
public boolean removeConnectedEventListener(PeerConnectedEventListener listener) {
return ListenerRegistration.removeFromList(listener, connectedEventListeners);
}
public boolean removeDisconnectedEventListener(PeerDisconnectedEventListener listener) {
return ListenerRegistration.removeFromList(listener, disconnectedEventListeners);
}
public boolean removeGetDataEventListener(GetDataEventListener listener) {
@ -396,14 +396,13 @@ public class Peer extends PeerSocketHandler {
@Override
public void connectionClosed() {
for (final PeerConnectionListenerRegistration registration : connectionEventListeners) {
if (registration.callOnDisconnect)
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeerDisconnected(Peer.this, 0);
}
});
for (final ListenerRegistration<PeerDisconnectedEventListener> registration : disconnectedEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeerDisconnected(Peer.this, 0);
}
});
}
}
@ -491,7 +490,7 @@ public class Peer extends PeerSocketHandler {
}
isAcked = true;
this.setTimeoutEnabled(false);
for (final ListenerRegistration<PeerConnectionEventListener> registration : connectionEventListeners) {
for (final ListenerRegistration<PeerConnectedEventListener> registration : connectedEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {

View file

@ -120,9 +120,15 @@ public class PeerGroup implements TransactionBroadcaster {
= new CopyOnWriteArrayList<ListenerRegistration<BlocksDownloadedEventListener>>();
private final CopyOnWriteArrayList<ListenerRegistration<ChainDownloadStartedEventListener>> peersChainDownloadStartedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<ChainDownloadStartedEventListener>>();
/** Callbacks for events related to peers connecting */
protected final CopyOnWriteArrayList<ListenerRegistration<PeerConnectedEventListener>> peerConnectedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerConnectedEventListener>>();
/** Callbacks for events related to peer connection/disconnection */
protected final CopyOnWriteArrayList<ListenerRegistration<PeerConnectionEventListener>> peerConnectionEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerConnectionEventListener>>();
protected final CopyOnWriteArrayList<ListenerRegistration<PeerDiscoveredEventListener>> peerDiscoveredEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerDiscoveredEventListener>>();
/** Callbacks for events related to peers disconnecting */
protected final CopyOnWriteArrayList<ListenerRegistration<PeerDisconnectedEventListener>> peerDisconnectedEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<PeerDisconnectedEventListener>>();
/** Callbacks for events related to peer data being received */
private final CopyOnWriteArrayList<ListenerRegistration<GetDataEventListener>> peerGetDataEventListeners
= new CopyOnWriteArrayList<ListenerRegistration<GetDataEventListener>>();
@ -241,7 +247,7 @@ public class PeerGroup implements TransactionBroadcaster {
}
}
private class PeerStartupListener extends AbstractPeerConnectionEventListener {
private class PeerStartupListener implements PeerConnectedEventListener, PeerDisconnectedEventListener {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
handleNewPeer(peer);
@ -254,7 +260,7 @@ public class PeerGroup implements TransactionBroadcaster {
}
}
private final PeerConnectionEventListener startupListener = new PeerStartupListener();
private final PeerStartupListener startupListener = new PeerStartupListener();
/**
* The default Bloom filter false positive rate, which is selected to be extremely low such that you hardly ever
@ -704,7 +710,9 @@ public class PeerGroup implements TransactionBroadcaster {
public void addEventListener(AbstractPeerEventListener listener, Executor executor) {
addBlocksDownloadedEventListener(Threading.USER_THREAD, listener);
addChainDownloadStartedEventListener(Threading.USER_THREAD, listener);
addConnectionEventListener(Threading.USER_THREAD, listener);
addConnectedEventListener(Threading.USER_THREAD, listener);
addDisconnectedEventListener(Threading.USER_THREAD, listener);
addDiscoveredEventListener(Threading.USER_THREAD, listener);
addGetDataEventListener(Threading.USER_THREAD, listener);
addOnTransactionBroadcastListener(Threading.USER_THREAD, listener);
addPreMessageReceivedEventListener(Threading.USER_THREAD, listener);
@ -715,7 +723,9 @@ public class PeerGroup implements TransactionBroadcaster {
public void addEventListener(AbstractPeerEventListener listener) {
addBlocksDownloadedEventListener(executor, listener);
addChainDownloadStartedEventListener(executor, listener);
addConnectionEventListener(executor, listener);
addConnectedEventListener(executor, listener);
addDisconnectedEventListener(executor, listener);
addDiscoveredEventListener(executor, listener);
addGetDataEventListener(executor, listener);
addOnTransactionBroadcastListener(executor, listener);
addPreMessageReceivedEventListener(executor, listener);
@ -756,26 +766,51 @@ public class PeerGroup implements TransactionBroadcaster {
peer.addChainDownloadStartedEventListener(executor, listener);
}
/** See {@link Peer#addConnectionEventListener(PeerConnectionEventListener)} */
public void addConnectionEventListener(PeerConnectionEventListener listener) {
addConnectionEventListener(Threading.USER_THREAD, listener);
/** See {@link Peer#addConnectedEventListener(PeerConnectedEventListener)} */
public void addConnectedEventListener(PeerConnectedEventListener listener) {
addConnectedEventListener(Threading.USER_THREAD, listener);
}
/**
* <p>Adds a listener that will be notified on the given executor when:</p>
* <ol>
* <li>New peers are discovered.</li>
* <li>New peers are connected to.</li>
* <li>Peers are disconnected from.</li>
* </li>
* </ol>
* <p>Adds a listener that will be notified on the given executor when
* new peers are connected to.</p>
*/
public void addConnectionEventListener(Executor executor, PeerConnectionEventListener listener) {
peerConnectionEventListeners.add(new ListenerRegistration<PeerConnectionEventListener>(checkNotNull(listener), executor));
public void addConnectedEventListener(Executor executor, PeerConnectedEventListener listener) {
peerConnectedEventListeners.add(new ListenerRegistration<PeerConnectedEventListener>(checkNotNull(listener), executor));
for (Peer peer : getConnectedPeers())
peer.addConnectionEventListener(executor, listener);
peer.addConnectedEventListener(executor, listener);
for (Peer peer : getPendingPeers())
peer.addConnectionEventListener(executor, listener);
peer.addConnectedEventListener(executor, listener);
}
/** See {@link Peer#addDisconnectedEventListener(PeerDisconnectedEventListener)} */
public void addDisconnectedEventListener(PeerDisconnectedEventListener listener) {
addDisconnectedEventListener(Threading.USER_THREAD, listener);
}
/**
* <p>Adds a listener that will be notified on the given executor when
* peers are disconnected from.</p>
*/
public void addDisconnectedEventListener(Executor executor, PeerDisconnectedEventListener listener) {
peerDisconnectedEventListeners.add(new ListenerRegistration<PeerDisconnectedEventListener>(checkNotNull(listener), executor));
for (Peer peer : getConnectedPeers())
peer.addDisconnectedEventListener(executor, listener);
for (Peer peer : getPendingPeers())
peer.addDisconnectedEventListener(executor, listener);
}
/** See {@link Peer#addDiscoveredEventListener(PeerDiscoveredEventListener)} */
public void addDiscoveredEventListener(PeerDiscoveredEventListener listener) {
addDiscoveredEventListener(Threading.USER_THREAD, listener);
}
/**
* <p>Adds a listener that will be notified on the given executor when new
* peers are discovered.</p>
*/
public void addDiscoveredEventListener(Executor executor, PeerDiscoveredEventListener listener) {
peerDiscoveredEventListeners.add(new ListenerRegistration<PeerDiscoveredEventListener>(checkNotNull(listener), executor));
}
/** See {@link Peer#addGetDataEventListener(GetDataEventListener)} */
@ -825,7 +860,9 @@ public class PeerGroup implements TransactionBroadcaster {
public void removeEventListener(AbstractPeerEventListener listener) {
removeBlocksDownloadedEventListener(listener);
removeChainDownloadStartedEventListener(listener);
removeConnectionEventListener(listener);
removeConnectedEventListener(listener);
removeDisconnectedEventListener(listener);
removeDiscoveredEventListener(listener);
removeGetDataEventListener(listener);
removeOnTransactionBroadcastListener(listener);
removePreMessageReceivedEventListener(listener);
@ -850,12 +887,28 @@ public class PeerGroup implements TransactionBroadcaster {
}
/** The given event listener will no longer be called with events. */
public boolean removeConnectionEventListener(PeerConnectionEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peerConnectionEventListeners);
public boolean removeConnectedEventListener(PeerConnectedEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peerConnectedEventListeners);
for (Peer peer : getConnectedPeers())
peer.removeConnectionEventListener(listener);
peer.removeConnectedEventListener(listener);
for (Peer peer : getPendingPeers())
peer.removeConnectionEventListener(listener);
peer.removeConnectedEventListener(listener);
return result;
}
/** The given event listener will no longer be called with events. */
public boolean removeDisconnectedEventListener(PeerDisconnectedEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peerDisconnectedEventListeners);
for (Peer peer : getConnectedPeers())
peer.removeDisconnectedEventListener(listener);
for (Peer peer : getPendingPeers())
peer.removeDisconnectedEventListener(listener);
return result;
}
/** The given event listener will no longer be called with events. */
public boolean removeDiscoveredEventListener(PeerDiscoveredEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peerDiscoveredEventListeners);
return result;
}
@ -999,7 +1052,7 @@ public class PeerGroup implements TransactionBroadcaster {
addInactive(address);
}
final ImmutableSet<PeerAddress> peersDiscoveredSet = ImmutableSet.copyOf(addressList);
for (final ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners /* COW */) {
for (final ListenerRegistration<PeerDiscoveredEventListener> registration : peerDiscoveredEventListeners /* COW */) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
@ -1427,7 +1480,8 @@ public class PeerGroup implements TransactionBroadcaster {
ver.time = Utils.currentTimeSeconds();
Peer peer = createPeer(address, ver);
peer.addConnectionEventListener(Threading.SAME_THREAD, startupListener);
peer.addConnectedEventListener(Threading.SAME_THREAD, startupListener);
peer.addDisconnectedEventListener(Threading.SAME_THREAD, startupListener);
peer.setMinProtocolVersion(vMinRequiredProtocolVersion);
pendingPeers.add(peer);
@ -1576,8 +1630,9 @@ public class PeerGroup implements TransactionBroadcaster {
peer.addBlocksDownloadedEventListener(registration.executor, registration.listener);
for (ListenerRegistration<ChainDownloadStartedEventListener> registration : peersChainDownloadStartedEventListeners)
peer.addChainDownloadStartedEventListener(registration.executor, registration.listener);
for (ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners)
peer.addConnectionEventListenerWithoutOnDisconnect(registration.executor, registration.listener);
for (ListenerRegistration<PeerConnectedEventListener> registration : peerConnectedEventListeners)
peer.addConnectedEventListener(registration.executor, registration.listener);
// We intentionally do not add disconnect listeners to peers
for (ListenerRegistration<GetDataEventListener> registration : peerGetDataEventListeners)
peer.addGetDataEventListener(registration.executor, registration.listener);
for (ListenerRegistration<OnTransactionBroadcastListener> registration : peersTransactionBroadastEventListeners)
@ -1589,7 +1644,7 @@ public class PeerGroup implements TransactionBroadcaster {
}
final int fNewSize = newSize;
for (final ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners) {
for (final ListenerRegistration<PeerConnectedEventListener> registration : peerConnectedEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
@ -1763,14 +1818,14 @@ public class PeerGroup implements TransactionBroadcaster {
peer.removePreMessageReceivedEventListener(registration.listener);
for (ListenerRegistration<OnTransactionBroadcastListener> registration : peersTransactionBroadastEventListeners)
peer.removeOnTransactionBroadcastListener(registration.listener);
for (final ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners) {
for (final ListenerRegistration<PeerDisconnectedEventListener> registration : peerDisconnectedEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeerDisconnected(peer, fNumConnectedPeers);
}
});
peer.removeConnectionEventListener(registration.listener);
peer.removeDisconnectedEventListener(registration.listener);
}
}
@ -1961,13 +2016,13 @@ public class PeerGroup implements TransactionBroadcaster {
return Futures.immediateFuture(foundPeers);
}
final SettableFuture<List<Peer>> future = SettableFuture.create();
addConnectionEventListener(new AbstractPeerConnectionEventListener() {
addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
final List<Peer> peers = findPeersOfAtLeastVersion(protocolVersion);
if (peers.size() >= numPeers) {
future.set(peers);
removeConnectionEventListener(this);
removeConnectedEventListener(this);
}
}
});
@ -2005,13 +2060,13 @@ public class PeerGroup implements TransactionBroadcaster {
if (foundPeers.size() >= numPeers)
return Futures.immediateFuture(foundPeers);
final SettableFuture<List<Peer>> future = SettableFuture.create();
addConnectionEventListener(new AbstractPeerConnectionEventListener() {
addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
final List<Peer> peers = findPeersWithServiceMask(mask);
if (peers.size() >= numPeers) {
future.set(peers);
removeConnectionEventListener(this);
removeConnectedEventListener(this);
}
}
});

View file

@ -0,0 +1,34 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.core.listeners;
import org.bitcoinj.core.Peer;
/**
* <p>Implementors can listen to events indicating a new peer connecting.</p>
*/
public interface PeerConnectedEventListener {
/**
* Called when a peer is connected. If this listener is registered to a {@link Peer} instead of a {@link PeerGroup},
* peerCount will always be 1.
*
* @param peer
* @param peerCount the total number of connected peers
*/
void onPeerConnected(Peer peer, int peerCount);
}

View file

@ -17,31 +17,15 @@
package org.bitcoinj.core.listeners;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
import java.util.Set;
/**
* <p>Implementors can listen to events like blocks being downloaded/transactions being broadcast/connect/disconnects,
* they can pre-filter messages before they are processed by a {@link Peer} or {@link PeerGroup}, and they can
* provide transactions to remote peers when they ask for them.</p>
* <p>Implementors can listen to events like peer discovery, connect or disconnects.</p>
*
* @deprecated Use the single event interfaces instead
*/
public interface PeerConnectionEventListener {
/**
* <p>Called when peers are discovered, this happens at startup of {@link PeerGroup} or if we run out of
* suitable {@link Peer}s to connect to.</p>
*
* @param peerAddresses the set of discovered {@link PeerAddress}es
*/
void onPeersDiscovered(Set<PeerAddress> peerAddresses);
/**
* Called when a peer is connected. If this listener is registered to a {@link Peer} instead of a {@link PeerGroup},
* peerCount will always be 1.
*
* @param peer
* @param peerCount the total number of connected peers
*/
void onPeerConnected(Peer peer, int peerCount);
@Deprecated
public interface PeerConnectionEventListener extends PeerConnectedEventListener,
PeerDiscoveredEventListener, PeerDisconnectedEventListener {
/**
* Called when a peer is disconnected. Note that this won't be called if the listener is registered on a

View file

@ -0,0 +1,36 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.core.listeners;
import org.bitcoinj.core.Peer;
/**
* <p>Implementors can listen to events indicating a peer disconnecting.</p>
*/
public interface PeerDisconnectedEventListener {
/**
* Called when a peer is disconnected. Note that this won't be called if the listener is registered on a
* {@link PeerGroup} and the group is in the process of shutting down. If this listener is registered to a
* {@link Peer} instead of a {@link PeerGroup}, peerCount will always be 0. This handler can be called without
* a corresponding invocation of onPeerConnected if the initial connection is never successful.
*
* @param peer
* @param peerCount the total number of connected peers
*/
void onPeerDisconnected(Peer peer, int peerCount);
}

View file

@ -0,0 +1,34 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.core.listeners;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
import java.util.Set;
/**
* <p>Implementors can listen to events for peers being discovered.</p>
*/
public interface PeerDiscoveredEventListener {
/**
* <p>Called when peers are discovered, this happens at startup of {@link PeerGroup} or if we run out of
* suitable {@link Peer}s to connect to.</p>
*
* @param peerAddresses the set of discovered {@link PeerAddress}es
*/
void onPeersDiscovered(Set<PeerAddress> peerAddresses);
}

View file

@ -87,7 +87,7 @@ public class BitcoindComparisonTool {
final Set<Sha256Hash> blocksPendingSend = Collections.synchronizedSet(new HashSet<Sha256Hash>());
final AtomicInteger unexpectedInvs = new AtomicInteger(0);
final SettableFuture<Void> connectedFuture = SettableFuture.create();
final PeerConnectionEventListener listener = new PeerConnectionEventListener() {
bitcoind.addConnectedEventListener(Threading.SAME_THREAD, new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
if (!peer.getPeerVersionMessage().subVer.contains("Satoshi")) {
@ -107,20 +107,17 @@ public class BitcoindComparisonTool {
bitcoind.startBlockChainDownload();
connectedFuture.set(null);
}
});
bitcoind.addDisconnectedEventListener(Threading.SAME_THREAD, new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
log.error("bitcoind node disconnected!");
System.exit(1);
}
});
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
// Ignore
}
};
final PreMessageReceivedEventListener preMessageReceivedListener = new PreMessageReceivedEventListener() {
bitcoind.addPreMessageReceivedEventListener(Threading.SAME_THREAD, new PreMessageReceivedEventListener() {
@Override
public Message onPreMessageReceived(Peer peer, Message m) {
if (m instanceof HeadersMessage) {
@ -201,10 +198,7 @@ public class BitcoindComparisonTool {
}
return m;
}
};
bitcoind.addConnectionEventListener(Threading.SAME_THREAD, listener);
bitcoind.addPreMessageReceivedEventListener(Threading.SAME_THREAD, preMessageReceivedListener);
});
bitcoindChainHead = params.getGenesisBlock().getHash();

View file

@ -47,7 +47,18 @@ public class PeerGroupTest extends TestWithPeerGroup {
private BlockingQueue<Peer> connectedPeers;
private BlockingQueue<Peer> disconnectedPeers;
private PeerConnectionEventListener listener;
private PeerConnectedEventListener connectedListener = new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
connectedPeers.add(peer);
}
};
private PeerDisconnectedEventListener disconnectedListener = new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
disconnectedPeers.add(peer);
}
};
private PreMessageReceivedEventListener preMessageReceivedListener;
private Map<Peer, AtomicInteger> peerToMessageCount;
@ -68,23 +79,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
peerToMessageCount = new HashMap<Peer, AtomicInteger>();
connectedPeers = new LinkedBlockingQueue<Peer>();
disconnectedPeers = new LinkedBlockingQueue<Peer>();
listener = new PeerConnectionEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
connectedPeers.add(peer);
}
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
disconnectedPeers.add(peer);
}
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
// Ignore
}
};
preMessageReceivedListener = new PreMessageReceivedEventListener() {
@Override
public Message onPreMessageReceived(Peer peer, Message m) {
@ -108,7 +102,8 @@ public class PeerGroupTest extends TestWithPeerGroup {
@Test
public void listener() throws Exception {
peerGroup.addConnectionEventListener(listener);
peerGroup.addConnectedEventListener(connectedListener);
peerGroup.addDisconnectedEventListener(disconnectedListener);
peerGroup.addPreMessageReceivedEventListener(preMessageReceivedListener);
peerGroup.start();
@ -130,8 +125,10 @@ public class PeerGroupTest extends TestWithPeerGroup {
disconnectedPeers.take();
assertEquals(0, disconnectedPeers.size());
assertTrue(peerGroup.removeConnectionEventListener(listener));
assertFalse(peerGroup.removeConnectionEventListener(listener));
assertTrue(peerGroup.removeConnectedEventListener(connectedListener));
assertFalse(peerGroup.removeConnectedEventListener(connectedListener));
assertTrue(peerGroup.removeDisconnectedEventListener(disconnectedListener));
assertFalse(peerGroup.removeDisconnectedEventListener(disconnectedListener));
assertTrue(peerGroup.removePreMessageReceivedEventListener(preMessageReceivedListener));
assertFalse(peerGroup.removePreMessageReceivedEventListener(preMessageReceivedListener));
}
@ -189,7 +186,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
peerGroup.addPeerDiscovery(createPeerDiscovery(96, 200));
peerGroup.addPeerDiscovery(createPeerDiscovery(3, 300));
peerGroup.addPeerDiscovery(createPeerDiscovery(1, 400));
peerGroup.addConnectionEventListener(new AbstractPeerConnectionEventListener() {
peerGroup.addDiscoveredEventListener(new PeerDiscoveredEventListener() {
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
assertEquals(99, peerAddresses.size());
@ -299,7 +296,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertNull(outbound(p2));
// Peer 1 goes away, peer 2 becomes the download peer and thus queries the remote mempool.
final SettableFuture<Void> p1CloseFuture = SettableFuture.create();
peerOf(p1).addConnectionEventListener(new AbstractPeerConnectionEventListener() {
peerOf(p1).addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
p1CloseFuture.set(null);
@ -504,12 +501,13 @@ public class PeerGroupTest extends TestWithPeerGroup {
final SettableFuture<Void> peerConnectedFuture = SettableFuture.create();
final SettableFuture<Void> peerDisconnectedFuture = SettableFuture.create();
peerGroup.addConnectionEventListener(Threading.SAME_THREAD, new AbstractPeerConnectionEventListener() {
peerGroup.addConnectedEventListener(Threading.SAME_THREAD, new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
peerConnectedFuture.set(null);
}
});
peerGroup.addDisconnectedEventListener(Threading.SAME_THREAD, new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
peerDisconnectedFuture.set(null);
@ -539,7 +537,8 @@ public class PeerGroupTest extends TestWithPeerGroup {
new InetSocketAddress("localhost", 2001),
new InetSocketAddress("localhost", 2002)
);
peerGroup.addConnectionEventListener(listener);
peerGroup.addConnectedEventListener(connectedListener);
peerGroup.addDisconnectedEventListener(disconnectedListener);
peerGroup.addPreMessageReceivedEventListener(preMessageReceivedListener);
peerGroup.addPeerDiscovery(new PeerDiscovery() {
@Override

View file

@ -102,13 +102,26 @@ public class PeerTest extends TestWithNetworkConnections {
}
@Test
public void testAddEventListener() throws Exception {
public void testAddConnectedEventListener() throws Exception {
connect();
PeerConnectionEventListener listener = new AbstractPeerConnectionEventListener() {
PeerConnectedEventListener listener = new AbstractPeerConnectionEventListener() {
};
peer.addConnectionEventListener(listener);
assertTrue(peer.removeConnectionEventListener(listener));
assertFalse(peer.removeConnectionEventListener(listener));
assertFalse(peer.removeConnectedEventListener(listener));
peer.addConnectedEventListener(listener);
assertTrue(peer.removeConnectedEventListener(listener));
assertFalse(peer.removeConnectedEventListener(listener));
}
@Test
public void testAddDisconnectedEventListener() throws Exception {
connect();
PeerDisconnectedEventListener listener = new AbstractPeerConnectionEventListener() {
};
assertFalse(peer.removeDisconnectedEventListener(listener));
peer.addDisconnectedEventListener(listener);
assertTrue(peer.removeDisconnectedEventListener(listener));
assertFalse(peer.removeDisconnectedEventListener(listener));
}
// Check that it runs through the event loop and shut down correctly
@ -749,12 +762,14 @@ public class PeerTest extends TestWithNetworkConnections {
// Set up the connection with an old version.
final SettableFuture<Void> connectedFuture = SettableFuture.create();
final SettableFuture<Void> disconnectedFuture = SettableFuture.create();
peer.addConnectionEventListener(new AbstractPeerConnectionEventListener() {
peer.addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
connectedFuture.set(null);
}
});
peer.addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
disconnectedFuture.set(null);
@ -859,7 +874,7 @@ public class PeerTest extends TestWithNetworkConnections {
};
connect(); // Writes out a verack+version.
final SettableFuture<Void> peerDisconnected = SettableFuture.create();
writeTarget.peer.addConnectionEventListener(new AbstractPeerConnectionEventListener() {
writeTarget.peer.addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer p, int peerCount) {
peerDisconnected.set(null);

View file

@ -17,7 +17,8 @@
package org.bitcoinj.testing;
import org.bitcoinj.core.listeners.AbstractPeerConnectionEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;
import org.bitcoinj.core.*;
import org.bitcoinj.net.*;
import org.bitcoinj.params.UnitTestParams;
@ -39,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import org.bitcoinj.core.listeners.PreMessageReceivedEventListener;
/**
* Utility class that makes it easy to work with mock NetworkConnections.
@ -149,7 +149,7 @@ public class TestWithNetworkConnections {
checkArgument(versionMessage.hasBlockChain());
final AtomicBoolean doneConnecting = new AtomicBoolean(false);
final Thread thisThread = Thread.currentThread();
peer.addConnectionEventListener(new AbstractPeerConnectionEventListener() {
peer.addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer p, int peerCount) {
synchronized (doneConnecting) {

View file

@ -17,7 +17,8 @@
package org.bitcoinj.examples;
import org.bitcoinj.core.listeners.AbstractPeerEventListener;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerGroup;
@ -67,13 +68,14 @@ public class PeerMonitor {
peerGroup.setUserAgent("PeerMonitor", "1.0");
peerGroup.setMaxConnections(4);
peerGroup.addPeerDiscovery(new DnsDiscovery(params));
peerGroup.addConnectionEventListener(new AbstractPeerEventListener() {
peerGroup.addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(final Peer peer, int peerCount) {
refreshUI();
lookupReverseDNS(peer);
}
});
peerGroup.addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(final Peer peer, int peerCount) {
refreshUI();

View file

@ -16,7 +16,8 @@
package org.bitcoinj.examples;
import org.bitcoinj.core.listeners.AbstractPeerEventListener;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.PeerAddress;
@ -84,7 +85,7 @@ public class PrintPeers {
final Peer peer = new Peer(params, new VersionMessage(params, 0), null, new PeerAddress(address));
final SettableFuture<Void> future = SettableFuture.create();
// Once the connection has completed version handshaking ...
peer.addConnectionEventListener(new AbstractPeerEventListener() {
peer.addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer p, int peerCount) {
// Check the chain height it claims to have.
@ -106,7 +107,8 @@ public class PrintPeers {
future.set(null);
peer.close();
}
});
peer.addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer p, int peerCount) {
if (!future.isDone())

View file

@ -15,7 +15,8 @@
package org.bitcoinj.tools;
import org.bitcoinj.core.*;
import org.bitcoinj.core.listeners.PeerConnectionEventListener;
import org.bitcoinj.core.listeners.PeerConnectedEventListener;
import org.bitcoinj.core.listeners.PeerDisconnectedEventListener;
import org.bitcoinj.kits.WalletAppKit;
import org.bitcoinj.params.MainNetParams;
import org.bitcoinj.utils.BriefLogFormatter;
@ -80,17 +81,13 @@ public class TestFeeLevel {
System.out.println("Size in bytes is " + request.tx.bitcoinSerialize().length);
System.out.println("TX is " + request.tx);
System.out.println("Waiting for " + kit.peerGroup().getMaxConnections() + " connected peers");
kit.peerGroup().addConnectionEventListener(new PeerConnectionEventListener() {
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
}
kit.peerGroup().addDisconnectedEventListener(new PeerDisconnectedEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
System.out.println(peerCount + " peers connected");
}
});
kit.peerGroup().addConnectedEventListener(new PeerConnectedEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
System.out.println(peerCount + " peers connected");