diff --git a/build.gradle b/build.gradle index 620a32c4c1..fa3606ee32 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ repositories { dependencies { compile 'org.bitcoinj:bitcoinj-core:0.12.2' compile 'net.tomp2p:tomp2p-all:5.0-Alpha.8f1cafb-SNAPSHOT' + compile 'io.reactivex:rxjava:1.0.0-rc.12' compile 'org.springframework:spring-core:4.1.1.RELEASE' compile 'net.sf.jopt-simple:jopt-simple:4.8' compile 'org.slf4j:slf4j-api:1.7.7' diff --git a/src/main/java/io/bitsquare/btc/WalletService.java b/src/main/java/io/bitsquare/btc/WalletService.java index d8edd969b6..f0380de9bd 100644 --- a/src/main/java/io/bitsquare/btc/WalletService.java +++ b/src/main/java/io/bitsquare/btc/WalletService.java @@ -23,6 +23,7 @@ import io.bitsquare.btc.listeners.TxConfidenceListener; import io.bitsquare.crypto.SignatureService; import io.bitsquare.persistence.Persistence; +import org.bitcoinj.core.AbstractWalletEventListener; import org.bitcoinj.core.Address; import org.bitcoinj.core.AddressFormatException; import org.bitcoinj.core.Coin; @@ -85,6 +86,10 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.subjects.BehaviorSubject; +import rx.subjects.Subject; + import static org.bitcoinj.script.ScriptOpCodes.OP_RETURN; /** @@ -103,6 +108,10 @@ public class WalletService { private final List balanceListeners = new CopyOnWriteArrayList<>(); private final ReentrantLock lock = Threading.lock(LOCK_NAME); + private final ObservableDownloadListener downloadListener = new ObservableDownloadListener(); + private final Observable downloadProgress = downloadListener.getObservable(); + private final WalletEventListener walletEventListener = new BitsquareWalletEventListener(); + private final NetworkParameters params; private final FeePolicy feePolicy; private final SignatureService signatureService; @@ -113,7 +122,6 @@ public class WalletService { private WalletAppKit walletAppKit; private Wallet wallet; - private WalletEventListener walletEventListener; private AddressEntry registrationAddressEntry; private AddressEntry arbitratorDepositAddressEntry; private @GuardedBy(LOCK_NAME) List addressEntryList = new ArrayList<>(); @@ -140,8 +148,9 @@ public class WalletService { // Public Methods /////////////////////////////////////////////////////////////////////////////////////////// - public void initialize(Executor executor, BlockchainDownloadListener blockchainDownloadListener, - StartupListener startupListener) { + public Observable initialize(Executor executor) { + Subject status = BehaviorSubject.create(); + // Tell bitcoinj to execute event handlers on the JavaFX UI thread. This keeps things simple and means // we cannot forget to switch threads when adding event handlers. Unfortunately, the DownloadListener // we give to the app kit is currently an exception and runs on a library thread. It'll get fixed in @@ -159,7 +168,7 @@ public class WalletService { walletAppKit.peerGroup().setMaxConnections(11); walletAppKit.peerGroup().setBloomFilterFalsePositiveRate(0.00001); initWallet(); - executor.execute(() -> startupListener.completed()); + status.onCompleted(); } }; // Now configure and start the appkit. This will take a second or two - we could show a temporary splash screen @@ -186,21 +195,6 @@ public class WalletService { //walletAppKit.useTor(); } - // DownloadListener does not run yet in a user thread, so we map it our self - DownloadListener downloadListener = new DownloadListener() { - @Override - protected void progress(double percentage, int blocksLeft, Date date) { - super.progress(percentage, blocksLeft, date); - executor.execute(() -> blockchainDownloadListener.progress(percentage)); - } - - @Override - protected void doneDownload() { - super.doneDownload(); - executor.execute(() -> blockchainDownloadListener.doneDownload()); - } - }; - walletAppKit.setDownloadListener(downloadListener) .setBlockingStartup(false) .setUserAgent(userAgent.getName(), userAgent.getVersion()); @@ -215,52 +209,16 @@ public class WalletService { @Override public void failed(@NotNull Service.State from, @NotNull Throwable failure) { walletAppKit = null; - startupListener.failed(failure); + status.onError(failure); } }, Threading.USER_THREAD); walletAppKit.startAsync(); + + return status.mergeWith(downloadProgress); } private void initWallet() { wallet = walletAppKit.wallet(); - - walletEventListener = new WalletEventListener() { - @Override - public void onCoinsReceived(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) { - notifyBalanceListeners(); - } - - @Override - public void onCoinsSent(Wallet wallet, Transaction tx, Coin prevBalance, Coin newBalance) { - notifyBalanceListeners(); - } - - @Override - public void onReorganize(Wallet wallet) { - - } - - @Override - public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) { - log.trace("onTransactionConfidenceChanged " + tx.getHashAsString()); - notifyConfidenceListeners(tx); - } - - @Override - public void onWalletChanged(Wallet wallet) { - - } - - @Override - public void onScriptsAdded(Wallet wallet, List