Abstract out multiplexing from DnsDiscovery.

This commit is contained in:
Mike Hearn 2015-01-07 16:43:45 +01:00
parent f36576ce29
commit 0b5b101343
2 changed files with 137 additions and 64 deletions

View File

@ -18,13 +18,10 @@
package org.bitcoinj.net.discovery;
import org.bitcoinj.core.NetworkParameters;
import com.google.common.collect.Lists;
import org.bitcoinj.utils.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
@ -39,12 +36,7 @@ import java.util.concurrent.*;
* will return up to 30 random peers from the set of those returned within the timeout period. If you want more peers
* to connect to, you need to discover them via other means (like addr broadcasts).</p>
*/
public class DnsDiscovery implements PeerDiscovery {
private static final Logger log = LoggerFactory.getLogger(DnsDiscovery.class);
private final String[] dnsSeeds;
private final NetworkParameters netParams;
public class DnsDiscovery extends MultiplexingDiscovery {
/**
* Supports finding peers through DNS A records. Community run DNS entry points will be used.
*
@ -58,65 +50,44 @@ public class DnsDiscovery implements PeerDiscovery {
* Supports finding peers through DNS A records.
*
* @param dnsSeeds Host names to be examined for seed addresses.
* @param netParams Network parameters to be used for port information.
* @param params Network parameters to be used for port information.
*/
public DnsDiscovery(String[] dnsSeeds, NetworkParameters netParams) {
this.dnsSeeds = dnsSeeds;
this.netParams = netParams;
public DnsDiscovery(String[] dnsSeeds, NetworkParameters params) {
super(params, buildDiscoveries(params, dnsSeeds));
}
@Override
public InetSocketAddress[] getPeers(long timeoutValue, TimeUnit timeoutUnit) throws PeerDiscoveryException {
if (dnsSeeds == null || dnsSeeds.length == 0)
throw new PeerDiscoveryException("No DNS seeds configured; unable to find any peers");
private static List<PeerDiscovery> buildDiscoveries(NetworkParameters params, String[] seeds) {
List<PeerDiscovery> discoveries = new ArrayList<PeerDiscovery>(seeds.length);
for (String seed : seeds)
discoveries.add(new DnsSeedDiscovery(params, seed));
return discoveries;
}
// Java doesn't have an async DNS API so we have to do all lookups in a thread pool, as sometimes seeds go
// hard down and it takes ages to give up and move on.
ExecutorService threadPool = Executors.newFixedThreadPool(dnsSeeds.length, new DaemonThreadFactory());
try {
List<Callable<InetAddress[]>> tasks = Lists.newArrayList();
for (final String seed : dnsSeeds) {
tasks.add(new Callable<InetAddress[]>() {
@Override
public InetAddress[] call() throws Exception {
return InetAddress.getAllByName(seed);
}
});
/** Implements discovery from a single DNS host. */
public static class DnsSeedDiscovery implements PeerDiscovery {
private final String hostname;
private final NetworkParameters params;
public DnsSeedDiscovery(NetworkParameters params, String hostname) {
this.hostname = hostname;
this.params = params;
}
@Override
public InetSocketAddress[] getPeers(long timeoutValue, TimeUnit timeoutUnit) throws PeerDiscoveryException {
try {
InetAddress[] response = InetAddress.getAllByName(hostname);
InetSocketAddress[] result = new InetSocketAddress[response.length];
for (int i = 0; i < response.length; i++)
result[i] = new InetSocketAddress(response[i], params.getPort());
return result;
} catch (UnknownHostException e) {
throw new PeerDiscoveryException(e);
}
final List<Future<InetAddress[]>> futures = threadPool.invokeAll(tasks, timeoutValue, timeoutUnit);
ArrayList<InetSocketAddress> addrs = Lists.newArrayList();
for (int i = 0; i < futures.size(); i++) {
Future<InetAddress[]> future = futures.get(i);
if (future.isCancelled()) {
log.warn("DNS seed {}: timed out", dnsSeeds[i]);
continue; // Timed out.
}
final InetAddress[] inetAddresses;
try {
inetAddresses = future.get();
log.info("DNS seed {}: got {} peers", dnsSeeds[i], inetAddresses.length);
} catch (ExecutionException e) {
log.error("DNS seed {}: failed to look up: {}", dnsSeeds[i], e.getMessage());
continue;
}
for (InetAddress addr : inetAddresses) {
addrs.add(new InetSocketAddress(addr, netParams.getPort()));
}
}
if (addrs.size() == 0)
throw new PeerDiscoveryException("Unable to find any peers via DNS");
Collections.shuffle(addrs);
threadPool.shutdownNow();
return addrs.toArray(new InetSocketAddress[addrs.size()]);
} catch (InterruptedException e) {
throw new PeerDiscoveryException(e);
} finally {
threadPool.shutdown();
}
@Override
public void shutdown() {
}
}
/** We don't have a way to abort a DNS lookup, so this does nothing */
@Override
public void shutdown() {
}
}

View File

@ -0,0 +1,102 @@
/**
* Copyright 2014 Mike Hearn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.net.discovery;
import com.google.common.collect.Lists;
import org.bitcoinj.core.NetworkParameters;
import org.bitcoinj.utils.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
/**
* MultiplexingDiscovery queries multiple PeerDiscovery objects, shuffles their responses and then returns the results,
* thus selecting randomly between them and reducing the influence of any particular seed. Any that don't respond
* within the timeout are ignored. Backends are queried in parallel. Backends may block
*/
public class MultiplexingDiscovery implements PeerDiscovery {
private static final Logger log = LoggerFactory.getLogger(MultiplexingDiscovery.class);
protected final List<PeerDiscovery> seeds;
protected final NetworkParameters netParams;
private volatile ExecutorService vThreadPool;
/**
* Will query the given seeds in parallel before producing a merged response.
*/
public MultiplexingDiscovery(NetworkParameters params, List<PeerDiscovery> seeds) {
checkArgument(!seeds.isEmpty());
this.netParams = params;
this.seeds = seeds;
}
@Override
public InetSocketAddress[] getPeers(final long timeoutValue, final TimeUnit timeoutUnit) throws PeerDiscoveryException {
vThreadPool = Executors.newFixedThreadPool(seeds.size(), new DaemonThreadFactory());
try {
List<Callable<InetSocketAddress[]>> tasks = Lists.newArrayList();
for (final PeerDiscovery seed : seeds) {
tasks.add(new Callable<InetSocketAddress[]>() {
@Override
public InetSocketAddress[] call() throws Exception {
return seed.getPeers(timeoutValue, timeoutUnit);
}
});
}
final List<Future<InetSocketAddress[]>> futures = vThreadPool.invokeAll(tasks, timeoutValue, timeoutUnit);
ArrayList<InetSocketAddress> addrs = Lists.newArrayList();
for (int i = 0; i < futures.size(); i++) {
Future<InetSocketAddress[]> future = futures.get(i);
if (future.isCancelled()) {
log.warn("Seed {}: timed out", seeds.get(i));
continue; // Timed out.
}
final InetSocketAddress[] inetAddresses;
try {
inetAddresses = future.get();
} catch (ExecutionException e) {
log.warn("Seed {}: failed to look up: {}", seeds.get(i), e.getMessage());
continue;
}
Collections.addAll(addrs, inetAddresses);
}
if (addrs.size() == 0)
throw new PeerDiscoveryException("No peer discovery returned any results: check internet connection?");
Collections.shuffle(addrs);
vThreadPool.shutdownNow();
return addrs.toArray(new InetSocketAddress[addrs.size()]);
} catch (InterruptedException e) {
throw new PeerDiscoveryException(e);
} finally {
vThreadPool.shutdown();
}
}
@Override
public void shutdown() {
ExecutorService tp = vThreadPool;
if (tp != null)
tp.shutdown();
}
}