From d983a1bac4716e6385d6d6fd6e73a35a15c489a0 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Thu, 28 Dec 2023 10:43:01 -0600 Subject: [PATCH] 2023 12 24 Make filter headers / filters sync work with block header reorgs (#5333) * Rework ChainApi.nextBlockHeaderBatchRange() to take stopHash parameter, get chainTest/test passing * WIP: Refactor ChainApi.nextFilterHeaderBatchRange() * Finish ChainApi.nextFilterHeaderBatchRange() refactor, get all tests passing except reorg related tests in nodeTest * Get NeutrinoNodeTest reorg test case working * Improve flaky test * Cleanup * Switch sync check to bitcoinds(1) * Empty commit to run CI * Implement batchSize tests in ChainHandlerTest * Rework ChainHandlerTest to not assume we have the genesis filter header / filter inserted into the database already * Cleanup println * Fix bug with nextFilterHeaderBatchRange() wrt to startHeightOpt parameter * Fix off by one bug with compact filter sync * Add check for connectionCount * Add longer timeout * scalafmt --- .../rpc/client/common/BitcoindRpcClient.scala | 6 +- .../blockchain/ChainHandlerCachedTest.scala | 39 +- .../chain/blockchain/ChainHandlerTest.scala | 358 ++++++++++++++++-- .../chain/blockchain/ChainHandler.scala | 279 ++++++++++---- .../chain/blockchain/ChainHandlerCached.scala | 10 +- .../bitcoins/core/api/chain/ChainApi.scala | 25 +- .../core/api/chain/FilterSyncMarker.scala | 6 +- .../bitcoins/core/p2p/NetworkPayload.scala | 2 + .../org/bitcoins/node/NeutrinoNodeTest.scala | 6 + ...NeutrinoNodeWithUncachedBitcoindTest.scala | 2 +- .../scala/org/bitcoins/node/PeerManager.scala | 54 ++- .../networking/peer/DataMessageHandler.scala | 74 ++-- .../bitcoins/testkit/node/BaseNodeTest.scala | 6 +- 13 files changed, 700 insertions(+), 167 deletions(-) diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala index f78ba41b53..1e5f408bf2 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala @@ -222,14 +222,16 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit override def nextBlockHeaderBatchRange( prevStopHash: DoubleSha256DigestBE, + stopHash: DoubleSha256DigestBE, batchSize: Int): Future[Option[FilterSyncMarker]] = Future.failed( new UnsupportedOperationException( s"Bitcoind chainApi doesn't allow you fetch block header batch range")) override def nextFilterHeaderBatchRange( - startHeight: Int, - batchSize: Int): Future[Option[FilterSyncMarker]] = + stopBlockHash: DoubleSha256DigestBE, + batchSize: Int, + startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = Future.failed( new UnsupportedOperationException( s"Bitcoind chainApi doesn't allow you fetch filter header batch range")) diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala index 2fe13af3da..4cc2746951 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala @@ -48,7 +48,11 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { chainHandler.chainConfig.chain.genesisBlock.blockHeader val assert1F = for { rangeOpt <- - chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1) + chainHandler.nextBlockHeaderBatchRange(prevStopHash = + DoubleSha256DigestBE.empty, + stopHash = + genesisHeader.hashBE, + batchSize = 1) } yield { val marker = rangeOpt.get assert(rangeOpt.nonEmpty) @@ -72,7 +76,10 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { for { chainApi <- chainApi2 rangeOpt <- - chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2) + chainApi.nextBlockHeaderBatchRange(prevStopHash = + DoubleSha256DigestBE.empty, + stopHash = blockHeader.hashBE, + batchSize = 2) } yield { val marker = rangeOpt.get assert(rangeOpt.nonEmpty) @@ -91,12 +98,13 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { //two competing headers B,C built off of A //so just pick the first headerB to be our next block header batch - val assert1F = for { + val assert0F = for { chainHandler <- chainHandlerF newHeaderB <- newHeaderBF newHeaderC <- newHeaderCF blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = newHeaderB.hashBE, batchSize = batchSize) } yield { assert(blockHeaderBatchOpt.isDefined) @@ -107,6 +115,26 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { assert(newHeaderB.height == marker.startHeight) } + //two competing headers B,C built off of A + //pick headerC to be our next block header batch + val assert1F = for { + _ <- assert0F + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = newHeaderC.hashBE, + batchSize = batchSize) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(newHeaderC.height == marker.startHeight) + } + //now let's build a new block header ontop of C and process it //when we call chainHandler.nextBlockHeaderBatchRange it //should be C's hash instead of B's hash @@ -118,6 +146,7 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { chainApiD <- chainHandler.processHeader(headerD.blockHeader) blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange( prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = headerD.hashBE, batchSize = batchSize) count <- chainApiD.getBlockCount() } yield { @@ -135,7 +164,9 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { val assert1F = for { bestBlockHash <- chainHandler.getBestBlockHash() rangeOpt <- - chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1) + chainHandler.nextBlockHeaderBatchRange(prevStopHash = bestBlockHash, + stopHash = bestBlockHash, + batchSize = 1) } yield { assert(rangeOpt.isEmpty) } diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index c044099cf5..c04a7685ba 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -38,7 +38,7 @@ class ChainHandlerTest extends ChainDbUnitTest { ChainFixtureTag.GenesisChainHandlerWithFilter override def withFixture(test: OneArgAsyncTest): FutureOutcome = - withChainHandlerGenesisFilter(test) + withChainHandler(test) val genesis: BlockHeaderDb = ChainTestUtil.genesisHeaderDb behavior of "ChainHandler" @@ -53,6 +53,16 @@ class ChainHandlerTest extends ChainDbUnitTest { nonce = UInt32(2083236893) ) + private def insertGenesisFilterHeaderAndFilter( + chainHandler: ChainHandler): Future[Unit] = { + for { + _ <- chainHandler.processFilterHeader( + ChainTestUtil.genesisFilterHeaderDb.filterHeader, + ChainTestUtil.genesisHeaderDb.hashBE) + _ <- chainHandler.processFilter(ChainTestUtil.genesisFilterMessage) + } yield () + } + it must "process a new valid block header, and then be able to fetch that header" in { chainHandler: ChainHandler => val newValidHeader = @@ -218,9 +228,12 @@ class ChainHandlerTest extends ChainDbUnitTest { it must "get the highest filter header" in { chainHandler: ChainHandler => { for { + initFhCount <- chainHandler.getFilterHeaderCount() + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) count <- chainHandler.getFilterHeaderCount() genesisFilterHeader <- chainHandler.getFilterHeadersAtHeight(count) } yield { + assert(initFhCount == 0) assert(genesisFilterHeader.size == 1) assert( genesisFilterHeader.contains(ChainTestUtil.genesisFilterHeaderDb)) @@ -318,9 +331,12 @@ class ChainHandlerTest extends ChainDbUnitTest { { for { count <- chainHandler.getFilterCount() - genesisFilter <- chainHandler.getFiltersAtHeight(count) + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) + genesisFilter <- chainHandler.getFiltersAtHeight(0) + count1 <- chainHandler.getFilterCount() } yield { assert(count == 0) + assert(count1 == 0) assert(genesisFilter.contains(ChainTestUtil.genesisFilterDb)) assert( genesisFilter.head.golombFilter == ChainTestUtil.genesisFilterDb.golombFilter) @@ -331,6 +347,7 @@ class ChainHandlerTest extends ChainDbUnitTest { it must "NOT create an unknown filter" in { chainHandler: ChainHandler => { val unknownHashF = for { + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) _ <- chainHandler.processHeader(nextBlockHeader) blockHashBE <- chainHandler.getHeadersAtHeight(1).map(_.head.hashBE) golombFilter = BlockFilter.fromHex("017fa880", blockHashBE.flip) @@ -389,16 +406,20 @@ class ChainHandlerTest extends ChainDbUnitTest { } } - it must "generate a range for a block filter query for the genesis block" in { + it must "generate a range for a filter header query for the genesis block" in { chainHandler: ChainHandler => val genesisHeader = chainHandler.chainConfig.chain.genesisBlock.blockHeader val assert1F = for { rangeOpt <- - chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1) + chainHandler.nextBlockHeaderBatchRange(prevStopHash = + DoubleSha256DigestBE.empty, + stopHash = + genesisHeader.hashBE, + batchSize = 1) } yield { - val marker = rangeOpt.get assert(rangeOpt.nonEmpty) + val marker = rangeOpt.get assert(marker.startHeight == 0) assert(marker.stopBlockHash == genesisHeader.hash) } @@ -419,7 +440,9 @@ class ChainHandlerTest extends ChainDbUnitTest { for { chainApi <- chainApi2 rangeOpt <- - chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2) + chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, + stopHash = blockHeader.hashBE, + batchSize = 2) } yield { val marker = rangeOpt.get assert(rangeOpt.nonEmpty) @@ -428,7 +451,7 @@ class ChainHandlerTest extends ChainDbUnitTest { } } - it must "generate the correct range of block filters if a header is reorged" in { + it must "generate the correct range of filter headers if a block header is reorged" in { chainHandler: ChainHandler => val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) val chainHandlerF = reorgFixtureF.map(_.chainApi) @@ -437,13 +460,14 @@ class ChainHandlerTest extends ChainDbUnitTest { val batchSize = 100 //two competing headers B,C built off of A - //so just pick the first headerB to be our next block header batch - val assert1F = for { + //first specify header B to be syncing filter headers from + val assert0F = for { chainHandler <- chainHandlerF newHeaderB <- newHeaderBF newHeaderC <- newHeaderCF blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = newHeaderB.hashBE, batchSize = batchSize) } yield { assert(blockHeaderBatchOpt.isDefined) @@ -452,6 +476,28 @@ class ChainHandlerTest extends ChainDbUnitTest { header2 = newHeaderC, bestHash = marker.stopBlockHash.flip) assert(newHeaderB.height == marker.startHeight) + assert(newHeaderB.hashBE == marker.stopBlockHash.flip) + } + + //two competing headers B,C built off of A + //first specify header C to be syncing filter headers from + val assert1F = for { + _ <- assert0F + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = newHeaderC.hashBE, + batchSize = batchSize) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(newHeaderC.height == marker.startHeight) + assert(newHeaderC.hashBE == marker.stopBlockHash.flip) } //now let's build a new block header ontop of C and process it @@ -465,6 +511,7 @@ class ChainHandlerTest extends ChainDbUnitTest { chainApiD <- chainHandler.processHeader(headerD.blockHeader) blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange( prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = headerD.hashBE, batchSize = batchSize) count <- chainApiD.getBlockCount() } yield { @@ -472,7 +519,7 @@ class ChainHandlerTest extends ChainDbUnitTest { assert(blockHeaderBatchOpt.isDefined) val marker = blockHeaderBatchOpt.get assert(headerC.height == marker.startHeight) - assert(headerD.hash == marker.stopBlockHash) + assert(headerD.hashBE == marker.stopBlockHash.flip) } } @@ -482,30 +529,287 @@ class ChainHandlerTest extends ChainDbUnitTest { val assert1F = for { bestBlockHash <- chainHandler.getBestBlockHash() rangeOpt <- - chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1) + chainHandler.nextBlockHeaderBatchRange(prevStopHash = bestBlockHash, + stopHash = bestBlockHash, + batchSize = 1) } yield { - assert(rangeOpt.isEmpty) + assert(rangeOpt.isEmpty, s"rangeOpt=$rangeOpt") } assert1F } - it must "generate a range for a block filter header query" in { + it must "nextBlockHeaderBatchRange must honor the batchSize query" in { chainHandler: ChainHandler => + val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) + val chainHandlerF = reorgFixtureF.map(_.chainApi) + val newHeaderBF = reorgFixtureF.map(_.headerDb1) + val newHeaderCF = reorgFixtureF.map(_.headerDb2) + + //two competing headers B,C built off of A + //first specify header C to be syncing filter headers from + val assert1F = for { + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = newHeaderC.hashBE, + batchSize = 1) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(marker.startHeight == 1) + assert(newHeaderC.hashBE == marker.stopBlockHash.flip) + } + + //now let's build a new block header ontop of C and process it + //when we call chainHandler.nextBlockHeaderBatchRange it + //should be C's hash instead of D's hash due to batchSize + val assert2F = for { + _ <- assert1F + chainHandler <- chainHandlerF + headerC <- newHeaderCF + headerD = BlockHeaderHelper.buildNextHeader(headerC) + chainApiD <- chainHandler.processHeader(headerD.blockHeader) + blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = headerD.hashBE, + batchSize = 1) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + assert(marker.startHeight == 1) + assert(headerC.hashBE == marker.stopBlockHashBE) + } + + val headerDF = { + newHeaderCF.map(headerC => BlockHeaderHelper.buildNextHeader(headerC)) + } + + val assert3F = for { + _ <- assert2F + chainHandler <- chainHandlerF + headerD <- headerDF + chainApiD <- chainHandler.processHeader(headerD.blockHeader) + blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + stopHash = headerD.hashBE, + batchSize = 2) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + assert(marker.startHeight == 1) + assert(headerD.hashBE == marker.stopBlockHash.flip) + } + + //must return None in the case of reorg scenario between prevStopHash / stopHash for { - bestBlock <- chainHandler.getBestBlockHeader() - bestBlockHashBE = bestBlock.hashBE - rangeOpt <- chainHandler.nextFilterHeaderBatchRange(0, 1) + _ <- assert3F + chainHandler <- chainHandlerF + headerB <- newHeaderBF + headerD <- headerDF + //note headerB and headerD are not part of the same chain as D is built ontop of C + blockHeaderBatchOpt <- + chainHandler.nextBlockHeaderBatchRange(prevStopHash = headerB.hashBE, + stopHash = headerD.hashBE, + batchSize = 1) + } yield { + assert(blockHeaderBatchOpt.isEmpty) + } + } + + it must "generate the correct range of block filters if a header is reorged" in { + chainHandler: ChainHandler => + val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) + val chainHandlerF = reorgFixtureF.map(_.chainApi) + val newHeaderBF = reorgFixtureF.map(_.headerDb1) + val newHeaderCF = reorgFixtureF.map(_.headerDb2) + val batchSize = 100 + + //two competing headers B,C built off of A + //first specify header B to be syncing filter headers from + val assert0F = for { + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange( + stopBlockHash = newHeaderB.hashBE, + batchSize = batchSize) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(marker.startHeight == 0) + assert(marker.stopBlockHash.flip == newHeaderB.hashBE) + } + + //two competing headers B,C built off of A + //first specify header C to be syncing filter headers from + val assert1F = for { + _ <- assert0F + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange( + stopBlockHash = newHeaderC.hashBE, + batchSize = batchSize) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(marker.startHeight == 0) + assert(marker.stopBlockHash.flip == newHeaderC.hashBE) + } + + //now let's build a new block header ontop of C and process it + //when we call chainHandler.nextFilterHeaderBatchRange it + //should be C's hash instead of B's hash + for { + _ <- assert1F + chainHandler <- chainHandlerF + headerC <- newHeaderCF + headerD = BlockHeaderHelper.buildNextHeader(headerC) + chainApiD <- chainHandler.processHeader(headerD.blockHeader) + blockHeaderBatchOpt <- chainApiD.nextFilterHeaderBatchRange( + stopBlockHash = headerD.hashBE, + batchSize = batchSize) + count <- chainApiD.getBlockCount() + } yield { + assert(count == 2) + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + assert(marker.startHeight == 0) + assert(headerD.hash == marker.stopBlockHash) + } + + } + + it must "generate a range for a block filter query for the genesis block" in { + chainHandler: ChainHandler => + val genesisHeader = + chainHandler.chainConfig.chain.genesisBlock.blockHeader + val assert1F = for { + rangeOpt <- + chainHandler.nextFilterHeaderBatchRange(stopBlockHash = + genesisHeader.hashBE, + 1) + } yield { + assert(rangeOpt.nonEmpty) + val marker = rangeOpt.get + assert(marker.startHeight == 0) + assert(marker.stopBlockHash == genesisHeader.hash) + } + + //let's process a block header, and then be able to fetch that header as the last stopHash + val blockHeaderDb = { + BlockHeaderDbHelper.fromBlockHeader(height = 0, + chainWork = + Pow.getBlockProof(genesisHeader), + bh = genesisHeader) + } + + val blockHeader = BlockHeaderHelper.buildNextHeader(blockHeaderDb) + val chainApi2 = assert1F.flatMap { _ => + chainHandler.processHeader(blockHeader.blockHeader) + } + + for { + chainApi <- chainApi2 + rangeOpt <- + chainApi.nextFilterHeaderBatchRange(stopBlockHash = + blockHeader.hashBE, + batchSize = 2) } yield { val marker = rangeOpt.get assert(rangeOpt.nonEmpty) assert(marker.startHeight == 0) - assert(marker.stopBlockHash == bestBlockHashBE.flip) + assert(marker.stopBlockHash == blockHeader.hash) } } + it must "nextFilterHeaderBatchRange must honor the batchSize query" in { + chainHandler: ChainHandler => + val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) + val chainHandlerF = reorgFixtureF.map(_.chainApi) + val newHeaderBF = reorgFixtureF.map(_.headerDb1) + val newHeaderCF = reorgFixtureF.map(_.headerDb2) + + //two competing headers B,C built off of A + //first specify header C to be syncing filter headers from + val assert1F = for { + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange( + stopBlockHash = newHeaderC.hashBE, + batchSize = 1) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(marker.startHeight == 0) + assert(newHeaderC.hashBE == marker.stopBlockHash.flip) + } + + val headerDF = { + newHeaderCF.map(headerC => BlockHeaderHelper.buildNextHeader(headerC)) + } + //now let's build a new block header ontop of C and process it + //when we call chainHandler.nextFilterHeaderBatchRange with batchSize=2 + //should get D's hash back as the stop hash + val assert3F = for { + _ <- assert1F + chainHandler <- chainHandlerF + headerD <- headerDF + chainApiD <- chainHandler.processHeader(headerD.blockHeader) + blockHeaderBatchOpt <- chainApiD.nextFilterHeaderBatchRange( + stopBlockHash = headerD.hashBE, + batchSize = 2) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + assert(marker.startHeight == 0) + assert(marker.stopBlockHash.flip == headerD.hashBE) + } + + assert3F + } + + it must "nextFilterHeaderBatchRange must honor the startHeightOpt parameter" in { + chainHandler => + val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) + val chainHandlerF = reorgFixtureF.map(_.chainApi) + val newHeaderCF = reorgFixtureF.map(_.headerDb2) + val assert1F = for { + chainHandler <- chainHandlerF + newHeaderC <- newHeaderCF + rangeOpt <- chainHandler.nextFilterHeaderBatchRange(stopBlockHash = + newHeaderC.hashBE, + batchSize = 2, + startHeightOpt = + Some(0)) + } yield { + assert(rangeOpt.nonEmpty, s"rangeOpt=$rangeOpt") + val range = rangeOpt.get + assert(range.startHeight == 0) + assert(range.stopBlockHashBE == newHeaderC.hashBE) + } + assert1F + } + it must "read compact filters for the database" in { chainHandler: ChainHandler => for { + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) bestBlock <- chainHandler.getBestBlockHeader() filterHeader <- chainHandler.getFilterHeader(bestBlock.hashBE) filter <- chainHandler.getFilter(bestBlock.hashBE) @@ -608,7 +912,10 @@ class ChainHandlerTest extends ChainDbUnitTest { } it must "find filters between heights" in { chainHandler: ChainHandler => - chainHandler.getFiltersBetweenHeights(0, 1).map { filters => + for { + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) + filters <- chainHandler.getFiltersBetweenHeights(0, 1) + } yield { val genesis = ChainTestUtil.genesisFilterDb val genesisFilterResponse = FilterResponse(genesis.golombFilter, genesis.blockHashBE, @@ -642,12 +949,10 @@ class ChainHandlerTest extends ChainDbUnitTest { chainHandler: ChainHandler => val filter = ChainTestUtil.genesisFilterMessage val filters = Vector.fill(2)(filter) - val filterCountBeforeF = chainHandler.getFilterCount() - val processF = - filterCountBeforeF.flatMap(_ => chainHandler.processFilters(filters)) for { - _ <- processF - beforeFilterCount <- filterCountBeforeF + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) + beforeFilterCount <- chainHandler.getFilterCount() + _ <- chainHandler.processFilters(filters) filterCount <- chainHandler.getFilterCount() } yield { assert(beforeFilterCount == filterCount) @@ -682,11 +987,10 @@ class ChainHandlerTest extends ChainDbUnitTest { } it must "get best filter" in { chainHandler: ChainHandler => - val bestFilterOptF = chainHandler.getBestFilter() - val bestFilterHeaderOptF = chainHandler.getBestFilterHeader() for { - bestFilterHeaderOpt <- bestFilterHeaderOptF - bestFilterOpt <- bestFilterOptF + _ <- insertGenesisFilterHeaderAndFilter(chainHandler) + bestFilterHeaderOpt <- chainHandler.getBestFilterHeader() + bestFilterOpt <- chainHandler.getBestFilter() } yield { assert(bestFilterHeaderOpt.isDefined) assert(bestFilterOpt.isDefined) diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index fcca509054..c516a654a1 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -191,108 +191,191 @@ class ChainHandler( getBestBlockHeader().map(_.hashBE) } - protected def nextBlockHeaderBatchRangeWithChains( - prevStopHash: DoubleSha256DigestBE, - batchSize: Int, - blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = for { - prevBlockHeaderOpt <- getHeader(prevStopHash) - headerOpt <- { - prevBlockHeaderOpt match { - case Some(prevBlockHeader) => - findNextHeader(prevBlockHeader, batchSize, blockchains) - case None => - if (prevStopHash == DoubleSha256DigestBE.empty) { - getHeadersAtHeight(batchSize - 1).flatMap { headers => - val fsmOptF = if (headers.isEmpty) { - //just get best height? - getBestBlockHeader().map { bestHeader => - val f = FilterSyncMarker(0, bestHeader.hash) - Some(f) - } - } else if (headers.length == 1) { - val header = headers.head - val f = FilterSyncMarker(0, header.hash) - Future.successful(Some(f)) - } else { - //just select first header, i guess - val header = headers.head - val f = FilterSyncMarker(0, header.hash) - Future.successful(Some(f)) - } - fsmOptF - } - } else { - Future.successful(None) - } - - } - } - } yield headerOpt - /** @inheritdoc */ override def nextBlockHeaderBatchRange( prevStopHash: DoubleSha256DigestBE, - batchSize: Int): Future[Option[FilterSyncMarker]] = - for { - blockchains <- blockHeaderDAO.getBlockchains() - syncMarkerOpt <- nextBlockHeaderBatchRangeWithChains(prevStopHash, - batchSize, - blockchains) - } yield syncMarkerOpt + stopHash: DoubleSha256DigestBE, + batchSize: Int): Future[Option[FilterSyncMarker]] = { + if (prevStopHash == DoubleSha256DigestBE.empty) { + getHeadersAtHeight(batchSize - 1).map { headers => + if (headers.length == 1) { + val fsm = FilterSyncMarker(0, headers.head.hash) + Some(fsm) + } else { + logger.warn( + s"ChainHandler.nextBlockHeaderBatchRange() did not find a single header, got zero or multiple=$headers") + None + } + } + } else if (prevStopHash == stopHash) { + //means are are in sync + Future.successful(None) + } else { + val prevBlockHeaderOptF = getHeader(prevStopHash) + val stopBlockHeaderOptF = getHeader(stopHash) + val blockchainsF = blockHeaderDAO.getBlockchains() + for { + prevBlockHeaderOpt <- prevBlockHeaderOptF + stopBlockHeaderOpt <- stopBlockHeaderOptF + blockchains <- blockchainsF + fsmOpt <- getFilterSyncStopHash(prevBlockHeaderOpt = prevBlockHeaderOpt, + stopBlockHeaderOpt = stopBlockHeaderOpt, + blockchains = blockchains, + batchSize = batchSize) + } yield { + fsmOpt + } + } + } + + /** Retrieves a [[FilterSyncMarker]] respeciting the batchSize parameter. If the stopBlockHeader is not within the batchSize parameter + * we walk backwards until we find a header within the batchSize limit + */ + private def getFilterSyncStopHash( + prevBlockHeaderOpt: Option[BlockHeaderDb], + stopBlockHeaderOpt: Option[BlockHeaderDb], + blockchains: Vector[Blockchain], + batchSize: Int): Future[Option[FilterSyncMarker]] = { + (prevBlockHeaderOpt, stopBlockHeaderOpt) match { + case (prevBlockHeaderOpt, Some(stopBlockHeader)) => + findNextHeader(prevBlockHeaderOpt = prevBlockHeaderOpt, + stopBlockHeaderDb = stopBlockHeader, + batchSize = batchSize, + blockchains = blockchains) + case (Some(prevBlockHeader), None) => + val exn = new RuntimeException( + s"Cannot find a stopBlockHeader, only found prevBlockHeader=${prevBlockHeader.hashBE}") + Future.failed(exn) + case (None, None) => + val exn = new RuntimeException( + s"Cannot find prevBlocKHeader or stopBlockHeader") + Future.failed(exn) + } + } /** Finds the next header in the chain. Uses chain work to break ties * returning only the header in the chain with the most work, * else returns None if there is no next header + * @param prevBlockHeaderOpt the previous block header we synced from, if None we are syncing from genesis */ private def findNextHeader( - prevBlockHeader: BlockHeaderDb, + prevBlockHeaderOpt: Option[BlockHeaderDb], + stopBlockHeaderDb: BlockHeaderDb, batchSize: Int, blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = { - val chainsF = { + val isInBatchSize = + (stopBlockHeaderDb.height - prevBlockHeaderOpt + .map(_.height) + .getOrElse(0)) <= batchSize + + val prevBlockHeaderHeight = { + prevBlockHeaderOpt + .map(_.height) + .getOrElse(-1) //means we are syncing from the genesis block + } + + val prevBlockHeaderHashBE = { + prevBlockHeaderOpt + .map(_.hashBE) + .getOrElse( + DoubleSha256DigestBE.empty + ) //means we are syncing from genesis block + } + + val chainsF: Future[Vector[Blockchain]] = { val inMemoryBlockchains = { - blockchains.filter( - _.exists(_.previousBlockHashBE == prevBlockHeader.hashBE)) + blockchains.filter { b => + hasBothBlockHeaderHashes(blockchain = b, + prevBlockHeaderHashBE = + prevBlockHeaderHashBE, + stopBlockHeaderHashBE = + stopBlockHeaderDb.hashBE) + } } if (inMemoryBlockchains.nonEmpty) { Future.successful(inMemoryBlockchains) } else { - blockHeaderDAO.getBlockchainsBetweenHeights( - from = prevBlockHeader.height, - to = prevBlockHeader.height + batchSize) + if (isInBatchSize) { + blockHeaderDAO + .getBlockchainFrom(stopBlockHeaderDb) + .map { + case Some(blockchain) => + val hasBothHashes = + hasBothBlockHeaderHashes( + blockchain = blockchain, + prevBlockHeaderHashBE = prevBlockHeaderHashBE, + stopBlockHeaderHashBE = stopBlockHeaderDb.hashBE) + if (hasBothHashes) { + Vector(blockchain) + } else { + Vector.empty + } + case None => Vector.empty + } + + } else { + blockHeaderDAO.getBlockchainsBetweenHeights( + from = prevBlockHeaderHeight, + to = prevBlockHeaderHeight + batchSize) + } + } } - val startHeight = { - if (prevBlockHeader.hashBE == chainConfig.chain.genesisHash.flip) { - 1 - } else { - prevBlockHeader.height + 1 - } - } + val startHeight = prevBlockHeaderHeight + 1 for { chains <- chainsF } yield { - val nextBlockHeaderOpt = getBestChainAtHeight(startHeight = startHeight, - batchSize = batchSize, - blockchains = chains) + val nextBlockHeaderOpt = { + if (chains.isEmpty) { + //means prevBlockHeader and stopBlockHeader do not form a blockchain + None + } else if (isInBatchSize) { + Some(FilterSyncMarker(startHeight, stopBlockHeaderDb.hash)) + } else { + //means our requested stopBlockHeader is outside of batchSize + //we need to find an intermediate header within batchSize to sync against + getBestChainAtHeight(startHeight = startHeight, + batchSize = batchSize, + blockchains = chains) + } + } + nextBlockHeaderOpt match { case Some(next) => //this means we are synced, so return None val isSynced = - next.stopBlockHash == prevBlockHeader.hash || next.stopBlockHash == chainConfig.chain.genesisHash + next.stopBlockHashBE == prevBlockHeaderHashBE if (isSynced) { None } else { nextBlockHeaderOpt } case None => - //log here? None } } } + private def hasBothBlockHeaderHashes( + blockchain: Blockchain, + prevBlockHeaderHashBE: DoubleSha256DigestBE, + stopBlockHeaderHashBE: DoubleSha256DigestBE): Boolean = { + if (prevBlockHeaderHashBE == DoubleSha256DigestBE.empty) { + //carve out here in the case of genesis header, + //blockchains don't contain a block header with hash 0x000..0000 + blockchain.exists(_.hashBE == stopBlockHeaderHashBE) + } else { + val hasHash1 = + blockchain.exists(_.hashBE == prevBlockHeaderHashBE) + val hasHash2 = + blockchain.exists(_.hashBE == stopBlockHeaderHashBE) + hasHash1 && hasHash2 + } + + } + /** Given a vector of blockchains, this method finds the chain with the most chain work * and then returns the given height and hash of the block header included in that chain * This is useful for reorg situations where you aren't sure what header is included in a chain @@ -325,27 +408,59 @@ class ChainHandler( /** @inheritdoc */ override def nextFilterHeaderBatchRange( - filterHeight: Int, - batchSize: Int): Future[Option[FilterSyncMarker]] = { - val startHeight = if (filterHeight <= 0) 0 else filterHeight + 1 - val stopHeight = startHeight - 1 + batchSize - - val stopBlockF = - getFilterHeadersAtHeight(stopHeight).map(_.headOption).flatMap { - case Some(stopBlock) => - Future.successful(stopBlock) + stopBlockHash: DoubleSha256DigestBE, + batchSize: Int, + startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = { + val stopBlockHeaderDbOptF = getHeader(stopBlockHash) + val blockchainsF = blockHeaderDAO.getBlockchains() + val startHeadersOptF: Future[Option[Vector[BlockHeaderDb]]] = + startHeightOpt match { + case Some(startHeight) => + getHeadersAtHeight(startHeight - 1).map { headers => + if (headers.isEmpty) None + else Some(headers) + } case None => - // This means the stop height is past the filter header height - getBestFilterHeader().map( - _.getOrElse(throw UnknownBlockHeight( - s"Unknown filter header height $stopHeight"))) + getBestFilter().flatMap { + case Some(bestFilter) => + getHeader(bestFilter.blockHashBE) + .map(_.toVector) + .map(Some(_)) + case None => + //means we need to sync from genesis filter + Future.successful(None) + } } - stopBlockF.map { stopBlock => - if (startHeight > stopBlock.height) - None - else - Some(FilterSyncMarker(startHeight, stopBlock.blockHashBE.flip)) + stopBlockHeaderDbOptF.flatMap { + case Some(stopBlockHeaderDb) => + for { + startHeadersOpt <- startHeadersOptF + blockchains <- blockchainsF + fsmOptVec <- { + startHeadersOpt match { + case Some(startHeaders) => + Future.traverse(startHeaders) { h => + findNextHeader(prevBlockHeaderOpt = Some(h), + stopBlockHeaderDb = stopBlockHeaderDb, + batchSize = batchSize, + blockchains = blockchains) + } + + case None => + val fsmOptF = findNextHeader(prevBlockHeaderOpt = None, + stopBlockHeaderDb = + stopBlockHeaderDb, + batchSize = batchSize, + blockchains = blockchains) + fsmOptF.map(Vector(_)) + } + } + } yield { + fsmOptVec.flatten.headOption + } + case None => + Future.successful(None) } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandlerCached.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandlerCached.scala index d95f3f9f49..21d2054b8c 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandlerCached.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandlerCached.scala @@ -8,7 +8,7 @@ import org.bitcoins.chain.models.{ CompactFilterHeaderDAO } import org.bitcoins.core.api.chain.db.{BlockHeaderDb, CompactFilterHeaderDb} -import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker} +import org.bitcoins.core.api.chain.{ChainApi} import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.crypto.DoubleSha256DigestBE @@ -51,14 +51,6 @@ case class ChainHandlerCached( override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = { getBestFilterHeaderWithChains(blockchains) } - - override def nextBlockHeaderBatchRange( - prevStopHash: DoubleSha256DigestBE, - batchSize: Int): Future[Option[FilterSyncMarker]] = { - nextBlockHeaderBatchRangeWithChains(prevStopHash = prevStopHash, - batchSize = batchSize, - blockchains = blockchains) - } } object ChainHandlerCached { diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala index 1ea59557db..3a9c5a3abd 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala @@ -75,16 +75,37 @@ trait ChainApi extends ChainQueryApi { /** Generates a block range in form of (startHeight, stopHash) by the given stop hash. * Returns None if we are synced + * + * @param prevStopHash our previous block hash where filter header sync stopped + * @param stopHash the block hash we want to sync the new batch of filters to + * @param batchSize the batch size of filter headers + * @return */ def nextBlockHeaderBatchRange( prevStopHash: DoubleSha256DigestBE, + stopHash: DoubleSha256DigestBE, batchSize: Int): Future[Option[FilterSyncMarker]] /** Generates a filter header range in form of (startHeight, stopHash) by the given stop hash. */ + final def nextFilterHeaderBatchRange( + stopBlockHash: DoubleSha256DigestBE, + batchSize: Int): Future[Option[FilterSyncMarker]] = { + nextFilterHeaderBatchRange(stopBlockHash = stopBlockHash, + batchSize = batchSize, + startHeightOpt = None) + } + + /** Generates a query for a range of compact filters + * @param stopBlockHash the block hash to stop receiving filters at + * @param batchSize + * @param startHeightOpt the block height to start syncing filters from. If None, we query our chainstate for the last filter we've seen + * @return + */ def nextFilterHeaderBatchRange( - startHeight: Int, - batchSize: Int): Future[Option[FilterSyncMarker]] + stopBlockHash: DoubleSha256DigestBE, + batchSize: Int, + startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] /** Adds a compact filter into the filter database. */ diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/FilterSyncMarker.scala b/core/src/main/scala/org/bitcoins/core/api/chain/FilterSyncMarker.scala index ad5ef3e301..c88babfe5c 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/FilterSyncMarker.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/FilterSyncMarker.scala @@ -1,6 +1,6 @@ package org.bitcoins.core.api.chain -import org.bitcoins.crypto.DoubleSha256Digest +import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} /** This is a helper class for syncing block filters following the * BIP157 protocol. This indicates the starting block height we are @@ -12,6 +12,8 @@ case class FilterSyncMarker( startHeight: Int, stopBlockHash: DoubleSha256Digest) { + val stopBlockHashBE: DoubleSha256DigestBE = stopBlockHash.flip + override def toString: String = - s"FilterSyncMarker(startHeight=$startHeight, stopBlockHash=${stopBlockHash.flip.hex})" + s"FilterSyncMarker(startHeight=$startHeight,stopBlockHashBE=${stopBlockHash.flip.hex})" } diff --git a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala index 81b1853bf3..e0e54e9cf9 100644 --- a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala +++ b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala @@ -1101,6 +1101,8 @@ case class CompactFilterMessage( filterBytes: ByteVector ) extends DataPayload { + val blockHashBE: DoubleSha256DigestBE = blockHash.flip + /** The number of filter bytes in this message */ val numFilterBytes: CompactSizeUInt = CompactSizeUInt( UInt64(filterBytes.length)) diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index 9357ecc612..b0957fe57f 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -303,11 +303,17 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { //2023-05-01T21:46:46Z [net] Failed to find block filter hashes in index: filter_type=basic, start_height=208, stop_hash=303cc906bf99b5370581e7f23285378c18005745882c6112dbbf3e61a82aeddb val node = nodeConnectedWithBitcoind.node val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0) + val bitcoind1 = nodeConnectedWithBitcoind.bitcoinds(1) //start syncing node val numBlocks = 5 val genBlocksF = { for { + nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1) + _ <- bitcoind1.disconnectNode(nodeUri) + _ <- AsyncUtil.retryUntilSatisfiedF( + () => node.getConnectionCount.map(_ == 1), + 1.second) //generate blocks while sync is ongoing _ <- bitcoind.generate(numBlocks) } yield { diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala index 2327599fba..ed96311572 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala @@ -163,7 +163,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor { peer = peers(1) _ <- node.peerManager.isConnected(peer).map(assert(_)) bitcoinds <- bitcoindsF - + _ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1)) //disconnect bitcoind(0) as its not needed for this test node0Uri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0)) _ <- bitcoinds(0).disconnectNode(node0Uri) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index e30a9bc92e..d24bce2eca 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -79,14 +79,18 @@ case class PeerManager( case None => sys.error(s"Could not find peer=$syncPeer") } - val sendCompactFilterHeaderMsgF = + val bestBlockHashF = chainApi.getBestBlockHash() + val sendCompactFilterHeaderMsgF = bestBlockHashF.flatMap { bestBlockHash => PeerManager.sendNextGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMsgSender, chainApi = chainApi, peer = syncPeer, filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize, - prevStopHash = bestFilterHeader.blockHashBE + prevStopHash = bestFilterHeader.blockHashBE, + stopHash = bestBlockHash ) + } + sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders => // If we have started syncing filters if ( @@ -97,7 +101,8 @@ case class PeerManager( peerMessageSenderApi = peerMsgSender, chainApi = chainApi, filterBatchSize = chainAppConfig.filterBatchSize, - startHeight = compactFilterStartHeight, + startHeightOpt = Some(compactFilterStartHeight), + stopBlockHash = bestFilterHeader.blockHashBE, peer = syncPeer ) .map(_ => ()) @@ -846,6 +851,7 @@ case class PeerManager( .sendFirstGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMsgSender, chainApi = chainApi, + stopBlockHeaderDb = bestBlockHeader, state = fhs) .map(_ => ()) case x @ (_: FilterSync | _: HeaderSync) => @@ -942,6 +948,7 @@ object PeerManager extends Logging { def sendFirstGetCompactFilterHeadersCommand( peerMessageSenderApi: PeerMessageSenderApi, chainApi: ChainApi, + stopBlockHeaderDb: BlockHeaderDb, state: SyncNodeState)(implicit ec: ExecutionContext, chainConfig: ChainAppConfig): Future[ @@ -952,13 +959,27 @@ object PeerManager extends Logging { .getBestFilterHeader() blockHash = bestFilterHeaderOpt match { case Some(filterHeaderDb) => - filterHeaderDb.blockHashBE + //need to check for reorg scenarios here + val isSameHeight = filterHeaderDb.height == stopBlockHeaderDb.height + val isNotSameBlockHash = + filterHeaderDb.blockHashBE != stopBlockHeaderDb.hashBE + if (isSameHeight && isNotSameBlockHash) { + //need to start from previous header has to sync filter headers + //correctly in a reorg scenario + stopBlockHeaderDb.previousBlockHashBE + } else { + filterHeaderDb.blockHashBE + } + case None => DoubleSha256DigestBE.empty } - hashHeightOpt <- chainApi.nextBlockHeaderBatchRange( - prevStopHash = blockHash, - batchSize = chainConfig.filterHeaderBatchSize) + hashHeightOpt <- { + chainApi.nextBlockHeaderBatchRange(prevStopHash = blockHash, + stopHash = stopBlockHeaderDb.hashBE, + batchSize = + chainConfig.filterHeaderBatchSize) + } res <- hashHeightOpt match { case Some(filterSyncMarker) => peerMessageSenderApi @@ -981,11 +1002,13 @@ object PeerManager extends Logging { chainApi: ChainApi, peer: Peer, filterHeaderBatchSize: Int, - prevStopHash: DoubleSha256DigestBE)(implicit + prevStopHash: DoubleSha256DigestBE, + stopHash: DoubleSha256DigestBE)(implicit ec: ExecutionContext): Future[Boolean] = { for { filterSyncMarkerOpt <- chainApi.nextBlockHeaderBatchRange( prevStopHash = prevStopHash, + stopHash = stopHash, batchSize = filterHeaderBatchSize) res <- filterSyncMarkerOpt match { case Some(filterSyncMarker) => @@ -1006,11 +1029,14 @@ object PeerManager extends Logging { peerMessageSenderApi: PeerMessageSenderApi, chainApi: ChainApi, filterBatchSize: Int, - startHeight: Int, + startHeightOpt: Option[Int], + stopBlockHash: DoubleSha256DigestBE, peer: Peer)(implicit ec: ExecutionContext): Future[Boolean] = { for { filterSyncMarkerOpt <- - chainApi.nextFilterHeaderBatchRange(startHeight, filterBatchSize) + chainApi.nextFilterHeaderBatchRange(stopBlockHash = stopBlockHash, + batchSize = filterBatchSize, + startHeightOpt = startHeightOpt) res <- filterSyncMarkerOpt match { case Some(filterSyncMarker) => logger.info( @@ -1028,16 +1054,18 @@ object PeerManager extends Logging { def fetchCompactFilterHeaders( state: SyncNodeState, //can we tighten this type up? chainApi: ChainApi, - peerMessageSenderApi: PeerMessageSenderApi)(implicit + peerMessageSenderApi: PeerMessageSenderApi, + stopBlockHeaderDb: BlockHeaderDb)(implicit ec: ExecutionContext, chainAppConfig: ChainAppConfig): Future[ Option[NodeState.FilterHeaderSync]] = { logger.info( - s"Now syncing filter headers from ${state.syncPeer} in state=${state}") + s"Now syncing filter headers from ${state.syncPeer} in state=${state} stopBlockHashBE=${stopBlockHeaderDb.hashBE}") for { newSyncingStateOpt <- PeerManager.sendFirstGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMessageSenderApi, chainApi = chainApi, + stopBlockHeaderDb = stopBlockHeaderDb, state = state) } yield { newSyncingStateOpt @@ -1068,7 +1096,7 @@ object PeerManager extends Logging { chainApi.getBestFilter().flatMap { case Some(f) => //we have already started syncing filters, return the height of the last filter seen - Future.successful(f.height) + Future.successful(f.height + 1) case None => walletCreationTimeOpt match { case Some(instant) => diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 064f23f7ba..3f8216931f 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -173,12 +173,14 @@ case class DataMessageHandler( (newBatch: Set[CompactFilterMessage], newChainApi) <- { if (isFiltersSynced || batchSizeFull) { - logger.debug(s"Processing ${filterBatch.size} filters") val sortedBlockFiltersF = sortBlockFiltersByBlockHeight(filterBatch) for { sortedBlockFilters <- sortedBlockFiltersF sortedFilterMessages = sortedBlockFilters.map(_._2) + _ = logger.debug( + s"Processing ${filterBatch.size} filters bestBlockHashBE=${sortedFilterMessages.lastOption + .map(_.blockHashBE)}") newChainApi <- chainApi.processFilters(sortedFilterMessages) sortedGolombFilters = sortedBlockFilters.map(x => (x._1, x._3)) @@ -189,15 +191,20 @@ case class DataMessageHandler( } yield (Set.empty, newChainApi) } else Future.successful((filterBatch, chainApi)) } - (_, newFilterHeight) <- - calcFilterHeaderFilterHeight(newChainApi) filterHeaderSyncStateOpt <- if (batchSizeFull) { logger.debug( s"Received maximum amount of filters in one batch. This means we are not synced, requesting more") - sendNextGetCompactFilterCommand(peerMessageSenderApi, - newFilterHeight, - filterSyncState) + for { + bestBlockHash <- chainApi.getBestBlockHash() + fssOpt <- sendNextGetCompactFilterCommand( + peerMessageSenderApi = peerMessageSenderApi, + startHeightOpt = None, + stopBlockHash = bestBlockHash, + fs = filterSyncState) + } yield { + fssOpt + } } else Future.successful(Some(filterSyncState)) newDmhState <- { if (isFiltersSynced) { @@ -362,10 +369,12 @@ case class DataMessageHandler( /** syncs filter headers in case the header chain is still ahead post filter sync */ private def syncIfHeadersAhead( syncNodeState: SyncNodeState): Future[NodeState] = { + val bestBlockHeaderDbF = chainApi.getBestBlockHeader() for { headerHeight <- chainApi.getBestHashBlockHeight() filterHeaderCount <- chainApi.getFilterHeaderCount() filterCount <- chainApi.getFilterCount() + bestBlockHeaderDb <- bestBlockHeaderDbF newState <- { require(headerHeight >= Math.max(filterHeaderCount, filterCount), "Header chain cannot be behind filter or filter header chain") @@ -378,12 +387,14 @@ case class DataMessageHandler( val fhs = FilterHeaderSync(syncNodeState.syncPeer, syncNodeState.peers, syncNodeState.waitingForDisconnection) + for { syncingFilterHeadersState <- PeerManager - .sendFirstGetCompactFilterHeadersCommand(peerMessageSenderApi = - peerMessageSenderApi, - chainApi = chainApi, - state = fhs) + .sendFirstGetCompactFilterHeadersCommand( + peerMessageSenderApi = peerMessageSenderApi, + chainApi = chainApi, + state = fhs, + stopBlockHeaderDb = bestBlockHeaderDb) } yield { syncingFilterHeadersState.getOrElse( DoneSyncing(syncNodeState.peers, @@ -467,18 +478,21 @@ case class DataMessageHandler( private def sendNextGetCompactFilterHeadersCommand( peerMessageSenderApi: PeerMessageSenderApi, syncPeer: Peer, - prevStopHash: DoubleSha256DigestBE): Future[Boolean] = + prevStopHash: DoubleSha256DigestBE, + stopHash: DoubleSha256DigestBE): Future[Boolean] = PeerManager.sendNextGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMessageSenderApi, chainApi = chainApi, peer = syncPeer, filterHeaderBatchSize = chainConfig.filterHeaderBatchSize, - prevStopHash = prevStopHash + prevStopHash = prevStopHash, + stopHash = stopHash ) private def sendNextGetCompactFilterCommand( peerMessageSenderApi: PeerMessageSenderApi, - startHeight: Int, + startHeightOpt: Option[Int], + stopBlockHash: DoubleSha256DigestBE, fs: NodeState.FilterSync): Future[Option[NodeState.FilterSync]] = { PeerManager @@ -486,7 +500,8 @@ case class DataMessageHandler( peerMessageSenderApi = peerMessageSenderApi, chainApi = chainApi, filterBatchSize = chainConfig.filterBatchSize, - startHeight = startHeight, + startHeightOpt = startHeightOpt, + stopBlockHash = stopBlockHash, peer = fs.syncPeer ) .map { isSyncing => @@ -503,9 +518,10 @@ case class DataMessageHandler( private def sendFirstGetCompactFilterCommand( peerMessageSenderApi: PeerMessageSenderApi, + stopBlockHash: DoubleSha256DigestBE, startHeight: Int, syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = { - logger.info(s"Beginning to sync filters from startHeight=$startHeight") + logger.info(s"Beginning to sync filters to stopBlockHashBE=$stopBlockHash") val fs = syncNodeState match { case x @ (_: HeaderSync | _: FilterHeaderSync) => @@ -514,7 +530,8 @@ case class DataMessageHandler( } sendNextGetCompactFilterCommand(peerMessageSenderApi = peerMessageSenderApi, - startHeight = startHeight, + startHeightOpt = Some(startHeight), + stopBlockHash = stopBlockHash, fs = fs) } @@ -554,7 +571,6 @@ case class DataMessageHandler( for { (newFilterHeaderHeight, newFilterHeight) <- calcFilterHeaderFilterHeight( chainApi) - isSynced <- if (newFilterHeight == 0 && walletCreationTimeOpt.isDefined) { //if we have zero filters in our database and are syncing filters after a wallet creation time @@ -675,10 +691,18 @@ case class DataMessageHandler( // headers are synced now with the current sync peer, now move to validating it for all peers require(syncPeer == peer, s"syncPeer=$syncPeer peer=$peer") - val fhsOptF = PeerManager.fetchCompactFilterHeaders( - state = state, - chainApi = chainApi, - peerMessageSenderApi = peerMessageSenderApi) + val fhsOptF = { + for { + lastBlockHeaderDbOpt <- chainApi.getHeader(lastHash.flip) + fhs <- PeerManager.fetchCompactFilterHeaders( + state = state, + chainApi = chainApi, + peerMessageSenderApi = peerMessageSenderApi, + stopBlockHeaderDb = lastBlockHeaderDbOpt.get) + } yield { + fhs + } + } fhsOptF.map { case Some(s) => s case None => @@ -753,11 +777,13 @@ case class DataMessageHandler( peer: Peer): Future[NodeState] = { val filterHeaders = filterHeader.filterHeaders val blockCountF = chainApi.getBlockCount() + val bestBlockHashF = chainApi.getBestBlockHash() for { _ <- chainApi.processFilterHeaders(filterHeaders, filterHeader.stopHash.flip) filterHeaderCount <- chainApi.getFilterHeaderCount() blockCount <- blockCountF + bestBlockHash <- bestBlockHashF newState <- if (blockCount != filterHeaderCount) { logger.debug( @@ -765,15 +791,17 @@ case class DataMessageHandler( sendNextGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMessageSenderApi, syncPeer = peer, - prevStopHash = filterHeader.stopHash.flip).map(_ => - filterHeaderSync) + prevStopHash = filterHeader.stopHash.flip, + stopHash = bestBlockHash).map(_ => filterHeaderSync) } else { for { startHeight <- PeerManager.getCompactFilterStartHeight( chainApi, walletCreationTimeOpt) + bestBlockHash <- bestBlockHashF filterSyncStateOpt <- sendFirstGetCompactFilterCommand( peerMessageSenderApi = peerMessageSenderApi, + stopBlockHash = bestBlockHash, startHeight = startHeight, syncNodeState = filterHeaderSync) } yield { diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala index 1d10f9723e..5c64f65a4d 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/BaseNodeTest.scala @@ -100,13 +100,15 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg { Future.successful(this) override def nextBlockHeaderBatchRange( + prevStopHash: DoubleSha256DigestBE, stopHash: DoubleSha256DigestBE, batchSize: Int): Future[Option[FilterSyncMarker]] = Future.successful(None) override def nextFilterHeaderBatchRange( - startHeight: Int, - batchSize: Int): Future[Option[FilterSyncMarker]] = + stopBlockHash: DoubleSha256DigestBE, + batchSize: Int, + startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = Future.successful(None) override def processFilters(