From 27bc229faba80b304711fe7b5e126844c58504cc Mon Sep 17 00:00:00 2001
From: Mike Hearn
Date: Thu, 13 Nov 2014 22:45:52 +0100
Subject: [PATCH] Rewrite how peer group manages connections and its internal
thread:
- No longer uses Guava services, the change is source compatible but the two-step API is no longer needed
- Now has a dedicated ScheduledExecutorService as its core service thread, so we can schedule jobs for the future instead of using sleeps.
- Connection code was rewritten to be easier to follow (at least I think so).
The goal here is to generalise the peer group thread so it can do more things.
---
.../java/org/bitcoinj/core/PeerGroup.java | 386 ++++++++++--------
.../java/org/bitcoinj/kits/WalletAppKit.java | 22 +-
.../bitcoinj/testing/TestWithPeerGroup.java | 37 +-
.../bitcoinj/utils/DaemonThreadFactory.java | 12 +
...ilteredBlockAndPartialMerkleTreeTests.java | 1 -
.../java/org/bitcoinj/core/PeerGroupTest.java | 28 +-
6 files changed, 293 insertions(+), 193 deletions(-)
diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java
index 4b30e63a6..4c0f47616 100644
--- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java
+++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java
@@ -37,6 +37,7 @@ import org.bitcoinj.net.discovery.PeerDiscovery;
import org.bitcoinj.net.discovery.PeerDiscoveryException;
import org.bitcoinj.net.discovery.TorDiscovery;
import org.bitcoinj.script.Script;
+import org.bitcoinj.utils.DaemonThreadFactory;
import org.bitcoinj.utils.ExponentialBackoff;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
@@ -76,14 +77,24 @@ import static com.google.common.base.Preconditions.checkState;
* when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do
* network IO, but starting and stopping the service should be fine.
*/
-public class PeerGroup extends AbstractExecutionThreadService implements TransactionBroadcaster {
+public class PeerGroup implements TransactionBroadcaster {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static final int DEFAULT_CONNECTIONS = 4;
private static final int TOR_TIMEOUT_SECONDS = 60;
- private int maxPeersToDiscoverCount = 100;
+ private int vMaxPeersToDiscoverCount = 100;
protected final ReentrantLock lock = Threading.lock("peergroup");
+ // This executor is used to queue up jobs: it's used when we don't want to use locks for mutual exclusion,
+ // typically because the job might call in to user provided code that needs/wants the freedom to use the API
+ // however it wants, or because a job needs to be ordered relative to other jobs like that.
+ protected final ListeningScheduledExecutorService executor;
+
+ // Whether the peer group is currently running. Once shut down it cannot be restarted.
+ private volatile boolean vRunning;
+ // Whether the peer group has been started or not. An unstarted PG does not try to access the network.
+ private volatile boolean vUsedUp;
+
// Addresses to try to connect to, excluding active peers.
@GuardedBy("lock") private final PriorityQueue inactives;
@GuardedBy("lock") private final Map backoffMap;
@@ -180,13 +191,13 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
if (!sendIfChangedQueued) {
log.info("Queuing recalc of the Bloom filter due to new keys or scripts becoming available");
sendIfChangedQueued = true;
- Uninterruptibles.putUninterruptibly(jobQueue, bloomSendIfChanged);
+ executor.execute(bloomSendIfChanged);
}
} else {
if (!dontSendQueued) {
log.info("Queuing recalc of the Bloom filter due to observing a pay to pubkey output on a relevant tx");
dontSendQueued = true;
- Uninterruptibles.putUninterruptibly(jobQueue, bloomDontSend);
+ executor.execute(bloomDontSend);
}
}
}
@@ -238,10 +249,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// Exponential backoff for peers starts at 1 second and maxes at 10 minutes.
private ExponentialBackoff.Params peerBackoffParams = new ExponentialBackoff.Params(1000, 1.5f, 10 * 60 * 1000);
// Tracks failures globally in case of a network failure.
- private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 10 * 1000));
-
- // Things for the dedicated PeerGroup management thread to do.
- private LinkedBlockingQueue jobQueue = new LinkedBlockingQueue();
+ @GuardedBy("lock") private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 10 * 1000));
// This is a synchronized set, so it locks on itself. We use it to prevent TransactionBroadcast objects from
// being garbage collected if nothing in the apps code holds on to them transitively. See the discussion
@@ -348,6 +356,8 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
peerFilterProviders = new CopyOnWriteArrayList();
this.torClient = torClient;
+ executor = createPrivateExecutor();
+
// This default sentinel value will be overridden by one of two actions:
// - adding a peer discovery source sets it to the default
// - using connectTo() will increment it by one
@@ -382,6 +392,23 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
bloomFilterMerger = new FilterMerger(DEFAULT_BLOOM_FILTER_FP_RATE);
}
+ private CountDownLatch executorStartupLatch = new CountDownLatch(1);
+
+ protected ListeningScheduledExecutorService createPrivateExecutor() {
+ ListeningScheduledExecutorService result = MoreExecutors.listeningDecorator(
+ new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("PeerGroup Thread"))
+ );
+ // Hack: jam the executor so jobs just queue up until the user calls start() on us. For example, adding a wallet
+ // results in a bloom filter recalc being queued, but we don't want to do that until we're actually started.
+ result.execute(new Runnable() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(executorStartupLatch);
+ }
+ });
+ return result;
+ }
+
/**
* Adjusts the desired number of connections that we will create to peers. Note that if there are already peers
* open and the new value is lower than the current number of peers, those connections will be terminated. Likewise
@@ -419,23 +446,86 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
private Runnable triggerConnectionsJob = new Runnable() {
+ private boolean firstRun = true;
+
@Override
public void run() {
- // We have to test the condition at the end, because during startup we need to run this at least once
- // when isRunning() can return false.
- do {
- try {
- connectToAnyPeer();
- } catch (PeerDiscoveryException e) {
- groupBackoff.trackFailure();
+ if (!vRunning) return;
+
+ boolean doDiscovery = false;
+ long now = Utils.currentTimeMillis();
+ lock.lock();
+ try {
+ // First run: try and use a local node if there is one, for the additional security it can provide.
+ // But, not on Android as there are none for this platform: it could only be a malicious app trying
+ // to hijack our traffic.
+ if (!Utils.isAndroidRuntime() && useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer() && firstRun) {
+ log.info("Localhost peer detected, trying to use it instead of P2P discovery");
+ maxConnections = 0;
+ connectToLocalHost();
+ return;
}
- } while (isRunning() && countConnectedAndPendingPeers() < getMaxConnections());
+
+ boolean havePeerWeCanTry = !inactives.isEmpty() && backoffMap.get(inactives.peek()).getRetryTime() <= now;
+ doDiscovery = !havePeerWeCanTry;
+ } finally {
+ firstRun = false;
+ lock.unlock();
+ }
+
+ // Don't hold the lock across discovery as this process can be very slow.
+ boolean discoverySuccess = false;
+ if (doDiscovery) {
+ try {
+ discoverySuccess = discoverPeers() > 0;
+ } catch (PeerDiscoveryException e) {
+ log.error("Peer discovery failure", e);
+ }
+ }
+
+ long retryTime = 0;
+ PeerAddress addrToTry = null;
+ lock.lock();
+ try {
+ if (doDiscovery) {
+ if (discoverySuccess) {
+ groupBackoff.trackSuccess();
+ } else {
+ groupBackoff.trackFailure();
+ }
+ }
+ // Inactives is sorted by backoffMap time.
+ if (inactives.isEmpty()) {
+ log.info("Peer discovery didn't provide us any more peers, will try again later.");
+ executor.schedule(this, groupBackoff.getRetryTime() - now, TimeUnit.MILLISECONDS);
+ return;
+ } else {
+ do {
+ addrToTry = inactives.poll();
+ } while (ipv6Unreachable && addrToTry.getAddr() instanceof Inet6Address);
+ retryTime = backoffMap.get(addrToTry).getRetryTime();
+ }
+ retryTime = Math.max(retryTime, groupBackoff.getRetryTime());
+ if (retryTime > now) {
+ long delay = retryTime - now;
+ log.info("Waiting {} msec before next connect attempt {}", delay, addrToTry == null ? "" : "to " + addrToTry);
+ inactives.add(addrToTry);
+ executor.schedule(this, delay, TimeUnit.MILLISECONDS);
+ return;
+ }
+ } finally {
+ lock.unlock();
+ }
+ connectTo(addrToTry, false, vConnectTimeoutMillis);
+ if (countConnectedAndPendingPeers() < getMaxConnections()) {
+ executor.execute(this); // Try next peer immediately.
+ }
}
};
private void triggerConnections() {
// Run on a background thread due to the need to potentially retry and back off in the background.
- Uninterruptibles.putUninterruptibly(jobQueue, triggerConnectionsJob);
+ executor.execute(triggerConnectionsJob);
}
/** The maximum number of connections that we will create to peers. */
@@ -641,11 +731,16 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
private void addInactive(PeerAddress peerAddress) {
- // Deduplicate
- if (backoffMap.containsKey(peerAddress))
- return;
- backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
- inactives.offer(peerAddress);
+ lock.lock();
+ try {
+ // Deduplicate
+ if (backoffMap.containsKey(peerAddress))
+ return;
+ backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
+ inactives.offer(peerAddress);
+ } finally {
+ lock.unlock();
+ }
}
/** Convenience method for addAddress(new PeerAddress(address, params.port)); */
@@ -668,62 +763,41 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
}
- protected void discoverPeers() throws PeerDiscoveryException {
- checkState(lock.isHeldByCurrentThread());
- if (peerDiscoverers.isEmpty())
- throw new PeerDiscoveryException("No peer discoverers registered");
+ /** Returns number of discovered peers. */
+ protected int discoverPeers() throws PeerDiscoveryException {
+ // Don't hold the lock whilst doing peer discovery: it can take a long time and cause high API latency.
+ checkState(!lock.isHeldByCurrentThread());
+ int maxPeersToDiscoverCount = this.vMaxPeersToDiscoverCount;
long start = System.currentTimeMillis();
final List addressList = Lists.newLinkedList();
- for (PeerDiscovery peerDiscovery : peerDiscoverers) {
+ for (PeerDiscovery peerDiscovery : peerDiscoverers /* COW */) {
InetSocketAddress[] addresses;
- // Don't hold the peergroup lock across peer discovery as it's likely to be very slow and would make the
- // peergroup API high latency.
- lock.unlock();
- try {
- addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS);
- } finally {
- lock.lock();
- }
+ addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS);
for (InetSocketAddress address : addresses) addressList.add(new PeerAddress(address));
if (addressList.size() >= maxPeersToDiscoverCount) break;
}
- for (PeerAddress address : addressList) {
- addInactive(address);
+ if (!addressList.isEmpty()) {
+ for (PeerAddress address : addressList) {
+ addInactive(address);
+ }
+ final ImmutableSet peersDiscoveredSet = ImmutableSet.copyOf(addressList);
+ for (final ListenerRegistration registration : peerEventListeners /* COW */) {
+ registration.executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ registration.listener.onPeersDiscovered(peersDiscoveredSet);
+ }
+ });
+ }
}
-
- final ImmutableSet peersDiscoveredSet = ImmutableSet.copyOf(addressList);
- for (final ListenerRegistration registration : peerEventListeners) {
- registration.executor.execute(new Runnable() {
- @Override
- public void run() {
- registration.listener.onPeersDiscovered(peersDiscoveredSet);
- }
- });
- }
-
log.info("Peer discovery took {}msec and returned {} items",
System.currentTimeMillis() - start, addressList.size());
- }
-
- @Override
- protected void run() throws Exception {
- // Runs in a background thread dedicated to the PeerGroup. Jobs are for handling peer connections with backoff,
- // and also recalculating filters.
- while (isRunning()) {
- jobQueue.take().run();
- }
+ return addressList.size();
}
@VisibleForTesting
void waitForJobQueue() {
- final CountDownLatch latch = new CountDownLatch(1);
- Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
- @Override
- public void run() {
- latch.countDown();
- }
- });
- Uninterruptibles.awaitUninterruptibly(latch);
+ Futures.getUnchecked(executor.submit(Runnables.doNothing()));
}
private int countConnectedAndPendingPeers() {
@@ -765,107 +839,98 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
return false;
}
- /** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */
- protected void connectToAnyPeer() throws PeerDiscoveryException {
- final State state = state();
- if (!(state == State.STARTING || state == State.RUNNING)) return;
-
- PeerAddress addr = null;
-
- long nowMillis = Utils.currentTimeMillis();
- long retryTime = 0;
- lock.lock();
- try {
- if (useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer()) {
- log.info("Localhost peer detected, trying to use it instead of P2P discovery");
- maxConnections = 0;
- connectToLocalHost();
- return;
- }
- if (!haveReadyInactivePeer(nowMillis)) {
- // Release the lock here because we'll probably do slow things like DNS lookups below,
- discoverPeers();
- groupBackoff.trackSuccess();
- nowMillis = Utils.currentTimeMillis();
- }
- if (inactives.size() == 0) {
- log.debug("Peer discovery didn't provide us any more peers, not trying to build new connection.");
- return;
- }
- while (addr == null || (ipv6Unreachable && addr.getAddr() instanceof Inet6Address))
- addr = inactives.poll();
- retryTime = backoffMap.get(addr).getRetryTime();
- } finally {
- // discoverPeers might throw an exception if something goes wrong: we then hit this path with addr == null.
- retryTime = Math.max(retryTime, groupBackoff.getRetryTime());
- lock.unlock();
- if (retryTime > nowMillis) {
- // Sleep until retry time
- final long millis = retryTime - nowMillis;
- log.info("Waiting {} msec before next connect attempt {}", millis, addr == null ? "" : "to " + addr);
- Utils.sleep(millis);
- }
- }
-
- // This method constructs a Peer and puts it into pendingPeers.
- checkNotNull(addr); // Help static analysis which can't see that addr is always set if we didn't throw above.
- connectTo(addr, false, vConnectTimeoutMillis);
- }
-
- private boolean haveReadyInactivePeer(long nowMillis) {
- // No inactive peers to try?
- if (inactives.size() == 0)
- return false;
- // All peers have not reached backoff retry time?
- if (backoffMap.get(inactives.peek()).getRetryTime() > nowMillis)
- return false;
- return true;
- }
-
- @Override
- protected void startUp() throws Exception {
+ /**
+ * Starts the PeerGroup and begins network activity.
+ * @return A future that completes when first connection activity has been triggered (note: not first connection made).
+ */
+ public ListenableFuture startAsync() {
// This is run in a background thread by the Service implementation.
if (chain == null) {
// Just try to help catch what might be a programming error.
log.warn("Starting up with no attached block chain. Did you forget to pass one to the constructor?");
}
- vPingTimer = new Timer("Peer pinging thread", true);
- if (torClient != null) {
- log.info("Starting Tor/Orchid ...");
- torClient.start();
- torClient.waitUntilReady(TOR_TIMEOUT_SECONDS * 1000);
- log.info("Tor ready");
- }
- channels.startAsync();
- channels.awaitRunning();
- triggerConnections();
- }
-
- @Override
- protected void shutDown() throws Exception {
- // This is run on a separate thread by the Service implementation.
- vPingTimer.cancel();
- // Blocking close of all sockets.
- channels.stopAsync();
- channels.awaitTerminated();
- for (PeerDiscovery peerDiscovery : peerDiscoverers) {
- peerDiscovery.shutdown();
- }
- if (torClient != null) {
- torClient.stop();
- }
- }
-
- @Override
- protected void triggerShutdown() {
- // Force the thread to wake up.
- Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
+ checkState(!vUsedUp, "Cannot start a peer group twice");
+ vRunning = true;
+ vUsedUp = true;
+ executorStartupLatch.countDown();
+ // We do blocking waits during startup, so run on the executor thread.
+ return executor.submit(new Runnable() {
@Override
public void run() {
+ log.info("Starting ...");
+ vPingTimer = new Timer("Peer pinging thread", true);
+ if (torClient != null) {
+ log.info("Starting Tor/Orchid ...");
+ torClient.start();
+ try {
+ torClient.waitUntilReady(TOR_TIMEOUT_SECONDS * 1000);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ log.info("Tor ready");
+ }
+ channels.startAsync();
+ channels.awaitRunning();
+ triggerConnections();
}
});
}
+ /** Does a blocking startup. */
+ public void start() {
+ Futures.getUnchecked(startAsync());
+ }
+
+ /** Can just use start() for a blocking start here instead of startAsync/awaitRunning: PeerGroup is no longer a Guava service. */
+ @Deprecated
+ public void awaitRunning() {
+ waitForJobQueue();
+ }
+
+ public ListenableFuture stopAsync() {
+ checkState(vRunning);
+ vRunning = false;
+ ListenableFuture future = executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ log.info("Stopping ...");
+ vPingTimer.cancel();
+ // Blocking close of all sockets.
+ channels.stopAsync();
+ channels.awaitTerminated();
+ for (PeerDiscovery peerDiscovery : peerDiscoverers) {
+ peerDiscovery.shutdown();
+ }
+ if (torClient != null) {
+ torClient.stop();
+ }
+ vRunning = false;
+ }
+ });
+ executor.shutdown();
+ return future;
+ }
+
+ /** Does a blocking stop */
+ public void stop() {
+ try {
+ stopAsync();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Can just use stop() here instead of stopAsync/awaitTerminated: PeerGroup is no longer a Guava service. */
+ @Deprecated
+ public void awaitTerminated() {
+ try {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Link the given wallet to this PeerGroup. This is used for three purposes:
*
@@ -1332,8 +1397,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
protected void handlePeerDeath(final Peer peer, @Nullable Exception exception) {
// Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately.
- final State state = state();
- if (state != State.RUNNING && state != State.STARTING) return;
+ if (!isRunning()) return;
int numPeers;
int numConnectedPeers = 0;
@@ -1715,7 +1779,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @return the maximum number of peers to discover
*/
public int getMaxPeersToDiscoverCount() {
- return maxPeersToDiscoverCount;
+ return vMaxPeersToDiscoverCount;
}
/**
@@ -1724,7 +1788,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @param maxPeersToDiscoverCount the maximum number of peers to discover
*/
public void setMaxPeersToDiscoverCount(int maxPeersToDiscoverCount) {
- this.maxPeersToDiscoverCount = maxPeersToDiscoverCount;
+ this.vMaxPeersToDiscoverCount = maxPeersToDiscoverCount;
}
/** See {@link #setUseLocalhostPeerWhenPossible(boolean)} */
@@ -1750,4 +1814,8 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
lock.unlock();
}
}
+
+ public boolean isRunning() {
+ return vRunning;
+ }
}
diff --git a/core/src/main/java/org/bitcoinj/kits/WalletAppKit.java b/core/src/main/java/org/bitcoinj/kits/WalletAppKit.java
index 4a407ab95..35fc71d7e 100644
--- a/core/src/main/java/org/bitcoinj/kits/WalletAppKit.java
+++ b/core/src/main/java/org/bitcoinj/kits/WalletAppKit.java
@@ -18,9 +18,7 @@
package org.bitcoinj.kits;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.*;
import com.subgraph.orchid.TorClient;
import org.bitcoinj.core.*;
import org.bitcoinj.net.discovery.DnsDiscovery;
@@ -293,8 +291,7 @@ public class WalletAppKit extends AbstractIdleService {
onSetupCompleted();
if (blockingStartup) {
- vPeerGroup.startAsync();
- vPeerGroup.awaitRunning();
+ vPeerGroup.start();
// Make sure we shut down cleanly.
installShutdownHook();
completeExtensionInitiations(vPeerGroup);
@@ -304,20 +301,20 @@ public class WalletAppKit extends AbstractIdleService {
vPeerGroup.startBlockChainDownload(listener);
listener.await();
} else {
- vPeerGroup.startAsync();
- vPeerGroup.addListener(new Service.Listener() {
+ Futures.addCallback(vPeerGroup.startAsync(), new FutureCallback() {
@Override
- public void running() {
+ public void onSuccess(@Nullable Object result) {
completeExtensionInitiations(vPeerGroup);
final PeerEventListener l = downloadListener == null ? new DownloadListener() : downloadListener;
vPeerGroup.startBlockChainDownload(l);
}
@Override
- public void failed(State from, Throwable failure) {
- throw new RuntimeException(failure);
+ public void onFailure(Throwable t) {
+ throw new RuntimeException(t);
+
}
- }, MoreExecutors.sameThreadExecutor());
+ });
}
} catch (BlockStoreException e) {
throw new IOException(e);
@@ -441,8 +438,7 @@ public class WalletAppKit extends AbstractIdleService {
protected void shutDown() throws Exception {
// Runs in a separate thread.
try {
- vPeerGroup.stopAsync();
- vPeerGroup.awaitTerminated();
+ vPeerGroup.stop();
vWallet.saveToFile(vWalletFile);
vStore.close();
diff --git a/core/src/main/java/org/bitcoinj/testing/TestWithPeerGroup.java b/core/src/main/java/org/bitcoinj/testing/TestWithPeerGroup.java
index 0d7b5691c..0705aac11 100644
--- a/core/src/main/java/org/bitcoinj/testing/TestWithPeerGroup.java
+++ b/core/src/main/java/org/bitcoinj/testing/TestWithPeerGroup.java
@@ -16,15 +16,19 @@
package org.bitcoinj.testing;
+import com.google.common.util.concurrent.*;
import org.bitcoinj.core.*;
import org.bitcoinj.net.BlockingClientManager;
+import org.bitcoinj.net.ClientConnectionManager;
import org.bitcoinj.net.NioClientManager;
import org.bitcoinj.params.UnitTestParams;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.MemoryBlockStore;
import com.google.common.base.Preconditions;
+import org.bitcoinj.utils.DaemonThreadFactory;
import java.net.InetSocketAddress;
+import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -58,6 +62,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
remoteVersionMessage = new VersionMessage(unitTestParams, 1);
remoteVersionMessage.localServices = VersionMessage.NODE_NETWORK;
remoteVersionMessage.clientVersion = NotFoundMessage.MIN_PROTOCOL_VERSION;
+ blockJobs = false;
initPeerGroup();
}
@@ -65,9 +70,9 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
public void tearDown() {
try {
super.tearDown();
+ blockJobs = false;
Utils.finishMockSleep();
peerGroup.stopAsync();
- peerGroup.awaitTerminated();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -75,14 +80,40 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
protected void initPeerGroup() {
if (clientType == ClientType.NIO_CLIENT_MANAGER)
- peerGroup = new PeerGroup(unitTestParams, blockChain, new NioClientManager());
+ peerGroup = createPeerGroup(new NioClientManager());
else
- peerGroup = new PeerGroup(unitTestParams, blockChain, new BlockingClientManager());
+ peerGroup = createPeerGroup(new BlockingClientManager());
peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests.
peerGroup.addWallet(wallet);
peerGroup.setUseLocalhostPeerWhenPossible(false); // Prevents from connecting to bitcoin nodes on localhost.
}
+ protected boolean blockJobs = false;
+ protected final Semaphore jobBlocks = new Semaphore(0);
+
+ private PeerGroup createPeerGroup(final ClientConnectionManager manager) {
+ return new PeerGroup(unitTestParams, blockChain, manager) {
+ @Override
+ protected ListeningScheduledExecutorService createPrivateExecutor() {
+ return MoreExecutors.listeningDecorator(new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("PeerGroup test thread")) {
+ @Override
+ public ScheduledFuture> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ if (!blockJobs)
+ return super.schedule(command, delay, unit);
+ return super.schedule(new Runnable() {
+ @Override
+ public void run() {
+ Utils.rollMockClockMillis(unit.toMillis(delay));
+ command.run();
+ jobBlocks.acquireUninterruptibly();
+ }
+ }, 0 /* immediate */, unit);
+ }
+ });
+ }
+ };
+ }
+
protected InboundMessageQueuer connectPeerWithoutVersionExchange(int id) throws Exception {
Preconditions.checkArgument(id < PEER_SERVERS);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
diff --git a/core/src/main/java/org/bitcoinj/utils/DaemonThreadFactory.java b/core/src/main/java/org/bitcoinj/utils/DaemonThreadFactory.java
index f48d65597..254feccd7 100644
--- a/core/src/main/java/org/bitcoinj/utils/DaemonThreadFactory.java
+++ b/core/src/main/java/org/bitcoinj/utils/DaemonThreadFactory.java
@@ -1,16 +1,28 @@
package org.bitcoinj.utils;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/** Thread factory whose threads are marked as daemon and won't prevent process exit. */
public class DaemonThreadFactory implements ThreadFactory {
+ @Nullable private final String name;
+
+ public DaemonThreadFactory(@Nullable String name) {
+ this.name = name;
+ }
+
+ public DaemonThreadFactory() {
+ this(null);
+ }
@Override
public Thread newThread(@Nonnull Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
+ if (name != null)
+ thread.setName(name);
return thread;
}
}
diff --git a/core/src/test/java/org/bitcoinj/core/FilteredBlockAndPartialMerkleTreeTests.java b/core/src/test/java/org/bitcoinj/core/FilteredBlockAndPartialMerkleTreeTests.java
index f994d8e9e..45c1eb533 100644
--- a/core/src/test/java/org/bitcoinj/core/FilteredBlockAndPartialMerkleTreeTests.java
+++ b/core/src/test/java/org/bitcoinj/core/FilteredBlockAndPartialMerkleTreeTests.java
@@ -188,7 +188,6 @@ public class FilteredBlockAndPartialMerkleTreeTests extends TestWithPeerGroup {
// Peer 1 goes away.
closePeer(peerOf(p1));
- peerGroup.stopAsync();
super.tearDown();
}
}
diff --git a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java
index d826947f6..b53099f9b 100644
--- a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java
+++ b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java
@@ -159,8 +159,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
public void shutdown() {
}
});
- peerGroup.startAsync();
- peerGroup.awaitRunning();
+ peerGroup.start();
latch.await();
// Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried
// again a bit later.
@@ -238,8 +237,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
inbound(p2, new NotFoundMessage(unitTestParams, getdata.getItems()));
pingAndWait(p2);
assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
- peerGroup.stopAsync();
- peerGroup.awaitTerminated();
}
@@ -276,9 +273,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
inbound(p1, new NotFoundMessage(unitTestParams, getdata.getItems()));
pingAndWait(p1);
assertEquals(value, wallet2.getBalance(Wallet.BalanceType.ESTIMATED));
- peerGroup.stopAsync();
- peerGroup.awaitTerminated();
- }
+ }
@Test
public void singleDownloadPeer1() throws Exception {
@@ -320,7 +315,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Peer 2 fetches it next time it hears an inv (should it fetch immediately?).
inbound(p2, inv);
assertTrue(outbound(p2) instanceof GetDataMessage);
- peerGroup.stopAsync();
}
@Test
@@ -358,7 +352,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
InboundMessageQueuer p2 = connectPeer(2);
Message message = (Message)outbound(p2);
assertNull(message == null ? "" : message.toString(), message);
- peerGroup.stopAsync();
}
@Test
@@ -559,10 +552,15 @@ public class PeerGroupTest extends TestWithPeerGroup {
}
});
peerGroup.setMaxConnections(3);
+
Utils.setMockSleep(true);
+ blockJobs = true;
+
+ jobBlocks.release(2); // startup + first peer discovery
peerGroup.startAsync();
peerGroup.awaitRunning();
+ jobBlocks.release(3); // One for each peer.
handleConnectToPeer(0);
handleConnectToPeer(1);
handleConnectToPeer(2);
@@ -575,6 +573,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// discovers, connects to new peer
+ jobBlocks.release(1);
handleConnectToPeer(3);
assertEquals(2003, connectedPeers.take().getAddress().getPort());
@@ -582,30 +581,25 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); // peer died
// Alternates trying two offline peers
- Utils.passMockSleep();
+ jobBlocks.release(10);
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
- Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
- Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
- Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
- Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
// Peer 2 comes online
startPeerServer(2);
- Utils.passMockSleep();
+ jobBlocks.release(1);
handleConnectToPeer(2);
assertEquals(2002, connectedPeers.take().getAddress().getPort());
+ jobBlocks.release(6);
stopPeerServer(2);
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// Peer 2 is tried before peer 1, since it has a lower backoff due to recent success
- Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
- Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
}