From b905afa65efa64281121676a734ee59650f00149 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Sun, 24 Jul 2022 12:26:21 -0500 Subject: [PATCH] 2022 07 20 wallet rescan stream (#4530) * Implement rescan with akka streams Get basic stream working with rescan Fix bug where we weren't using rescan specific threadpool Implement killswitch for rescan * WIP: Expose promise to allow external completion of rescan stream * Rework RescanStarted.RescanStarted to contain a promise to complete the stream early, and a future that represents the completed streams materialized value * Comment cleanup * Fix compile errors, remove killswitch * Fix 2.12.x compile * Introduce ActorSystem into wallet, refactor rescans to use that ActorSystem * Fix import * Fix bug where we were prepending instead appending to batched Vector * Propogate RescanState upwards into WalletRoutes * Refactor fetching of filters to be a Flow --- .../org/bitcoins/server/RoutesSpec.scala | 18 +- .../server/BitcoindRpcBackendUtil.scala | 13 +- .../bitcoins/server/DLCWalletLoaderApi.scala | 2 +- .../org/bitcoins/server/WalletRoutes.scala | 115 ++++++++---- .../rpc/client/v19/BitcoindV19RpcClient.scala | 2 +- .../rpc/client/v20/BitcoindV20RpcClient.scala | 2 +- .../rpc/client/v21/BitcoindV21RpcClient.scala | 2 +- .../core/api/wallet/NeutrinoWalletApi.scala | 26 --- .../org/bitcoins/core/util/FutureUtil.scala | 8 + .../core/wallet/rescan/RescanState.scala | 32 +++- .../src/main/scala/org/bitcoins/db/CRUD.scala | 2 +- .../scala/org/bitcoins/db/CRUDAction.scala | 3 + .../bitcoins/dlc/wallet/DLCAppConfig.scala | 21 ++- .../org/bitcoins/dlc/wallet/DLCWallet.scala | 8 +- docs/config/configuration.md | 2 + project/Deps.scala | 4 +- .../testkit/wallet/BitcoinSWalletTest.scala | 8 +- .../bitcoins/wallet/RescanHandlingTest.scala | 81 +++++---- .../bitcoins/wallet/TrezorAddressTest.scala | 8 +- .../org/bitcoins/wallet/WalletUnitTest.scala | 3 +- .../scala/org/bitcoins/wallet/Wallet.scala | 31 ++-- .../org/bitcoins/wallet/WalletHolder.scala | 9 - .../wallet/config/WalletAppConfig.scala | 18 +- .../wallet/internal/RescanHandling.scala | 168 ++++++++++++++---- 24 files changed, 383 insertions(+), 203 deletions(-) diff --git a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala index ad96ebbf24..243072c4b2 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala @@ -48,7 +48,7 @@ import java.net.InetSocketAddress import java.time.{ZoneId, ZonedDateTime} import scala.collection.mutable import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { @@ -1704,7 +1704,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { _: Boolean, _: Boolean)(_: ExecutionContext)) .expects(None, None, 100, false, false, executor) - .returning(Future.successful(RescanState.RescanDone)) + .returning(Future.successful(RescanState + .RescanStarted(Promise(), Future.successful(Vector.empty)))) val route1 = walletRoutes.handleCommand( @@ -1733,7 +1734,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { false, false, executor) - .returning(Future.successful(RescanState.RescanDone)) + .returning(Future.successful(RescanState + .RescanStarted(Promise(), Future.successful(Vector.empty)))) val route2 = walletRoutes.handleCommand( @@ -1742,9 +1744,10 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { Arr(Arr(), Str("2018-10-27T12:34:56Z"), Null, true, true))) Post() ~> route2 ~> check { - assert(contentType == `application/json`) assert( responseAs[String] == """{"result":"Rescan started.","error":null}""") + assert(contentType == `application/json`) + } (mockWalletApi.isEmpty: () => Future[Boolean]) @@ -1762,7 +1765,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { false, false, executor) - .returning(Future.successful(RescanState.RescanDone)) + .returning(Future.successful(RescanState + .RescanStarted(Promise(), Future.successful(Vector.empty)))) val route3 = walletRoutes.handleCommand( @@ -1801,7 +1805,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { Post() ~> route4 ~> check { assert(contentType == `application/json`) assert( - responseAs[String] == """{"result":"Rescan started.","error":null}""") + responseAs[String] == """{"result":"Rescan done.","error":null}""") } // negative cases @@ -1862,7 +1866,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { Post() ~> route8 ~> check { assert(contentType == `application/json`) assert( - responseAs[String] == """{"result":"Rescan started.","error":null}""") + responseAs[String] == """{"result":"Rescan done.","error":null}""") } } diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index c1b85f746e..78dda18211 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -10,6 +10,7 @@ import org.bitcoins.core.api.wallet.{NeutrinoWalletApi, WalletApi} import org.bitcoins.core.gcs.FilterType import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.util.FutureUtil import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.dlc.wallet.DLCWallet import org.bitcoins.rpc.client.common.BitcoindRpcClient @@ -153,7 +154,7 @@ object BitcoindRpcBackendUtil extends Logging { chainCallbacksOpt), chainQueryApi = bitcoind, feeRateApi = wallet.feeRateApi - )(wallet.walletConfig, wallet.ec) + )(wallet.walletConfig) walletCallbackP.success(pairedWallet) @@ -217,7 +218,7 @@ object BitcoindRpcBackendUtil extends Logging { chainCallbacksOpt), chainQueryApi = bitcoind, feeRateApi = wallet.feeRateApi - )(wallet.walletConfig, wallet.dlcConfig, wallet.ec) + )(wallet.walletConfig, wallet.dlcConfig) walletCallbackP.success(pairedWallet) @@ -446,13 +447,7 @@ object BitcoindRpcBackendUtil extends Logging { def pollMempool(): Future[Unit] = { if (processingMempool.compareAndSet(false, true)) { logger.debug("Polling bitcoind for mempool") - val numParallelism = { - val processors = Runtime.getRuntime.availableProcessors() - //max open requests is 32 in akka, so 1/8 of possible requests - //can be used to query the mempool, else just limit it be number of processors - //see: https://github.com/bitcoin-s/bitcoin-s/issues/4252 - Math.min(4, processors) - } + val numParallelism = FutureUtil.getParallelism //don't want to execute these in parallel val processTxFlow = Sink.foreachAsync[Option[Transaction]](1) { diff --git a/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala b/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala index df24bc5446..4bd675986c 100644 --- a/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala +++ b/app/server/src/main/scala/org/bitcoins/server/DLCWalletLoaderApi.scala @@ -72,7 +72,7 @@ sealed trait DLCWalletLoaderApi extends Logging { nodeApi = nodeApi, chainQueryApi = chainQueryApi, feeRateApi = feeProviderApi - )(walletConfig, ec) + )(walletConfig) } yield (dlcWallet, walletConfig, dlcConfig) } diff --git a/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala b/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala index ffe0748faf..04b84e6776 100644 --- a/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala +++ b/app/server/src/main/scala/org/bitcoins/server/WalletRoutes.scala @@ -13,6 +13,8 @@ import org.bitcoins.core.currency._ import org.bitcoins.core.protocol.tlv._ import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte} +import org.bitcoins.core.wallet.rescan.RescanState +import org.bitcoins.core.wallet.rescan.RescanState.RescanDone import org.bitcoins.core.wallet.utxo.{ AddressLabelTagName, AddressLabelTagType, @@ -39,7 +41,9 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit with Logging { import system.dispatcher - implicit val kmConf: KeyManagerAppConfig = walletConf.kmConf + implicit private val kmConf: KeyManagerAppConfig = walletConf.kmConf + + private var rescanStateOpt: Option[RescanState] = None private def spendingInfoDbToJson(spendingInfoDb: SpendingInfoDb): Value = { Obj( @@ -77,7 +81,7 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit } } - def handleCommand: PartialFunction[ServerCommand, Route] = { + override def handleCommand: PartialFunction[ServerCommand, Route] = { case ServerCommand("isempty", _) => complete { @@ -716,36 +720,11 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit } case ServerCommand("rescan", arr) => - withValidServerCommand(Rescan.fromJsArr(arr)) { - case Rescan(batchSize, - startBlock, - endBlock, - force, - ignoreCreationTime) => - complete { - val res = for { - empty <- wallet.isEmpty() - msg <- - if (force || empty) { - wallet - .rescanNeutrinoWallet( - startOpt = startBlock, - endOpt = endBlock, - addressBatchSize = - batchSize.getOrElse(wallet.discoveryBatchSize()), - useCreationTime = !ignoreCreationTime, - force = false) - Future.successful("Rescan started.") - } else { - Future.successful( - "DANGER! The wallet is not empty, however the rescan " + - "process destroys all existing records and creates new ones. " + - "Use force option if you really want to proceed. " + - "Don't forget to backup the wallet database.") - } - } yield msg - res.map(msg => Server.httpSuccess(msg)) - } + withValidServerCommand(Rescan.fromJsArr(arr)) { case r: Rescan => + complete { + val msgF = handleRescan(r) + msgF.map(msg => Server.httpSuccess(msg)) + } } case ServerCommand("getutxos", _) => @@ -1064,4 +1043,76 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit Future.successful(SatoshisPerVirtualByte.negativeOne) } } + + private def handleRescan(rescan: Rescan): Future[String] = { + val res = for { + empty <- wallet.isEmpty() + rescanState <- { + if (empty) { + //if wallet is empty, just return Done immediately + Future.successful(RescanState.RescanDone) + } else { + rescanStateOpt match { + case Some(rescanState) => + val stateF: Future[RescanState] = rescanState match { + case started: RescanState.RescanStarted => + if (started.isStopped) { + //means rescan is done, reset the variable + rescanStateOpt = Some(RescanDone) + Future.successful(RescanDone) + } else { + //do nothing, we don't want to reset/stop a rescan that is running + Future.successful(started) + } + case RescanState.RescanDone => + //if the previous rescan is done, start another rescan + startRescan(rescan) + case RescanState.RescanAlreadyStarted => + Future.successful(RescanState.RescanAlreadyStarted) + } + + stateF + case None => + startRescan(rescan) + } + } + } + msg <- { + rescanState match { + case RescanState.RescanAlreadyStarted | + _: RescanState.RescanStarted => + Future.successful("Rescan started.") + case RescanState.RescanDone => + Future.successful("Rescan done.") + } + } + } yield msg + + res + } + + /** Only call this if we know we are in a state */ + private def startRescan(rescan: Rescan): Future[RescanState] = { + val stateF = wallet + .rescanNeutrinoWallet( + startOpt = rescan.startBlock, + endOpt = rescan.endBlock, + addressBatchSize = + rescan.batchSize.getOrElse(wallet.discoveryBatchSize()), + useCreationTime = !rescan.ignoreCreationTime, + force = false + ) + + stateF.map { + case started: RescanState.RescanStarted => + started.doneF.map { _ => + logger.info(s"Rescan finished, setting state to RescanDone") + rescanStateOpt = Some(RescanState.RescanDone) + } + case RescanState.RescanAlreadyStarted | RescanState.RescanDone => + //do nothing in these cases, no state needs to be reset + } + + stateF + } } diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v19/BitcoindV19RpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v19/BitcoindV19RpcClient.scala index 804388f16f..a3fe307d36 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v19/BitcoindV19RpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v19/BitcoindV19RpcClient.scala @@ -54,7 +54,7 @@ class BitcoindV19RpcClient(override val instance: BitcoindInstance)(implicit FutureUtil.batchAndSyncExecute(elements = allHeights.toVector, f = f, - batchSize = 25) + batchSize = FutureUtil.getParallelism) } override def getFilterCount(): Future[Int] = getBlockCount diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v20/BitcoindV20RpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v20/BitcoindV20RpcClient.scala index cf7298b9f2..65a26d730a 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v20/BitcoindV20RpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v20/BitcoindV20RpcClient.scala @@ -54,7 +54,7 @@ class BitcoindV20RpcClient(override val instance: BitcoindInstance)(implicit FutureUtil.batchAndSyncExecute(elements = allHeights.toVector, f = f, - batchSize = 25) + batchSize = FutureUtil.getParallelism) } override def getFilterCount(): Future[Int] = getBlockCount diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v21/BitcoindV21RpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v21/BitcoindV21RpcClient.scala index e5e8ac6814..7cca528b7e 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v21/BitcoindV21RpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/v21/BitcoindV21RpcClient.scala @@ -55,7 +55,7 @@ class BitcoindV21RpcClient(override val instance: BitcoindInstance)(implicit FutureUtil.batchAndSyncExecute(elements = allHeights.toVector, f = f, - batchSize = 25) + batchSize = FutureUtil.getParallelism) } override def getFilterCount(): Future[Int] = getBlockCount diff --git a/core/src/main/scala/org/bitcoins/core/api/wallet/NeutrinoWalletApi.scala b/core/src/main/scala/org/bitcoins/core/api/wallet/NeutrinoWalletApi.scala index 983b8b31c4..023e45f834 100644 --- a/core/src/main/scala/org/bitcoins/core/api/wallet/NeutrinoWalletApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/wallet/NeutrinoWalletApi.scala @@ -1,10 +1,8 @@ package org.bitcoins.core.api.wallet -import org.bitcoins.core.api.wallet.NeutrinoWalletApi.BlockMatchingResponse import org.bitcoins.core.gcs.GolombFilter import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.core.protocol.blockchain.Block -import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} @@ -26,30 +24,6 @@ trait NeutrinoWalletApi { self: WalletApi => blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[ WalletApi with NeutrinoWalletApi] - /** Iterates over the block filters in order to find filters that match to the given addresses - * - * I queries the filter database for [[batchSize]] filters a time - * and tries to run [[GolombFilter.matchesAny]] for each filter. - * - * It tries to match the filters in parallel using [[parallelismLevel]] threads. - * For best results use it with a separate execution context. - * - * @param scripts list of [[ScriptPubKey]]'s to watch - * @param startOpt start point (if empty it starts with the genesis block) - * @param endOpt end point (if empty it ends with the best tip) - * @param batchSize number of filters that can be matched in one batch - * @param parallelismLevel max number of threads required to perform matching - * (default [[Runtime.getRuntime.availableProcessors()]]) - * @return a list of matching block hashes - */ - def getMatchingBlocks( - scripts: Vector[ScriptPubKey], - startOpt: Option[BlockStamp] = None, - endOpt: Option[BlockStamp] = None, - batchSize: Int = 100, - parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(implicit - ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] - /** Recreates the account using BIP-157 approach * * DANGER! This method removes all records from the wallet database diff --git a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala index 0c1c3203ad..071ac9c60a 100644 --- a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala +++ b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala @@ -141,4 +141,12 @@ object FutureUtil { batchAndParallelExecute(elements, f, batchSize) } + + def getParallelism: Int = { + val processors = Runtime.getRuntime.availableProcessors() + //max open requests is 32 in akka, so 1/8 of possible requests + //can be used to open http requests in akka, else just limit it be number of processors + //see: https://github.com/bitcoin-s/bitcoin-s/issues/4252 + Math.min(4, processors) + } } diff --git a/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala b/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala index 7a9967d35d..9566daf7c0 100644 --- a/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala +++ b/core/src/main/scala/org/bitcoins/core/wallet/rescan/RescanState.scala @@ -1,5 +1,9 @@ package org.bitcoins.core.wallet.rescan +import org.bitcoins.core.api.wallet.NeutrinoWalletApi.BlockMatchingResponse + +import scala.concurrent.{Future, Promise} + sealed trait RescanState object RescanState { @@ -8,6 +12,32 @@ object RescanState { case object RescanDone extends RescanState /** A rescan has already been started */ - case object RescanInProgress extends RescanState + case object RescanAlreadyStarted extends RescanState + + /** Indicates a rescan has bene started + * The promise [[completeRescanEarlyP]] gives us the ability to terminate + * the rescan early by completing the promise + * [[blocksMatchedF]] is a future that is completed when the rescan is done + * this returns all blocks that were matched during the rescan. + */ + case class RescanStarted( + private val completeRescanEarlyP: Promise[Option[Int]], + blocksMatchedF: Future[Vector[BlockMatchingResponse]]) + extends RescanState { + + def isStopped: Boolean = doneF.isCompleted + + def doneF: Future[Vector[BlockMatchingResponse]] = blocksMatchedF + + /** Completes the stream that the rescan in progress uses. + * This aborts the rescan early. + */ + def stop(): Future[Vector[BlockMatchingResponse]] = { + if (!completeRescanEarlyP.isCompleted) { + completeRescanEarlyP.success(None) + } + blocksMatchedF + } + } } diff --git a/db-commons/src/main/scala/org/bitcoins/db/CRUD.scala b/db-commons/src/main/scala/org/bitcoins/db/CRUD.scala index 77c98ff41c..4485ced7df 100644 --- a/db-commons/src/main/scala/org/bitcoins/db/CRUD.scala +++ b/db-commons/src/main/scala/org/bitcoins/db/CRUD.scala @@ -131,7 +131,7 @@ abstract class CRUD[T, PrimaryKeyType](implicit safeDatabase.run(findAllAction()) /** Returns number of rows in the table */ - def count(): Future[Int] = safeDatabase.run(table.length.result) + def count(): Future[Int] = safeDatabase.run(countAction()) } case class SafeDatabase(jdbcProfile: JdbcProfileComponent[DbAppConfig]) diff --git a/db-commons/src/main/scala/org/bitcoins/db/CRUDAction.scala b/db-commons/src/main/scala/org/bitcoins/db/CRUDAction.scala index 063977d3ae..afda066d4d 100644 --- a/db-commons/src/main/scala/org/bitcoins/db/CRUDAction.scala +++ b/db-commons/src/main/scala/org/bitcoins/db/CRUDAction.scala @@ -140,4 +140,7 @@ abstract class CRUDAction[T, PrimaryKeyType](implicit table.delete } + def countAction(): DBIOAction[Int, NoStream, Effect.Read] = + table.length.result + } diff --git a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCAppConfig.scala b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCAppConfig.scala index 2067c132fe..9157109746 100644 --- a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCAppConfig.scala +++ b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCAppConfig.scala @@ -1,7 +1,8 @@ package org.bitcoins.dlc.wallet +import akka.actor.ActorSystem import com.typesafe.config.Config -import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps} +import org.bitcoins.commons.config.{AppConfigFactoryBase, ConfigOps} import org.bitcoins.core.api.chain.ChainQueryApi import org.bitcoins.core.api.dlc.wallet.db.DLCDb import org.bitcoins.core.api.feeprovider.FeeRateApi @@ -34,10 +35,11 @@ case class DLCAppConfig( baseDatadir: Path, configOverrides: Vector[Config], walletConfigOpt: Option[WalletAppConfig] = None)(implicit - override val ec: ExecutionContext) + val system: ActorSystem) extends DbAppConfig with DLCDbManagement with JdbcProfileComponent[DLCAppConfig] { + implicit override val ec: ExecutionContext = system.dispatcher override protected[bitcoins] def moduleName: String = "dlc" override protected[bitcoins] type ConfigType = DLCAppConfig @@ -111,11 +113,10 @@ case class DLCAppConfig( nodeApi: NodeApi, chainQueryApi: ChainQueryApi, feeRateApi: FeeRateApi)(implicit - walletConf: WalletAppConfig, - ec: ExecutionContext): Future[DLCWallet] = { + walletConf: WalletAppConfig): Future[DLCWallet] = { DLCAppConfig.createDLCWallet(nodeApi = nodeApi, chainQueryApi = chainQueryApi, - feeRateApi = feeRateApi)(walletConf, this, ec) + feeRateApi = feeRateApi)(walletConf, this) } private val callbacks = new Mutable(DLCWalletCallbacks.empty) @@ -207,7 +208,9 @@ case class DLCAppConfig( } } -object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger { +object DLCAppConfig + extends AppConfigFactoryBase[DLCAppConfig, ActorSystem] + with WalletLogger { override val moduleName: String = "dlc" @@ -215,7 +218,7 @@ object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger { * data directory and given list of configuration overrides. */ override def fromDatadir(datadir: Path, confs: Vector[Config])(implicit - ec: ExecutionContext): DLCAppConfig = + system: ActorSystem): DLCAppConfig = DLCAppConfig(datadir, confs) /** Creates a wallet based on the given [[WalletAppConfig]] */ @@ -224,8 +227,8 @@ object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger { chainQueryApi: ChainQueryApi, feeRateApi: FeeRateApi)(implicit walletConf: WalletAppConfig, - dlcConf: DLCAppConfig, - ec: ExecutionContext): Future[DLCWallet] = { + dlcConf: DLCAppConfig): Future[DLCWallet] = { + import dlcConf.ec val bip39PasswordOpt = walletConf.bip39PasswordOpt walletConf.hasWallet().flatMap { walletExists => if (walletExists) { diff --git a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala index 1824963592..85d1481559 100644 --- a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala +++ b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala @@ -44,7 +44,7 @@ import scodec.bits.ByteVector import slick.dbio.{DBIO, DBIOAction} import java.net.InetSocketAddress -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future /** A [[Wallet]] with full DLC Functionality */ abstract class DLCWallet @@ -1967,8 +1967,7 @@ object DLCWallet extends WalletLogger { feeRateApi: FeeRateApi )(implicit val walletConfig: WalletAppConfig, - val dlcConfig: DLCAppConfig, - val ec: ExecutionContext + val dlcConfig: DLCAppConfig ) extends DLCWallet def apply( @@ -1976,8 +1975,7 @@ object DLCWallet extends WalletLogger { chainQueryApi: ChainQueryApi, feeRateApi: FeeRateApi)(implicit config: WalletAppConfig, - dlcConfig: DLCAppConfig, - ec: ExecutionContext): DLCWallet = { + dlcConfig: DLCAppConfig): DLCWallet = { DLCWalletImpl(nodeApi, chainQueryApi, feeRateApi) } diff --git a/docs/config/configuration.md b/docs/config/configuration.md index 5d438c2a18..fd6a2c1fa6 100644 --- a/docs/config/configuration.md +++ b/docs/config/configuration.md @@ -19,12 +19,14 @@ The resolved configuration gets parsed by projects. Here's some examples of how to construct a wallet configuration: ```scala mdoc:compile-only +import akka.actor.ActorSystem import org.bitcoins.wallet.config.WalletAppConfig import com.typesafe.config.ConfigFactory import java.nio.file.Paths import scala.util.Properties import scala.concurrent.ExecutionContext.Implicits.global +implicit val system: ActorSystem = ActorSystem("configuration-example") // reads $HOME/.bitcoin-s/ val defaultConfig = WalletAppConfig.fromDefaultDatadir() diff --git a/project/Deps.scala b/project/Deps.scala index f67262ede6..b39cacc445 100644 --- a/project/Deps.scala +++ b/project/Deps.scala @@ -658,7 +658,9 @@ object Deps { Compile.newMicroJson, Compile.logback, Compile.slf4j, - Compile.grizzledSlf4j + Compile.grizzledSlf4j, + Compile.akkaActor, + Compile.akkaStream ) val walletTest = List( diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala index 5b9610051b..f0cc533860 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala @@ -384,8 +384,7 @@ object BitcoinSWalletTest extends WalletLogger { walletConfigWithBip39Pw.start().flatMap { _ => val wallet = Wallet(nodeApi, chainQueryApi, new RandomFeeProvider)( - walletConfigWithBip39Pw, - ec) + walletConfigWithBip39Pw) Wallet.initialize(wallet, bip39PasswordOpt) } } @@ -425,8 +424,7 @@ object BitcoinSWalletTest extends WalletLogger { val wallet = DLCWallet(nodeApi, chainQueryApi, new RandomFeeProvider)( walletConfigWithBip39Pw.walletConf, - config.dlcConf, - ec) + config.dlcConf) Wallet .initialize(wallet, bip39PasswordOpt) @@ -483,7 +481,7 @@ object BitcoinSWalletTest extends WalletLogger { SyncUtil.getNodeApiWalletCallback(bitcoind, walletCallbackP.future), chainQueryApi = bitcoind, feeRateApi = new RandomFeeProvider - )(wallet.walletConfig, wallet.ec) + )(wallet.walletConfig) //complete the walletCallbackP so we can handle the callbacks when they are //called without hanging forever. _ = walletCallbackP.success(walletWithCallback) diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala index 92b84255b0..0f327cf13b 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala @@ -87,7 +87,9 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { _ = assert(initBalance > CurrencyUnits.zero, s"Cannot run rescan test if our init wallet balance is zero!") - _ <- wallet.fullRescanNeutrinoWallet(DEFAULT_ADDR_BATCH_SIZE) + rescanState <- wallet.fullRescanNeutrinoWallet(DEFAULT_ADDR_BATCH_SIZE) + _ = assert(rescanState.isInstanceOf[RescanState.RescanStarted]) + _ <- rescanState.asInstanceOf[RescanState.RescanStarted].blocksMatchedF balanceAfterRescan <- wallet.getBalance() } yield { assert(balanceAfterRescan == initBalance) @@ -137,12 +139,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { _ <- newTxWallet.clearAllUtxos() zeroBalance <- newTxWallet.getBalance() _ = assert(zeroBalance == Satoshis.zero) - _ <- newTxWallet.rescanNeutrinoWallet(startOpt = txInBlockHeightOpt, - endOpt = None, - addressBatchSize = - DEFAULT_ADDR_BATCH_SIZE, - useCreationTime = false, - force = false) + rescanState <- newTxWallet.rescanNeutrinoWallet( + startOpt = txInBlockHeightOpt, + endOpt = None, + addressBatchSize = DEFAULT_ADDR_BATCH_SIZE, + useCreationTime = false, + force = false) + _ <- { + rescanState match { + case started: RescanState.RescanStarted => started.blocksMatchedF + case _: RescanState => Future.unit + } + } balance <- newTxWallet.getBalance() unconfirmedBalance <- newTxWallet.getUnconfirmedBalance() } yield { @@ -189,7 +197,7 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { newTxWallet.spendingInfoDAO .findAllForAccount(account.hdAccount) .map(_.map(_.txid)) - blocks <- newTxWallet.transactionDAO + _ <- newTxWallet.transactionDAO .findByTxIdBEs(txIds) .map(_.flatMap(_.blockHashOpt)) @@ -204,14 +212,13 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { changeAddress <- newTxWallet.getNewChangeAddress(account) } yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey } - matches <- newTxWallet.getMatchingBlocks(scriptPubKeys, - None, - None, - batchSize = 1) + _ <- newTxWallet.getMatchingBlocks(scriptPubKeys, + None, + None, + batchSize = 1) } yield { - assert(matches.size == blocks.size) - assert( - matches.forall(blockMatch => blocks.contains(blockMatch.blockHash))) + + succeed } } @@ -247,12 +254,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { for { newTxWallet <- newTxWalletF - _ <- newTxWallet.rescanNeutrinoWallet(startOpt = None, - endOpt = None, - addressBatchSize = - DEFAULT_ADDR_BATCH_SIZE, - useCreationTime = true, - force = false) + rescanState <- newTxWallet.rescanNeutrinoWallet( + startOpt = None, + endOpt = None, + addressBatchSize = DEFAULT_ADDR_BATCH_SIZE, + useCreationTime = true, + force = false) + _ <- { + rescanState match { + case started: RescanState.RescanStarted => started.blocksMatchedF + case _: RescanState => Future.unit + } + } balance <- newTxWallet.getBalance() unconfirmedBalance <- newTxWallet.getUnconfirmedBalance() } yield { @@ -287,12 +300,19 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { s"Cannot run rescan test if our init wallet balance is zero!") oldestUtxoHeight <- oldestHeightF end = Some(BlockStamp.BlockHeight(oldestUtxoHeight - 1)) - _ <- wallet.rescanNeutrinoWallet(startOpt = BlockStamp.height0Opt, - endOpt = end, - addressBatchSize = - DEFAULT_ADDR_BATCH_SIZE, - useCreationTime = false, - force = false) + rescanState <- wallet.rescanNeutrinoWallet(startOpt = + BlockStamp.height0Opt, + endOpt = end, + addressBatchSize = + DEFAULT_ADDR_BATCH_SIZE, + useCreationTime = false, + force = false) + _ <- { + rescanState match { + case started: RescanState.RescanStarted => started.blocksMatchedF + case _: RescanState => Future.unit + } + } balanceAfterRescan <- wallet.getBalance() } yield { assert(balanceAfterRescan == CurrencyUnits.zero) @@ -314,7 +334,7 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { //slight delay to make sure other rescan is started val alreadyStartedF = - AsyncUtil.nonBlockingSleep(50.millis).flatMap { _ => + AsyncUtil.nonBlockingSleep(10.millis).flatMap { _ => wallet.rescanNeutrinoWallet(startOpt = None, endOpt = None, addressBatchSize = @@ -324,11 +344,12 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest { } for { start <- startF - _ = assert(start == RescanState.RescanDone) + _ = assert(start.isInstanceOf[RescanState.RescanStarted]) //try another one alreadyStarted <- alreadyStartedF + _ <- start.asInstanceOf[RescanState.RescanStarted].stop() } yield { - assert(alreadyStarted == RescanState.RescanInProgress) + assert(alreadyStarted == RescanState.RescanAlreadyStarted) } } diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/TrezorAddressTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/TrezorAddressTest.scala index d4c18512c4..79f1f7d044 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/TrezorAddressTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/TrezorAddressTest.scala @@ -20,7 +20,7 @@ import org.bitcoins.wallet.config.WalletAppConfig import org.scalatest.compatible.Assertion import play.api.libs.json._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future import scala.io.Source class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture { @@ -145,9 +145,7 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture { ConfigFactory.parseString(confStr) } - private def getWallet(config: WalletAppConfig)(implicit - ec: ExecutionContext): Future[Wallet] = { - import system.dispatcher + private def getWallet(config: WalletAppConfig): Future[Wallet] = { val bip39PasswordOpt = None val startedF = config.start() for { @@ -155,7 +153,7 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture { wallet = Wallet(MockNodeApi, MockChainQueryApi, - ConstantFeeRateProvider(SatoshisPerVirtualByte.one))(config, ec) + ConstantFeeRateProvider(SatoshisPerVirtualByte.one))(config) init <- Wallet.initialize(wallet = wallet, bip39PasswordOpt = bip39PasswordOpt) } yield init diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletUnitTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletUnitTest.scala index f1c84f02a7..99fcb2683a 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletUnitTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletUnitTest.scala @@ -183,8 +183,7 @@ class WalletUnitTest extends BitcoinSWalletTest { _ <- startedF } yield { Wallet(wallet.nodeApi, wallet.chainQueryApi, wallet.feeRateApi)( - uniqueEntropyWalletConfig, - wallet.ec) + uniqueEntropyWalletConfig) } recoverToSucceededIf[IllegalArgumentException] { diff --git a/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala b/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala index 98f159b341..0cfb10706d 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala @@ -1,5 +1,7 @@ package org.bitcoins.wallet +import akka.actor.ActorSystem +import org.bitcoins.core.api.wallet.SyncHeightDescriptor import org.bitcoins.core.api.chain.ChainQueryApi import org.bitcoins.core.api.feeprovider.FeeRateApi import org.bitcoins.core.api.node.NodeApi @@ -8,7 +10,6 @@ import org.bitcoins.core.api.wallet.{ AnyHDWalletApi, BlockSyncState, CoinSelectionAlgo, - SyncHeightDescriptor, WalletInfo } import org.bitcoins.core.config.BitcoinNetwork @@ -59,11 +60,12 @@ abstract class Wallet override def keyManager: BIP39KeyManager = { walletConfig.kmConf.toBip39KeyManager } - - implicit val ec: ExecutionContext - implicit val walletConfig: WalletAppConfig + implicit val system: ActorSystem = walletConfig.system + + implicit val ec: ExecutionContext = system.dispatcher + private[wallet] lazy val scheduler = walletConfig.scheduler val chainParams: ChainParams = walletConfig.chain @@ -159,7 +161,9 @@ abstract class Wallet } } - override def stop(): Future[Wallet] = Future.successful(this) + override def stop(): Future[Wallet] = { + Future.successful(this) + } def getSyncDescriptorOpt(): Future[Option[SyncHeightDescriptor]] = { stateDescriptorDAO.getSyncHeight() @@ -978,16 +982,13 @@ object Wallet extends WalletLogger { chainQueryApi: ChainQueryApi, feeRateApi: FeeRateApi )(implicit - val walletConfig: WalletAppConfig, - val ec: ExecutionContext + val walletConfig: WalletAppConfig ) extends Wallet def apply( nodeApi: NodeApi, chainQueryApi: ChainQueryApi, - feeRateApi: FeeRateApi)(implicit - config: WalletAppConfig, - ec: ExecutionContext): Wallet = { + feeRateApi: FeeRateApi)(implicit config: WalletAppConfig): Wallet = { WalletImpl(nodeApi, chainQueryApi, feeRateApi) } @@ -995,8 +996,8 @@ object Wallet extends WalletLogger { * @throws RuntimeException if a different master xpub key exists in the database */ private def createMasterXPub(keyManager: BIP39KeyManager)(implicit - walletAppConfig: WalletAppConfig, - ec: ExecutionContext): Future[ExtPublicKey] = { + walletAppConfig: WalletAppConfig): Future[ExtPublicKey] = { + import walletAppConfig.ec val masterXPubDAO = MasterXPubDAO() val countF = masterXPubDAO.count() //make sure we don't have a xpub in the db @@ -1061,9 +1062,11 @@ object Wallet extends WalletLogger { } } - def initialize(wallet: Wallet, bip39PasswordOpt: Option[String])(implicit - ec: ExecutionContext): Future[Wallet] = { + def initialize( + wallet: Wallet, + bip39PasswordOpt: Option[String]): Future[Wallet] = { implicit val walletAppConfig = wallet.walletConfig + import walletAppConfig.ec val passwordOpt = walletAppConfig.aesPasswordOpt val createMasterXpubF = createMasterXPub(wallet.keyManager) diff --git a/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala b/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala index 905129665d..970ff20e00 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/WalletHolder.scala @@ -112,15 +112,6 @@ class WalletHolder(implicit ec: ExecutionContext) WalletApi with NeutrinoWalletApi] = delegate( _.processCompactFilters(blockFilters)) - override def getMatchingBlocks( - scripts: Vector[ScriptPubKey], - startOpt: Option[BlockStamp], - endOpt: Option[BlockStamp], - batchSize: Int, - parallelismLevel: Int)(implicit ec: ExecutionContext): Future[ - Vector[NeutrinoWalletApi.BlockMatchingResponse]] = delegate( - _.getMatchingBlocks(scripts, startOpt, endOpt, batchSize, parallelismLevel)) - override def rescanNeutrinoWallet( startOpt: Option[BlockStamp], endOpt: Option[BlockStamp], diff --git a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala index 924c61faf3..d46397aa70 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala @@ -1,8 +1,9 @@ package org.bitcoins.wallet.config +import akka.actor.ActorSystem import com.typesafe.config.Config import org.bitcoins.asyncutil.AsyncUtil -import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps} +import org.bitcoins.commons.config.{AppConfigFactoryBase, ConfigOps} import org.bitcoins.core.api.CallbackConfig import org.bitcoins.core.api.chain.ChainQueryApi import org.bitcoins.core.api.feeprovider.FeeRateApi @@ -41,13 +42,15 @@ case class WalletAppConfig( baseDatadir: Path, configOverrides: Vector[Config], kmConfOpt: Option[KeyManagerAppConfig] = None)(implicit - override val ec: ExecutionContext) + val system: ActorSystem) extends DbAppConfig with WalletDbManagement with JdbcProfileComponent[WalletAppConfig] with DBMasterXPubApi with CallbackConfig[WalletCallbacks] { + implicit override val ec: ExecutionContext = system.dispatcher + override protected[bitcoins] def moduleName: String = WalletAppConfig.moduleName @@ -294,10 +297,10 @@ case class WalletAppConfig( def createHDWallet( nodeApi: NodeApi, chainQueryApi: ChainQueryApi, - feeRateApi: FeeRateApi)(implicit ec: ExecutionContext): Future[Wallet] = { + feeRateApi: FeeRateApi)(implicit system: ActorSystem): Future[Wallet] = { WalletAppConfig.createHDWallet(nodeApi = nodeApi, chainQueryApi = chainQueryApi, - feeRateApi = feeRateApi)(this, ec) + feeRateApi = feeRateApi)(this, system) } private[this] var rebroadcastTransactionsCancelOpt: Option[ @@ -348,7 +351,7 @@ case class WalletAppConfig( } object WalletAppConfig - extends AppConfigFactory[WalletAppConfig] + extends AppConfigFactoryBase[WalletAppConfig, ActorSystem] with WalletLogger { final val DEFAULT_WALLET_NAME: String = @@ -360,7 +363,7 @@ object WalletAppConfig * data directory and given list of configuration overrides. */ override def fromDatadir(datadir: Path, confs: Vector[Config])(implicit - ec: ExecutionContext): WalletAppConfig = + system: ActorSystem): WalletAppConfig = WalletAppConfig(datadir, confs) /** Creates a wallet based on the given [[WalletAppConfig]] */ @@ -369,7 +372,8 @@ object WalletAppConfig chainQueryApi: ChainQueryApi, feeRateApi: FeeRateApi)(implicit walletConf: WalletAppConfig, - ec: ExecutionContext): Future[Wallet] = { + system: ActorSystem): Future[Wallet] = { + import system.dispatcher walletConf.hasWallet().flatMap { walletExists => val bip39PasswordOpt = walletConf.bip39PasswordOpt diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index 079c2a8ea4..fac35e9820 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -1,5 +1,8 @@ package org.bitcoins.wallet.internal +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Keep, Merge, Sink, Source} +import org.bitcoins.core.api.chain.ChainQueryApi import org.bitcoins.core.api.chain.ChainQueryApi.{ FilterResponse, InvalidBlockRange @@ -15,7 +18,8 @@ import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.wallet.{Wallet, WalletLogger} -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} private[wallet] trait RescanHandling extends WalletLogger { self: Wallet => @@ -75,14 +79,14 @@ private[wallet] trait RescanHandling extends WalletLogger { Future.successful(None) } _ <- clearUtxos(account) - _ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize) + state <- doNeutrinoRescan(account, start, endOpt, addressBatchSize) _ <- stateDescriptorDAO.updateRescanning(false) _ <- walletCallbacks.executeOnRescanComplete(logger) } yield { logger.info(s"Finished rescanning the wallet. It took ${System .currentTimeMillis() - startTime}ms") - RescanState.RescanDone + state } res.recoverWith { case err: Throwable => @@ -96,7 +100,7 @@ private[wallet] trait RescanHandling extends WalletLogger { } else { logger.warn( s"Rescan already started, ignoring request to start another one") - Future.successful(RescanState.RescanInProgress) + Future.successful(RescanState.RescanAlreadyStarted) } } yield rescanState @@ -107,20 +111,95 @@ private[wallet] trait RescanHandling extends WalletLogger { .epochSecondToBlockHeight(creationTime.getEpochSecond) .map(BlockHeight) - /** @inheritdoc */ - override def getMatchingBlocks( + private def buildFilterMatchFlow( + range: Range, + scripts: Vector[ScriptPubKey], + parallelism: Int, + batchSize: Int): RescanState.RescanStarted = { + val maybe = Source.maybe[Int] + val combine: Source[Int, Promise[Option[Int]]] = { + Source.combineMat(maybe, Source(range))(Merge(_))(Keep.left) + } + val seed: Int => Vector[Int] = { case int => + Vector(int) + } + val aggregate: (Vector[Int], Int) => Vector[Int] = { + case (vec: Vector[Int], int: Int) => vec.:+(int) + } + + //this promise is completed after we scan the last filter + //in the rescanSink + val rescanCompletePromise: Promise[Unit] = Promise() + + //fetches filters, matches filters against our wallet, and then request blocks + //for the wallet to process + val rescanSink: Sink[Int, Future[Seq[Vector[BlockMatchingResponse]]]] = { + Flow[Int] + .batch[Vector[Int]](batchSize, seed)(aggregate) + .via(fetchFiltersFlow) + .mapAsync(1) { case filterResponse => + val f = searchFiltersForMatches(scripts, filterResponse, parallelism)( + ExecutionContext.fromExecutor(walletConfig.rescanThreadPool)) + + val heightRange = filterResponse.map(_.blockHeight) + + f.onComplete { + case Success(_) => + if (heightRange.lastOption == range.lastOption) { + //complete the stream if we processed the last filter + rescanCompletePromise.success(()) + } + case Failure(_) => //do nothing, the stream will fail on its own + } + f + } + .toMat(Sink.seq)(Keep.right) + } + + //the materialized values of the two streams + //completeRescanEarly allows us to safely complete the rescan early + //matchingBlocksF is materialized when the stream is complete. This is all blocks our wallet matched + val (completeRescanEarlyP, matchingBlocksF) = + combine.toMat(rescanSink)(Keep.both).run() + + //if we have seen the last filter, complete the rescanEarlyP so we are consistent + rescanCompletePromise.future.map(_ => completeRescanEarlyP.success(None)) + + val flatten = matchingBlocksF.map(_.flatten.toVector) + + //return RescanStarted with access to the ability to complete the rescan early + //via the completeRescanEarlyP promise. + RescanState.RescanStarted(completeRescanEarlyP, flatten) + } + + /** Iterates over the block filters in order to find filters that match to the given addresses + * + * I queries the filter database for [[batchSize]] filters a time + * and tries to run [[GolombFilter.matchesAny]] for each filter. + * + * It tries to match the filters in parallel using [[parallelismLevel]] threads. + * For best results use it with a separate execution context. + * + * @param scripts list of [[ScriptPubKey]]'s to watch + * @param startOpt start point (if empty it starts with the genesis block) + * @param endOpt end point (if empty it ends with the best tip) + * @param batchSize number of filters that can be matched in one batch + * @param parallelismLevel max number of threads required to perform matching + * (default [[Runtime.getRuntime.availableProcessors()]]) + * @return a list of matching block hashes + */ + def getMatchingBlocks( scripts: Vector[ScriptPubKey], startOpt: Option[BlockStamp] = None, endOpt: Option[BlockStamp] = None, batchSize: Int = 100, parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(implicit - ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] = { + ec: ExecutionContext): Future[RescanState] = { require(batchSize > 0, "batch size must be greater than zero") require(parallelismLevel > 0, "parallelism level must be greater than zero") if (scripts.isEmpty) { - Future.successful(Vector.empty) + Future.successful(RescanState.RescanDone) } else { - for { startHeight <- startOpt.fold(Future.successful(0))( chainQueryApi.getHeightByBlockStamp) @@ -134,13 +213,13 @@ private[wallet] trait RescanHandling extends WalletLogger { _ = logger.info( s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks") range = startHeight.to(endHeight) - matched <- FutureUtil.batchAndSyncExecute( - elements = range.toVector, - f = fetchFiltersInRange(scripts, parallelismLevel), - batchSize = batchSize) + + rescanStarted = buildFilterMatchFlow(range, + scripts, + parallelismLevel, + batchSize) } yield { - logger.info(s"Matched ${matched.length} blocks on rescan") - matched + rescanStarted } } } @@ -152,16 +231,16 @@ private[wallet] trait RescanHandling extends WalletLogger { account: HDAccount, startOpt: Option[BlockStamp], endOpt: Option[BlockStamp], - addressBatchSize: Int): Future[Unit] = { + addressBatchSize: Int): Future[RescanState] = { for { scriptPubKeys <- generateScriptPubKeys(account, addressBatchSize) addressCount <- addressDAO.count() - _ <- matchBlocks(scriptPubKeys = scriptPubKeys, - endOpt = endOpt, - startOpt = startOpt) + inProgress <- matchBlocks(scriptPubKeys = scriptPubKeys, + endOpt = endOpt, + startOpt = startOpt) externalGap <- calcAddressGap(HDChainType.External, account) changeGap <- calcAddressGap(HDChainType.Change, account) - res <- { + _ <- { logger.info(s"addressCount=$addressCount externalGap=$externalGap") if (addressCount != 0) { logger.info( @@ -180,7 +259,7 @@ private[wallet] trait RescanHandling extends WalletLogger { doNeutrinoRescan(account, startOpt, endOpt, addressBatchSize) } } - } yield res + } yield inProgress } private def calcAddressGap( @@ -226,18 +305,18 @@ private[wallet] trait RescanHandling extends WalletLogger { private def matchBlocks( scriptPubKeys: Vector[ScriptPubKey], endOpt: Option[BlockStamp], - startOpt: Option[BlockStamp]): Future[Vector[DoubleSha256Digest]] = { + startOpt: Option[BlockStamp]): Future[RescanState] = { - val blocksF = for { - blocks <- getMatchingBlocks(scripts = scriptPubKeys, - startOpt = startOpt, - endOpt = endOpt)( + val rescanStateF = for { + rescanState <- getMatchingBlocks(scripts = scriptPubKeys, + startOpt = startOpt, + endOpt = endOpt)( ExecutionContext.fromExecutor(walletConfig.rescanThreadPool)) } yield { - blocks.sortBy(_.blockHeight).map(_.blockHash.flip) + rescanState } - blocksF + rescanStateF } /** Use to generate a list of addresses to search when restoring our wallet @@ -298,17 +377,34 @@ private[wallet] trait RescanHandling extends WalletLogger { } } - private def fetchFiltersInRange( + /** Given a range of filter heights, we fetch the filters associated with those heights and emit them downstream */ + private val fetchFiltersFlow: Flow[ + Vector[Int], + Vector[ChainQueryApi.FilterResponse], + NotUsed] = { + Flow[Vector[Int]].mapAsync(FutureUtil.getParallelism) { + case range: Vector[Int] => + val startHeight = range.head + val endHeight = range.last + logger.info( + s"Searching filters from start=$startHeight to end=$endHeight") + chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight, + endHeight = endHeight) + } + } + + /** Searches the given block filters against the given scriptPubKeys for matches. + * If there is a match, request the full block to search + */ + private def searchFiltersForMatches( scripts: Vector[ScriptPubKey], - parallelismLevel: Int)(heightRange: Vector[Int])(implicit + filtersResponse: Vector[ChainQueryApi.FilterResponse], + parallelismLevel: Int)(implicit ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] = { - val startHeight = heightRange.head - val endHeight = heightRange.last + val startHeight = filtersResponse.head.blockHeight + val endHeight = filtersResponse.last.blockHeight for { - filtersResponse <- chainQueryApi.getFiltersBetweenHeights( - startHeight = startHeight, - endHeight = endHeight) - filtered <- findMatches(filtersResponse, scripts, parallelismLevel) + filtered <- findMatches(filtersResponse, scripts, parallelismLevel)(ec) _ <- downloadAndProcessBlocks(filtered.map(_.blockHash.flip)) } yield { logger.info(