Change discover handling

This commit is contained in:
Manfred Karrer 2014-11-20 22:59:24 +01:00
parent 356b76a21a
commit 1d24a2aa38
7 changed files with 109 additions and 184 deletions

View File

@ -20,7 +20,6 @@ package io.bitsquare.app.cli;
import io.bitsquare.network.Node;
import net.tomp2p.dht.PeerBuilderDHT;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.nat.PeerBuilderNAT;
import net.tomp2p.p2p.Peer;
import net.tomp2p.p2p.PeerBuilder;
@ -28,7 +27,6 @@ import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMapChangeListener;
import net.tomp2p.peers.PeerStatistic;
import net.tomp2p.replication.IndirectReplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,39 +52,41 @@ public class BootstrapNode {
try {
Number160 peerId = Number160.createHash(name);
peer = new PeerBuilder(peerId).ports(port).start();
/* peer.objectDataReply((sender, request) -> {
/*peer.objectDataReply((sender, request) -> {
log.trace("received request: " + request.toString());
return "pong";
});*/
PeerDHT peerDHT = new PeerBuilderDHT(peer).start();
new PeerBuilderDHT(peer).start();
new PeerBuilderNAT(peer).start();
new IndirectReplication(peerDHT).start();
peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() {
@Override
public void peerInserted(PeerAddress peerAddress, boolean verified) {
log.info("peerInserted: " + peerAddress);
log.debug("Peer inserted: peerAddress=" + peerAddress + ", verified=" + verified);
}
@Override
public void peerRemoved(PeerAddress peerAddress, PeerStatistic storedPeerAddress) {
log.info("peerRemoved: " + peerAddress);
public void peerRemoved(PeerAddress peerAddress, PeerStatistic peerStatistics) {
log.debug("Peer removed: peerAddress=" + peerAddress + ", peerStatistics=" + peerStatistics);
}
@Override
public void peerUpdated(PeerAddress peerAddress, PeerStatistic storedPeerAddress) {
public void peerUpdated(PeerAddress peerAddress, PeerStatistic peerStatistics) {
// log.debug("Peer updated: peerAddress=" + peerAddress + ", peerStatistics=" + peerStatistics);
}
});
log.info("Bootstrap node started with name " + name + " and port " + port);
new Thread(() -> {
while (running) {
log.info("List of all peers online ----------------------------");
for (PeerAddress peerAddress : peer.peerBean().peerMap().all()) {
log.info("Peer online: " + peerAddress);
log.info(peerAddress.toString());
}
log.info("-----------------------------------------------------");
try {
Thread.sleep(60000);
Thread.sleep(10000);
} catch (InterruptedException e) {
return;
}

View File

@ -87,22 +87,22 @@ class MainPM extends PresentationModel<MainModel> {
numPendingTrades.bind(model.numPendingTrades);
model.bootstrapState.addListener((ov, oldValue, newValue) -> {
if (newValue == BootstrapState.DIRECT_SUCCESS ||
newValue == BootstrapState.AUTO_PORT_FORWARDING_SUCCESS ||
newValue == BootstrapState.RELAY_SUCCESS) {
if (newValue == BootstrapState.DISCOVERY_NO_NAT_SUCCEEDED ||
newValue == BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_SUCCEEDED ||
newValue == BootstrapState.RELAY_SUCCEEDED) {
bootstrapState.set("Successfully connected to P2P network: " + newValue.getMessage());
bootstrapProgress.set(1);
if (newValue == BootstrapState.DIRECT_SUCCESS)
if (newValue == BootstrapState.DISCOVERY_NO_NAT_SUCCEEDED)
bootstrapIconId.set("image-connection-direct");
else if (newValue == BootstrapState.AUTO_PORT_FORWARDING_SUCCESS)
else if (newValue == BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_SUCCEEDED)
bootstrapIconId.set("image-connection-nat");
else if (newValue == BootstrapState.RELAY_SUCCESS)
else if (newValue == BootstrapState.RELAY_SUCCEEDED)
bootstrapIconId.set("image-connection-relay");
}
else if (newValue == BootstrapState.PEER_CREATION_FAILED ||
newValue == BootstrapState.DIRECT_FAILED ||
newValue == BootstrapState.AUTO_PORT_FORWARDING_FAILED ||
newValue == BootstrapState.DISCOVERY_FAILED ||
newValue == BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_FAILED ||
newValue == BootstrapState.RELAY_FAILED) {
bootstrapErrorMsg.set(newValue.getMessage());

View File

@ -19,7 +19,6 @@ package io.bitsquare.msg.tomp2p;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Node;
import io.bitsquare.persistence.Persistence;
import com.google.common.util.concurrent.SettableFuture;
@ -54,7 +53,6 @@ import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMapChangeListener;
import net.tomp2p.peers.PeerStatistic;
import net.tomp2p.replication.IndirectReplication;
import org.jetbrains.annotations.NotNull;
@ -78,7 +76,6 @@ class BootstrappedPeerFactory {
private boolean useManualPortForwarding;
private final Node bootstrapNode;
private final String networkInterface;
private final Persistence persistence;
private final SettableFuture<PeerDHT> settableFuture = SettableFuture.create();
@ -92,12 +89,10 @@ class BootstrappedPeerFactory {
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public BootstrappedPeerFactory(Persistence persistence,
@Named(Node.PORT_KEY) int port,
public BootstrappedPeerFactory(@Named(Node.PORT_KEY) int port,
@Named(USE_MANUAL_PORT_FORWARDING_KEY) boolean useManualPortForwarding,
@Named(BOOTSTRAP_NODE_KEY) Node bootstrapNode,
@Named(NETWORK_INTERFACE_KEY) String networkInterface) {
this.persistence = persistence;
this.port = port;
this.useManualPortForwarding = useManualPortForwarding;
this.bootstrapNode = bootstrapNode;
@ -120,8 +115,6 @@ class BootstrappedPeerFactory {
public SettableFuture<PeerDHT> start() {
try {
setState(BootstrapState.PEER_CREATION, "We create a P2P node.");
Bindings bindings = new Bindings();
if (!NETWORK_INTERFACE_UNSPECIFIED.equals(networkInterface))
bindings.addInterface(networkInterface);
@ -132,19 +125,17 @@ class BootstrappedPeerFactory {
.bindings(bindings)
.tcpPortForwarding(port)
.udpPortForwarding(port)
.behindFirewall()
.start();
}
else {
peer = new PeerBuilder(keyPair)
.ports(port)
.bindings(bindings)
.behindFirewall()
.start();
}
peerDHT = new PeerBuilderDHT(peer).start();
new IndirectReplication(peerDHT).start();
setState(BootstrapState.PEER_CREATED, "We created a peerDHT.");
peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() {
@Override
@ -163,46 +154,10 @@ class BootstrappedPeerFactory {
}
});
// We save last successful bootstrap method.
// Reset it to BootstrapState.DIRECT_SUCCESS after 5 start ups.
Object bootstrapCounterObject = persistence.read(this, "bootstrapCounter");
int bootstrapCounter = 0;
if (bootstrapCounterObject instanceof Integer)
bootstrapCounter = (int) bootstrapCounterObject + 1;
if (bootstrapCounter > 5) {
persistence.write(this, "lastSuccessfulBootstrap", BootstrapState.DIRECT_SUCCESS);
bootstrapCounter = 0;
}
persistence.write(this, "bootstrapCounter", bootstrapCounter);
BootstrapState lastSuccessfulBootstrap = BootstrapState.DIRECT_SUCCESS;
Object lastSuccessfulBootstrapObject = persistence.read(this, "lastSuccessfulBootstrap");
if (lastSuccessfulBootstrapObject instanceof BootstrapState)
lastSuccessfulBootstrap = (BootstrapState) lastSuccessfulBootstrapObject;
else
persistence.write(this, "lastSuccessfulBootstrap", lastSuccessfulBootstrap);
log.debug("lastSuccessfulBootstrap = " + lastSuccessfulBootstrap);
// just temporary always start with trying direct connection
lastSuccessfulBootstrap = BootstrapState.DIRECT_SUCCESS;
switch (lastSuccessfulBootstrap) {
// For the moment we don't support relay mode as it has too much problems
/* case RELAY_SUCCESS:
bootstrapWithRelay();
break;*/
case AUTO_PORT_FORWARDING_SUCCESS:
tryPortForwarding();
break;
case DIRECT_SUCCESS:
default:
discover();
break;
}
discoverExternalAddress();
} catch (IOException e) {
handleError(BootstrapState.PEER_CREATION, "Cannot create peer with port: " + port + ". Exception: " + e);
handleError(BootstrapState.PEER_CREATION_FAILED, "Cannot create a peer with port: " +
port + ". Exception: " + e);
}
return settableFuture;
@ -213,105 +168,73 @@ class BootstrappedPeerFactory {
peerDHT.shutdown();
}
// 1. Attempt: Try to discover our outside visible address
private void discover() {
setState(BootstrapState.DIRECT_INIT, "We are starting discovery against a bootstrap node.");
FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start();
futureDiscover.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
// We are well connected so we can offer our capabilities as relay node for other peers
new PeerBuilderNAT(peer).start();
if (useManualPortForwarding) {
setState(BootstrapState.MANUAL_PORT_FORWARDING_SUCCESS,
"We use manual port forwarding and are visible to other peers.");
bootstrap(BootstrapState.MANUAL_PORT_FORWARDING_SUCCESS);
}
else {
setState(BootstrapState.DIRECT_SUCCESS,
"We are directly connected and visible to other peers.");
bootstrap(BootstrapState.DIRECT_SUCCESS);
}
}
else {
setState(BootstrapState.DIRECT_NOT_SUCCEEDED, "We are probably behind a NAT and not reachable to " +
"other peers. We try to setup automatic port forwarding.");
tryPortForwarding();
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
handleError(BootstrapState.DIRECT_FAILED, "Exception at discover: " + t.getMessage());
}
});
}
// 2. Attempt: Try to set up port forwarding with UPNP and NAT-PMP
private void tryPortForwarding() {
setState(BootstrapState.AUTO_PORT_FORWARDING_INIT, "We are trying with automatic port forwarding.");
// We need to discover our external address and test if we are reachable for other nodes
// We know our internal address from a discovery of our local network interfaces
// We start a discover process with our bootstrap node.
// There are 4 cases:
// 1. If we are not behind a NAT we get reported back the same address as our internal.
// 2. If we are behind a NAT and manual port forwarding is setup we get reported our external address from the
// bootstrap node and the bootstrap node could ping us so we know we are reachable.
// 3. If we are behind a NAT and the ping probes fails we need to setup port forwarding with UPnP or NAT-PMP.
// If that is successfully setup we need to try again a discover so we find out our external address and have
// tested successfully our reachability (the additional discover is done internally from startSetupPortforwarding)
// 4. If the port forwarding failed we can try as last resort to open a permanent TCP connection to the
// bootstrap node and use that peer as relay (currently not supported as its too unstable)
private void discoverExternalAddress() {
FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start();
setState(BootstrapState.DISCOVERY_STARTED, "We are starting discovery against a bootstrap node.");
PeerNAT peerNAT = new PeerBuilderNAT(peer).start();
FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover);
futureNAT.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
setState(BootstrapState.AUTO_PORT_FORWARDING_SETUP_DONE, "Automatic port forwarding is setup. " +
"We need to do a discover process again.");
// we need a second discover process
discoverAfterPortForwarding();
// If futureDiscover was successful we are directly connected (or manual port forwarding is set)
if (futureDiscover.isSuccess()) {
if (useManualPortForwarding) {
setState(BootstrapState.DISCOVERY_MANUAL_PORT_FORWARDING_SUCCEEDED,
"We use manual port forwarding and are visible to other peers.");
bootstrap();
}
else {
setState(BootstrapState.DISCOVERY_NO_NAT_SUCCEEDED,
"We are not behind a NAT and visible to other peers.");
bootstrap();
}
}
else {
handleError(BootstrapState.AUTO_PORT_FORWARDING_FAILED, "Automatic port forwarding failed. " +
"Fail reason: " + future.failedReason() +
"\nCheck if UPnP is not enabled on your router. " +
"\nYou can try also to setup manual port forwarding. " +
"\nRelay mode is currently not supported but will follow later. ");
setState(BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_STARTED,
"We are probably behind a NAT and not reachable to other peers. " +
"We try to setup automatic port forwarding.");
if (futureNAT.isSuccess()) {
setState(BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_SUCCEEDED,
"Discover with automatic port forwarding was successful.");
bootstrap();
}
else {
handleError(BootstrapState.DISCOVERY_AUTO_PORT_FORWARDING_FAILED, "Automatic port forwarding " +
"failed. " +
"Fail reason: " + future.failedReason() +
"\nCheck if UPnP is not enabled on your router. " +
"\nYou can try also to setup manual port forwarding. " +
"\nRelay mode is currently not supported but will follow later. ");
// For the moment we don't support relay mode as it has too much problems
// For the moment we don't support relay mode as it has too much problems
/* setState(BootstrapState.AUTO_PORT_FORWARDING_NOT_SUCCEEDED, "Port forwarding has failed. " +
"We try to use a relay as next step.");
bootstrapWithRelay();*/
}
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
handleError(BootstrapState.AUTO_PORT_FORWARDING_FAILED, "Exception at port forwarding: " + t
handleError(BootstrapState.DISCOVERY_FAILED, "Exception at discover visibility: " + t
.getMessage());
}
});
}
// Try to determine our outside visible address after port forwarding is setup
private void discoverAfterPortForwarding() {
FutureDiscover futureDiscover = peer.discover().peerAddress(getBootstrapAddress()).start();
futureDiscover.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
setState(BootstrapState.AUTO_PORT_FORWARDING_SUCCESS, "Discover with automatic port forwarding " +
"was successful.");
bootstrap(BootstrapState.AUTO_PORT_FORWARDING_SUCCESS);
}
else {
handleError(BootstrapState.AUTO_PORT_FORWARDING_FAILED, "Discover with automatic port forwarding " +
"has failed " +
futureDiscover.failedReason());
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
handleError(BootstrapState.AUTO_PORT_FORWARDING_FAILED, "Exception at discover: " + t.getMessage());
}
});
}
// For the moment we don't support relay mode as it has too much problems
// 3. Attempt: We try to use another peer as relay
/* private void bootstrapWithRelay() {
@ -340,25 +263,25 @@ class BootstrappedPeerFactory {
});
}*/
private void bootstrap(BootstrapState state) {
private void bootstrap() {
FutureBootstrap futureBootstrap = peer.bootstrap().peerAddress(getBootstrapAddress()).start();
setState(BootstrapState.BOOT_STRAP_STARTED, "Bootstrap started.");
futureBootstrap.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (futureBootstrap.isSuccess()) {
setState(state, "Bootstrap successful.");
persistence.write(BootstrappedPeerFactory.this, "lastSuccessfulBootstrap", state);
setState(BootstrapState.BOOT_STRAP_SUCCEEDED, "Bootstrap successful.");
settableFuture.set(peerDHT);
}
else {
handleError(BootstrapState.DIRECT_NOT_SUCCEEDED, "Bootstrapping failed. " +
handleError(BootstrapState.BOOT_STRAP_FAILED, "Bootstrapping failed. " +
futureBootstrap.failedReason());
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
handleError(BootstrapState.DIRECT_FAILED, "Exception at bootstrap: " + t.getMessage());
handleError(BootstrapState.BOOT_STRAP_FAILED, "Exception at bootstrap: " + t.getMessage());
}
});
}
@ -399,7 +322,6 @@ class BootstrappedPeerFactory {
private void handleError(BootstrapState state, String errorMessage) {
setState(state, errorMessage, false);
persistence.write(this, "lastSuccessfulBootstrap", BootstrapState.DIRECT_SUCCESS);
peerDHT.shutdown();
settableFuture.setException(new Exception(errorMessage));
}

View File

@ -53,7 +53,6 @@ import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerSocketAddress;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
@ -378,13 +377,13 @@ public class TomP2PNode implements ClientNode {
public ConnectionType getConnectionType() {
BootstrapState bootstrapState = bootstrappedPeerFactory.getBootstrapState().get();
switch (bootstrapState) {
case DIRECT_SUCCESS:
case DISCOVERY_NO_NAT_SUCCEEDED:
return ConnectionType.DIRECT;
case MANUAL_PORT_FORWARDING_SUCCESS:
case DISCOVERY_MANUAL_PORT_FORWARDING_SUCCEEDED:
return ConnectionType.MANUAL_PORT_FORWARDING;
case AUTO_PORT_FORWARDING_SUCCESS:
case DISCOVERY_AUTO_PORT_FORWARDING_SUCCEEDED:
return ConnectionType.AUTO_PORT_FORWARDING;
case RELAY_SUCCESS:
case RELAY_SUCCEEDED:
return ConnectionType.RELAY;
default:
throw new BitsquareException("Invalid bootstrap state: %s", bootstrapState);
@ -393,11 +392,11 @@ public class TomP2PNode implements ClientNode {
@Override
public Node getAddress() {
PeerSocketAddress socketAddress = peerDHT.peerAddress().peerSocketAddress();
PeerAddress peerAddress = peerDHT.peerBean().serverPeerAddress();
return Node.at(
peerDHT.peerID().toString(),
socketAddress.inetAddress().toString(),
socketAddress.tcpPort());
peerAddress.inetAddress().getHostAddress(),
peerAddress.peerSocketAddress().tcpPort());
}
@Override

View File

@ -17,25 +17,22 @@
package io.bitsquare.network;
/**
* NOT_SUCCEEDED means we will try the next step, FAILED is used for fatal failures which will terminate the bootstrap
*/
public enum BootstrapState {
PEER_CREATION,
PEER_CREATED,
PEER_CREATION_FAILED,
DIRECT_INIT,
DIRECT_SUCCESS,
DIRECT_NOT_SUCCEEDED,
DIRECT_FAILED,
MANUAL_PORT_FORWARDING_SUCCESS,
AUTO_PORT_FORWARDING_INIT,
AUTO_PORT_FORWARDING_SETUP_DONE,
AUTO_PORT_FORWARDING_SUCCESS,
AUTO_PORT_FORWARDING_NOT_SUCCEEDED,
AUTO_PORT_FORWARDING_FAILED,
RELAY_INIT,
RELAY_SUCCESS,
RELAY_FAILED;
DISCOVERY_STARTED,
DISCOVERY_NO_NAT_SUCCEEDED,
DISCOVERY_MANUAL_PORT_FORWARDING_SUCCEEDED,
DISCOVERY_FAILED,
DISCOVERY_AUTO_PORT_FORWARDING_STARTED,
DISCOVERY_AUTO_PORT_FORWARDING_SUCCEEDED,
DISCOVERY_AUTO_PORT_FORWARDING_FAILED,
RELAY_STARTED,
RELAY_SUCCEEDED,
RELAY_FAILED,
BOOT_STRAP_STARTED,
BOOT_STRAP_SUCCEEDED,
BOOT_STRAP_FAILED;
private String message;

View File

@ -38,14 +38,19 @@
<logger name="io.netty.channel" level="WARN"/>
<logger name="io.netty.buffer" level="WARN"/>
<logger name="io.bitsquare.gui.ViewController" level="OFF"/>
<logger name="io.bitsquare.gui.CachedViewController" level="OFF"/>
<logger name="io.bitsquare.gui.util.Profiler" level="TRACE"/>
<logger name="io.bitsquare.gui.UIModel" level="WARN"/>
<logger name="io.bitsquare.gui.PresentationModel" level="WARN"/>
<logger name="io.bitsquare.gui.ViewController" level="WARN"/>
<logger name="io.bitsquare.gui.ViewCB" level="WARN"/>
<logger name="io.bitsquare.gui.CachedViewCB" level="WARN"/>
<logger name="io.bitsquare.gui.util.Profiler" level="WARN"/>
<logger name="io.bitsquare.persistence.Persistence" level="WARN"/>
<logger name="org.bitcoinj.core.BitcoinSerializer" level="WARN"/>
<logger name="org.bitcoinj.core.AbstractBlockChain" level="WARN"/>
<logger name="org.bitcoinj.wallet.DeterministicKeyChain" level="WARN"/>
<logger name="io.bitsquare.btc.WalletService" level="WARN"/>
<!--
<logger name="org.bitcoinj.core.Wallet" level="OFF"/>

View File

@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.tomp2p.connection.Bindings;
import net.tomp2p.connection.ChannelClientConfiguration;
import net.tomp2p.connection.StandardProtocolFamily;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
@ -54,8 +53,6 @@ import net.tomp2p.p2p.Peer;
import net.tomp2p.p2p.PeerBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMap;
import net.tomp2p.peers.PeerMapConfiguration;
import net.tomp2p.relay.RelayConfig;
import net.tomp2p.storage.Data;
@ -529,14 +526,19 @@ public class TomP2PTests {
private Peer bootstrapDirectConnection(int clientPort) {
Peer peer = null;
try {
Number160 peerId = Number160.createHash(UUID.randomUUID().toString());
/* Number160 peerId = Number160.createHash(UUID.randomUUID().toString());
PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification();
PeerMap pm = new PeerMap(pmc);
ChannelClientConfiguration cc = PeerBuilder.createDefaultChannelClientConfiguration();
cc.maxPermitsTCP(100);
cc.maxPermitsUDP(100);
peer = new PeerBuilder(peerId).bindings(getBindings()).channelClientConfiguration(cc).peerMap(pm)
.ports(clientPort).start();*/
Number160 peerId = Number160.createHash(UUID.randomUUID().toString());
peer = new PeerBuilder(peerId).bindings(getBindings())
.ports(clientPort).start();
FutureDiscover futureDiscover = peer.discover().peerAddress(BOOTSTRAP_NODE_ADDRESS).start();
futureDiscover.awaitUninterruptibly();
if (futureDiscover.isSuccess()) {