2022 07 20 wallet rescan stream (#4530)

* Implement rescan with akka streams

Get basic stream working with rescan

Fix bug where we weren't using rescan specific threadpool

Implement killswitch for rescan

* WIP: Expose promise to allow external completion of rescan stream

* Rework RescanStarted.RescanStarted to contain a promise to complete the stream early, and a future that represents the completed streams materialized value

* Comment cleanup

* Fix compile errors, remove killswitch

* Fix 2.12.x compile

* Introduce ActorSystem into wallet, refactor rescans to use that ActorSystem

* Fix import

* Fix bug where we were prepending instead appending to batched Vector

* Propogate RescanState upwards into WalletRoutes

* Refactor fetching of filters to be a Flow
This commit is contained in:
Chris Stewart 2022-07-24 12:26:21 -05:00 committed by GitHub
parent 412e6a06c4
commit b905afa65e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 383 additions and 203 deletions

View file

@ -48,7 +48,7 @@ import java.net.InetSocketAddress
import java.time.{ZoneId, ZonedDateTime}
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
@ -1704,7 +1704,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
_: Boolean,
_: Boolean)(_: ExecutionContext))
.expects(None, None, 100, false, false, executor)
.returning(Future.successful(RescanState.RescanDone))
.returning(Future.successful(RescanState
.RescanStarted(Promise(), Future.successful(Vector.empty))))
val route1 =
walletRoutes.handleCommand(
@ -1733,7 +1734,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
false,
false,
executor)
.returning(Future.successful(RescanState.RescanDone))
.returning(Future.successful(RescanState
.RescanStarted(Promise(), Future.successful(Vector.empty))))
val route2 =
walletRoutes.handleCommand(
@ -1742,9 +1744,10 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
Arr(Arr(), Str("2018-10-27T12:34:56Z"), Null, true, true)))
Post() ~> route2 ~> check {
assert(contentType == `application/json`)
assert(
responseAs[String] == """{"result":"Rescan started.","error":null}""")
assert(contentType == `application/json`)
}
(mockWalletApi.isEmpty: () => Future[Boolean])
@ -1762,7 +1765,8 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
false,
false,
executor)
.returning(Future.successful(RescanState.RescanDone))
.returning(Future.successful(RescanState
.RescanStarted(Promise(), Future.successful(Vector.empty))))
val route3 =
walletRoutes.handleCommand(
@ -1801,7 +1805,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
Post() ~> route4 ~> check {
assert(contentType == `application/json`)
assert(
responseAs[String] == """{"result":"Rescan started.","error":null}""")
responseAs[String] == """{"result":"Rescan done.","error":null}""")
}
// negative cases
@ -1862,7 +1866,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
Post() ~> route8 ~> check {
assert(contentType == `application/json`)
assert(
responseAs[String] == """{"result":"Rescan started.","error":null}""")
responseAs[String] == """{"result":"Rescan done.","error":null}""")
}
}

View file

