2023 09 27 issue 5139 (#5247)

* Add test case for reorg on bitcoin network for NeutrinoNodeTest

* Get unit tests passing

* Refactor findNextHeader() to not take an Option[BlockHeaderDb], now it just takes a BlockHeaderDb

* Explicitly return None in the case where we don't need to sync filter headers

* Fix off by one bug when starting filter header sync
This commit is contained in:
Chris Stewart 2023-09-29 07:35:26 -05:00 committed by GitHub
parent 7c133c19ca
commit 3e6ff52194
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 218 additions and 56 deletions

View file

@ -1,6 +1,9 @@
package org.bitcoins.chain.blockchain
import org.bitcoins.testkit.chain.ChainDbUnitTest
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.db.BlockHeaderDbHelper
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.testkit.chain.{BlockHeaderHelper, ChainDbUnitTest}
import org.bitcoins.testkit.chain.fixture.ChainFixtureTag
import org.bitcoins.testkitcore.chain.ChainTestUtil
import org.scalatest.FutureOutcome
@ -39,4 +42,103 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
}
}
it must "generate a range for a block filter query for the genesis block" in {
  chainHandler: ChainHandlerCached =>
    val genesisHeader =
      chainHandler.chainConfig.chain.genesisBlock.blockHeader
    //querying with the sentinel empty hash must yield a range that starts
    //at height 0 and stops at the genesis block's hash
    val assert1F = for {
      rangeOpt <-
        chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
    } yield {
      //assert nonEmpty BEFORE calling .get so an empty result fails with a
      //useful assertion message rather than a NoSuchElementException
      assert(rangeOpt.nonEmpty)
      val marker = rangeOpt.get
      assert(marker.startHeight == 0)
      assert(marker.stopBlockHash == genesisHeader.hash)
    }

    //let's process a block header, and then be able to fetch that header as the last stopHash
    val blockHeaderDb = {
      BlockHeaderDbHelper.fromBlockHeader(height = 0,
                                          chainWork =
                                            Pow.getBlockProof(genesisHeader),
                                          bh = genesisHeader)
    }
    val blockHeader = BlockHeaderHelper.buildNextHeader(blockHeaderDb)
    //sequence after assert1F so the first assertion runs before we mutate state
    val chainApi2 = assert1F.flatMap { _ =>
      chainHandler.processHeader(blockHeader.blockHeader)
    }

    for {
      chainApi <- chainApi2
      rangeOpt <-
        chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2)
    } yield {
      //same assert-then-get ordering as above
      assert(rangeOpt.nonEmpty)
      val marker = rangeOpt.get
      assert(marker.startHeight == 0)
      //after processing one more header, the stop hash must be the new tip
      assert(marker.stopBlockHash == blockHeader.hash)
    }
}
it must "generate the correct range of block filters if a header is reorged" in {
  chainHandler: ChainHandler =>
    //fixture builds two competing headers B and C on top of the same parent A
    val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
    val chainHandlerF = reorgFixtureF.map(_.chainApi)
    val newHeaderBF = reorgFixtureF.map(_.headerDb1)
    val newHeaderCF = reorgFixtureF.map(_.headerDb2)
    val batchSize = 100

    //two competing headers B,C built off of A
    //so just pick the first headerB to be our next block header batch
    val assert1F = for {
      chainHandler <- chainHandlerF
      newHeaderB <- newHeaderBF
      newHeaderC <- newHeaderCF
      blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
        prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
        batchSize = batchSize)
    } yield {
      assert(blockHeaderBatchOpt.isDefined)
      val marker = blockHeaderBatchOpt.get
      //B and C have equal chainwork, so either hash is an acceptable tip;
      //checkReorgHeaders verifies the marker's stop hash is one of the two
      ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
                                         header2 = newHeaderC,
                                         bestHash = marker.stopBlockHash.flip)
      assert(newHeaderB.height == marker.startHeight)
    }

    //now let's build a new block header on top of C and process it
    //when we call chainHandler.nextBlockHeaderBatchRange it
    //should be C's hash instead of B's hash
    for {
      _ <- assert1F //run the first assertions before extending the chain
      chainHandler <- chainHandlerF
      headerC <- newHeaderCF
      headerD = BlockHeaderHelper.buildNextHeader(headerC)
      chainApiD <- chainHandler.processHeader(headerD.blockHeader)
      blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange(
        prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
        batchSize = batchSize)
      count <- chainApiD.getBlockCount()
    } yield {
      //D on top of C makes the C-chain strictly heavier, so the reorg resolves to it
      assert(count == 2)
      assert(blockHeaderBatchOpt.isDefined)
      val marker = blockHeaderBatchOpt.get
      assert(headerC.height == marker.startHeight)
      assert(headerD.hash == marker.stopBlockHash)
    }
}
it must "return None for ChainHandler.nextBlockHeaderBatchRange if we are synced" in {
  chainHandler: ChainHandler =>
    //when the best block hash itself is passed as the previous stop hash
    //there is nothing left to sync, so the returned range must be None
    chainHandler
      .getBestBlockHash()
      .flatMap { bestBlockHash =>
        chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1)
      }
      .map { rangeOpt =>
        assert(rangeOpt.isEmpty)
      }
}
}

