Change ChainApi.getBestFilterHeader() return type to Future[Option[Co… (#1550)

* Change ChainApi.getBestFilterHeader() return type to Future[Option[CompactFilterHeaderDb]] to resolve issue 1549

* Run scalafmt
This commit is contained in:
Chris Stewart 2020-06-13 17:01:51 -05:00 committed by GitHub
parent aa53ee5f57
commit 94568aba22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 32 deletions

View file

@ -0,0 +1,20 @@
package org.bitcoins.chain.models
import org.bitcoins.testkit.chain.ChainDbUnitTest
import org.scalatest.FutureOutcome
class CompactFilterDAOTest extends ChainDbUnitTest {
override type FixtureParam = CompactFilterDAO
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withCompactFilterDAO(test)
behavior of "CompactFilterDAO"
it must "retrieve getBestFilter when there are no filters in the db" in {
compactFilterDAO: CompactFilterDAO =>
compactFilterDAO.getBestFilter
.map(opt => assert(opt == None))
}
}

View file

@ -0,0 +1,21 @@
package org.bitcoins.chain.models
import org.bitcoins.testkit.chain.ChainDbUnitTest
import org.scalatest.FutureOutcome
class CompactFilterHeaderDAOTest extends ChainDbUnitTest {
override type FixtureParam = CompactFilterHeaderDAO
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withCompactFilterHeaderDAO(test)
behavior of "CompactFilterHeaderDAO"
it must "get the best filter header with a table with zero rows in it" in {
filterHeaderDAO =>
filterHeaderDAO.getBestFilterHeader.map { opt =>
assert(opt == None)
}
}
}

View file

@ -123,8 +123,9 @@ trait ChainApi extends ChainQueryApi {
/** Finds the "best" filter header we have stored in our database /** Finds the "best" filter header we have stored in our database
* What this means in practice is the latest filter header we * What this means in practice is the latest filter header we
* have received from our peer. * have received from our peer.
* Returns none if we have no filters in the database
* */ * */
def getBestFilterHeader(): Future[CompactFilterHeaderDb] def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]]
/** /**
* Looks up a compact filter header by its hash. * Looks up a compact filter header by its hash.

View file

@ -352,10 +352,15 @@ case class ChainHandler(
/** @inheritdoc */ /** @inheritdoc */
override def getFilterHeaderCount: Future[Int] = { override def getFilterHeaderCount: Future[Int] = {
logger.debug(s"Querying for filter header count") logger.debug(s"Querying for filter header count")
filterHeaderDAO.getBestFilter.map { filterHeader => filterHeaderDAO.getBestFilterHeader.map { filterHeaderOpt =>
val height = filterHeader.height filterHeaderOpt match {
logger.debug(s"getFilterCount result: count=$height") case Some(filterHeader) =>
height val height = filterHeader.height
logger.debug(s"getFilterCount result: count=$height")
height
case None =>
0
}
} }
} }
@ -365,8 +370,8 @@ case class ChainHandler(
filterHeaderDAO.getAtHeight(height) filterHeaderDAO.getAtHeight(height)
/** @inheritdoc */ /** @inheritdoc */
override def getBestFilterHeader(): Future[CompactFilterHeaderDb] = { override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = {
filterHeaderDAO.getBestFilter filterHeaderDAO.getBestFilterHeader
} }
/** @inheritdoc */ /** @inheritdoc */
@ -377,10 +382,14 @@ case class ChainHandler(
/** @inheritdoc */ /** @inheritdoc */
override def getFilterCount: Future[Int] = { override def getFilterCount: Future[Int] = {
logger.debug(s"Querying for filter count") logger.debug(s"Querying for filter count")
filterDAO.getBestFilter.map { filter => filterDAO.getBestFilter.map { filterOpt =>
val height = filter.height filterOpt match {
logger.debug(s"getFilterCount result: count=$height") case Some(filter) =>
height val height = filter.height
logger.debug(s"getFilterCount result: count=$height")
height
case None => 0
}
} }
} }

View file

@ -34,16 +34,23 @@ abstract class FilterSync extends ChainVerificationLogger {
implicit ec: ExecutionContext, implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[ChainApi] = { chainAppConfig: ChainAppConfig): Future[ChainApi] = {
val ourBestFilterHeaderF = chainApi.getBestFilterHeader() val ourBestFilterHeaderOptF = chainApi.getBestFilterHeader()
val ourBestBlockHeaderF = chainApi.getBestBlockHeader() val ourBestBlockHeaderF = chainApi.getBestBlockHeader()
for { for {
ours <- ourBestFilterHeaderF oursOpt <- ourBestFilterHeaderOptF
ourBestBlockHeader <- ourBestBlockHeaderF ourBestBlockHeader <- ourBestBlockHeaderF
syncedChainApi <- syncFiltersToTip(chainApi = chainApi, syncedChainApi <- {
ourBestHeader = ourBestBlockHeader, oursOpt match {
ourBestFilterHeader = ours, case Some(ours) =>
getFilterFunc = getFilterFunc, syncFiltersToTip(chainApi = chainApi,
batchSize) ourBestHeader = ourBestBlockHeader,
ourBestFilterHeader = ours,
getFilterFunc = getFilterFunc,
batchSize)
case None =>
Future.failed(new RuntimeException(s"Cannot sync filters, we don't have any in the database"))
}
}
} yield { } yield {
syncedChainApi syncedChainApi
} }
@ -104,11 +111,19 @@ abstract class FilterSync extends ChainVerificationLogger {
case (apiF, missingHeaders) => case (apiF, missingHeaders) =>
for { for {
api <- apiF api <- apiF
bestFilter <- api.getBestFilterHeader() bestFilterOpt <- api.getBestFilterHeader()
newApi <- fetchFiltersForHeaderGroup(api, newApi <- {
missingHeaders, bestFilterOpt match {
bestFilter, case Some(bestFilter) =>
getFilterFunc) fetchFiltersForHeaderGroup(api,
missingHeaders,
bestFilter,
getFilterFunc)
case None =>
Future.failed(new RuntimeException(s"Cannot sync filter headers, we do not have any in the database"))
}
}
} yield newApi } yield newApi
} }
} }

View file

@ -125,14 +125,25 @@ case class CompactFilterDAO()(
table.filter(header => header.height >= from && header.height <= to).result table.filter(header => header.height >= from && header.height <= to).result
} }
def getBestFilter: Future[CompactFilterDb] = { /** Gets the heaviest filter from the database */
val join = table join blockHeaderTable on (_.blockHash === _.hash) def getBestFilter: Future[Option[CompactFilterDb]] = {
val join = (table.join(blockHeaderTable))
.on(_.blockHash === _.hash)
val query = join.groupBy(_._1).map { val query = join.groupBy(_._1).map {
case (filter, headers) => case (filter, headers) =>
filter -> headers.map(_._2.chainWork).max filter -> headers.map(_._2.chainWork).max
} }
safeDatabase val filtersWithWorkF: Future[Vector[(CompactFilterDb,Option[BigInt])]] = {
.runVec(query.result) safeDatabase.runVec(query.result)
.map(_.maxBy(_._2.getOrElse(BigInt(0)))._1) }
filtersWithWorkF.map { filtersWithWork =>
if (filtersWithWork.isEmpty) {
None
} else {
val highest = filtersWithWork.maxBy(_._2.getOrElse(BigInt(0)))._1
Some(highest)
}
}
} }
} }

