Simplify ChainHandler.nextFilterHeaderBatchRange() (#5336)

Get implementation working for walking backwards to find best filter associated with our stopBlockHeaderDb
This commit is contained in:
Chris Stewart 2023-12-29 10:20:24 -06:00 committed by GitHub
parent af361167a4
commit f1775c46d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -211,13 +211,14 @@ class ChainHandler(
//means are are in sync //means are are in sync
Future.successful(None) Future.successful(None)
} else { } else {
val prevBlockHeaderOptF = getHeader(prevStopHash) val candidateStartHeaderOptF = getHeader(prevStopHash)
val stopBlockHeaderOptF = getHeader(stopHash) val stopBlockHeaderOptF = getHeader(stopHash)
for { for {
prevBlockHeaderOpt <- prevBlockHeaderOptF candidateStartHeaderOpt <- candidateStartHeaderOptF
stopBlockHeaderOpt <- stopBlockHeaderOptF stopBlockHeaderOpt <- stopBlockHeaderOptF
fsmOpt <- getFilterSyncStopHash(prevBlockHeaderOpt = prevBlockHeaderOpt, fsmOpt <- getFilterSyncStopHash(candidateStartHeaderOpt =
candidateStartHeaderOpt,
stopBlockHeaderOpt = stopBlockHeaderOpt, stopBlockHeaderOpt = stopBlockHeaderOpt,
batchSize = batchSize) batchSize = batchSize)
} yield { } yield {
@ -230,18 +231,19 @@ class ChainHandler(
* we walk backwards until we find a header within the batchSize limit * we walk backwards until we find a header within the batchSize limit
*/ */
private def getFilterSyncStopHash( private def getFilterSyncStopHash(
prevBlockHeaderOpt: Option[BlockHeaderDb], candidateStartHeaderOpt: Option[BlockHeaderDb],
stopBlockHeaderOpt: Option[BlockHeaderDb], stopBlockHeaderOpt: Option[BlockHeaderDb],
batchSize: Int): Future[Option[FilterSyncMarker]] = { batchSize: Int): Future[Option[FilterSyncMarker]] = {
(prevBlockHeaderOpt, stopBlockHeaderOpt) match { (candidateStartHeaderOpt, stopBlockHeaderOpt) match {
case (prevBlockHeaderOpt, Some(stopBlockHeader)) => case (candidateStartHeaderOpt, Some(stopBlockHeader)) =>
val blockchainOptF = blockHeaderDAO.getBlockchainFrom(stopBlockHeader) val blockchainOptF = blockHeaderDAO.getBlockchainFrom(stopBlockHeader)
for { for {
blockchainOpt <- blockchainOptF blockchainOpt <- blockchainOptF
fsmOpt <- { fsmOpt <- {
blockchainOpt match { blockchainOpt match {
case Some(blockchain) => case Some(blockchain) =>
findNextHeader(prevBlockHeaderOpt = prevBlockHeaderOpt, findNextHeader(candidateStartHeaderOpt =
candidateStartHeaderOpt,
stopBlockHeaderDb = stopBlockHeader, stopBlockHeaderDb = stopBlockHeader,
batchSize = batchSize, batchSize = batchSize,
blockchain = blockchain) blockchain = blockchain)
@ -253,9 +255,9 @@ class ChainHandler(
} }
} yield fsmOpt } yield fsmOpt
case (Some(prevBlockHeader), None) => case (Some(candidateStartHeader), None) =>
val exn = new RuntimeException( val exn = new RuntimeException(
s"Cannot find a stopBlockHeader, only found prevBlockHeader=${prevBlockHeader.hashBE}") s"Cannot find a stopBlockHeader, only found prevBlockHeader=${candidateStartHeader.hashBE}")
Future.failed(exn) Future.failed(exn)
case (None, None) => case (None, None) =>
val exn = new RuntimeException( val exn = new RuntimeException(
@ -267,26 +269,26 @@ class ChainHandler(
/** Finds the next header in the chain. Uses chain work to break ties /** Finds the next header in the chain. Uses chain work to break ties
* returning only the header in the chain with the most work, * returning only the header in the chain with the most work,
* else returns None if there is no next header * else returns None if there is no next header
* @param prevBlockHeaderOpt the previous block header we synced from, if None we are syncing from genesis * @param candidateStartHeaderOpt the previous block header we synced from, if None we are syncing from genesis
*/ */
private def findNextHeader( private def findNextHeader(
prevBlockHeaderOpt: Option[BlockHeaderDb], candidateStartHeaderOpt: Option[BlockHeaderDb],
stopBlockHeaderDb: BlockHeaderDb, stopBlockHeaderDb: BlockHeaderDb,
batchSize: Int, batchSize: Int,
blockchain: Blockchain): Future[Option[FilterSyncMarker]] = { blockchain: Blockchain): Future[Option[FilterSyncMarker]] = {
val isInBatchSize = val isInBatchSize =
(stopBlockHeaderDb.height - prevBlockHeaderOpt (stopBlockHeaderDb.height - candidateStartHeaderOpt
.map(_.height) .map(_.height)
.getOrElse(0)) <= batchSize .getOrElse(0)) <= batchSize
val prevBlockHeaderHeight = { val candidateStartHeaderHeight = {
prevBlockHeaderOpt candidateStartHeaderOpt
.map(_.height) .map(_.height)
.getOrElse(-1) //means we are syncing from the genesis block .getOrElse(-1) //means we are syncing from the genesis block
} }
val prevBlockHeaderHashBE = { val candidateStartHeaderHashBE = {
prevBlockHeaderOpt candidateStartHeaderOpt
.map(_.hashBE) .map(_.hashBE)
.getOrElse( .getOrElse(
DoubleSha256DigestBE.empty DoubleSha256DigestBE.empty
@ -296,21 +298,18 @@ class ChainHandler(
val chainOpt: Option[Blockchain] = { val chainOpt: Option[Blockchain] = {
val hasBothHashes = hasBothBlockHeaderHashes( val hasBothHashes = hasBothBlockHeaderHashes(
blockchain = blockchain, blockchain = blockchain,
prevBlockHeaderHashBE = prevBlockHeaderHashBE, prevBlockHeaderHashBE = candidateStartHeaderHashBE,
stopBlockHeaderHashBE = stopBlockHeaderDb.hashBE) stopBlockHeaderHashBE = stopBlockHeaderDb.hashBE)
if (hasBothHashes) Some(blockchain) if (hasBothHashes) Some(blockchain)
else None else None
} }
val startHeight = prevBlockHeaderHeight + 1 val startHeight = candidateStartHeaderHeight + 1
val nextBlockHeaderOpt = { val nextBlockHeaderOpt = {
chainOpt match { chainOpt match {
case Some(chain) => case Some(chain) =>
if (chainOpt.isEmpty) { if (isInBatchSize) {
//means prevBlockHeader and stopBlockHeader do not form a blockchain
None
} else if (isInBatchSize) {
Some(FilterSyncMarker(startHeight, stopBlockHeaderDb.hash)) Some(FilterSyncMarker(startHeight, stopBlockHeaderDb.hash))
} else { } else {
//means our requested stopBlockHeader is outside of batchSize //means our requested stopBlockHeader is outside of batchSize
@ -319,15 +318,16 @@ class ChainHandler(
batchSize = batchSize, batchSize = batchSize,
blockchains = Vector(chain)) blockchains = Vector(chain))
} }
case None => None case None =>
//means prevBlockHeader and stopBlockHeader do not form a blockchain
None
} }
} }
val resultOpt = nextBlockHeaderOpt match { val resultOpt = nextBlockHeaderOpt match {
case Some(next) => case Some(next) =>
//this means we are synced, so return None //this means we are synced, so return None
val isSynced = val isSynced =
next.stopBlockHashBE == prevBlockHeaderHashBE next.stopBlockHashBE == candidateStartHeaderHashBE
if (isSynced) { if (isSynced) {
None None
} else { } else {
@ -394,35 +394,33 @@ class ChainHandler(
batchSize: Int, batchSize: Int,
startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = { startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = {
val stopBlockHeaderDbOptF = getHeader(stopBlockHash) val stopBlockHeaderDbOptF = getHeader(stopBlockHash)
val startHeadersOptF: Future[Option[Vector[BlockHeaderDb]]] =
startHeightOpt match { val tupleOptF: Future[Option[(BlockHeaderDb, Blockchain)]] = for {
case Some(startHeight) => stopBlockHeaderDbOpt <- stopBlockHeaderDbOptF
getHeadersAtHeight(startHeight - 1).map { headers => tupleOpt = stopBlockHeaderDbOpt.map { s =>
if (headers.isEmpty) None val blockchainOptF = blockHeaderDAO.getBlockchainFrom(s)
else Some(headers) blockchainOptF.map {
} case Some(b) => Some((s, b))
case None => case None => None
getBestFilter().flatMap { }
case Some(bestFilter) =>
getHeader(bestFilter.blockHashBE)
.map(_.toVector)
.map(Some(_))
case None =>
//means we need to sync from genesis filter
Future.successful(None)
}
} }
result <- tupleOpt match {
case Some(t) => t
case None => Future.successful(None)
}
} yield {
result
}
for { for {
stopBlockHeaderDbOpt <- stopBlockHeaderDbOptF tupleOpt <- tupleOptF
startBlockHeadersOpt <- startHeadersOptF
fsmOpt <- { fsmOpt <- {
stopBlockHeaderDbOpt match { tupleOpt match {
case Some(stopBlockHeaderDb) => case Some((stopBlockHeaderDb, blockchain)) =>
getFilterSyncMarkerFromStopBlockHeader( getFilterSyncMarkerFromStopBlockHeader(stopBlockHeaderDb =
stopBlockHeaderDb = stopBlockHeaderDb, stopBlockHeaderDb,
candidateStartHeadersOpt = startBlockHeadersOpt, blockchain = blockchain,
batchSize = batchSize) batchSize = batchSize)
case None => case None =>
val exn = new RuntimeException( val exn = new RuntimeException(
s"Could not find stopBlockHeaderHash=$stopBlockHash in chaindb") s"Could not find stopBlockHeaderHash=$stopBlockHash in chaindb")
@ -439,40 +437,49 @@ class ChainHandler(
*/ */
private def getFilterSyncMarkerFromStopBlockHeader( private def getFilterSyncMarkerFromStopBlockHeader(
stopBlockHeaderDb: BlockHeaderDb, stopBlockHeaderDb: BlockHeaderDb,
candidateStartHeadersOpt: Option[Vector[BlockHeaderDb]], blockchain: Blockchain,
batchSize: Int): Future[Option[FilterSyncMarker]] = { batchSize: Int): Future[Option[FilterSyncMarker]] = {
val blockchainOptF = blockHeaderDAO.getBlockchainFrom(stopBlockHeaderDb)
for {
blockchainOpt <- blockchainOptF
fsmOptVec <- {
blockchainOpt match {
case None =>
val exn = new RuntimeException(
s"Could not form a blockchain with stopHash=${stopBlockHeaderDb.hashBE}")
Future.failed(exn)
case Some(blockchain) =>
candidateStartHeadersOpt match {
case Some(startHeaders) =>
Future.traverse(startHeaders) { h =>
findNextHeader(prevBlockHeaderOpt = Some(h),
stopBlockHeaderDb = stopBlockHeaderDb,
batchSize = batchSize,
blockchain = blockchain)
}
case None => //walk backwards until we find a filter associated with our stopBlockHeaderDb
val fsmOptF = findNextHeader(prevBlockHeaderOpt = None, val bestFilterOptF: Future[Option[CompactFilterDb]] = {
stopBlockHeaderDb = Future.find(blockchain.map(b => getFilter(b.hashBE)))(_.isDefined)
stopBlockHeaderDb, }.map(_.flatten)
batchSize = batchSize,
blockchain = blockchain) //get blockheader associated with the best filter associated with stopBlockHeaderDb
fsmOptF.map(Vector(_)) val bestHeaderOptF: Future[Option[BlockHeaderDb]] = bestFilterOptF.flatMap {
} case Some(bestFilter) => getHeader(bestFilter.blockHashBE)
} case None => Future.successful(None)
}
} yield {
fsmOptVec.flatten.headOption
} }
for {
bestHeaderOpt <- bestHeaderOptF
fsmOpt <- findBestFilterSyncMarker(bestHeaderOpt.toVector,
stopBlockHeaderDb,
blockchain,
batchSize)
} yield fsmOpt
}
private def findBestFilterSyncMarker(
candidateStartHeaders: Vector[BlockHeaderDb],
stopBlockHeaderDb: BlockHeaderDb,
blockchain: Blockchain,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
val fsmOptF: Future[Option[FilterSyncMarker]] = {
val startHeaderOpt = {
candidateStartHeaders.find(h =>
hasBothBlockHeaderHashes(blockchain = blockchain,
prevBlockHeaderHashBE = h.hashBE,
stopBlockHeaderHashBE =
stopBlockHeaderDb.hashBE))
}
findNextHeader(candidateStartHeaderOpt = startHeaderOpt,
stopBlockHeaderDb = stopBlockHeaderDb,
batchSize = batchSize,
blockchain = blockchain)
}
fsmOptF
} }
/** @inheritdoc */ /** @inheritdoc */