Try to add block generate to address in fixture setup to get around compact filter sync edge case (#3231)

* Try to add block generate to address in fixture setup to get around the case where block headers & filter headers are synced but compact filters are not

* Push to github to force re-run of CI 2

* Add ChainApi.getBestFilter(), use it in sync to determine if filter headers & filters are in sync

* Push to github to force re-run of CI 3

* Simplify getBestFilter, add unit test

* Add more comments, clean up some code
This commit is contained in:
Chris Stewart 2021-06-08 12:47:28 -05:00 committed by GitHub
parent a56086b751
commit 7ba7f8b9ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 64 additions and 25 deletions

View file

@ -200,6 +200,8 @@ class BitcoindRpcClient(val instance: BitcoindInstance)(implicit
blockHash: DoubleSha256DigestBE): Future[Option[CompactFilterHeaderDb]] =
filterHeadersUnsupported
override def getBestFilter(): Future[Option[CompactFilterDb]] = ???
override def getFilter(
hash: DoubleSha256DigestBE): Future[Option[CompactFilterDb]] = ???

View file

@ -618,6 +618,19 @@ class ChainHandlerTest extends ChainDbUnitTest {
} yield assert(result)
}
it must "get best filter" in { chainHandler: ChainHandler =>
val bestFilterOptF = chainHandler.getBestFilter()
val bestFilterHeaderOptF = chainHandler.getBestFilterHeader()
for {
bestFilterHeaderOpt <- bestFilterHeaderOptF
bestFilterOpt <- bestFilterOptF
} yield {
assert(bestFilterHeaderOpt.isDefined)
assert(bestFilterOpt.isDefined)
assert(bestFilterOpt.get.hashBE == bestFilterHeaderOpt.get.filterHashBE)
}
}
/** Checks that
* 1. The header1 & header2 have the same chainwork
* 2. Checks that header1 and header2 have the same time

View file

@ -551,6 +551,10 @@ class ChainHandler(
}
}
override def getBestFilter(): Future[Option[CompactFilterDb]] = {
filterDAO.getBestFilter
}
/** This method retrieves the best [[CompactFilterHeaderDb]] from the database
* without any blockchain context, and then uses the [[CompactFilterHeaderDb.blockHashBE]]
* to query our block headers database looking for a filter header that is in the best chain

View file

@ -119,6 +119,8 @@ trait ChainApi extends ChainQueryApi {
*/
def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]]
def getBestFilter(): Future[Option[CompactFilterDb]]
/** Looks up a compact filter header by its hash.
*/
def getFilterHeader(

View file

@ -5,7 +5,11 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
@ -65,7 +69,7 @@ case class NeutrinoNode(
chainApi <- chainApiFromDb()
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
filterCount <- chainApi.getFilterCount()
bestFilterOpt <- chainApi.getBestFilter()
blockchains <-
BlockHeaderDAO()(executionContext, chainConfig).getBlockchains()
// Get all of our cached headers in case of a reorg
@ -74,7 +78,10 @@ case class NeutrinoNode(
tip = blockchains
.map(_.tip)
.head //should be safe since we have genesis header inserted in db
_ <- syncFilters(bestFilterHeaderOpt, tip, chainApi, filterCount)
_ <- syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = tip,
chainApi = chainApi)
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex}")
@ -83,41 +90,45 @@ case class NeutrinoNode(
private def syncFilters(
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
bestFilterOpt: Option[CompactFilterDb],
bestBlockHeader: BlockHeaderDb,
chainApi: ChainApi,
filterCount: Int): Future[Unit] = {
chainApi: ChainApi): Future[Unit] = {
// If we have started syncing filters headers
bestFilterHeaderOpt match {
case None =>
(bestFilterHeaderOpt, bestFilterOpt) match {
case (None, None) | (None, Some(_)) =>
//do nothing if we haven't started syncing
Future.unit
case Some(bestFilterHeader) =>
case (Some(bestFilterHeader), Some(bestFilter)) =>
val isFilterHeaderSynced =
bestFilterHeader.blockHashBE == bestBlockHeader.hashBE
if (isFilterHeaderSynced) {
//means we are in sync, with filter heads and block headers
val isFiltersSynced = {
//check if we have started syncing filters,
//and if so, see if filter headers and filters
//were in sync
bestFilter.hashBE == bestFilterHeader.filterHashBE
}
if (isFilterHeaderSynced && isFiltersSynced) {
//means we are in sync, with filter heads & block headers & filters
//if there _both_ filter headers and block headers are on
//an old tip, our event driven node will start syncing
//filters after block headers are in sync
//do nothing
//the shortcoming of this case is if you
//1. fully sync your block headers & filter headers
//2. start syncing compact filters
//3. stop your node
//4. restart your node before a block occurs on the network
//5. You won't begin syncing compact filters until a header occurs on the network
Future.unit
} else {
syncCompactFilters(bestFilterHeader, chainApi, filterCount)
syncCompactFilters(bestFilterHeader, chainApi, Some(bestFilter))
}
case (Some(bestFilterHeader), None) =>
syncCompactFilters(bestFilterHeader, chainApi, None)
}
}
/** Starts sync compact filer headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/
private def syncCompactFilters(
bestFilterHeader: CompactFilterHeaderDb,
chainApi: ChainApi,
filterCount: Int): Future[Unit] = {
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val sendCompactFilterHeaderMsgF = {
peerMsgSender.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
@ -128,15 +139,17 @@ case class NeutrinoNode(
// If we have started syncing filters
if (
!isSyncFilterHeaders &&
filterCount != bestFilterHeader.height
&& filterCount != 0
bestFilterOpt.isDefined &&
bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE
) {
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
logger.info(s"Starting sync filters in NeutrinoNode.sync()")
peerMsgSender
.sendNextGetCompactFilterCommand(chainApi = chainApi,
filterBatchSize =
chainConfig.filterBatchSize,
startHeight = filterCount)
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,
startHeight = bestFilterOpt.get.height)
.map(_ => ())
} else {
Future.unit

View file

@ -122,6 +122,10 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg {
override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] =
Future.successful(None)
override def getBestFilter(): Future[Option[CompactFilterDb]] = {
Future.successful(None)
}
override def getFilterHeader(blockHash: DoubleSha256DigestBE): Future[
Option[CompactFilterHeaderDb]] = Future.successful(None)

View file

@ -501,6 +501,7 @@ object NodeUnitTest extends P2PLogger {
system: ActorSystem): Future[NeutrinoNode] = {
import system.dispatcher
for {
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)