Move setupReplyHandler to MessageService

This commit is contained in:
Manfred Karrer 2015-03-20 01:22:13 +01:00
parent 23a868f8fc
commit 5b090cf7b8
10 changed files with 63 additions and 51 deletions

View File

@ -26,6 +26,7 @@ import io.bitsquare.gui.components.Popups;
import io.bitsquare.gui.main.MainView;
import io.bitsquare.gui.main.debug.DebugView;
import io.bitsquare.gui.util.ImageUtil;
import io.bitsquare.p2p.tomp2p.TomP2PService;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.user.AccountSettings;
import io.bitsquare.user.User;
@ -39,6 +40,7 @@ import com.google.inject.Injector;
import java.io.IOException;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.scene.*;
import javafx.scene.image.*;
import javafx.scene.input.*;
@ -72,11 +74,12 @@ public class BitsquareApp extends Application {
this.primaryStage = primaryStage;
log.trace("BitsquareApp.start");
TomP2PService.setUserThread(Platform::runLater);
bitsquareAppModule = new BitsquareAppModule(env, primaryStage);
injector = Guice.createInjector(bitsquareAppModule);
injector.getInstance(InjectorViewFactory.class).setInjector(injector);
// route uncaught exceptions to a user-facing dialog
Thread.currentThread().setUncaughtExceptionHandler((thread, throwable) ->

View File

@ -157,7 +157,7 @@ class MainViewModel implements ViewModel {
error -> log.error(error.toString()),
() -> Platform.runLater(() -> setBitcoinNetworkSyncProgress(1.0)));
Observable<BootstrapState> bootstrapStateAsObservable = clientNode.bootstrap(user.getMessageKeyPair(), messageService);
Observable<BootstrapState> bootstrapStateAsObservable = clientNode.bootstrap(user.getMessageKeyPair());
bootstrapStateAsObservable.publish();
bootstrapStateAsObservable.subscribe(
state -> Platform.runLater(() -> setBootstrapState(state)),

View File

@ -28,5 +28,5 @@ public interface ClientNode {
Node getBootstrapNodeAddress();
public Observable<BootstrapState> bootstrap(KeyPair keyPair, MessageHandler messageHandler);
public Observable<BootstrapState> bootstrap(KeyPair keyPair);
}

View File

@ -20,7 +20,7 @@ package io.bitsquare.p2p;
import io.bitsquare.p2p.listener.SendMessageListener;
public interface MessageService extends P2PService, MessageHandler {
public interface MessageService extends P2PService {
void sendMessage(Peer peer, Message message, SendMessageListener listener);

View File

@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
public class TomP2PAddressService extends TomP2PDHTService implements AddressService {
private static final Logger log = LoggerFactory.getLogger(TomP2PAddressService.class);
private static final int IP_CHECK_PERIOD = 2 * 60 * 1000; // Cheap call if nothing changes, so set it short to 2 min.
private static final int STORE_ADDRESS_PERIOD = 5 * 60 * 1000; // Save every 5 min.
private static final int ADDRESS_TTL = STORE_ADDRESS_PERIOD * 2; // TTL 10 min.

View File

@ -38,7 +38,7 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
private static final Logger log = LoggerFactory.getLogger(TomP2PMessageService.class);
private final CopyOnWriteArrayList<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -50,6 +50,12 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
}
@Override
public void bootstrapCompleted() {
setupReplyHandler();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageService implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -92,17 +98,29 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
if (!messageHandlers.remove(listener))
throw new IllegalArgumentException("Try to remove listener which was never added.");
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageHandler implementation
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void handleMessage(Message message, Peer sender) {
if (sender instanceof TomP2PPeer)
executor.execute(() -> messageHandlers.stream().forEach(e -> e.handleMessage((Message) message, sender)));
else
throw new IllegalArgumentException("Peer must be of type TomP2PPeer");
private void setupReplyHandler() {
peerDHT.peer().objectDataReply((sender, message) -> {
log.debug("handleMessage peerAddress " + sender);
log.debug("handleMessage message " + message);
if (!sender.equals(peerDHT.peer().peerAddress())) {
if (message instanceof Message)
executor.execute(() -> messageHandlers.stream().forEach(e -> e.handleMessage((Message) message, new TomP2PPeer(sender))));
else
throw new RuntimeException("We got an object which is not type of Message. That must never happen. Request object = " + message);
}
else {
throw new RuntimeException("Received msg from myself. That must never happen.");
}
return true;
});
}
}

View File

@ -21,8 +21,6 @@ import io.bitsquare.BitsquareException;
import io.bitsquare.p2p.BootstrapState;
import io.bitsquare.p2p.ClientNode;
import io.bitsquare.p2p.ConnectionType;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.MessageHandler;
import io.bitsquare.p2p.Node;
import com.google.common.util.concurrent.FutureCallback;
@ -77,10 +75,9 @@ public class TomP2PNode implements ClientNode {
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public Observable<BootstrapState> bootstrap(KeyPair keyPair, MessageHandler messageHandler) {
public Observable<BootstrapState> bootstrap(KeyPair keyPair) {
bootstrappedPeerBuilder.setKeyPair(keyPair);
bootstrappedPeerBuilder.getBootstrapState().addListener((ov, oldValue, newValue) -> {
log.debug("BootstrapState changed " + newValue);
bootstrapStateSubject.onNext(newValue);
@ -92,7 +89,6 @@ public class TomP2PNode implements ClientNode {
public void onSuccess(@Nullable PeerDHT peerDHT) {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setupReplyHandler(messageHandler);
bootstrapStateSubject.onCompleted();
}
else {
@ -149,30 +145,4 @@ public class TomP2PNode implements ClientNode {
public Node getBootstrapNodeAddress() {
return bootstrappedPeerBuilder.getBootstrapNode();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void setupReplyHandler(MessageHandler messageHandler) {
peerDHT.peer().objectDataReply((sender, request) -> {
log.debug("handleMessage peerAddress " + sender);
log.debug("handleMessage message " + request);
if (!sender.equals(peerDHT.peer().peerAddress())) {
if (request instanceof Message)
messageHandler.handleMessage((Message) request, new TomP2PPeer(sender));
else
throw new RuntimeException("We got an object which is not type of Message. That must never happen. Request object = " + request);
}
else {
throw new RuntimeException("Received msg from myself. That must never happen.");
}
return true;
});
}
}

View File

@ -24,8 +24,6 @@ import java.util.concurrent.Executor;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.PeerDHT;
import org.slf4j.Logger;
@ -45,9 +43,16 @@ import rx.Subscriber;
public class TomP2PService implements P2PService {
private static final Logger log = LoggerFactory.getLogger(TomP2PService.class);
private static Executor userThread;
// Set to Platform::runLater from app to get all callbacks on the userThread
public static void setUserThread(Executor userThread) {
TomP2PService.userThread = userThread;
}
private final Subscriber<BootstrapState> subscriber;
protected Executor executor = Platform::runLater;
protected Executor executor = userThread;
protected PeerDHT peerDHT;

View File

@ -64,6 +64,8 @@ public class Trade implements Serializable {
private final Offer offer;
private final Date date;
private State state;
private Coin tradeAmount;
private Contract contract;
private String contractAsJson;
private String takerContractSignature;
@ -71,9 +73,6 @@ public class Trade implements Serializable {
private Transaction depositTx;
private Transaction payoutTx;
private Coin tradeAmount;
private State state;
// For changing values we use properties to get binding support in the UI (table)
// When serialized those transient properties are not instantiated, so we instantiate them in the getters at first
// access. Only use the accessor not the private field.
@ -211,4 +210,20 @@ public class Trade implements Serializable {
return _state;
}
@Override
public String toString() {
return "Trade{" +
"offer=" + offer +
", date=" + date +
", state=" + state +
", tradeAmount=" + tradeAmount +
", contract=" + contract +
", contractAsJson='" + contractAsJson + '\'' +
", takerContractSignature='" + takerContractSignature + '\'' +
", offererContractSignature='" + offererContractSignature + '\'' +
", depositTx=" + depositTx +
", payoutTx=" + payoutTx +
'}';
}
}

View File

@ -98,7 +98,7 @@ public class PlaceOfferProtocolTest {
tomP2PNode = new TomP2PNode(bootstrappedPeerBuilder);
messageService = new TomP2PMessageService(tomP2PNode);
Observable<BootstrapState> messageObservable = tomP2PNode.bootstrap(user.getMessageKeyPair(), messageService);
Observable<BootstrapState> messageObservable = tomP2PNode.bootstrap(user.getMessageKeyPair());
messageObservable.publish();
messageObservable.subscribe(
state -> log.trace("state changed: " + state),