2020 07 25 optimize recalc chainwork (#1697)

* Optimize recalc Chainwork

* Typo and warning fixes

* Rename force recalc cli option

* Always update genesis header

* Add config option

* Fix config option

* Add to example config

Co-authored-by: Ben Carman <benthecarman@live.com>
Chris Stewart 2020-07-31 10:43:04 -05:00 committed by GitHub
parent ce3667857d
commit d39613e9b0
6 changed files with 192 additions and 76 deletions

Main.scala

@@ -3,6 +3,7 @@ package org.bitcoins.server
 import java.nio.file.Paths
 import akka.actor.ActorSystem
+import akka.dispatch.Dispatchers
 import akka.http.scaladsl.Http
 import org.bitcoins.chain.api.ChainApi
 import org.bitcoins.chain.blockchain.ChainHandler
@@ -53,7 +54,7 @@ object Main extends App with BitcoinSLogger {
   }
   val forceChainWorkRecalc: Boolean =
-    argsWithIndex.exists(_._1.toLowerCase == "--force-recalc")
+    args.exists(_.toLowerCase == "--force-recalc-chainwork")
   val logger = HttpLoggerImpl(conf.nodeConf).getLogger
@@ -79,7 +80,7 @@ object Main extends App with BitcoinSLogger {
   //run chain work migration
   val chainApiF = configInitializedF.flatMap { _ =>
-    runChainWorkCalc(forceChainWorkRecalc)
+    runChainWorkCalc(forceChainWorkRecalc || chainConf.forceRecalcChainWork)
   }
   //get a node that isn't started
@@ -219,11 +220,15 @@ object Main extends App with BitcoinSLogger {
   /** This is needed for migrations V2/V3 on the chain project to re-calculate the total work for the chain */
   private def runChainWorkCalc(force: Boolean)(implicit
       chainAppConfig: ChainAppConfig,
-      ec: ExecutionContext): Future[ChainApi] = {
+      system: ActorSystem): Future[ChainApi] = {
+    import system.dispatcher
+    val blockEC =
+      system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId)
     for {
-      chainApi <- ChainHandler.fromDatabase(blockHeaderDAO = BlockHeaderDAO(),
-                                            CompactFilterHeaderDAO(),
-                                            CompactFilterDAO())
+      chainApi <- ChainHandler.fromDatabase(
+        blockHeaderDAO = BlockHeaderDAO()(blockEC, chainAppConfig),
+        CompactFilterHeaderDAO()(blockEC, chainAppConfig),
+        CompactFilterDAO()(blockEC, chainAppConfig))
       isMissingChainWork <- chainApi.isMissingChainWork
       chainApiWithWork <-
         if (isMissingChainWork || force) {
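
The notable change above is that the DAOs now run their database work on Akka's built-in blocking dispatcher instead of the caller's ExecutionContext, keeping JDBC-style calls off the default dispatcher. A minimal sketch of that lookup pattern, assuming Akka 2.6 (the object and value names below are illustrative, not from this commit):

    import akka.actor.ActorSystem
    import akka.dispatch.Dispatchers

    import scala.concurrent.{ExecutionContext, Future}

    object BlockingDispatcherSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      // Callbacks and non-blocking work stay on the default dispatcher.
      import system.dispatcher

      // Blocking work is isolated on Akka's built-in blocking pool,
      // "akka.actor.default-blocking-io-dispatcher".
      val blockEC: ExecutionContext =
        system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId)

      val blockingRead: Future[Int] = Future {
        Thread.sleep(100) // stand-in for a blocking database read
        42
      }(blockEC)

      blockingRead.foreach { n =>
        println(s"result=$n")
        system.terminate()
      }
    }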

ChainHandler.scala

@@ -18,6 +18,7 @@ import org.bitcoins.crypto.{
   DoubleSha256DigestBE
 }
+import scala.annotation.tailrec
 import scala.concurrent._

 /**
@@ -502,87 +503,180 @@ case class ChainHandler(
     } yield isMissingWork
   }

+  @tailrec
+  private def calcChainWork(
+      remainingHeaders: Vector[BlockHeaderDb],
+      accum: Vector[BlockHeaderDb],
+      lastHeaderWithWorkInDb: BlockHeaderDb): Vector[BlockHeaderDb] = {
+    if (remainingHeaders.isEmpty) {
+      accum
+    } else {
+      val header = remainingHeaders.head
+      val currentChainWork = {
+        accum.lastOption.map(_.chainWork) match {
+          case Some(prevWork) =>
+            prevWork
+          case None =>
+            //this should be the case where the accum is
+            //empty, so this header is the last one we have
+            //stored in the database
+            lastHeaderWithWorkInDb.chainWork
+        }
+      }
+      val newChainWork =
+        currentChainWork + Pow.getBlockProof(header.blockHeader)
+      val newHeader = header.copy(chainWork = newChainWork)
+      calcChainWork(remainingHeaders.tail,
+                    accum :+ newHeader,
+                    lastHeaderWithWorkInDb)
+    }
+  }
+
+  private def getBatchForRecalc(
+      startHeight: Int,
+      maxHeight: Int,
+      batchSize: Int): Future[Vector[Blockchain]] = {
+    val batchEndHeight = Math.min(maxHeight, startHeight + batchSize - 1)
+    val headersToCalcF = {
+      logger.trace(s"Fetching from=$startHeight to=$batchEndHeight")
+      blockHeaderDAO.getBlockchainsBetweenHeights(from = startHeight,
+                                                  to = batchEndHeight)
+    }
+    headersToCalcF
+  }
+
+  /** Creates [[numBatches]] of requests to the database fetching [[batchSize]] headers
+    * starting at [[batchStartHeight]]. These are executed in parallel. After all are fetched
+    * we join them into one future and return it. */
+  private def batchAndGetBlockchains(
+      batchSize: Int,
+      batchStartHeight: Int,
+      maxHeight: Int,
+      numBatches: Int): Future[Vector[Blockchain]] = {
+    var counter = batchStartHeight
+    val range = 0.until(numBatches)
+    val batchesNested: Vector[Future[Vector[Blockchain]]] = range.map { _ =>
+      val f =
+        if (counter <= maxHeight) {
+          getBatchForRecalc(startHeight = counter,
+                            maxHeight = maxHeight,
+                            batchSize = batchSize)
+        } else {
+          Future.successful(Vector.empty)
+        }
+      counter += batchSize
+      f
+    }.toVector
+
+    Future
+      .sequence(batchesNested)
+      .map(_.flatten)
+  }
+
+  private def runRecalculateChainWork(
+      maxHeight: Int,
+      lastHeader: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = {
+    val currentHeight = lastHeader.height
+    val numBatches = 1
+    val batchSize =
+      chainConfig.appConfig.chain.difficultyChangeInterval / numBatches
+    if (currentHeight >= maxHeight) {
+      Future.successful(Vector.empty)
+    } else {
+      val batchStartHeight = currentHeight + 1
+      val headersToCalcF = batchAndGetBlockchains(
+        batchSize = batchSize,
+        batchStartHeight = batchStartHeight,
+        maxHeight = maxHeight,
+        numBatches = numBatches
+      )
+      for {
+        headersToCalc <- headersToCalcF
+        _ = headersToCalc.headOption.map { h =>
+          logger.info(
+            s"Recalculating chain work... current height: ${h.height} maxHeight=$maxHeight")
+        }
+        headersWithWork = {
+          headersToCalc.flatMap { chain =>
+            calcChainWork(remainingHeaders = chain.headers.sortBy(_.height),
+                          accum = Vector.empty,
+                          lastHeaderWithWorkInDb = lastHeader)
+          }
+        }
+        //unfortunately on sqlite there is a bottle neck here
+        //sqlite allows you to read in parallel but only write
+        //sequentially https://stackoverflow.com/a/23350768/967713
+        //so while it looks like we are executing in parallel
+        //in reality there is only one thread that can write to the db
+        //at a single time
+        _ = logger.trace(
+          s"Upserting from height=${headersWithWork.headOption.map(_.height)} " +
+            s"to height=${headersWithWork.lastOption.map(_.height)}")
+        _ <- FutureUtil.batchExecute(
+          headersWithWork,
+          blockHeaderDAO.upsertAll,
+          Vector.empty,
+          batchSize
+        )
+        _ = logger.trace(
+          s"Done upserting from height=${headersWithWork.headOption.map(_.height)} " +
+            s"to height=${headersWithWork.lastOption.map(_.height)}")
+        next <- runRecalculateChainWork(maxHeight, headersWithWork.last)
+      } yield {
+        next
+      }
+    }
+  }
+
   def recalculateChainWork: Future[ChainHandler] = {
     logger.info("Calculating chain work for previous blocks")

-    val batchSize = chainConfig.chain.difficultyChangeInterval
-
-    def loop(
-        remainingHeaders: Vector[BlockHeaderDb],
-        accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = {
-      if (remainingHeaders.isEmpty) {
-        blockHeaderDAO.upsertAll(accum.takeRight(batchSize))
-      } else {
-        val header = remainingHeaders.head
-        val currentChainWork =
-          accum.lastOption.map(_.chainWork).getOrElse(BigInt(0))
-        val newChainWork =
-          currentChainWork + Pow.getBlockProof(header.blockHeader)
-        val newHeader = header.copy(chainWork = newChainWork)
-
-        // Add the last batch to the database and create log
-        if (header.height % batchSize == 0) {
-          logger.info(
-            s"Recalculating chain work... current height: ${header.height}")
-          val updated = accum :+ newHeader
-          // updated the latest batch
-          blockHeaderDAO
-            .upsertAll(updated.takeRight(batchSize))
-            .flatMap(_ =>
-              loop(
-                remainingHeaders.tail,
-                updated.takeRight(batchSize)
-              ))
-        } else {
-          loop(remainingHeaders.tail, accum :+ newHeader)
-        }
-      }
-    }
-
-    def loop2(
-        maxHeight: Int,
-        accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = {
-      val highestHeaderOpt =
-        if (accum.isEmpty) None else Some(accum.maxBy(_.height))
-      val currentHeight = highestHeaderOpt.map(_.height).getOrElse(0)
-
-      if (currentHeight >= maxHeight) {
-        Future.successful(accum)
-      } else {
-        val (batchStartHeight, prev) = if (currentHeight == 0) {
-          (0, Vector.empty)
-        } else {
-          (currentHeight + 1, Vector(highestHeaderOpt).flatten)
-        }
-
-        val batchEndHeight = Math.min(maxHeight, currentHeight + batchSize)
-        for {
-          headersToCalc <-
-            blockHeaderDAO.getBetweenHeights(batchStartHeight, batchEndHeight)
-          sortedHeaders = headersToCalc.sortBy(_.height)
-          headersWithWork <- loop(sortedHeaders, prev)
-          next <- loop2(maxHeight, headersWithWork)
-        } yield next
-      }
-    }
-
-    for {
-      maxHeight <- blockHeaderDAO.maxHeight
-      startHeight <- blockHeaderDAO.getLowestNoWorkHeight
-      start <-
+    val maxHeightF = blockHeaderDAO.maxHeight
+    val startHeightF = blockHeaderDAO.getLowestNoWorkHeight
+    val startF = for {
+      startHeight <- startHeightF
+      headers <- {
         if (startHeight == 0) {
-          Future.successful(Vector.empty)
+          val genesisHeaderF = blockHeaderDAO.getAtHeight(0)
+          genesisHeaderF.flatMap { h =>
+            require(h.length == 1, s"Should only have one genesis header!")
+            calculateChainWorkGenesisBlock(h.head)
+              .map(Vector(_))
+          }
         } else {
           blockHeaderDAO.getAtHeight(startHeight - 1)
         }
-      _ <- loop2(maxHeight, start)
+      }
+    } yield headers
+
+    val resultF = for {
+      maxHeight <- maxHeightF
+      start <- startF
+      _ <- runRecalculateChainWork(maxHeight, start.head)
       newBlockchains <- blockHeaderDAO.getBlockchains()
     } yield {
       logger.info("Finished calculating chain work")
       this.copy(blockchains = newBlockchains)
     }
+
+    resultF.failed.foreach { err =>
+      logger.error(s"Failed to recalculate chain work", err)
+    }
+    resultF
+  }
+
+  /** Calculates the chain work for the genesis header */
+  private def calculateChainWorkGenesisBlock(
+      genesisHeader: BlockHeaderDb): Future[BlockHeaderDb] = {
+    val expectedWork = Pow.getBlockProof(genesisHeader.blockHeader)
+    val genesisWithWork = genesisHeader.copy(chainWork = expectedWork)
+    blockHeaderDAO.update(genesisWithWork)
   }
 }
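
The heart of the rewrite is that chain work is now accumulated in memory by a pure, tail-recursive function and persisted in batched upserts, instead of interleaving one upsert per batch with the fold itself. A self-contained sketch of that accumulation, with HeaderDb and the proof values as simplified stand-ins for the real BlockHeaderDb and Pow.getBlockProof:

    import scala.annotation.tailrec

    // Each header's chainWork is the previous header's chainWork plus the
    // proof-of-work contributed by this header.
    final case class HeaderDb(height: Int, proof: BigInt, chainWork: BigInt = 0)

    object ChainWorkSketch extends App {
      @tailrec
      def calc(
          remaining: Vector[HeaderDb],
          accum: Vector[HeaderDb],
          lastWorkInDb: BigInt): Vector[HeaderDb] = {
        remaining match {
          case head +: tail =>
            // Seed from the last persisted header when accum is still empty.
            val prevWork =
              accum.lastOption.map(_.chainWork).getOrElse(lastWorkInDb)
            calc(tail,
                 accum :+ head.copy(chainWork = prevWork + head.proof),
                 lastWorkInDb)
          case _ => accum
        }
      }

      val headers = Vector(HeaderDb(1, 10), HeaderDb(2, 10), HeaderDb(3, 12))
      // Pretend the genesis header (height 0) is already stored with work 5.
      calc(headers, Vector.empty, lastWorkInDb = 5).foreach(println)
      // HeaderDb(1,10,15), HeaderDb(2,10,25), HeaderDb(3,12,37)
    }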

ChainAppConfig.scala

@@ -98,6 +98,10 @@ case class ChainAppConfig(
   lazy val filterBatchSize: Int =
     config.getInt(s"${moduleName}.neutrino.filter-batch-size")

+  lazy val forceRecalcChainWork: Boolean =
+    config.getBooleanOrElse(s"$moduleName.force-recalc-chainwork",
+                            default = false)
+
   /** Starts the associated application */
   override def start(): Future[Unit] = FutureUtil.unit
 }
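
getBooleanOrElse is a bitcoin-s convenience on top of Typesafe Config. A sketch of the equivalent behavior with the raw Config API (the object name and parsed string below are illustrative):

    import com.typesafe.config.{Config, ConfigFactory}

    object ForceRecalcFlagSketch extends App {
      val config: Config = ConfigFactory.parseString(
        "bitcoin-s.chain.force-recalc-chainwork = true")

      val path = "bitcoin-s.chain.force-recalc-chainwork"
      // Fall back to false when the key is absent, mirroring default = false.
      val forceRecalc: Boolean =
        if (config.hasPath(path)) config.getBoolean(path) else false

      println(forceRecalc) // true
    }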

FutureUtil.scala

@@ -64,4 +64,15 @@ object FutureUtil {
       }
     } yield batchExecution
   }
+
+  /** Batches the [[elements]] by [[batchSize]] and then calls [[f]] on them in parallel */
+  def batchAndParallelExecute[T, U](
+      elements: Vector[T],
+      f: Vector[T] => Future[U],
+      batchSize: Int)(implicit ec: ExecutionContext): Future[Vector[U]] = {
+    val batches = elements.grouped(batchSize).toVector
+    val execute: Vector[Future[U]] = batches.map(b => f(b))
+    val doneF = Future.sequence(execute)
+    doneF
+  }
 }
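
A usage sketch for batchAndParallelExecute: the summing function and sizes are illustrative, and the org.bitcoins.core.util.FutureUtil import is assumed to match this version of the codebase:

    import org.bitcoins.core.util.FutureUtil

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object BatchParallelSketch extends App {
      def sumBatch(batch: Vector[Int]): Future[Int] = Future(batch.sum)

      val elements = (1 to 10).toVector
      // Ten elements with batchSize = 3 yield batches of 3, 3, 3 and 1,
      // each summed concurrently and then joined into one Future.
      val resultF: Future[Vector[Int]] =
        FutureUtil.batchAndParallelExecute(elements, sumBatch, batchSize = 3)

      println(Await.result(resultF, 5.seconds)) // Vector(6, 15, 24, 10)
    }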

SafeDatabase.scala

@@ -217,9 +217,10 @@ case class SafeDatabase(jdbcProfile: JdbcProfileComponent[AppConfig])
     */
   def runVec[R](action: DBIOAction[Seq[R], NoStream, _])(implicit
       ec: ExecutionContext): Future[Vector[R]] = {
-    val result =
+    val result = scala.concurrent.blocking {
       if (sqlite) database.run[Seq[R]](foreignKeysPragma >> action)
       else database.run[Seq[R]](action)
+    }
     result.map(_.toVector).recoverWith { logAndThrowError(action) }
   }
 }
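
The scala.concurrent.blocking wrapper is a hint to ForkJoinPool-backed execution contexts that the enclosed code may block, so the pool can temporarily add threads instead of starving. A small standalone sketch of the effect (the sleep is a stand-in for a blocking JDBC call):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{blocking, Await, Future}

    object BlockingHintSketch extends App {
      val tasks = Vector.fill(16) {
        Future {
          blocking {
            Thread.sleep(50) // stand-in for a blocking JDBC call
          }
        }
      }
      // Without the blocking hint, 16 sleeps on a pool sized to the CPU
      // count would run in waves; with it the pool grows temporarily so
      // the sleeps overlap.
      Await.result(Future.sequence(tasks), 5.seconds)
    }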

Example configuration

@@ -122,6 +122,7 @@ bitcoin-s {
     }
     chain {
+        force-recalc-chainwork = false
         neutrino {
             filter-header-batch-size = 2000
             filter-header-batch-size.regtest = 10
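
With this commit there are two ways to force a full chain work recalculation, and they are OR'd together in Main's runChainWorkCalc: pass --force-recalc-chainwork on the command line, or set force-recalc-chainwork = true under the bitcoin-s.chain block of the configuration file.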