Implement ability to cancel bitcoind polling jobs (#4538)

* Implement ability to cancel bitcoind polling jobs

* Fix bug so we don't block application on fully syncing wallet against bitcoind
This commit is contained in:
Chris Stewart 2022-07-24 19:56:53 -05:00 committed by GitHub
parent 4ef27dd163
commit d508d65142
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 49 deletions

View file

@ -97,6 +97,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
override def stop(): Future[Unit] = {
logger.error(s"Exiting process")
mempoolPollingCancellableOpt.foreach(_.cancel())
for {
_ <- conf.stop()
_ <- serverBindingsOpt match {
@ -279,6 +280,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield info
}
/** Variable to hold the cancellable to stop polling bitcoind for blocks/txs */
private var mempoolPollingCancellableOpt: Option[BitcoindPollingCancellabe] =
None
/** Start the bitcoin-s wallet server with a bitcoind backend
* @param startedTorConfigF a future that is completed when tor is fully started
* @return
@ -362,6 +367,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield dlcNode
}
//intentionally Future[Future[]], the inner Future[BitcoindPollingCancellable]
//is blocked on syncing the wallet from bitcoind. This can take hours
//so don't block the rest of the appllication on that
val pollingCancellableNestedF: Future[Future[BitcoindPollingCancellabe]] =
for {
bitcoind <- bitcoindF
bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain)
@ -379,7 +388,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
wsSource = wsSource
)
walletName <- walletNameF
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue, walletName)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue,
walletName)
chainCallbacks <- chainCallbacksF
(wallet, walletConfig, dlcConfig) <- walletF
_ = walletConfig.addCallbacks(walletCallbacks)
@ -387,7 +397,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
//intentionally doesn't map on this otherwise we
//wait until we are done syncing the entire wallet
//which could take 1 hour
_ = syncWalletWithBitcoindAndStartPolling(bitcoind,
mempoolPollingCancellable = syncWalletWithBitcoindAndStartPolling(
bitcoind,
wallet,
Some(chainCallbacks))
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
@ -397,8 +408,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- restartRescanIfNeeded(wallet)
} yield {
logger.info(s"Done starting Main!")
()
mempoolPollingCancellable
}
//set the polling cancellable after the wallet is done syncing against bitcoind
pollingCancellableNestedF.map(
_.map(c => mempoolPollingCancellableOpt = Some(c)))
//don't return the Future that represents the full syncing of the wallet with bitcoind
pollingCancellableNestedF.map(_ => ())
}
private var serverBindingsOpt: Option[ServerBindings] = None
@ -492,11 +510,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
* to block the rest of the application from starting if we have to
* do a ton of syncing. However, we don't want to swallow
* exceptions thrown by this method.
* @return the [[Cancellable]] representing the schedule job that polls the mempool. You can call .cancel() to stop this
*/
private def syncWalletWithBitcoindAndStartPolling(
bitcoind: BitcoindRpcClient,
wallet: WalletApi with NeutrinoWalletApi,
chainCallbacksOpt: Option[ChainCallbacks]): Future[Unit] = {
chainCallbacksOpt: Option[ChainCallbacks]): Future[
BitcoindPollingCancellabe] = {
val f = for {
_ <- AsyncUtil.retryUntilSatisfiedF(
conditionF = { () =>
@ -515,26 +535,29 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
wallet,
chainCallbacksOpt)(system)
_ <- wallet.updateUtxoPendingStates()
_ <-
pollingCancellable <-
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt)
.map { _ =>
BitcoindRpcBackendUtil
.map { blockingPollingCancellable =>
val mempoolCancellable = BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
nodeConf.callBacks
.executeOnTxReceivedCallbacks(logger, tx)
}
()
BitcoindPollingCancellabe(blockingPollingCancellable,
mempoolCancellable)
}
} else {
Future {
BitcoindRpcBackendUtil.startZMQWalletCallbacks(
wallet,
bitcoindRpcConf.zmqConfig)
BitcoindPollingCancellabe.none
}
}
} yield ()
} yield pollingCancellable
f.failed.foreach(err =>
logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err))

View file

@ -147,11 +147,12 @@ object BitcoindRpcBackendUtil extends Logging {
// so we don't lose the internal state of the wallet
val walletCallbackP = Promise[Wallet]()
val pairedWallet = Wallet(
nodeApi =
BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind,
val nodeApi = BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoind,
walletCallbackP.future,
chainCallbacksOpt),
chainCallbacksOpt)
val pairedWallet = Wallet(
nodeApi = nodeApi,
chainQueryApi = bitcoind,
feeRateApi = wallet.feeRateApi
)(wallet.walletConfig)

View file

@ -0,0 +1,26 @@
package org.bitcoins.server.util
import akka.actor.Cancellable
import grizzled.slf4j.Logging
case class BitcoindPollingCancellabe(
blockPollingCancellable: Cancellable,
mempoolPollingCancelable: Cancellable)
extends Cancellable
with Logging {
override def cancel(): Boolean = {
logger.info(s"Cancelling bitcoind polling jobs")
blockPollingCancellable.cancel() && mempoolPollingCancelable.cancel()
}
override def isCancelled: Boolean =
blockPollingCancellable.isCancelled && mempoolPollingCancelable.cancel()
}
object BitcoindPollingCancellabe {
val none: BitcoindPollingCancellabe = BitcoindPollingCancellabe(
Cancellable.alreadyCancelled,
Cancellable.alreadyCancelled)
}

View file

@ -83,9 +83,6 @@ private[wallet] trait RescanHandling extends WalletLogger {
_ <- stateDescriptorDAO.updateRescanning(false)
_ <- walletCallbacks.executeOnRescanComplete(logger)
} yield {
logger.info(s"Finished rescanning the wallet. It took ${System
.currentTimeMillis() - startTime}ms")
state
}
@ -96,6 +93,15 @@ private[wallet] trait RescanHandling extends WalletLogger {
.flatMap(_ => Future.failed(err))
}
res.map {
case r: RescanState.RescanStarted =>
r.doneF.map(_ =>
logger.info(s"Finished rescanning the wallet. It took ${System
.currentTimeMillis() - startTime}ms"))
case RescanState.RescanDone | RescanState.RescanAlreadyStarted =>
//nothing to log
}
res
} else {
logger.warn(