2022 07 29 fix multiple batch test (#4556)

* Fix multiple batch unit test so it fails

* Fix compile

* Move generateScriptPubKeys into buildFilterMatchFlow()

* Rename param to filterBatchSize

* Rename to buildRescanFlow

* Remove println

* Cleanup
This commit is contained in:
Chris Stewart 2022-08-03 12:33:37 -05:00 committed by GitHub
parent 1d3db879ad
commit c89491c3e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 67 deletions

View File

@ -3,7 +3,6 @@ package org.bitcoins.wallet
import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.currency.{Bitcoins, CurrencyUnits, Satoshis} import org.bitcoins.core.currency.{Bitcoins, CurrencyUnits, Satoshis}
import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.TransactionOutput import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.core.wallet.rescan.RescanState
@ -161,13 +160,15 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
val amt = Bitcoins.one val amt = Bitcoins.one
val numBlocks = 1 val numBlocks = 1
val initBalanceF = wallet.getBalance()
val defaultAccountF = wallet.getDefaultAccount() val defaultAccountF = wallet.getDefaultAccount()
//send funds to a fresh wallet address //send funds to a fresh wallet address
val addrF = wallet.getNewAddress() val addrF = wallet.getNewAddress()
val bitcoindAddrF = bitcoind.getNewAddress val bitcoindAddrF = bitcoind.getNewAddress
val newTxWalletF = for { val balanceAfterPayment1F = for {
addr <- addrF addr <- addrF
_ <- initBalanceF
txid <- bitcoind.sendToAddress(addr, amt) txid <- bitcoind.sendToAddress(addr, amt)
tx <- bitcoind.getRawTransactionRaw(txid) tx <- bitcoind.getRawTransactionRaw(txid)
bitcoindAddr <- bitcoindAddrF bitcoindAddr <- bitcoindAddrF
@ -183,40 +184,31 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
//wallet before hand. //wallet before hand.
assert(balance >= amt) assert(balance >= amt)
assert(amt == unconfirmedBalance) assert(amt == unconfirmedBalance)
newTxWallet balance
} }
for { for {
newTxWallet <- newTxWalletF _ <- initBalanceF
balanceAfterPayment1 <- balanceAfterPayment1F
account <- defaultAccountF account <- defaultAccountF
txIds <- txIds <-
wallet wallet
.listUtxos(account.hdAccount) .listUtxos(account.hdAccount)
.map(_.map(_.txid)) .map(_.map(_.txid))
_ <- newTxWallet _ <- wallet
.findByTxIds(txIds) .findByTxIds(txIds)
.map(_.flatMap(_.blockHashOpt)) .map(_.flatMap(_.blockHashOpt))
_ <- newTxWallet.clearAllUtxos() _ <- wallet.clearAllUtxos()
_ <- newTxWallet.clearAllAddresses() _ <- wallet.clearAllAddresses()
_ <- balanceAfterClear <- wallet.getBalance()
1.to(10).foldLeft(Future.successful(Vector.empty[ScriptPubKey])) { rescanState <- wallet.fullRescanNeutrinoWallet(1, true)
(prevFuture, _) => _ <- RescanState.awaitRescanDone(rescanState)
for { balanceAfterRescan <- wallet.getBalance()
prev <- prevFuture
address <- wallet.getNewAddress(account)
changeAddress <- wallet.getNewChangeAddress(account)
} yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey
}
_ <- wallet.rescanNeutrinoWallet(startOpt = None,
endOpt = None,
addressBatchSize = 1,
useCreationTime = false,
force = true)
} yield { } yield {
assert(balanceAfterClear == CurrencyUnits.zero)
succeed assert(balanceAfterPayment1 == balanceAfterRescan)
} }
} }

View File

