From de0e892b3ec8ae8f4a5bd45ab99f1a15c69d4bba Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Thu, 2 Mar 2023 06:46:08 -0600 Subject: [PATCH] 2023 02 27 Fix bug where duplicate filters caused node not to sync (#5008) * WIP * Modify DataMessageHandler to use Set for filter batch, fix test case * revert log level * Remove duplicate filters test * Re-add test to make sure we don't throw an exception when processing a filter we've seen before * Empty commit to re-run CI * Empty commit to re-run CI * Empty commit to re-run CI * Fix NeutrinoNodeTest * Empty commit to re-run CI * Empty commit to re-run CI --- .../chain/blockchain/ChainHandlerTest.scala | 17 ++++-- .../chain/blockchain/ChainHandler.scala | 56 ++++++++++++++----- .../chain/models/CompactFilterDAO.scala | 6 ++ .../peer/DataMessageHandlerTest.scala | 8 +-- .../org/bitcoins/node/NeutrinoNode.scala | 2 +- .../networking/peer/DataMessageHandler.scala | 15 ++--- 6 files changed, 72 insertions(+), 32 deletions(-) diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index d304b802f3..3e414e9620 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -633,12 +633,19 @@ class ChainHandlerTest extends ChainDbUnitTest { } } - it must "fail when processing duplicate filters" in { + it must "not throw an exception when processing a filter we have already seen" in { chainHandler: ChainHandler => - recoverToSucceededIf[DuplicateFilters] { - val filters = Vector.fill(2)(ChainUnitTest.genesisFilterMessage) - - chainHandler.processFilters(filters) + val filter = ChainUnitTest.genesisFilterMessage + val filters = Vector.fill(2)(filter) + val filterCountBeforeF = chainHandler.getFilterCount() + val processF = + filterCountBeforeF.flatMap(_ => chainHandler.processFilters(filters)) + for { + _ <- processF + beforeFilterCount <- filterCountBeforeF + filterCount <- chainHandler.getFilterCount() + } yield { + assert(beforeFilterCount == filterCount) } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 73ee4c33bd..ad2c1ef8f0 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -384,29 +384,54 @@ class ChainHandler( /** @inheritdoc */ override def processFilters( messages: Vector[CompactFilterMessage]): Future[ChainApi] = { + //find filters we have seen before + val duplicateFiltersF: Future[Vector[CompactFilterDb]] = { + filterDAO.findByBlockHashes(messages.map(_.blockHash.flip)) + } - logger.debug(s"processFilters: messages=${messages}") - val filterHeadersF = filterHeaderDAO - .findAllByBlockHashes(messages.map(_.blockHash.flip)) - .map(_.sortBy(_.height)) + //only add new filters to our database + val newFiltersF = for { + duplicates <- duplicateFiltersF + } yield messages.filterNot(f => + duplicates.exists(_.blockHashBE == f.blockHash.flip)) - val messagesByBlockHash: Map[DoubleSha256DigestBE, CompactFilterMessage] = - messages.groupBy(_.blockHash.flip).map { case (blockHash, messages) => - if (messages.size > 1) - return Future.failed( - DuplicateFilters("Attempt to process duplicate filters")) - (blockHash, messages.head) + logger.debug( + s"processFilters: len=${messages.length} messages.blockHash=${messages + .map(_.blockHash.flip)}") + val filterHeadersF = { + for { + newFilters <- newFiltersF + filterHeaders <- filterHeaderDAO + .findAllByBlockHashes(newFilters.map(_.blockHash.flip)) + .map(_.sortBy(_.height)) + } yield filterHeaders + } + + val filtersByBlockHashF: Future[ + Map[DoubleSha256DigestBE, CompactFilterMessage]] = { + for { + newFilters <- newFiltersF + } yield { + newFilters.groupBy(_.blockHash.flip).map { case (blockHash, messages) => + if (messages.size > 1) { + return Future.failed(DuplicateFilters( + s"Attempt to process ${messages.length} duplicate filters for blockHashBE=$blockHash")) + } + (blockHash, messages.head) + } } + } for { filterHeaders <- filterHeadersF - _ = logger.debug(s"processFilters: filterHeaders=$filterHeaders") + filtersByBlockHash <- filtersByBlockHashF _ = require( - filterHeaders.size == messages.size, - s"Filter batch size does not match filter header batch size ${messages.size} != ${filterHeaders.size}") + filterHeaders.size == filtersByBlockHash.values.size, + s"Filter batch size does not match filter header batch size ${filtersByBlockHash.values.size} != ${filterHeaders.size}" + ) compactFilterDbs <- FutureUtil.makeAsync { () => filterHeaders.map { filterHeader => - findFilterDbFromMessage(filterHeader, messagesByBlockHash) + findFilterDbFromMessage(filterHeader, filtersByBlockHash) } } _ <- filterDAO.createAll(compactFilterDbs) @@ -423,7 +448,8 @@ class ChainHandler( this // Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty case (_, _) => - logger.warn("Was unable to process any filters") + logger.warn( + s"Was unable to process any filters minHeightOpt=$minHeightOpt maxHeightOpt=$maxHeightOpt compactFilterDbs.length=${compactFilterDbs.length}") this } } diff --git a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterDAO.scala b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterDAO.scala index 009834e0d2..787623cb9b 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterDAO.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterDAO.scala @@ -87,6 +87,12 @@ case class CompactFilterDAO()(implicit read(hash) } + def findByBlockHashes( + hashes: Vector[DoubleSha256DigestBE]): Future[Vector[CompactFilterDb]] = { + val action = findByPrimaryKeys(hashes).result + safeDatabase.runVec(action) + } + /** Retrieves a [[CompactFilterDb]] at the given height */ def getAtHeight(height: Int): Future[Vector[CompactFilterDb]] = { val query = getAtHeightQuery(height) diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index 8c9ca38f6a..a2b5b936ea 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -47,7 +47,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor { node = node, state = HeaderSync, initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = None, filterHeightOpt = None, syncPeer = Some(peer) @@ -98,7 +98,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor { node = node, state = HeaderSync, initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = None, filterHeightOpt = None, syncPeer = Some(peer) @@ -143,7 +143,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor { node = node, state = HeaderSync, initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = None, filterHeightOpt = None, syncPeer = Some(peer) @@ -209,7 +209,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor { node = node, state = HeaderSync, initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = None, filterHeightOpt = None, syncPeer = Some(peer) diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 71fed2fa35..8f0f793baf 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -67,7 +67,7 @@ case class NeutrinoNode( node = this, state = HeaderSync, initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = filterHeaderCountOpt, filterHeightOpt = filterCountOpt, syncPeer = None diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 571f615271..0fd59e01ae 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -33,7 +33,7 @@ case class DataMessageHandler( node: Node, state: DataMessageHandlerState, initialSyncDone: Option[Promise[Done]], - currentFilterBatch: Vector[CompactFilterMessage], + currentFilterBatch: Set[CompactFilterMessage], filterHeaderHeightOpt: Option[Int], filterHeightOpt: Option[Int], syncPeer: Option[Peer])(implicit @@ -49,7 +49,7 @@ case class DataMessageHandler( private val syncing: Boolean = syncPeer.isDefined def reset: DataMessageHandler = copy(initialSyncDone = None, - currentFilterBatch = Vector.empty, + currentFilterBatch = Set.empty, filterHeaderHeightOpt = None, filterHeightOpt = None, syncPeer = None, @@ -193,8 +193,8 @@ case class DataMessageHandler( syncing } // If we are not syncing or our filter batch is full, process the filters - filterBatch = currentFilterBatch :+ filter - (newBatch, newChainApi) <- { + filterBatch = currentFilterBatch.+(filter) + (newBatch: Set[CompactFilterMessage], newChainApi) <- { if (!newSyncing || batchSizeFull) { val blockFilters = filterBatch.map { filter => (filter.blockHash, @@ -202,11 +202,12 @@ case class DataMessageHandler( } logger.info(s"Processing ${filterBatch.size} filters") for { - newChainApi <- chainApi.processFilters(filterBatch) + newChainApi <- chainApi.processFilters(filterBatch.toVector) _ <- appConfig.callBacks - .executeOnCompactFiltersReceivedCallbacks(blockFilters) - } yield (Vector.empty, newChainApi) + .executeOnCompactFiltersReceivedCallbacks( + blockFilters.toVector) + } yield (Set.empty, newChainApi) } else Future.successful((filterBatch, chainApi)) } _ <-