diff --git a/core/src/main/scala/org/bitcoins/core/protocol/transaction/TransactionOutput.scala b/core/src/main/scala/org/bitcoins/core/protocol/transaction/TransactionOutput.scala index 4234a182d2..6ad2d178dc 100644 --- a/core/src/main/scala/org/bitcoins/core/protocol/transaction/TransactionOutput.scala +++ b/core/src/main/scala/org/bitcoins/core/protocol/transaction/TransactionOutput.scala @@ -27,3 +27,5 @@ object TransactionOutput extends Factory[TransactionOutput] { RawTransactionOutputParser.read(bytes) } + +case class OutputWithIndex(output: TransactionOutput, index: Int) diff --git a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/internal/DLCTransactionProcessing.scala b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/internal/DLCTransactionProcessing.scala index 8754638da3..a5c088f7fe 100644 --- a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/internal/DLCTransactionProcessing.scala +++ b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/internal/DLCTransactionProcessing.scala @@ -7,7 +7,11 @@ import org.bitcoins.core.protocol.dlc.models.DLCMessage._ import org.bitcoins.core.protocol.dlc.models._ import org.bitcoins.core.protocol.script._ import org.bitcoins.core.protocol.tlv._ -import org.bitcoins.core.protocol.transaction.{Transaction, WitnessTransaction} +import org.bitcoins.core.protocol.transaction.{ + OutputWithIndex, + Transaction, + WitnessTransaction +} import org.bitcoins.core.psbt.InputPSBTRecord.PartialSignature import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.utxo.AddressTag @@ -236,9 +240,15 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing { tx: Transaction, blockHashOpt: Option[DoubleSha256DigestBE], spendingInfoDbs: Vector[SpendingInfoDb], - newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] = { + newTags: Vector[AddressTag], + relevantReceivedOutputs: Vector[OutputWithIndex]): Future[ + Vector[SpendingInfoDb]] = { super - .processReceivedUtxos(tx, blockHashOpt, spendingInfoDbs, newTags) + .processReceivedUtxos(tx, + blockHashOpt, + spendingInfoDbs, + newTags, + relevantReceivedOutputs) .flatMap { res => for { dlcDbs <- dlcDAO.findByFundingTxId(tx.txIdBE) diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/ProcessBlockTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/ProcessBlockTest.scala index 67f79fab3f..8b8392dade 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/ProcessBlockTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/ProcessBlockTest.scala @@ -51,7 +51,6 @@ class ProcessBlockTest extends BitcoinSWalletTestCachedBitcoinV19 { _ <- wallet.processBlock(block) utxos <- wallet.listUtxos() - height <- bitcoind.getBlockCount bestHash <- bitcoind.getBestBlockHash syncHeightOpt <- wallet.getSyncDescriptorOpt() diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala index 1f7a7197e0..8abd86ca85 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala @@ -7,9 +7,9 @@ import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.protocol.transaction.{ + OutputWithIndex, Transaction, - TransactionOutPoint, - TransactionOutput + TransactionOutPoint } import org.bitcoins.core.util.TimeUtil import org.bitcoins.core.wallet.fee.FeeUnit @@ -37,12 +37,17 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { transaction: Transaction, blockHashOpt: Option[DoubleSha256DigestBE] ): Future[Wallet] = { + val relevantReceivedOutputsF = getRelevantOutputs(transaction) for { - result <- processTransactionImpl(transaction = transaction, - blockHashOpt = blockHashOpt, - newTags = Vector.empty, - receivedSpendingInfoDbsOpt = None, - spentSpendingInfoDbsOpt = None) + relevantReceivedOutputs <- relevantReceivedOutputsF + result <- processTransactionImpl( + transaction = transaction, + blockHashOpt = blockHashOpt, + newTags = Vector.empty, + receivedSpendingInfoDbsOpt = None, + spentSpendingInfoDbsOpt = None, + relevantReceivedOutputs + ) } yield { if (result.updatedIncoming.nonEmpty || result.updatedOutgoing.nonEmpty) { logger.info( @@ -118,12 +123,19 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { spendingInfoDAO.findOutputsBeingSpent(block.transactions.toVector) val blockHashOpt = Some(block.blockHeader.hash.flip) + + //fetch all outputs we may have received in this block in advance + //as an optimization + val relevantReceivedOutputsForBlockF = getRelevantOutputsForBlock(block) + val resultF: Future[Future[Wallet]] = for { //map on these first so we don't have to call //.map everytime we iterate through a tx //which is costly (thread overhead) receivedSpendingInfoDbsOpt <- cachedReceivedOptF spentSpendingInfoDbs <- spentSpendingInfoDbsF + relevantReceivedOutputsForBlock <- + relevantReceivedOutputsForBlockF } yield { //we need to keep a cache of spentSpendingInfoDb //for the case where we receive & then spend that @@ -143,7 +155,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { blockHashOpt = blockHashOpt, newTags = Vector.empty, receivedSpendingInfoDbsOpt = receivedSpendingInfoDbsOpt, - spentSpendingInfoDbsOpt = cachedSpentOpt + spentSpendingInfoDbsOpt = cachedSpentOpt, + relevantReceivedOutputs = relevantReceivedOutputsForBlock ) } _ = { @@ -232,6 +245,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { logger.info( s"Processing TX from our wallet, transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt .map(_.hex)}") + val relevantOutputsF = getRelevantOutputs(transaction) for { (txDb, _) <- insertOutgoingTransaction(transaction, @@ -239,11 +253,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { inputAmount, sentAmount, blockHashOpt) + relevantOutputs <- relevantOutputsF result <- processTransactionImpl(transaction = txDb.transaction, blockHashOpt = blockHashOpt, newTags = newTags, receivedSpendingInfoDbsOpt = None, - spentSpendingInfoDbsOpt = None) + spentSpendingInfoDbsOpt = None, + relevantOutputs) } yield { val txid = txDb.transaction.txIdBE val changeOutputs = result.updatedIncoming.length @@ -302,9 +318,19 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { transaction: Transaction, blockHashOpt: Option[DoubleSha256DigestBE], spendingInfoDbs: Vector[SpendingInfoDb], - newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] = { - if (spendingInfoDbs.isEmpty) { - processNewReceivedTx(transaction, blockHashOpt, newTags) + newTags: Vector[AddressTag], + relevantReceivedOutputs: Vector[OutputWithIndex]): Future[ + Vector[SpendingInfoDb]] = { + if (spendingInfoDbs.isEmpty && relevantReceivedOutputs.isEmpty) { + //as an optimization if we don't have any relevant utxos + //and any relevant outputs that match scripts in our wallet + //we can just return now + Future.successful(Vector.empty) + } else if (spendingInfoDbs.isEmpty) { + processNewReceivedTx(transaction, + blockHashOpt, + newTags, + relevantReceivedOutputs) .map(_.toVector) } else { val processedVec = spendingInfoDbs.map { txo => @@ -347,13 +373,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { blockHashOpt: Option[DoubleSha256DigestBE], newTags: Vector[AddressTag], receivedSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]], - spentSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]]): Future[ + spentSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]], + relevantReceivedOutputs: Vector[OutputWithIndex]): Future[ ProcessTxResult] = { logger.debug( s"Processing transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt .map(_.hex)}") - val receivedSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = { receivedSpendingInfoDbsOpt match { case Some(received) => @@ -366,6 +392,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { } } + val spentSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = { spentSpendingInfoDbsOpt match { case Some(spent) => @@ -383,11 +410,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { for { receivedSpendingInfoDbs <- receivedSpendingInfoDbsF receivedStart = TimeUtil.currentEpochMs - incoming <- processReceivedUtxos(transaction = transaction, - blockHashOpt = blockHashOpt, - spendingInfoDbs = - receivedSpendingInfoDbs, - newTags = newTags) + incoming <- processReceivedUtxos( + transaction = transaction, + blockHashOpt = blockHashOpt, + spendingInfoDbs = receivedSpendingInfoDbs, + newTags = newTags, + relevantReceivedOutputs = relevantReceivedOutputs + ) _ = if (incoming.nonEmpty) { logger.info( s"Finished processing ${incoming.length} received outputs, it took=${TimeUtil.currentEpochMs - receivedStart}ms") @@ -481,8 +510,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { utxoF } - private case class OutputWithIndex(output: TransactionOutput, index: Int) - /** Processes an incoming transaction that already exists in our wallet. * If the incoming transaction has more confirmations than what we * have in the DB, we update the TX @@ -532,7 +559,9 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { transaction: Transaction, blockHashOpt: Option[DoubleSha256DigestBE]): Future[ Seq[SpendingInfoDb]] = { - + require( + outputsWithIndex.nonEmpty, + s"Cannot add utxos to wallet if we have none! got=${outputsWithIndex}") val spks = outputsWithIndex.map(_.output.scriptPubKey).toVector val addressDbsF: Future[Vector[AddressDb]] = { @@ -582,7 +611,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { addressDbs: Vector[AddressDb], outputsWithIndex: Vector[OutputWithIndex]): Vector[ (AddressDb, OutputWithIndex)] = { - val addressDbsWithOutputsOpt = outputsWithIndex.map { out => //find address associated with spk val addressDbOpt = @@ -596,7 +624,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { } } //get rid of outputs we couldn't match to an address - addressDbsWithOutputsOpt.flatten + val result = addressDbsWithOutputsOpt.flatten + result } private[wallet] def insertIncomingTransaction( @@ -611,20 +640,43 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { } yield (txDb, written) } + /** Filters outputs on tx so that only relevant outputs to our wallet are included */ private def getRelevantOutputs( transaction: Transaction): Future[Vector[OutputWithIndex]] = { val spks = transaction.outputs.map(_.scriptPubKey) scriptPubKeyDAO.findScriptPubKeys(spks.toVector).map { addrs => - val withIndex = - transaction.outputs.zipWithIndex - withIndex.collect { - case (out, idx) - if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) => - OutputWithIndex(out, idx) - }.toVector + matchReceivedTx(addrs, transaction) } } + private def matchReceivedTx( + addrs: Vector[ScriptPubKeyDb], + transaction: Transaction): Vector[OutputWithIndex] = { + val withIndex = + transaction.outputs.zipWithIndex + withIndex.collect { + case (out, idx) if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) => + OutputWithIndex(out, idx) + }.toVector + } + + private def getRelevantOutputsForBlock( + block: Block): Future[Vector[OutputWithIndex]] = { + val spksInBlock: Vector[ScriptPubKey] = block.transactions + .flatMap(tx => tx.outputs.map(o => o.scriptPubKey)) + .toVector + val spksInDbF = scriptPubKeyDAO.findScriptPubKeys(spksInBlock) + + val result = spksInDbF.map { addrs => + block.transactions.flatMap { tx => + val m = matchReceivedTx(addrs, tx) + m + } + } + + result.map(_.toVector) + } + /** Processes an incoming transaction that's new to us * * @return A list of inserted transaction outputs @@ -632,32 +684,51 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { private def processNewReceivedTx( transaction: Transaction, blockHashOpt: Option[DoubleSha256DigestBE], - newTags: Vector[AddressTag]): Future[Seq[SpendingInfoDb]] = { - val outputsF = getRelevantOutputs(transaction) - outputsF.flatMap { - case Vector() => - logger.trace( - s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}") + newTags: Vector[AddressTag], + relevantReceivedOutputs: Vector[OutputWithIndex]): Future[ + Seq[SpendingInfoDb]] = { + if (relevantReceivedOutputs.isEmpty) { + + logger.trace( + s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}") + + Future.successful(Vector.empty) + } else { + val filteredOutputs = + transaction.outputs.zipWithIndex.filter(o => + relevantReceivedOutputs.exists(_ == OutputWithIndex(o._1, o._2))) + + if (filteredOutputs.isEmpty) { + //no relevant outputs in this tx, return early Future.successful(Vector.empty) + } else { + val relevantReceivedOutputsForTx: Vector[OutputWithIndex] = { - case outputsWithIndex => - val totalIncoming = outputsWithIndex.map(_.output.value).sum + filteredOutputs.map { case (o, idx) => + OutputWithIndex(o, idx) + }.toVector + } - val spks = outputsWithIndex.map(_.output.scriptPubKey) + val spks = relevantReceivedOutputsForTx.map(_.output.scriptPubKey) val spksInDbF = addressDAO.findByScriptPubKeys(spks) val ourOutputsF = for { spksInDb <- spksInDbF } yield { - outputsWithIndex.collect { + relevantReceivedOutputsForTx.collect { case OutputWithIndex(out, idx) if spksInDb.map(_.scriptPubKey).exists(_ == out.scriptPubKey) => OutputWithIndex(out, idx) } } - val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = - insertIncomingTransaction(transaction, totalIncoming, blockHashOpt) + val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = for { + ourOutputs <- ourOutputsF + totalIncoming = ourOutputs.map(_.output.value).sum + incomingTx <- insertIncomingTransaction(transaction, + totalIncoming, + blockHashOpt) + } yield incomingTx val prevTagsDbF = for { (txDb, _) <- txDbF @@ -685,6 +756,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger { utxos <- addReceivedUTXOs(ourOutputs, txDb.transaction, blockHashOpt) _ <- newTagsF } yield utxos + } } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala b/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala index a149631f29..64a6b66afc 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/models/ScriptPubKeyDAO.scala @@ -52,8 +52,13 @@ case class ScriptPubKeyDAO()(implicit def findScriptPubKeys( spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = { val hashes = spks.map(ScriptPubKeyDb.hash) - val query = table.filter(_.hash.inSet(hashes)) - safeDatabase.runVec(query.result) + //group hashes to avoid https://github.com/bitcoin-s/bitcoin-s/issues/4220 + val groupedHashes: Vector[Vector[Sha256Digest]] = + hashes.grouped(1000).toVector + val actions = + groupedHashes.map(hashes => table.filter(_.hash.inSet(hashes)).result) + val sequenced = DBIOAction.sequence(actions).map(_.flatten) + safeDatabase.runVec(sequenced) } case class ScriptPubKeyTable(tag: Tag)