Refactor WalletRoutes to take DLCWalletLoaderApi as a paramete (#4565)

* Refactor WalletRoutes to take DLCWalletLoaderApi as a parameter, use the walletHolder inside of DLCWalletLoaderApi to fulfill requests

Fix rebase

Reduce parallelism in fetchFiltersFlow to 1 to reduce chance of hitting akka's max open requests

* Try fix loader test by catching RejectedExecutionException

* Add println to see if catching excpetion gets hit

* WIP: Try to fix recursive rescan

* Don't map on recursiveRescan so we don't block on it
This commit is contained in:
Chris Stewart 2022-08-04 09:16:46 -05:00 committed by GitHub
parent 524c0af536
commit a02e25b0ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 121 additions and 41 deletions

View file

@ -34,7 +34,7 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures {
val bitcoind = walletHolderWithLoader.bitcoind val bitcoind = walletHolderWithLoader.bitcoind
//need some blocks to make rescans last longer for the test case //need some blocks to make rescans last longer for the test case
val blocksF = bitcoind.getNewAddress.flatMap(addr => val blocksF = bitcoind.getNewAddress.flatMap(addr =>
bitcoind.generateToAddress(500, addr)) bitcoind.generateToAddress(250, addr))
val loadedWalletF = loader.load(walletNameOpt = None, aesPasswordOpt = None) val loadedWalletF = loader.load(walletNameOpt = None, aesPasswordOpt = None)
@ -42,6 +42,7 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures {
//as a hack, set rescanning to true, so next time we load it starts a rescan //as a hack, set rescanning to true, so next time we load it starts a rescan
val setRescanF = for { val setRescanF = for {
_ <- blocksF
walletConfig <- walletConfigF walletConfig <- walletConfigF
descriptorDAO = WalletStateDescriptorDAO()(system.dispatcher, descriptorDAO = WalletStateDescriptorDAO()(system.dispatcher,
walletConfig) walletConfig)
@ -51,7 +52,6 @@ class DLCWalletBitcoindBackendLoaderTest extends WalletLoaderFixtures {
//now that we have set rescanning, we should see a rescan next time we load wallet //now that we have set rescanning, we should see a rescan next time we load wallet
for { for {
_ <- setRescanF _ <- setRescanF
_ <- blocksF
(loadWallet2, _, _) <- loader.load( (loadWallet2, _, _) <- loader.load(
walletNameOpt = None, walletNameOpt = None,
aesPasswordOpt = None aesPasswordOpt = None

View file

@ -33,12 +33,13 @@ import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte}
import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.core.wallet.utxo._ import org.bitcoins.core.wallet.utxo._
import org.bitcoins.crypto._ import org.bitcoins.crypto._
import org.bitcoins.feeprovider.ConstantFeeRateProvider
import org.bitcoins.node.Node import org.bitcoins.node.Node
import org.bitcoins.server.routes.{CommonRoutes, ServerCommand} import org.bitcoins.server.routes.{CommonRoutes, ServerCommand}
import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.wallet.DLCWalletUtil import org.bitcoins.testkit.wallet.DLCWalletUtil
import org.bitcoins.testkitcore.util.TransactionTestUtil import org.bitcoins.testkitcore.util.TransactionTestUtil
import org.bitcoins.wallet.MockWalletApi import org.bitcoins.wallet.{MockWalletApi, WalletHolder}
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.wordspec.AnyWordSpec import org.scalatest.wordspec.AnyWordSpec
import scodec.bits.ByteVector import scodec.bits.ByteVector
@ -62,8 +63,6 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
val testAddress: BitcoinAddress = BitcoinAddress.fromString(testAddressStr) val testAddress: BitcoinAddress = BitcoinAddress.fromString(testAddressStr)
val testLabel: AddressLabelTag = AddressLabelTag("test") val testLabel: AddressLabelTag = AddressLabelTag("test")
val mockWalletApi: MockWalletApi = mock[MockWalletApi]
val mockChainApi: ChainApi = mock[ChainApi] val mockChainApi: ChainApi = mock[ChainApi]
val mockNode: Node = mock[Node] val mockNode: Node = mock[Node]
@ -72,8 +71,21 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
val nodeRoutes: NodeRoutes = NodeRoutes(mockNode) val nodeRoutes: NodeRoutes = NodeRoutes(mockNode)
val mockWalletApi: MockWalletApi = mock[MockWalletApi]
val walletHolder = new WalletHolder(Some(mockWalletApi))
val feeRateApi = ConstantFeeRateProvider(SatoshisPerVirtualByte.one)
val walletLoader: DLCWalletNeutrinoBackendLoader = {
DLCWalletNeutrinoBackendLoader(walletHolder,
mockChainApi,
mockNode,
feeRateApi)
}
val walletRoutes: WalletRoutes = val walletRoutes: WalletRoutes =
WalletRoutes(mockWalletApi)(system, conf.walletConf) WalletRoutes(walletLoader)(system, conf.walletConf)
val coreRoutes: CoreRoutes = CoreRoutes() val coreRoutes: CoreRoutes = CoreRoutes()

View file

@ -2,14 +2,17 @@ package org.bitcoins.server
import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.testkit.ScalatestRouteTest import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.protocol.dlc.models.DLCMessage.DLCOffer import org.bitcoins.core.protocol.dlc.models.DLCMessage.DLCOffer
import org.bitcoins.core.protocol.dlc.models.DLCStatus import org.bitcoins.core.protocol.dlc.models.DLCStatus
import org.bitcoins.core.protocol.tlv.{DLCOfferTLV, LnMessageFactory} import org.bitcoins.core.protocol.tlv.{DLCOfferTLV, LnMessageFactory}
import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte} import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte}
import org.bitcoins.crypto.Sha256Digest import org.bitcoins.crypto.Sha256Digest
import org.bitcoins.feeprovider.ConstantFeeRateProvider
import org.bitcoins.node.Node
import org.bitcoins.server.routes.ServerCommand import org.bitcoins.server.routes.ServerCommand
import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.wallet.MockWalletApi import org.bitcoins.wallet.{MockWalletApi, WalletHolder}
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.wordspec.AnyWordSpec import org.scalatest.wordspec.AnyWordSpec
@ -22,10 +25,24 @@ class WalletRoutesSpec
implicit val conf: BitcoinSAppConfig = implicit val conf: BitcoinSAppConfig =
BitcoinSTestAppConfig.getNeutrinoTestConfig() BitcoinSTestAppConfig.getNeutrinoTestConfig()
val mockWalletApi = mock[MockWalletApi]
val mockChainApi: ChainApi = mock[ChainApi]
val mockNode: Node = mock[Node]
val mockWalletApi: MockWalletApi = mock[MockWalletApi]
val walletHolder = new WalletHolder(Some(mockWalletApi))
val feeRateApi = ConstantFeeRateProvider(SatoshisPerVirtualByte.one)
val walletLoader: DLCWalletNeutrinoBackendLoader =
DLCWalletNeutrinoBackendLoader(walletHolder,
mockChainApi,
mockNode,
feeRateApi)
val walletRoutes: WalletRoutes = val walletRoutes: WalletRoutes =
WalletRoutes(mockWalletApi)(system, conf.walletConf) WalletRoutes(walletLoader)(system, conf.walletConf)
"WalletRoutes" should { "WalletRoutes" should {
"estimatefee" in { "estimatefee" in {

View file

@ -115,7 +115,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ = logger.info(s"Stopped ${nodeConf.nodeType.shortName} node") _ = logger.info(s"Stopped ${nodeConf.nodeType.shortName} node")
} yield { } yield {
//return empty wallet holder //return empty wallet holder
new WalletHolder() WalletHolder.empty
} }
} }
@ -145,7 +145,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
torConf.socks5ProxyParams, torConf.socks5ProxyParams,
network) network)
//get our wallet //get our wallet
val walletHolder = new WalletHolder() val walletHolder = WalletHolder.empty
val neutrinoWalletLoaderF = { val neutrinoWalletLoaderF = {
for { for {
node <- nodeF node <- nodeF
@ -224,7 +224,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- startHttpServer( _ <- startHttpServer(
nodeApiF = startedNodeF, nodeApiF = startedNodeF,
chainApi = chainApi, chainApi = chainApi,
walletF = configuredWalletF.map(_._1), walletLoaderF = neutrinoWalletLoaderF,
dlcNodeF = startedDLCNodeF, dlcNodeF = startedDLCNodeF,
torConfStarted = startedTorConfigF, torConfStarted = startedTorConfigF,
serverCmdLineArgs = serverArgParser, serverCmdLineArgs = serverArgParser,
@ -330,7 +330,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
WalletAppConfig.DEFAULT_WALLET_NAME) WalletAppConfig.DEFAULT_WALLET_NAME)
} yield walletName } yield walletName
val walletHolder = new WalletHolder() val walletHolder = WalletHolder.empty
val chainCallbacksF = for { val chainCallbacksF = for {
bitcoind <- bitcoindF bitcoind <- bitcoindF
} yield { } yield {
@ -399,7 +399,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- startHttpServer( _ <- startHttpServer(
nodeApiF = Future.successful(bitcoind), nodeApiF = Future.successful(bitcoind),
chainApi = bitcoind, chainApi = bitcoind,
walletF = walletF.map(_._1), walletLoaderF = loadWalletApiF,
dlcNodeF = dlcNodeF, dlcNodeF = dlcNodeF,
torConfStarted = startedTorConfigF, torConfStarted = startedTorConfigF,
serverCmdLineArgs = serverArgParser, serverCmdLineArgs = serverArgParser,
@ -440,7 +440,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
private def startHttpServer( private def startHttpServer(
nodeApiF: Future[NodeApi], nodeApiF: Future[NodeApi],
chainApi: ChainApi, chainApi: ChainApi,
walletF: Future[WalletHolder], walletLoaderF: Future[DLCWalletLoaderApi],
dlcNodeF: Future[DLCNode], dlcNodeF: Future[DLCNode],
torConfStarted: Future[Unit], torConfStarted: Future[Unit],
serverCmdLineArgs: ServerArgParser, serverCmdLineArgs: ServerArgParser,
@ -451,7 +451,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
implicit val walletConf: WalletAppConfig = conf.walletConf implicit val walletConf: WalletAppConfig = conf.walletConf
val walletRoutesF = { val walletRoutesF = {
walletF.map { w => walletLoaderF.map { w =>
WalletRoutes(w) WalletRoutes(w)
} }
} }

View file

@ -32,8 +32,10 @@ sealed trait DLCWalletLoaderApi extends Logging with StartStopAsync[Unit] {
implicit protected def system: ActorSystem implicit protected def system: ActorSystem
implicit private def ec: ExecutionContext = system.dispatcher implicit private def ec: ExecutionContext = system.dispatcher
def walletHolder: WalletHolder
/** Determine if a wallet has been loaded */ /** Determine if a wallet has been loaded */
def isWalletLoaded: Boolean def isWalletLoaded: Boolean = walletHolder.isInitialized
def load( def load(
walletNameOpt: Option[String], walletNameOpt: Option[String],
@ -337,7 +339,6 @@ case class DLCWalletBitcoindBackendLoader(
extends DLCWalletLoaderApi { extends DLCWalletLoaderApi {
import system.dispatcher import system.dispatcher
implicit private val nodeConf = conf.nodeConf implicit private val nodeConf = conf.nodeConf
override def isWalletLoaded: Boolean = walletHolder.isInitialized
override def load( override def load(
walletNameOpt: Option[String], walletNameOpt: Option[String],

View file

@ -8,7 +8,6 @@ import akka.stream.Materializer
import grizzled.slf4j.Logging import grizzled.slf4j.Logging
import org.bitcoins.commons.rpc._ import org.bitcoins.commons.rpc._
import org.bitcoins.commons.serializers.Picklers._ import org.bitcoins.commons.serializers.Picklers._
import org.bitcoins.core.api.dlc.wallet.DLCNeutrinoHDWalletApi
import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.currency._ import org.bitcoins.core.currency._
import org.bitcoins.core.protocol.tlv._ import org.bitcoins.core.protocol.tlv._
@ -25,6 +24,7 @@ import org.bitcoins.crypto.NetworkElement
import org.bitcoins.keymanager._ import org.bitcoins.keymanager._
import org.bitcoins.keymanager.config.KeyManagerAppConfig import org.bitcoins.keymanager.config.KeyManagerAppConfig
import org.bitcoins.server.routes.{Server, ServerCommand, ServerRoute} import org.bitcoins.server.routes.{Server, ServerCommand, ServerRoute}
import org.bitcoins.wallet.WalletHolder
import org.bitcoins.wallet.config.WalletAppConfig import org.bitcoins.wallet.config.WalletAppConfig
import ujson._ import ujson._
import upickle.default._ import upickle.default._
@ -35,13 +35,16 @@ import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future} import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
case class WalletRoutes(wallet: DLCNeutrinoHDWalletApi)(implicit case class WalletRoutes(loader: DLCWalletLoaderApi)(implicit
system: ActorSystem, system: ActorSystem,
walletConf: WalletAppConfig) walletConf: WalletAppConfig)
extends ServerRoute extends ServerRoute
with Logging { with Logging {
import system.dispatcher import system.dispatcher
/** The loaded wallet that requests should be directed against */
private[this] val wallet: WalletHolder = loader.walletHolder
implicit private val kmConf: KeyManagerAppConfig = walletConf.kmConf implicit private val kmConf: KeyManagerAppConfig = walletConf.kmConf
private var rescanStateOpt: Option[RescanState] = None private var rescanStateOpt: Option[RescanState] = None

View file

@ -71,7 +71,7 @@ object RescanState {
} }
} }
/** Returns a Future that is compelted when a rescan is fully executed. /** Returns a Future that is completed when a rescan is fully executed.
* This means that the rescan was NOT terminated externally by completing * This means that the rescan was NOT terminated externally by completing
* the akka stream that underlies the rescan logic. * the akka stream that underlies the rescan logic.
*/ */

View file

@ -26,7 +26,7 @@ trait WalletLoaderFixtures
chainQueryApi = bitcoind, chainQueryApi = bitcoind,
feeRateApi = bitcoind) feeRateApi = bitcoind)
walletHolder = new WalletHolder() walletHolder = WalletHolder.empty
loader = DLCWalletBitcoindBackendLoader( loader = DLCWalletBitcoindBackendLoader(
walletHolder = walletHolder, walletHolder = walletHolder,
bitcoind = bitcoind, bitcoind = bitcoind,

View file

@ -61,11 +61,13 @@ import scala.concurrent.{ExecutionContext, Future}
class WalletNotInitialized extends Exception("The wallet is not initialized") class WalletNotInitialized extends Exception("The wallet is not initialized")
class WalletHolder(implicit ec: ExecutionContext) class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
ec: ExecutionContext)
extends DLCNeutrinoHDWalletApi extends DLCNeutrinoHDWalletApi
with Logging { with Logging {
@volatile private var walletOpt: Option[DLCNeutrinoHDWalletApi] = None @volatile private var walletOpt: Option[DLCNeutrinoHDWalletApi] =
initWalletOpt
private def wallet: DLCNeutrinoHDWalletApi = synchronized { private def wallet: DLCNeutrinoHDWalletApi = synchronized {
walletOpt match { walletOpt match {
@ -1002,3 +1004,9 @@ class WalletHolder(implicit ec: ExecutionContext)
newTags = newTags)) newTags = newTags))
} }
} }
object WalletHolder {
def empty(implicit ec: ExecutionContext): WalletHolder = new WalletHolder(
None)
}

View file

@ -13,13 +13,13 @@ import org.bitcoins.core.hd.{HDAccount, HDChainType}
import org.bitcoins.core.protocol.BlockStamp.BlockHeight import org.bitcoins.core.protocol.BlockStamp.BlockHeight
import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp} import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.db.SafeDatabase import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet.{Wallet, WalletLogger} import org.bitcoins.wallet.{Wallet, WalletLogger}
import slick.dbio.{DBIOAction, Effect, NoStream} import slick.dbio.{DBIOAction, Effect, NoStream}
import java.util.concurrent.RejectedExecutionException
import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
@ -100,11 +100,15 @@ private[wallet] trait RescanHandling extends WalletLogger {
state state
} }
res.recoverWith { case err: Throwable => res.recoverWith {
logger.error(s"Failed to rescan wallet", err) case _: RejectedExecutionException =>
stateDescriptorDAO println(s"Caught rejected execution exception")
.updateRescanning(false) Future.unit //don't do anything if its from the threadpool shutting down
.flatMap(_ => Future.failed(err)) case err: Throwable =>
logger.error(s"Failed to rescan wallet", err)
stateDescriptorDAO
.updateRescanning(false)
.flatMap(_ => Future.failed(err))
} }
res.map { res.map {
@ -123,7 +127,9 @@ private[wallet] trait RescanHandling extends WalletLogger {
Future.successful(RescanState.RescanAlreadyStarted) Future.successful(RescanState.RescanAlreadyStarted)
} }
} yield rescanState } yield {
rescanState
}
} }
lazy val walletCreationBlockHeight: Future[BlockHeight] = lazy val walletCreationBlockHeight: Future[BlockHeight] =
@ -138,7 +144,6 @@ private[wallet] trait RescanHandling extends WalletLogger {
parallelism: Int, parallelism: Int,
filterBatchSize: Int): RescanState.RescanStarted = { filterBatchSize: Int): RescanState.RescanStarted = {
val scriptsF = generateScriptPubKeys(account, addressBatchSize) val scriptsF = generateScriptPubKeys(account, addressBatchSize)
//by completing the promise returned by this sink //by completing the promise returned by this sink
//we will be able to arbitrarily terminate the stream //we will be able to arbitrarily terminate the stream
//see: https://doc.akka.io/docs/akka/current/stream/operators/Source/maybe.html //see: https://doc.akka.io/docs/akka/current/stream/operators/Source/maybe.html
@ -269,6 +274,38 @@ private[wallet] trait RescanHandling extends WalletLogger {
inProgress <- matchBlocks(endOpt = endOpt, inProgress <- matchBlocks(endOpt = endOpt,
startOpt = startOpt, startOpt = startOpt,
account = account) account = account)
_ = recursiveRescan(prevState = inProgress,
startOpt = startOpt,
endOpt = endOpt,
addressBatchSize = addressBatchSize,
addressCount = addressCount,
account = account)
} yield {
inProgress
}
}
/** Used to call a recursive rescan after the previous rescan is complete.
* The [[prevState]] parameter is what represents the previous rescan.
* We wait for this rescan to complete, and then check if we need to
* do another rescan
*/
private def recursiveRescan(
prevState: RescanState,
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
addressBatchSize: Int,
addressCount: Int,
account: HDAccount): Future[Unit] = {
val awaitPreviousRescanF = prevState match {
case r @ (_: RescanState.RescanStarted | RescanState.RescanDone) =>
RescanState.awaitRescanComplete(r)
case RescanState.RescanAlreadyStarted =>
//don't continue rescanning if previous rescan was not started
Future.unit
}
for {
_ <- awaitPreviousRescanF
externalGap <- calcAddressGap(HDChainType.External, account) externalGap <- calcAddressGap(HDChainType.External, account)
changeGap <- calcAddressGap(HDChainType.Change, account) changeGap <- calcAddressGap(HDChainType.Change, account)
_ <- { _ <- {
@ -290,7 +327,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
doNeutrinoRescan(account, startOpt, endOpt, addressBatchSize) doNeutrinoRescan(account, startOpt, endOpt, addressBatchSize)
} }
} }
} yield inProgress } yield ()
} }
private def calcAddressGap( private def calcAddressGap(
@ -415,14 +452,16 @@ private[wallet] trait RescanHandling extends WalletLogger {
Vector[Int], Vector[Int],
Vector[ChainQueryApi.FilterResponse], Vector[ChainQueryApi.FilterResponse],
NotUsed] = { NotUsed] = {
Flow[Vector[Int]].mapAsync(FutureUtil.getParallelism) { //parallelism as 1 here because `getFiltersBetweenHeights`
case range: Vector[Int] => //fetches filters in parallel. We can run into our max open requests
val startHeight = range.head //allowed by akka if we have parallelism more than 1 here
val endHeight = range.last Flow[Vector[Int]].mapAsync(1) { case range: Vector[Int] =>
logger.info( val startHeight = range.head
s"Searching filters from start=$startHeight to end=$endHeight") val endHeight = range.last
chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight, logger.info(
endHeight = endHeight) s"Searching filters from start=$startHeight to end=$endHeight")
chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight,
endHeight = endHeight)
} }
} }