2023 12 24 Make filter headers / filters sync work with block header reorgs (#5333)

* Rework ChainApi.nextBlockHeaderBatchRange() to take stopHash parameter, get chainTest/test passing

* WIP: Refactor ChainApi.nextFilterHeaderBatchRange()

* Finish ChainApi.nextFilterHeaderBatchRange() refactor, get all tests passing except reorg related tests in nodeTest

* Get NeutrinoNodeTest reorg test case working

* Improve flaky test

* Cleanup

* Switch sync check to bitcoinds(1)

* Empty commit to run CI

* Implement batchSize tests in ChainHandlerTest

* Rework ChainHandlerTest to not assume we have the genesis filter header / filter inserted into the database already

* Cleanup println

* Fix bug with nextFilterHeaderBatchRange() wrt to startHeightOpt parameter

* Fix off by one bug with compact filter sync

* Add check for connectionCount

* Add longer timeout

* scalafmt
This commit is contained in:
Chris Stewart 2023-12-28 10:43:01 -06:00 committed by GitHub
parent 0ace6dfd2e
commit d983a1bac4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 700 additions and 167 deletions

View File

@ -222,14 +222,16 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] =
Future.failed(
new UnsupportedOperationException(
s"Bitcoind chainApi doesn't allow you fetch block header batch range"))
override def nextFilterHeaderBatchRange(
startHeight: Int,
batchSize: Int): Future[Option[FilterSyncMarker]] =
stopBlockHash: DoubleSha256DigestBE,
batchSize: Int,
startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] =
Future.failed(
new UnsupportedOperationException(
s"Bitcoind chainApi doesn't allow you fetch filter header batch range"))

View File

