From 67295aea55f92664c13bb373789ffdab1d80c10e Mon Sep 17 00:00:00 2001 From: Chris Beams Date: Tue, 18 Nov 2014 11:57:50 +0100 Subject: [PATCH] Improve service initialization coordination using rx.Observable This change introduces the use of RxJava's Observable [1] to redesign how we work with non-deterministic and/or event-based information, such as: connecting to peer-to-peer infrastructure, synchronizing the bitcoin blockchain, and so on. Prior to this commit, these activities were initiated in methods like WalletService#initialize and TomP2PMessageService#init. These methods accepted 'listener' interfaces, and these listeners' callback methods would be invoked whenever work progressed, completed, or failed. This approach required significant coordination logic, which, prior to this commit, was found primarily in MainModel#initBackend. A primary goal of the logic found here was to determine when the backend was "ready". This state was represented in MainModel's `backendReady` field, which would be set to true once the following three conditions were satisfied: 1. the message service had finished initialization 2. the wallet service had finished initialization, and 3. the blockchain synchronization had reached 100% Monitoring these three states was complex, and required hard-to-follow conditional logic spread across a number of locations in the code. In any case, however, once these three conditions were satisfied and backendReady's value was set to true, a listener on the backendReady field (in MainViewCB#doInitialize) would then populate combo boxes and pending trade counts in the main view and cause the splash screen to fade out, rendering the application ready for user interaction. The introduction of rx.Observable is designed to achieve the same show-the-splash-screen-until-everything-is-ready functionality described above, without the complex monitoring, conditional logic and nested callbacks. This is achieved by modeling each process as an Observable stream of events. Observables in RxJava can emit any number of events, and can complete either normally or with an error. These observables may be 'subscribed' to by any number of subscribers, and events emitted can be acted upon by instructing the subscriber what to do `onNext`, `onCompleted`, and `onError`. So for example WalletService now exposes an Observable called bootstrapState. This Observable is subscribed to in MainModel#initBackend in such a way that every time it emits a new double value (i.e. a new percentage), the various bootstrap state text labels and progress indicators are updated accordingly. Where it gets really interesting, however, is when Observables are combined. The primary complexity described above is coordinating the fading out of the splash screen with the completed initialization of all backend services. As can now be seen in MainModel#initBackend, the wallet service and message service Observables are simply "merged" into a single observable and returned. From the MainViewCB side, this "single backend observable" is subscribed to and, when it completes (i.e. when all the underlying Observables complete), then combo boxes and pending trade counts are populated and the splash screen is faded out. Understanding RxJava, Observables, and the principles of "Functional Reactive Programming" takes time. It is a paradigm shift in dealing with concurrency and non-determinism, but one that ultimately rewards those who take the time. In the end, I believe it's use will result in a significantly more concise and robust internal architecture for Bitsquare, and using RxJava's lightweight, well-adopted and infrastructure-agnostic API leaves us open to using Akka or other more sophisticated infrastructure later without tying ourselves to those specific APIs (because virtually anything can be modeled as an Observable). Achieve these benifits means that core committers will need to understand how RxJava works, how to think about it, and how to design using it. I have spent the better part of the last week getting to know it, and I am certainly still learning. I can recommend many resources to aid in this process, but having gone through it myself, I recommend that everyone read at least [1] and [2] first. [1]: https://github.com/ReactiveX/RxJava/wiki/Observable [2]: [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) --- build.gradle | 1 + .../java/io/bitsquare/btc/WalletService.java | 164 ++++++++---------- .../java/io/bitsquare/gui/main/MainModel.java | 91 +++------- .../io/bitsquare/gui/main/MainViewCB.java | 28 +-- .../java/io/bitsquare/msg/MessageService.java | 6 +- .../msg/listeners/BootstrapListener.java | 29 ---- .../msg/tomp2p/TomP2PMessageService.java | 10 +- .../io/bitsquare/msg/tomp2p/TomP2PNode.java | 36 ++-- 8 files changed, 145 insertions(+), 220 deletions(-) delete mode 100644 src/main/java/io/bitsquare/msg/listeners/BootstrapListener.java diff --git a/build.gradle b/build.gradle index e3c45ce1b2..345720ff82 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ repositories { dependencies { compile 'org.bitcoinj:bitcoinj-core:0.12' compile 'net.tomp2p:tomp2p-all:5.0-Alpha.60850a9-SNAPSHOT' + compile 'io.reactivex:rxjava:1.0.0-rc.12' compile 'org.springframework:spring-core:4.1.1.RELEASE' compile 'net.sf.jopt-simple:jopt-simple:4.8' compile 'org.slf4j:slf4j-api:1.7.7' diff --git a/src/main/java/io/bitsquare/btc/WalletService.java b/src/main/java/io/bitsquare/btc/WalletService.java index fcfb24a8db..5eba838f18 100644 --- a/src/main/java/io/bitsquare/btc/WalletService.java +++ b/src/main/java/io/bitsquare/btc/WalletService.java @@ -23,6 +23,7 @@ import io.bitsquare.btc.listeners.TxConfidenceListener; import io.bitsquare.crypto.SignatureService; import io.bitsquare.persistence.Persistence; +import org.bitcoinj.core.AbstractWalletEventListener; import org.bitcoinj.core.Address; import org.bitcoinj.core.AddressFormatException; import org.bitcoinj.core.Coin; @@ -85,6 +86,10 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.subjects.BehaviorSubject; +import rx.subjects.Subject; + import static org.bitcoinj.script.ScriptOpCodes.OP_RETURN; /** @@ -103,6 +108,10 @@ public class WalletService { private final List balanceListeners = new CopyOnWriteArrayList<>(); private final ReentrantLock lock = Threading.lock(LOCK_NAME); + private final ObservableDownloadListener downloadListener = new ObservableDownloadListener(); + private final Observable downloadProgress = downloadListener.getObservable(); + private final WalletEventListener walletEventListener = new BitsquareWalletEventListener(); + private final NetworkParameters params; private final FeePolicy feePolicy; private final SignatureService signatureService; @@ -113,7 +122,6 @@ public class WalletService { private WalletAppKit walletAppKit; private Wallet wallet; - private WalletEventListener walletEventListener; private AddressEntry registrationAddressEntry; private AddressEntry arbitratorDepositAddressEntry; private @GuardedBy(LOCK_NAME) List addressEntryList = new ArrayList<>(); @@ -140,8 +148,9 @@ public class WalletService { // Public Methods /////////////////////////////////////////////////////////////////////////////////////////// - public void initialize(Executor executor, BlockchainDownloadListener blockchainDownloadListener, - StartupListener startupListener) { + public Observable initialize(Executor executor) { + Subject status = BehaviorSubject.create(); + // Tell bitcoinj to execute event handlers on the JavaFX UI thread. This keeps things simple and means // we cannot forget to switch threads when adding event handlers. Unfortunately, the DownloadListener // we give to the app kit is currently an exception and runs on a library thread. It'll get fixed in @@ -159,7 +168,7 @@ public class WalletService { walletAppKit.peerGroup().setMaxConnections(11); walletAppKit.peerGroup().setBloomFilterFalsePositiveRate(0.00001); initWallet(); - executor.execute(() -> startupListener.completed()); + status.onCompleted(); } }; // Now configure and start the appkit. This will take a second or two - we could show a temporary splash screen @@ -186,21 +195,6 @@ public class WalletService { //walletAppKit.useTor(); } - // DownloadListener does not run yet in a user thread, so we map it our self - DownloadListener downloadListener = new DownloadListener() { - @Override - protected void progress(double percentage, int blocksLeft, Date date) { - super.progress(percentage, blocksLeft, date); - executor.execute(() -> blockchainDownloadListener.progress(percentage)); - } - - @Override - protected void doneDownload() { - super.doneDownload(); - executor.execute(() -> blockchainDownloadListener.doneDownload()); - } - }; - walletAppKit.setDownloadListener(downloadListener) .setBlockingStartup(false) .setUserAgent(userAgent.getName(), userAgent.getVersion()); @@ -215,51 +209,16 @@ public class WalletService { @Override public void failed(@NotNull Service.State from, @NotNull Throwable failure) { walletAppKit = null; - startupListener.failed(failure); + status.onError(failure); } }, Threading.USER_THREAD); walletAppKit.startAsync(); + + return status.mergeWith(downloadProgress); } private void initWallet() { wallet = walletAppKit.wallet(); - - walletEventListener = new WalletEventListener() { - @Override - public void onCoinsReceived(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) { - notifyBalanceListeners(); - } - - @Override - public void onCoinsSent(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) { - notifyBalanceListeners(); - } - - @Override - public void onReorganize(Wallet wallet) { - - } - - @Override - public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) { - notifyConfidenceListeners(tx); - } - - @Override - public void onWalletChanged(Wallet wallet) { - - } - - @Override - public void onScriptsAdded(Wallet wallet, List