wallet: Get processBlockCachedUtxos() using DBIOAction (#5740)

* wallet: Get processBlockCachedUtxos() using DBIOAction

* Remove unused val
This commit is contained in:
Chris Stewart 2024-10-29 06:00:57 -05:00 committed by GitHub
parent 2366b80c70
commit 3ae69b6ab5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -179,15 +179,11 @@ case class TransactionProcessing(
// fetch all received spending info dbs relevant to txs in this block to improve performance // fetch all received spending info dbs relevant to txs in this block to improve performance
val receivedSpendingInfoDbsF = val receivedSpendingInfoDbsF =
spendingInfoDAO spendingInfoDAO
.findTxs(block.transactions.toVector) .findTxs(block.transactions)
val cachedReceivedOptF = receivedSpendingInfoDbsF val cachedReceivedOptF = receivedSpendingInfoDbsF
.map(Some(_)) // reduce allocations by creating Some here .map(Some(_)) // reduce allocations by creating Some here
// fetch all spending infoDbs for this block to improve performance
val spentSpendingInfoDbsF =
spendingInfoDAO.findOutputsBeingSpent(block.transactions.toVector)
val blockHash = block.blockHeader.hashBE val blockHash = block.blockHeader.hashBE
val blockHashWithConfsOptF: Future[Option[BlockHashWithConfs]] = val blockHashWithConfsOptF: Future[Option[BlockHashWithConfs]] =
WalletUtil.getBlockHashWithConfs(chainQueryApi, blockHash) WalletUtil.getBlockHashWithConfs(chainQueryApi, blockHash)
@ -196,71 +192,42 @@ case class TransactionProcessing(
// as an optimization // as an optimization
val relevantReceivedOutputsForBlockF = getRelevantOutputsForBlock(block) val relevantReceivedOutputsForBlockF = getRelevantOutputsForBlock(block)
val resultF: Future[Future[Unit]] = for { val actionsF: Future[Vector[
DBIOAction[ProcessTxResult, NoStream, Effect.Read & Effect.Write]]] =
for {
// map on these first so we don't have to call // map on these first so we don't have to call
// .map everytime we iterate through a tx // .map everytime we iterate through a tx
// which is costly (thread overhead) // which is costly (thread overhead)
receivedSpendingInfoDbsOpt <- cachedReceivedOptF receivedSpendingInfoDbsOpt <- cachedReceivedOptF
spentSpendingInfoDbs <- spentSpendingInfoDbsF
relevantReceivedOutputsForBlock <- relevantReceivedOutputsForBlock <-
relevantReceivedOutputsForBlockF relevantReceivedOutputsForBlockF
blockHashWithConfsOpt <- blockHashWithConfsOptF blockHashWithConfsOpt <- blockHashWithConfsOptF
} yield { actions = {
// we need to keep a cache of spentSpendingInfoDb block.transactions.map { transaction =>
// for the case where we receive & then spend that val relevantReceivedOutputsForTx = relevantReceivedOutputsForBlock
// same utxo in the same block
var cachedSpentOpt: Option[Vector[SpendingInfoDb]] = {
Some(spentSpendingInfoDbs)
}
val blockInputs = block.transactions.flatMap(_.inputs)
val processedF: Future[Unit] = {
block.transactions.foldLeft(Future.unit) { (walletF, transaction) =>
for {
_ <- walletF
relevantReceivedOutputsForTx = relevantReceivedOutputsForBlock
.getOrElse(transaction.txIdBE, Vector.empty) .getOrElse(transaction.txIdBE, Vector.empty)
action = for {
action <-
processTransactionImpl( processTransactionImpl(
transaction = transaction, transaction = transaction,
blockHashWithConfsOpt = blockHashWithConfsOpt, blockHashWithConfsOpt = blockHashWithConfsOpt,
newTags = Vector.empty, newTags = Vector.empty,
receivedSpendingInfoDbsOpt = receivedSpendingInfoDbsOpt, receivedSpendingInfoDbsOpt = receivedSpendingInfoDbsOpt,
spentSpendingInfoDbsOpt = cachedSpentOpt, spentSpendingInfoDbsOpt = None,
relevantReceivedOutputs = relevantReceivedOutputsForTx relevantReceivedOutputs = relevantReceivedOutputsForTx
) )
} yield {
processTxResult <- safeDatabase.run(action) action
_ = {
// need to look if a received utxo is spent in the same block
// if so, we need to update our cachedSpentF
val spentInSameBlock: Vector[SpendingInfoDb] = {
processTxResult.updatedIncoming.filter { spendingInfoDb =>
blockInputs.exists(
_.previousOutput == spendingInfoDb.outPoint
)
} }
} }
// add it to the cache
val newCachedSpentOpt = {
cachedSpentOpt match {
case Some(spentSpendingInfo) =>
Some(spentSpendingInfo ++ spentInSameBlock)
case None =>
Some(spentInSameBlock)
}
}
cachedSpentOpt = newCachedSpentOpt
} }
} yield { } yield {
() actions
}
}
}
processedF
} }
resultF.flatten actionsF
.flatMap(actions => Future.sequence(actions.map(safeDatabase.run)))
.map(_ => ())
} }
override def findTransaction( override def findTransaction(