@ -48,7 +48,11 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
chainHandler.chainConfig.chain.genesisBlock.blockHeader
val assert1F = for {
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
chainHandler.nextBlockHeaderBatchRange(prevStopHash =
DoubleSha256DigestBE.empty,
stopHash =
genesisHeader.hashBE,
batchSize = 1)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
@ -72,7 +76,10 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
for {
chainApi <- chainApi2
rangeOpt <-
chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2)
chainApi.nextBlockHeaderBatchRange(prevStopHash =
DoubleSha256DigestBE.empty,
stopHash = blockHeader.hashBE,
batchSize = 2)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
@ -91,12 +98,13 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
//two competing headers B,C built off of A
//so just pick the first headerB to be our next block header batch
val assert1F = for {
val assert0F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = newHeaderB.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
@ -107,6 +115,26 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
assert(newHeaderB.height == marker.startHeight)
}
//two competing headers B,C built off of A
//pick headerC to be our next block header batch
val assert1F = for {
_ <- assert0F
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = newHeaderC.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(newHeaderC.height == marker.startHeight)
}
//now let's build a new block header ontop of C and process it
//when we call chainHandler.nextBlockHeaderBatchRange it
//should be C's hash instead of B's hash
@ -118,6 +146,7 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = headerD.hashBE,
batchSize = batchSize)
count <- chainApiD.getBlockCount()
} yield {
@ -135,7 +164,9 @@ class ChainHandlerCachedTest extends ChainDbUnitTest {
val assert1F = for {
bestBlockHash <- chainHandler.getBestBlockHash()
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1)
chainHandler.nextBlockHeaderBatchRange(prevStopHash = bestBlockHash,
stopHash = bestBlockHash,
batchSize = 1)
} yield {
assert(rangeOpt.isEmpty)
}

View File

@ -38,7 +38,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
ChainFixtureTag.GenesisChainHandlerWithFilter
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withChainHandlerGenesisFilter(test)
withChainHandler(test)
val genesis: BlockHeaderDb = ChainTestUtil.genesisHeaderDb
behavior of "ChainHandler"
@ -53,6 +53,16 @@ class ChainHandlerTest extends ChainDbUnitTest {
nonce = UInt32(2083236893)
)
private def insertGenesisFilterHeaderAndFilter(
chainHandler: ChainHandler): Future[Unit] = {
for {
_ <- chainHandler.processFilterHeader(
ChainTestUtil.genesisFilterHeaderDb.filterHeader,
ChainTestUtil.genesisHeaderDb.hashBE)
_ <- chainHandler.processFilter(ChainTestUtil.genesisFilterMessage)
} yield ()
}
it must "process a new valid block header, and then be able to fetch that header" in {
chainHandler: ChainHandler =>
val newValidHeader =
@ -218,9 +228,12 @@ class ChainHandlerTest extends ChainDbUnitTest {
it must "get the highest filter header" in { chainHandler: ChainHandler =>
{
for {
initFhCount <- chainHandler.getFilterHeaderCount()
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
count <- chainHandler.getFilterHeaderCount()
genesisFilterHeader <- chainHandler.getFilterHeadersAtHeight(count)
} yield {
assert(initFhCount == 0)
assert(genesisFilterHeader.size == 1)
assert(
genesisFilterHeader.contains(ChainTestUtil.genesisFilterHeaderDb))
@ -318,9 +331,12 @@ class ChainHandlerTest extends ChainDbUnitTest {
{
for {
count <- chainHandler.getFilterCount()
genesisFilter <- chainHandler.getFiltersAtHeight(count)
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
genesisFilter <- chainHandler.getFiltersAtHeight(0)
count1 <- chainHandler.getFilterCount()
} yield {
assert(count == 0)
assert(count1 == 0)
assert(genesisFilter.contains(ChainTestUtil.genesisFilterDb))
assert(
genesisFilter.head.golombFilter == ChainTestUtil.genesisFilterDb.golombFilter)
@ -331,6 +347,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
it must "NOT create an unknown filter" in { chainHandler: ChainHandler =>
{
val unknownHashF = for {
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
_ <- chainHandler.processHeader(nextBlockHeader)
blockHashBE <- chainHandler.getHeadersAtHeight(1).map(_.head.hashBE)
golombFilter = BlockFilter.fromHex("017fa880", blockHashBE.flip)
@ -389,16 +406,20 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
it must "generate a range for a block filter query for the genesis block" in {
it must "generate a range for a filter header query for the genesis block" in {
chainHandler: ChainHandler =>
val genesisHeader =
chainHandler.chainConfig.chain.genesisBlock.blockHeader
val assert1F = for {
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
chainHandler.nextBlockHeaderBatchRange(prevStopHash =
DoubleSha256DigestBE.empty,
stopHash =
genesisHeader.hashBE,
batchSize = 1)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
val marker = rangeOpt.get
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == genesisHeader.hash)
}
@ -419,7 +440,9 @@ class ChainHandlerTest extends ChainDbUnitTest {
for {
chainApi <- chainApi2
rangeOpt <-
chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2)
chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty,
stopHash = blockHeader.hashBE,
batchSize = 2)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
@ -428,7 +451,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
it must "generate the correct range of block filters if a header is reorged" in {
it must "generate the correct range of filter headers if a block header is reorged" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
val chainHandlerF = reorgFixtureF.map(_.chainApi)
@ -437,13 +460,14 @@ class ChainHandlerTest extends ChainDbUnitTest {
val batchSize = 100
//two competing headers B,C built off of A
//so just pick the first headerB to be our next block header batch
val assert1F = for {
//first specify header B to be syncing filter headers from
val assert0F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = newHeaderB.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
@ -452,6 +476,28 @@ class ChainHandlerTest extends ChainDbUnitTest {
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(newHeaderB.height == marker.startHeight)
assert(newHeaderB.hashBE == marker.stopBlockHash.flip)
}
//two competing headers B,C built off of A
//first specify header C to be syncing filter headers from
val assert1F = for {
_ <- assert0F
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = newHeaderC.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(newHeaderC.height == marker.startHeight)
assert(newHeaderC.hashBE == marker.stopBlockHash.flip)
}
//now let's build a new block header ontop of C and process it
@ -465,6 +511,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = headerD.hashBE,
batchSize = batchSize)
count <- chainApiD.getBlockCount()
} yield {
@ -472,7 +519,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(headerC.height == marker.startHeight)
assert(headerD.hash == marker.stopBlockHash)
assert(headerD.hashBE == marker.stopBlockHash.flip)
}
}
@ -482,30 +529,287 @@ class ChainHandlerTest extends ChainDbUnitTest {
val assert1F = for {
bestBlockHash <- chainHandler.getBestBlockHash()
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(bestBlockHash, 1)
chainHandler.nextBlockHeaderBatchRange(prevStopHash = bestBlockHash,
stopHash = bestBlockHash,
batchSize = 1)
} yield {
assert(rangeOpt.isEmpty)
assert(rangeOpt.isEmpty, s"rangeOpt=$rangeOpt")
}
assert1F
}
it must "generate a range for a block filter header query" in {
it must "nextBlockHeaderBatchRange must honor the batchSize query" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
val chainHandlerF = reorgFixtureF.map(_.chainApi)
val newHeaderBF = reorgFixtureF.map(_.headerDb1)
val newHeaderCF = reorgFixtureF.map(_.headerDb2)
//two competing headers B,C built off of A
//first specify header C to be syncing filter headers from
val assert1F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = newHeaderC.hashBE,
batchSize = 1)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(marker.startHeight == 1)
assert(newHeaderC.hashBE == marker.stopBlockHash.flip)
}
//now let's build a new block header ontop of C and process it
//when we call chainHandler.nextBlockHeaderBatchRange it
//should be C's hash instead of D's hash due to batchSize
val assert2F = for {
_ <- assert1F
chainHandler <- chainHandlerF
headerC <- newHeaderCF
headerD = BlockHeaderHelper.buildNextHeader(headerC)
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = headerD.hashBE,
batchSize = 1)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(marker.startHeight == 1)
assert(headerC.hashBE == marker.stopBlockHashBE)
}
val headerDF = {
newHeaderCF.map(headerC => BlockHeaderHelper.buildNextHeader(headerC))
}
val assert3F = for {
_ <- assert2F
chainHandler <- chainHandlerF
headerD <- headerDF
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
stopHash = headerD.hashBE,
batchSize = 2)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(marker.startHeight == 1)
assert(headerD.hashBE == marker.stopBlockHash.flip)
}
//must return None in the case of reorg scenario between prevStopHash / stopHash
for {
bestBlock <- chainHandler.getBestBlockHeader()
bestBlockHashBE = bestBlock.hashBE
rangeOpt <- chainHandler.nextFilterHeaderBatchRange(0, 1)
_ <- assert3F
chainHandler <- chainHandlerF
headerB <- newHeaderBF
headerD <- headerDF
//note headerB and headerD are not part of the same chain as D is built ontop of C
blockHeaderBatchOpt <-
chainHandler.nextBlockHeaderBatchRange(prevStopHash = headerB.hashBE,
stopHash = headerD.hashBE,
batchSize = 1)
} yield {
assert(blockHeaderBatchOpt.isEmpty)
}
}
it must "generate the correct range of block filters if a header is reorged" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
val chainHandlerF = reorgFixtureF.map(_.chainApi)
val newHeaderBF = reorgFixtureF.map(_.headerDb1)
val newHeaderCF = reorgFixtureF.map(_.headerDb2)
val batchSize = 100
//two competing headers B,C built off of A
//first specify header B to be syncing filter headers from
val assert0F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange(
stopBlockHash = newHeaderB.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash.flip == newHeaderB.hashBE)
}
//two competing headers B,C built off of A
//first specify header C to be syncing filter headers from
val assert1F = for {
_ <- assert0F
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange(
stopBlockHash = newHeaderC.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash.flip == newHeaderC.hashBE)
}
//now let's build a new block header ontop of C and process it
//when we call chainHandler.nextFilterHeaderBatchRange it
//should be C's hash instead of B's hash
for {
_ <- assert1F
chainHandler <- chainHandlerF
headerC <- newHeaderCF
headerD = BlockHeaderHelper.buildNextHeader(headerC)
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextFilterHeaderBatchRange(
stopBlockHash = headerD.hashBE,
batchSize = batchSize)
count <- chainApiD.getBlockCount()
} yield {
assert(count == 2)
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(marker.startHeight == 0)
assert(headerD.hash == marker.stopBlockHash)
}
}
it must "generate a range for a block filter query for the genesis block" in {
chainHandler: ChainHandler =>
val genesisHeader =
chainHandler.chainConfig.chain.genesisBlock.blockHeader
val assert1F = for {
rangeOpt <-
chainHandler.nextFilterHeaderBatchRange(stopBlockHash =
genesisHeader.hashBE,
1)
} yield {
assert(rangeOpt.nonEmpty)
val marker = rangeOpt.get
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == genesisHeader.hash)
}
//let's process a block header, and then be able to fetch that header as the last stopHash
val blockHeaderDb = {
BlockHeaderDbHelper.fromBlockHeader(height = 0,
chainWork =
Pow.getBlockProof(genesisHeader),
bh = genesisHeader)
}
val blockHeader = BlockHeaderHelper.buildNextHeader(blockHeaderDb)
val chainApi2 = assert1F.flatMap { _ =>
chainHandler.processHeader(blockHeader.blockHeader)
}
for {
chainApi <- chainApi2
rangeOpt <-
chainApi.nextFilterHeaderBatchRange(stopBlockHash =
blockHeader.hashBE,
batchSize = 2)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == bestBlockHashBE.flip)
assert(marker.stopBlockHash == blockHeader.hash)
}
}
it must "nextFilterHeaderBatchRange must honor the batchSize query" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
val chainHandlerF = reorgFixtureF.map(_.chainApi)
val newHeaderBF = reorgFixtureF.map(_.headerDb1)
val newHeaderCF = reorgFixtureF.map(_.headerDb2)
//two competing headers B,C built off of A
//first specify header C to be syncing filter headers from
val assert1F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextFilterHeaderBatchRange(
stopBlockHash = newHeaderC.hashBE,
batchSize = 1)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
ChainHandlerTest.checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(marker.startHeight == 0)
assert(newHeaderC.hashBE == marker.stopBlockHash.flip)
}
val headerDF = {
newHeaderCF.map(headerC => BlockHeaderHelper.buildNextHeader(headerC))
}
//now let's build a new block header ontop of C and process it
//when we call chainHandler.nextFilterHeaderBatchRange with batchSize=2
//should get D's hash back as the stop hash
val assert3F = for {
_ <- assert1F
chainHandler <- chainHandlerF
headerD <- headerDF
chainApiD <- chainHandler.processHeader(headerD.blockHeader)
blockHeaderBatchOpt <- chainApiD.nextFilterHeaderBatchRange(
stopBlockHash = headerD.hashBE,
batchSize = 2)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(marker.startHeight == 0)
assert(marker.stopBlockHash.flip == headerD.hashBE)
}
assert3F
}
it must "nextFilterHeaderBatchRange must honor the startHeightOpt parameter" in {
chainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
val chainHandlerF = reorgFixtureF.map(_.chainApi)
val newHeaderCF = reorgFixtureF.map(_.headerDb2)
val assert1F = for {
chainHandler <- chainHandlerF
newHeaderC <- newHeaderCF
rangeOpt <- chainHandler.nextFilterHeaderBatchRange(stopBlockHash =
newHeaderC.hashBE,
batchSize = 2,
startHeightOpt =
Some(0))
} yield {
assert(rangeOpt.nonEmpty, s"rangeOpt=$rangeOpt")
val range = rangeOpt.get
assert(range.startHeight == 0)
assert(range.stopBlockHashBE == newHeaderC.hashBE)
}
assert1F
}
it must "read compact filters for the database" in {
chainHandler: ChainHandler =>
for {
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
bestBlock <- chainHandler.getBestBlockHeader()
filterHeader <- chainHandler.getFilterHeader(bestBlock.hashBE)
filter <- chainHandler.getFilter(bestBlock.hashBE)
@ -608,7 +912,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
it must "find filters between heights" in { chainHandler: ChainHandler =>
chainHandler.getFiltersBetweenHeights(0, 1).map { filters =>
for {
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
filters <- chainHandler.getFiltersBetweenHeights(0, 1)
} yield {
val genesis = ChainTestUtil.genesisFilterDb
val genesisFilterResponse = FilterResponse(genesis.golombFilter,
genesis.blockHashBE,
@ -642,12 +949,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
chainHandler: ChainHandler =>
val filter = ChainTestUtil.genesisFilterMessage
val filters = Vector.fill(2)(filter)
val filterCountBeforeF = chainHandler.getFilterCount()
val processF =
filterCountBeforeF.flatMap(_ => chainHandler.processFilters(filters))
for {
_ <- processF
beforeFilterCount <- filterCountBeforeF
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
beforeFilterCount <- chainHandler.getFilterCount()
_ <- chainHandler.processFilters(filters)
filterCount <- chainHandler.getFilterCount()
} yield {
assert(beforeFilterCount == filterCount)
@ -682,11 +987,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
it must "get best filter" in { chainHandler: ChainHandler =>
val bestFilterOptF = chainHandler.getBestFilter()
val bestFilterHeaderOptF = chainHandler.getBestFilterHeader()
for {
bestFilterHeaderOpt <- bestFilterHeaderOptF
bestFilterOpt <- bestFilterOptF
_ <- insertGenesisFilterHeaderAndFilter(chainHandler)
bestFilterHeaderOpt <- chainHandler.getBestFilterHeader()
bestFilterOpt <- chainHandler.getBestFilter()
} yield {
assert(bestFilterHeaderOpt.isDefined)
assert(bestFilterOpt.isDefined)

View File

@ -191,108 +191,191 @@ class ChainHandler(
getBestBlockHeader().map(_.hashBE)
}
protected def nextBlockHeaderBatchRangeWithChains(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = for {
prevBlockHeaderOpt <- getHeader(prevStopHash)
headerOpt <- {
prevBlockHeaderOpt match {
case Some(prevBlockHeader) =>
findNextHeader(prevBlockHeader, batchSize, blockchains)
case None =>
if (prevStopHash == DoubleSha256DigestBE.empty) {
getHeadersAtHeight(batchSize - 1).flatMap { headers =>
val fsmOptF = if (headers.isEmpty) {
//just get best height?
getBestBlockHeader().map { bestHeader =>
val f = FilterSyncMarker(0, bestHeader.hash)
Some(f)
}
} else if (headers.length == 1) {
val header = headers.head
val f = FilterSyncMarker(0, header.hash)
Future.successful(Some(f))
} else {
//just select first header, i guess
val header = headers.head
val f = FilterSyncMarker(0, header.hash)
Future.successful(Some(f))
}
fsmOptF
}
} else {
Future.successful(None)
}
}
}
} yield headerOpt
/** @inheritdoc */
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] =
for {
blockchains <- blockHeaderDAO.getBlockchains()
syncMarkerOpt <- nextBlockHeaderBatchRangeWithChains(prevStopHash,
batchSize,
blockchains)
} yield syncMarkerOpt
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
if (prevStopHash == DoubleSha256DigestBE.empty) {
getHeadersAtHeight(batchSize - 1).map { headers =>
if (headers.length == 1) {
val fsm = FilterSyncMarker(0, headers.head.hash)
Some(fsm)
} else {
logger.warn(
s"ChainHandler.nextBlockHeaderBatchRange() did not find a single header, got zero or multiple=$headers")
None
}
}
} else if (prevStopHash == stopHash) {
//means are are in sync
Future.successful(None)
} else {
val prevBlockHeaderOptF = getHeader(prevStopHash)
val stopBlockHeaderOptF = getHeader(stopHash)
val blockchainsF = blockHeaderDAO.getBlockchains()
for {
prevBlockHeaderOpt <- prevBlockHeaderOptF
stopBlockHeaderOpt <- stopBlockHeaderOptF
blockchains <- blockchainsF
fsmOpt <- getFilterSyncStopHash(prevBlockHeaderOpt = prevBlockHeaderOpt,
stopBlockHeaderOpt = stopBlockHeaderOpt,
blockchains = blockchains,
batchSize = batchSize)
} yield {
fsmOpt
}
}
}
/** Retrieves a [[FilterSyncMarker]] respeciting the batchSize parameter. If the stopBlockHeader is not within the batchSize parameter
* we walk backwards until we find a header within the batchSize limit
*/
private def getFilterSyncStopHash(
prevBlockHeaderOpt: Option[BlockHeaderDb],
stopBlockHeaderOpt: Option[BlockHeaderDb],
blockchains: Vector[Blockchain],
batchSize: Int): Future[Option[FilterSyncMarker]] = {
(prevBlockHeaderOpt, stopBlockHeaderOpt) match {
case (prevBlockHeaderOpt, Some(stopBlockHeader)) =>
findNextHeader(prevBlockHeaderOpt = prevBlockHeaderOpt,
stopBlockHeaderDb = stopBlockHeader,
batchSize = batchSize,
blockchains = blockchains)
case (Some(prevBlockHeader), None) =>
val exn = new RuntimeException(
s"Cannot find a stopBlockHeader, only found prevBlockHeader=${prevBlockHeader.hashBE}")
Future.failed(exn)
case (None, None) =>
val exn = new RuntimeException(
s"Cannot find prevBlocKHeader or stopBlockHeader")
Future.failed(exn)
}
}
/** Finds the next header in the chain. Uses chain work to break ties
* returning only the header in the chain with the most work,
* else returns None if there is no next header
* @param prevBlockHeaderOpt the previous block header we synced from, if None we are syncing from genesis
*/
private def findNextHeader(
prevBlockHeader: BlockHeaderDb,
prevBlockHeaderOpt: Option[BlockHeaderDb],
stopBlockHeaderDb: BlockHeaderDb,
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = {
val chainsF = {
val isInBatchSize =
(stopBlockHeaderDb.height - prevBlockHeaderOpt
.map(_.height)
.getOrElse(0)) <= batchSize
val prevBlockHeaderHeight = {
prevBlockHeaderOpt
.map(_.height)
.getOrElse(-1) //means we are syncing from the genesis block
}
val prevBlockHeaderHashBE = {
prevBlockHeaderOpt
.map(_.hashBE)
.getOrElse(
DoubleSha256DigestBE.empty
) //means we are syncing from genesis block
}
val chainsF: Future[Vector[Blockchain]] = {
val inMemoryBlockchains = {
blockchains.filter(
_.exists(_.previousBlockHashBE == prevBlockHeader.hashBE))
blockchains.filter { b =>
hasBothBlockHeaderHashes(blockchain = b,
prevBlockHeaderHashBE =
prevBlockHeaderHashBE,
stopBlockHeaderHashBE =
stopBlockHeaderDb.hashBE)
}
}
if (inMemoryBlockchains.nonEmpty) {
Future.successful(inMemoryBlockchains)
} else {
blockHeaderDAO.getBlockchainsBetweenHeights(
from = prevBlockHeader.height,
to = prevBlockHeader.height + batchSize)
if (isInBatchSize) {
blockHeaderDAO
.getBlockchainFrom(stopBlockHeaderDb)
.map {
case Some(blockchain) =>
val hasBothHashes =
hasBothBlockHeaderHashes(
blockchain = blockchain,
prevBlockHeaderHashBE = prevBlockHeaderHashBE,
stopBlockHeaderHashBE = stopBlockHeaderDb.hashBE)
if (hasBothHashes) {
Vector(blockchain)
} else {
Vector.empty
}
case None => Vector.empty
}
} else {
blockHeaderDAO.getBlockchainsBetweenHeights(
from = prevBlockHeaderHeight,
to = prevBlockHeaderHeight + batchSize)
}
}
}
val startHeight = {
if (prevBlockHeader.hashBE == chainConfig.chain.genesisHash.flip) {
1
} else {
prevBlockHeader.height + 1
}
}
val startHeight = prevBlockHeaderHeight + 1
for {
chains <- chainsF
} yield {
val nextBlockHeaderOpt = getBestChainAtHeight(startHeight = startHeight,
batchSize = batchSize,
blockchains = chains)
val nextBlockHeaderOpt = {
if (chains.isEmpty) {
//means prevBlockHeader and stopBlockHeader do not form a blockchain
None
} else if (isInBatchSize) {
Some(FilterSyncMarker(startHeight, stopBlockHeaderDb.hash))
} else {
//means our requested stopBlockHeader is outside of batchSize
//we need to find an intermediate header within batchSize to sync against
getBestChainAtHeight(startHeight = startHeight,
batchSize = batchSize,
blockchains = chains)
}
}
nextBlockHeaderOpt match {
case Some(next) =>
//this means we are synced, so return None
val isSynced =
next.stopBlockHash == prevBlockHeader.hash || next.stopBlockHash == chainConfig.chain.genesisHash
next.stopBlockHashBE == prevBlockHeaderHashBE
if (isSynced) {
None
} else {
nextBlockHeaderOpt
}
case None =>
//log here?
None
}
}
}
private def hasBothBlockHeaderHashes(
blockchain: Blockchain,
prevBlockHeaderHashBE: DoubleSha256DigestBE,
stopBlockHeaderHashBE: DoubleSha256DigestBE): Boolean = {
if (prevBlockHeaderHashBE == DoubleSha256DigestBE.empty) {
//carve out here in the case of genesis header,
//blockchains don't contain a block header with hash 0x000..0000
blockchain.exists(_.hashBE == stopBlockHeaderHashBE)
} else {
val hasHash1 =
blockchain.exists(_.hashBE == prevBlockHeaderHashBE)
val hasHash2 =
blockchain.exists(_.hashBE == stopBlockHeaderHashBE)
hasHash1 && hasHash2
}
}
/** Given a vector of blockchains, this method finds the chain with the most chain work
* and then returns the given height and hash of the block header included in that chain
* This is useful for reorg situations where you aren't sure what header is included in a chain
@ -325,27 +408,59 @@ class ChainHandler(
/** @inheritdoc */
override def nextFilterHeaderBatchRange(
filterHeight: Int,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
val startHeight = if (filterHeight <= 0) 0 else filterHeight + 1
val stopHeight = startHeight - 1 + batchSize
val stopBlockF =
getFilterHeadersAtHeight(stopHeight).map(_.headOption).flatMap {
case Some(stopBlock) =>
Future.successful(stopBlock)
stopBlockHash: DoubleSha256DigestBE,
batchSize: Int,
startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] = {
val stopBlockHeaderDbOptF = getHeader(stopBlockHash)
val blockchainsF = blockHeaderDAO.getBlockchains()
val startHeadersOptF: Future[Option[Vector[BlockHeaderDb]]] =
startHeightOpt match {
case Some(startHeight) =>
getHeadersAtHeight(startHeight - 1).map { headers =>
if (headers.isEmpty) None
else Some(headers)
}
case None =>
// This means the stop height is past the filter header height
getBestFilterHeader().map(
_.getOrElse(throw UnknownBlockHeight(
s"Unknown filter header height $stopHeight")))
getBestFilter().flatMap {
case Some(bestFilter) =>
getHeader(bestFilter.blockHashBE)
.map(_.toVector)
.map(Some(_))
case None =>
//means we need to sync from genesis filter
Future.successful(None)
}
}
stopBlockF.map { stopBlock =>
if (startHeight > stopBlock.height)
None
else
Some(FilterSyncMarker(startHeight, stopBlock.blockHashBE.flip))
stopBlockHeaderDbOptF.flatMap {
case Some(stopBlockHeaderDb) =>
for {
startHeadersOpt <- startHeadersOptF
blockchains <- blockchainsF
fsmOptVec <- {
startHeadersOpt match {
case Some(startHeaders) =>
Future.traverse(startHeaders) { h =>
findNextHeader(prevBlockHeaderOpt = Some(h),
stopBlockHeaderDb = stopBlockHeaderDb,
batchSize = batchSize,
blockchains = blockchains)
}
case None =>
val fsmOptF = findNextHeader(prevBlockHeaderOpt = None,
stopBlockHeaderDb =
stopBlockHeaderDb,
batchSize = batchSize,
blockchains = blockchains)
fsmOptF.map(Vector(_))
}
}
} yield {
fsmOptVec.flatten.headOption
}
case None =>
Future.successful(None)
}
}

View File

@ -8,7 +8,7 @@ import org.bitcoins.chain.models.{
CompactFilterHeaderDAO
}
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.api.chain.{ChainApi}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.DoubleSha256DigestBE
@ -51,14 +51,6 @@ case class ChainHandlerCached(
override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = {
getBestFilterHeaderWithChains(blockchains)
}
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
nextBlockHeaderBatchRangeWithChains(prevStopHash = prevStopHash,
batchSize = batchSize,
blockchains = blockchains)
}
}
object ChainHandlerCached {

View File

@ -75,16 +75,37 @@ trait ChainApi extends ChainQueryApi {
/** Generates a block range in form of (startHeight, stopHash) by the given stop hash.
* Returns None if we are synced
*
* @param prevStopHash our previous block hash where filter header sync stopped
* @param stopHash the block hash we want to sync the new batch of filters to
* @param batchSize the batch size of filter headers
* @return
*/
def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]]
/** Generates a filter header range in form of (startHeight, stopHash) by the given stop hash.
*/
final def nextFilterHeaderBatchRange(
stopBlockHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
nextFilterHeaderBatchRange(stopBlockHash = stopBlockHash,
batchSize = batchSize,
startHeightOpt = None)
}
/** Generates a query for a range of compact filters
* @param stopBlockHash the block hash to stop receiving filters at
* @param batchSize
* @param startHeightOpt the block height to start syncing filters from. If None, we query our chainstate for the last filter we've seen
* @return
*/
def nextFilterHeaderBatchRange(
startHeight: Int,
batchSize: Int): Future[Option[FilterSyncMarker]]
stopBlockHash: DoubleSha256DigestBE,
batchSize: Int,
startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]]
/** Adds a compact filter into the filter database.
*/

View File

@ -1,6 +1,6 @@
package org.bitcoins.core.api.chain
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
/** This is a helper class for syncing block filters following the
* BIP157 protocol. This indicates the starting block height we are
@ -12,6 +12,8 @@ case class FilterSyncMarker(
startHeight: Int,
stopBlockHash: DoubleSha256Digest) {
val stopBlockHashBE: DoubleSha256DigestBE = stopBlockHash.flip
override def toString: String =
s"FilterSyncMarker(startHeight=$startHeight, stopBlockHash=${stopBlockHash.flip.hex})"
s"FilterSyncMarker(startHeight=$startHeight,stopBlockHashBE=${stopBlockHash.flip.hex})"
}

View File

@ -1101,6 +1101,8 @@ case class CompactFilterMessage(
filterBytes: ByteVector
) extends DataPayload {
val blockHashBE: DoubleSha256DigestBE = blockHash.flip
/** The number of filter bytes in this message */
val numFilterBytes: CompactSizeUInt = CompactSizeUInt(
UInt64(filterBytes.length))

View File

@ -303,11 +303,17 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
//2023-05-01T21:46:46Z [net] Failed to find block filter hashes in index: filter_type=basic, start_height=208, stop_hash=303cc906bf99b5370581e7f23285378c18005745882c6112dbbf3e61a82aeddb
val node = nodeConnectedWithBitcoind.node
val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0)
val bitcoind1 = nodeConnectedWithBitcoind.bitcoinds(1)
//start syncing node
val numBlocks = 5
val genBlocksF = {
for {
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1)
_ <- bitcoind1.disconnectNode(nodeUri)
_ <- AsyncUtil.retryUntilSatisfiedF(
() => node.getConnectionCount.map(_ == 1),
1.second)
//generate blocks while sync is ongoing
_ <- bitcoind.generate(numBlocks)
} yield {

View File

@ -163,7 +163,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
peer = peers(1)
_ <- node.peerManager.isConnected(peer).map(assert(_))
bitcoinds <- bitcoindsF
_ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1))
//disconnect bitcoind(0) as its not needed for this test
node0Uri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(node0Uri)

View File

@ -79,14 +79,18 @@ case class PeerManager(
case None =>
sys.error(s"Could not find peer=$syncPeer")
}
val sendCompactFilterHeaderMsgF =
val bestBlockHashF = chainApi.getBestBlockHash()
val sendCompactFilterHeaderMsgF = bestBlockHashF.flatMap { bestBlockHash =>
PeerManager.sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
peer = syncPeer,
filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE
prevStopHash = bestFilterHeader.blockHashBE,
stopHash = bestBlockHash
)
}
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
@ -97,7 +101,8 @@ case class PeerManager(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
filterBatchSize = chainAppConfig.filterBatchSize,
startHeight = compactFilterStartHeight,
startHeightOpt = Some(compactFilterStartHeight),
stopBlockHash = bestFilterHeader.blockHashBE,
peer = syncPeer
)
.map(_ => ())
@ -846,6 +851,7 @@ case class PeerManager(
.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
stopBlockHeaderDb = bestBlockHeader,
state = fhs)
.map(_ => ())
case x @ (_: FilterSync | _: HeaderSync) =>
@ -942,6 +948,7 @@ object PeerManager extends Logging {
def sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi: PeerMessageSenderApi,
chainApi: ChainApi,
stopBlockHeaderDb: BlockHeaderDb,
state: SyncNodeState)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[
@ -952,13 +959,27 @@ object PeerManager extends Logging {
.getBestFilterHeader()
blockHash = bestFilterHeaderOpt match {
case Some(filterHeaderDb) =>
filterHeaderDb.blockHashBE
//need to check for reorg scenarios here
val isSameHeight = filterHeaderDb.height == stopBlockHeaderDb.height
val isNotSameBlockHash =
filterHeaderDb.blockHashBE != stopBlockHeaderDb.hashBE
if (isSameHeight && isNotSameBlockHash) {
//need to start from previous header has to sync filter headers
//correctly in a reorg scenario
stopBlockHeaderDb.previousBlockHashBE
} else {
filterHeaderDb.blockHashBE
}
case None =>
DoubleSha256DigestBE.empty
}
hashHeightOpt <- chainApi.nextBlockHeaderBatchRange(
prevStopHash = blockHash,
batchSize = chainConfig.filterHeaderBatchSize)
hashHeightOpt <- {
chainApi.nextBlockHeaderBatchRange(prevStopHash = blockHash,
stopHash = stopBlockHeaderDb.hashBE,
batchSize =
chainConfig.filterHeaderBatchSize)
}
res <- hashHeightOpt match {
case Some(filterSyncMarker) =>
peerMessageSenderApi
@ -981,11 +1002,13 @@ object PeerManager extends Logging {
chainApi: ChainApi,
peer: Peer,
filterHeaderBatchSize: Int,
prevStopHash: DoubleSha256DigestBE)(implicit
prevStopHash: DoubleSha256DigestBE,
stopHash: DoubleSha256DigestBE)(implicit
ec: ExecutionContext): Future[Boolean] = {
for {
filterSyncMarkerOpt <- chainApi.nextBlockHeaderBatchRange(
prevStopHash = prevStopHash,
stopHash = stopHash,
batchSize = filterHeaderBatchSize)
res <- filterSyncMarkerOpt match {
case Some(filterSyncMarker) =>
@ -1006,11 +1029,14 @@ object PeerManager extends Logging {
peerMessageSenderApi: PeerMessageSenderApi,
chainApi: ChainApi,
filterBatchSize: Int,
startHeight: Int,
startHeightOpt: Option[Int],
stopBlockHash: DoubleSha256DigestBE,
peer: Peer)(implicit ec: ExecutionContext): Future[Boolean] = {
for {
filterSyncMarkerOpt <-
chainApi.nextFilterHeaderBatchRange(startHeight, filterBatchSize)
chainApi.nextFilterHeaderBatchRange(stopBlockHash = stopBlockHash,
batchSize = filterBatchSize,
startHeightOpt = startHeightOpt)
res <- filterSyncMarkerOpt match {
case Some(filterSyncMarker) =>
logger.info(
@ -1028,16 +1054,18 @@ object PeerManager extends Logging {
def fetchCompactFilterHeaders(
state: SyncNodeState, //can we tighten this type up?
chainApi: ChainApi,
peerMessageSenderApi: PeerMessageSenderApi)(implicit
peerMessageSenderApi: PeerMessageSenderApi,
stopBlockHeaderDb: BlockHeaderDb)(implicit
ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[
Option[NodeState.FilterHeaderSync]] = {
logger.info(
s"Now syncing filter headers from ${state.syncPeer} in state=${state}")
s"Now syncing filter headers from ${state.syncPeer} in state=${state} stopBlockHashBE=${stopBlockHeaderDb.hashBE}")
for {
newSyncingStateOpt <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = chainApi,
stopBlockHeaderDb = stopBlockHeaderDb,
state = state)
} yield {
newSyncingStateOpt
@ -1068,7 +1096,7 @@ object PeerManager extends Logging {
chainApi.getBestFilter().flatMap {
case Some(f) =>
//we have already started syncing filters, return the height of the last filter seen
Future.successful(f.height)
Future.successful(f.height + 1)
case None =>
walletCreationTimeOpt match {
case Some(instant) =>

View File

@ -173,12 +173,14 @@ case class DataMessageHandler(
(newBatch: Set[CompactFilterMessage], newChainApi) <- {
if (isFiltersSynced || batchSizeFull) {
logger.debug(s"Processing ${filterBatch.size} filters")
val sortedBlockFiltersF =
sortBlockFiltersByBlockHeight(filterBatch)
for {
sortedBlockFilters <- sortedBlockFiltersF
sortedFilterMessages = sortedBlockFilters.map(_._2)
_ = logger.debug(
s"Processing ${filterBatch.size} filters bestBlockHashBE=${sortedFilterMessages.lastOption
.map(_.blockHashBE)}")
newChainApi <- chainApi.processFilters(sortedFilterMessages)
sortedGolombFilters = sortedBlockFilters.map(x =>
(x._1, x._3))
@ -189,15 +191,20 @@ case class DataMessageHandler(
} yield (Set.empty, newChainApi)
} else Future.successful((filterBatch, chainApi))
}
(_, newFilterHeight) <-
calcFilterHeaderFilterHeight(newChainApi)
filterHeaderSyncStateOpt <-
if (batchSizeFull) {
logger.debug(
s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
sendNextGetCompactFilterCommand(peerMessageSenderApi,
newFilterHeight,
filterSyncState)
for {
bestBlockHash <- chainApi.getBestBlockHash()
fssOpt <- sendNextGetCompactFilterCommand(
peerMessageSenderApi = peerMessageSenderApi,
startHeightOpt = None,
stopBlockHash = bestBlockHash,
fs = filterSyncState)
} yield {
fssOpt
}
} else Future.successful(Some(filterSyncState))
newDmhState <- {
if (isFiltersSynced) {
@ -362,10 +369,12 @@ case class DataMessageHandler(
/** syncs filter headers in case the header chain is still ahead post filter sync */
private def syncIfHeadersAhead(
syncNodeState: SyncNodeState): Future[NodeState] = {
val bestBlockHeaderDbF = chainApi.getBestBlockHeader()
for {
headerHeight <- chainApi.getBestHashBlockHeight()
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
bestBlockHeaderDb <- bestBlockHeaderDbF
newState <- {
require(headerHeight >= Math.max(filterHeaderCount, filterCount),
"Header chain cannot be behind filter or filter header chain")
@ -378,12 +387,14 @@ case class DataMessageHandler(
val fhs = FilterHeaderSync(syncNodeState.syncPeer,
syncNodeState.peers,
syncNodeState.waitingForDisconnection)
for {
syncingFilterHeadersState <- PeerManager
.sendFirstGetCompactFilterHeadersCommand(peerMessageSenderApi =
peerMessageSenderApi,
chainApi = chainApi,
state = fhs)
.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = chainApi,
state = fhs,
stopBlockHeaderDb = bestBlockHeaderDb)
} yield {
syncingFilterHeadersState.getOrElse(
DoneSyncing(syncNodeState.peers,
@ -467,18 +478,21 @@ case class DataMessageHandler(
private def sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi: PeerMessageSenderApi,
syncPeer: Peer,
prevStopHash: DoubleSha256DigestBE): Future[Boolean] =
prevStopHash: DoubleSha256DigestBE,
stopHash: DoubleSha256DigestBE): Future[Boolean] =
PeerManager.sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = chainApi,
peer = syncPeer,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = prevStopHash
prevStopHash = prevStopHash,
stopHash = stopHash
)
private def sendNextGetCompactFilterCommand(
peerMessageSenderApi: PeerMessageSenderApi,
startHeight: Int,
startHeightOpt: Option[Int],
stopBlockHash: DoubleSha256DigestBE,
fs: NodeState.FilterSync): Future[Option[NodeState.FilterSync]] = {
PeerManager
@ -486,7 +500,8 @@ case class DataMessageHandler(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,
startHeight = startHeight,
startHeightOpt = startHeightOpt,
stopBlockHash = stopBlockHash,
peer = fs.syncPeer
)
.map { isSyncing =>
@ -503,9 +518,10 @@ case class DataMessageHandler(
private def sendFirstGetCompactFilterCommand(
peerMessageSenderApi: PeerMessageSenderApi,
stopBlockHash: DoubleSha256DigestBE,
startHeight: Int,
syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = {
logger.info(s"Beginning to sync filters from startHeight=$startHeight")
logger.info(s"Beginning to sync filters to stopBlockHashBE=$stopBlockHash")
val fs = syncNodeState match {
case x @ (_: HeaderSync | _: FilterHeaderSync) =>
@ -514,7 +530,8 @@ case class DataMessageHandler(
}
sendNextGetCompactFilterCommand(peerMessageSenderApi = peerMessageSenderApi,
startHeight = startHeight,
startHeightOpt = Some(startHeight),
stopBlockHash = stopBlockHash,
fs = fs)
}
@ -554,7 +571,6 @@ case class DataMessageHandler(
for {
(newFilterHeaderHeight, newFilterHeight) <- calcFilterHeaderFilterHeight(
chainApi)
isSynced <-
if (newFilterHeight == 0 && walletCreationTimeOpt.isDefined) {
//if we have zero filters in our database and are syncing filters after a wallet creation time
@ -675,10 +691,18 @@ case class DataMessageHandler(
// headers are synced now with the current sync peer, now move to validating it for all peers
require(syncPeer == peer, s"syncPeer=$syncPeer peer=$peer")
val fhsOptF = PeerManager.fetchCompactFilterHeaders(
state = state,
chainApi = chainApi,
peerMessageSenderApi = peerMessageSenderApi)
val fhsOptF = {
for {
lastBlockHeaderDbOpt <- chainApi.getHeader(lastHash.flip)
fhs <- PeerManager.fetchCompactFilterHeaders(
state = state,
chainApi = chainApi,
peerMessageSenderApi = peerMessageSenderApi,
stopBlockHeaderDb = lastBlockHeaderDbOpt.get)
} yield {
fhs
}
}
fhsOptF.map {
case Some(s) => s
case None =>
@ -753,11 +777,13 @@ case class DataMessageHandler(
peer: Peer): Future[NodeState] = {
val filterHeaders = filterHeader.filterHeaders
val blockCountF = chainApi.getBlockCount()
val bestBlockHashF = chainApi.getBestBlockHash()
for {
_ <- chainApi.processFilterHeaders(filterHeaders,
filterHeader.stopHash.flip)
filterHeaderCount <- chainApi.getFilterHeaderCount()
blockCount <- blockCountF
bestBlockHash <- bestBlockHashF
newState <-
if (blockCount != filterHeaderCount) {
logger.debug(
@ -765,15 +791,17 @@ case class DataMessageHandler(
sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
syncPeer = peer,
prevStopHash = filterHeader.stopHash.flip).map(_ =>
filterHeaderSync)
prevStopHash = filterHeader.stopHash.flip,
stopHash = bestBlockHash).map(_ => filterHeaderSync)
} else {
for {
startHeight <- PeerManager.getCompactFilterStartHeight(
chainApi,
walletCreationTimeOpt)
bestBlockHash <- bestBlockHashF
filterSyncStateOpt <- sendFirstGetCompactFilterCommand(
peerMessageSenderApi = peerMessageSenderApi,
stopBlockHash = bestBlockHash,
startHeight = startHeight,
syncNodeState = filterHeaderSync)
} yield {

View File

@ -100,13 +100,15 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg {
Future.successful(this)
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] =
Future.successful(None)
override def nextFilterHeaderBatchRange(
startHeight: Int,
batchSize: Int): Future[Option[FilterSyncMarker]] =
stopBlockHash: DoubleSha256DigestBE,
batchSize: Int,
startHeightOpt: Option[Int]): Future[Option[FilterSyncMarker]] =
Future.successful(None)
override def processFilters(