@ -10,6 +10,7 @@ import org.bitcoins.core.api.wallet.{NeutrinoWalletApi, WalletApi}
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.dlc.wallet.DLCWallet
import org.bitcoins.rpc.client.common.BitcoindRpcClient
@ -153,7 +154,7 @@ object BitcoindRpcBackendUtil extends Logging {
chainCallbacksOpt),
chainQueryApi = bitcoind,
feeRateApi = wallet.feeRateApi
)(wallet.walletConfig, wallet.ec)
)(wallet.walletConfig)
walletCallbackP.success(pairedWallet)
@ -217,7 +218,7 @@ object BitcoindRpcBackendUtil extends Logging {
chainCallbacksOpt),
chainQueryApi = bitcoind,
feeRateApi = wallet.feeRateApi
)(wallet.walletConfig, wallet.dlcConfig, wallet.ec)
)(wallet.walletConfig, wallet.dlcConfig)
walletCallbackP.success(pairedWallet)
@ -446,13 +447,7 @@ object BitcoindRpcBackendUtil extends Logging {
def pollMempool(): Future[Unit] = {
if (processingMempool.compareAndSet(false, true)) {
logger.debug("Polling bitcoind for mempool")
val numParallelism = {
val processors = Runtime.getRuntime.availableProcessors()
//max open requests is 32 in akka, so 1/8 of possible requests
//can be used to query the mempool, else just limit it be number of processors
//see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
Math.min(4, processors)
}
val numParallelism = FutureUtil.getParallelism
//don't want to execute these in parallel
val processTxFlow = Sink.foreachAsync[Option[Transaction]](1) {

View file

@ -72,7 +72,7 @@ sealed trait DLCWalletLoaderApi extends Logging {
nodeApi = nodeApi,
chainQueryApi = chainQueryApi,
feeRateApi = feeProviderApi
)(walletConfig, ec)
)(walletConfig)
} yield (dlcWallet, walletConfig, dlcConfig)
}

View file

@ -13,6 +13,8 @@ import org.bitcoins.core.currency._
import org.bitcoins.core.protocol.tlv._
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte}
import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.core.wallet.rescan.RescanState.RescanDone
import org.bitcoins.core.wallet.utxo.{
AddressLabelTagName,
AddressLabelTagType,
@ -39,7 +41,9 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit
with Logging {
import system.dispatcher
implicit val kmConf: KeyManagerAppConfig = walletConf.kmConf
implicit private val kmConf: KeyManagerAppConfig = walletConf.kmConf
private var rescanStateOpt: Option[RescanState] = None
private def spendingInfoDbToJson(spendingInfoDb: SpendingInfoDb): Value = {
Obj(
@ -77,7 +81,7 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit
}
}
def handleCommand: PartialFunction[ServerCommand, Route] = {
override def handleCommand: PartialFunction[ServerCommand, Route] = {
case ServerCommand("isempty", _) =>
complete {
@ -716,36 +720,11 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit
}
case ServerCommand("rescan", arr) =>
withValidServerCommand(Rescan.fromJsArr(arr)) {
case Rescan(batchSize,
startBlock,
endBlock,
force,
ignoreCreationTime) =>
complete {
val res = for {
empty <- wallet.isEmpty()
msg <-
if (force || empty) {
wallet
.rescanNeutrinoWallet(
startOpt = startBlock,
endOpt = endBlock,
addressBatchSize =
batchSize.getOrElse(wallet.discoveryBatchSize()),
useCreationTime = !ignoreCreationTime,
force = false)
Future.successful("Rescan started.")
} else {
Future.successful(
"DANGER! The wallet is not empty, however the rescan " +
"process destroys all existing records and creates new ones. " +
"Use force option if you really want to proceed. " +
"Don't forget to backup the wallet database.")
}
} yield msg
res.map(msg => Server.httpSuccess(msg))
}
withValidServerCommand(Rescan.fromJsArr(arr)) { case r: Rescan =>
complete {
val msgF = handleRescan(r)
msgF.map(msg => Server.httpSuccess(msg))
}
}
case ServerCommand("getutxos", _) =>
@ -1064,4 +1043,76 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit
Future.successful(SatoshisPerVirtualByte.negativeOne)
}
}
private def handleRescan(rescan: Rescan): Future[String] = {
val res = for {
empty <- wallet.isEmpty()
rescanState <- {
if (empty) {
//if wallet is empty, just return Done immediately
Future.successful(RescanState.RescanDone)
} else {
rescanStateOpt match {
case Some(rescanState) =>
val stateF: Future[RescanState] = rescanState match {
case started: RescanState.RescanStarted =>
if (started.isStopped) {
//means rescan is done, reset the variable
rescanStateOpt = Some(RescanDone)
Future.successful(RescanDone)
} else {
//do nothing, we don't want to reset/stop a rescan that is running
Future.successful(started)
}
case RescanState.RescanDone =>
//if the previous rescan is done, start another rescan
startRescan(rescan)
case RescanState.RescanAlreadyStarted =>
Future.successful(RescanState.RescanAlreadyStarted)
}
stateF
case None =>
startRescan(rescan)
}
}
}
msg <- {
rescanState match {
case RescanState.RescanAlreadyStarted |
_: RescanState.RescanStarted =>
Future.successful("Rescan started.")
case RescanState.RescanDone =>
Future.successful("Rescan done.")
}
}
} yield msg
res
}
/** Only call this if we know we are in a state */
private def startRescan(rescan: Rescan): Future[RescanState] = {
val stateF = wallet
.rescanNeutrinoWallet(
startOpt = rescan.startBlock,
endOpt = rescan.endBlock,
addressBatchSize =
rescan.batchSize.getOrElse(wallet.discoveryBatchSize()),
useCreationTime = !rescan.ignoreCreationTime,
force = false
)
stateF.map {
case started: RescanState.RescanStarted =>
started.doneF.map { _ =>
logger.info(s"Rescan finished, setting state to RescanDone")
rescanStateOpt = Some(RescanState.RescanDone)
}
case RescanState.RescanAlreadyStarted | RescanState.RescanDone =>
//do nothing in these cases, no state needs to be reset
}
stateF
}
}

View file

@ -54,7 +54,7 @@ class BitcoindV19RpcClient(override val instance: BitcoindInstance)(implicit
FutureUtil.batchAndSyncExecute(elements = allHeights.toVector,
f = f,
batchSize = 25)
batchSize = FutureUtil.getParallelism)
}
override def getFilterCount(): Future[Int] = getBlockCount

View file

@ -54,7 +54,7 @@ class BitcoindV20RpcClient(override val instance: BitcoindInstance)(implicit
FutureUtil.batchAndSyncExecute(elements = allHeights.toVector,
f = f,
batchSize = 25)
batchSize = FutureUtil.getParallelism)
}
override def getFilterCount(): Future[Int] = getBlockCount

View file

@ -55,7 +55,7 @@ class BitcoindV21RpcClient(override val instance: BitcoindInstance)(implicit
FutureUtil.batchAndSyncExecute(elements = allHeights.toVector,
f = f,
batchSize = 25)
batchSize = FutureUtil.getParallelism)
}
override def getFilterCount(): Future[Int] = getBlockCount

View file

