Improve service initialization coordination using rx.Observable

This change introduces the use of RxJava's Observable [1] to redesign
how we work with non-deterministic and/or event-based information, such
as: connecting to peer-to-peer infrastructure, synchronizing the bitcoin
blockchain, and so on.

Prior to this commit, these activities were initiated in methods like
WalletService#initialize and TomP2PMessageService#init. These methods
accepted 'listener' interfaces, and these listeners' callback methods
would be invoked whenever work progressed, completed, or failed.

This approach required significant coordination logic, which, prior to
this commit, was found primarily in MainModel#initBackend. A primary
goal of the logic found here was to determine when the backend was
"ready". This state was represented in MainModel's `backendReady` field,
which would be set to true once the following three conditions were
satisfied:

 1. the message service had finished initialization
 2. the wallet service had finished initialization, and
 3. the blockchain synchronization had reached 100%

Monitoring these three states was complex, and required hard-to-follow
conditional logic spread across a number of locations in the code. In
any case, however, once these three conditions were satisfied and
backendReady's value was set to true, a listener on the backendReady
field (in MainViewCB#doInitialize) would then populate combo boxes and
pending trade counts in the main view and cause the splash screen to
fade out, rendering the application ready for user interaction.

The introduction of rx.Observable is designed to achieve the same
show-the-splash-screen-until-everything-is-ready functionality described
above, without the complex monitoring, conditional logic and nested
callbacks. This is achieved by modeling each process as an Observable
stream of events. Observables in RxJava can emit any number of events,
and can complete either normally or with an error.

These observables may be 'subscribed' to by any number of subscribers,
and events emitted can be acted upon by instructing the subscriber what
to do `onNext`, `onCompleted`, and `onError`. So for example
WalletService now exposes an Observable<Double> called bootstrapState.
This Observable is subscribed to in MainModel#initBackend in such a way
that every time it emits a new double value (i.e. a new percentage), the
various bootstrap state text labels and progress indicators are updated
accordingly.

Where it gets really interesting, however, is when Observables are
combined. The primary complexity described above is coordinating the
fading out of the splash screen with the completed initialization of all
backend services. As can now be seen in MainModel#initBackend, the
wallet service and message service Observables are simply "merged" into
a single observable and returned. From the MainViewCB side, this "single
backend observable" is subscribed to and, when it completes (i.e. when
all the underlying Observables complete), then combo boxes and pending
trade counts are populated and the splash screen is faded out.

Understanding RxJava, Observables, and the principles of "Functional
Reactive Programming" takes time. It is a paradigm shift in dealing with
concurrency and non-determinism, but one that ultimately rewards those
who take the time. In the end, I believe it's use will result in a
significantly more concise and robust internal architecture for
Bitsquare, and using RxJava's lightweight, well-adopted and
infrastructure-agnostic API leaves us open to using Akka or other more
sophisticated infrastructure later without tying ourselves to those
specific APIs (because virtually anything can be modeled as an
Observable). Achieve these benifits means that core committers will need
to understand how RxJava works, how to think about it, and how to design
using it. I have spent the better part of the last week getting to know
it, and I am certainly still learning. I can recommend many resources to
aid in this process, but having gone through it myself, I recommend that
everyone read at least [1] and [2] first.

[1]: https://github.com/ReactiveX/RxJava/wiki/Observable
[2]: [The introduction to Reactive Programming you've been
missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754)
This commit is contained in:
Chris Beams 2014-11-18 11:57:50 +01:00
parent 3c3d3a507c
commit 67295aea55
No known key found for this signature in database
GPG Key ID: 3D214F8F5BC5ED73
8 changed files with 145 additions and 220 deletions

View File

