Attempt to fix memory leak in recalc chain work (#1535)

This commit is contained in:
Ben Carman 2020-06-11 13:12:22 -05:00 committed by GitHub
parent c54d6dcbdc
commit 5559475534
3 changed files with 61 additions and 33 deletions

View File

@ -531,6 +531,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
headerDb <- newHandler.getBestBlockHeader()
} yield {
assert(headerDb.height == headersWithNoWork.head.height)
assert(
newHandler.blockchains.head
.groupBy(_.hashBE)
.forall(_._2.size == 1))
assert(headerDb.hashBE == headersWithNoWork.head.hashBE)
assert(headerDb.chainWork == BigInt(12885098501L))
}

View File

@ -481,11 +481,11 @@ case class ChainHandler(
}
def recalculateChainWork: Future[ChainHandler] = {
logger.info("Calculating chain work for previous blocks")
logger.info("Calculating chain work for previous blocks")
val batchSize = chainConfig.chain.difficultyChangeInterval
def loop(
currentChainWork: BigInt,
remainingHeaders: Vector[BlockHeaderDb],
accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = {
if (remainingHeaders.isEmpty) {
@ -493,6 +493,8 @@ case class ChainHandler(
} else {
val header = remainingHeaders.head
val currentChainWork =
accum.lastOption.map(_.chainWork).getOrElse(BigInt(0))
val newChainWork = currentChainWork + Pow.getBlockProof(
header.blockHeader)
val newHeader = header.copy(chainWork = newChainWork)
@ -508,49 +510,57 @@ case class ChainHandler(
.flatMap(
_ =>
loop(
newChainWork,
remainingHeaders.tail,
updated.takeRight(batchSize)
))
} else {
loop(newChainWork, remainingHeaders.tail, accum :+ newHeader)
loop(remainingHeaders.tail, accum :+ newHeader)
}
}
}
def loop2(
maxHeight: Int,
accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = {
val highestHeaderOpt = accum.maxByOption(_.height)
val currentHeight = highestHeaderOpt.map(_.height).getOrElse(0)
if (currentHeight >= maxHeight) {
Future.successful(accum)
} else {
val batchStartHeight = if (currentHeight == 0) {
0
} else {
currentHeight + 1
}
val batchEndHeight = Math.min(maxHeight, currentHeight + batchSize)
for {
headersToCalc <- blockHeaderDAO.getBetweenHeights(batchStartHeight,
batchEndHeight)
sortedHeaders = headersToCalc.sortBy(_.height)
headersWithWork <- loop(sortedHeaders,
Vector(highestHeaderOpt).flatten)
next <- loop2(maxHeight, headersWithWork)
} yield next
}
}
for {
tips <- blockHeaderDAO.chainTipsByHeight
fullChains <- FutureUtil.sequentially(tips)(tip =>
blockHeaderDAO.getFullBlockchainFrom(tip))
commonHistory = fullChains.foldLeft(fullChains.head.headers)(
(accum, chain) => chain.intersect(accum).toVector)
sortedHeaders = commonHistory.sortBy(_.height)
diffedChains = fullChains.map { blockchain =>
val sortedFullHeaders = blockchain.headers.sortBy(_.height)
// Remove the common blocks
sortedFullHeaders.diff(sortedHeaders)
}
commonWithWork <- loop(BigInt(0), sortedHeaders, Vector.empty)
finalCommon = Vector(sortedHeaders.head) ++ commonWithWork.takeRight(
batchSize)
commonChainWork = finalCommon.lastOption
.map(_.chainWork)
.getOrElse(BigInt(0))
newBlockchains <- FutureUtil.sequentially(diffedChains) { blockchain =>
loop(commonChainWork, blockchain, Vector.empty)
.map { newHeaders =>
val relevantHeaders =
(finalCommon ++ newHeaders).takeRight(
chainConfig.chain.difficultyChangeInterval)
Blockchain(relevantHeaders)
}
maxHeight <- blockHeaderDAO.maxHeight
startHeight <- blockHeaderDAO.getLowestNoWorkHeight
start <- if (startHeight == 0) {
Future.successful(Vector.empty)
} else {
blockHeaderDAO.getAtHeight(startHeight - 1)
}
_ <- loop2(maxHeight, start)
newBlockchains <- blockHeaderDAO.getBlockchains()
} yield {
logger.info("Finished calculating chain work")
this.copy(blockchains = newBlockchains.toVector)
this.copy(blockchains = newBlockchains)
}
}
}

View File

@ -228,6 +228,20 @@ case class BlockHeaderDAO()(
}
}
private val lowestNoWorkQuery: profile.ProfileAction[
Int,
NoStream,
Effect.Read] = {
val noWork =
table.filter(h => h.chainWork === BigInt(0) || h.chainWork == null)
noWork.map(_.height).min.getOrElse(0).result
}
def getLowestNoWorkHeight: Future[Int] = {
val query = lowestNoWorkQuery
database.run(query)
}
/** Returns the maximum block height from our database */
def maxHeight: Future[Int] = {
val query = maxHeightQuery