Implement batching of database calls for our chain callback (#4003)

This commit is contained in:
Chris Stewart 2022-01-23 07:39:10 -06:00 committed by GitHub
parent 42d6955f79
commit 7ee1f0f406
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 127 additions and 80 deletions

View File

@ -251,13 +251,16 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
(mockChainApi
.getHeader(_: DoubleSha256DigestBE))
.expects(blockHeader.hashBE)
.twice()
.returning(Future.successful(Some(blockHeaderDb)))
(mockChainApi.getBestBlockHeader: () => Future[BlockHeaderDb])
.expects()
.returning(Future.successful(blockHeaderDb))
(mockChainApi
.getNumberOfConfirmations(_: DoubleSha256DigestBE))
.expects(blockHeader.hashBE)
.returning(Future.successful(Some(1)))
.getHeaders(_: Vector[DoubleSha256DigestBE]))
.expects(Vector(blockHeader.hashBE))
.returning(Future.successful(Vector(Some(blockHeaderDb))))
val route =
chainRoutes.handleCommand(
@ -266,7 +269,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
Get() ~> route ~> check {
assert(contentType == `application/json`)
assert(responseAs[
String] == s"""{"result":{"raw":"${blockHeader.hex}","hash":"${blockHeader.hashBE.hex}","confirmations":1,"height":1899697,"version":${blockHeader.version.toLong},"versionHex":"${blockHeader.version.hex}","merkleroot":"${blockHeader.merkleRootHashBE.hex}","time":${blockHeader.time.toLong},"mediantime":${blockHeaderDb.time.toLong},"nonce":${blockHeader.nonce.toLong},"bits":"${blockHeader.nBits.hex}","difficulty":${blockHeader.difficulty.toDouble},"chainwork":"$chainworkStr","previousblockhash":"${blockHeader.previousBlockHashBE.hex}","nextblockhash":null},"error":null}""")
String] == s"""{"result":{"raw":"${blockHeader.hex}","hash":"${blockHeader.hashBE.hex}","confirmations":0,"height":1899697,"version":${blockHeader.version.toLong},"versionHex":"${blockHeader.version.hex}","merkleroot":"${blockHeader.merkleRootHashBE.hex}","time":${blockHeader.time.toLong},"mediantime":${blockHeaderDb.time.toLong},"nonce":${blockHeader.nonce.toLong},"bits":"${blockHeader.nBits.hex}","difficulty":${blockHeader.difficulty.toDouble},"chainwork":"$chainworkStr","previousblockhash":"${blockHeader.previousBlockHashBE.hex}","nextblockhash":null},"error":null}""")
}
}

View File

@ -258,13 +258,15 @@ object BitcoindRpcBackendUtil extends Logging {
val executeCallbackF: Future[Wallet] = blockProcessedF.flatMap {
wallet =>
chainCallbacksOpt match {
case None => Future.successful(wallet)
case None => Future.successful(wallet)
case Some(callback) =>
//this can be slow as we aren't batching headers at all
val headerWithHeights =
Vector((blockHeaderResult.height, block.blockHeader))
val f = callback
.executeOnBlockHeaderConnectedCallbacks(
logger,
blockHeaderResult.height,
blockHeaderResult.blockHeader)
headerWithHeights)
f.map(_ => wallet)
}
}

View File