View file

@ -25,7 +25,7 @@ import org.bitcoins.testkit.chain.fixture.ChainFixtureTag
import org.bitcoins.testkit.chain.{BlockHeaderHelper, ChainDbUnitTest}
import org.bitcoins.testkit.util.FileUtil
import org.bitcoins.testkitcore.chain.ChainTestUtil
import org.scalatest.{Assertion, FutureOutcome}
import org.scalatest.{Assertion, Assertions, FutureOutcome}
import play.api.libs.json.Json
import scala.concurrent.Future
@ -122,7 +122,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
bestHash <- chainHandler.getBestBlockHash()
newHeaderC <- newHeaderCF
} yield {
checkReorgHeaders(header1 = newHeaderB,
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = bestHash)
}
@ -448,7 +448,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
checkReorgHeaders(header1 = newHeaderB,
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(newHeaderB.height == marker.startHeight)
@ -479,11 +479,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
it must "return None for ChainHandler.nextBlockHeaderBatchRange if we are synced" in {
chainHandler: ChainHandler =>
val genesisHeader =
chainHandler.chainConfig.chain.genesisBlock.blockHeader
val assert1F = for {
bestBlockHash <- chainHandler.getBestBlockHash()
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(genesisHeader.hashBE, 1)
chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1)
} yield {
assert(rangeOpt.isEmpty)
}
@ -721,21 +720,24 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
}
object ChainHandlerTest {
/** Checks that
* 1. The header1 & header2 have the same chainwork
* 2. Checks that header1 and header2 have the same time
* 3. Checks bestHash is one of header1.hashBE or header2.hashBE
*/
private def checkReorgHeaders(
def checkReorgHeaders(
header1: BlockHeaderDb,
header2: BlockHeaderDb,
bestHash: DoubleSha256DigestBE): Assertion = {
assert(header1.chainWork == header2.chainWork)
assert(header1.time == header2.time)
Assertions.assert(header1.chainWork == header2.chainWork)
Assertions.assert(header1.time == header2.time)
//if both chainwork and time are the same, we are left to
//how the database serves up the data
//just make sure it is one of the two headers
assert(Vector(header1.hashBE, header2.hashBE).contains(bestHash))
Assertions.assert(Vector(header1.hashBE, header2.hashBE).contains(bestHash))
}
}

View file

@ -196,12 +196,37 @@ class ChainHandler(
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = for {
prevBlockHeaderOpt <- getHeader(prevStopHash)
headerOpt <-
if (prevBlockHeaderOpt.isDefined)
findNextHeader(prevBlockHeaderOpt, batchSize, blockchains)
else if (prevStopHash == DoubleSha256DigestBE.empty)
findNextHeader(None, batchSize, blockchains)
else Future.successful(None)
headerOpt <- {
prevBlockHeaderOpt match {
case Some(prevBlockHeader) =>
findNextHeader(prevBlockHeader, batchSize, blockchains)
case None =>
if (prevStopHash == DoubleSha256DigestBE.empty) {
getHeadersAtHeight(batchSize - 1).flatMap { headers =>
val fsmOptF = if (headers.isEmpty) {
//just get best height?
getBestBlockHeader().map { bestHeader =>
val f = FilterSyncMarker(0, bestHeader.hash)
Some(f)
}
} else if (headers.length == 1) {
val header = headers.head
val f = FilterSyncMarker(0, header.hash)
Future.successful(Some(f))
} else {
//just select first header, i guess
val header = headers.head
val f = FilterSyncMarker(0, header.hash)
Future.successful(Some(f))
}
fsmOptF
}
} else {
Future.successful(None)
}
}
}
} yield headerOpt
/** @inheritdoc */
@ -216,18 +241,14 @@ class ChainHandler(
} yield syncMarkerOpt
/** Finds the next header in the chain. Uses chain work to break ties
* returning only the header in the chain with the most work
* returning only the header in the chain with the most work,
* else returns None if there is no next header
*/
private def findNextHeader(
prevBlockHeaderOpt: Option[BlockHeaderDb],
prevBlockHeader: BlockHeaderDb,
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = {
val chainsF = prevBlockHeaderOpt match {
case None =>
blockHeaderDAO.getBlockchainsBetweenHeights(from = 0,
to = batchSize - 1)
case Some(prevBlockHeader) =>
val chainsF = {
val inMemoryBlockchains = {
blockchains.filter(
_.exists(_.previousBlockHashBE == prevBlockHeader.hashBE))
@ -241,11 +262,13 @@ class ChainHandler(
}
}
val startHeight = prevBlockHeaderOpt match {
case None => 0
case Some(prevBlockHeader) =>
val startHeight = {
if (prevBlockHeader.hashBE == chainConfig.chain.genesisHash.flip) {
1
} else {
prevBlockHeader.height + 1
}
}
for {
chains <- chainsF
@ -253,16 +276,19 @@ class ChainHandler(
val nextBlockHeaderOpt = getBestChainAtHeight(startHeight = startHeight,
batchSize = batchSize,
blockchains = chains)
(nextBlockHeaderOpt, prevBlockHeaderOpt) match {
case (Some(next), Some(prev)) =>
nextBlockHeaderOpt match {
case Some(next) =>
//this means we are synced, so return None
if (next.stopBlockHash == prev.hash) {
val isSynced =
next.stopBlockHash == prevBlockHeader.hash || next.stopBlockHash == chainConfig.chain.genesisHash
if (isSynced) {
None
} else {
nextBlockHeaderOpt
}
case (Some(_), None) | (None, Some(_)) | (None, None) =>
nextBlockHeaderOpt
case None =>
//log here?
None
}
}
}

View file

@ -74,6 +74,7 @@ trait ChainApi extends ChainQueryApi {
stopHash: DoubleSha256DigestBE): Future[ChainApi]
/** Generates a block range in form of (startHeight, stopHash) by the given stop hash.
* Returns None if we are synced
*/
def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,

View file

@ -385,4 +385,27 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
1.second)
} yield succeed
}
it must "handle reorgs correctly" in {
  nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
    //regression test for a neutrino node failing to follow a reorg:
    //https://github.com/bitcoin-s/bitcoin-s/issues/5017
    val node = nodeConnectedWithBitcoind.node
    val bitcoinds = nodeConnectedWithBitcoind.bitcoinds
    val bitcoind0 = bitcoinds(0)
    val bitcoind1 = bitcoinds(1)
    for {
      //make sure node and bitcoind0 agree on the chain before reorging
      _ <- NodeTestUtil.awaitAllSync(node, bitcoind0)
      //disconnect bitcoind1 as we don't need it
      nodeUri1 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1)
      _ <- bitcoind1.disconnectNode(nodeUri1)
      //invalidate bitcoind0's tip to force it onto a competing chain
      bestBlockHash0 <- bitcoind0.getBestBlockHash()
      _ <- bitcoind0.invalidateBlock(bestBlockHash0)
      //now generate a block, make sure we sync with them
      _ <- bitcoind0.generate(1)
      //brief pause to let the first block propagate before extending further
      //NOTE(review): sleep-based coordination — presumably gives the node time
      //to process the competing tip; confirm there is no event to await instead
      _ <- AsyncUtil.nonBlockingSleep(1.second)
      //generate another block to make sure the reorg is complete
      _ <- bitcoind0.generate(1)
      //the node must converge on bitcoind0's reorged chain
      _ <- NodeTestUtil.awaitAllSync(node, bitcoind0)
    } yield succeed
}
}

View file

@ -1182,18 +1182,20 @@ case class ResponseTimeout(payload: NetworkPayload)
object PeerManager extends Logging {
/** Sends first getcfheader message.
* Returns None if our filter headers are in sync with our block headers
*/
def sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi: PeerMessageSenderApi,
chainApi: ChainApi,
peer: Peer,
peers: Set[Peer])(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[NodeState] = {
chainConfig: ChainAppConfig): Future[Option[NodeState]] = {
for {
bestFilterHeaderOpt <-
chainApi
.getBestFilterHeader()
filterCount <- chainApi.getFilterCount()
blockHash = bestFilterHeaderOpt match {
case Some(filterHeaderDb) =>
filterHeaderDb.blockHashBE
@ -1207,10 +1209,11 @@ object PeerManager extends Logging {
case Some(filterSyncMarker) =>
peerMessageSenderApi
.sendGetCompactFilterHeadersMessage(filterSyncMarker, Some(peer))
.map(_ => FilterHeaderSync(peer, peers))
.map(_ => Some(FilterHeaderSync(peer, peers)))
case None =>
sys.error(
s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount")
logger.info(
s"Filter headers are synced! filterHeader.blockHashBE=$blockHash")
Future.successful(None)
}
} yield res
}
@ -1279,13 +1282,18 @@ object PeerManager extends Logging {
logger.info(
s"Now syncing filter headers from $syncPeer in state=${currentDmh.state}")
for {
newSyncingState <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
newSyncingStateOpt <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = currentDmh.chainApi,
peer = syncPeer,
peers = peers)
} yield {
newSyncingStateOpt match {
case Some(newSyncingState) =>
currentDmh.copy(state = newSyncingState)
case None =>
currentDmh.copy(state = DoneSyncing(currentDmh.state.peers))
}
}
}

View file

@ -434,7 +434,7 @@ case class DataMessageHandler(
peer = syncPeer,
peers = peers)
} yield {
syncingFilterHeadersState
syncingFilterHeadersState.getOrElse(DoneSyncing(peers))
}
} else {