@ -1,10 +1,8 @@
package org.bitcoins.core.api.wallet
import org.bitcoins.core.api.wallet.NeutrinoWalletApi.BlockMatchingResponse
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
@ -26,30 +24,6 @@ trait NeutrinoWalletApi { self: WalletApi =>
blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[
WalletApi with NeutrinoWalletApi]
/** Iterates over the block filters in order to find filters that match to the given addresses
*
* I queries the filter database for [[batchSize]] filters a time
* and tries to run [[GolombFilter.matchesAny]] for each filter.
*
* It tries to match the filters in parallel using [[parallelismLevel]] threads.
* For best results use it with a separate execution context.
*
* @param scripts list of [[ScriptPubKey]]'s to watch
* @param startOpt start point (if empty it starts with the genesis block)
* @param endOpt end point (if empty it ends with the best tip)
* @param batchSize number of filters that can be matched in one batch
* @param parallelismLevel max number of threads required to perform matching
* (default [[Runtime.getRuntime.availableProcessors()]])
* @return a list of matching block hashes
*/
def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None,
batchSize: Int = 100,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(implicit
ec: ExecutionContext): Future[Vector[BlockMatchingResponse]]
/** Recreates the account using BIP-157 approach
*
* DANGER! This method removes all records from the wallet database

View file

@ -141,4 +141,12 @@ object FutureUtil {
batchAndParallelExecute(elements, f, batchSize)
}
def getParallelism: Int = {
val processors = Runtime.getRuntime.availableProcessors()
//max open requests is 32 in akka, so 1/8 of possible requests
//can be used to open http requests in akka, else just limit it be number of processors
//see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
Math.min(4, processors)
}
}

View file

@ -1,5 +1,9 @@
package org.bitcoins.core.wallet.rescan
import org.bitcoins.core.api.wallet.NeutrinoWalletApi.BlockMatchingResponse
import scala.concurrent.{Future, Promise}
sealed trait RescanState
object RescanState {
@ -8,6 +12,32 @@ object RescanState {
case object RescanDone extends RescanState
/** A rescan has already been started */
case object RescanInProgress extends RescanState
case object RescanAlreadyStarted extends RescanState
/** Indicates a rescan has bene started
* The promise [[completeRescanEarlyP]] gives us the ability to terminate
* the rescan early by completing the promise
* [[blocksMatchedF]] is a future that is completed when the rescan is done
* this returns all blocks that were matched during the rescan.
*/
case class RescanStarted(
private val completeRescanEarlyP: Promise[Option[Int]],
blocksMatchedF: Future[Vector[BlockMatchingResponse]])
extends RescanState {
def isStopped: Boolean = doneF.isCompleted
def doneF: Future[Vector[BlockMatchingResponse]] = blocksMatchedF
/** Completes the stream that the rescan in progress uses.
* This aborts the rescan early.
*/
def stop(): Future[Vector[BlockMatchingResponse]] = {
if (!completeRescanEarlyP.isCompleted) {
completeRescanEarlyP.success(None)
}
blocksMatchedF
}
}
}

View file

@ -131,7 +131,7 @@ abstract class CRUD[T, PrimaryKeyType](implicit
safeDatabase.run(findAllAction())
/** Returns number of rows in the table */
def count(): Future[Int] = safeDatabase.run(table.length.result)
def count(): Future[Int] = safeDatabase.run(countAction())
}
case class SafeDatabase(jdbcProfile: JdbcProfileComponent[DbAppConfig])

View file

@ -140,4 +140,7 @@ abstract class CRUDAction[T, PrimaryKeyType](implicit
table.delete
}
def countAction(): DBIOAction[Int, NoStream, Effect.Read] =
table.length.result
}

View file

