2021 05 09 received utxos (#3063)

* Make TransactionProcessing.processTransactionImpl() a bit more parallel, fetch all relevant received and spent txos in parallel

* More refactors, make sure return types are Vector rather than Seq to make sure we don't have any pathlogical surprises with scala collections

* Fix type annoation to be Vector

* Fix remaining usage of Seq in TransactionProcessing

* Fix pattern match on Vector on 2.12.x

* Change input to Vector rather than Seq in processSpentUtxos()
This commit is contained in:
Chris Stewart 2021-05-12 10:45:23 -05:00 committed by GitHub
parent 2b8ac08cdc
commit 72636b7180
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 51 deletions

View File

@ -181,7 +181,7 @@ class SpendingInfoDAOTest extends WalletDAOFixture {
for {
utxo <- WalletTestUtil.insertLegacyUTXO(daos,
state = TxoState.DoesNotExist)
foundTxos <- spendingInfoDAO.findTx(utxo.txid)
foundTxos <- spendingInfoDAO.findOutputsReceived(utxo.txid)
} yield assert(foundTxos.contains(utxo))
}

View File

@ -191,40 +191,43 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
}
}
protected def processIncomingUtxos(
/** Processes received utxos that are contained in the given transaction
* @param transaction the transaction that we are receiving utxos from
* @param blockHashOpt the block hash that contains this tx
* @param spendingInfoDbs the spending info dbs that are relevant for this transaction
* @param newTags tags associated with this tx
* @return
*/
protected def processReceivedUtxos(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] =
spendingInfoDAO
.findTx(transaction)
.flatMap {
// no existing elements found
case Vector() =>
processNewIncomingTx(transaction, blockHashOpt, newTags)
.map(_.toVector)
case txos: Vector[SpendingInfoDb] =>
val processedVec = txos.map { txo =>
processExistingIncomingTxo(transaction, blockHashOpt, txo)
}
Future.sequence(processedVec)
spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag]): Future[Vector[SpendingInfoDb]] = {
if (spendingInfoDbs.isEmpty) {
processNewReceivedTx(transaction, blockHashOpt, newTags)
.map(_.toVector)
} else {
val processedVec = spendingInfoDbs.map { txo =>
processExistingReceivedTxo(transaction, blockHashOpt, txo)
}
Future.sequence(processedVec)
}
}
/** Searches for outputs on the given transaction that are
* being spent from our wallet
*/
protected def processSpentUtxos(
transaction: Transaction,
outputsBeingSpent: Vector[SpendingInfoDb],
blockHashOpt: Option[DoubleSha256DigestBE]): Future[
Vector[SpendingInfoDb]] = {
for {
outputsBeingSpent <- spendingInfoDAO.findOutputsBeingSpent(transaction)
_ <-
_ <- {
if (outputsBeingSpent.nonEmpty)
insertTransaction(transaction, blockHashOpt)
else Future.unit
}
processed <- Future
.sequence {
outputsBeingSpent.map(markAsSpent(_, transaction.txIdBE))
@ -249,9 +252,24 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
s"Processing transaction=${transaction.txIdBE.hex} with blockHash=${blockHashOpt
.map(_.hex)}")
val receivedSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = {
spendingInfoDAO.findTx(transaction)
}
val spentSpendingInfoDbsF: Future[Vector[SpendingInfoDb]] = {
spendingInfoDAO.findOutputsBeingSpent(transaction)
}
for {
incoming <- processIncomingUtxos(transaction, blockHashOpt, newTags)
outgoing <- processSpentUtxos(transaction, blockHashOpt)
receivedSpendingInfoDbs <- receivedSpendingInfoDbsF
incoming <- processReceivedUtxos(transaction = transaction,
blockHashOpt = blockHashOpt,
spendingInfoDbs =
receivedSpendingInfoDbs,
newTags = newTags)
spentSpendingInfoDbs <- spentSpendingInfoDbsF
outgoing <- processSpentUtxos(transaction = transaction,
outputsBeingSpent = spentSpendingInfoDbs,
blockHashOpt = blockHashOpt)
_ <- walletCallbacks.executeOnTransactionProcessed(logger, transaction)
} yield {
ProcessTxResult(incoming, outgoing)
@ -331,7 +349,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
* If the incoming transaction has more confirmations than what we
* have in the DB, we update the TX
*/
private def processExistingIncomingTxo(
private def processExistingReceivedTxo(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
foundTxo: SpendingInfoDb): Future[SpendingInfoDb] = {
@ -432,7 +450,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
}
private def getRelevantOutputs(
transaction: Transaction): Future[Seq[OutputWithIndex]] = {
transaction: Transaction): Future[Vector[OutputWithIndex]] = {
val spks = transaction.outputs.map(_.scriptPubKey)
scriptPubKeyDAO.findScriptPubKeys(spks.toVector).map { addrs =>
val withIndex =
@ -441,7 +459,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
case (out, idx)
if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) =>
OutputWithIndex(out, idx)
}
}.toVector
}
}
@ -449,12 +467,13 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
*
* @return A list of inserted transaction outputs
*/
private def processNewIncomingTx(
private def processNewReceivedTx(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]): Future[Seq[SpendingInfoDb]] = {
getRelevantOutputs(transaction).flatMap {
case Nil =>
val outputsF = getRelevantOutputs(transaction)
outputsF.flatMap {
case Vector() =>
logger.trace(
s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}")
Future.successful(Vector.empty)

View File

@ -188,24 +188,24 @@ case class SpendingInfoDAO()(implicit
* the given TX
*/
def findTx(tx: Transaction): Future[Vector[SpendingInfoDb]] =
findTx(tx.txIdBE)
findOutputsReceived(tx.txIdBE)
private def _findOutputsBeingSpent(
tx: Transaction): Future[Vector[UTXORecord]] = {
val filtered = table
.filter { case txo =>
txo.outPoint.inSet(tx.inputs.map(_.previousOutput))
}
safeDatabase.runVec(filtered.result)
}
/** Finds all the outputs being spent in the given
* transaction
*/
def findOutputsBeingSpent(tx: Transaction): Future[Seq[SpendingInfoDb]] = {
def _findOutputsBeingSpent: Future[Seq[UTXORecord]] = {
val filtered = table
.filter { case txo =>
txo.outPoint.inSet(tx.inputs.map(_.previousOutput))
}
safeDatabase.run(filtered.result)
}
def findOutputsBeingSpent(tx: Transaction): Future[Vector[SpendingInfoDb]] = {
for {
utxos <- _findOutputsBeingSpent
utxos <- _findOutputsBeingSpent(tx)
spks <- findScriptPubKeysByUtxos(utxos)
} yield {
utxos.map(utxo =>
@ -246,14 +246,17 @@ case class SpendingInfoDAO()(implicit
safeDatabase.runVec(query.result)
}
/** Joins the spk table on the spending info table with the spk id */
private val spkJoinQuery = table
.join(spkTable)
.on(_.scriptPubKeyId === _.id)
/** Fetches all the incoming TXOs in our DB that are in
* the transaction with the given TXID
*/
def findTx(txid: DoubleSha256DigestBE): Future[Vector[SpendingInfoDb]] = {
val query = table
.join(spkTable)
.on(_.scriptPubKeyId === _.id)
val filtered = query.filter(_._1.txid === txid)
def findOutputsReceived(
txid: DoubleSha256DigestBE): Future[Vector[SpendingInfoDb]] = {
val filtered = spkJoinQuery.filter(_._1.txid === txid)
safeDatabase
.runVec(filtered.result)
.map(res =>
@ -264,10 +267,7 @@ case class SpendingInfoDAO()(implicit
def findByScriptPubKey(
scriptPubKey: ScriptPubKey): Future[Vector[SpendingInfoDb]] = {
val query = table
.join(spkTable)
.on(_.scriptPubKeyId === _.id)
val filtered = query.filter(_._2.scriptPubKey === scriptPubKey)
val filtered = spkJoinQuery.filter(_._2.scriptPubKey === scriptPubKey)
safeDatabase
.runVec(filtered.result)
.map(res =>
@ -424,7 +424,7 @@ case class SpendingInfoDAO()(implicit
}
private def findScriptPubKeysByUtxos(
utxos: Seq[UTXORecord]): Future[Map[Long, ScriptPubKeyDb]] = {
utxos: Vector[UTXORecord]): Future[Map[Long, ScriptPubKeyDb]] = {
val ids = utxos.map(_.scriptPubKeyId)
findScriptPubKeys(ids)
}