2023 02 27 Fix bug where duplicate filters caused node not to sync (#5008)

* WIP

* Modify DataMessageHandler to use Set for filter batch, fix test case

* revert log level

* Remove duplicate filters test

* Re-add test to make sure we don't throw an exception when processing a filter we've seen before

* Empty commit to re-run CI

* Empty commit to re-run CI

* Empty commit to re-run CI

* Fix NeutrinoNodeTest

* Empty commit to re-run CI

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2023-03-02 06:46:08 -06:00 committed by GitHub
parent b444a192b0
commit de0e892b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 72 additions and 32 deletions

View File

@ -633,12 +633,19 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
it must "fail when processing duplicate filters" in {
it must "not throw an exception when processing a filter we have already seen" in {
chainHandler: ChainHandler =>
recoverToSucceededIf[DuplicateFilters] {
val filters = Vector.fill(2)(ChainUnitTest.genesisFilterMessage)
chainHandler.processFilters(filters)
val filter = ChainUnitTest.genesisFilterMessage
val filters = Vector.fill(2)(filter)
val filterCountBeforeF = chainHandler.getFilterCount()
val processF =
filterCountBeforeF.flatMap(_ => chainHandler.processFilters(filters))
for {
_ <- processF
beforeFilterCount <- filterCountBeforeF
filterCount <- chainHandler.getFilterCount()
} yield {
assert(beforeFilterCount == filterCount)
}
}

View File

@ -384,29 +384,54 @@ class ChainHandler(
/** @inheritdoc */
override def processFilters(
messages: Vector[CompactFilterMessage]): Future[ChainApi] = {
//find filters we have seen before
val duplicateFiltersF: Future[Vector[CompactFilterDb]] = {
filterDAO.findByBlockHashes(messages.map(_.blockHash.flip))
}
logger.debug(s"processFilters: messages=${messages}")
val filterHeadersF = filterHeaderDAO
.findAllByBlockHashes(messages.map(_.blockHash.flip))
.map(_.sortBy(_.height))
//only add new filters to our database
val newFiltersF = for {
duplicates <- duplicateFiltersF
} yield messages.filterNot(f =>
duplicates.exists(_.blockHashBE == f.blockHash.flip))
val messagesByBlockHash: Map[DoubleSha256DigestBE, CompactFilterMessage] =
messages.groupBy(_.blockHash.flip).map { case (blockHash, messages) =>
if (messages.size > 1)
return Future.failed(
DuplicateFilters("Attempt to process duplicate filters"))
(blockHash, messages.head)
logger.debug(
s"processFilters: len=${messages.length} messages.blockHash=${messages
.map(_.blockHash.flip)}")
val filterHeadersF = {
for {
newFilters <- newFiltersF
filterHeaders <- filterHeaderDAO
.findAllByBlockHashes(newFilters.map(_.blockHash.flip))
.map(_.sortBy(_.height))
} yield filterHeaders
}
val filtersByBlockHashF: Future[
Map[DoubleSha256DigestBE, CompactFilterMessage]] = {
for {
newFilters <- newFiltersF
} yield {
newFilters.groupBy(_.blockHash.flip).map { case (blockHash, messages) =>
if (messages.size > 1) {
return Future.failed(DuplicateFilters(
s"Attempt to process ${messages.length} duplicate filters for blockHashBE=$blockHash"))
}
(blockHash, messages.head)
}
}
}
for {
filterHeaders <- filterHeadersF
_ = logger.debug(s"processFilters: filterHeaders=$filterHeaders")
filtersByBlockHash <- filtersByBlockHashF
_ = require(
filterHeaders.size == messages.size,
s"Filter batch size does not match filter header batch size ${messages.size} != ${filterHeaders.size}")
filterHeaders.size == filtersByBlockHash.values.size,
s"Filter batch size does not match filter header batch size ${filtersByBlockHash.values.size} != ${filterHeaders.size}"
)
compactFilterDbs <- FutureUtil.makeAsync { () =>
filterHeaders.map { filterHeader =>
findFilterDbFromMessage(filterHeader, messagesByBlockHash)
findFilterDbFromMessage(filterHeader, filtersByBlockHash)
}
}
_ <- filterDAO.createAll(compactFilterDbs)
@ -423,7 +448,8 @@ class ChainHandler(
this
// Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty
case (_, _) =>
logger.warn("Was unable to process any filters")
logger.warn(
s"Was unable to process any filters minHeightOpt=$minHeightOpt maxHeightOpt=$maxHeightOpt compactFilterDbs.length=${compactFilterDbs.length}")
this
}
}

View File

@ -87,6 +87,12 @@ case class CompactFilterDAO()(implicit
read(hash)
}
def findByBlockHashes(
hashes: Vector[DoubleSha256DigestBE]): Future[Vector[CompactFilterDb]] = {
val action = findByPrimaryKeys(hashes).result
safeDatabase.runVec(action)
}
/** Retrieves a [[CompactFilterDb]] at the given height */
def getAtHeight(height: Int): Future[Vector[CompactFilterDb]] = {
val query = getAtHeightQuery(height)

View File

@ -47,7 +47,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node = node,
state = HeaderSync,
initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = Some(peer)
@ -98,7 +98,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node = node,
state = HeaderSync,
initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = Some(peer)
@ -143,7 +143,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node = node,
state = HeaderSync,
initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = Some(peer)
@ -209,7 +209,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node = node,
state = HeaderSync,
initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = Some(peer)

View File

@ -67,7 +67,7 @@ case class NeutrinoNode(
node = this,
state = HeaderSync,
initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = filterHeaderCountOpt,
filterHeightOpt = filterCountOpt,
syncPeer = None

View File

@ -33,7 +33,7 @@ case class DataMessageHandler(
node: Node,
state: DataMessageHandlerState,
initialSyncDone: Option[Promise[Done]],
currentFilterBatch: Vector[CompactFilterMessage],
currentFilterBatch: Set[CompactFilterMessage],
filterHeaderHeightOpt: Option[Int],
filterHeightOpt: Option[Int],
syncPeer: Option[Peer])(implicit
@ -49,7 +49,7 @@ case class DataMessageHandler(
private val syncing: Boolean = syncPeer.isDefined
def reset: DataMessageHandler = copy(initialSyncDone = None,
currentFilterBatch = Vector.empty,
currentFilterBatch = Set.empty,
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = None,
@ -193,8 +193,8 @@ case class DataMessageHandler(
syncing
}
// If we are not syncing or our filter batch is full, process the filters
filterBatch = currentFilterBatch :+ filter
(newBatch, newChainApi) <- {
filterBatch = currentFilterBatch.+(filter)
(newBatch: Set[CompactFilterMessage], newChainApi) <- {
if (!newSyncing || batchSizeFull) {
val blockFilters = filterBatch.map { filter =>
(filter.blockHash,
@ -202,11 +202,12 @@ case class DataMessageHandler(
}
logger.info(s"Processing ${filterBatch.size} filters")
for {
newChainApi <- chainApi.processFilters(filterBatch)
newChainApi <- chainApi.processFilters(filterBatch.toVector)
_ <-
appConfig.callBacks
.executeOnCompactFiltersReceivedCallbacks(blockFilters)
} yield (Vector.empty, newChainApi)
.executeOnCompactFiltersReceivedCallbacks(
blockFilters.toVector)
} yield (Set.empty, newChainApi)
} else Future.successful((filterBatch, chainApi))
}
_ <-