Reduce usage of .findAll() (doesn't scale for large dbs). Now pass in… (#2706)

* Reduce usage of .findAll() (doesn't scale for large dbs). Now pass in the specific things we are searching for

* Add timestamps for checking how long it takes to processBlock()
This commit is contained in:
Chris Stewart 2021-02-21 16:15:50 -06:00 committed by GitHub
parent b63333327f
commit 1a2ddf6a0d
3 changed files with 38 additions and 10 deletions

View File

@ -83,4 +83,16 @@ class ScriptPubKeyDAOTest extends WalletDAOFixture {
}
}
it must "find a scriptPubKey" in { daos =>
val scriptPubKeyDAO = daos.scriptPubKeyDAO
val pkh = P2PKHScriptPubKey(ECPublicKey.freshPublicKey)
val notInDb = P2PKHScriptPubKey(ECPublicKey.freshPublicKey)
for {
_ <- scriptPubKeyDAO.createIfNotExists(ScriptPubKeyDb(pkh))
found <- scriptPubKeyDAO.findScriptPubKeys(Vector(pkh, notInDb))
} yield {
assert(found.length == 1)
assert(found.head.scriptPubKey == pkh)
}
}
}

View File

@ -8,7 +8,7 @@ import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutput}
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.util.{FutureUtil, TimeUtil}
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState}
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
@ -43,7 +43,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
override def processBlock(block: Block): Future[Wallet] = {
logger.info(s"Processing block=${block.blockHeader.hash.flip}")
val start = TimeUtil.currentEpochMs
val resF = for {
newWallet <- block.transactions.foldLeft(Future.successful(this)) {
(acc, transaction) =>
@ -67,9 +67,12 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
f.onComplete(failure =>
signalBlockProcessingCompletion(block.blockHeader.hash, failure))
f.foreach(_ =>
f.foreach { _ =>
val stop = TimeUtil.currentEpochMs
logger.info(
s"Finished processing of block=${block.blockHeader.hash.flip}."))
s"Finished processing of block=${block.blockHeader.hash.flip}. It took ${stop - start}ms")
}
f.failed.foreach(e =>
logger.error(s"Error processing of block=${block.blockHeader.hash.flip}.",
e))
@ -414,7 +417,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
private def getRelevantOutputs(
transaction: Transaction): Future[Seq[OutputWithIndex]] = {
scriptPubKeyDAO.findAll().map { addrs =>
val spks = transaction.outputs.map(_.scriptPubKey)
scriptPubKeyDAO.findScriptPubKeys(spks.toVector).map { addrs =>
val withIndex =
transaction.outputs.zipWithIndex
withIndex.collect {
@ -452,17 +456,16 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
s"Found $count relevant output(s) in transaction=${transaction.txIdBE.hex}: $outputStr")
val totalIncoming = outputsWithIndex.map(_.output.value).sum
val spks = outputsWithIndex.map(_.output.scriptPubKey)
val spksInDbF = addressDAO.findByScriptPubKeys(spks.toVector)
for {
(txDb, _) <- insertIncomingTransaction(transaction, totalIncoming)
addrs <- addressDAO.findAllAddresses()
spksInDb <- spksInDbF
ourOutputs = outputsWithIndex.collect {
case OutputWithIndex(out, idx)
if addrs.map(_.scriptPubKey).contains(out.scriptPubKey) =>
if spksInDb.map(_.scriptPubKey).contains(out.scriptPubKey) =>
OutputWithIndex(out, idx)
}
prevTagDbs <-
addressTagDAO.findTx(txDb.transaction, networkParameters)
prevTags = prevTagDbs.map(_.addressTag)

View File

@ -41,6 +41,19 @@ case class ScriptPubKeyDAO()(implicit
safeDatabase.run(actions.transactionally)
}
/** Finds a scriptPubKey in the database, if it exists */
def findScriptPubKey(spk: ScriptPubKey): Future[Option[ScriptPubKeyDb]] = {
val foundVecF = findScriptPubKeys(Vector(spk))
foundVecF.map(_.headOption)
}
/** Searches for the given set of spks and returns the ones that exist in the db */
def findScriptPubKeys(
spks: Vector[ScriptPubKey]): Future[Vector[ScriptPubKeyDb]] = {
val query = table.filter(_.scriptPubKey.inSet(spks))
safeDatabase.runVec(query.result)
}
case class ScriptPubKeyTable(tag: Tag)
extends TableAutoInc[ScriptPubKeyDb](tag, schemaName, "pub_key_scripts") {