Replace Future[Wallet] -> Wallet parameter in {BitcoinSWalletTest, BitcoindRpcBackendUtil} (#5796)

This commit is contained in:
Chris Stewart 2024-11-30 09:04:14 -06:00 committed by GitHub
parent 04757f9039
commit f280f35431
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 28 additions and 76 deletions

View file

@ -31,7 +31,7 @@ import org.bitcoins.zmq.ZMQSubscriber
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{ExecutionContext, Future}
/** Useful utilities to use in the wallet project for syncing things against
* bitcoind
@ -210,14 +210,9 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
wallet: Wallet,
chainCallbacksOpt: Option[ChainCallbacks]
)(implicit system: ActorSystem): Wallet = {
// We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
val walletCallbackP = Promise[Wallet]()
val nodeApi = BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoind,
walletCallbackP.future,
wallet,
chainCallbacksOpt
)
val pairedWallet = Wallet(
@ -225,8 +220,6 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
chainQueryApi = bitcoind
)(wallet.walletConfig)
walletCallbackP.success(pairedWallet)
pairedWallet
}
@ -293,13 +286,9 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
wallet: DLCWallet,
chainCallbacksOpt: Option[ChainCallbacks]
)(implicit system: ActorSystem): DLCWallet = {
// We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
val walletCallbackP = Promise[DLCWallet]()
val nodeApi = BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoind,
walletCallbackP.future,
wallet,
chainCallbacksOpt
)
@ -310,8 +299,6 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
val pairedWallet =
DLCWallet(bitcoindCallbackWallet)(dlcConfig, walletConfig)
walletCallbackP.success(pairedWallet)
pairedWallet
}
@ -344,7 +331,7 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
*/
def buildBitcoindNodeApi(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[WalletApi],
wallet: WalletApi,
chainCallbacksOpt: Option[ChainCallbacks]
)(implicit system: ActorSystem): NodeApi = {
import system.dispatcher
@ -361,38 +348,34 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
parallelism = numParallelism
)
val sinkF: Future[Sink[(Block, GetBlockHeaderResult), Future[Done]]] = {
walletF.map { wallet =>
Sink.foreachAsync(1) {
case (block: Block, blockHeaderResult: GetBlockHeaderResult) =>
val blockProcessedF =
wallet.transactionProcessing.processBlock(block)
val executeCallbackF: Future[Unit] = {
for {
wallet <- blockProcessedF
_ <- handleChainCallbacks(
chainCallbacksOpt,
blockHeaderResult
)
} yield wallet
}
val sink: Sink[(Block, GetBlockHeaderResult), Future[Done]] = {
Sink.foreachAsync(1) {
case (block: Block, blockHeaderResult: GetBlockHeaderResult) =>
val blockProcessedF =
wallet.transactionProcessing.processBlock(block)
val executeCallbackF: Future[Unit] = {
for {
wallet <- blockProcessedF
_ <- handleChainCallbacks(
chainCallbacksOpt,
blockHeaderResult
)
} yield wallet
}
executeCallbackF
}
executeCallbackF
}
}
val doneF: Future[Done] = sinkF.flatMap { sink =>
val doneF: Future[Done] =
source
.via(fetchBlocksFlow)
.toMat(sink)(Keep.right)
.run()
}
for {
_ <- doneF
w <- walletF
_ <- w.utxoHandling.updateUtxoPendingStates()
_ <- wallet.utxoHandling.updateUtxoPendingStates()
} yield ()
}

View file

@ -1,6 +1,5 @@
package org.bitcoins.testkit.chain
import org.apache.pekko.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.sync.{FilterSync, FilterWithHeaderHash}
import org.bitcoins.chain.config.ChainAppConfig
@ -15,9 +14,7 @@ import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BlockchainRpc}
import org.bitcoins.server.BitcoindRpcBackendUtil
import org.bitcoins.testkit.chain.fixture.BitcoindBaseVersionChainHandlerViaRpc
import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.sync.WalletSync
import scala.concurrent.{ExecutionContext, Future}
@ -113,17 +110,6 @@ abstract class SyncUtil extends BitcoinSLogger {
}
}
def getNodeApiWalletCallback(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet]
)(implicit system: ActorSystem): NodeApi = {
BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoindRpcClient,
walletF,
None
)
}
def getNodeChainQueryApi(
bitcoind: BitcoindRpcClient
)(implicit ec: ExecutionContext): NodeChainQueryApi = {
@ -132,16 +118,6 @@ abstract class SyncUtil extends BitcoinSLogger {
node.NodeChainQueryApi(nodeApi, chainQuery)
}
def getNodeChainQueryApiWalletCallback(
bitcoind: BitcoindRpcClient,
walletF: Future[Wallet]
)(implicit system: ActorSystem): NodeChainQueryApi = {
val chainQuery = bitcoind
val nodeApi =
SyncUtil.getNodeApiWalletCallback(bitcoind, walletF)
node.NodeChainQueryApi(nodeApi, chainQuery)
}
/** Syncs a wallet against bitcoind by retrieving full blocks and then calling
* [[Wallet.processBlock()]]
*/

View file

@ -11,10 +11,9 @@ import org.bitcoins.core.currency.*
import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet}
import org.bitcoins.node.NodeCallbacks
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.server.{BitcoinSAppConfig, BitcoindRpcBackendUtil}
import org.bitcoins.server.util.CallbackUtil
import org.bitcoins.testkit.EmbeddedPg
import org.bitcoins.testkit.chain.SyncUtil
import org.bitcoins.testkit.fixtures.BitcoinSFixture
import org.bitcoins.testkit.keymanager.KeyManagerTestUtil
import org.bitcoins.testkit.node.MockNodeApi
@ -335,30 +334,24 @@ object BitcoinSWalletTest extends WalletLogger {
system: ActorSystem
): Future[WalletWithBitcoindRpc] = {
import system.dispatcher
// we need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
val walletCallbackP = Promise[Wallet]()
val walletWithBitcoindF = for {
wallet <- BitcoinSWalletTest.createWallet2Accounts(bitcoind, bitcoind)
// create the wallet with the appropriate callbacks now that
// we have them
walletWithCallback = Wallet(
nodeApi =
SyncUtil.getNodeApiWalletCallback(bitcoind, walletCallbackP.future),
nodeApi = BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoind,
wallet,
None
),
chainQueryApi = bitcoind
)(wallet.walletConfig)
// complete the walletCallbackP so we can handle the callbacks when they are
// called without hanging forever.
_ = walletCallbackP.success(walletWithCallback)
} yield WalletWithBitcoindRpc(
walletWithCallback,
bitcoind,
wallet.walletConfig
walletWithCallback.walletConfig
)
walletWithBitcoindF.failed.foreach(err => walletCallbackP.failure(err))
walletWithBitcoindF
}