Optimize org.bitcoins.chain.blockchain.BaseBlockChain (#781)

* Optimize org.bitcoins.chain.blockchain.BaseBlockChain

* remove toStream calls

* cleanup

* fix IndexOutOfBoundsException
This commit is contained in:
rorp 2019-10-06 07:47:02 -07:00 committed by Chris Stewart
parent fd91fc37e2
commit d390de323c
3 changed files with 89 additions and 61 deletions

View File

@ -6,6 +6,8 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.validation.TipUpdateResult
import org.bitcoins.chain.validation.TipValidation
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import scala.annotation.tailrec
// INTERNAL NOTE: Due to changes in the Scala collections in 2.13 this
@ -55,12 +57,11 @@ private[blockchain] trait BaseBlockChain {
/** Splits the blockchain at the header, returning a new blockchain where the best tip is the given header */
def fromHeader(header: BlockHeaderDb): Option[Blockchain] = {
val headerIdxOpt = headers.zipWithIndex.find(_._1 == header)
headerIdxOpt.map {
case (header, idx) =>
val newChain = this.compObjectfromHeaders(headers.splitAt(idx)._2)
require(newChain.tip == header)
newChain
val headerIdxOpt = findHeaderIdx(header.hashBE)
headerIdxOpt.map { idx =>
val newChain = this.compObjectfromHeaders(headers.splitAt(idx)._2)
require(newChain.tip == header)
newChain
}
}
@ -68,6 +69,25 @@ private[blockchain] trait BaseBlockChain {
def fromValidHeader(header: BlockHeaderDb): Blockchain = {
fromHeader(header).get
}
/** Finds a header index by the given hash */
def findHeaderIdx(hashBE: DoubleSha256DigestBE): Option[Int] = {
@tailrec
def loop(idx: Int): Option[Int] = {
if (idx >= headers.size)
None
else {
val header = headers(idx)
if (header.hashBE == hashBE)
Some(idx)
else
loop(idx + 1)
}
}
loop(0)
}
}
private[blockchain] trait BaseBlockChainCompObject
@ -87,12 +107,7 @@ private[blockchain] trait BaseBlockChainCompObject
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
val tipResult: ConnectTipResult = {
val prevBlockHeaderIdxOpt =
blockchain.headers.zipWithIndex.find {
case (headerDb, _) =>
headerDb.hashBE == header.previousBlockHashBE
}
prevBlockHeaderIdxOpt match {
findPrevBlockHeaderIdx(header, blockchain) match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}")
@ -100,8 +115,9 @@ private[blockchain] trait BaseBlockChainCompObject
val failed = ConnectTipResult.BadTip(err)
failed
case Some((prevBlockHeader, prevHeaderIdx)) =>
case Some(prevHeaderIdx) =>
//found a header to connect to!
val prevBlockHeader = blockchain.headers(prevHeaderIdx)
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
val chain = blockchain.fromValidHeader(prevBlockHeader)
@ -114,15 +130,17 @@ private[blockchain] trait BaseBlockChainCompObject
s"Successfully verified=${success.header.hashBE.hex}, connecting to chain")
val connectionIdx = blockchain.length - prevHeaderIdx
val oldChain =
blockchain.takeRight(connectionIdx)
val newChain =
Blockchain.fromHeaders(success.headerDb +: oldChain)
// we construct a new blockchain by prepending the headers vector from the old one with the new tip
// in order to avoid creating unnecessary hidden copies of the blockchain here
if (connectionIdx != blockchain.length) {
val newChain = Blockchain(
success.headerDb +: blockchain.headers.takeRight(
connectionIdx))
//means we have a reorg, since we aren't connecting to latest tip
ConnectTipResult.Reorg(success, newChain)
} else {
val newChain = Blockchain(
success.headerDb +: blockchain.headers)
//we just extended the latest tip
ConnectTipResult.ExtendChain(success, newChain)
}
@ -137,6 +155,7 @@ private[blockchain] trait BaseBlockChainCompObject
}
/** Iterates through each given blockchains attempting to connect the given headers to that chain
*
* @return The final updates for each chain
*
* */
@ -147,31 +166,19 @@ private[blockchain] trait BaseBlockChainCompObject
logger.debug(
s"Attempting to connect ${headers.length} headers to ${blockchains.length} blockchains")
@tailrec
def loop(
headersToProcess: Vector[BlockHeader],
lastUpdates: Vector[BlockchainUpdate]): Vector[BlockchainUpdate] = {
headersToProcess match {
case h +: t =>
val newUpdates: Vector[BlockchainUpdate] = lastUpdates
.flatMap { lastUpdate =>
val connectTipResult =
Blockchain.connectTip(header = h,
blockchain = lastUpdate.blockchain)
parseConnectTipResult(connectTipResult, lastUpdate)
}
loop(headersToProcess = t, lastUpdates = newUpdates)
case Vector() =>
lastUpdates
}
}
val initUpdates = blockchains.map { blockchain =>
val initUpdates: Vector[BlockchainUpdate] = blockchains.map { blockchain =>
BlockchainUpdate.Successful(blockchain, Vector.empty)
}
loop(headers, initUpdates)
headers.foldLeft(initUpdates) { (lastUpdates, h) =>
lastUpdates
.flatMap { lastUpdate =>
val connectTipResult =
Blockchain.connectTip(header = h,
blockchain = lastUpdate.blockchain)
parseConnectTipResult(connectTipResult, lastUpdate)
}
}
}
/** Parses a connect tip result, and depending on the result it
@ -211,4 +218,21 @@ private[blockchain] trait BaseBlockChainCompObject
}
/**
* Finds the parent's index of the given header
*/
private def findPrevBlockHeaderIdx(
header: BlockHeader,
blockchain: Blockchain): Option[Int] = {
// Let's see if we are lucky and the latest tip is the parent.
val latestTip = blockchain.tip
if (latestTip.hashBE == header.previousBlockHashBE) {
// Yes we are.
Some(0)
} else {
// No. Scanning the blockchain to find the parent.
blockchain.findHeaderIdx(header.previousBlockHashBE)
}
}
}

View File

@ -59,26 +59,30 @@ case class ChainHandler(
/** @inheritdoc */
override def processHeaders(headers: Vector[BlockHeader])(
implicit ec: ExecutionContext): Future[ChainApi] = {
val blockchainUpdates: Vector[BlockchainUpdate] = {
Blockchain.connectHeadersToChains(headers, blockchains)
}
val headersToBeCreated = {
blockchainUpdates.flatMap(_.successfulHeaders).distinct
}
val chains = blockchainUpdates.map(_.blockchain)
val createdF = blockHeaderDAO.createAll(headersToBeCreated)
val newChainHandler = this.copy(blockchains = chains)
createdF.map { _ =>
chains.foreach { c =>
logger.info(
s"Processed headers from height=${c(headers.length - 1).height} to ${c.height}. Best hash=${c.tip.hashBE.hex}")
if (headers.isEmpty) {
Future.successful(this)
} else {
val blockchainUpdates: Vector[BlockchainUpdate] = {
Blockchain.connectHeadersToChains(headers, blockchains)
}
val headersToBeCreated = {
blockchainUpdates.flatMap(_.successfulHeaders).distinct
}
val chains = blockchainUpdates.map(_.blockchain)
val createdF = blockHeaderDAO.createAll(headersToBeCreated)
val newChainHandler = this.copy(blockchains = chains)
createdF.map { _ =>
chains.foreach { c =>
logger.info(
s"Processed headers from height=${c(headers.length - 1).height} to ${c.height}. Best hash=${c.tip.hashBE.hex}")
}
newChainHandler
}
newChainHandler
}
}

View File

@ -225,7 +225,7 @@ case class DataMessageHandler(
res <- nextRangeOpt match {
case Some((startHeight, stopHash)) =>
logger.info(
s"Requesting compact filter headers from=$startHeight to=$stopHash")
s"Requesting compact filter headers from=$startHeight to=${stopHash.flip}")
peerMsgSender
.sendGetCompactFilterHeadersMessage(startHeight, stopHash)
.map(_ => true)
@ -259,7 +259,7 @@ case class DataMessageHandler(
res <- nextRangeOpt match {
case Some((startHeight, stopHash)) =>
logger.info(
s"Requesting compact filters from=$startHeight to=$stopHash")
s"Requesting compact filters from=$startHeight to=${stopHash.flip}")
peerMsgSender
.sendGetCompactFiltersMessage(startHeight, stopHash)
.map(_ => true)