wallet: Rework where we fetch the number of block confirmations for a tx in the wallet (#5738)

* wallet: Rework where we fetch the number of block confirmations for a tx in the wallet

* fix reserve handling
This commit is contained in:
Chris Stewart 2024-10-26 06:57:44 -05:00 committed by GitHub
parent 30663fe622
commit 8917188220
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 71 additions and 93 deletions

View file

@ -442,7 +442,8 @@ case class TransactionProcessing(
toBeUpdated = outputsBeingSpent.flatMap( toBeUpdated = outputsBeingSpent.flatMap(
markAsSpent(_, transaction.txIdBE) markAsSpent(_, transaction.txIdBE)
) )
processed <- utxoHandling.updateUtxoSpentConfirmedStates(toBeUpdated) withBlocks <- utxoHandling.getDbsByRelevantBlock(toBeUpdated)
processed <- utxoHandling.updateUtxoSpentConfirmedStates(withBlocks)
} yield { } yield {
processed processed
} }
@ -619,19 +620,25 @@ case class TransactionProcessing(
) )
val updateTxDbF = insertTransaction(transaction, blockHashOpt) val updateTxDbF = insertTransaction(transaction, blockHashOpt)
val withBlocksF = updateTxDbF.flatMap(_ =>
utxoHandling.getDbsByRelevantBlock(Vector(foundTxo)))
// Update Txo State // Update Txo State
updateTxDbF.flatMap(_ => withBlocksF.flatMap { withBlocks =>
utxoHandling.updateUtxoReceiveConfirmedStates(foundTxo).flatMap { utxoHandling
case Some(txo) => .updateUtxoReceiveConfirmedStates(withBlocks)
logger.debug( .flatMap { txos =>
s"Updated block_hash of txo=${txo.txid.hex} new block hash=${blockHash.hex}" if (txos.length == 1) {
) val txo = txos.head
Future.successful(txo) logger.debug(
case None => s"Updated block_hash of txo=${txo.txid.hex} new block hash=${blockHash.hex}"
// State was not updated so we need to update it so it's block hash is in the database )
spendingInfoDAO.update(foundTxo) Future.successful(txo)
}) } else {
// State was not updated so we need to update it so it's block hash is in the database
spendingInfoDAO.update(foundTxo)
}
}
}
case None => case None =>
logger.debug( logger.debug(
s"Skipping further processing of transaction=${transaction.txIdBE.hex}, already processed." s"Skipping further processing of transaction=${transaction.txIdBE.hex}, already processed."

View file

@ -157,28 +157,15 @@ case class UtxoHandling(
} }
private[wallet] def updateUtxoSpentConfirmedStates( private[wallet] def updateUtxoSpentConfirmedStates(
txo: SpendingInfoDb relevantBlocks: Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]
): Future[Option[SpendingInfoDb]] = {
updateUtxoSpentConfirmedStates(Vector(txo)).map(_.headOption)
}
private[wallet] def updateUtxoSpentConfirmedStates(
txos: Vector[SpendingInfoDb]
): Future[Vector[SpendingInfoDb]] = { ): Future[Vector[SpendingInfoDb]] = {
updateUtxoStates(txos, UtxoHandling.updateSpentTxoWithConfs) updateUtxoStates(relevantBlocks, UtxoHandling.updateSpentTxoWithConfs)
} }
private[wallet] def updateUtxoReceiveConfirmedStates( private[wallet] def updateUtxoReceiveConfirmedStates(
txo: SpendingInfoDb relevantBlocks: Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]
): Future[Option[SpendingInfoDb]] = {
updateUtxoReceiveConfirmedStates(Vector(txo))
.map(_.headOption)
}
private[wallet] def updateUtxoReceiveConfirmedStates(
txos: Vector[SpendingInfoDb]
): Future[Vector[SpendingInfoDb]] = { ): Future[Vector[SpendingInfoDb]] = {
updateUtxoStates(txos, UtxoHandling.updateReceivedTxoWithConfs) updateUtxoStates(relevantBlocks, UtxoHandling.updateReceivedTxoWithConfs)
} }
/** Returns a map of the SpendingInfoDbs with their relevant block. If the /** Returns a map of the SpendingInfoDbs with their relevant block. If the
@ -189,7 +176,7 @@ case class UtxoHandling(
*/ */
private[wallet] def getDbsByRelevantBlock( private[wallet] def getDbsByRelevantBlock(
spendingInfoDbs: Vector[SpendingInfoDb] spendingInfoDbs: Vector[SpendingInfoDb]
): Future[Map[Option[DoubleSha256DigestBE], Vector[SpendingInfoDb]]] = { ): Future[Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]] = {
val txIds = val txIds =
spendingInfoDbs.map { db => spendingInfoDbs.map { db =>
db.spendingTxIdOpt match { db.spendingTxIdOpt match {
@ -200,7 +187,7 @@ case class UtxoHandling(
} }
} }
transactionDAO.findByTxIdBEs(txIds).map { txDbs => val blockHashMapF = transactionDAO.findByTxIdBEs(txIds).map { txDbs =>
val blockHashMap = txDbs.map(db => db.txIdBE -> db.blockHashOpt).toMap val blockHashMap = txDbs.map(db => db.txIdBE -> db.blockHashOpt).toMap
val blockHashAndDb = spendingInfoDbs.map { txo => val blockHashAndDb = spendingInfoDbs.map { txo =>
val txToUse = txo.state match { val txToUse = txo.state match {
@ -216,6 +203,8 @@ case class UtxoHandling(
blockHashOpt -> vec.map(_._2) blockHashOpt -> vec.map(_._2)
} }
} }
blockHashMapF.flatMap(getConfirmationsForBlocks)
} }
/** Updates all the given SpendingInfoDbs to the correct state based on how /** Updates all the given SpendingInfoDbs to the correct state based on how
@ -227,54 +216,30 @@ case class UtxoHandling(
* of confirmations * of confirmations
*/ */
private def updateUtxoStates( private def updateUtxoStates(
spendingInfoDbs: Vector[SpendingInfoDb], txsByBlock: Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]],
fn: (SpendingInfoDb, Int, Int) => SpendingInfoDb fn: (SpendingInfoDb, Int, Int) => SpendingInfoDb
): Future[Vector[SpendingInfoDb]] = { ): Future[Vector[SpendingInfoDb]] = {
val relevantBlocksF val toUpdates: Vector[SpendingInfoDb] = txsByBlock.flatMap {
: Future[Map[Option[DoubleSha256DigestBE], Vector[SpendingInfoDb]]] = { case (Some(blockHashWithConfs), txos) =>
getDbsByRelevantBlock(spendingInfoDbs) blockHashWithConfs.confirmationsOpt match {
} case None =>
logger.warn(
// fetch all confirmations for those blocks, do it in parallel s"Given txos exist in block (${blockHashWithConfs.blockHash.hex}) that we do not have or that has been reorged! $txos"
// as an optimzation, previously we would fetch sequentially )
val blocksWithConfsF Vector.empty
: Future[Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]] = { case Some(confs) =>
for { txos.map(fn(_, confs, walletConfig.requiredConfirmations))
relevantBlocks <- relevantBlocksF }
blocksWithConfirmations <- getConfirmationsForBlocks(relevantBlocks) case (None, txos) =>
} yield blocksWithConfirmations logger.debug(
} s"Currently have ${txos.size} transactions in the mempool"
)
val toUpdateF = blocksWithConfsF.map { txsByBlock => txos
val toUpdateFs: Vector[SpendingInfoDb] = txsByBlock.flatMap { }.toVector
case (Some(blockHashWithConfs), txos) => if (toUpdates.nonEmpty)
blockHashWithConfs.confirmationsOpt match { logger.info(s"${toUpdates.size} txos are now confirmed!")
case None => else logger.trace("No txos to be confirmed")
logger.warn( spendingInfoDAO.upsertAllSpendingInfoDb(toUpdates)
s"Given txos exist in block (${blockHashWithConfs.blockHash.hex}) that we do not have or that has been reorged! $txos"
)
Vector.empty
case Some(confs) =>
txos.map(fn(_, confs, walletConfig.requiredConfirmations))
}
case (None, txos) =>
logger.debug(
s"Currently have ${txos.size} transactions in the mempool"
)
txos
}.toVector
toUpdateFs
}
for {
toUpdate <- toUpdateF
_ =
if (toUpdate.nonEmpty)
logger.info(s"${toUpdate.size} txos are now confirmed!")
else logger.trace("No txos to be confirmed")
updated <- spendingInfoDAO.upsertAllSpendingInfoDb(toUpdate)
} yield updated
} }
/** Fetches confirmations for the given blocks in parallel */ /** Fetches confirmations for the given blocks in parallel */
@ -429,19 +394,21 @@ case class UtxoHandling(
utxos: Vector[SpendingInfoDb] utxos: Vector[SpendingInfoDb]
): Future[Vector[SpendingInfoDb]] = { ): Future[Vector[SpendingInfoDb]] = {
logger.info(s"Unreserving utxos ${utxos.map(_.outPoint)}") logger.info(s"Unreserving utxos ${utxos.map(_.outPoint)}")
val updatedUtxosF = Future { val updatedUtxosF
: Future[Map[Option[BlockHashWithConfs], Vector[SpendingInfoDb]]] = {
// make sure exception isn't thrown outside of a future to fix // make sure exception isn't thrown outside of a future to fix
// see: https://github.com/bitcoin-s/bitcoin-s/issues/3813 // see: https://github.com/bitcoin-s/bitcoin-s/issues/3813
val unreserved = utxos.filterNot(_.state == TxoState.Reserved) val unreserved = utxos.filterNot(_.state == TxoState.Reserved)
require( if (unreserved.nonEmpty) {
unreserved.isEmpty, val exn = new IllegalArgumentException(
s"Some utxos are not reserved, got $unreserved" s"Some utxos are not reserved, got unreserved=$unreserved utxos=$utxos")
) Future.failed(exn)
} else {
// unmark all utxos are reserved // unmark all utxos are reserved
val updatedUtxos = utxos val updatedUtxos = utxos
.map(_.copyWithState(TxoState.PendingConfirmationsReceived)) .map(_.copyWithState(TxoState.PendingConfirmationsReceived))
updatedUtxos getDbsByRelevantBlock(updatedUtxos)
}
} }
for { for {
@ -450,8 +417,9 @@ case class UtxoHandling(
updatedConfirmed <- updateUtxoReceiveConfirmedStates(updatedUtxos) updatedConfirmed <- updateUtxoReceiveConfirmedStates(updatedUtxos)
// update the utxos that are in blocks but not considered confirmed yet // update the utxos that are in blocks but not considered confirmed yet
pendingConf = updatedUtxos.filterNot(utxo => pendingConf = updatedUtxos.values.flatten
updatedConfirmed.exists(_.outPoint == utxo.outPoint)) .filterNot(utxo => updatedConfirmed.exists(_.outPoint == utxo.outPoint))
.toVector
updated <- spendingInfoDAO.updateAllSpendingInfoDb( updated <- spendingInfoDAO.updateAllSpendingInfoDb(
pendingConf ++ updatedConfirmed pendingConf ++ updatedConfirmed
) )
@ -478,9 +446,12 @@ case class UtxoHandling(
_ = logger.debug(s"Updating states of ${infos.size} pending utxos...") _ = logger.debug(s"Updating states of ${infos.size} pending utxos...")
receivedUtxos = infos.filter(_.state.isInstanceOf[ReceivedState]) receivedUtxos = infos.filter(_.state.isInstanceOf[ReceivedState])
spentUtxos = infos.filter(_.state.isInstanceOf[SpentState]) spentUtxos = infos.filter(_.state.isInstanceOf[SpentState])
updatedReceivedInfos <- updateUtxoReceiveConfirmedStates(receivedUtxos) receivedWithBlocks <- getDbsByRelevantBlock(receivedUtxos)
updatedSpentInfos <- updateUtxoSpentConfirmedStates(spentUtxos) spentWithBlocks <- getDbsByRelevantBlock(spentUtxos)
} yield (updatedReceivedInfos ++ updatedSpentInfos).toVector updatedReceivedInfos <- updateUtxoReceiveConfirmedStates(
receivedWithBlocks)
updatedSpentInfos <- updateUtxoSpentConfirmedStates(spentWithBlocks)
} yield (updatedReceivedInfos ++ updatedSpentInfos)
} }
/** Inserts the UTXO at the given index into our DB, swallowing the error if /** Inserts the UTXO at the given index into our DB, swallowing the error if