Fix bug where compact filters weren't being processed in order of block height during IBD (#5041)

* Fix bug where compact filters weren't being processed in order of block height during IBD

* Use sorted compact filter messages in chainApi.processFilters()
This commit is contained in:
Chris Stewart 2023-04-10 16:42:48 -05:00 committed by GitHub
parent 5e668c5d5e
commit 13e5e6501c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 61 additions and 13 deletions

View File

@ -619,6 +619,16 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
it must "return filters in order by block height" in { chainHandler =>
val maxHeightF = chainHandler.getBestHashBlockHeight()
for {
maxHeight <- maxHeightF
filters <- chainHandler.getFiltersBetweenHeights(0, maxHeight)
} yield {
assert(filters.sortBy(_.blockHeight) == filters)
}
}
it must "get the correct height from an epoch second" in {
chainHandler: ChainHandler =>
for {

View File

@ -403,7 +403,6 @@ class ChainHandler(
newFilters <- newFiltersF
filterHeaders <- filterHeaderDAO
.findAllByBlockHashes(newFilters.map(_.blockHash.flip))
.map(_.sortBy(_.height))
} yield filterHeaders
}

View File

@ -133,7 +133,10 @@ case class CompactFilterDAO()(implicit
Seq[CompactFilterDb],
CompactFilterDb,
Effect.Read] = {
table.filter(header => header.height >= from && header.height <= to).result
table
.filter(header => header.height >= from && header.height <= to)
.sortBy(_.height)
.result
}
private val bestFilterQuery = {

View File

@ -95,7 +95,9 @@ case class CompactFilterHeaderDAO()(implicit
def findAllByBlockHashes(hashes: Vector[DoubleSha256DigestBE]): Future[
Vector[CompactFilterHeaderDb]] = {
val query = table.filter(_.blockHash.inSet(hashes))
val query = table
.filter(_.blockHash.inSet(hashes))
.sortBy(_.height)
safeDatabase.runVec(query.result)
}

View File

@ -6,10 +6,10 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.gcs.{BlockFilter, GolombFilter}
import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.CompactSizeUInt
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
@ -176,17 +176,19 @@ case class DataMessageHandler(
// If we are not syncing or our filter batch is full, process the filters
(newBatch: Set[CompactFilterMessage], newChainApi) <- {
if (isFiltersSynced || batchSizeFull) {
val blockFilters = filterBatch.map { filter =>
(filter.blockHash,
BlockFilter.fromBytes(filter.filterBytes, filter.blockHash))
}
logger.info(s"Processing ${filterBatch.size} filters")
val sortedBlockFiltersF = sortBlockFiltersByBlockHeight(
filterBatch)
for {
newChainApi <- chainApi.processFilters(filterBatch.toVector)
sortedBlockFilters <- sortedBlockFiltersF
sortedFilterMessages = sortedBlockFilters.map(_._2)
newChainApi <- chainApi.processFilters(sortedFilterMessages)
sortedGolombFilters = sortedBlockFilters.map(x => (x._1, x._3))
_ <-
appConfig.callBacks
.executeOnCompactFiltersReceivedCallbacks(
blockFilters.toVector)
sortedGolombFilters)
} yield (Set.empty, newChainApi)
} else Future.successful((filterBatch, chainApi))
}
@ -802,6 +804,38 @@ case class DataMessageHandler(
}
} yield ()
}
private def sortBlockFiltersByBlockHeight(
filterBatch: Set[CompactFilterMessage]): Future[
Vector[(DoubleSha256Digest, CompactFilterMessage, GolombFilter)]] = {
val blockFiltersF: Future[
Set[(Int, DoubleSha256Digest, CompactFilterMessage, GolombFilter)]] = {
Future.traverse(filterBatch) { filter =>
val blockHeightOptF =
chainApi.getBlockHeight(filter.blockHash.flip)
val filtersWithBlockHeightF = for {
blockHeightOpt <- blockHeightOptF
} yield {
require(
blockHeightOpt.isDefined,
s"Could not find block height for blockHash=${filter.blockHash.flip}")
(blockHeightOpt.get,
filter.blockHash,
filter,
BlockFilter.fromBytes(filter.filterBytes, filter.blockHash))
}
filtersWithBlockHeightF
}
}
val sortedBlockFiltersF = {
blockFiltersF
.map(_.toVector.sortBy(_._1))
.map(set => set.map(tuple => (tuple._2, tuple._3, tuple._4)))
}
sortedBlockFiltersF
}
}
sealed trait StreamDataMessageWrapper

View File

@ -179,8 +179,8 @@ abstract class Wallet
} else {
FutureUtil
.batchAndParallelExecute(
blockFilters,
searchFilterMatches(scriptPubKeys.toVector)
elements = blockFilters,
f = searchFilterMatches(scriptPubKeys.toVector)
)
.map(_.flatten)
}