Use DBIOActions to speed up processing transactions (#4572)

This commit is contained in:
benthecarman 2022-08-04 14:40:31 -05:00 committed by GitHub
parent c03b158f94
commit 2fa7c39f64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 100 deletions

View File

@ -17,6 +17,7 @@ import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState}
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet._
import scala.concurrent.{Future, Promise}
@ -30,6 +31,10 @@ import scala.util.{Failure, Success, Try}
private[bitcoins] trait TransactionProcessing extends WalletLogger {
self: Wallet =>
import walletConfig.profile.api._
private lazy val safeDatabase: SafeDatabase = transactionDAO.safeDatabase
/////////////////////
// Public facing API
@ -38,9 +43,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Wallet] = {
val relevantReceivedOutputsF = getRelevantOutputs(transaction)
for {
relevantReceivedOutputs <- relevantReceivedOutputsF
relevantReceivedOutputs <- getRelevantOutputs(transaction)
result <- processTransactionImpl(
transaction = transaction,
blockHashOpt = blockHashOpt,
@ -206,8 +210,17 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
protected def insertTransaction(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[TransactionDb] = {
safeDatabase.run(insertTransactionAction(tx, blockHashOpt))
}
protected def insertTransactionAction(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): DBIOAction[
TransactionDb,
NoStream,
Effect.Write with Effect.Read] = {
val txDb = TransactionDbHelper.fromTransaction(tx, blockHashOpt)
transactionDAO.upsert(txDb)
transactionDAO.upsertAction(txDb)
}
private[wallet] def insertOutgoingTransaction(
@ -351,9 +364,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
insertTransaction(transaction, blockHashOpt)
else Future.unit
}
toBeUpdated = outputsBeingSpent
.map(markAsSpent(_, transaction.txIdBE))
.flatten
toBeUpdated = outputsBeingSpent.flatMap(
markAsSpent(_, transaction.txIdBE))
processed <- updateUtxoSpentConfirmedStates(toBeUpdated)
} yield {
processed
@ -568,22 +580,18 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
val spks = outputsWithIndex.map(_.output.scriptPubKey).toVector
val addressDbsF: Future[Vector[AddressDb]] = {
getAddressDbs(spks)
}
val addressDbWithOutputF = for {
addressDbs <- addressDbsF
val addressDbWithOutputA = for {
addressDbs <- addressDAO.findByScriptPubKeysAction(spks)
} yield {
if (addressDbs.isEmpty) {
logger.warn(
s"Found zero addresses in the database to match an output we have a script for, txid=${transaction.txIdBE.hex} outputs=${outputsWithIndex}")
s"Found zero addresses in the database to match an output we have a script for, txid=${transaction.txIdBE.hex} outputs=$outputsWithIndex")
}
matchAddressDbWithOutputs(addressDbs, outputsWithIndex.toVector)
}
val nested = for {
addressDbWithOutput <- addressDbWithOutputF
addressDbWithOutput <- safeDatabase.run(addressDbWithOutputA)
} yield {
val outputsVec = addressDbWithOutput.map { case (addressDb, out) =>
require(addressDb.scriptPubKey == out.output.scriptPubKey)
@ -600,16 +608,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
nested.flatten
}
/** Tries to convert the provided spk to an address, and then checks if we have
* it in our address table
*/
private def getAddressDbs(
spks: Vector[ScriptPubKey]): Future[Vector[AddressDb]] = {
val addressDbF: Future[Vector[AddressDb]] =
addressDAO.findByScriptPubKeys(spks)
addressDbF
}
/** Matches address dbs with outputs, drops addressDb/outputs that do not have matches */
private def matchAddressDbWithOutputs(
addressDbs: Vector[AddressDb],
@ -635,12 +633,14 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
private[wallet] def insertIncomingTransaction(
transaction: Transaction,
incomingAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[
(TransactionDb, IncomingTransactionDb)] = {
blockHashOpt: Option[DoubleSha256DigestBE]): DBIOAction[
(TransactionDb, IncomingTransactionDb),
NoStream,
Effect.Read with Effect.Write] = {
val incomingDb = IncomingTransactionDb(transaction.txIdBE, incomingAmount)
for {
txDb <- insertTransaction(transaction, blockHashOpt)
written <- incomingTxDAO.upsert(incomingDb)
txDb <- insertTransactionAction(transaction, blockHashOpt)
written <- incomingTxDAO.upsertAction(incomingDb)
} yield (txDb, written)
}
@ -700,7 +700,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
} else {
val filteredOutputs =
transaction.outputs.zipWithIndex.filter(o =>
relevantReceivedOutputs.exists(_ == OutputWithIndex(o._1, o._2)))
relevantReceivedOutputs.contains(OutputWithIndex(o._1, o._2)))
if (filteredOutputs.isEmpty) {
//no relevant outputs in this tx, return early
@ -714,35 +714,35 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
}
val spks = relevantReceivedOutputsForTx.map(_.output.scriptPubKey)
val spksInDbF = addressDAO.findByScriptPubKeys(spks)
val spksInDbA = addressDAO.findByScriptPubKeysAction(spks)
val ourOutputsF = for {
spksInDb <- spksInDbF
val ourOutputsA = for {
spksInDb <- spksInDbA
} yield {
relevantReceivedOutputsForTx.collect {
case OutputWithIndex(out, idx)
if spksInDb.map(_.scriptPubKey).exists(_ == out.scriptPubKey) =>
if spksInDb.map(_.scriptPubKey).contains(out.scriptPubKey) =>
OutputWithIndex(out, idx)
}
}
val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = for {
ourOutputs <- ourOutputsF
val txDbA = for {
ourOutputs <- ourOutputsA
totalIncoming = ourOutputs.map(_.output.value).sum
incomingTx <- insertIncomingTransaction(transaction,
totalIncoming,
blockHashOpt)
} yield incomingTx
val prevTagsDbF = for {
(txDb, _) <- txDbF
val prevTagsDbA = for {
(txDb, _) <- txDbA
prevTagDbs <-
addressTagDAO.findTx(txDb.transaction, networkParameters)
addressTagDAO.findTxAction(txDb.transaction, networkParameters)
} yield prevTagDbs
val newTagsF = for {
ourOutputs <- ourOutputsF
prevTagDbs <- prevTagsDbF
val newTagsA = for {
ourOutputs <- ourOutputsA
prevTagDbs <- prevTagsDbA
prevTags = prevTagDbs.map(_.addressTag)
tagsToUse = prevTags
.filterNot(tag => newTags.contains(tag)) ++ newTags
@ -751,14 +751,18 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
.fromScriptPubKey(out.output.scriptPubKey, networkParameters)
tagsToUse.map(tag => AddressTagDb(address, tag))
}
created <- addressTagDAO.upsertAll(newTagDbs)
created <- addressTagDAO.upsertAllAction(newTagDbs)
} yield created
val action = for {
(txDb, _) <- txDbA
ourOutputs <- ourOutputsA
_ <- newTagsA
} yield (ourOutputs, txDb)
for {
(txDb, _) <- txDbF
ourOutputs <- ourOutputsF
(ourOutputs, txDb) <- safeDatabase.run(action)
utxos <- addReceivedUTXOs(ourOutputs, txDb.transaction, blockHashOpt)
_ <- newTagsF
} yield utxos
}
}

View File

@ -316,17 +316,21 @@ case class AddressDAO()(implicit
def findByScriptPubKeys(
spks: Vector[ScriptPubKey]): Future[Vector[AddressDb]] = {
val query = table
safeDatabase.run(findByScriptPubKeysAction(spks))
}
def findByScriptPubKeysAction(spks: Vector[ScriptPubKey]): DBIOAction[
Vector[AddressDb],
NoStream,
Effect.Read] = {
table
.join(spkTable)
.on(_.scriptPubKeyId === _.id)
.filter(_._2.scriptPubKey.inSet(spks))
safeDatabase
.runVec(query.result)
.map(res =>
res.map { case (addrRec, spkRec) =>
addrRec.toAddressDb(spkRec.scriptPubKey)
})
.result
.map(_.map { case (addrRec, spkRec) =>
addrRec.toAddressDb(spkRec.scriptPubKey)
}.toVector)
}
private def findMostRecentForChain(

View File

@ -5,7 +5,6 @@ import org.bitcoins.core.api.wallet.db.AddressTagDb
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.utxo.{
AddressTag,
AddressTagName,
@ -74,10 +73,15 @@ case class AddressTagDAO()(implicit
ts: Vector[AddressTagDb]): Query[Table[_], AddressTagDb, Seq] =
findByPrimaryKeys(ts.map(t => (t.address, t.tagType)))
def findByAddress(address: BitcoinAddress): Future[Vector[AddressTagDb]] = {
val query = table.filter(_.address === address)
def findByAddressAction(address: BitcoinAddress): DBIOAction[
Vector[AddressTagDb],
NoStream,
Effect.Read] = {
table.filter(_.address === address).result.map(_.toVector)
}
safeDatabase.run(query.result).map(_.toVector)
def findByAddress(address: BitcoinAddress): Future[Vector[AddressTagDb]] = {
safeDatabase.run(findByAddressAction(address))
}
def findByAddressAndTag(
@ -135,44 +139,52 @@ case class AddressTagDAO()(implicit
safeDatabase.run(query.delete)
}
def findTx(
tx: Transaction,
network: NetworkParameters): Future[Vector[AddressTagDb]] = {
def findTxAction(tx: Transaction, network: NetworkParameters): DBIOAction[
Vector[AddressTagDb],
NoStream,
Effect.Read] = {
val txIds = tx.inputs.map(_.previousOutput.txIdBE)
def findUtxos = {
val infoQuery = spendingInfoTable.filter(_.txid.inSet(txIds))
safeDatabase.runVec(infoQuery.result)
val findUtxosA = {
spendingInfoTable.filter(_.txid.inSet(txIds)).result.map(_.toVector)
}
def findSpks(ids: Vector[Long]) = {
val spkQuery = spkTable.filter(_.id.inSet(ids))
safeDatabase.runVec(spkQuery.result)
spkTable.filter(_.id.inSet(ids)).result.map(_.toVector)
}
def spendingInfosF =
val spendingInfosA =
for {
utxos <- findUtxos
utxos <- findUtxosA
spks <-
if (utxos.isEmpty) Future.successful(Vector.empty)
if (utxos.isEmpty) DBIO.successful(Vector.empty)
else findSpks(utxos.map(_.scriptPubKeyId))
} yield {
val spksMap = spks.map(spk => (spk.id.get, spk.scriptPubKey)).toMap
utxos.map(utxo => utxo.toSpendingInfoDb(spksMap(utxo.scriptPubKeyId)))
}
spendingInfosF.flatMap { spendingInfos =>
if (spendingInfos.isEmpty) {
Future.successful(Vector.empty)
} else {
val spks = spendingInfos.map(_.output.scriptPubKey)
val addresses =
spks.map(spk => BitcoinAddress.fromScriptPubKey(spk, network))
spendingInfosA
.flatMap { spendingInfos =>
if (spendingInfos.isEmpty) {
DBIO.successful(Vector.empty)
} else {
val spks = spendingInfos.map(_.output.scriptPubKey)
val addresses =
spks.map(spk => BitcoinAddress.fromScriptPubKey(spk, network))
val findByAddressFs = addresses.map(address => findByAddress(address))
FutureUtil.collect(findByAddressFs).map(_.flatten)
val findByAddressAs =
addresses.map(address => findByAddressAction(address))
DBIO.sequence(findByAddressAs).map(_.flatten)
}
}
}
.map(_.toVector)
}
def findTx(
tx: Transaction,
network: NetworkParameters): Future[Vector[AddressTagDb]] = {
safeDatabase.run(findTxAction(tx, network))
}
def deleteByAddressesAction(addresses: Vector[BitcoinAddress]): DBIOAction[

View File

@ -6,7 +6,6 @@ import org.bitcoins.core.script.ScriptType
import org.bitcoins.crypto.Sha256Digest
import org.bitcoins.db.CRUDAutoInc
import org.bitcoins.wallet.config.WalletAppConfig
import slick.dbio.DBIOAction
import scala.concurrent.{ExecutionContext, Future}
@ -29,7 +28,7 @@ case class ScriptPubKeyDAO()(implicit
spkOpt <- spkFind.headOption
spk <- spkOpt match {
case Some(foundSpk) =>
DBIOAction.successful(foundSpk)
DBIO.successful(foundSpk)
case None =>
for {
newSpkId <- (table returning table.map(_.id)) += spkDb
@ -48,17 +47,24 @@ case class ScriptPubKeyDAO()(implicit
foundVecF.map(_.headOption)
}
/** Searches for the given set of spks and returns the ones that exist in the db */
def findScriptPubKeys(
spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = {
def findScriptPubKeysAction(spks: Vector[ScriptPubKey]): DBIOAction[
Vector[ScriptPubKeyDb],
NoStream,
Effect.Read] = {
val hashes = spks.map(ScriptPubKeyDb.hash)
//group hashes to avoid https://github.com/bitcoin-s/bitcoin-s/issues/4220
val groupedHashes: Vector[Vector[Sha256Digest]] =
hashes.grouped(1000).toVector
val actions =
groupedHashes.map(hashes => table.filter(_.hash.inSet(hashes)).result)
val sequenced = DBIOAction.sequence(actions).map(_.flatten)
safeDatabase.runVec(sequenced)
DBIO.sequence(actions).map(_.flatten.toVector)
}
/** Searches for the given set of spks and returns the ones that exist in the db */
def findScriptPubKeys(
spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = {
val action = findScriptPubKeysAction(spks)
safeDatabase.run(action)
}
case class ScriptPubKeyTable(tag: Tag)

View File

@ -257,6 +257,21 @@ case class SpendingInfoDAO()(implicit
findOutputsReceived(txs.map(_.txIdBE))
}
/** Fetches all received txos in our db that are in the given txs */
def findTxsAction(txs: Vector[Transaction]): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
findOutputsReceivedAction(txs.map(_.txIdBE))
}
def findTxAction(tx: Transaction): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
findTxsAction(Vector(tx))
}
private def _findOutputsBeingSpentQuery(txs: Vector[Transaction]): Vector[
Query[SpendingInfoTable, UTXORecord, Seq]] = {
val outPoints: Vector[TransactionOutPoint] = txs
@ -291,6 +306,14 @@ case class SpendingInfoDAO()(implicit
def findOutputsBeingSpent(
txs: Vector[Transaction]): Future[Vector[SpendingInfoDb]] = {
val action = findOutputsBeingSpentAction(txs)
safeDatabase.run(action)
}
def findOutputsBeingSpentAction(txs: Vector[Transaction]): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
val queries = findOutputsBeingSpentQuery(txs)
val actions: Vector[DBIOAction[
Vector[(UTXORecord, ScriptPubKeyDb)],
@ -301,17 +324,11 @@ case class SpendingInfoDAO()(implicit
val action: DBIOAction[
Vector[(UTXORecord, ScriptPubKeyDb)],
NoStream,
Effect.Read] = DBIO.sequence(actions).map(_.flatten)
Effect.Read] = DBIO.sequence(actions).map(_.flatten.toVector)
val resultsF = safeDatabase.runVec(action)
for {
results <- resultsF
} yield {
results.map { case (utxo, spk) =>
utxo.toSpendingInfoDb(spk.scriptPubKey)
}
}
action.map(_.map { case (utxo, spk) =>
utxo.toSpendingInfoDb(spk.scriptPubKey)
})
}
/** Given a TXID, fetches all incoming TXOs and the address the TXO pays to
@ -355,17 +372,28 @@ case class SpendingInfoDAO()(implicit
/** Fetches all the incoming TXOs in our DB that are in
* the transaction with the given TXID
*/
def findOutputsReceived(
txids: Vector[DoubleSha256DigestBE]): Future[Vector[SpendingInfoDb]] = {
val filtered = spkJoinQuery
def findOutputsReceivedAction(
txids: Vector[DoubleSha256DigestBE]): DBIOAction[
Vector[SpendingInfoDb],
NoStream,
Effect.Read] = {
spkJoinQuery
.filter(_._1.state.inSet(TxoState.receivedStates))
.filter(_._1.txid.inSet(txids))
safeDatabase
.runVec(filtered.result)
.result
.map(res =>
res.map { case (utxoRec, spkRec) =>
utxoRec.toSpendingInfoDb(spkRec.scriptPubKey)
})
.map(_.toVector)
}
/** Fetches all the incoming TXOs in our DB that are in
* the transaction with the given TXID
*/
def findOutputsReceived(
txids: Vector[DoubleSha256DigestBE]): Future[Vector[SpendingInfoDb]] = {
safeDatabase.run(findOutputsReceivedAction(txids))
}
def findByScriptPubKey(