mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-01-18 21:34:39 +01:00
Fix rescans that are larger than the batch size (#1916)
* Fix rescans that are larger than the batch size * Add test
This commit is contained in:
parent
ef4329d283
commit
8d47b68764
@ -78,7 +78,7 @@ object FutureUtil {
|
||||
}
|
||||
|
||||
/** Batches the elements by batchSize, executes f, and then aggregates all of the results
|
||||
* into a vector and returns it. This is is the synchrononous version of [[batchAndParallelExecute()]]
|
||||
* into a vector and returns it. This is is the synchronous version of [[batchAndParallelExecute()]]
|
||||
*/
|
||||
def batchAndSyncExecute[T, U](
|
||||
elements: Vector[T],
|
||||
|
@ -2,6 +2,7 @@ package org.bitcoins.wallet
|
||||
|
||||
import org.bitcoins.core.currency.{Bitcoins, CurrencyUnits, Satoshis}
|
||||
import org.bitcoins.core.protocol.BlockStamp
|
||||
import org.bitcoins.core.protocol.script.ScriptPubKey
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.server.BitcoinSAppConfig
|
||||
import org.bitcoins.testkit.BitcoinSTestAppConfig
|
||||
@ -12,6 +13,8 @@ import org.bitcoins.testkit.wallet.{
|
||||
}
|
||||
import org.scalatest.FutureOutcome
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class RescanHandlingTest extends BitcoinSWalletTest {
|
||||
|
||||
/** Wallet config with data directory set to user temp directory */
|
||||
@ -131,6 +134,9 @@ class RescanHandlingTest extends BitcoinSWalletTest {
|
||||
initBlockHeight <- initBlockHeightF
|
||||
txInBlockHeight = initBlockHeight + numBlocks
|
||||
txInBlockHeightOpt = Some(BlockStamp.BlockHeight(txInBlockHeight))
|
||||
_ <- newTxWallet.clearAllUtxosAndAddresses()
|
||||
zeroBalance <- newTxWallet.getBalance()
|
||||
_ = assert(zeroBalance == Satoshis.zero)
|
||||
_ <- newTxWallet.rescanNeutrinoWallet(startOpt = txInBlockHeightOpt,
|
||||
endOpt = None,
|
||||
addressBatchSize =
|
||||
@ -142,6 +148,66 @@ class RescanHandlingTest extends BitcoinSWalletTest {
|
||||
}
|
||||
}
|
||||
|
||||
it must "be able to discover funds using multiple batches" in {
|
||||
fixture: WalletWithBitcoind =>
|
||||
val WalletWithBitcoindV19(wallet, bitcoind) = fixture
|
||||
|
||||
val amt = Bitcoins.one
|
||||
val numBlocks = 1
|
||||
|
||||
//send funds to a fresh wallet address
|
||||
val addrF = wallet.getNewAddress()
|
||||
val bitcoindAddrF = bitcoind.getNewAddress
|
||||
val newTxWalletF = for {
|
||||
addr <- addrF
|
||||
txid <- bitcoind.sendToAddress(addr, amt)
|
||||
tx <- bitcoind.getRawTransactionRaw(txid)
|
||||
bitcoindAddr <- bitcoindAddrF
|
||||
blockHashes <-
|
||||
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
|
||||
newTxWallet <- wallet.processTransaction(transaction = tx,
|
||||
blockHashOpt =
|
||||
blockHashes.headOption)
|
||||
balance <- newTxWallet.getBalance()
|
||||
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
|
||||
} yield {
|
||||
//balance doesn't have to exactly equal, as there was money in the
|
||||
//wallet before hand.
|
||||
assert(balance >= amt)
|
||||
assert(balance == unconfirmedBalance)
|
||||
newTxWallet
|
||||
}
|
||||
|
||||
for {
|
||||
newTxWallet <- newTxWalletF
|
||||
|
||||
account <- newTxWallet.getDefaultAccount()
|
||||
blocks <-
|
||||
newTxWallet.spendingInfoDAO
|
||||
.findAllForAccount(account.hdAccount)
|
||||
.map(_.flatMap(_.blockHash).distinct)
|
||||
|
||||
_ <- newTxWallet.clearAllUtxosAndAddresses()
|
||||
scriptPubKeys <-
|
||||
1.to(10).foldLeft(Future.successful(Vector.empty[ScriptPubKey])) {
|
||||
(prevFuture, _) =>
|
||||
for {
|
||||
prev <- prevFuture
|
||||
address <- newTxWallet.getNewAddress(account)
|
||||
changeAddress <- newTxWallet.getNewChangeAddress(account)
|
||||
} yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey
|
||||
}
|
||||
matches <- newTxWallet.getMatchingBlocks(scriptPubKeys,
|
||||
None,
|
||||
None,
|
||||
batchSize = 1)
|
||||
} yield {
|
||||
assert(matches.size == blocks.size)
|
||||
assert(
|
||||
matches.forall(blockMatch => blocks.contains(blockMatch.blockHash)))
|
||||
}
|
||||
}
|
||||
|
||||
it must "be able to discover funds that occurred from the wallet creation time" in {
|
||||
fixture: WalletWithBitcoind =>
|
||||
val WalletWithBitcoindV19(wallet, bitcoind) = fixture
|
||||
|
@ -107,10 +107,9 @@ private[wallet] trait RescanHandling extends WalletLogger {
|
||||
_ = logger.info(
|
||||
s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks")
|
||||
range = startHeight.to(endHeight)
|
||||
matched <- FutureUtil.batchExecute(
|
||||
matched <- FutureUtil.batchAndSyncExecute(
|
||||
elements = range.toVector,
|
||||
f = fetchFiltersInRange(scripts, parallelismLevel),
|
||||
init = Vector.empty,
|
||||
batchSize = batchSize)
|
||||
} yield {
|
||||
logger.info(s"Matched ${matched.length} blocks on rescan")
|
||||
@ -264,20 +263,28 @@ private[wallet] trait RescanHandling extends WalletLogger {
|
||||
startHeight = startHeight,
|
||||
endHeight = endHeight)
|
||||
filtered <- findMatches(filtersResponse, scripts, parallelismLevel)
|
||||
} yield filtered.toVector
|
||||
} yield {
|
||||
logger.info(
|
||||
s"Found ${filtered.length} matches from start=$startHeight to end=$endHeight")
|
||||
filtered
|
||||
}
|
||||
}
|
||||
|
||||
private def findMatches(
|
||||
filters: Vector[FilterResponse],
|
||||
scripts: Vector[ScriptPubKey],
|
||||
parallelismLevel: Int): Future[Iterator[BlockMatchingResponse]] = {
|
||||
if (filters.isEmpty)
|
||||
Future.successful(Iterator.empty)
|
||||
else {
|
||||
parallelismLevel: Int): Future[Vector[BlockMatchingResponse]] = {
|
||||
if (filters.isEmpty) {
|
||||
logger.info("No Filters to check against")
|
||||
Future.successful(Vector.empty)
|
||||
} else if (scripts.isEmpty) {
|
||||
logger.info("No scripts to check against")
|
||||
Future.successful(Vector.empty)
|
||||
} else {
|
||||
val bytes = scripts.map(_.asmBytes)
|
||||
/* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */
|
||||
val groupSize = calcGroupSize(filters.size, parallelismLevel)
|
||||
val filterGroups = filters.grouped(groupSize)
|
||||
val filterGroups = filters.grouped(groupSize).toVector
|
||||
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
|
||||
Future
|
||||
.sequence(filterGroups.map { filterGroup =>
|
||||
@ -290,6 +297,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
|
||||
(blocks, filter) =>
|
||||
val matcher = SimpleFilterMatcher(filter.compactFilter)
|
||||
if (matcher.matchesAny(bytes)) {
|
||||
logger.info(s"Found a match in block ${filter.blockHeight}")
|
||||
blocks :+ BlockMatchingResponse(filter.blockHash,
|
||||
filter.blockHeight)
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user