diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index a7093d5fb7..795b5a31bf 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -97,6 +97,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit override def stop(): Future[Unit] = { logger.error(s"Exiting process") + mempoolPollingCancellableOpt.foreach(_.cancel()) for { _ <- conf.stop() _ <- serverBindingsOpt match { @@ -279,6 +280,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } yield info } + /** Variable to hold the cancellable to stop polling bitcoind for blocks/txs */ + private var mempoolPollingCancellableOpt: Option[BitcoindPollingCancellabe] = + None + /** Start the bitcoin-s wallet server with a bitcoind backend * @param startedTorConfigF a future that is completed when tor is fully started * @return @@ -362,43 +367,56 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } yield dlcNode } - for { - bitcoind <- bitcoindF - bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain) - _ = require( - bitcoindNetwork == network, - s"bitcoind ($bitcoindNetwork) on different network than wallet ($network)") - _ <- loadWalletApiF - _ <- startHttpServer( - nodeApiF = Future.successful(bitcoind), - chainApi = bitcoind, - walletF = walletF.map(_._1), - dlcNodeF = dlcNodeF, - torConfStarted = startedTorConfigF, - serverCmdLineArgs = serverArgParser, - wsSource = wsSource - ) - walletName <- walletNameF - walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue, walletName) - chainCallbacks <- chainCallbacksF - (wallet, walletConfig, dlcConfig) <- walletF - _ = walletConfig.addCallbacks(walletCallbacks) + //intentionally Future[Future[]], the inner Future[BitcoindPollingCancellable] + //is blocked on syncing the wallet from bitcoind. This can take hours + //so don't block the rest of the appllication on that + val pollingCancellableNestedF: Future[Future[BitcoindPollingCancellabe]] = + for { + bitcoind <- bitcoindF + bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain) + _ = require( + bitcoindNetwork == network, + s"bitcoind ($bitcoindNetwork) on different network than wallet ($network)") + _ <- loadWalletApiF + _ <- startHttpServer( + nodeApiF = Future.successful(bitcoind), + chainApi = bitcoind, + walletF = walletF.map(_._1), + dlcNodeF = dlcNodeF, + torConfStarted = startedTorConfigF, + serverCmdLineArgs = serverArgParser, + wsSource = wsSource + ) + walletName <- walletNameF + walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue, + walletName) + chainCallbacks <- chainCallbacksF + (wallet, walletConfig, dlcConfig) <- walletF + _ = walletConfig.addCallbacks(walletCallbacks) - //intentionally doesn't map on this otherwise we - //wait until we are done syncing the entire wallet - //which could take 1 hour - _ = syncWalletWithBitcoindAndStartPolling(bitcoind, - wallet, - Some(chainCallbacks)) - dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue) - _ = dlcConfig.addCallbacks(dlcWalletCallbacks) - _ <- startedTorConfigF - _ <- handleDuplicateSpendingInfoDb(wallet, walletConfig) - _ <- restartRescanIfNeeded(wallet) - } yield { - logger.info(s"Done starting Main!") - () - } + //intentionally doesn't map on this otherwise we + //wait until we are done syncing the entire wallet + //which could take 1 hour + mempoolPollingCancellable = syncWalletWithBitcoindAndStartPolling( + bitcoind, + wallet, + Some(chainCallbacks)) + dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue) + _ = dlcConfig.addCallbacks(dlcWalletCallbacks) + _ <- startedTorConfigF + _ <- handleDuplicateSpendingInfoDb(wallet, walletConfig) + _ <- restartRescanIfNeeded(wallet) + } yield { + logger.info(s"Done starting Main!") + mempoolPollingCancellable + } + + //set the polling cancellable after the wallet is done syncing against bitcoind + pollingCancellableNestedF.map( + _.map(c => mempoolPollingCancellableOpt = Some(c))) + + //don't return the Future that represents the full syncing of the wallet with bitcoind + pollingCancellableNestedF.map(_ => ()) } private var serverBindingsOpt: Option[ServerBindings] = None @@ -492,11 +510,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit * to block the rest of the application from starting if we have to * do a ton of syncing. However, we don't want to swallow * exceptions thrown by this method. + * @return the [[Cancellable]] representing the schedule job that polls the mempool. You can call .cancel() to stop this */ private def syncWalletWithBitcoindAndStartPolling( bitcoind: BitcoindRpcClient, wallet: WalletApi with NeutrinoWalletApi, - chainCallbacksOpt: Option[ChainCallbacks]): Future[Unit] = { + chainCallbacksOpt: Option[ChainCallbacks]): Future[ + BitcoindPollingCancellabe] = { val f = for { _ <- AsyncUtil.retryUntilSatisfiedF( conditionF = { () => @@ -515,26 +535,29 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit wallet, chainCallbacksOpt)(system) _ <- wallet.updateUtxoPendingStates() - _ <- + pollingCancellable <- if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) { BitcoindRpcBackendUtil .startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt) - .map { _ => - BitcoindRpcBackendUtil + .map { blockingPollingCancellable => + val mempoolCancellable = BitcoindRpcBackendUtil .startBitcoindMempoolPolling(wallet, bitcoind) { tx => nodeConf.callBacks .executeOnTxReceivedCallbacks(logger, tx) } - () + + BitcoindPollingCancellabe(blockingPollingCancellable, + mempoolCancellable) } } else { Future { BitcoindRpcBackendUtil.startZMQWalletCallbacks( wallet, bitcoindRpcConf.zmqConfig) + BitcoindPollingCancellabe.none } } - } yield () + } yield pollingCancellable f.failed.foreach(err => logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err)) diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index 78dda18211..efb0b45d9a 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -147,11 +147,12 @@ object BitcoindRpcBackendUtil extends Logging { // so we don't lose the internal state of the wallet val walletCallbackP = Promise[Wallet]() + val nodeApi = BitcoindRpcBackendUtil.buildBitcoindNodeApi( + bitcoind, + walletCallbackP.future, + chainCallbacksOpt) val pairedWallet = Wallet( - nodeApi = - BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind, - walletCallbackP.future, - chainCallbacksOpt), + nodeApi = nodeApi, chainQueryApi = bitcoind, feeRateApi = wallet.feeRateApi )(wallet.walletConfig) diff --git a/app/server/src/main/scala/org/bitcoins/server/util/BitcoindPollingCancellabe.scala b/app/server/src/main/scala/org/bitcoins/server/util/BitcoindPollingCancellabe.scala new file mode 100644 index 0000000000..dcc19e863d --- /dev/null +++ b/app/server/src/main/scala/org/bitcoins/server/util/BitcoindPollingCancellabe.scala @@ -0,0 +1,26 @@ +package org.bitcoins.server.util + +import akka.actor.Cancellable +import grizzled.slf4j.Logging + +case class BitcoindPollingCancellabe( + blockPollingCancellable: Cancellable, + mempoolPollingCancelable: Cancellable) + extends Cancellable + with Logging { + + override def cancel(): Boolean = { + logger.info(s"Cancelling bitcoind polling jobs") + blockPollingCancellable.cancel() && mempoolPollingCancelable.cancel() + } + + override def isCancelled: Boolean = + blockPollingCancellable.isCancelled && mempoolPollingCancelable.cancel() +} + +object BitcoindPollingCancellabe { + + val none: BitcoindPollingCancellabe = BitcoindPollingCancellabe( + Cancellable.alreadyCancelled, + Cancellable.alreadyCancelled) +} diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index fac35e9820..716e487e6d 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -83,9 +83,6 @@ private[wallet] trait RescanHandling extends WalletLogger { _ <- stateDescriptorDAO.updateRescanning(false) _ <- walletCallbacks.executeOnRescanComplete(logger) } yield { - logger.info(s"Finished rescanning the wallet. It took ${System - .currentTimeMillis() - startTime}ms") - state } @@ -96,6 +93,15 @@ private[wallet] trait RescanHandling extends WalletLogger { .flatMap(_ => Future.failed(err)) } + res.map { + case r: RescanState.RescanStarted => + r.doneF.map(_ => + logger.info(s"Finished rescanning the wallet. It took ${System + .currentTimeMillis() - startTime}ms")) + case RescanState.RescanDone | RescanState.RescanAlreadyStarted => + //nothing to log + } + res } else { logger.warn(