Remove double future callback in TomP2PNode, refactoring, cleanup

This commit is contained in:
Manfred Karrer 2014-11-13 12:12:49 +01:00
parent dc3911883c
commit a134c85f24
5 changed files with 74 additions and 110 deletions

View File

@ -21,7 +21,6 @@ import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Node;
import io.bitsquare.persistence.Persistence;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.name.Named;
@ -79,9 +78,9 @@ public class BootstrappedPeerFactory {
static final String NETWORK_INTERFACE_UNSPECIFIED = "<unspecified>";
private KeyPair keyPair;
private int port;
private final int port;
private final Node bootstrapNode;
private String networkInterface;
private final String networkInterface;
private final Persistence persistence;
private final SettableFuture<PeerDHT> settableFuture = SettableFuture.create();
@ -119,7 +118,7 @@ public class BootstrappedPeerFactory {
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public ListenableFuture<PeerDHT> start() {
public SettableFuture<PeerDHT> start() {
try {
setState(BootstrapState.PEER_CREATION, "We create a P2P node.");
@ -193,7 +192,7 @@ public class BootstrappedPeerFactory {
break;
}
} catch (IOException e) {
handleError(BootstrapState.PEER_CREATION, "Cannot create peer with port: " + port + ". Exeption: " + e);
handleError(BootstrapState.PEER_CREATION, "Cannot create peer with port: " + port + ". Exception: " + e);
}
return settableFuture;

View File

@ -55,8 +55,6 @@ import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
@ -95,7 +93,7 @@ class TomP2PMessageService implements MessageService {
public void init(BootstrapListener bootstrapListener) {
p2pNode.setMessageBroker(this);
p2pNode.setKeyPair(user.getMessageKeyPair());
p2pNode.start(bootstrapListener);
p2pNode.bootstrap(bootstrapListener);
}
public void shutDown() {
@ -142,7 +140,7 @@ class TomP2PMessageService implements MessageService {
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(futureDirect)) {
if (future.isSuccess()) {
Platform.runLater(listener::onResult);
}
else {
@ -185,7 +183,7 @@ class TomP2PMessageService implements MessageService {
}
}));
if (isSuccess(addFuture)) {
if (future.isSuccess()) {
log.trace("Add arbitrator to DHT was successful. Stored data: [key: " + locationKey + ", " +
"values: " + arbitratorData + "]");
}
@ -236,7 +234,7 @@ class TomP2PMessageService implements MessageService {
FutureGet futureGet = p2pNode.getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture baseFuture) throws Exception {
public void operationComplete(BaseFuture future) throws Exception {
Platform.runLater(() -> arbitratorListeners.stream().forEach(listener ->
{
List<Arbitrator> arbitrators = new ArrayList<>();
@ -254,12 +252,12 @@ class TomP2PMessageService implements MessageService {
listener.onArbitratorsReceived(arbitrators);
}));
if (isSuccess(baseFuture)) {
if (future.isSuccess()) {
log.trace("Get arbitrators from DHT was successful. Stored data: [key: " + locationKey + ", " +
"values: " + futureGet.dataMap() + "]");
}
else {
log.error("Get arbitrators from DHT failed with reason:" + baseFuture.failedReason());
log.error("Get arbitrators from DHT failed with reason:" + future.failedReason());
}
}
});

View File

@ -17,13 +17,14 @@
package io.bitsquare.msg.tomp2p;
import io.bitsquare.BitsquareException;
import io.bitsquare.msg.MessageBroker;
import io.bitsquare.msg.listeners.BootstrapListener;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
@ -58,7 +59,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* The fully bootstrapped P2PNode which is responsible himself for his availability in the messaging system. It saves
@ -110,28 +111,42 @@ public class TomP2PNode {
bootstrappedPeerFactory.setKeyPair(keyPair);
}
public void start(BootstrapListener bootstrapListener) {
setupTimerForIPCheck();
public void bootstrap(BootstrapListener bootstrapListener) {
checkNotNull(keyPair, "keyPair must not be null.");
checkNotNull(messageBroker, "messageBroker must not be null.");
ListenableFuture<PeerDHT> bootstrapComplete = bootstrap();
Futures.addCallback(bootstrapComplete, new FutureCallback<PeerDHT>() {
bootstrappedPeerFactory.bootstrapState.addListener((ov, oldValue, newValue) ->
bootstrapListener.onBootstrapStateChanged(newValue));
SettableFuture<PeerDHT> bootstrapFuture = bootstrappedPeerFactory.start();
Futures.addCallback(bootstrapFuture, new FutureCallback<PeerDHT>() {
@Override
public void onSuccess(@Nullable PeerDHT result) {
log.debug("p2pNode.start success result = " + result);
Platform.runLater(bootstrapListener::onCompleted);
public void onSuccess(@Nullable PeerDHT peerDHT) {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setup();
Platform.runLater(bootstrapListener::onCompleted);
}
else {
log.error("Error at bootstrap: peerDHT = null");
Platform.runLater(() -> bootstrapListener.onFailed(
new BitsquareException("Error at bootstrap: peerDHT = null")));
}
}
@Override
public void onFailure(@NotNull Throwable t) {
log.error(t.toString());
log.error("Exception at bootstrap " + t.getMessage());
Platform.runLater(() -> bootstrapListener.onFailed(t));
}
});
bootstrappedPeerFactory.bootstrapState.addListener((ov, oldValue, newValue) ->
bootstrapListener.onBootstrapStateChanged(newValue));
}
private void setup() {
setupTimerForIPCheck();
setupReplyHandler();
storeAddressAfterBootstrap();
}
public void shutDown() {
bootstrappedPeerFactory.shutDown();
@ -144,6 +159,7 @@ public class TomP2PNode {
return peerDHT;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Generic DHT methods
///////////////////////////////////////////////////////////////////////////////////////////
@ -194,7 +210,7 @@ public class TomP2PNode {
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(futureDirect)) {
if (future.isSuccess()) {
log.debug("sendMessage completed");
}
else {
@ -291,51 +307,6 @@ public class TomP2PNode {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private ListenableFuture<PeerDHT> bootstrap() {
ListenableFuture<PeerDHT> bootstrapComplete = bootstrappedPeerFactory.start();
Futures.addCallback(bootstrapComplete, new FutureCallback<PeerDHT>() {
@Override
public void onSuccess(@Nullable PeerDHT peerDHT) {
try {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setupReplyHandler();
FuturePut futurePut = storePeerAddress();
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(futurePut)) {
storedPeerAddress = peerDHT.peerAddress();
log.debug("storedPeerAddress = " + storedPeerAddress);
}
else {
log.error("storedPeerAddress not successful");
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Error at storedPeerAddress " + t.toString());
}
});
}
else {
log.error("peerDHT is null");
}
} catch (IOException e) {
e.printStackTrace();
log.error("Error at storePeerAddress " + e.toString());
}
}
@Override
public void onFailure(@NotNull Throwable t) {
log.error("onFailure bootstrap " + t.toString());
}
});
return bootstrapComplete;
}
private void setupReplyHandler() {
peerDHT.peer().objectDataReply((sender, request) -> {
log.debug("handleMessage peerAddress " + sender);
@ -360,7 +331,7 @@ public class TomP2PNode {
public void run() {
if (peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) {
try {
storePeerAddress();
storeAddress();
} catch (IOException e) {
e.printStackTrace();
log.error(e.toString());
@ -370,7 +341,33 @@ public class TomP2PNode {
}, checkIfIPChangedPeriod, checkIfIPChangedPeriod);
}
private FuturePut storePeerAddress() throws IOException {
private void storeAddressAfterBootstrap() {
try {
FuturePut futurePut = storeAddress();
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
storedPeerAddress = peerDHT.peerAddress();
log.debug("storedPeerAddress = " + storedPeerAddress);
}
else {
log.error("storedPeerAddress not successful");
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Error at storedPeerAddress " + t.toString());
}
});
} catch (IOException e) {
e.printStackTrace();
log.error("Error at storePeerAddress " + e.toString());
}
}
private FuturePut storeAddress() throws IOException {
Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
log.debug("storePeerAddress " + peerDHT.peerAddress().toString());

View File

@ -48,8 +48,6 @@ import net.tomp2p.storage.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess;
class TomP2POfferRepository implements OfferRepository {
private static final Logger log = LoggerFactory.getLogger(TomP2POfferRepository.class);
@ -79,7 +77,7 @@ class TomP2POfferRepository implements OfferRepository {
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(future)) {
if (future.isSuccess()) {
Platform.runLater(() -> {
resultHandler.handleResult();
offerRepositoryListeners.stream().forEach(listener -> {
@ -168,8 +166,8 @@ class TomP2POfferRepository implements OfferRepository {
FutureGet futureGet = p2pNode.getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture baseFuture) throws Exception {
if (isSuccess(baseFuture)) {
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
final Map<Number640, Data> dataMap = futureGet.dataMap();
final List<Offer> offers = new ArrayList<>();
if (dataMap != null) {
@ -199,7 +197,7 @@ class TomP2POfferRepository implements OfferRepository {
listener.onOffersReceived(new ArrayList<>())));
}
else {
log.error("Get offers from DHT was not successful with reason:" + baseFuture.failedReason());
log.error("Get offers from DHT was not successful with reason:" + future.failedReason());
}
}
}
@ -234,7 +232,7 @@ class TomP2POfferRepository implements OfferRepository {
putFuture.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(putFuture))
if (future.isSuccess())
log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" +
invalidationTimestamp.get());
else
@ -261,7 +259,7 @@ class TomP2POfferRepository implements OfferRepository {
getFuture.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (isSuccess(getFuture)) {
if (future.isSuccess()) {
Data data = getFuture.data();
if (data != null && data.object() instanceof Long) {
final Object object = data.object();

View File

@ -1,28 +0,0 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.util.tomp2p;
import net.tomp2p.futures.BaseFuture;
public class BaseFutureUtil {
// Isolate the success handling as there is bug in port forwarding mode
public static boolean isSuccess(BaseFuture baseFuture) {
return baseFuture.isSuccess();
}
}