Make fundRawTransactionInternal use DBIOActions (#4575)

* Make fundRawTransactionInternal use DBIOActions

* Fix to correctly use callback

* Move reservedUTXOsCallbackF to FundRawTxHelper
This commit is contained in:
benthecarman 2022-08-06 17:14:02 -05:00 committed by GitHub
parent cf22816003
commit 6119a334fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 138 additions and 86 deletions

View file

@ -4,10 +4,13 @@ import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.wallet.fee.FeeUnit import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.{InputInfo, ScriptSignatureParams} import org.bitcoins.core.wallet.utxo.{InputInfo, ScriptSignatureParams}
import scala.concurrent.Future
case class FundRawTxHelper[T <: RawTxFinalizer]( case class FundRawTxHelper[T <: RawTxFinalizer](
txBuilderWithFinalizer: RawTxBuilderWithFinalizer[T], txBuilderWithFinalizer: RawTxBuilderWithFinalizer[T],
scriptSigParams: Vector[ScriptSignatureParams[InputInfo]], scriptSigParams: Vector[ScriptSignatureParams[InputInfo]],
feeRate: FeeUnit) { feeRate: FeeUnit,
reservedUTXOsCallbackF: Future[Unit]) {
/** Produces the unsigned transaction built by fundrawtransaction */ /** Produces the unsigned transaction built by fundrawtransaction */
def unsignedTx: Transaction = txBuilderWithFinalizer.buildTx() def unsignedTx: Transaction = txBuilderWithFinalizer.buildTx()

View file

@ -103,7 +103,7 @@ abstract class DLCWallet
DLCActionBuilder(dlcWalletDAOs) DLCActionBuilder(dlcWalletDAOs)
} }
private lazy val safeDatabase: SafeDatabase = dlcDAO.safeDatabase private lazy val safeDLCDatabase: SafeDatabase = dlcDAO.safeDatabase
private lazy val walletDatabase: SafeDatabase = addressDAO.safeDatabase private lazy val walletDatabase: SafeDatabase = addressDAO.safeDatabase
/** Updates the contract Id in the wallet database for the given offer and accept */ /** Updates the contract Id in the wallet database for the given offer and accept */
@ -224,7 +224,7 @@ abstract class DLCWallet
val updateOracleSigsA = val updateOracleSigsA =
actionBuilder.updateDLCOracleSigsAction(outcomeAndSigByNonce) actionBuilder.updateDLCOracleSigsAction(outcomeAndSigByNonce)
for { for {
updates <- safeDatabase.runVec(updateOracleSigsA) updates <- safeDLCDatabase.runVec(updateOracleSigsA)
} yield updates } yield updates
} }
@ -269,7 +269,7 @@ abstract class DLCWallet
// allow this to fail in the case they have already been unreserved // allow this to fail in the case they have already been unreserved
_ <- unmarkUTXOsAsReserved(dbs).recover { case _: Throwable => () } _ <- unmarkUTXOsAsReserved(dbs).recover { case _: Throwable => () }
action = actionBuilder.deleteDLCAction(dlcId) action = actionBuilder.deleteDLCAction(dlcId)
_ <- safeDatabase.run(action) _ <- safeDLCDatabase.run(action)
} yield () } yield ()
} }
@ -484,7 +484,7 @@ abstract class DLCWallet
dlcInputs = dlcInputs, dlcInputs = dlcInputs,
dlcOfferDb = dlcOfferDb) dlcOfferDb = dlcOfferDb)
_ <- safeDatabase.run(offerActions) _ <- safeDLCDatabase.run(offerActions)
status <- findDLC(dlcId) status <- findDLC(dlcId)
_ <- dlcConfig.walletCallbacks.executeOnDLCStateChange(logger, status.get) _ <- dlcConfig.walletCallbacks.executeOnDLCStateChange(logger, status.get)
} yield offer } yield offer
@ -624,7 +624,7 @@ abstract class DLCWallet
acceptDb, acceptDb,
inputsDb, inputsDb,
contractDataDb) <- contractDataDb) <-
safeDatabase.run(zipped) safeDLCDatabase.run(zipped)
announcementDataDbs = announcementDataDbs =
createdDbs ++ groupedAnnouncements.existingAnnouncements createdDbs ++ groupedAnnouncements.existingAnnouncements
@ -652,7 +652,7 @@ abstract class DLCWallet
createAnnouncementAction = dlcAnnouncementDAO.createAllAction( createAnnouncementAction = dlcAnnouncementDAO.createAllAction(
dlcAnnouncementDbs) dlcAnnouncementDbs)
_ <- safeDatabase.run( _ <- safeDLCDatabase.run(
DBIOAction.seq(createNonceAction, createAnnouncementAction)) DBIOAction.seq(createNonceAction, createAnnouncementAction))
} yield { } yield {
InitializedAccept( InitializedAccept(
@ -867,7 +867,7 @@ abstract class DLCWallet
cetSigsDb = sigsDbs, cetSigsDb = sigsDbs,
refundSigsDb = refundSigsDb refundSigsDb = refundSigsDb
) )
_ <- safeDatabase.run(actions) _ <- safeDLCDatabase.run(actions)
dlcDb <- updateDLCContractIds(offer, accept) dlcDb <- updateDLCContractIds(offer, accept)
_ = logger.info( _ = logger.info(
s"Created DLCAccept for tempContractId ${offer.tempContractId.hex} with contract Id ${contractId.toHex}") s"Created DLCAccept for tempContractId ${offer.tempContractId.hex} with contract Id ${contractId.toHex}")
@ -968,7 +968,7 @@ abstract class DLCWallet
sigsAction, sigsAction,
refundSigAction, refundSigAction,
acceptDbAction)) acceptDbAction))
_ <- safeDatabase.run(actions) _ <- safeDLCDatabase.run(actions)
// .get is safe here because we must have an offer if we have a dlcDAO // .get is safe here because we must have an offer if we have a dlcDAO
offerDb <- dlcOfferDAO.findByDLCId(dlc.dlcId).map(_.head) offerDb <- dlcOfferDAO.findByDLCId(dlc.dlcId).map(_.head)
@ -1733,7 +1733,7 @@ abstract class DLCWallet
dlc dlc
} }
} }
safeDatabase.run(dlcAction).flatMap { intermediaries => safeDLCDatabase.run(dlcAction).flatMap { intermediaries =>
val actions = intermediaries.map { intermediary => val actions = intermediaries.map { intermediary =>
getWalletDLCDbsAction(intermediary).map { getWalletDLCDbsAction(intermediary).map {
case (closingTxOpt, payoutAddrOpt) => case (closingTxOpt, payoutAddrOpt) =>
@ -1778,7 +1778,7 @@ abstract class DLCWallet
} }
override def findDLC(dlcId: Sha256Digest): Future[Option[DLCStatus]] = { override def findDLC(dlcId: Sha256Digest): Future[Option[DLCStatus]] = {
val intermediaryF = safeDatabase.run(findDLCAction(dlcId)) val intermediaryF = safeDLCDatabase.run(findDLCAction(dlcId))
intermediaryF.flatMap { intermediaryF.flatMap {
case None => Future.successful(None) case None => Future.successful(None)

View file

@ -32,7 +32,7 @@ import scala.concurrent._
*/ */
private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing { private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
self: DLCWallet => self: DLCWallet =>
private lazy val safeDatabase: SafeDatabase = dlcDAO.safeDatabase private lazy val safeDLCDatabase: SafeDatabase = dlcDAO.safeDatabase
/** Calculates the new state of the DLCDb based on the closing transaction, /** Calculates the new state of the DLCDb based on the closing transaction,
* will delete old CET sigs that are no longer needed after execution * will delete old CET sigs that are no longer needed after execution
@ -224,7 +224,7 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
_ <- updateAnnouncementA _ <- updateAnnouncementA
} yield updatedDlcDb } yield updatedDlcDb
} }
updatedDlcDb <- safeDatabase.run(actions) updatedDlcDb <- safeDLCDatabase.run(actions)
} yield { } yield {
logger.info( logger.info(
s"Done calculating RemoteClaimed outcome for dlcId=${dlcId.hex}") s"Done calculating RemoteClaimed outcome for dlcId=${dlcId.hex}")

View file

@ -525,7 +525,8 @@ class WalletSendingTest extends BitcoinSWalletTest {
recoverToExceptionIf[RuntimeException](failedTx) recoverToExceptionIf[RuntimeException](failedTx)
exnF.map(err => exnF.map(err =>
assert(err.getMessage.contains("Failed to reserve all utxos"))) assert(err.getMessage.contains(
"Not enough value in given outputs to make transaction spending 599500000 sats plus fees")))
} }
} }

View file

@ -94,7 +94,7 @@ abstract class Wallet
private[bitcoins] val stateDescriptorDAO: WalletStateDescriptorDAO = private[bitcoins] val stateDescriptorDAO: WalletStateDescriptorDAO =
WalletStateDescriptorDAO() WalletStateDescriptorDAO()
private val safeDatabase: SafeDatabase = spendingInfoDAO.safeDatabase protected lazy val safeDatabase: SafeDatabase = spendingInfoDAO.safeDatabase
val nodeApi: NodeApi val nodeApi: NodeApi
val chainQueryApi: ChainQueryApi val chainQueryApi: ChainQueryApi
val creationTime: Instant = keyManager.creationTime val creationTime: Instant = keyManager.creationTime
@ -447,7 +447,7 @@ abstract class Wallet
_ = require( _ = require(
tmp.outputs.size == 1, tmp.outputs.size == 1,
s"Created tx is not as expected, does not have 1 output, got $tmp") s"Created tx is not as expected, does not have 1 output, got $tmp")
rawTxHelper = FundRawTxHelper(withFinalizer, utxos, feeRate) rawTxHelper = FundRawTxHelper(withFinalizer, utxos, feeRate, Future.unit)
tx <- finishSend(rawTxHelper, tx <- finishSend(rawTxHelper,
tmp.outputs.head.value, tmp.outputs.head.value,
feeRate, feeRate,
@ -495,7 +495,7 @@ abstract class Wallet
utxos, utxos,
feeRate, feeRate,
changeAddr.scriptPubKey) changeAddr.scriptPubKey)
rawTxHelper = FundRawTxHelper(txBuilder, utxos, feeRate) rawTxHelper = FundRawTxHelper(txBuilder, utxos, feeRate, Future.unit)
tx <- finishSend(rawTxHelper, amount, feeRate, newTags) tx <- finishSend(rawTxHelper, amount, feeRate, newTags)
} yield tx } yield tx
} }
@ -596,7 +596,10 @@ abstract class Wallet
sequence) sequence)
amount = outputs.foldLeft(CurrencyUnits.zero)(_ + _.value) amount = outputs.foldLeft(CurrencyUnits.zero)(_ + _.value)
rawTxHelper = FundRawTxHelper(txBuilder, spendingInfos, newFeeRate) rawTxHelper = FundRawTxHelper(txBuilder,
spendingInfos,
newFeeRate,
Future.unit)
tx <- tx <-
finishSend(rawTxHelper, amount, newFeeRate, Vector.empty) finishSend(rawTxHelper, amount, newFeeRate, Vector.empty)
} yield tx } yield tx

View file

@ -19,7 +19,6 @@ import org.bitcoins.core.wallet.utxo.{
AddressTagType AddressTagType
} }
import org.bitcoins.crypto.ECPublicKey import org.bitcoins.crypto.ECPublicKey
import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet._ import org.bitcoins.wallet._
import slick.dbio.{DBIOAction, Effect, NoStream} import slick.dbio.{DBIOAction, Effect, NoStream}
@ -32,8 +31,6 @@ import scala.util.{Failure, Success}
private[wallet] trait AddressHandling extends WalletLogger { private[wallet] trait AddressHandling extends WalletLogger {
self: Wallet => self: Wallet =>
private lazy val safeDatabase: SafeDatabase = addressDAO.safeDatabase
def contains( def contains(
address: BitcoinAddress, address: BitcoinAddress,
accountOpt: Option[HDAccount]): Future[Boolean] = { accountOpt: Option[HDAccount]): Future[Boolean] = {
@ -295,6 +292,13 @@ private[wallet] trait AddressHandling extends WalletLogger {
getNewAddressHelperAction(account, HDChainType.External) getNewAddressHelperAction(account, HDChainType.External)
} }
def getNewChangeAddressAction(account: AccountDb): DBIOAction[
BitcoinAddress,
NoStream,
Effect.Read with Effect.Write with Effect.Transactional] = {
getNewAddressHelperAction(account, HDChainType.Change)
}
def getNewAddress(account: AccountDb): Future[BitcoinAddress] = { def getNewAddress(account: AccountDb): Future[BitcoinAddress] = {
safeDatabase.run(getNewAddressAction(account)) safeDatabase.run(getNewAddressAction(account))
} }

View file

@ -1,7 +1,8 @@
package org.bitcoins.wallet.internal package org.bitcoins.wallet.internal
import org.bitcoins.core.api.wallet.db.{AccountDb, SpendingInfoDb}
import org.bitcoins.core.api.wallet._ import org.bitcoins.core.api.wallet._
import org.bitcoins.core.api.wallet.db.AccountDb
import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.policy.Policy import org.bitcoins.core.policy.Policy
import org.bitcoins.core.protocol.transaction._ import org.bitcoins.core.protocol.transaction._
import org.bitcoins.core.wallet.builder._ import org.bitcoins.core.wallet.builder._
@ -10,9 +11,9 @@ import org.bitcoins.core.wallet.utxo._
import org.bitcoins.wallet.{Wallet, WalletLogger} import org.bitcoins.wallet.{Wallet, WalletLogger}
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NonFatal
trait FundTransactionHandling extends WalletLogger { self: Wallet => trait FundTransactionHandling extends WalletLogger { self: Wallet =>
import walletConfig.profile.api._
def fundRawTransaction( def fundRawTransaction(
destinations: Vector[TransactionOutput], destinations: Vector[TransactionOutput],
@ -71,24 +72,52 @@ trait FundTransactionHandling extends WalletLogger { self: Wallet =>
fromTagOpt: Option[AddressTag], fromTagOpt: Option[AddressTag],
markAsReserved: Boolean): Future[ markAsReserved: Boolean): Future[
FundRawTxHelper[ShufflingNonInteractiveFinalizer]] = { FundRawTxHelper[ShufflingNonInteractiveFinalizer]] = {
val action = fundRawTransactionInternalAction(destinations,
feeRate,
fromAccount,
coinSelectionAlgo,
fromTagOpt,
markAsReserved)
for {
txHelper <- safeDatabase.run(action)
_ <- txHelper.reservedUTXOsCallbackF
} yield txHelper
}
private[bitcoins] def fundRawTransactionInternalAction(
destinations: Vector[TransactionOutput],
feeRate: FeeUnit,
fromAccount: AccountDb,
coinSelectionAlgo: CoinSelectionAlgo = CoinSelectionAlgo.LeastWaste,
fromTagOpt: Option[AddressTag],
markAsReserved: Boolean): DBIOAction[
FundRawTxHelper[ShufflingNonInteractiveFinalizer],
NoStream,
Effect.Read with Effect.Write with Effect.Transactional] = {
val amts = destinations.map(_.value) val amts = destinations.map(_.value)
//need to allow 0 for OP_RETURN outputs //need to allow 0 for OP_RETURN outputs
require(amts.forall(_.satoshis.toBigInt >= 0), require(amts.forall(_.satoshis.toBigInt >= 0),
s"Cannot fund a transaction for a negative amount, got=$amts") s"Cannot fund a transaction for a negative amount, got=$amts")
val amt = amts.sum val amt = amts.sum
logger.info(s"Attempting to fund a tx for amt=${amt} with feeRate=$feeRate") logger.info(s"Attempting to fund a tx for amt=$amt with feeRate=$feeRate")
val utxosF: Future[Vector[(SpendingInfoDb, Transaction)]] = val utxosA =
for { for {
utxos <- fromTagOpt match { utxos <- fromTagOpt match {
case None => case None =>
listUtxos(fromAccount.hdAccount) spendingInfoDAO.findAllUnspentForAccountAction(
fromAccount.hdAccount)
case Some(tag) => case Some(tag) =>
listUtxos(fromAccount.hdAccount, tag) spendingInfoDAO.findAllUnspentForTagAction(tag).map { utxos =>
utxos.filter(utxo =>
HDAccount.isSameAccount(bip32Path = utxo.privKeyPath,
account = fromAccount.hdAccount))
} }
utxoWithTxs <- Future.sequence { }
utxoWithTxs <- DBIO.sequence {
utxos.map { utxo => utxos.map { utxo =>
transactionDAO transactionDAO
.findByOutPoint(utxo.outPoint) .findByTxIdAction(utxo.outPoint.txIdBE)
.map(tx => (utxo, tx.get.transaction)) .map(tx => (utxo, tx.get.transaction))
} }
} }
@ -99,9 +128,9 @@ trait FundTransactionHandling extends WalletLogger { self: Wallet =>
} yield utxoWithTxs.filter(utxo => } yield utxoWithTxs.filter(utxo =>
!immatureCoinbases.exists(_._1 == utxo._1)) !immatureCoinbases.exists(_._1 == utxo._1))
val selectedUtxosF: Future[Vector[(SpendingInfoDb, Transaction)]] = val selectedUtxosA =
for { for {
walletUtxos <- utxosF walletUtxos <- utxosA
// filter out dust // filter out dust
selectableUtxos = walletUtxos selectableUtxos = walletUtxos
@ -118,14 +147,14 @@ trait FundTransactionHandling extends WalletLogger { self: Wallet =>
) )
filtered = walletUtxos.filter(utxo => filtered = walletUtxos.filter(utxo =>
utxos.exists(_.outPoint == utxo._1.outPoint)) utxos.exists(_.outPoint == utxo._1.outPoint))
_ <- (_, callbackF) <-
if (markAsReserved) markUTXOsAsReserved(filtered.map(_._1)) if (markAsReserved) markUTXOsAsReservedAction(filtered.map(_._1))
else Future.unit else DBIO.successful((Vector.empty, Future.unit))
} yield filtered } yield (filtered, callbackF)
val resultF = for { for {
selectedUtxos <- selectedUtxosF (selectedUtxos, callbackF) <- selectedUtxosA
change <- getNewChangeAddress(fromAccount) change <- getNewChangeAddressAction(fromAccount)
utxoSpendingInfos = { utxoSpendingInfos = {
selectedUtxos.map { case (utxo, prevTx) => selectedUtxos.map { case (utxo, prevTx) =>
utxo.toUTXOInfo(keyManager = self.keyManager, prevTx) utxo.toUTXOInfo(keyManager = self.keyManager, prevTx)
@ -147,23 +176,12 @@ trait FundTransactionHandling extends WalletLogger { self: Wallet =>
feeRate, feeRate,
change.scriptPubKey) change.scriptPubKey)
FundRawTxHelper(txBuilderWithFinalizer = txBuilder, val fundTxHelper = FundRawTxHelper(txBuilderWithFinalizer = txBuilder,
scriptSigParams = utxoSpendingInfos, scriptSigParams = utxoSpendingInfos,
feeRate) feeRate = feeRate,
} reservedUTXOsCallbackF = callbackF)
resultF.recoverWith { case NonFatal(error) => fundTxHelper
logger.error( }
s"Failed to reserve utxos for amount=${amt} feeRate=$feeRate, unreserving the selected utxos")
// un-reserve utxos since we failed to create valid spending infos
if (markAsReserved) {
for {
utxos <- selectedUtxosF
_ <- unmarkUTXOsAsReserved(utxos.map(_._1))
} yield error
} else Future.failed(error)
}
resultF
} }
} }

View file

@ -15,7 +15,6 @@ import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp} import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.core.wallet.rescan.RescanState import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet.{Wallet, WalletLogger} import org.bitcoins.wallet.{Wallet, WalletLogger}
import slick.dbio.{DBIOAction, Effect, NoStream} import slick.dbio.{DBIOAction, Effect, NoStream}
@ -28,7 +27,6 @@ private[wallet] trait RescanHandling extends WalletLogger {
///////////////////// /////////////////////
// Public facing API // Public facing API
private lazy val safeDatabase: SafeDatabase = addressDAO.safeDatabase
override def isRescanning(): Future[Boolean] = stateDescriptorDAO.isRescanning override def isRescanning(): Future[Boolean] = stateDescriptorDAO.isRescanning
/** @inheritdoc */ /** @inheritdoc */

View file

@ -17,7 +17,6 @@ import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.TxoState._ import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState} import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState}
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet._ import org.bitcoins.wallet._
import scala.concurrent.{Future, Promise} import scala.concurrent.{Future, Promise}
@ -33,8 +32,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
import walletConfig.profile.api._ import walletConfig.profile.api._
private lazy val safeDatabase: SafeDatabase = transactionDAO.safeDatabase
///////////////////// /////////////////////
// Public facing API // Public facing API

View file

@ -4,12 +4,7 @@ import org.bitcoins.core.api.wallet.db._
import org.bitcoins.core.consensus.Consensus import org.bitcoins.core.consensus.Consensus
import org.bitcoins.core.hd.HDAccount import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.protocol.script.{P2WPKHWitnessSPKV0, P2WPKHWitnessV0} import org.bitcoins.core.protocol.script.{P2WPKHWitnessSPKV0, P2WPKHWitnessV0}
import org.bitcoins.core.protocol.transaction.{ import org.bitcoins.core.protocol.transaction._
CoinbaseInput,
Transaction,
TransactionOutPoint,
TransactionOutput
}
import org.bitcoins.core.util.BlockHashWithConfs import org.bitcoins.core.util.BlockHashWithConfs
import org.bitcoins.core.wallet.utxo.TxoState._ import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo._ import org.bitcoins.core.wallet.utxo._
@ -25,6 +20,7 @@ import scala.concurrent.Future
*/ */
private[wallet] trait UtxoHandling extends WalletLogger { private[wallet] trait UtxoHandling extends WalletLogger {
self: Wallet => self: Wallet =>
import walletConfig.profile.api._
/** @inheritdoc */ /** @inheritdoc */
def listDefaultAccountUtxos(): Future[Vector[SpendingInfoDb]] = def listDefaultAccountUtxos(): Future[Vector[SpendingInfoDb]] =
@ -296,13 +292,24 @@ private[wallet] trait UtxoHandling extends WalletLogger {
override def markUTXOsAsReserved( override def markUTXOsAsReserved(
utxos: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]] = { utxos: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]] = {
for {
(utxos, callbackF) <- safeDatabase.run(markUTXOsAsReservedAction(utxos))
_ <- callbackF
} yield utxos
}
protected def markUTXOsAsReservedAction(
utxos: Vector[SpendingInfoDb]): DBIOAction[
(Vector[SpendingInfoDb], Future[Unit]),
NoStream,
Effect.Read with Effect.Write] = {
val outPoints = utxos.map(_.outPoint) val outPoints = utxos.map(_.outPoint)
logger.info(s"Reserving utxos=$outPoints") logger.info(s"Reserving utxos=$outPoints")
val updated = utxos.map(_.copyWithState(TxoState.Reserved)) val updated = utxos.map(_.copyWithState(TxoState.Reserved))
for { spendingInfoDAO.markAsReservedAction(updated).map { utxos =>
utxos <- spendingInfoDAO.markAsReserved(updated) val callbackF = walletCallbacks.executeOnReservedUtxos(logger, utxos)
_ <- walletCallbacks.executeOnReservedUtxos(logger, utxos) (utxos, callbackF)
} yield utxos }
} }
/** @inheritdoc */ /** @inheritdoc */

View file

@ -457,9 +457,16 @@ case class SpendingInfoDAO()(implicit
UTXORecord.fromSpendingInfoDb(utxo, spks(utxo.output.scriptPubKey))) UTXORecord.fromSpendingInfoDb(utxo, spks(utxo.output.scriptPubKey)))
def findAllUnspent(): Future[Vector[SpendingInfoDb]] = { def findAllUnspent(): Future[Vector[SpendingInfoDb]] = {
safeDatabase.run(findAllUnspentAction())
}
def findAllUnspentAction(): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
for { for {
utxos <- _findAllUnspent() utxos <- _findAllUnspentAction()
infos <- utxoToInfo(utxos) infos <- utxoToInfoAction(utxos)
} yield infos } yield infos
} }
@ -510,6 +517,14 @@ case class SpendingInfoDAO()(implicit
allUtxosF.map(filterUtxosByAccount(_, hdAccount)) allUtxosF.map(filterUtxosByAccount(_, hdAccount))
} }
def findAllUnspentForAccountAction(hdAccount: HDAccount): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
val allUtxosA = findAllUnspentAction()
allUtxosA.map(filterUtxosByAccount(_, hdAccount))
}
def findAllForAccountAction(hdAccount: HDAccount): DBIOAction[ def findAllForAccountAction(hdAccount: HDAccount): DBIOAction[
Vector[SpendingInfoDb], Vector[SpendingInfoDb],
NoStream, NoStream,
@ -597,8 +612,11 @@ case class SpendingInfoDAO()(implicit
}) })
} }
def findAllUnspentForTag(tag: AddressTag): Future[Vector[SpendingInfoDb]] = { def findAllUnspentForTagAction(tag: AddressTag): DBIOAction[
val query = table Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
table
.join(spkTable) .join(spkTable)
.on(_.scriptPubKeyId === _.id) .on(_.scriptPubKeyId === _.id)
.filter(_._1.state.inSet(TxoState.receivedStates)) .filter(_._1.state.inSet(TxoState.receivedStates))
@ -608,24 +626,26 @@ case class SpendingInfoDAO()(implicit
.on(_._2.address === _.address) .on(_._2.address === _.address)
.filter(_._2.tagName === tag.tagName) .filter(_._2.tagName === tag.tagName)
.filter(_._2.tagType === tag.tagType) .filter(_._2.tagType === tag.tagType)
.result
safeDatabase .map(_.toVector)
.runVec(query.result)
.map(_.map { case (((utxoRecord, spkDb), _), _) => .map(_.map { case (((utxoRecord, spkDb), _), _) =>
utxoRecord.toSpendingInfoDb(spkDb.scriptPubKey) utxoRecord.toSpendingInfoDb(spkDb.scriptPubKey)
}) })
} }
def markAsReserved( def findAllUnspentForTag(tag: AddressTag): Future[Vector[SpendingInfoDb]] = {
ts: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]] = { safeDatabase.run(findAllUnspentForTagAction(tag))
}
def markAsReservedAction(ts: Vector[SpendingInfoDb]): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read with Effect.Write] = {
//1. Check if any are reserved already //1. Check if any are reserved already
//2. if not, reserve them //2. if not, reserve them
//3. if they are reserved, throw an exception? //3. if they are reserved, throw an exception?
val outPoints = ts.map(_.outPoint) val outPoints = ts.map(_.outPoint)
val action: DBIOAction[ table
Int,
NoStream,
Effect.Write with Effect.Transactional] = table
.filter(_.outPoint.inSet(outPoints)) .filter(_.outPoint.inSet(outPoints))
.filter( .filter(
_.state.inSet(TxoState.receivedStates) _.state.inSet(TxoState.receivedStates)
@ -638,16 +658,17 @@ case class SpendingInfoDAO()(implicit
s"Failed to reserve all utxos, expected=${ts.length} actual=$count") s"Failed to reserve all utxos, expected=${ts.length} actual=$count")
DBIO.failed(exn) DBIO.failed(exn)
} else { } else {
DBIO.successful(count) DBIO.successful(count)
} }
} }
safeDatabase
.run(action)
.map(_ => ts.map(_.copyWithState(TxoState.Reserved))) .map(_ => ts.map(_.copyWithState(TxoState.Reserved)))
} }
def markAsReserved(
ts: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]] = {
safeDatabase.run(markAsReservedAction(ts))
}
def createOutPointsIndexIfNeeded(): Future[Unit] = Future { def createOutPointsIndexIfNeeded(): Future[Unit] = Future {
withStatement( withStatement(
s"CREATE UNIQUE INDEX IF NOT EXISTS utxo_outpoints ON $fullTableName (tx_outpoint)") { s"CREATE UNIQUE INDEX IF NOT EXISTS utxo_outpoints ON $fullTableName (tx_outpoint)") {