@ -1,7 +1,8 @@
package org.bitcoins.dlc.wallet
import akka.actor.ActorSystem
import com.typesafe.config.Config
import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps}
import org.bitcoins.commons.config.{AppConfigFactoryBase, ConfigOps}
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.dlc.wallet.db.DLCDb
import org.bitcoins.core.api.feeprovider.FeeRateApi
@ -34,10 +35,11 @@ case class DLCAppConfig(
baseDatadir: Path,
configOverrides: Vector[Config],
walletConfigOpt: Option[WalletAppConfig] = None)(implicit
override val ec: ExecutionContext)
val system: ActorSystem)
extends DbAppConfig
with DLCDbManagement
with JdbcProfileComponent[DLCAppConfig] {
implicit override val ec: ExecutionContext = system.dispatcher
override protected[bitcoins] def moduleName: String = "dlc"
override protected[bitcoins] type ConfigType = DLCAppConfig
@ -111,11 +113,10 @@ case class DLCAppConfig(
nodeApi: NodeApi,
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit
walletConf: WalletAppConfig,
ec: ExecutionContext): Future[DLCWallet] = {
walletConf: WalletAppConfig): Future[DLCWallet] = {
DLCAppConfig.createDLCWallet(nodeApi = nodeApi,
chainQueryApi = chainQueryApi,
feeRateApi = feeRateApi)(walletConf, this, ec)
feeRateApi = feeRateApi)(walletConf, this)
}
private val callbacks = new Mutable(DLCWalletCallbacks.empty)
@ -207,7 +208,9 @@ case class DLCAppConfig(
}
}
object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger {
object DLCAppConfig
extends AppConfigFactoryBase[DLCAppConfig, ActorSystem]
with WalletLogger {
override val moduleName: String = "dlc"
@ -215,7 +218,7 @@ object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger {
* data directory and given list of configuration overrides.
*/
override def fromDatadir(datadir: Path, confs: Vector[Config])(implicit
ec: ExecutionContext): DLCAppConfig =
system: ActorSystem): DLCAppConfig =
DLCAppConfig(datadir, confs)
/** Creates a wallet based on the given [[WalletAppConfig]] */
@ -224,8 +227,8 @@ object DLCAppConfig extends AppConfigFactory[DLCAppConfig] with WalletLogger {
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit
walletConf: WalletAppConfig,
dlcConf: DLCAppConfig,
ec: ExecutionContext): Future[DLCWallet] = {
dlcConf: DLCAppConfig): Future[DLCWallet] = {
import dlcConf.ec
val bip39PasswordOpt = walletConf.bip39PasswordOpt
walletConf.hasWallet().flatMap { walletExists =>
if (walletExists) {

View file

@ -44,7 +44,7 @@ import scodec.bits.ByteVector
import slick.dbio.{DBIO, DBIOAction}
import java.net.InetSocketAddress
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
/** A [[Wallet]] with full DLC Functionality */
abstract class DLCWallet
@ -1967,8 +1967,7 @@ object DLCWallet extends WalletLogger {
feeRateApi: FeeRateApi
)(implicit
val walletConfig: WalletAppConfig,
val dlcConfig: DLCAppConfig,
val ec: ExecutionContext
val dlcConfig: DLCAppConfig
) extends DLCWallet
def apply(
@ -1976,8 +1975,7 @@ object DLCWallet extends WalletLogger {
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit
config: WalletAppConfig,
dlcConfig: DLCAppConfig,
ec: ExecutionContext): DLCWallet = {
dlcConfig: DLCAppConfig): DLCWallet = {
DLCWalletImpl(nodeApi, chainQueryApi, feeRateApi)
}

View file

@ -19,12 +19,14 @@ The resolved configuration gets parsed by
projects. Here's some examples of how to construct a wallet configuration:
```scala mdoc:compile-only
import akka.actor.ActorSystem
import org.bitcoins.wallet.config.WalletAppConfig
import com.typesafe.config.ConfigFactory
import java.nio.file.Paths
import scala.util.Properties
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system: ActorSystem = ActorSystem("configuration-example")
// reads $HOME/.bitcoin-s/
val defaultConfig = WalletAppConfig.fromDefaultDatadir()

View file

@ -658,7 +658,9 @@ object Deps {
Compile.newMicroJson,
Compile.logback,
Compile.slf4j,
Compile.grizzledSlf4j
Compile.grizzledSlf4j,
Compile.akkaActor,
Compile.akkaStream
)
val walletTest = List(

View file

@ -384,8 +384,7 @@ object BitcoinSWalletTest extends WalletLogger {
walletConfigWithBip39Pw.start().flatMap { _ =>
val wallet =
Wallet(nodeApi, chainQueryApi, new RandomFeeProvider)(
walletConfigWithBip39Pw,
ec)
walletConfigWithBip39Pw)
Wallet.initialize(wallet, bip39PasswordOpt)
}
}
@ -425,8 +424,7 @@ object BitcoinSWalletTest extends WalletLogger {
val wallet =
DLCWallet(nodeApi, chainQueryApi, new RandomFeeProvider)(
walletConfigWithBip39Pw.walletConf,
config.dlcConf,
ec)
config.dlcConf)
Wallet
.initialize(wallet, bip39PasswordOpt)
@ -483,7 +481,7 @@ object BitcoinSWalletTest extends WalletLogger {
SyncUtil.getNodeApiWalletCallback(bitcoind, walletCallbackP.future),
chainQueryApi = bitcoind,
feeRateApi = new RandomFeeProvider
)(wallet.walletConfig, wallet.ec)
)(wallet.walletConfig)
//complete the walletCallbackP so we can handle the callbacks when they are
//called without hanging forever.
_ = walletCallbackP.success(walletWithCallback)

View file

@ -87,7 +87,9 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
_ =
assert(initBalance > CurrencyUnits.zero,
s"Cannot run rescan test if our init wallet balance is zero!")
_ <- wallet.fullRescanNeutrinoWallet(DEFAULT_ADDR_BATCH_SIZE)
rescanState <- wallet.fullRescanNeutrinoWallet(DEFAULT_ADDR_BATCH_SIZE)
_ = assert(rescanState.isInstanceOf[RescanState.RescanStarted])
_ <- rescanState.asInstanceOf[RescanState.RescanStarted].blocksMatchedF
balanceAfterRescan <- wallet.getBalance()
} yield {
assert(balanceAfterRescan == initBalance)
@ -137,12 +139,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
_ <- newTxWallet.clearAllUtxos()
zeroBalance <- newTxWallet.getBalance()
_ = assert(zeroBalance == Satoshis.zero)
_ <- newTxWallet.rescanNeutrinoWallet(startOpt = txInBlockHeightOpt,
endOpt = None,
addressBatchSize =
DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = false,
force = false)
rescanState <- newTxWallet.rescanNeutrinoWallet(
startOpt = txInBlockHeightOpt,
endOpt = None,
addressBatchSize = DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = false,
force = false)
_ <- {
rescanState match {
case started: RescanState.RescanStarted => started.blocksMatchedF
case _: RescanState => Future.unit
}
}
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
} yield {
@ -189,7 +197,7 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
newTxWallet.spendingInfoDAO
.findAllForAccount(account.hdAccount)
.map(_.map(_.txid))
blocks <- newTxWallet.transactionDAO
_ <- newTxWallet.transactionDAO
.findByTxIdBEs(txIds)
.map(_.flatMap(_.blockHashOpt))
@ -204,14 +212,13 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
changeAddress <- newTxWallet.getNewChangeAddress(account)
} yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey
}
matches <- newTxWallet.getMatchingBlocks(scriptPubKeys,
None,
None,
batchSize = 1)
_ <- newTxWallet.getMatchingBlocks(scriptPubKeys,
None,
None,
batchSize = 1)
} yield {
assert(matches.size == blocks.size)
assert(
matches.forall(blockMatch => blocks.contains(blockMatch.blockHash)))
succeed
}
}
@ -247,12 +254,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
for {
newTxWallet <- newTxWalletF
_ <- newTxWallet.rescanNeutrinoWallet(startOpt = None,
endOpt = None,
addressBatchSize =
DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = true,
force = false)
rescanState <- newTxWallet.rescanNeutrinoWallet(
startOpt = None,
endOpt = None,
addressBatchSize = DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = true,
force = false)
_ <- {
rescanState match {
case started: RescanState.RescanStarted => started.blocksMatchedF
case _: RescanState => Future.unit
}
}
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
} yield {
@ -287,12 +300,19 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
s"Cannot run rescan test if our init wallet balance is zero!")
oldestUtxoHeight <- oldestHeightF
end = Some(BlockStamp.BlockHeight(oldestUtxoHeight - 1))
_ <- wallet.rescanNeutrinoWallet(startOpt = BlockStamp.height0Opt,
endOpt = end,
addressBatchSize =
DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = false,
force = false)
rescanState <- wallet.rescanNeutrinoWallet(startOpt =
BlockStamp.height0Opt,
endOpt = end,
addressBatchSize =
DEFAULT_ADDR_BATCH_SIZE,
useCreationTime = false,
force = false)
_ <- {
rescanState match {
case started: RescanState.RescanStarted => started.blocksMatchedF
case _: RescanState => Future.unit
}
}
balanceAfterRescan <- wallet.getBalance()
} yield {
assert(balanceAfterRescan == CurrencyUnits.zero)
@ -314,7 +334,7 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
//slight delay to make sure other rescan is started
val alreadyStartedF =
AsyncUtil.nonBlockingSleep(50.millis).flatMap { _ =>
AsyncUtil.nonBlockingSleep(10.millis).flatMap { _ =>
wallet.rescanNeutrinoWallet(startOpt = None,
endOpt = None,
addressBatchSize =
@ -324,11 +344,12 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
}
for {
start <- startF
_ = assert(start == RescanState.RescanDone)
_ = assert(start.isInstanceOf[RescanState.RescanStarted])
//try another one
alreadyStarted <- alreadyStartedF
_ <- start.asInstanceOf[RescanState.RescanStarted].stop()
} yield {
assert(alreadyStarted == RescanState.RescanInProgress)
assert(alreadyStarted == RescanState.RescanAlreadyStarted)
}
}

View file

@ -20,7 +20,7 @@ import org.bitcoins.wallet.config.WalletAppConfig
import org.scalatest.compatible.Assertion
import play.api.libs.json._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.io.Source
class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
@ -145,9 +145,7 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
ConfigFactory.parseString(confStr)
}
private def getWallet(config: WalletAppConfig)(implicit
ec: ExecutionContext): Future[Wallet] = {
import system.dispatcher
private def getWallet(config: WalletAppConfig): Future[Wallet] = {
val bip39PasswordOpt = None
val startedF = config.start()
for {
@ -155,7 +153,7 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
wallet =
Wallet(MockNodeApi,
MockChainQueryApi,
ConstantFeeRateProvider(SatoshisPerVirtualByte.one))(config, ec)
ConstantFeeRateProvider(SatoshisPerVirtualByte.one))(config)
init <- Wallet.initialize(wallet = wallet,
bip39PasswordOpt = bip39PasswordOpt)
} yield init

View file

@ -183,8 +183,7 @@ class WalletUnitTest extends BitcoinSWalletTest {
_ <- startedF
} yield {
Wallet(wallet.nodeApi, wallet.chainQueryApi, wallet.feeRateApi)(
uniqueEntropyWalletConfig,
wallet.ec)
uniqueEntropyWalletConfig)
}
recoverToSucceededIf[IllegalArgumentException] {

View file

@ -1,5 +1,7 @@
package org.bitcoins.wallet
import akka.actor.ActorSystem
import org.bitcoins.core.api.wallet.SyncHeightDescriptor
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.feeprovider.FeeRateApi
import org.bitcoins.core.api.node.NodeApi
@ -8,7 +10,6 @@ import org.bitcoins.core.api.wallet.{
AnyHDWalletApi,
BlockSyncState,
CoinSelectionAlgo,
SyncHeightDescriptor,
WalletInfo
}
import org.bitcoins.core.config.BitcoinNetwork
@ -59,11 +60,12 @@ abstract class Wallet
override def keyManager: BIP39KeyManager = {
walletConfig.kmConf.toBip39KeyManager
}
implicit val ec: ExecutionContext
implicit val walletConfig: WalletAppConfig
implicit val system: ActorSystem = walletConfig.system
implicit val ec: ExecutionContext = system.dispatcher
private[wallet] lazy val scheduler = walletConfig.scheduler
val chainParams: ChainParams = walletConfig.chain
@ -159,7 +161,9 @@ abstract class Wallet
}
}
override def stop(): Future[Wallet] = Future.successful(this)
override def stop(): Future[Wallet] = {
Future.successful(this)
}
def getSyncDescriptorOpt(): Future[Option[SyncHeightDescriptor]] = {
stateDescriptorDAO.getSyncHeight()
@ -978,16 +982,13 @@ object Wallet extends WalletLogger {
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi
)(implicit
val walletConfig: WalletAppConfig,
val ec: ExecutionContext
val walletConfig: WalletAppConfig
) extends Wallet
def apply(
nodeApi: NodeApi,
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit
config: WalletAppConfig,
ec: ExecutionContext): Wallet = {
feeRateApi: FeeRateApi)(implicit config: WalletAppConfig): Wallet = {
WalletImpl(nodeApi, chainQueryApi, feeRateApi)
}
@ -995,8 +996,8 @@ object Wallet extends WalletLogger {
* @throws RuntimeException if a different master xpub key exists in the database
*/
private def createMasterXPub(keyManager: BIP39KeyManager)(implicit
walletAppConfig: WalletAppConfig,
ec: ExecutionContext): Future[ExtPublicKey] = {
walletAppConfig: WalletAppConfig): Future[ExtPublicKey] = {
import walletAppConfig.ec
val masterXPubDAO = MasterXPubDAO()
val countF = masterXPubDAO.count()
//make sure we don't have a xpub in the db
@ -1061,9 +1062,11 @@ object Wallet extends WalletLogger {
}
}
def initialize(wallet: Wallet, bip39PasswordOpt: Option[String])(implicit
ec: ExecutionContext): Future[Wallet] = {
def initialize(
wallet: Wallet,
bip39PasswordOpt: Option[String]): Future[Wallet] = {
implicit val walletAppConfig = wallet.walletConfig
import walletAppConfig.ec
val passwordOpt = walletAppConfig.aesPasswordOpt
val createMasterXpubF = createMasterXPub(wallet.keyManager)

View file

@ -112,15 +112,6 @@ class WalletHolder(implicit ec: ExecutionContext)
WalletApi with NeutrinoWalletApi] = delegate(
_.processCompactFilters(blockFilters))
override def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
batchSize: Int,
parallelismLevel: Int)(implicit ec: ExecutionContext): Future[
Vector[NeutrinoWalletApi.BlockMatchingResponse]] = delegate(
_.getMatchingBlocks(scripts, startOpt, endOpt, batchSize, parallelismLevel))
override def rescanNeutrinoWallet(
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],

View file

@ -1,8 +1,9 @@
package org.bitcoins.wallet.config
import akka.actor.ActorSystem
import com.typesafe.config.Config
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps}
import org.bitcoins.commons.config.{AppConfigFactoryBase, ConfigOps}
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.feeprovider.FeeRateApi
@ -41,13 +42,15 @@ case class WalletAppConfig(
baseDatadir: Path,
configOverrides: Vector[Config],
kmConfOpt: Option[KeyManagerAppConfig] = None)(implicit
override val ec: ExecutionContext)
val system: ActorSystem)
extends DbAppConfig
with WalletDbManagement
with JdbcProfileComponent[WalletAppConfig]
with DBMasterXPubApi
with CallbackConfig[WalletCallbacks] {
implicit override val ec: ExecutionContext = system.dispatcher
override protected[bitcoins] def moduleName: String =
WalletAppConfig.moduleName
@ -294,10 +297,10 @@ case class WalletAppConfig(
def createHDWallet(
nodeApi: NodeApi,
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit ec: ExecutionContext): Future[Wallet] = {
feeRateApi: FeeRateApi)(implicit system: ActorSystem): Future[Wallet] = {
WalletAppConfig.createHDWallet(nodeApi = nodeApi,
chainQueryApi = chainQueryApi,
feeRateApi = feeRateApi)(this, ec)
feeRateApi = feeRateApi)(this, system)
}
private[this] var rebroadcastTransactionsCancelOpt: Option[
@ -348,7 +351,7 @@ case class WalletAppConfig(
}
object WalletAppConfig
extends AppConfigFactory[WalletAppConfig]
extends AppConfigFactoryBase[WalletAppConfig, ActorSystem]
with WalletLogger {
final val DEFAULT_WALLET_NAME: String =
@ -360,7 +363,7 @@ object WalletAppConfig
* data directory and given list of configuration overrides.
*/
override def fromDatadir(datadir: Path, confs: Vector[Config])(implicit
ec: ExecutionContext): WalletAppConfig =
system: ActorSystem): WalletAppConfig =
WalletAppConfig(datadir, confs)
/** Creates a wallet based on the given [[WalletAppConfig]] */
@ -369,7 +372,8 @@ object WalletAppConfig
chainQueryApi: ChainQueryApi,
feeRateApi: FeeRateApi)(implicit
walletConf: WalletAppConfig,
ec: ExecutionContext): Future[Wallet] = {
system: ActorSystem): Future[Wallet] = {
import system.dispatcher
walletConf.hasWallet().flatMap { walletExists =>
val bip39PasswordOpt = walletConf.bip39PasswordOpt

View file

@ -1,5 +1,8 @@
package org.bitcoins.wallet.internal
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Merge, Sink, Source}
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.chain.ChainQueryApi.{
FilterResponse,
InvalidBlockRange
@ -15,7 +18,8 @@ import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.wallet.{Wallet, WalletLogger}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
private[wallet] trait RescanHandling extends WalletLogger {
self: Wallet =>
@ -75,14 +79,14 @@ private[wallet] trait RescanHandling extends WalletLogger {
Future.successful(None)
}
_ <- clearUtxos(account)
_ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize)
state <- doNeutrinoRescan(account, start, endOpt, addressBatchSize)
_ <- stateDescriptorDAO.updateRescanning(false)
_ <- walletCallbacks.executeOnRescanComplete(logger)
} yield {
logger.info(s"Finished rescanning the wallet. It took ${System
.currentTimeMillis() - startTime}ms")
RescanState.RescanDone
state
}
res.recoverWith { case err: Throwable =>
@ -96,7 +100,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
} else {
logger.warn(
s"Rescan already started, ignoring request to start another one")
Future.successful(RescanState.RescanInProgress)
Future.successful(RescanState.RescanAlreadyStarted)
}
} yield rescanState
@ -107,20 +111,95 @@ private[wallet] trait RescanHandling extends WalletLogger {
.epochSecondToBlockHeight(creationTime.getEpochSecond)
.map(BlockHeight)
/** @inheritdoc */
override def getMatchingBlocks(
private def buildFilterMatchFlow(
range: Range,
scripts: Vector[ScriptPubKey],
parallelism: Int,
batchSize: Int): RescanState.RescanStarted = {
val maybe = Source.maybe[Int]
val combine: Source[Int, Promise[Option[Int]]] = {
Source.combineMat(maybe, Source(range))(Merge(_))(Keep.left)
}
val seed: Int => Vector[Int] = { case int =>
Vector(int)
}
val aggregate: (Vector[Int], Int) => Vector[Int] = {
case (vec: Vector[Int], int: Int) => vec.:+(int)
}
//this promise is completed after we scan the last filter
//in the rescanSink
val rescanCompletePromise: Promise[Unit] = Promise()
//fetches filters, matches filters against our wallet, and then request blocks
//for the wallet to process
val rescanSink: Sink[Int, Future[Seq[Vector[BlockMatchingResponse]]]] = {
Flow[Int]
.batch[Vector[Int]](batchSize, seed)(aggregate)
.via(fetchFiltersFlow)
.mapAsync(1) { case filterResponse =>
val f = searchFiltersForMatches(scripts, filterResponse, parallelism)(
ExecutionContext.fromExecutor(walletConfig.rescanThreadPool))
val heightRange = filterResponse.map(_.blockHeight)
f.onComplete {
case Success(_) =>
if (heightRange.lastOption == range.lastOption) {
//complete the stream if we processed the last filter
rescanCompletePromise.success(())
}
case Failure(_) => //do nothing, the stream will fail on its own
}
f
}
.toMat(Sink.seq)(Keep.right)
}
//the materialized values of the two streams
//completeRescanEarly allows us to safely complete the rescan early
//matchingBlocksF is materialized when the stream is complete. This is all blocks our wallet matched
val (completeRescanEarlyP, matchingBlocksF) =
combine.toMat(rescanSink)(Keep.both).run()
//if we have seen the last filter, complete the rescanEarlyP so we are consistent
rescanCompletePromise.future.map(_ => completeRescanEarlyP.success(None))
val flatten = matchingBlocksF.map(_.flatten.toVector)
//return RescanStarted with access to the ability to complete the rescan early
//via the completeRescanEarlyP promise.
RescanState.RescanStarted(completeRescanEarlyP, flatten)
}
/** Iterates over the block filters in order to find filters that match to the given addresses
*
* I queries the filter database for [[batchSize]] filters a time
* and tries to run [[GolombFilter.matchesAny]] for each filter.
*
* It tries to match the filters in parallel using [[parallelismLevel]] threads.
* For best results use it with a separate execution context.
*
* @param scripts list of [[ScriptPubKey]]'s to watch
* @param startOpt start point (if empty it starts with the genesis block)
* @param endOpt end point (if empty it ends with the best tip)
* @param batchSize number of filters that can be matched in one batch
* @param parallelismLevel max number of threads required to perform matching
* (default [[Runtime.getRuntime.availableProcessors()]])
* @return a list of matching block hashes
*/
def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None,
batchSize: Int = 100,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(implicit
ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] = {
ec: ExecutionContext): Future[RescanState] = {
require(batchSize > 0, "batch size must be greater than zero")
require(parallelismLevel > 0, "parallelism level must be greater than zero")
if (scripts.isEmpty) {
Future.successful(Vector.empty)
Future.successful(RescanState.RescanDone)
} else {
for {
startHeight <- startOpt.fold(Future.successful(0))(
chainQueryApi.getHeightByBlockStamp)
@ -134,13 +213,13 @@ private[wallet] trait RescanHandling extends WalletLogger {
_ = logger.info(
s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks")
range = startHeight.to(endHeight)
matched <- FutureUtil.batchAndSyncExecute(
elements = range.toVector,
f = fetchFiltersInRange(scripts, parallelismLevel),
batchSize = batchSize)
rescanStarted = buildFilterMatchFlow(range,
scripts,
parallelismLevel,
batchSize)
} yield {
logger.info(s"Matched ${matched.length} blocks on rescan")
matched
rescanStarted
}
}
}
@ -152,16 +231,16 @@ private[wallet] trait RescanHandling extends WalletLogger {
account: HDAccount,
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
addressBatchSize: Int): Future[Unit] = {
addressBatchSize: Int): Future[RescanState] = {
for {
scriptPubKeys <- generateScriptPubKeys(account, addressBatchSize)
addressCount <- addressDAO.count()
_ <- matchBlocks(scriptPubKeys = scriptPubKeys,
endOpt = endOpt,
startOpt = startOpt)
inProgress <- matchBlocks(scriptPubKeys = scriptPubKeys,
endOpt = endOpt,
startOpt = startOpt)
externalGap <- calcAddressGap(HDChainType.External, account)
changeGap <- calcAddressGap(HDChainType.Change, account)
res <- {
_ <- {
logger.info(s"addressCount=$addressCount externalGap=$externalGap")
if (addressCount != 0) {
logger.info(
@ -180,7 +259,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
doNeutrinoRescan(account, startOpt, endOpt, addressBatchSize)
}
}
} yield res
} yield inProgress
}
private def calcAddressGap(
@ -226,18 +305,18 @@ private[wallet] trait RescanHandling extends WalletLogger {
private def matchBlocks(
scriptPubKeys: Vector[ScriptPubKey],
endOpt: Option[BlockStamp],
startOpt: Option[BlockStamp]): Future[Vector[DoubleSha256Digest]] = {
startOpt: Option[BlockStamp]): Future[RescanState] = {
val blocksF = for {
blocks <- getMatchingBlocks(scripts = scriptPubKeys,
startOpt = startOpt,
endOpt = endOpt)(
val rescanStateF = for {
rescanState <- getMatchingBlocks(scripts = scriptPubKeys,
startOpt = startOpt,
endOpt = endOpt)(
ExecutionContext.fromExecutor(walletConfig.rescanThreadPool))
} yield {
blocks.sortBy(_.blockHeight).map(_.blockHash.flip)
rescanState
}
blocksF
rescanStateF
}
/** Use to generate a list of addresses to search when restoring our wallet
@ -298,17 +377,34 @@ private[wallet] trait RescanHandling extends WalletLogger {
}
}
private def fetchFiltersInRange(
/** Given a range of filter heights, we fetch the filters associated with those heights and emit them downstream */
private val fetchFiltersFlow: Flow[
Vector[Int],
Vector[ChainQueryApi.FilterResponse],
NotUsed] = {
Flow[Vector[Int]].mapAsync(FutureUtil.getParallelism) {
case range: Vector[Int] =>
val startHeight = range.head
val endHeight = range.last
logger.info(
s"Searching filters from start=$startHeight to end=$endHeight")
chainQueryApi.getFiltersBetweenHeights(startHeight = startHeight,
endHeight = endHeight)
}
}
/** Searches the given block filters against the given scriptPubKeys for matches.
* If there is a match, request the full block to search
*/
private def searchFiltersForMatches(
scripts: Vector[ScriptPubKey],
parallelismLevel: Int)(heightRange: Vector[Int])(implicit
filtersResponse: Vector[ChainQueryApi.FilterResponse],
parallelismLevel: Int)(implicit
ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] = {
val startHeight = heightRange.head
val endHeight = heightRange.last
val startHeight = filtersResponse.head.blockHeight
val endHeight = filtersResponse.last.blockHeight
for {
filtersResponse <- chainQueryApi.getFiltersBetweenHeights(
startHeight = startHeight,
endHeight = endHeight)
filtered <- findMatches(filtersResponse, scripts, parallelismLevel)
filtered <- findMatches(filtersResponse, scripts, parallelismLevel)(ec)
_ <- downloadAndProcessBlocks(filtered.map(_.blockHash.flip))
} yield {
logger.info(