Implement better support for multiple peer discoveries. Resolves issue 302.

This commit is contained in:
Mike Rosseel 2014-09-26 17:54:59 +02:00 committed by Mike Hearn
parent 2834b7730f
commit be496b95a3
5 changed files with 85 additions and 2 deletions

View File

@ -17,11 +17,16 @@
package org.bitcoinj.core;
import java.util.List;
import java.util.Set;
/**
* Convenience implementation of {@link PeerEventListener}.
*/
public class AbstractPeerEventListener implements PeerEventListener {
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
}
@Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
}

View File

@ -18,6 +18,7 @@ package org.bitcoinj.core;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
/**
* <p>Implementors can listen to events like blocks being downloaded/transactions being broadcast/connect/disconnects,
@ -25,6 +26,14 @@ import java.util.List;
* provide transactions to remote peers when they ask for them.</p>
*/
public interface PeerEventListener {
/**
* <p>Called when peers are discovered, this happens at startup of {@link PeerGroup} or if we run out of
* suitable {@link Peer}s to connect to.</p>
*
* @param peerAddresses the set of discovered {@link PeerAddress}es
*/
public void onPeersDiscovered(Set<PeerAddress> peerAddresses);
/**
* Called on a Peer thread when a block is received.<p>
*

View File

@ -32,6 +32,7 @@ import org.bitcoinj.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import com.google.common.primitives.Ints;
@ -80,6 +81,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static final int DEFAULT_CONNECTIONS = 4;
private static final int TOR_TIMEOUT_SECONDS = 60;
private int maxPeersToDiscoverCount = 100;
protected final ReentrantLock lock = Threading.lock("peergroup");
@ -661,12 +663,12 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
if (peerDiscoverers.isEmpty())
throw new PeerDiscoveryException("No peer discoverers registered");
long start = System.currentTimeMillis();
Set<PeerAddress> addressSet = Sets.newHashSet();
final Set<PeerAddress> addressSet = Sets.newHashSet();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
InetSocketAddress[] addresses;
addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS);
for (InetSocketAddress address : addresses) addressSet.add(new PeerAddress(address));
if (addressSet.size() > 0) break;
if (addressSet.size() >= maxPeersToDiscoverCount) break;
}
lock.lock();
try {
@ -676,6 +678,17 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
} finally {
lock.unlock();
}
final ImmutableSet<PeerAddress> peersDiscoveredSet = ImmutableSet.copyOf(addressSet);
for (final ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeersDiscovered(peersDiscoveredSet);
}
});
}
log.info("Peer discovery took {}msec and returned {} items",
System.currentTimeMillis() - start, addressSet.size());
}
@ -1649,6 +1662,24 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
return torClient;
}
/**
* Returns the maximum number of {@link Peer}s to discover. This maximum is checked after
* each {@link PeerDiscovery} so this max number can be surpassed.
* @return the maximum number of peers to discover
*/
public int getMaxPeersToDiscoverCount() {
return maxPeersToDiscoverCount;
}
/**
* Sets the maximum number of {@link Peer}s to discover. This maximum is checked after
* each {@link PeerDiscovery} so this max number can be surpassed.
* @param maxPeersToDiscoverCount the maximum number of peers to discover
*/
public void setMaxPeersToDiscoverCount(int maxPeersToDiscoverCount) {
this.maxPeersToDiscoverCount = maxPeersToDiscoverCount;
}
/** See {@link #setUseLocalhostPeerWhenPossible(boolean)} */
public boolean getUseLocalhostPeerWhenPossible() {
lock.lock();

View File

@ -19,6 +19,7 @@ package org.bitcoinj.jni;
import org.bitcoinj.core.*;
import java.util.List;
import java.util.Set;
/**
* An event listener that relays events to a native C++ object. A pointer to that object is stored in
@ -28,6 +29,9 @@ import java.util.List;
public class NativePeerEventListener implements PeerEventListener {
public long ptr;
@Override
public native void onPeersDiscovered(Set<PeerAddress> peerAddresses);
@Override
public native void onBlocksDownloaded(Peer peer, Block block, int blocksLeft);

View File

@ -166,6 +166,40 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertTrue(result.get());
}
// Utility method to create a PeerDiscovery with a certain number of addresses.
private PeerDiscovery createPeerDiscovery(int nrOfAddressesWanted, int port) {
final InetSocketAddress[] addresses = new InetSocketAddress[nrOfAddressesWanted];
for (int addressNr = 0; addressNr < nrOfAddressesWanted; addressNr++) {
// make each address unique by using the counter to increment the port.
addresses[addressNr] = new InetSocketAddress("localhost", port + addressNr);
}
return new PeerDiscovery() {
public InetSocketAddress[] getPeers(long unused, TimeUnit unused2) throws PeerDiscoveryException {
return addresses;
}
public void shutdown() {
}
};
}
@Test
public void multiplePeerDiscovery() throws InterruptedException {
peerGroup.setMaxPeersToDiscoverCount(98);
peerGroup.addPeerDiscovery(createPeerDiscovery(1, 0));
peerGroup.addPeerDiscovery(createPeerDiscovery(2, 100));
peerGroup.addPeerDiscovery(createPeerDiscovery(96, 200));
peerGroup.addPeerDiscovery(createPeerDiscovery(3, 300));
peerGroup.addPeerDiscovery(createPeerDiscovery(1, 400));
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeersDiscovered(Set<PeerAddress> peerAddresses) {
assertEquals(99, peerAddresses.size());
}
});
peerGroup.startAsync();
peerGroup.awaitRunning();
}
@Test
public void receiveTxBroadcast() throws Exception {
// Check that when we receive transactions on all our peers, we do the right thing.