2022 03 22 getrelevantoutputs upfront (#4219)

* WIP

Try and debug

WIP

Fix bug for incomingAmount

Clean up logs

more cleanups on logs

Remove another uncessary log

Don't process tx if we don't have a relevant received output for that specific tx

* Group hashes in batches of 1,000 before querying the db
This commit is contained in:
Chris Stewart 2022-04-26 11:12:44 -05:00 committed by GitHub
parent 525fb2ac0d
commit 6db1f26625
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 49 deletions

View file

@ -27,3 +27,5 @@ object TransactionOutput extends Factory[TransactionOutput] {
RawTransactionOutputParser.read(bytes) RawTransactionOutputParser.read(bytes)
} }
case class OutputWithIndex(output: TransactionOutput, index: Int)

View file

@ -7,7 +7,11 @@ import org.bitcoins.core.protocol.dlc.models.DLCMessage._
import org.bitcoins.core.protocol.dlc.models._ import org.bitcoins.core.protocol.dlc.models._
import org.bitcoins.core.protocol.script._ import org.bitcoins.core.protocol.script._
import org.bitcoins.core.protocol.tlv._ import org.bitcoins.core.protocol.tlv._
import org.bitcoins.core.protocol.transaction.{Transaction, WitnessTransaction} import org.bitcoins.core.protocol.transaction.{
OutputWithIndex,
Transaction,
WitnessTransaction
}
import org.bitcoins.core.psbt.InputPSBTRecord.PartialSignature import org.bitcoins.core.psbt.InputPSBTRecord.PartialSignature
import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.utxo.AddressTag import org.bitcoins.core.wallet.utxo.AddressTag
@ -236,9 +240,15 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
tx: Transaction, tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE], blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb], spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] = { newTags: Vector[AddressTag],
relevantReceivedOutputs: Vector[OutputWithIndex]): Future[
Vector[SpendingInfoDb]] = {
super super
.processReceivedUtxos(tx, blockHashOpt, spendingInfoDbs, newTags) .processReceivedUtxos(tx,
blockHashOpt,
spendingInfoDbs,
newTags,
relevantReceivedOutputs)
.flatMap { res => .flatMap { res =>
for { for {
dlcDbs <- dlcDAO.findByFundingTxId(tx.txIdBE) dlcDbs <- dlcDAO.findByFundingTxId(tx.txIdBE)

View file

@ -51,7 +51,6 @@ class ProcessBlockTest extends BitcoinSWalletTestCachedBitcoinV19 {
_ <- wallet.processBlock(block) _ <- wallet.processBlock(block)
utxos <- wallet.listUtxos() utxos <- wallet.listUtxos()
height <- bitcoind.getBlockCount height <- bitcoind.getBlockCount
bestHash <- bitcoind.getBestBlockHash bestHash <- bitcoind.getBestBlockHash
syncHeightOpt <- wallet.getSyncDescriptorOpt() syncHeightOpt <- wallet.getSyncDescriptorOpt()

View file

@ -7,9 +7,9 @@ import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.script.ScriptPubKey import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.{ import org.bitcoins.core.protocol.transaction.{
OutputWithIndex,
Transaction, Transaction,
TransactionOutPoint, TransactionOutPoint
TransactionOutput
} }
import org.bitcoins.core.util.TimeUtil import org.bitcoins.core.util.TimeUtil
import org.bitcoins.core.wallet.fee.FeeUnit import org.bitcoins.core.wallet.fee.FeeUnit
@ -37,12 +37,17 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
transaction: Transaction, transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE] blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Wallet] = { ): Future[Wallet] = {
val relevantReceivedOutputsF = getRelevantOutputs(transaction)
for { for {
result <- processTransactionImpl(transaction = transaction, relevantReceivedOutputs <- relevantReceivedOutputsF
blockHashOpt = blockHashOpt, result <- processTransactionImpl(
newTags = Vector.empty, transaction = transaction,
receivedSpendingInfoDbsOpt = None, blockHashOpt = blockHashOpt,
spentSpendingInfoDbsOpt = None) newTags = Vector.empty,
receivedSpendingInfoDbsOpt = None,
spentSpendingInfoDbsOpt = None,
relevantReceivedOutputs
)
} yield { } yield {
if (result.updatedIncoming.nonEmpty || result.updatedOutgoing.nonEmpty) { if (result.updatedIncoming.nonEmpty || result.updatedOutgoing.nonEmpty) {
logger.info( logger.info(
@ -118,12 +123,19 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
spendingInfoDAO.findOutputsBeingSpent(block.transactions.toVector) spendingInfoDAO.findOutputsBeingSpent(block.transactions.toVector)
val blockHashOpt = Some(block.blockHeader.hash.flip) val blockHashOpt = Some(block.blockHeader.hash.flip)
//fetch all outputs we may have received in this block in advance
//as an optimization
val relevantReceivedOutputsForBlockF = getRelevantOutputsForBlock(block)
val resultF: Future[Future[Wallet]] = for { val resultF: Future[Future[Wallet]] = for {
//map on these first so we don't have to call //map on these first so we don't have to call
//.map everytime we iterate through a tx //.map everytime we iterate through a tx
//which is costly (thread overhead) //which is costly (thread overhead)
receivedSpendingInfoDbsOpt <- cachedReceivedOptF receivedSpendingInfoDbsOpt <- cachedReceivedOptF
spentSpendingInfoDbs <- spentSpendingInfoDbsF spentSpendingInfoDbs <- spentSpendingInfoDbsF
relevantReceivedOutputsForBlock <-
relevantReceivedOutputsForBlockF
} yield { } yield {
//we need to keep a cache of spentSpendingInfoDb //we need to keep a cache of spentSpendingInfoDb
//for the case where we receive & then spend that //for the case where we receive & then spend that
@ -143,7 +155,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
blockHashOpt = blockHashOpt, blockHashOpt = blockHashOpt,
newTags = Vector.empty, newTags = Vector.empty,
receivedSpendingInfoDbsOpt = receivedSpendingInfoDbsOpt, receivedSpendingInfoDbsOpt = receivedSpendingInfoDbsOpt,
spentSpendingInfoDbsOpt = cachedSpentOpt spentSpendingInfoDbsOpt = cachedSpentOpt,
relevantReceivedOutputs = relevantReceivedOutputsForBlock
) )
} }
_ = { _ = {
@ -232,6 +245,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
logger.info( logger.info(
s"Processing TX from our wallet, transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt s"Processing TX from our wallet, transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt
.map(_.hex)}") .map(_.hex)}")
val relevantOutputsF = getRelevantOutputs(transaction)
for { for {
(txDb, _) <- (txDb, _) <-
insertOutgoingTransaction(transaction, insertOutgoingTransaction(transaction,
@ -239,11 +253,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
inputAmount, inputAmount,
sentAmount, sentAmount,
blockHashOpt) blockHashOpt)
relevantOutputs <- relevantOutputsF
result <- processTransactionImpl(transaction = txDb.transaction, result <- processTransactionImpl(transaction = txDb.transaction,
blockHashOpt = blockHashOpt, blockHashOpt = blockHashOpt,
newTags = newTags, newTags = newTags,
receivedSpendingInfoDbsOpt = None, receivedSpendingInfoDbsOpt = None,
spentSpendingInfoDbsOpt = None) spentSpendingInfoDbsOpt = None,
relevantOutputs)
} yield { } yield {
val txid = txDb.transaction.txIdBE val txid = txDb.transaction.txIdBE
val changeOutputs = result.updatedIncoming.length val changeOutputs = result.updatedIncoming.length
@ -302,9 +318,19 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
transaction: Transaction, transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE], blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb], spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] = { newTags: Vector[AddressTag],
if (spendingInfoDbs.isEmpty) { relevantReceivedOutputs: Vector[OutputWithIndex]): Future[
processNewReceivedTx(transaction, blockHashOpt, newTags) Vector[SpendingInfoDb]] = {
if (spendingInfoDbs.isEmpty && relevantReceivedOutputs.isEmpty) {
//as an optimization if we don't have any relevant utxos
//and any relevant outputs that match scripts in our wallet
//we can just return now
Future.successful(Vector.empty)
} else if (spendingInfoDbs.isEmpty) {
processNewReceivedTx(transaction,
blockHashOpt,
newTags,
relevantReceivedOutputs)
.map(_.toVector) .map(_.toVector)
} else { } else {
val processedVec = spendingInfoDbs.map { txo => val processedVec = spendingInfoDbs.map { txo =>
@ -347,13 +373,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
blockHashOpt: Option[DoubleSha256DigestBE], blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag], newTags: Vector[AddressTag],
receivedSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]], receivedSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]],
spentSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]]): Future[ spentSpendingInfoDbsOpt: Option[Vector[SpendingInfoDb]],
relevantReceivedOutputs: Vector[OutputWithIndex]): Future[
ProcessTxResult] = { ProcessTxResult] = {
logger.debug( logger.debug(
s"Processing transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt s"Processing transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt
.map(_.hex)}") .map(_.hex)}")
val receivedSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = { val receivedSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = {
receivedSpendingInfoDbsOpt match { receivedSpendingInfoDbsOpt match {
case Some(received) => case Some(received) =>
@ -366,6 +392,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
} }
} }
val spentSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = { val spentSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = {
spentSpendingInfoDbsOpt match { spentSpendingInfoDbsOpt match {
case Some(spent) => case Some(spent) =>
@ -383,11 +410,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
for { for {
receivedSpendingInfoDbs <- receivedSpendingInfoDbsF receivedSpendingInfoDbs <- receivedSpendingInfoDbsF
receivedStart = TimeUtil.currentEpochMs receivedStart = TimeUtil.currentEpochMs
incoming <- processReceivedUtxos(transaction = transaction, incoming <- processReceivedUtxos(
blockHashOpt = blockHashOpt, transaction = transaction,
spendingInfoDbs = blockHashOpt = blockHashOpt,
receivedSpendingInfoDbs, spendingInfoDbs = receivedSpendingInfoDbs,
newTags = newTags) newTags = newTags,
relevantReceivedOutputs = relevantReceivedOutputs
)
_ = if (incoming.nonEmpty) { _ = if (incoming.nonEmpty) {
logger.info( logger.info(
s"Finished processing ${incoming.length} received outputs, it took=${TimeUtil.currentEpochMs - receivedStart}ms") s"Finished processing ${incoming.length} received outputs, it took=${TimeUtil.currentEpochMs - receivedStart}ms")
@ -481,8 +510,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
utxoF utxoF
} }
private case class OutputWithIndex(output: TransactionOutput, index: Int)
/** Processes an incoming transaction that already exists in our wallet. /** Processes an incoming transaction that already exists in our wallet.
* If the incoming transaction has more confirmations than what we * If the incoming transaction has more confirmations than what we
* have in the DB, we update the TX * have in the DB, we update the TX
@ -532,7 +559,9 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
transaction: Transaction, transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[ blockHashOpt: Option[DoubleSha256DigestBE]): Future[
Seq[SpendingInfoDb]] = { Seq[SpendingInfoDb]] = {
require(
outputsWithIndex.nonEmpty,
s"Cannot add utxos to wallet if we have none! got=${outputsWithIndex}")
val spks = outputsWithIndex.map(_.output.scriptPubKey).toVector val spks = outputsWithIndex.map(_.output.scriptPubKey).toVector
val addressDbsF: Future[Vector[AddressDb]] = { val addressDbsF: Future[Vector[AddressDb]] = {
@ -582,7 +611,6 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
addressDbs: Vector[AddressDb], addressDbs: Vector[AddressDb],
outputsWithIndex: Vector[OutputWithIndex]): Vector[ outputsWithIndex: Vector[OutputWithIndex]): Vector[
(AddressDb, OutputWithIndex)] = { (AddressDb, OutputWithIndex)] = {
val addressDbsWithOutputsOpt = outputsWithIndex.map { out => val addressDbsWithOutputsOpt = outputsWithIndex.map { out =>
//find address associated with spk //find address associated with spk
val addressDbOpt = val addressDbOpt =
@ -596,7 +624,8 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
} }
} }
//get rid of outputs we couldn't match to an address //get rid of outputs we couldn't match to an address
addressDbsWithOutputsOpt.flatten val result = addressDbsWithOutputsOpt.flatten
result
} }
private[wallet] def insertIncomingTransaction( private[wallet] def insertIncomingTransaction(
@ -611,20 +640,43 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
} yield (txDb, written) } yield (txDb, written)
} }
/** Filters outputs on tx so that only relevant outputs to our wallet are included */
private def getRelevantOutputs( private def getRelevantOutputs(
transaction: Transaction): Future[Vector[OutputWithIndex]] = { transaction: Transaction): Future[Vector[OutputWithIndex]] = {
val spks = transaction.outputs.map(_.scriptPubKey) val spks = transaction.outputs.map(_.scriptPubKey)
scriptPubKeyDAO.findScriptPubKeys(spks.toVector).map { addrs => scriptPubKeyDAO.findScriptPubKeys(spks.toVector).map { addrs =>
val withIndex = matchReceivedTx(addrs, transaction)
transaction.outputs.zipWithIndex
withIndex.collect {
case (out, idx)
if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) =>
OutputWithIndex(out, idx)
}.toVector
} }
} }
private def matchReceivedTx(
addrs: Vector[ScriptPubKeyDb],
transaction: Transaction): Vector[OutputWithIndex] = {
val withIndex =
transaction.outputs.zipWithIndex
withIndex.collect {
case (out, idx) if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) =>
OutputWithIndex(out, idx)
}.toVector
}
private def getRelevantOutputsForBlock(
block: Block): Future[Vector[OutputWithIndex]] = {
val spksInBlock: Vector[ScriptPubKey] = block.transactions
.flatMap(tx => tx.outputs.map(o => o.scriptPubKey))
.toVector
val spksInDbF = scriptPubKeyDAO.findScriptPubKeys(spksInBlock)
val result = spksInDbF.map { addrs =>
block.transactions.flatMap { tx =>
val m = matchReceivedTx(addrs, tx)
m
}
}
result.map(_.toVector)
}
/** Processes an incoming transaction that's new to us /** Processes an incoming transaction that's new to us
* *
* @return A list of inserted transaction outputs * @return A list of inserted transaction outputs
@ -632,32 +684,51 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
private def processNewReceivedTx( private def processNewReceivedTx(
transaction: Transaction, transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE], blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]): Future[Seq[SpendingInfoDb]] = { newTags: Vector[AddressTag],
val outputsF = getRelevantOutputs(transaction) relevantReceivedOutputs: Vector[OutputWithIndex]): Future[
outputsF.flatMap { Seq[SpendingInfoDb]] = {
case Vector() => if (relevantReceivedOutputs.isEmpty) {
logger.trace(
s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}") logger.trace(
s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}")
Future.successful(Vector.empty)
} else {
val filteredOutputs =
transaction.outputs.zipWithIndex.filter(o =>
relevantReceivedOutputs.exists(_ == OutputWithIndex(o._1, o._2)))
if (filteredOutputs.isEmpty) {
//no relevant outputs in this tx, return early
Future.successful(Vector.empty) Future.successful(Vector.empty)
} else {
val relevantReceivedOutputsForTx: Vector[OutputWithIndex] = {
case outputsWithIndex => filteredOutputs.map { case (o, idx) =>
val totalIncoming = outputsWithIndex.map(_.output.value).sum OutputWithIndex(o, idx)
}.toVector
}
val spks = outputsWithIndex.map(_.output.scriptPubKey) val spks = relevantReceivedOutputsForTx.map(_.output.scriptPubKey)
val spksInDbF = addressDAO.findByScriptPubKeys(spks) val spksInDbF = addressDAO.findByScriptPubKeys(spks)
val ourOutputsF = for { val ourOutputsF = for {
spksInDb <- spksInDbF spksInDb <- spksInDbF
} yield { } yield {
outputsWithIndex.collect { relevantReceivedOutputsForTx.collect {
case OutputWithIndex(out, idx) case OutputWithIndex(out, idx)
if spksInDb.map(_.scriptPubKey).exists(_ == out.scriptPubKey) => if spksInDb.map(_.scriptPubKey).exists(_ == out.scriptPubKey) =>
OutputWithIndex(out, idx) OutputWithIndex(out, idx)
} }
} }
val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = val txDbF: Future[(TransactionDb, IncomingTransactionDb)] = for {
insertIncomingTransaction(transaction, totalIncoming, blockHashOpt) ourOutputs <- ourOutputsF
totalIncoming = ourOutputs.map(_.output.value).sum
incomingTx <- insertIncomingTransaction(transaction,
totalIncoming,
blockHashOpt)
} yield incomingTx
val prevTagsDbF = for { val prevTagsDbF = for {
(txDb, _) <- txDbF (txDb, _) <- txDbF
@ -685,6 +756,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
utxos <- addReceivedUTXOs(ourOutputs, txDb.transaction, blockHashOpt) utxos <- addReceivedUTXOs(ourOutputs, txDb.transaction, blockHashOpt)
_ <- newTagsF _ <- newTagsF
} yield utxos } yield utxos
}
} }
} }

View file

@ -52,8 +52,13 @@ case class ScriptPubKeyDAO()(implicit
def findScriptPubKeys( def findScriptPubKeys(
spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = { spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = {
val hashes = spks.map(ScriptPubKeyDb.hash) val hashes = spks.map(ScriptPubKeyDb.hash)
val query = table.filter(_.hash.inSet(hashes)) //group hashes to avoid https://github.com/bitcoin-s/bitcoin-s/issues/4220
safeDatabase.runVec(query.result) val groupedHashes: Vector[Vector[Sha256Digest]] =
hashes.grouped(1000).toVector
val actions =
groupedHashes.map(hashes => table.filter(_.hash.inSet(hashes)).result)
val sequenced = DBIOAction.sequence(actions).map(_.flatten)
safeDatabase.runVec(sequenced)
} }
case class ScriptPubKeyTable(tag: Tag) case class ScriptPubKeyTable(tag: Tag)