From a02e25b0ce20ef0edf8995b1fb99d33ef862a418 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Thu, 4 Aug 2022 09:16:46 -0500 Subject: [PATCH] Refactor `WalletRoutes` to take `DLCWalletLoaderApi` as a paramete (#4565) * Refactor WalletRoutes to take DLCWalletLoaderApi as a parameter, use the walletHolder inside of DLCWalletLoaderApi to fulfill requests Fix rebase Reduce parallelism in fetchFiltersFlow to 1 to reduce chance of hitting akka's max open requests * Try fix loader test by catching RejectedExecutionException * Add println to see if catching excpetion gets hit * WIP: Try to fix recursive rescan * Don't map on recursiveRescan so we don't block on it --- .../DLCWalletBitcoindBackendLoaderTest.scala | 4 +- .../org/bitcoins/server/RoutesSpec.scala | 20 ++++- .../bitcoins/server/WalletRoutesSpec.scala | 23 +++++- .../bitcoins/server/BitcoinSServerMain.scala | 14 ++-- .../bitcoins/server/DLCWalletLoaderApi.scala | 5 +- .../org/bitcoins/server/WalletRoutes.scala | 7 +- .../core/wallet/rescan/RescanState.scala | 2 +- .../testkit/server/WalletLoaderFixtures.scala | 2 +- .../org/bitcoins/wallet/WalletHolder.scala | 12 ++- .../wallet/internal/RescanHandling.scala | 73 ++++++++++++++----- 10 files changed, 121 insertions(+), 41 deletions(-) diff --git a/app/server-test/src/test/scala/org/bitcoins/server/DLCWalletBitcoindBackendLoaderTest.scala b/app/server-test/src/test/scala/org/bitcoins/server/DLCWalletBitcoindBackendLoaderTest.scala index 86a6ce482a..9955decfa2 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/DLCWalletBitcoindBackendLoaderTest.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/DLCWalletBitcoindBackendLoaderTest.scala @@ -34,7 +34,7 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures { val bitcoind = walletHolderWithLoader.bitcoind //need some blocks to make rescans last longer for the test case val blocksF = bitcoind.getNewAddress.flatMap(addr => - bitcoind.generateToAddress(500, addr)) + bitcoind.generateToAddress(250, addr)) val loadedWalletF = loader.load(walletNameOpt = None, aesPasswordOpt = None) @@ -42,6 +42,7 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures { //as a hack, set rescanning to true, so next time we load it starts a rescan val setRescanF = for { + _ <- blocksF walletConfig <- walletConfigF descriptorDAO = WalletStateDescriptorDAO()(system.dispatcher, walletConfig) @@ -51,7 +52,6 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures { //now that we have set rescanning, we should see a rescan next time we load wallet for { _ <- setRescanF - _ <- blocksF (loadWallet2, _, _) <- loader.load( walletNameOpt = None, aesPasswordOpt = None diff --git a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala index aa66a7c3b3..f7abe83ef0 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala @@ -33,12 +33,13 @@ import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte} import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.core.wallet.utxo._ import org.bitcoins.crypto._ +import org.bitcoins.feeprovider.ConstantFeeRateProvider import org.bitcoins.node.Node import org.bitcoins.server.routes.{CommonRoutes, ServerCommand} import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.wallet.DLCWalletUtil import org.bitcoins.testkitcore.util.TransactionTestUtil -import org.bitcoins.wallet.MockWalletApi +import org.bitcoins.wallet.{MockWalletApi, WalletHolder} import org.scalamock.scalatest.MockFactory import org.scalatest.wordspec.AnyWordSpec import scodec.bits.ByteVector @@ -62,8 +63,6 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { val testAddress: BitcoinAddress = BitcoinAddress.fromString(testAddressStr) val testLabel: AddressLabelTag = AddressLabelTag("test") - val mockWalletApi: MockWalletApi = mock[MockWalletApi] - val mockChainApi: ChainApi = mock[ChainApi] val mockNode: Node = mock[Node] @@ -72,8 +71,21 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { val nodeRoutes: NodeRoutes = NodeRoutes(mockNode) + val mockWalletApi: MockWalletApi = mock[MockWalletApi] + + val walletHolder = new WalletHolder(Some(mockWalletApi)) + + val feeRateApi = ConstantFeeRateProvider(SatoshisPerVirtualByte.one) + + val walletLoader: DLCWalletNeutrinoBackendLoader = { + DLCWalletNeutrinoBackendLoader(walletHolder, + mockChainApi, + mockNode, + feeRateApi) + } + val walletRoutes: WalletRoutes = - WalletRoutes(mockWalletApi)(system, conf.walletConf) + WalletRoutes(walletLoader)(system, conf.walletConf) val coreRoutes: CoreRoutes = CoreRoutes() diff --git a/app/server-test/src/test/scala/org/bitcoins/server/WalletRoutesSpec.scala b/app/server-test/src/test/scala/org/bitcoins/server/WalletRoutesSpec.scala index 09637cb727..8f0db7ff41 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/WalletRoutesSpec.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/WalletRoutesSpec.scala @@ -2,14 +2,17 @@ package org.bitcoins.server import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.testkit.ScalatestRouteTest +import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.protocol.dlc.models.DLCMessage.DLCOffer import org.bitcoins.core.protocol.dlc.models.DLCStatus import org.bitcoins.core.protocol.tlv.{DLCOfferTLV, LnMessageFactory} import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte} import org.bitcoins.crypto.Sha256Digest +import org.bitcoins.feeprovider.ConstantFeeRateProvider +import org.bitcoins.node.Node import org.bitcoins.server.routes.ServerCommand import org.bitcoins.testkit.BitcoinSTestAppConfig -import org.bitcoins.wallet.MockWalletApi +import org.bitcoins.wallet.{MockWalletApi, WalletHolder} import org.scalamock.scalatest.MockFactory import org.scalatest.wordspec.AnyWordSpec @@ -22,10 +25,24 @@ class WalletRoutesSpec implicit val conf: BitcoinSAppConfig = BitcoinSTestAppConfig.getNeutrinoTestConfig() - val mockWalletApi = mock[MockWalletApi] + + val mockChainApi: ChainApi = mock[ChainApi] + + val mockNode: Node = mock[Node] + val mockWalletApi: MockWalletApi = mock[MockWalletApi] + + val walletHolder = new WalletHolder(Some(mockWalletApi)) + + val feeRateApi = ConstantFeeRateProvider(SatoshisPerVirtualByte.one) + + val walletLoader: DLCWalletNeutrinoBackendLoader = + DLCWalletNeutrinoBackendLoader(walletHolder, + mockChainApi, + mockNode, + feeRateApi) val walletRoutes: WalletRoutes = - WalletRoutes(mockWalletApi)(system, conf.walletConf) + WalletRoutes(walletLoader)(system, conf.walletConf) "WalletRoutes" should { "estimatefee" in { diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 2bc42939c4..d842cc9d27 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -115,7 +115,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit _ = logger.info(s"Stopped ${nodeConf.nodeType.shortName} node") } yield { //return empty wallet holder - new WalletHolder() + WalletHolder.empty } } @@ -145,7 +145,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit torConf.socks5ProxyParams, network) //get our wallet - val walletHolder = new WalletHolder() + val walletHolder = WalletHolder.empty val neutrinoWalletLoaderF = { for { node <- nodeF @@ -224,7 +224,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit _ <- startHttpServer( nodeApiF = startedNodeF, chainApi = chainApi, - walletF = configuredWalletF.map(_._1), + walletLoaderF = neutrinoWalletLoaderF, dlcNodeF = startedDLCNodeF, torConfStarted = startedTorConfigF, serverCmdLineArgs = serverArgParser, @@ -330,7 +330,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit WalletAppConfig.DEFAULT_WALLET_NAME) } yield walletName - val walletHolder = new WalletHolder() + val walletHolder = WalletHolder.empty val chainCallbacksF = for { bitcoind <- bitcoindF } yield { @@ -399,7 +399,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit _ <- startHttpServer( nodeApiF = Future.successful(bitcoind), chainApi = bitcoind, - walletF = walletF.map(_._1), + walletLoaderF = loadWalletApiF, dlcNodeF = dlcNodeF, torConfStarted = startedTorConfigF, serverCmdLineArgs = serverArgParser, @@ -440,7 +440,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit private def startHttpServer( nodeApiF: Future[NodeApi], chainApi: ChainApi, - walletF: Future[WalletHolder], + walletLoaderF: Future[DLCWalletLoaderApi], dlcNodeF: Future[DLCNode], torConfStarted: Future[Unit], serverCmdLineArgs: ServerArgParser, @@ -451,7 +451,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit implicit val walletConf: WalletAppConfig = conf.walletConf val walletRoutesF = { - walletF.map { w => + walletLoaderF.map { w => WalletRoutes(w) } } diff --git a/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala b/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala index 4576107422..80e01701bd 100644 --- a/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala +++ b/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala @@ -32,8 +32,10 @@ sealed trait DLCWalletLoaderApi extends Logging with StartStopAsync[Unit] { implicit protected def system: ActorSystem implicit private def ec: ExecutionContext = system.dispatcher + def walletHolder: WalletHolder + /** Determine if a wallet has been loaded */ - def isWalletLoaded: Boolean + def isWalletLoaded: Boolean = walletHolder.isInitialized def load( walletNameOpt: Option[String], @@ -337,7 +339,6 @@ case class DLCWalletBitcoindBackendLoader( extends DLCWalletLoaderApi { import system.dispatcher implicit private val nodeConf = conf.nodeConf - override def isWalletLoaded: Boolean = walletHolder.isInitialized override def load( walletNameOpt: Option[String], diff --git a/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala b/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala index efddb18b3e..6d60252ecf 100644 --- a/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala +++ b/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala @@ -8,7 +8,6 @@ import akka.stream.Materializer import grizzled.slf4j.Logging import org.bitcoins.commons.rpc._ import org.bitcoins.commons.serializers.Picklers._ -import org.bitcoins.core.api.dlc.wallet.DLCNeutrinoHDWalletApi import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.currency._ import org.bitcoins.core.protocol.tlv._ @@ -25,6 +24,7 @@ import org.bitcoins.crypto.NetworkElement import org.bitcoins.keymanager._ import org.bitcoins.keymanager.config.KeyManagerAppConfig import org.bitcoins.server.routes.{Server, ServerCommand, ServerRoute} +import org.bitcoins.wallet.WalletHolder import org.bitcoins.wallet.config.WalletAppConfig import ujson._ import upickle.default._ @@ -35,13 +35,16 @@ import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} -case class WalletRoutes(wallet: DLCNeutrinoHDWalletApi)(implicit +case class WalletRoutes(loader: DLCWalletLoaderApi)(implicit system: ActorSystem, walletConf: WalletAppConfig) extends ServerRoute with Logging { import system.dispatcher + /** The loaded wallet that requests should be directed against */ + private[this] val wallet: WalletHolder = loader.walletHolder + implicit private val kmConf: KeyManagerAppConfig = walletConf.kmConf private var rescanStateOpt: Option[RescanState] = None diff --git a/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala b/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala index 9a9ac50820..3139631084 100644 --- a/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala +++ b/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala @@ -71,7 +71,7 @@ object RescanState { } } - /** Returns a Future that is compelted when a rescan is fully executed. + /** Returns a Future that is completed when a rescan is fully executed. * This means that the rescan was NOT terminated externally by completing * the akka stream that underlies the rescan logic. */ diff --git a/testkit/src/main/scala/org/bitcoins/testkit/server/WalletLoaderFixtures.scala b/testkit/src/main/scala/org/bitcoins/testkit/server/WalletLoaderFixtures.scala index 47f4cb475b..bc03a78c49 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/server/WalletLoaderFixtures.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/server/WalletLoaderFixtures.scala @@ -26,7 +26,7 @@ trait WalletLoaderFixtures chainQueryApi = bitcoind, feeRateApi = bitcoind) - walletHolder = new WalletHolder() + walletHolder = WalletHolder.empty loader = DLCWalletBitcoindBackendLoader( walletHolder = walletHolder, bitcoind = bitcoind, diff --git a/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala b/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala index f26e5bc0af..f3084bec9a 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala @@ -61,11 +61,13 @@ import scala.concurrent.{ExecutionContext, Future} class WalletNotInitialized extends Exception("The wallet is not initialized") -class WalletHolder(implicit ec: ExecutionContext) +class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit + ec: ExecutionContext) extends DLCNeutrinoHDWalletApi with Logging { - @volatile private var walletOpt: Option[DLCNeutrinoHDWalletApi] = None + @volatile private var walletOpt: Option[DLCNeutrinoHDWalletApi] = + initWalletOpt private def wallet: DLCNeutrinoHDWalletApi = synchronized { walletOpt match { @@ -1002,3 +1004,9 @@ class WalletHolder(implicit ec: ExecutionContext) newTags = newTags)) } } + +object WalletHolder { + + def empty(implicit ec: ExecutionContext): WalletHolder = new WalletHolder( + None) +} diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index bb9fcfd6bd..93efda6d80 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -13,13 +13,13 @@ import org.bitcoins.core.hd.{HDAccount, HDChainType} import org.bitcoins.core.protocol.BlockStamp.BlockHeight import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp} -import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.db.SafeDatabase import org.bitcoins.wallet.{Wallet, WalletLogger} import slick.dbio.{DBIOAction, Effect, NoStream} +import java.util.concurrent.RejectedExecutionException import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -100,11 +100,15 @@ private[wallet] trait RescanHandling extends WalletLogger { state } - res.recoverWith { case err: Throwable => - logger.error(s"Failed to rescan wallet", err) - stateDescriptorDAO - .updateRescanning(false) - .flatMap(_ => Future.failed(err)) + res.recoverWith { + case _: RejectedExecutionException => + println(s"Caught rejected execution exception") + Future.unit //don't do anything if its from the threadpool shutting down + case err: Throwable => + logger.error(s"Failed to rescan wallet", err) + stateDescriptorDAO + .updateRescanning(false) + .flatMap(_ => Future.failed(err)) } res.map { @@ -123,7 +127,9 @@ private[wallet] trait RescanHandling extends WalletLogger { Future.successful(RescanState.RescanAlreadyStarted) } - } yield rescanState + } yield { + rescanState + } } lazy val walletCreationBlockHeight: Future[BlockHeight] = @@ -138,7 +144,6 @@ private[wallet] trait RescanHandling extends WalletLogger { parallelism: Int, filterBatchSize: Int): RescanState.RescanStarted = { val scriptsF = generateScriptPubKeys(account, addressBatchSize) - //by completing the promise returned by this sink //we will be able to arbitrarily terminate the stream //see: https://doc.akka.io/docs/akka/current/stream/operators/Source/maybe.html @@ -269,6 +274,38 @@ private[wallet] trait RescanHandling extends WalletLogger { inProgress <- matchBlocks(endOpt = endOpt, startOpt = startOpt, account = account) + _ = recursiveRescan(prevState = inProgress, + startOpt = startOpt, + endOpt = endOpt, + addressBatchSize = addressBatchSize, + addressCount = addressCount, + account = account) + } yield { + inProgress + } + } + + /** Used to call a recursive rescan after the previous rescan is complete. + * The [[prevState]] parameter is what represents the previous rescan. + * We wait for this rescan to complete, and then check if we need to + * do another rescan + */ + private def recursiveRescan( + prevState: RescanState, + startOpt: Option[BlockStamp], + endOpt: Option[BlockStamp], + addressBatchSize: Int, + addressCount: Int, + account: HDAccount): Future[Unit] = { + val awaitPreviousRescanF = prevState match { + case r @ (_: RescanState.RescanStarted | RescanState.RescanDone) => + RescanState.awaitRescanComplete(r) + case RescanState.RescanAlreadyStarted => + //don't continue rescanning if previous rescan was not started + Future.unit + } + for { + _ <- awaitPreviousRescanF externalGap <- calcAddressGap(HDChainType.External, account) changeGap <- calcAddressGap(HDChainType.Change, account) _ <- { @@ -290,7 +327,7 @@ private[wallet] trait RescanHandling extends WalletLogger { doNeutrinoRescan(account, startOpt, endOpt, addressBatchSize) } } - } yield inProgress + } yield () } private def calcAddressGap( @@ -415,14 +452,16 @@ private[wallet] trait RescanHandling extends WalletLogger { Vector[Int], Vector[ChainQueryApi.FilterResponse], NotUsed] = { - Flow[Vector[Int]].mapAsync(FutureUtil.getParallelism) { - case range: Vector[Int] => - val startHeight = range.head - val endHeight = range.last - logger.info( - s"Searching filters from start=$startHeight to end=$endHeight") - chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight, - endHeight = endHeight) + //parallelism as 1 here because `getFiltersBetweenHeights` + //fetches filters in parallel. We can run into our max open requests + //allowed by akka if we have parallelism more than 1 here + Flow[Vector[Int]].mapAsync(1) { case range: Vector[Int] => + val startHeight = range.head + val endHeight = range.last + logger.info( + s"Searching filters from start=$startHeight to end=$endHeight") + chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight, + endHeight = endHeight) } }