From 5559475534a7909261343cf0351ca0adf24749a4 Mon Sep 17 00:00:00 2001 From: Ben Carman Date: Thu, 11 Jun 2020 13:12:22 -0500 Subject: [PATCH] Attempt to fix memory leak in recalc chain work (#1535) --- .../chain/blockchain/ChainHandlerTest.scala | 4 + .../chain/blockchain/ChainHandler.scala | 76 +++++++++++-------- .../chain/models/BlockHeaderDAO.scala | 14 ++++ 3 files changed, 61 insertions(+), 33 deletions(-) diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index 1bdb6ce62d..f10c180e53 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -531,6 +531,10 @@ class ChainHandlerTest extends ChainDbUnitTest { headerDb <- newHandler.getBestBlockHeader() } yield { assert(headerDb.height == headersWithNoWork.head.height) + assert( + newHandler.blockchains.head + .groupBy(_.hashBE) + .forall(_._2.size == 1)) assert(headerDb.hashBE == headersWithNoWork.head.hashBE) assert(headerDb.chainWork == BigInt(12885098501L)) } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index c6627aac5a..0313363d24 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -481,11 +481,11 @@ case class ChainHandler( } def recalculateChainWork: Future[ChainHandler] = { - logger.info("Calculating chain work for previous blocks") + logger.info("Calculating chain work for previous blocks") val batchSize = chainConfig.chain.difficultyChangeInterval + def loop( - currentChainWork: BigInt, remainingHeaders: Vector[BlockHeaderDb], accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = { if (remainingHeaders.isEmpty) { @@ -493,6 +493,8 @@ case class ChainHandler( } else { val header = remainingHeaders.head + val currentChainWork = + accum.lastOption.map(_.chainWork).getOrElse(BigInt(0)) val newChainWork = currentChainWork + Pow.getBlockProof( header.blockHeader) val newHeader = header.copy(chainWork = newChainWork) @@ -508,49 +510,57 @@ case class ChainHandler( .flatMap( _ => loop( - newChainWork, remainingHeaders.tail, updated.takeRight(batchSize) )) } else { - loop(newChainWork, remainingHeaders.tail, accum :+ newHeader) + loop(remainingHeaders.tail, accum :+ newHeader) } } } + def loop2( + maxHeight: Int, + accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = { + + val highestHeaderOpt = accum.maxByOption(_.height) + val currentHeight = highestHeaderOpt.map(_.height).getOrElse(0) + + if (currentHeight >= maxHeight) { + Future.successful(accum) + } else { + val batchStartHeight = if (currentHeight == 0) { + 0 + } else { + currentHeight + 1 + } + + val batchEndHeight = Math.min(maxHeight, currentHeight + batchSize) + + for { + headersToCalc <- blockHeaderDAO.getBetweenHeights(batchStartHeight, + batchEndHeight) + sortedHeaders = headersToCalc.sortBy(_.height) + headersWithWork <- loop(sortedHeaders, + Vector(highestHeaderOpt).flatten) + next <- loop2(maxHeight, headersWithWork) + } yield next + } + } + for { - tips <- blockHeaderDAO.chainTipsByHeight - fullChains <- FutureUtil.sequentially(tips)(tip => - blockHeaderDAO.getFullBlockchainFrom(tip)) - - commonHistory = fullChains.foldLeft(fullChains.head.headers)( - (accum, chain) => chain.intersect(accum).toVector) - sortedHeaders = commonHistory.sortBy(_.height) - - diffedChains = fullChains.map { blockchain => - val sortedFullHeaders = blockchain.headers.sortBy(_.height) - // Remove the common blocks - sortedFullHeaders.diff(sortedHeaders) - } - - commonWithWork <- loop(BigInt(0), sortedHeaders, Vector.empty) - finalCommon = Vector(sortedHeaders.head) ++ commonWithWork.takeRight( - batchSize) - commonChainWork = finalCommon.lastOption - .map(_.chainWork) - .getOrElse(BigInt(0)) - newBlockchains <- FutureUtil.sequentially(diffedChains) { blockchain => - loop(commonChainWork, blockchain, Vector.empty) - .map { newHeaders => - val relevantHeaders = - (finalCommon ++ newHeaders).takeRight( - chainConfig.chain.difficultyChangeInterval) - Blockchain(relevantHeaders) - } + maxHeight <- blockHeaderDAO.maxHeight + startHeight <- blockHeaderDAO.getLowestNoWorkHeight + start <- if (startHeight == 0) { + Future.successful(Vector.empty) + } else { + blockHeaderDAO.getAtHeight(startHeight - 1) } + _ <- loop2(maxHeight, start) + newBlockchains <- blockHeaderDAO.getBlockchains() } yield { logger.info("Finished calculating chain work") - this.copy(blockchains = newBlockchains.toVector) + this.copy(blockchains = newBlockchains) } } } diff --git a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala index 1668a5ab2d..6ca43640d0 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala @@ -228,6 +228,20 @@ case class BlockHeaderDAO()( } } + private val lowestNoWorkQuery: profile.ProfileAction[ + Int, + NoStream, + Effect.Read] = { + val noWork = + table.filter(h => h.chainWork === BigInt(0) || h.chainWork == null) + noWork.map(_.height).min.getOrElse(0).result + } + + def getLowestNoWorkHeight: Future[Int] = { + val query = lowestNoWorkQuery + database.run(query) + } + /** Returns the maximum block height from our database */ def maxHeight: Future[Int] = { val query = maxHeightQuery