@ -129,15 +129,26 @@ private[wallet] trait RescanHandling extends WalletLogger {
.epochSecondToBlockHeight(creationTime.getEpochSecond) .epochSecondToBlockHeight(creationTime.getEpochSecond)
.map(BlockHeight) .map(BlockHeight)
private def buildFilterMatchFlow( private def buildRescanFlow(
account: HDAccount,
addressBatchSize: Int,
range: Range, range: Range,
scripts: Vector[ScriptPubKey],
parallelism: Int, parallelism: Int,
batchSize: Int): RescanState.RescanStarted = { filterBatchSize: Int): RescanState.RescanStarted = {
val scriptsF = generateScriptPubKeys(account, addressBatchSize)
//by completing the promise returned by this sink
//we will be able to arbitrarily terminate the stream
//see: https://doc.akka.io/docs/akka/current/stream/operators/Source/maybe.html
val maybe = Source.maybe[Int] val maybe = Source.maybe[Int]
//combine the Source.maybe with the Source providing filter heights
//this is needed so we can arbitrarily kill the stream with
//the promise returned by Source.maybe
val combine: Source[Int, Promise[Option[Int]]] = { val combine: Source[Int, Promise[Option[Int]]] = {
Source.combineMat(maybe, Source(range))(Merge(_))(Keep.left) Source.combineMat(maybe, Source(range))(Merge(_))(Keep.left)
} }
val seed: Int => Vector[Int] = { case int => val seed: Int => Vector[Int] = { case int =>
Vector(int) Vector(int)
} }
@ -150,14 +161,18 @@ private[wallet] trait RescanHandling extends WalletLogger {
val rescanCompletePromise: Promise[Unit] = Promise() val rescanCompletePromise: Promise[Unit] = Promise()
//fetches filters, matches filters against our wallet, and then request blocks //fetches filters, matches filters against our wallet, and then request blocks
//for the wallet to process //for the wallet to process. This sink takes as input filter heights
//to fetch for rescanning.
val rescanSink: Sink[Int, Future[Seq[Vector[BlockMatchingResponse]]]] = { val rescanSink: Sink[Int, Future[Seq[Vector[BlockMatchingResponse]]]] = {
Flow[Int] Flow[Int]
.batch[Vector[Int]](batchSize, seed)(aggregate) .batch[Vector[Int]](filterBatchSize, seed)(aggregate)
.via(fetchFiltersFlow) .via(fetchFiltersFlow)
.mapAsync(1) { case filterResponse => .mapAsync(1) { case filterResponse =>
val f = searchFiltersForMatches(scripts, filterResponse, parallelism)( val f =
ExecutionContext.fromExecutor(walletConfig.rescanThreadPool)) scriptsF.flatMap { scripts =>
searchFiltersForMatches(scripts, filterResponse, parallelism)(
ExecutionContext.fromExecutor(walletConfig.rescanThreadPool))
}
val heightRange = filterResponse.map(_.blockHeight) val heightRange = filterResponse.map(_.blockHeight)
@ -207,38 +222,35 @@ private[wallet] trait RescanHandling extends WalletLogger {
* @return a list of matching block hashes * @return a list of matching block hashes
*/ */
def getMatchingBlocks( def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None, startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None, endOpt: Option[BlockStamp] = None,
batchSize: Int = 100, addressBatchSize: Int = 100,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(implicit parallelismLevel: Int = Runtime.getRuntime.availableProcessors(),
account: HDAccount)(implicit
ec: ExecutionContext): Future[RescanState] = { ec: ExecutionContext): Future[RescanState] = {
require(batchSize > 0, "batch size must be greater than zero") require(addressBatchSize > 0, "batch size must be greater than zero")
require(parallelismLevel > 0, "parallelism level must be greater than zero") require(parallelismLevel > 0, "parallelism level must be greater than zero")
if (scripts.isEmpty) { for {
Future.successful(RescanState.RescanDone) startHeight <- startOpt.fold(Future.successful(0))(
} else { chainQueryApi.getHeightByBlockStamp)
for { _ = if (startHeight < 0)
startHeight <- startOpt.fold(Future.successful(0))( throw InvalidBlockRange(s"Start position cannot negative")
chainQueryApi.getHeightByBlockStamp) endHeight <- endOpt.fold(chainQueryApi.getFilterCount())(
_ = if (startHeight < 0) chainQueryApi.getHeightByBlockStamp)
throw InvalidBlockRange(s"Start position cannot negative") _ = if (startHeight > endHeight)
endHeight <- endOpt.fold(chainQueryApi.getFilterCount())( throw InvalidBlockRange(
chainQueryApi.getHeightByBlockStamp) s"End position cannot precede start: $startHeight:$endHeight")
_ = if (startHeight > endHeight) _ = logger.info(
throw InvalidBlockRange( s"Beginning to search for matches between ${startHeight}:${endHeight}")
s"End position cannot precede start: $startHeight:$endHeight") range = startHeight.to(endHeight)
_ = logger.info(
s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks")
range = startHeight.to(endHeight)
rescanStarted = buildFilterMatchFlow(range, rescanStarted = buildRescanFlow(account = account,
scripts, addressBatchSize = addressBatchSize,
parallelismLevel, range = range,
batchSize) parallelism = parallelismLevel,
} yield { filterBatchSize = addressBatchSize)
rescanStarted } yield {
} rescanStarted
} }
} }
@ -251,11 +263,10 @@ private[wallet] trait RescanHandling extends WalletLogger {
endOpt: Option[BlockStamp], endOpt: Option[BlockStamp],
addressBatchSize: Int): Future[RescanState] = { addressBatchSize: Int): Future[RescanState] = {
for { for {
scriptPubKeys <- generateScriptPubKeys(account, addressBatchSize)
addressCount <- addressDAO.count() addressCount <- addressDAO.count()
inProgress <- matchBlocks(scriptPubKeys = scriptPubKeys, inProgress <- matchBlocks(endOpt = endOpt,
endOpt = endOpt, startOpt = startOpt,
startOpt = startOpt) account = account)
externalGap <- calcAddressGap(HDChainType.External, account) externalGap <- calcAddressGap(HDChainType.External, account)
changeGap <- calcAddressGap(HDChainType.Change, account) changeGap <- calcAddressGap(HDChainType.Change, account)
_ <- { _ <- {
@ -321,14 +332,13 @@ private[wallet] trait RescanHandling extends WalletLogger {
} }
private def matchBlocks( private def matchBlocks(
scriptPubKeys: Vector[ScriptPubKey],
endOpt: Option[BlockStamp], endOpt: Option[BlockStamp],
startOpt: Option[BlockStamp]): Future[RescanState] = { startOpt: Option[BlockStamp],
account: HDAccount): Future[RescanState] = {
val rescanStateF = for { val rescanStateF = for {
rescanState <- getMatchingBlocks(scripts = scriptPubKeys, rescanState <- getMatchingBlocks(startOpt = startOpt,
startOpt = startOpt, endOpt = endOpt,
endOpt = endOpt)( account = account)(
ExecutionContext.fromExecutor(walletConfig.rescanThreadPool)) ExecutionContext.fromExecutor(walletConfig.rescanThreadPool))
} yield { } yield {
rescanState rescanState