From 7ee1f0f4061080ec5cde0da80aa8840ad42a72fe Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Sun, 23 Jan 2022 07:39:10 -0600 Subject: [PATCH] Implement batching of database calls for our chain callback (#4003) --- .../org/bitcoins/server/RoutesSpec.scala | 13 +++-- .../server/BitcoindRpcBackendUtil.scala | 8 +-- .../org/bitcoins/server/ChainRoutes.scala | 7 +-- .../org/bitcoins/server/util/ChainUtil.scala | 29 ++++++---- .../bitcoins/server/util/WebsocketUtil.scala | 54 +++++++++++-------- .../rpc/client/common/BitcoindRpcClient.scala | 9 ++++ .../chain/blockchain/ChainHandlerTest.scala | 10 ++-- .../org/bitcoins/chain/ChainCallbacks.scala | 16 +++--- .../chain/blockchain/ChainHandler.scala | 36 +++++-------- .../chain/models/BlockHeaderDAO.scala | 15 ++++++ .../bitcoins/core/api/chain/ChainApi.scala | 3 ++ .../core/api/chain/ChainQueryApi.scala | 2 +- .../bitcoins/testkit/node/BaseNodeTest.scala | 5 ++ 13 files changed, 127 insertions(+), 80 deletions(-) diff --git a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala index 5a008d9807..654062bd98 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/RoutesSpec.scala @@ -251,13 +251,16 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { (mockChainApi .getHeader(_: DoubleSha256DigestBE)) .expects(blockHeader.hashBE) - .twice() .returning(Future.successful(Some(blockHeaderDb))) + (mockChainApi.getBestBlockHeader: () => Future[BlockHeaderDb]) + .expects() + .returning(Future.successful(blockHeaderDb)) + (mockChainApi - .getNumberOfConfirmations(_: DoubleSha256DigestBE)) - .expects(blockHeader.hashBE) - .returning(Future.successful(Some(1))) + .getHeaders(_: Vector[DoubleSha256DigestBE])) + .expects(Vector(blockHeader.hashBE)) + .returning(Future.successful(Vector(Some(blockHeaderDb)))) val route = chainRoutes.handleCommand( @@ -266,7 +269,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory { Get() ~> route ~> check { assert(contentType == `application/json`) assert(responseAs[ - String] == s"""{"result":{"raw":"${blockHeader.hex}","hash":"${blockHeader.hashBE.hex}","confirmations":1,"height":1899697,"version":${blockHeader.version.toLong},"versionHex":"${blockHeader.version.hex}","merkleroot":"${blockHeader.merkleRootHashBE.hex}","time":${blockHeader.time.toLong},"mediantime":${blockHeaderDb.time.toLong},"nonce":${blockHeader.nonce.toLong},"bits":"${blockHeader.nBits.hex}","difficulty":${blockHeader.difficulty.toDouble},"chainwork":"$chainworkStr","previousblockhash":"${blockHeader.previousBlockHashBE.hex}","nextblockhash":null},"error":null}""") + String] == s"""{"result":{"raw":"${blockHeader.hex}","hash":"${blockHeader.hashBE.hex}","confirmations":0,"height":1899697,"version":${blockHeader.version.toLong},"versionHex":"${blockHeader.version.hex}","merkleroot":"${blockHeader.merkleRootHashBE.hex}","time":${blockHeader.time.toLong},"mediantime":${blockHeaderDb.time.toLong},"nonce":${blockHeader.nonce.toLong},"bits":"${blockHeader.nBits.hex}","difficulty":${blockHeader.difficulty.toDouble},"chainwork":"$chainworkStr","previousblockhash":"${blockHeader.previousBlockHashBE.hex}","nextblockhash":null},"error":null}""") } } diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index 586129f39b..905117e1dc 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -258,13 +258,15 @@ object BitcoindRpcBackendUtil extends Logging { val executeCallbackF: Future[Wallet] = blockProcessedF.flatMap { wallet => chainCallbacksOpt match { - case None => Future.successful(wallet) + case None => Future.successful(wallet) case Some(callback) => + //this can be slow as we aren't batching headers at all + val headerWithHeights = + Vector((blockHeaderResult.height, block.blockHeader)) val f = callback .executeOnBlockHeaderConnectedCallbacks( logger, - blockHeaderResult.height, - blockHeaderResult.blockHeader) + headerWithHeights) f.map(_ => wallet) } } diff --git a/app/server/src/main/scala/org/bitcoins/server/ChainRoutes.scala b/app/server/src/main/scala/org/bitcoins/server/ChainRoutes.scala index ccdfebfa34..5038234293 100644 --- a/app/server/src/main/scala/org/bitcoins/server/ChainRoutes.scala +++ b/app/server/src/main/scala/org/bitcoins/server/ChainRoutes.scala @@ -51,11 +51,12 @@ case class ChainRoutes(chain: ChainApi, network: BitcoinNetwork)(implicit chain.getHeader(hash).flatMap { case None => Future.successful(Server.httpSuccess(ujson.Null)) case Some(_) => - val resultF = ChainUtil.getBlockHeaderResult(hash, chain) + val resultsF = + ChainUtil.getBlockHeaderResult(Vector(hash), chain) for { - result <- resultF + results <- resultsF } yield { - val json = upickle.default.writeJs(result)( + val json = upickle.default.writeJs(results.head)( Picklers.getBlockHeaderResultPickler) Server.httpSuccess(json) } diff --git a/app/server/src/main/scala/org/bitcoins/server/util/ChainUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/ChainUtil.scala index 3683ddf663..95cc7d24fc 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/ChainUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/ChainUtil.scala @@ -10,20 +10,27 @@ import scala.concurrent.{ExecutionContext, Future} object ChainUtil { - def getBlockHeaderResult(hash: DoubleSha256DigestBE, chain: ChainApi)(implicit - ec: ExecutionContext): Future[GetBlockHeaderResult] = { - val headerOptF = chain.getHeader(hash) - val confsOptF = chain.getNumberOfConfirmations(hash) - for { - headerOpt <- headerOptF - confsOpt <- confsOptF + def getBlockHeaderResult( + hashes: Vector[DoubleSha256DigestBE], + chain: ChainApi)(implicit + ec: ExecutionContext): Future[Vector[GetBlockHeaderResult]] = { + val headersF: Future[Vector[Option[BlockHeaderDb]]] = + chain.getHeaders(hashes) + val bestHeightF = chain.getBestBlockHeader().map(_.height) + val headersWithConfsF: Future[Vector[Option[(BlockHeaderDb, Int)]]] = for { + headers <- headersF + bestHeight <- bestHeightF } yield { - val zipped: Option[(BlockHeaderDb, Int)] = - headerOpt.zip(confsOpt).headOption - zipped match { + headers.map(hOpt => hOpt.map(h => (h, bestHeight - h.height))) + } + + for { + headersWithConfs <- headersWithConfsF + } yield { + headersWithConfs.map { case None => sys.error( - s"Could not find block header hash=$hash or confirmations for the header ") + s"Could not find block header or confirmations for the header ") case Some((header, confs)) => val chainworkStr = { val bytes = ByteVector(header.chainWork.toByteArray) diff --git a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala index 44be4facd4..01aaef3ec0 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala @@ -11,16 +11,13 @@ import org.bitcoins.commons.jsonmodels.ws.{ } import org.bitcoins.commons.serializers.WsPicklers import org.bitcoins.core.api.chain.ChainApi +import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.core.protocol.dlc.models.DLCStatus import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.util.FutureUtil +import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.dlc.wallet.{DLCWalletCallbacks, OnDLCStateChange} -import org.bitcoins.wallet.{ - OnNewAddressGenerated, - OnReservedUtxos, - OnTransactionBroadcast, - OnTransactionProcessed, - WalletCallbacks -} +import org.bitcoins.wallet._ import scala.concurrent.{ExecutionContext, Future} @@ -29,22 +26,33 @@ object WebsocketUtil extends Logging { def buildChainCallbacks( queue: SourceQueueWithComplete[Message], chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = { - val onBlockProcessed: OnBlockHeaderConnected = { case (_, header) => - val resultF = - ChainUtil.getBlockHeaderResult(header.hashBE, chainApi) - val f = for { - result <- resultF - notification = - ChainNotification.BlockProcessedNotification(result) - notificationJson = - upickle.default.writeJs(notification)( - WsPicklers.blockProcessedPickler) - msg = TextMessage.Strict(notificationJson.toString()) - _ <- queue.offer(msg) - } yield { - () - } - f + val onBlockProcessed: OnBlockHeaderConnected = { + case headersWithHeight: Vector[(Int, BlockHeader)] => + val hashes: Vector[DoubleSha256DigestBE] = + headersWithHeight.map(_._2.hashBE) + val resultsF = + ChainUtil.getBlockHeaderResult(hashes, chainApi) + val f = for { + results <- resultsF + notifications = + results.map(result => + ChainNotification.BlockProcessedNotification(result)) + notificationsJson = notifications.map { notification => + upickle.default.writeJs(notification)( + WsPicklers.blockProcessedPickler) + } + + msgs = notificationsJson.map(n => TextMessage.Strict(n.toString())) + _ <- FutureUtil.sequentially(msgs) { case msg => + val x: Future[Unit] = queue + .offer(msg) + .map(_ => ()) + x + } + } yield { + () + } + f } ChainCallbacks.onBlockHeaderConnected(onBlockProcessed) diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala index 825b0d420b..2747addc67 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala @@ -157,6 +157,15 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] = getBlockHeader(hash).map(header => Some(header.blockHeaderDb)) + override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[ + Vector[Option[BlockHeaderDb]]] = { + //sends a request for every header, i'm not aware of a way to batch these + val resultsNested: Vector[Future[Option[BlockHeaderDb]]] = + hashes.map(getHeader) + Future + .sequence(resultsNested) + } + override def getHeadersBetween( from: BlockHeaderDb, to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = { diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index c4730607e6..34ebae3e11 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -672,10 +672,12 @@ class ChainHandlerTest extends ChainDbUnitTest { chainHandler: ChainHandler => val resultP: Promise[Boolean] = Promise() - val callback: OnBlockHeaderConnected = (_: Int, _: BlockHeader) => { - Future { - resultP.success(true) - () + val callback: OnBlockHeaderConnected = { + case _: Vector[(Int, BlockHeader)] => { + Future { + resultP.success(true) + () + } } } diff --git a/chain/src/main/scala/org/bitcoins/chain/ChainCallbacks.scala b/chain/src/main/scala/org/bitcoins/chain/ChainCallbacks.scala index d1f979aed1..f4b37c955a 100644 --- a/chain/src/main/scala/org/bitcoins/chain/ChainCallbacks.scala +++ b/chain/src/main/scala/org/bitcoins/chain/ChainCallbacks.scala @@ -1,7 +1,7 @@ package org.bitcoins.chain import grizzled.slf4j.Logger -import org.bitcoins.core.api.{Callback2, CallbackHandler} +import org.bitcoins.core.api.{Callback, CallbackHandler} import org.bitcoins.core.protocol.blockchain.BlockHeader import scala.concurrent.{ExecutionContext, Future} @@ -9,18 +9,18 @@ import scala.concurrent.{ExecutionContext, Future} trait ChainCallbacks { def onBlockHeaderConnected: CallbackHandler[ - (Int, BlockHeader), + Vector[(Int, BlockHeader)], OnBlockHeaderConnected] def +(other: ChainCallbacks): ChainCallbacks def executeOnBlockHeaderConnectedCallbacks( logger: Logger, - height: Int, - header: BlockHeader)(implicit ec: ExecutionContext): Future[Unit] = { + heightHeaderTuple: Vector[(Int, BlockHeader)])(implicit + ec: ExecutionContext): Future[Unit] = { onBlockHeaderConnected.execute( - (height, header), + heightHeaderTuple, (err: Throwable) => logger.error( s"${onBlockHeaderConnected.name} Callback failed with error: ", @@ -30,13 +30,13 @@ trait ChainCallbacks { } /** Callback for handling a received block header */ -trait OnBlockHeaderConnected extends Callback2[Int, BlockHeader] +trait OnBlockHeaderConnected extends Callback[Vector[(Int, BlockHeader)]] object ChainCallbacks { private case class ChainCallbacksImpl( onBlockHeaderConnected: CallbackHandler[ - (Int, BlockHeader), + Vector[(Int, BlockHeader)], OnBlockHeaderConnected]) extends ChainCallbacks { @@ -56,7 +56,7 @@ object ChainCallbacks { onBlockHeaderConnected: Vector[OnBlockHeaderConnected] = Vector.empty): ChainCallbacks = ChainCallbacksImpl(onBlockHeaderConnected = - CallbackHandler[(Int, BlockHeader), OnBlockHeaderConnected]( + CallbackHandler[Vector[(Int, BlockHeader)], OnBlockHeaderConnected]( "onBlockHeaderConnected", onBlockHeaderConnected)) } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index b865a47f4a..f9517ec225 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -98,15 +98,12 @@ class ChainHandler( /** @inheritdoc */ override def getHeader( hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] = { - blockHeaderDAO.findByHash(hash).map { header => - logger.debug(s"Looking for header by hash=$hash") - val resultStr = header - .map(h => - s"height=${h.height}, hash=${h.hashBE}, chain work=${h.chainWork}") - .getOrElse("None") - logger.debug(s"getHeader result: $resultStr") - header - } + getHeaders(Vector(hash)).map(_.head) + } + + override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[ + Vector[Option[BlockHeaderDb]]] = { + blockHeaderDAO.findByHashes(hashes) } protected def processHeadersWithChains( @@ -129,7 +126,7 @@ class ChainHandler( val successfullyValidatedHeaders = blockchainUpdates .flatMap(_.successfulHeaders) - val headersToBeCreated = { + val headersToBeCreated: Vector[BlockHeaderDb] = { // During reorgs, we can be sent a header twice successfullyValidatedHeaders.distinct } @@ -153,18 +150,13 @@ class ChainHandler( createdF.map { headers => if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) { - headersToBeCreated.reverseIterator.foldLeft(Future.unit) { - (acc, header) => - for { - _ <- acc - _ <- - chainConfig.chainCallbacks - .executeOnBlockHeaderConnectedCallbacks( - logger, - header.height, - header.blockHeader) - } yield () - } + val headersWithHeight: Vector[(Int, BlockHeader)] = { + headersToBeCreated.reverseIterator.map(h => + (h.height, h.blockHeader)) + }.toVector + + chainConfig.chainCallbacks + .executeOnBlockHeaderConnectedCallbacks(logger, headersWithHeight) } chains.foreach { c => logger.info( diff --git a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala index cdba36111c..a9e4592f36 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala @@ -52,6 +52,21 @@ case class BlockHeaderDAO()(implicit safeDatabase.runVec(query).map(_.headOption) } + /** Finds the block headers associated with the hashes. Returns None if we could not find a particular + * hash in the database + */ + def findByHashes(hashes: Vector[DoubleSha256DigestBE]): Future[ + Vector[Option[BlockHeaderDb]]] = { + val query = findByPrimaryKeys(hashes) + val resultsF: Future[Vector[BlockHeaderDb]] = + safeDatabase.runVec(query.result.transactionally) + for { + results <- resultsF + } yield { + hashes.map(h => results.find(_.blockHeader.hashBE == h)) + } + } + override def findByPrimaryKeys(hashes: Vector[DoubleSha256DigestBE]): Query[ BlockHeaderTable, BlockHeaderDb, diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala index ac3996a6f8..ff7475181a 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala @@ -40,6 +40,9 @@ trait ChainApi extends ChainQueryApi { /** Gets a [[org.bitcoins.core.api.chain.db.BlockHeaderDb]] from the chain's database */ def getHeader(hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] + def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[ + Vector[Option[BlockHeaderDb]]] + /** Gets all [[org.bitcoins.core.api.chain.db.BlockHeaderDb]]s at a given height */ def getHeadersAtHeight(height: Int): Future[Vector[BlockHeaderDb]] diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/ChainQueryApi.scala b/core/src/main/scala/org/bitcoins/core/api/chain/ChainQueryApi.scala index 35aaf38723..1c49ac7d80 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/ChainQueryApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/ChainQueryApi.scala @@ -26,7 +26,7 @@ trait ChainQueryApi { /** Gets number of confirmations for the given block hash */ def getNumberOfConfirmations( - blockHashOpt: DoubleSha256DigestBE): Future[Option[Int]] + blockHash: DoubleSha256DigestBE): Future[Option[Int]] /** Gets the number of compact filters in the database */ def getFilterCount(): Future[Int] diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala index cb70a59028..8e8b167eb0 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala @@ -80,6 +80,11 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg { hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] = Future.successful(None) + override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[ + Vector[Option[BlockHeaderDb]]] = { + Future.successful(Vector.fill(hashes.length)(None)) + } + override def getHeadersAtHeight( height: Int): Future[Vector[BlockHeaderDb]] = Future.successful(Vector.empty)