View file

@ -117,15 +117,23 @@ case class CompactFilterHeaderDAO()(
query query
} }
def getBestFilter: Future[CompactFilterHeaderDb] = { def getBestFilterHeader: Future[Option[CompactFilterHeaderDb]] = {
val join = table join blockHeaderTable on (_.blockHash === _.hash) val join = table join blockHeaderTable on (_.blockHash === _.hash)
val query = join.groupBy(_._1).map { val query = join.groupBy(_._1).map {
case (filter, headers) => case (filter, headers) =>
filter -> headers.map(_._2.chainWork).max filter -> headers.map(_._2.chainWork).max
} }
safeDatabase val headersWithWorkF: Future[Vector[(CompactFilterHeaderDb,Option[BigInt])]] = {
.runVec(query.result) safeDatabase.runVec(query.result)
.map(_.maxBy(_._2.getOrElse(BigInt(0)))._1) }
headersWithWorkF.map { headersWithWork: Vector[(CompactFilterHeaderDb,Option[BigInt])] =>
if (headersWithWork.isEmpty) {
None
} else {
val highestWork = headersWithWork.maxBy(_._2.getOrElse(BigInt(0)))._1
Some(highestWork)
}
}
} }
} }

View file

@ -155,6 +155,19 @@ trait ChainUnitTest
destroy = () => ChainUnitTest.destroyAllTables)(test) destroy = () => ChainUnitTest.destroyAllTables)(test)
} }
/** Creates a compact filter DAO with zero rows in it */
def withCompactFilterHeaderDAO(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(build = () => ChainUnitTest.createFilterHeaderDAO(),
destroy = ChainUnitTest.destroyAllTables)(test)
}
/** Creates a compact filter DAO with zero rows in it */
def withCompactFilterDAO(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(build = () => ChainUnitTest.createFilterDAO(),
destroy = ChainUnitTest.destroyAllTables)(test)
}
def withPopulatedBlockHeaderDAO(test: OneArgAsyncTest): FutureOutcome = { def withPopulatedBlockHeaderDAO(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(build = () => ChainUnitTest.createPopulatedBlockHeaderDAO, makeFixture(build = () => ChainUnitTest.createPopulatedBlockHeaderDAO,
destroy = () => ChainUnitTest.destroyAllTables)(test) destroy = () => ChainUnitTest.destroyAllTables)(test)
@ -313,6 +326,7 @@ object ChainUnitTest extends ChainVerificationLogger {
def createFilterHeaderDAO()( def createFilterHeaderDAO()(
implicit appConfig: ChainAppConfig, implicit appConfig: ChainAppConfig,
ec: ExecutionContext): Future[CompactFilterHeaderDAO] = { ec: ExecutionContext): Future[CompactFilterHeaderDAO] = {
appConfig.migrate()
Future.successful(CompactFilterHeaderDAO()) Future.successful(CompactFilterHeaderDAO())
} }
@ -325,6 +339,7 @@ object ChainUnitTest extends ChainVerificationLogger {
def createFilterDAO()( def createFilterDAO()(
implicit appConfig: ChainAppConfig, implicit appConfig: ChainAppConfig,
ec: ExecutionContext): Future[CompactFilterDAO] = { ec: ExecutionContext): Future[CompactFilterDAO] = {
appConfig.migrate()
Future.successful(CompactFilterDAO()) Future.successful(CompactFilterDAO())
} }