diff --git a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala index 1f556dcb3e..48c261e80a 100644 --- a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala +++ b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala @@ -78,7 +78,7 @@ object FutureUtil { } /** Batches the elements by batchSize, executes f, and then aggregates all of the results - * into a vector and returns it. This is is the synchrononous version of [[batchAndParallelExecute()]] + * into a vector and returns it. This is is the synchronous version of [[batchAndParallelExecute()]] */ def batchAndSyncExecute[T, U]( elements: Vector[T], diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala index 719af1d322..23268683c9 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/RescanHandlingTest.scala @@ -2,6 +2,7 @@ package org.bitcoins.wallet import org.bitcoins.core.currency.{Bitcoins, CurrencyUnits, Satoshis} import org.bitcoins.core.protocol.BlockStamp +import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.util.FutureUtil import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig @@ -12,6 +13,8 @@ import org.bitcoins.testkit.wallet.{ } import org.scalatest.FutureOutcome +import scala.concurrent.Future + class RescanHandlingTest extends BitcoinSWalletTest { /** Wallet config with data directory set to user temp directory */ @@ -131,6 +134,9 @@ class RescanHandlingTest extends BitcoinSWalletTest { initBlockHeight <- initBlockHeightF txInBlockHeight = initBlockHeight + numBlocks txInBlockHeightOpt = Some(BlockStamp.BlockHeight(txInBlockHeight)) + _ <- newTxWallet.clearAllUtxosAndAddresses() + zeroBalance <- newTxWallet.getBalance() + _ = assert(zeroBalance == Satoshis.zero) _ <- newTxWallet.rescanNeutrinoWallet(startOpt = txInBlockHeightOpt, endOpt = None, addressBatchSize = @@ -142,6 +148,66 @@ class RescanHandlingTest extends BitcoinSWalletTest { } } + it must "be able to discover funds using multiple batches" in { + fixture: WalletWithBitcoind => + val WalletWithBitcoindV19(wallet, bitcoind) = fixture + + val amt = Bitcoins.one + val numBlocks = 1 + + //send funds to a fresh wallet address + val addrF = wallet.getNewAddress() + val bitcoindAddrF = bitcoind.getNewAddress + val newTxWalletF = for { + addr <- addrF + txid <- bitcoind.sendToAddress(addr, amt) + tx <- bitcoind.getRawTransactionRaw(txid) + bitcoindAddr <- bitcoindAddrF + blockHashes <- + bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr) + newTxWallet <- wallet.processTransaction(transaction = tx, + blockHashOpt = + blockHashes.headOption) + balance <- newTxWallet.getBalance() + unconfirmedBalance <- newTxWallet.getUnconfirmedBalance() + } yield { + //balance doesn't have to exactly equal, as there was money in the + //wallet before hand. + assert(balance >= amt) + assert(balance == unconfirmedBalance) + newTxWallet + } + + for { + newTxWallet <- newTxWalletF + + account <- newTxWallet.getDefaultAccount() + blocks <- + newTxWallet.spendingInfoDAO + .findAllForAccount(account.hdAccount) + .map(_.flatMap(_.blockHash).distinct) + + _ <- newTxWallet.clearAllUtxosAndAddresses() + scriptPubKeys <- + 1.to(10).foldLeft(Future.successful(Vector.empty[ScriptPubKey])) { + (prevFuture, _) => + for { + prev <- prevFuture + address <- newTxWallet.getNewAddress(account) + changeAddress <- newTxWallet.getNewChangeAddress(account) + } yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey + } + matches <- newTxWallet.getMatchingBlocks(scriptPubKeys, + None, + None, + batchSize = 1) + } yield { + assert(matches.size == blocks.size) + assert( + matches.forall(blockMatch => blocks.contains(blockMatch.blockHash))) + } + } + it must "be able to discover funds that occurred from the wallet creation time" in { fixture: WalletWithBitcoind => val WalletWithBitcoindV19(wallet, bitcoind) = fixture diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index b2337ac4d2..109ae6d199 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -107,10 +107,9 @@ private[wallet] trait RescanHandling extends WalletLogger { _ = logger.info( s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks") range = startHeight.to(endHeight) - matched <- FutureUtil.batchExecute( + matched <- FutureUtil.batchAndSyncExecute( elements = range.toVector, f = fetchFiltersInRange(scripts, parallelismLevel), - init = Vector.empty, batchSize = batchSize) } yield { logger.info(s"Matched ${matched.length} blocks on rescan") @@ -264,20 +263,28 @@ private[wallet] trait RescanHandling extends WalletLogger { startHeight = startHeight, endHeight = endHeight) filtered <- findMatches(filtersResponse, scripts, parallelismLevel) - } yield filtered.toVector + } yield { + logger.info( + s"Found ${filtered.length} matches from start=$startHeight to end=$endHeight") + filtered + } } private def findMatches( filters: Vector[FilterResponse], scripts: Vector[ScriptPubKey], - parallelismLevel: Int): Future[Iterator[BlockMatchingResponse]] = { - if (filters.isEmpty) - Future.successful(Iterator.empty) - else { + parallelismLevel: Int): Future[Vector[BlockMatchingResponse]] = { + if (filters.isEmpty) { + logger.info("No Filters to check against") + Future.successful(Vector.empty) + } else if (scripts.isEmpty) { + logger.info("No scripts to check against") + Future.successful(Vector.empty) + } else { val bytes = scripts.map(_.asmBytes) /* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */ val groupSize = calcGroupSize(filters.size, parallelismLevel) - val filterGroups = filters.grouped(groupSize) + val filterGroups = filters.grouped(groupSize).toVector // Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]]. Future .sequence(filterGroups.map { filterGroup => @@ -290,6 +297,7 @@ private[wallet] trait RescanHandling extends WalletLogger { (blocks, filter) => val matcher = SimpleFilterMatcher(filter.compactFilter) if (matcher.matchesAny(bytes)) { + logger.info(s"Found a match in block ${filter.blockHeight}") blocks :+ BlockMatchingResponse(filter.blockHash, filter.blockHeight) } else {