From 3e6ff52194e0d7a5841af9d64ad493c1208cc7e8 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Fri, 29 Sep 2023 07:35:26 -0500 Subject: [PATCH] 2023 09 27 issue 5139 (#5247) * Add test case for reorg on bitcoin network for NeutrinoNodeTest * Get unit tests passing * Refactor findNextHeader() to not take an Option[BlockHeaderDb], not it just takes BlockHeaderDb * Explicity return None in the case where we don't need sync filter headers * Fix off by one bug when starting filter header sync --- .../blockchain/ChainHandlerCachedTest.scala | 104 +++++++++++++++++- .../chain/blockchain/ChainHandlerTest.scala | 30 ++--- .../chain/blockchain/ChainHandler.scala | 92 ++++++++++------ .../bitcoins/core/api/chain/ChainApi.scala | 1 + .../org/bitcoins/node/NeutrinoNodeTest.scala | 23 ++++ .../scala/org/bitcoins/node/PeerManager.scala | 22 ++-- .../networking/peer/DataMessageHandler.scala | 2 +- 7 files changed, 218 insertions(+), 56 deletions(-) diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala index 8f2fb62511..2fe13af3da 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerCachedTest.scala @@ -1,6 +1,9 @@ package org.bitcoins.chain.blockchain -import org.bitcoins.testkit.chain.ChainDbUnitTest +import org.bitcoins.chain.pow.Pow +import org.bitcoins.core.api.chain.db.BlockHeaderDbHelper +import org.bitcoins.crypto.DoubleSha256DigestBE +import org.bitcoins.testkit.chain.{BlockHeaderHelper, ChainDbUnitTest} import org.bitcoins.testkit.chain.fixture.ChainFixtureTag import org.bitcoins.testkitcore.chain.ChainTestUtil import org.scalatest.FutureOutcome @@ -39,4 +42,103 @@ class ChainHandlerCachedTest extends ChainDbUnitTest { } } + it must "generate a range for a block filter query for the genesis block" in { + chainHandler: ChainHandlerCached => + val genesisHeader = + chainHandler.chainConfig.chain.genesisBlock.blockHeader + val assert1F = for { + rangeOpt <- + chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1) + } yield { + val marker = rangeOpt.get + assert(rangeOpt.nonEmpty) + assert(marker.startHeight == 0) + assert(marker.stopBlockHash == genesisHeader.hash) + } + + //let's process a block header, and then be able to fetch that header as the last stopHash + val blockHeaderDb = { + BlockHeaderDbHelper.fromBlockHeader(height = 0, + chainWork = + Pow.getBlockProof(genesisHeader), + bh = genesisHeader) + } + + val blockHeader = BlockHeaderHelper.buildNextHeader(blockHeaderDb) + val chainApi2 = assert1F.flatMap { _ => + chainHandler.processHeader(blockHeader.blockHeader) + } + + for { + chainApi <- chainApi2 + rangeOpt <- + chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2) + } yield { + val marker = rangeOpt.get + assert(rangeOpt.nonEmpty) + assert(marker.startHeight == 0) + assert(marker.stopBlockHash == blockHeader.hash) + } + } + + it must "generate the correct range of block filters if a header is reorged" in { + chainHandler: ChainHandler => + val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler) + val chainHandlerF = reorgFixtureF.map(_.chainApi) + val newHeaderBF = reorgFixtureF.map(_.headerDb1) + val newHeaderCF = reorgFixtureF.map(_.headerDb2) + val batchSize = 100 + + //two competing headers B,C built off of A + //so just pick the first headerB to be our next block header batch + val assert1F = for { + chainHandler <- chainHandlerF + newHeaderB <- newHeaderBF + newHeaderC <- newHeaderCF + blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + batchSize = batchSize) + } yield { + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) + assert(newHeaderB.height == marker.startHeight) + } + + //now let's build a new block header ontop of C and process it + //when we call chainHandler.nextBlockHeaderBatchRange it + //should be C's hash instead of B's hash + for { + _ <- assert1F + chainHandler <- chainHandlerF + headerC <- newHeaderCF + headerD = BlockHeaderHelper.buildNextHeader(headerC) + chainApiD <- chainHandler.processHeader(headerD.blockHeader) + blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange( + prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE, + batchSize = batchSize) + count <- chainApiD.getBlockCount() + } yield { + assert(count == 2) + assert(blockHeaderBatchOpt.isDefined) + val marker = blockHeaderBatchOpt.get + assert(headerC.height == marker.startHeight) + assert(headerD.hash == marker.stopBlockHash) + } + + } + + it must "return None for ChainHandler.nextBlockHeaderBatchRange if we are synced" in { + chainHandler: ChainHandler => + val assert1F = for { + bestBlockHash <- chainHandler.getBestBlockHash() + rangeOpt <- + chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1) + } yield { + assert(rangeOpt.isEmpty) + } + assert1F + } } diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index d4799c302e..c044099cf5 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -25,7 +25,7 @@ import org.bitcoins.testkit.chain.fixture.ChainFixtureTag import org.bitcoins.testkit.chain.{BlockHeaderHelper, ChainDbUnitTest} import org.bitcoins.testkit.util.FileUtil import org.bitcoins.testkitcore.chain.ChainTestUtil -import org.scalatest.{Assertion, FutureOutcome} +import org.scalatest.{Assertion, Assertions, FutureOutcome} import play.api.libs.json.Json import scala.concurrent.Future @@ -122,9 +122,9 @@ class ChainHandlerTest extends ChainDbUnitTest { bestHash <- chainHandler.getBestBlockHash() newHeaderC <- newHeaderCF } yield { - checkReorgHeaders(header1 = newHeaderB, - header2 = newHeaderC, - bestHash = bestHash) + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = bestHash) } // build a new header D off of C which was seen later @@ -448,9 +448,9 @@ class ChainHandlerTest extends ChainDbUnitTest { } yield { assert(blockHeaderBatchOpt.isDefined) val marker = blockHeaderBatchOpt.get - checkReorgHeaders(header1 = newHeaderB, - header2 = newHeaderC, - bestHash = marker.stopBlockHash.flip) + ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB, + header2 = newHeaderC, + bestHash = marker.stopBlockHash.flip) assert(newHeaderB.height == marker.startHeight) } @@ -479,11 +479,10 @@ class ChainHandlerTest extends ChainDbUnitTest { it must "return None for ChainHandler.nextBlockHeaderBatchRange if we are synced" in { chainHandler: ChainHandler => - val genesisHeader = - chainHandler.chainConfig.chain.genesisBlock.blockHeader val assert1F = for { + bestBlockHash <- chainHandler.getBestBlockHash() rangeOpt <- - chainHandler.nextBlockHeaderBatchRange(genesisHeader.hashBE, 1) + chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1) } yield { assert(rangeOpt.isEmpty) } @@ -721,21 +720,24 @@ class ChainHandlerTest extends ChainDbUnitTest { } } +} + +object ChainHandlerTest { /** Checks that * 1. The header1 & header2 have the same chainwork * 2. Checks that header1 and header2 have the same time * 3. Checks bestHash is one of header1.hashBE or header2.hashBE */ - private def checkReorgHeaders( + def checkReorgHeaders( header1: BlockHeaderDb, header2: BlockHeaderDb, bestHash: DoubleSha256DigestBE): Assertion = { - assert(header1.chainWork == header2.chainWork) - assert(header1.time == header2.time) + Assertions.assert(header1.chainWork == header2.chainWork) + Assertions.assert(header1.time == header2.time) //if both chainwork and time are the same, we are left to //how the database serves up the data //just make sure it is one of the two headers - assert(Vector(header1.hashBE, header2.hashBE).contains(bestHash)) + Assertions.assert(Vector(header1.hashBE, header2.hashBE).contains(bestHash)) } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 01c03b94c3..b596e8c912 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -196,12 +196,37 @@ class ChainHandler( batchSize: Int, blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = for { prevBlockHeaderOpt <- getHeader(prevStopHash) - headerOpt <- - if (prevBlockHeaderOpt.isDefined) - findNextHeader(prevBlockHeaderOpt, batchSize, blockchains) - else if (prevStopHash == DoubleSha256DigestBE.empty) - findNextHeader(None, batchSize, blockchains) - else Future.successful(None) + headerOpt <- { + prevBlockHeaderOpt match { + case Some(prevBlockHeader) => + findNextHeader(prevBlockHeader, batchSize, blockchains) + case None => + if (prevStopHash == DoubleSha256DigestBE.empty) { + getHeadersAtHeight(batchSize - 1).flatMap { headers => + val fsmOptF = if (headers.isEmpty) { + //just get best height? + getBestBlockHeader().map { bestHeader => + val f = FilterSyncMarker(0, bestHeader.hash) + Some(f) + } + } else if (headers.length == 1) { + val header = headers.head + val f = FilterSyncMarker(0, header.hash) + Future.successful(Some(f)) + } else { + //just select first header, i guess + val header = headers.head + val f = FilterSyncMarker(0, header.hash) + Future.successful(Some(f)) + } + fsmOptF + } + } else { + Future.successful(None) + } + + } + } } yield headerOpt /** @inheritdoc */ @@ -216,35 +241,33 @@ class ChainHandler( } yield syncMarkerOpt /** Finds the next header in the chain. Uses chain work to break ties - * returning only the header in the chain with the most work + * returning only the header in the chain with the most work, + * else returns None if there is no next header */ private def findNextHeader( - prevBlockHeaderOpt: Option[BlockHeaderDb], + prevBlockHeader: BlockHeaderDb, batchSize: Int, blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = { - - val chainsF = prevBlockHeaderOpt match { - case None => - blockHeaderDAO.getBlockchainsBetweenHeights(from = 0, - to = batchSize - 1) - case Some(prevBlockHeader) => - val inMemoryBlockchains = { - blockchains.filter( - _.exists(_.previousBlockHashBE == prevBlockHeader.hashBE)) - } - if (inMemoryBlockchains.nonEmpty) { - Future.successful(inMemoryBlockchains) - } else { - blockHeaderDAO.getBlockchainsBetweenHeights( - from = prevBlockHeader.height, - to = prevBlockHeader.height + batchSize) - } + val chainsF = { + val inMemoryBlockchains = { + blockchains.filter( + _.exists(_.previousBlockHashBE == prevBlockHeader.hashBE)) + } + if (inMemoryBlockchains.nonEmpty) { + Future.successful(inMemoryBlockchains) + } else { + blockHeaderDAO.getBlockchainsBetweenHeights( + from = prevBlockHeader.height, + to = prevBlockHeader.height + batchSize) + } } - val startHeight = prevBlockHeaderOpt match { - case None => 0 - case Some(prevBlockHeader) => + val startHeight = { + if (prevBlockHeader.hashBE == chainConfig.chain.genesisHash.flip) { + 1 + } else { prevBlockHeader.height + 1 + } } for { @@ -253,16 +276,19 @@ class ChainHandler( val nextBlockHeaderOpt = getBestChainAtHeight(startHeight = startHeight, batchSize = batchSize, blockchains = chains) - (nextBlockHeaderOpt, prevBlockHeaderOpt) match { - case (Some(next), Some(prev)) => + nextBlockHeaderOpt match { + case Some(next) => //this means we are synced, so return None - if (next.stopBlockHash == prev.hash) { + val isSynced = + next.stopBlockHash == prevBlockHeader.hash || next.stopBlockHash == chainConfig.chain.genesisHash + if (isSynced) { None } else { nextBlockHeaderOpt } - case (Some(_), None) | (None, Some(_)) | (None, None) => - nextBlockHeaderOpt + case None => + //log here? + None } } } diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala index 2ef6a7978f..1ea59557db 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala @@ -74,6 +74,7 @@ trait ChainApi extends ChainQueryApi { stopHash: DoubleSha256DigestBE): Future[ChainApi] /** Generates a block range in form of (startHeight, stopHash) by the given stop hash. + * Returns None if we are synced */ def nextBlockHeaderBatchRange( prevStopHash: DoubleSha256DigestBE, diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index ba577c4328..67d871a196 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -385,4 +385,27 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { 1.second) } yield succeed } + + it must "handle reorgs correctly" in { + nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds => + //https://github.com/bitcoin-s/bitcoin-s/issues/5017 + val node = nodeConnectedWithBitcoind.node + val bitcoinds = nodeConnectedWithBitcoind.bitcoinds + val bitcoind0 = bitcoinds(0) + val bitcoind1 = bitcoinds(1) + for { + _ <- NodeTestUtil.awaitAllSync(node, bitcoind0) + //disconnect bitcoind1 as we don't need it + nodeUri1 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1) + _ <- bitcoind1.disconnectNode(nodeUri1) + bestBlockHash0 <- bitcoind0.getBestBlockHash() + _ <- bitcoind0.invalidateBlock(bestBlockHash0) + //now generate a block, make sure we sync with them + _ <- bitcoind0.generate(1) + _ <- AsyncUtil.nonBlockingSleep(1.second) + //generate another block to make sure the reorg is complete + _ <- bitcoind0.generate(1) + _ <- NodeTestUtil.awaitAllSync(node, bitcoind0) + } yield succeed + } } diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index dab059129d..10f802342d 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -1182,18 +1182,20 @@ case class ResponseTimeout(payload: NetworkPayload) object PeerManager extends Logging { + /** Sends first getcfheader message. + * Returns None if are our filter headers are in sync with our block headers + */ def sendFirstGetCompactFilterHeadersCommand( peerMessageSenderApi: PeerMessageSenderApi, chainApi: ChainApi, peer: Peer, peers: Set[Peer])(implicit ec: ExecutionContext, - chainConfig: ChainAppConfig): Future[NodeState] = { + chainConfig: ChainAppConfig): Future[Option[NodeState]] = { for { bestFilterHeaderOpt <- chainApi .getBestFilterHeader() - filterCount <- chainApi.getFilterCount() blockHash = bestFilterHeaderOpt match { case Some(filterHeaderDb) => filterHeaderDb.blockHashBE @@ -1207,10 +1209,11 @@ object PeerManager extends Logging { case Some(filterSyncMarker) => peerMessageSenderApi .sendGetCompactFilterHeadersMessage(filterSyncMarker, Some(peer)) - .map(_ => FilterHeaderSync(peer, peers)) + .map(_ => Some(FilterHeaderSync(peer, peers))) case None => - sys.error( - s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount") + logger.info( + s"Filter headers are synced! filterHeader.blockHashBE=$blockHash") + Future.successful(None) } } yield res } @@ -1279,13 +1282,18 @@ object PeerManager extends Logging { logger.info( s"Now syncing filter headers from $syncPeer in state=${currentDmh.state}") for { - newSyncingState <- PeerManager.sendFirstGetCompactFilterHeadersCommand( + newSyncingStateOpt <- PeerManager.sendFirstGetCompactFilterHeadersCommand( peerMessageSenderApi = peerMessageSenderApi, chainApi = currentDmh.chainApi, peer = syncPeer, peers = peers) } yield { - currentDmh.copy(state = newSyncingState) + newSyncingStateOpt match { + case Some(newSyncingState) => + currentDmh.copy(state = newSyncingState) + case None => + currentDmh.copy(state = DoneSyncing(currentDmh.state.peers)) + } } } diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 51cc2650fb..31e6e746c1 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -434,7 +434,7 @@ case class DataMessageHandler( peer = syncPeer, peers = peers) } yield { - syncingFilterHeadersState + syncingFilterHeadersState.getOrElse(DoneSyncing(peers)) } } else {