@ -51,11 +51,12 @@ case class ChainRoutes(chain: ChainApi, network: BitcoinNetwork)(implicit
chain.getHeader(hash).flatMap {
case None => Future.successful(Server.httpSuccess(ujson.Null))
case Some(_) =>
val resultF = ChainUtil.getBlockHeaderResult(hash, chain)
val resultsF =
ChainUtil.getBlockHeaderResult(Vector(hash), chain)
for {
result <- resultF
results <- resultsF
} yield {
val json = upickle.default.writeJs(result)(
val json = upickle.default.writeJs(results.head)(
Picklers.getBlockHeaderResultPickler)
Server.httpSuccess(json)
}

View File

@ -10,20 +10,27 @@ import scala.concurrent.{ExecutionContext, Future}
object ChainUtil {
def getBlockHeaderResult(hash: DoubleSha256DigestBE, chain: ChainApi)(implicit
ec: ExecutionContext): Future[GetBlockHeaderResult] = {
val headerOptF = chain.getHeader(hash)
val confsOptF = chain.getNumberOfConfirmations(hash)
for {
headerOpt <- headerOptF
confsOpt <- confsOptF
def getBlockHeaderResult(
hashes: Vector[DoubleSha256DigestBE],
chain: ChainApi)(implicit
ec: ExecutionContext): Future[Vector[GetBlockHeaderResult]] = {
val headersF: Future[Vector[Option[BlockHeaderDb]]] =
chain.getHeaders(hashes)
val bestHeightF = chain.getBestBlockHeader().map(_.height)
val headersWithConfsF: Future[Vector[Option[(BlockHeaderDb, Int)]]] = for {
headers <- headersF
bestHeight <- bestHeightF
} yield {
val zipped: Option[(BlockHeaderDb, Int)] =
headerOpt.zip(confsOpt).headOption
zipped match {
headers.map(hOpt => hOpt.map(h => (h, bestHeight - h.height)))
}
for {
headersWithConfs <- headersWithConfsF
} yield {
headersWithConfs.map {
case None =>
sys.error(
s"Could not find block header hash=$hash or confirmations for the header ")
s"Could not find block header or confirmations for the header ")
case Some((header, confs)) =>
val chainworkStr = {
val bytes = ByteVector(header.chainWork.toByteArray)

View File

@ -11,16 +11,13 @@ import org.bitcoins.commons.jsonmodels.ws.{
}
import org.bitcoins.commons.serializers.WsPicklers
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.dlc.models.DLCStatus
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.dlc.wallet.{DLCWalletCallbacks, OnDLCStateChange}
import org.bitcoins.wallet.{
OnNewAddressGenerated,
OnReservedUtxos,
OnTransactionBroadcast,
OnTransactionProcessed,
WalletCallbacks
}
import org.bitcoins.wallet._
import scala.concurrent.{ExecutionContext, Future}
@ -29,22 +26,33 @@ object WebsocketUtil extends Logging {
def buildChainCallbacks(
queue: SourceQueueWithComplete[Message],
chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = {
val onBlockProcessed: OnBlockHeaderConnected = { case (_, header) =>
val resultF =
ChainUtil.getBlockHeaderResult(header.hashBE, chainApi)
val f = for {
result <- resultF
notification =
ChainNotification.BlockProcessedNotification(result)
notificationJson =
upickle.default.writeJs(notification)(
WsPicklers.blockProcessedPickler)
msg = TextMessage.Strict(notificationJson.toString())
_ <- queue.offer(msg)
} yield {
()
}
f
val onBlockProcessed: OnBlockHeaderConnected = {
case headersWithHeight: Vector[(Int, BlockHeader)] =>
val hashes: Vector[DoubleSha256DigestBE] =
headersWithHeight.map(_._2.hashBE)
val resultsF =
ChainUtil.getBlockHeaderResult(hashes, chainApi)
val f = for {
results <- resultsF
notifications =
results.map(result =>
ChainNotification.BlockProcessedNotification(result))
notificationsJson = notifications.map { notification =>
upickle.default.writeJs(notification)(
WsPicklers.blockProcessedPickler)
}
msgs = notificationsJson.map(n => TextMessage.Strict(n.toString()))
_ <- FutureUtil.sequentially(msgs) { case msg =>
val x: Future[Unit] = queue
.offer(msg)
.map(_ => ())
x
}
} yield {
()
}
f
}
ChainCallbacks.onBlockHeaderConnected(onBlockProcessed)

View File

@ -157,6 +157,15 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] =
getBlockHeader(hash).map(header => Some(header.blockHeaderDb))
override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[Option[BlockHeaderDb]]] = {
//sends a request for every header, i'm not aware of a way to batch these
val resultsNested: Vector[Future[Option[BlockHeaderDb]]] =
hashes.map(getHeader)
Future
.sequence(resultsNested)
}
override def getHeadersBetween(
from: BlockHeaderDb,
to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = {

View File

@ -672,10 +672,12 @@ class ChainHandlerTest extends ChainDbUnitTest {
chainHandler: ChainHandler =>
val resultP: Promise[Boolean] = Promise()
val callback: OnBlockHeaderConnected = (_: Int, _: BlockHeader) => {
Future {
resultP.success(true)
()
val callback: OnBlockHeaderConnected = {
case _: Vector[(Int, BlockHeader)] => {
Future {
resultP.success(true)
()
}
}
}

View File

@ -1,7 +1,7 @@
package org.bitcoins.chain
import grizzled.slf4j.Logger
import org.bitcoins.core.api.{Callback2, CallbackHandler}
import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import scala.concurrent.{ExecutionContext, Future}
@ -9,18 +9,18 @@ import scala.concurrent.{ExecutionContext, Future}
trait ChainCallbacks {
def onBlockHeaderConnected: CallbackHandler[
(Int, BlockHeader),
Vector[(Int, BlockHeader)],
OnBlockHeaderConnected]
def +(other: ChainCallbacks): ChainCallbacks
def executeOnBlockHeaderConnectedCallbacks(
logger: Logger,
height: Int,
header: BlockHeader)(implicit ec: ExecutionContext): Future[Unit] = {
heightHeaderTuple: Vector[(Int, BlockHeader)])(implicit
ec: ExecutionContext): Future[Unit] = {
onBlockHeaderConnected.execute(
(height, header),
heightHeaderTuple,
(err: Throwable) =>
logger.error(
s"${onBlockHeaderConnected.name} Callback failed with error: ",
@ -30,13 +30,13 @@ trait ChainCallbacks {
}
/** Callback for handling a received block header */
trait OnBlockHeaderConnected extends Callback2[Int, BlockHeader]
trait OnBlockHeaderConnected extends Callback[Vector[(Int, BlockHeader)]]
object ChainCallbacks {
private case class ChainCallbacksImpl(
onBlockHeaderConnected: CallbackHandler[
(Int, BlockHeader),
Vector[(Int, BlockHeader)],
OnBlockHeaderConnected])
extends ChainCallbacks {
@ -56,7 +56,7 @@ object ChainCallbacks {
onBlockHeaderConnected: Vector[OnBlockHeaderConnected] =
Vector.empty): ChainCallbacks =
ChainCallbacksImpl(onBlockHeaderConnected =
CallbackHandler[(Int, BlockHeader), OnBlockHeaderConnected](
CallbackHandler[Vector[(Int, BlockHeader)], OnBlockHeaderConnected](
"onBlockHeaderConnected",
onBlockHeaderConnected))
}

View File

@ -98,15 +98,12 @@ class ChainHandler(
/** @inheritdoc */
override def getHeader(
hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] = {
blockHeaderDAO.findByHash(hash).map { header =>
logger.debug(s"Looking for header by hash=$hash")
val resultStr = header
.map(h =>
s"height=${h.height}, hash=${h.hashBE}, chain work=${h.chainWork}")
.getOrElse("None")
logger.debug(s"getHeader result: $resultStr")
header
}
getHeaders(Vector(hash)).map(_.head)
}
override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[Option[BlockHeaderDb]]] = {
blockHeaderDAO.findByHashes(hashes)
}
protected def processHeadersWithChains(
@ -129,7 +126,7 @@ class ChainHandler(
val successfullyValidatedHeaders = blockchainUpdates
.flatMap(_.successfulHeaders)
val headersToBeCreated = {
val headersToBeCreated: Vector[BlockHeaderDb] = {
// During reorgs, we can be sent a header twice
successfullyValidatedHeaders.distinct
}
@ -153,18 +150,13 @@ class ChainHandler(
createdF.map { headers =>
if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) {
headersToBeCreated.reverseIterator.foldLeft(Future.unit) {
(acc, header) =>
for {
_ <- acc
_ <-
chainConfig.chainCallbacks
.executeOnBlockHeaderConnectedCallbacks(
logger,
header.height,
header.blockHeader)
} yield ()
}
val headersWithHeight: Vector[(Int, BlockHeader)] = {
headersToBeCreated.reverseIterator.map(h =>
(h.height, h.blockHeader))
}.toVector
chainConfig.chainCallbacks
.executeOnBlockHeaderConnectedCallbacks(logger, headersWithHeight)
}
chains.foreach { c =>
logger.info(

View File

@ -52,6 +52,21 @@ case class BlockHeaderDAO()(implicit
safeDatabase.runVec(query).map(_.headOption)
}
/** Finds the block headers associated with the hashes. Returns None if we could not find a particular
* hash in the database
*/
def findByHashes(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[Option[BlockHeaderDb]]] = {
val query = findByPrimaryKeys(hashes)
val resultsF: Future[Vector[BlockHeaderDb]] =
safeDatabase.runVec(query.result.transactionally)
for {
results <- resultsF
} yield {
hashes.map(h => results.find(_.blockHeader.hashBE == h))
}
}
override def findByPrimaryKeys(hashes: Vector[DoubleSha256DigestBE]): Query[
BlockHeaderTable,
BlockHeaderDb,

View File

@ -40,6 +40,9 @@ trait ChainApi extends ChainQueryApi {
/** Gets a [[org.bitcoins.core.api.chain.db.BlockHeaderDb]] from the chain's database */
def getHeader(hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]]
def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[Option[BlockHeaderDb]]]
/** Gets all [[org.bitcoins.core.api.chain.db.BlockHeaderDb]]s at a given height */
def getHeadersAtHeight(height: Int): Future[Vector[BlockHeaderDb]]

View File

@ -26,7 +26,7 @@ trait ChainQueryApi {
/** Gets number of confirmations for the given block hash */
def getNumberOfConfirmations(
blockHashOpt: DoubleSha256DigestBE): Future[Option[Int]]
blockHash: DoubleSha256DigestBE): Future[Option[Int]]
/** Gets the number of compact filters in the database */
def getFilterCount(): Future[Int]

View File

@ -80,6 +80,11 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg {
hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]] =
Future.successful(None)
override def getHeaders(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[Option[BlockHeaderDb]]] = {
Future.successful(Vector.fill(hashes.length)(None))
}
override def getHeadersAtHeight(
height: Int): Future[Vector[BlockHeaderDb]] =
Future.successful(Vector.empty)