Use BitcoindRpcBackendUtil.buildBitcoindNodeApi() in wallet test fixtures, re-implement getConfirmationsForBlocks() to use akka streams to avoid max-open-requests limit inside of akka (#5259)

This commit is contained in:
Chris Stewart 2023-10-12 18:02:06 -05:00 committed by GitHub
parent d65bd11bd6
commit 531c909597
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 76 deletions

View File

@ -4,7 +4,6 @@ import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient
@ -15,8 +14,7 @@ object BitcoindStreamUtil {
/** Creates a flow that you can feed block hashes too and the block header and block will get emitted downstream */
def fetchBlocksBitcoind(
bitcoindRpcClient: BitcoindRpcClient,
parallelism: Int = FutureUtil.getParallelism)(implicit
ec: ExecutionContext): Flow[
parallelism: Int)(implicit ec: ExecutionContext): Flow[
DoubleSha256Digest,
(Block, GetBlockHeaderResult),
NotUsed] = {

View File

@ -1,5 +1,6 @@
package org.bitcoins.testkit.chain
import akka.actor.ActorSystem
import grizzled.slf4j.Logging
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.sync.{
@ -19,6 +20,7 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.v19.V19BlockFilterRpc
import org.bitcoins.server.BitcoindRpcBackendUtil
import org.bitcoins.testkit.chain.fixture.{
BitcoindBaseVersionChainHandlerViaRpc,
BitcoindBlockFilterRpcChainHandler,
@ -107,62 +109,10 @@ abstract class SyncUtil extends Logging {
def getNodeApiWalletCallback(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet])(implicit ec: ExecutionContext): NodeApi = {
new NodeApi {
/** Request the underlying node to download the given blocks from its peers and feed the blocks to [[org.bitcoins.node.NodeCallbacks]].
*/
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind")
val f: Vector[DoubleSha256Digest] => Future[Wallet] = { hashes =>
val blocksF =
FutureUtil.sequentially(hashes)(bitcoindRpcClient.getBlockRaw)
val updatedWalletF = for {
blocks <- blocksF
wallet <- walletF
processedWallet <- {
FutureUtil.foldLeftAsync(wallet, blocks) { case (wallet, block) =>
wallet.processBlock(block)
}
}
} yield processedWallet
updatedWalletF
}
val batchSize = 25
val batchedExecutedF = {
for {
wallet <- walletF
updatedWallet <-
FutureUtil.batchExecute[DoubleSha256Digest, Wallet](
elements = blockHashes,
f = f,
init = wallet,
batchSize = batchSize)
} yield updatedWallet
}
batchedExecutedF.map { _ =>
logger.info(
s"Done fetching ${blockHashes.length} hashes from bitcoind")
()
}
}
/** Broadcasts the given transaction over the P2P network
*/
override def broadcastTransactions(
transactions: Vector[Transaction]): Future[Unit] = {
bitcoindRpcClient.broadcastTransactions(transactions)
}
override def getConnectionCount: Future[Int] =
bitcoindRpcClient.getConnectionCount
}
walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoindRpcClient,
walletF,
None)
}
def getNodeChainQueryApi(bitcoind: BitcoindRpcClient)(implicit
@ -175,7 +125,7 @@ abstract class SyncUtil extends Logging {
def getNodeChainQueryApiWalletCallback(
bitcoind: BitcoindRpcClient,
walletF: Future[Wallet])(implicit
ec: ExecutionContext): NodeChainQueryApi = {
system: ActorSystem): NodeChainQueryApi = {
val chainQuery = bitcoind
val nodeApi =
SyncUtil.getNodeApiWalletCallback(bitcoind, walletF)

View File

@ -1,11 +1,12 @@
package org.bitcoins.wallet.internal
import akka.stream.scaladsl.{Sink, Source}
import org.bitcoins.core.api.wallet.db._
import org.bitcoins.core.consensus.Consensus
import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.protocol.script.{P2WPKHWitnessSPKV0, P2WPKHWitnessV0}
import org.bitcoins.core.protocol.transaction._
import org.bitcoins.core.util.BlockHashWithConfs
import org.bitcoins.core.util.{BlockHashWithConfs, FutureUtil}
import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo._
import org.bitcoins.crypto.DoubleSha256DigestBE
@ -191,22 +192,22 @@ private[wallet] trait UtxoHandling extends WalletLogger {
Vector[SpendingInfoDb]]): Future[
Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]] = {
val blockHashesWithConfsVec = relevantBlocks.map {
case (blockHashOpt, spendingInfoDbs) =>
blockHashOpt match {
case Some(blockHash) =>
chainQueryApi
.getNumberOfConfirmations(blockHash)
.map(confs => Some(BlockHashWithConfs(blockHash, confs)))
.map(blockWithConfsOpt => (blockWithConfsOpt, spendingInfoDbs))
case None =>
Future.successful((None, spendingInfoDbs))
}
}
val resultF = Source(relevantBlocks)
.mapAsync(FutureUtil.getParallelism) {
case (blockHashOpt, spendingInfoDbs) =>
blockHashOpt match {
case Some(blockHash) =>
chainQueryApi
.getNumberOfConfirmations(blockHash)
.map(confs => Some(BlockHashWithConfs(blockHash, confs)))
.map(blockWithConfsOpt => (blockWithConfsOpt, spendingInfoDbs))
case None =>
Future.successful((None, spendingInfoDbs))
}
}
.runWith(Sink.seq)
Future
.sequence(blockHashesWithConfsVec)
.map(_.toMap)
resultF.map(_.toMap)
}
/** Constructs a DB level representation of the given UTXO, and persist it to disk */