diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala index 27803a561c..0901d49595 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala @@ -17,6 +17,7 @@ import org.bitcoins.core.wallet.fee.FeeUnit import org.bitcoins.core.wallet.utxo.TxoState._ import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.db.SafeDatabase import org.bitcoins.wallet._ import scala.concurrent.{Future, Promise} @@ -30,6 +31,10 @@ import scala.util.{Failure, Success, Try} private[bitcoins] trait TransactionProcessing extends WalletLogger { self: Wallet => + import walletConfig.profile.api._ + + private lazy val safeDatabase: SafeDatabase = transactionDAO.safeDatabase + ///////////////////// // Public facing API @@ -38,9 +43,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { transaction: Transaction, blockHashOpt: Option[DoubleSha256DigestBE] ): Future[Wallet] = { - val relevantReceivedOutputsF = getRelevantOutputs(transaction) for { - relevantReceivedOutputs <- relevantReceivedOutputsF + relevantReceivedOutputs <- getRelevantOutputs(transaction) result <- processTransactionImpl( transaction = transaction, blockHashOpt = blockHashOpt, @@ -206,8 +210,17 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { protected def insertTransaction( tx: Transaction, blockHashOpt: Option[DoubleSha256DigestBE]): Future[TransactionDb] = { + safeDatabase.run(insertTransactionAction(tx, blockHashOpt)) + } + + protected def insertTransactionAction( + tx: Transaction, + blockHashOpt: Option[DoubleSha256DigestBE]): DBIOAction[ + TransactionDb, + NoStream, + Effect.Write with Effect.Read] = { val txDb = TransactionDbHelper.fromTransaction(tx, blockHashOpt) - transactionDAO.upsert(txDb) + transactionDAO.upsertAction(txDb) } private[wallet] def insertOutgoingTransaction( @@ -351,9 +364,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { insertTransaction(transaction, blockHashOpt) else Future.unit } - toBeUpdated = outputsBeingSpent - .map(markAsSpent(_, transaction.txIdBE)) - .flatten + toBeUpdated = outputsBeingSpent.flatMap( + markAsSpent(_, transaction.txIdBE)) processed <- updateUtxoSpentConfirmedStates(toBeUpdated) } yield { processed @@ -568,22 +580,18 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { val spks = outputsWithIndex.map(_.output.scriptPubKey).toVector - val addressDbsF: Future[Vector[AddressDb]] = { - getAddressDbs(spks) - } - - val addressDbWithOutputF = for { - addressDbs <- addressDbsF + val addressDbWithOutputA = for { + addressDbs <- addressDAO.findByScriptPubKeysAction(spks) } yield { if (addressDbs.isEmpty) { logger.warn( - s"Found zero addresses in the database to match an output we have a script for, txid=${transaction.txIdBE.hex} outputs=${outputsWithIndex}") + s"Found zero addresses in the database to match an output we have a script for, txid=${transaction.txIdBE.hex} outputs=$outputsWithIndex") } matchAddressDbWithOutputs(addressDbs, outputsWithIndex.toVector) } val nested = for { - addressDbWithOutput <- addressDbWithOutputF + addressDbWithOutput <- safeDatabase.run(addressDbWithOutputA) } yield { val outputsVec = addressDbWithOutput.map { case (addressDb, out) => require(addressDb.scriptPubKey == out.output.scriptPubKey) @@ -600,16 +608,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { nested.flatten } - /** Tries to convert the provided spk to an address, and then checks if we have - * it in our address table - */ - private def getAddressDbs( - spks: Vector[ScriptPubKey]): Future[Vector[AddressDb]] = { - val addressDbF: Future[Vector[AddressDb]] = - addressDAO.findByScriptPubKeys(spks) - addressDbF - } - /** Matches address dbs with outputs, drops addressDb/outputs that do not have matches */ private def matchAddressDbWithOutputs( addressDbs: Vector[AddressDb], @@ -635,12 +633,14 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { private[wallet] def insertIncomingTransaction( transaction: Transaction, incomingAmount: CurrencyUnit, - blockHashOpt: Option[DoubleSha256DigestBE]): Future[ - (TransactionDb, IncomingTransactionDb)] = { + blockHashOpt: Option[DoubleSha256DigestBE]): DBIOAction[ + (TransactionDb, IncomingTransactionDb), + NoStream, + Effect.Read with Effect.Write] = { val incomingDb = IncomingTransactionDb(transaction.txIdBE, incomingAmount) for { - txDb <- insertTransaction(transaction, blockHashOpt) - written <- incomingTxDAO.upsert(incomingDb) + txDb <- insertTransactionAction(transaction, blockHashOpt) + written <- incomingTxDAO.upsertAction(incomingDb) } yield (txDb, written) } @@ -700,7 +700,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { } else { val filteredOutputs = transaction.outputs.zipWithIndex.filter(o => - relevantReceivedOutputs.exists(_ == OutputWithIndex(o._1, o._2))) + relevantReceivedOutputs.contains(OutputWithIndex(o._1, o._2))) if (filteredOutputs.isEmpty) { //no relevant outputs in this tx, return early @@ -714,35 +714,35 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { } val spks = relevantReceivedOutputsForTx.map(_.output.scriptPubKey) - val spksInDbF = addressDAO.findByScriptPubKeys(spks) + val spksInDbA = addressDAO.findByScriptPubKeysAction(spks) - val ourOutputsF = for { - spksInDb <- spksInDbF + val ourOutputsA = for { + spksInDb <- spksInDbA } yield { relevantReceivedOutputsForTx.collect { case OutputWithIndex(out, idx) - if spksInDb.map(_.scriptPubKey).exists(_ == out.scriptPubKey) => + if spksInDb.map(_.scriptPubKey).contains(out.scriptPubKey) => OutputWithIndex(out, idx) } } - val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = for { - ourOutputs <- ourOutputsF + val txDbA = for { + ourOutputs <- ourOutputsA totalIncoming = ourOutputs.map(_.output.value).sum incomingTx <- insertIncomingTransaction(transaction, totalIncoming, blockHashOpt) } yield incomingTx - val prevTagsDbF = for { - (txDb, _) <- txDbF + val prevTagsDbA = for { + (txDb, _) <- txDbA prevTagDbs <- - addressTagDAO.findTx(txDb.transaction, networkParameters) + addressTagDAO.findTxAction(txDb.transaction, networkParameters) } yield prevTagDbs - val newTagsF = for { - ourOutputs <- ourOutputsF - prevTagDbs <- prevTagsDbF + val newTagsA = for { + ourOutputs <- ourOutputsA + prevTagDbs <- prevTagsDbA prevTags = prevTagDbs.map(_.addressTag) tagsToUse = prevTags .filterNot(tag => newTags.contains(tag)) ++ newTags @@ -751,14 +751,18 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { .fromScriptPubKey(out.output.scriptPubKey, networkParameters) tagsToUse.map(tag => AddressTagDb(address, tag)) } - created <- addressTagDAO.upsertAll(newTagDbs) + created <- addressTagDAO.upsertAllAction(newTagDbs) } yield created + val action = for { + (txDb, _) <- txDbA + ourOutputs <- ourOutputsA + _ <- newTagsA + } yield (ourOutputs, txDb) + for { - (txDb, _) <- txDbF - ourOutputs <- ourOutputsF + (ourOutputs, txDb) <- safeDatabase.run(action) utxos <- addReceivedUTXOs(ourOutputs, txDb.transaction, blockHashOpt) - _ <- newTagsF } yield utxos } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/models/AddressDAO.scala b/wallet/src/main/scala/org/bitcoins/wallet/models/AddressDAO.scala index a4d6d01086..97124bb800 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/models/AddressDAO.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/models/AddressDAO.scala @@ -316,17 +316,21 @@ case class AddressDAO()(implicit def findByScriptPubKeys( spks: Vector[ScriptPubKey]): Future[Vector[AddressDb]] = { - val query = table + safeDatabase.run(findByScriptPubKeysAction(spks)) + } + + def findByScriptPubKeysAction(spks: Vector[ScriptPubKey]): DBIOAction[ + Vector[AddressDb], + NoStream, + Effect.Read] = { + table .join(spkTable) .on(_.scriptPubKeyId === _.id) .filter(_._2.scriptPubKey.inSet(spks)) - - safeDatabase - .runVec(query.result) - .map(res => - res.map { case (addrRec, spkRec) => - addrRec.toAddressDb(spkRec.scriptPubKey) - }) + .result + .map(_.map { case (addrRec, spkRec) => + addrRec.toAddressDb(spkRec.scriptPubKey) + }.toVector) } private def findMostRecentForChain( diff --git a/wallet/src/main/scala/org/bitcoins/wallet/models/AddressTagDAO.scala b/wallet/src/main/scala/org/bitcoins/wallet/models/AddressTagDAO.scala index 7eaec97120..cc2ceaee50 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/models/AddressTagDAO.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/models/AddressTagDAO.scala @@ -5,7 +5,6 @@ import org.bitcoins.core.api.wallet.db.AddressTagDb import org.bitcoins.core.config.NetworkParameters import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.transaction.Transaction -import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.utxo.{ AddressTag, AddressTagName, @@ -74,10 +73,15 @@ case class AddressTagDAO()(implicit ts: Vector[AddressTagDb]): Query[Table[_], AddressTagDb, Seq] = findByPrimaryKeys(ts.map(t => (t.address, t.tagType))) - def findByAddress(address: BitcoinAddress): Future[Vector[AddressTagDb]] = { - val query = table.filter(_.address === address) + def findByAddressAction(address: BitcoinAddress): DBIOAction[ + Vector[AddressTagDb], + NoStream, + Effect.Read] = { + table.filter(_.address === address).result.map(_.toVector) + } - safeDatabase.run(query.result).map(_.toVector) + def findByAddress(address: BitcoinAddress): Future[Vector[AddressTagDb]] = { + safeDatabase.run(findByAddressAction(address)) } def findByAddressAndTag( @@ -135,44 +139,52 @@ case class AddressTagDAO()(implicit safeDatabase.run(query.delete) } - def findTx( - tx: Transaction, - network: NetworkParameters): Future[Vector[AddressTagDb]] = { + def findTxAction(tx: Transaction, network: NetworkParameters): DBIOAction[ + Vector[AddressTagDb], + NoStream, + Effect.Read] = { val txIds = tx.inputs.map(_.previousOutput.txIdBE) - def findUtxos = { - val infoQuery = spendingInfoTable.filter(_.txid.inSet(txIds)) - safeDatabase.runVec(infoQuery.result) + val findUtxosA = { + spendingInfoTable.filter(_.txid.inSet(txIds)).result.map(_.toVector) } def findSpks(ids: Vector[Long]) = { - val spkQuery = spkTable.filter(_.id.inSet(ids)) - safeDatabase.runVec(spkQuery.result) + spkTable.filter(_.id.inSet(ids)).result.map(_.toVector) } - def spendingInfosF = + val spendingInfosA = for { - utxos <- findUtxos + utxos <- findUtxosA spks <- - if (utxos.isEmpty) Future.successful(Vector.empty) + if (utxos.isEmpty) DBIO.successful(Vector.empty) else findSpks(utxos.map(_.scriptPubKeyId)) } yield { val spksMap = spks.map(spk => (spk.id.get, spk.scriptPubKey)).toMap utxos.map(utxo => utxo.toSpendingInfoDb(spksMap(utxo.scriptPubKeyId))) } - spendingInfosF.flatMap { spendingInfos => - if (spendingInfos.isEmpty) { - Future.successful(Vector.empty) - } else { - val spks = spendingInfos.map(_.output.scriptPubKey) - val addresses = - spks.map(spk => BitcoinAddress.fromScriptPubKey(spk, network)) + spendingInfosA + .flatMap { spendingInfos => + if (spendingInfos.isEmpty) { + DBIO.successful(Vector.empty) + } else { + val spks = spendingInfos.map(_.output.scriptPubKey) + val addresses = + spks.map(spk => BitcoinAddress.fromScriptPubKey(spk, network)) - val findByAddressFs = addresses.map(address => findByAddress(address)) - FutureUtil.collect(findByAddressFs).map(_.flatten) + val findByAddressAs = + addresses.map(address => findByAddressAction(address)) + DBIO.sequence(findByAddressAs).map(_.flatten) + } } - } + .map(_.toVector) + } + + def findTx( + tx: Transaction, + network: NetworkParameters): Future[Vector[AddressTagDb]] = { + safeDatabase.run(findTxAction(tx, network)) } def deleteByAddressesAction(addresses: Vector[BitcoinAddress]): DBIOAction[ diff --git a/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala b/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala index 64a6b66afc..ea724ddc48 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala @@ -6,7 +6,6 @@ import org.bitcoins.core.script.ScriptType import org.bitcoins.crypto.Sha256Digest import org.bitcoins.db.CRUDAutoInc import org.bitcoins.wallet.config.WalletAppConfig -import slick.dbio.DBIOAction import scala.concurrent.{ExecutionContext, Future} @@ -29,7 +28,7 @@ case class ScriptPubKeyDAO()(implicit spkOpt <- spkFind.headOption spk <- spkOpt match { case Some(foundSpk) => - DBIOAction.successful(foundSpk) + DBIO.successful(foundSpk) case None => for { newSpkId <- (table returning table.map(_.id)) += spkDb @@ -48,17 +47,24 @@ case class ScriptPubKeyDAO()(implicit foundVecF.map(_.headOption) } - /** Searches for the given set of spks and returns the ones that exist in the db */ - def findScriptPubKeys( - spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = { + def findScriptPubKeysAction(spks: Vector[ScriptPubKey]): DBIOAction[ + Vector[ScriptPubKeyDb], + NoStream, + Effect.Read] = { val hashes = spks.map(ScriptPubKeyDb.hash) //group hashes to avoid https://github.com/bitcoin-s/bitcoin-s/issues/4220 val groupedHashes: Vector[Vector[Sha256Digest]] = hashes.grouped(1000).toVector val actions = groupedHashes.map(hashes => table.filter(_.hash.inSet(hashes)).result) - val sequenced = DBIOAction.sequence(actions).map(_.flatten) - safeDatabase.runVec(sequenced) + DBIO.sequence(actions).map(_.flatten.toVector) + } + + /** Searches for the given set of spks and returns the ones that exist in the db */ + def findScriptPubKeys( + spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = { + val action = findScriptPubKeysAction(spks) + safeDatabase.run(action) } case class ScriptPubKeyTable(tag: Tag) diff --git a/wallet/src/main/scala/org/bitcoins/wallet/models/SpendingInfoDAO.scala b/wallet/src/main/scala/org/bitcoins/wallet/models/SpendingInfoDAO.scala index d54b521ce6..c7e9c2f004 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/models/SpendingInfoDAO.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/models/SpendingInfoDAO.scala @@ -257,6 +257,21 @@ case class SpendingInfoDAO()(implicit findOutputsReceived(txs.map(_.txIdBE)) } + /** Fetches all received txos in our db that are in the given txs */ + def findTxsAction(txs: Vector[Transaction]): DBIOAction[ + Vector[SpendingInfoDb], + NoStream, + Effect.Read] = { + findOutputsReceivedAction(txs.map(_.txIdBE)) + } + + def findTxAction(tx: Transaction): DBIOAction[ + Vector[SpendingInfoDb], + NoStream, + Effect.Read] = { + findTxsAction(Vector(tx)) + } + private def _findOutputsBeingSpentQuery(txs: Vector[Transaction]): Vector[ Query[SpendingInfoTable, UTXORecord, Seq]] = { val outPoints: Vector[TransactionOutPoint] = txs @@ -291,6 +306,14 @@ case class SpendingInfoDAO()(implicit def findOutputsBeingSpent( txs: Vector[Transaction]): Future[Vector[SpendingInfoDb]] = { + val action = findOutputsBeingSpentAction(txs) + safeDatabase.run(action) + } + + def findOutputsBeingSpentAction(txs: Vector[Transaction]): DBIOAction[ + Vector[SpendingInfoDb], + NoStream, + Effect.Read] = { val queries = findOutputsBeingSpentQuery(txs) val actions: Vector[DBIOAction[ Vector[(UTXORecord, ScriptPubKeyDb)], @@ -301,17 +324,11 @@ case class SpendingInfoDAO()(implicit val action: DBIOAction[ Vector[(UTXORecord, ScriptPubKeyDb)], NoStream, - Effect.Read] = DBIO.sequence(actions).map(_.flatten) + Effect.Read] = DBIO.sequence(actions).map(_.flatten.toVector) - val resultsF = safeDatabase.runVec(action) - - for { - results <- resultsF - } yield { - results.map { case (utxo, spk) => - utxo.toSpendingInfoDb(spk.scriptPubKey) - } - } + action.map(_.map { case (utxo, spk) => + utxo.toSpendingInfoDb(spk.scriptPubKey) + }) } /** Given a TXID, fetches all incoming TXOs and the address the TXO pays to @@ -355,17 +372,28 @@ case class SpendingInfoDAO()(implicit /** Fetches all the incoming TXOs in our DB that are in * the transaction with the given TXID */ - def findOutputsReceived( - txids: Vector[DoubleSha256DigestBE]): Future[Vector[SpendingInfoDb]] = { - val filtered = spkJoinQuery + def findOutputsReceivedAction( + txids: Vector[DoubleSha256DigestBE]): DBIOAction[ + Vector[SpendingInfoDb], + NoStream, + Effect.Read] = { + spkJoinQuery .filter(_._1.state.inSet(TxoState.receivedStates)) .filter(_._1.txid.inSet(txids)) - safeDatabase - .runVec(filtered.result) + .result .map(res => res.map { case (utxoRec, spkRec) => utxoRec.toSpendingInfoDb(spkRec.scriptPubKey) }) + .map(_.toVector) + } + + /** Fetches all the incoming TXOs in our DB that are in + * the transaction with the given TXID + */ + def findOutputsReceived( + txids: Vector[DoubleSha256DigestBE]): Future[Vector[SpendingInfoDb]] = { + safeDatabase.run(findOutputsReceivedAction(txids)) } def findByScriptPubKey(