Fix problem with double runlater at p2p state change

This commit is contained in:
Manfred Karrer 2014-11-27 01:37:56 +01:00
parent 970cd776f3
commit 635f839f78
4 changed files with 30 additions and 18 deletions

View File

@ -157,8 +157,10 @@ public class MainView extends ActivatableView<StackPane, MainViewModel> {
Platform.runLater( Platform.runLater(
() -> model.initBackend().subscribe( () -> model.initBackend().subscribe(
next -> { }, next -> {
error -> { }, },
error -> {
},
() -> Platform.runLater(() -> { () -> Platform.runLater(() -> {
bankAccountComboBoxHolder.getChildren().setAll(createBankAccountComboBox()); bankAccountComboBoxHolder.getChildren().setAll(createBankAccountComboBox());

View File

@ -159,25 +159,32 @@ class MainViewModel implements ViewModel {
() -> Platform.runLater(() -> networkSyncProgress.set(1.0))); () -> Platform.runLater(() -> networkSyncProgress.set(1.0)));
Observable<BootstrapState> message = messageService.init(); Observable<BootstrapState> message = messageService.init();
message.publish();
message.subscribe( message.subscribe(
state -> Platform.runLater(() -> bootstrapState.set(state)), state ->
error -> log.error(error.toString())); Platform.runLater(() -> bootstrapState.set(state)),
error -> log.error(error.toString()),
() -> log.trace("message completed"));
Observable<Object> wallet = walletService.initialize(Platform::runLater); Observable<Object> wallet = walletService.initialize(Platform::runLater);
wallet.subscribe( wallet.subscribe(
next -> { }, next -> {
},
error -> Platform.runLater(() -> walletServiceException.set(error)), error -> Platform.runLater(() -> walletServiceException.set(error)),
() -> { }); () -> log.trace("wallet completed"));
Observable<?> backend = Observable.merge(message, wallet); Observable<?> backend = Observable.merge(message, wallet);
backend.subscribe( backend.subscribe(
next -> { }, next -> {
error -> { }, },
error -> log.error(error.toString()),
() -> Platform.runLater(() -> { () -> Platform.runLater(() -> {
log.trace("backend completed");
tradeManager.getPendingTrades().addListener( tradeManager.getPendingTrades().addListener(
(MapChangeListener<String, Trade>) change -> updateNumPendingTrades()); (MapChangeListener<String, Trade>) change -> updateNumPendingTrades());
updateNumPendingTrades(); updateNumPendingTrades();
})); })
);
return backend; return backend;
} }
@ -207,6 +214,7 @@ class MainViewModel implements ViewModel {
private void updateNumPendingTrades() { private void updateNumPendingTrades() {
log.debug("updateNumPendingTrades " + tradeManager.getPendingTrades().size());
numPendingTrades.set(tradeManager.getPendingTrades().size()); numPendingTrades.set(tradeManager.getPendingTrades().size());
} }

View File

@ -33,7 +33,6 @@ import java.security.KeyPair;
import javax.inject.Inject; import javax.inject.Inject;
import javafx.application.Platform;
import javafx.beans.property.ObjectProperty; import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty; import javafx.beans.property.SimpleObjectProperty;
@ -331,7 +330,7 @@ class BootstrappedPeerFactory {
log.error(message); log.error(message);
bootstrapState.setMessage(message); bootstrapState.setMessage(message);
Platform.runLater(() -> this.bootstrapState.set(bootstrapState)); this.bootstrapState.set(bootstrapState);
} }
private void handleError(BootstrapState state, String errorMessage) { private void handleError(BootstrapState state, String errorMessage) {

View File

@ -114,9 +114,12 @@ public class TomP2PNode implements ClientNode {
this.keyPair = keyPair; this.keyPair = keyPair;
bootstrappedPeerFactory.setKeyPair(keyPair); bootstrappedPeerFactory.setKeyPair(keyPair);
Subject<BootstrapState, BootstrapState> bootstrapState = BehaviorSubject.create(); Subject<BootstrapState, BootstrapState> bootstrapStateSubject = BehaviorSubject.create();
bootstrappedPeerFactory.getBootstrapState().addListener((ov, oldValue, newValue) -> bootstrapState.onNext(newValue)); bootstrappedPeerFactory.getBootstrapState().addListener((ov, oldValue, newValue) -> {
log.debug("BootstrapState changed " + newValue);
bootstrapStateSubject.onNext(newValue);
});
SettableFuture<PeerDHT> bootstrapFuture = bootstrappedPeerFactory.start(); SettableFuture<PeerDHT> bootstrapFuture = bootstrappedPeerFactory.start();
Futures.addCallback(bootstrapFuture, new FutureCallback<PeerDHT>() { Futures.addCallback(bootstrapFuture, new FutureCallback<PeerDHT>() {
@ -129,24 +132,24 @@ public class TomP2PNode implements ClientNode {
try { try {
storeAddress(); storeAddress();
} catch (NetworkException e) { } catch (NetworkException e) {
bootstrapState.onError(e); bootstrapStateSubject.onError(e);
} }
bootstrapState.onCompleted(); bootstrapStateSubject.onCompleted();
} }
else { else {
log.error("Error at bootstrap: peerDHT = null"); log.error("Error at bootstrap: peerDHT = null");
bootstrapState.onError(new BitsquareException("Error at bootstrap: peerDHT = null")); bootstrapStateSubject.onError(new BitsquareException("Error at bootstrap: peerDHT = null"));
} }
} }
@Override @Override
public void onFailure(@NotNull Throwable t) { public void onFailure(@NotNull Throwable t) {
log.error("Exception at bootstrap " + t.getMessage()); log.error("Exception at bootstrap " + t.getMessage());
bootstrapState.onError(t); bootstrapStateSubject.onError(t);
} }
}); });
return bootstrapState.asObservable(); return bootstrapStateSubject.asObservable();
} }
public void shutDown() { public void shutDown() {