@ -41,6 +41,7 @@ repositories {
dependencies {
compile 'org.bitcoinj:bitcoinj-core:0.12'
compile 'net.tomp2p:tomp2p-all:5.0-Alpha.60850a9-SNAPSHOT'
compile 'io.reactivex:rxjava:1.0.0-rc.12'
compile 'org.springframework:spring-core:4.1.1.RELEASE'
compile 'net.sf.jopt-simple:jopt-simple:4.8'
compile 'org.slf4j:slf4j-api:1.7.7'

View File

@ -23,6 +23,7 @@ import io.bitsquare.btc.listeners.TxConfidenceListener;
import io.bitsquare.crypto.SignatureService;
import io.bitsquare.persistence.Persistence;
import org.bitcoinj.core.AbstractWalletEventListener;
import org.bitcoinj.core.Address;
import org.bitcoinj.core.AddressFormatException;
import org.bitcoinj.core.Coin;
@ -85,6 +86,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import static org.bitcoinj.script.ScriptOpCodes.OP_RETURN;
/**
@ -103,6 +108,10 @@ public class WalletService {
private final List<BalanceListener> balanceListeners = new CopyOnWriteArrayList<>();
private final ReentrantLock lock = Threading.lock(LOCK_NAME);
private final ObservableDownloadListener downloadListener = new ObservableDownloadListener();
private final Observable<Double> downloadProgress = downloadListener.getObservable();
private final WalletEventListener walletEventListener = new BitsquareWalletEventListener();
private final NetworkParameters params;
private final FeePolicy feePolicy;
private final SignatureService signatureService;
@ -113,7 +122,6 @@ public class WalletService {
private WalletAppKit walletAppKit;
private Wallet wallet;
private WalletEventListener walletEventListener;
private AddressEntry registrationAddressEntry;
private AddressEntry arbitratorDepositAddressEntry;
private @GuardedBy(LOCK_NAME) List<AddressEntry> addressEntryList = new ArrayList<>();
@ -140,8 +148,9 @@ public class WalletService {
// Public Methods
///////////////////////////////////////////////////////////////////////////////////////////
public void initialize(Executor executor, BlockchainDownloadListener blockchainDownloadListener,
StartupListener startupListener) {
public Observable<Object> initialize(Executor executor) {
Subject<Object, Object> status = BehaviorSubject.create();
// Tell bitcoinj to execute event handlers on the JavaFX UI thread. This keeps things simple and means
// we cannot forget to switch threads when adding event handlers. Unfortunately, the DownloadListener
// we give to the app kit is currently an exception and runs on a library thread. It'll get fixed in
@ -159,7 +168,7 @@ public class WalletService {
walletAppKit.peerGroup().setMaxConnections(11);
walletAppKit.peerGroup().setBloomFilterFalsePositiveRate(0.00001);
initWallet();
executor.execute(() -> startupListener.completed());
status.onCompleted();
}
};
// Now configure and start the appkit. This will take a second or two - we could show a temporary splash screen
@ -186,21 +195,6 @@ public class WalletService {
//walletAppKit.useTor();
}
// DownloadListener does not run yet in a user thread, so we map it our self
DownloadListener downloadListener = new DownloadListener() {
@Override
protected void progress(double percentage, int blocksLeft, Date date) {
super.progress(percentage, blocksLeft, date);
executor.execute(() -> blockchainDownloadListener.progress(percentage));
}
@Override
protected void doneDownload() {
super.doneDownload();
executor.execute(() -> blockchainDownloadListener.doneDownload());
}
};
walletAppKit.setDownloadListener(downloadListener)
.setBlockingStartup(false)
.setUserAgent(userAgent.getName(), userAgent.getVersion());
@ -215,51 +209,16 @@ public class WalletService {
@Override
public void failed(@NotNull Service.State from, @NotNull Throwable failure) {
walletAppKit = null;
startupListener.failed(failure);
status.onError(failure);
}
}, Threading.USER_THREAD);
walletAppKit.startAsync();
return status.mergeWith(downloadProgress);
}
private void initWallet() {
wallet = walletAppKit.wallet();
walletEventListener = new WalletEventListener() {
@Override
public void onCoinsReceived(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) {
notifyBalanceListeners();
}
@Override
public void onCoinsSent(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) {
notifyBalanceListeners();
}
@Override
public void onReorganize(Wallet wallet) {
}
@Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
notifyConfidenceListeners(tx);
}
@Override
public void onWalletChanged(Wallet wallet) {
}
@Override
public void onScriptsAdded(Wallet wallet, List<Script> scripts) {
}
@Override
public void onKeysAdded(List<ECKey> keys) {
}
};
wallet.addEventListener(walletEventListener);
Serializable serializable = persistence.read(this, "addressEntryList");
@ -291,6 +250,10 @@ public class WalletService {
walletAppKit.stopAsync();
}
public Observable<Double> getDownloadProgress() {
return downloadProgress;
}
public Wallet getWallet() {
return wallet;
}
@ -407,17 +370,6 @@ public class WalletService {
}
private void notifyConfidenceListeners(Transaction tx) {
for (AddressConfidenceListener addressConfidenceListener : addressConfidenceListeners) {
List<TransactionConfidence> transactionConfidenceList = new ArrayList<>();
transactionConfidenceList.add(getTransactionConfidence(tx, addressConfidenceListener.getAddress()));
TransactionConfidence transactionConfidence = getMostRecentConfidence(transactionConfidenceList);
addressConfidenceListener.onTransactionConfidenceChanged(transactionConfidence);
}
txConfidenceListeners.stream().filter(txConfidenceListener -> tx.getHashAsString().equals
(txConfidenceListener.getTxID())).forEach(txConfidenceListener -> txConfidenceListener
.onTransactionConfidenceChanged(tx.getConfidence()));
}
private TransactionConfidence getTransactionConfidence(Transaction tx, Address address) {
@ -506,18 +458,6 @@ public class WalletService {
return balance;
}
private void notifyBalanceListeners() {
for (BalanceListener balanceListener : balanceListeners) {
Coin balance;
if (balanceListener.getAddress() != null)
balance = getBalanceForAddress(balanceListener.getAddress());
else
balance = getWalletBalance();
balanceListener.onBalanceChanged(balance);
}
}
public Coin getWalletBalance() {
return wallet.getBalance(Wallet.BalanceType.ESTIMATED);
}
@ -1155,16 +1095,66 @@ public class WalletService {
// Inner classes
///////////////////////////////////////////////////////////////////////////////////////////
public static interface StartupListener {
void completed();
void failed(Throwable failure);
private static class ObservableDownloadListener extends DownloadListener {
private final Subject<Double, Double> subject = BehaviorSubject.create(0d);
@Override
protected void progress(double percentage, int blocksLeft, Date date) {
super.progress(percentage, blocksLeft, date);
subject.onNext(percentage);
}
@Override
protected void doneDownload() {
super.doneDownload();
subject.onCompleted();
}
public Observable<Double> getObservable() {
return subject.asObservable();
}
}
public static interface BlockchainDownloadListener {
void progress(double percentage);
void doneDownload();
private class BitsquareWalletEventListener extends AbstractWalletEventListener {
@Override
public void onCoinsReceived(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) {
notifyBalanceListeners();
}
@Override
public void onCoinsSent(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) {
notifyBalanceListeners();
}
@Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
for (AddressConfidenceListener addressConfidenceListener : addressConfidenceListeners) {
List<TransactionConfidence> transactionConfidenceList = new ArrayList<>();
transactionConfidenceList.add(getTransactionConfidence(tx, addressConfidenceListener.getAddress()));
TransactionConfidence transactionConfidence = getMostRecentConfidence(transactionConfidenceList);
addressConfidenceListener.onTransactionConfidenceChanged(transactionConfidence);
}
txConfidenceListeners.stream()
.filter(txConfidenceListener -> tx.getHashAsString().equals(txConfidenceListener.getTxID()))
.forEach(txConfidenceListener ->
txConfidenceListener.onTransactionConfidenceChanged(tx.getConfidence()));
}
private void notifyBalanceListeners() {
for (BalanceListener balanceListener : balanceListeners) {
Coin balance;
if (balanceListener.getAddress() != null)
balance = getBalanceForAddress(balanceListener.getAddress());
else
balance = getWalletBalance();
balanceListener.onBalanceChanged(balance);
}
}
}
}

View File

@ -23,7 +23,6 @@ import io.bitsquare.btc.WalletService;
import io.bitsquare.gui.Model;
import io.bitsquare.gui.util.BSFormatter;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.BootstrapListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.Trade;
@ -50,10 +49,11 @@ import javafx.util.StringConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
class MainModel implements Model {
private static final Logger log = LoggerFactory.getLogger(MainModel.class);
final BooleanProperty backendReady = new SimpleBooleanProperty();
final DoubleProperty networkSyncProgress = new SimpleDoubleProperty(-1);
final IntegerProperty numPendingTrades = new SimpleIntegerProperty(0);
final ObjectProperty<BootstrapState> bootstrapState = new SimpleObjectProperty<>();
@ -81,10 +81,6 @@ class MainModel implements Model {
private final BitcoinNetwork bitcoinNetwork;
private final BSFormatter formatter;
private boolean messageServiceInited;
private boolean walletServiceInited;
private boolean servicesInitialised;
@Inject
public MainModel(User user, WalletService walletService, MessageService messageService, TradeManager tradeManager,
@ -155,59 +151,35 @@ class MainModel implements Model {
bankAccountsComboBoxPrompt.set(user.getBankAccounts().isEmpty() ? "No accounts" : "");
}
public void initBackend() {
messageService.init(new BootstrapListener() {
@Override
public void onCompleted() {
messageServiceInited = true;
if (walletServiceInited) onServicesInitialised();
}
public Observable<?> initBackend() {
@Override
public void onFailed(Throwable throwable) {
log.error(throwable.toString());
}
walletService.getDownloadProgress().subscribe(
percentage -> Platform.runLater(() -> networkSyncProgress.set(percentage / 100.0)),
error -> Platform.runLater(() -> System.out.println("error = " + error)),
() -> Platform.runLater(() -> networkSyncProgress.set(1.0)));
@Override
public void onBootstrapStateChanged(BootstrapState bootstrapState) {
MainModel.this.bootstrapState.set(bootstrapState);
}
});
Observable<BootstrapState> message = messageService.init();
message.subscribe(
state -> Platform.runLater(() -> bootstrapState.set(state)),
error -> log.error(error.toString()));
WalletService.BlockchainDownloadListener blockchainDownloadListener = new WalletService
.BlockchainDownloadListener() {
@Override
public void progress(double percentage) {
networkSyncProgress.set(percentage / 100.0);
Observable<Object> wallet = walletService.initialize(Platform::runLater);
wallet.subscribe(
next -> { },
error -> Platform.runLater(() -> walletServiceException.set(error)),
() -> { });
if (servicesInitialised && percentage >= 100.0)
backendReady.set(true);
}
Observable<?> backend = Observable.merge(message, wallet);
backend.subscribe(
next -> { },
error -> { },
() -> Platform.runLater(() -> {
tradeManager.getPendingTrades().addListener(
(MapChangeListener<String, Trade>) change -> updateNumPendingTrades());
updateNumPendingTrades();
}));
@Override
public void doneDownload() {
networkSyncProgress.set(1.0);
if (servicesInitialised)
backendReady.set(true);
}
};
WalletService.StartupListener startupListener = new WalletService.StartupListener() {
@Override
public void completed() {
walletServiceInited = true;
if (messageServiceInited)
onServicesInitialised();
}
@Override
public void failed(final Throwable failure) {
walletServiceException.set(failure);
}
};
walletService.initialize(Platform::runLater, blockchainDownloadListener, startupListener);
return backend;
}
@ -234,17 +206,6 @@ class MainModel implements Model {
}
private void onServicesInitialised() {
tradeManager.getPendingTrades().addListener((MapChangeListener<String,
Trade>) change -> updateNumPendingTrades());
updateNumPendingTrades();
servicesInitialised = true;
if (networkSyncProgress.get() >= 1.0)
backendReady.set(true);
}
private void updateNumPendingTrades() {
numPendingTrades.set(tradeManager.getPendingTrades().size());
}

View File

@ -143,22 +143,24 @@ public class MainViewCB extends FxmlController<Pane, MainModel> {
root.getChildren().addAll(baseApplicationContainer, splashScreen);
model.backendReady.addListener((ov1, prev1, ready) -> {
if (!ready)
return;
Platform.runLater(
() -> model.initBackend().subscribe(
next -> { },
error -> { },
() -> Platform.runLater(() -> {
bankAccountComboBoxHolder.getChildren().setAll(createBankAccountComboBox());
bankAccountComboBoxHolder.getChildren().setAll(createBankAccountComboBox());
applyPendingTradesInfoIcon(model.numPendingTrades.get(), portfolioButtonHolder);
model.numPendingTrades.addListener((ov2, prev2, numPendingTrades) ->
applyPendingTradesInfoIcon((int) numPendingTrades, portfolioButtonHolder));
applyPendingTradesInfoIcon(model.numPendingTrades.get(), portfolioButtonHolder);
model.numPendingTrades.addListener((ov2, prev2, numPendingTrades) ->
applyPendingTradesInfoIcon((int) numPendingTrades, portfolioButtonHolder));
navigation.navigateToLastStoredItem();
navigation.navigateToLastStoredItem();
transitions.fadeOutAndRemove(splashScreen, 1500);
});
Platform.runLater(model::initBackend);
transitions.fadeOutAndRemove(splashScreen, 1500);
}
)
)
);
}
private VBox createSplashScreen() {

View File

@ -19,16 +19,18 @@ package io.bitsquare.msg;
import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.msg.listeners.ArbitratorListener;
import io.bitsquare.msg.listeners.BootstrapListener;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.msg.listeners.IncomingMessageListener;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Peer;
import java.security.PublicKey;
import java.util.Locale;
import rx.Observable;
public interface MessageService extends MessageBroker {
void sendMessage(Peer peer, Message message, OutgoingMessageListener listener);
@ -45,7 +47,7 @@ public interface MessageService extends MessageBroker {
void getArbitrators(Locale defaultLanguageLocale);
void init(BootstrapListener bootstrapListener);
Observable<BootstrapState> init();
void getPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener);
}

View File

@ -1,29 +0,0 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
import io.bitsquare.network.BootstrapState;
public interface BootstrapListener {
public void onCompleted();
public void onFailed(Throwable throwable);
public void onBootstrapStateChanged(BootstrapState state);
}

View File

@ -21,10 +21,10 @@ import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.msg.Message;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.ArbitratorListener;
import io.bitsquare.msg.listeners.BootstrapListener;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.msg.listeners.IncomingMessageListener;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Peer;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import io.bitsquare.user.User;
@ -55,6 +55,8 @@ import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
@ -90,10 +92,8 @@ class TomP2PMessageService implements MessageService {
// Public Methods
///////////////////////////////////////////////////////////////////////////////////////////
public void init(BootstrapListener bootstrapListener) {
p2pNode.setMessageBroker(this);
p2pNode.setKeyPair(user.getMessageKeyPair());
p2pNode.bootstrap(bootstrapListener);
public Observable<BootstrapState> init() {
return p2pNode.bootstrap(this, user.getMessageKeyPair());
}
public void shutDown() {

View File

@ -19,7 +19,6 @@ package io.bitsquare.msg.tomp2p;
import io.bitsquare.BitsquareException;
import io.bitsquare.msg.MessageBroker;
import io.bitsquare.msg.listeners.BootstrapListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.ClientNode;
import io.bitsquare.network.ConnectionType;
@ -42,8 +41,6 @@ import javax.annotation.Nullable;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
@ -62,6 +59,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import static com.google.common.base.Preconditions.checkNotNull;
/**
@ -105,21 +106,17 @@ public class TomP2PNode implements ClientNode {
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public void setMessageBroker(MessageBroker messageBroker) {
this.messageBroker = messageBroker;
}
public void setKeyPair(@NotNull KeyPair keyPair) {
this.keyPair = keyPair;
bootstrappedPeerFactory.setKeyPair(keyPair);
}
public void bootstrap(BootstrapListener bootstrapListener) {
public Observable<BootstrapState> bootstrap(MessageBroker messageBroker, KeyPair keyPair) {
checkNotNull(keyPair, "keyPair must not be null.");
checkNotNull(messageBroker, "messageBroker must not be null.");
bootstrappedPeerFactory.getBootstrapState().addListener((ov, oldValue, newValue) ->
bootstrapListener.onBootstrapStateChanged(newValue));
this.messageBroker = messageBroker;
this.keyPair = keyPair;
bootstrappedPeerFactory.setKeyPair(keyPair);
Subject<BootstrapState, BootstrapState> bootstrapState = BehaviorSubject.create();
bootstrappedPeerFactory.getBootstrapState().addListener((ov, oldValue, newValue) -> bootstrapState.onNext(newValue));
SettableFuture<PeerDHT> bootstrapFuture = bootstrappedPeerFactory.start();
Futures.addCallback(bootstrapFuture, new FutureCallback<PeerDHT>() {
@ -128,21 +125,22 @@ public class TomP2PNode implements ClientNode {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setup();
Platform.runLater(bootstrapListener::onCompleted);
bootstrapState.onCompleted();
}
else {
log.error("Error at bootstrap: peerDHT = null");
Platform.runLater(() -> bootstrapListener.onFailed(
new BitsquareException("Error at bootstrap: peerDHT = null")));
bootstrapState.onError(new BitsquareException("Error at bootstrap: peerDHT = null"));
}
}
@Override
public void onFailure(@NotNull Throwable t) {
log.error("Exception at bootstrap " + t.getMessage());
Platform.runLater(() -> bootstrapListener.onFailed(t));
bootstrapState.onError(t);
}
});
return bootstrapState.asObservable();
}
private void setup() {