Optimize node.start() and fetching filter & filter header heights (#2554)

* Make node.start() more async

* Add missing filter indexes

* Move queries to vals
benthecarman 2021-01-22 10:38:32 -06:00 committed by GitHub
parent e2ace38a1f
commit 0d07d558c4
6 changed files with 64 additions and 45 deletions

View file

@@ -0,0 +1,2 @@
+CREATE INDEX cfilters_block_hash_index on cfilters (block_hash);
+CREATE INDEX cfheaders_hash_index on cfheaders (hash);

View file

@@ -0,0 +1,2 @@
+CREATE INDEX "cfilters_block_hash_index" on "cfilters" ("block_hash");
+CREATE INDEX "cfheaders_hash_index" on "cfheaders" ("hash");

View file

@@ -128,21 +128,23 @@ case class CompactFilterDAO()(implicit
     table.filter(header => header.height >= from && header.height <= to).result
   }
-  /** Gets the heaviest filter from the database */
-  def getBestFilter: Future[Option[CompactFilterDb]] = {
+  private val bestFilterQuery = {
     val join = table
       .join(blockHeaderTable)
       .on(_.blockHash === _.hash)
     val maxQuery = join.map(_._2.chainWork).max
-    val query = join.filter(_._2.chainWork === maxQuery).take(1).map(_._1)
+    join
+      .filter(_._2.chainWork === maxQuery)
+      .take(1)
+      .map(_._1)
+      .result
+      .transactionally
+  }
-    for {
-      filterOpt <-
-        safeDatabase
-          .run(query.result)
-          .map(_.headOption)
-    } yield filterOpt
+  /** Gets the heaviest filter from the database */
+  def getBestFilter: Future[Option[CompactFilterDb]] = {
+    safeDatabase.run(bestFilterQuery).map(_.headOption)
   }
 }
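The third commit bullet ("Move queries to vals") is this change: the Slick query tree for the heaviest filter is now assembled once, when the DAO is constructed, instead of on every getBestFilter call, and the method body shrinks to a single safeDatabase.run. A minimal sketch of the pattern with toy stand-in tables — the real chainWork is a BigInt stored differently, and the safeDatabase wrapper is omitted, so Long and a plain Database are assumptions made to keep the sketch short:

import scala.concurrent.{ExecutionContext, Future}
import slick.jdbc.SQLiteProfile.api._

class BlockHeaders(tag: Tag) extends Table[(String, Long)](tag, "block_headers") {
  def hash = column[String]("hash")
  def chainWork = column[Long]("chain_work")
  def * = (hash, chainWork)
}

class Filters(tag: Tag) extends Table[(String, String)](tag, "cfilters") {
  def blockHash = column[String]("block_hash")
  def filterBytes = column[String]("filter_bytes")
  def * = (blockHash, filterBytes)
}

class FilterDAO(db: Database)(implicit ec: ExecutionContext) {
  private val filters = TableQuery[Filters]
  private val headers = TableQuery[BlockHeaders]

  // Built once at construction; before this change an equivalent
  // query tree was rebuilt on every call.
  private val bestFilterQuery = {
    val join = filters.join(headers).on(_.blockHash === _.hash)
    val maxWork = join.map(_._2.chainWork).max
    join
      .filter(_._2.chainWork === maxWork)
      .take(1)
      .map(_._1)
      .result
      .transactionally
  }

  def getBestFilter: Future[Option[(String, String)]] =
    db.run(bestFilterQuery).map(_.headOption)
}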

View file

@@ -121,26 +121,28 @@ case class CompactFilterHeaderDAO()(implicit
     query
   }
-  /** Fetches the best filter header from the database _without_ context
-    * that it's actually in our best blockchain. For instance, this filter header could be
-    * reorged out for whatever reason.
-    * @see https://github.com/bitcoin-s/bitcoin-s/issues/1919#issuecomment-682041737
-    */
-  def getBestFilterHeader: Future[Option[CompactFilterHeaderDb]] = {
+  private val bestFilterHeaderQuery = {
     val join = table
       .join(blockHeaderTable)
       .on(_.blockHash === _.hash)
     val maxQuery = join.map(_._2.chainWork).max
-    val query = join.filter(_._2.chainWork === maxQuery).take(1).map(_._1)
+    join
+      .filter(_._2.chainWork === maxQuery)
+      .take(1)
+      .map(_._1)
+      .result
+      .transactionally
+  }
-    for {
-      filterOpt <-
-        safeDatabase
-          .run(query.result)
-          .map(_.headOption)
-    } yield filterOpt
+  /** Fetches the best filter header from the database _without_ context
+    * that it's actually in our best blockchain. For instance, this filter header could be
+    * reorged out for whatever reason.
+    * @see https://github.com/bitcoin-s/bitcoin-s/issues/1919#issuecomment-682041737
+    */
+  def getBestFilterHeader: Future[Option[CompactFilterHeaderDb]] = {
+    safeDatabase.run(bestFilterHeaderQuery).map(_.headOption)
   }
   /** This looks for best filter headers whose [[CompactFilterHeaderDb.blockHashBE]] are associated with the given

View file

@@ -51,13 +51,13 @@ class DbManagementTest extends BitcoinSAsyncTest with EmbeddedPg {
     val result = chainDbManagement.migrate()
     chainAppConfig.driver match {
       case SQLite =>
-        val expected = 5
+        val expected = 6
         assert(result == expected)
         val flywayInfo = chainDbManagement.info()
         assert(flywayInfo.applied().length == expected)
         assert(flywayInfo.pending().length == 0)
       case PostgreSQL =>
-        val expected = 4
+        val expected = 5
         assert(result == expected)
         val flywayInfo = chainDbManagement.info()
         //+1 for << Flyway Schema Creation >>
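The expected counts track the two new migration files above: SQLite goes from 5 to 6 applied migrations and PostgreSQL from 4 to 5, so any future migration has to bump both constants. The //+1 comment indicates that the applied-migration count reported by Flyway runs one higher on PostgreSQL, because Flyway records its own << Flyway Schema Creation >> entry in the history there.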

View file

@@ -118,34 +118,45 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
     logger.info("Starting node")
     val start = System.currentTimeMillis()
-    for {
+    val startConfsF = for {
       _ <- chainAppConfig.start()
       _ <- nodeAppConfig.start()
-      // get chainApi so we don't need to call chainApiFromDb on every call
-      chainApi <- chainApiFromDb()
-      node <- {
-        val isInitializedF = for {
-          _ <- peerMsgSenderF.map(_.connect())
-          _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized,
-                                              interval = 250.millis)
-        } yield ()
+    } yield ()
-        isInitializedF.failed.foreach(err =>
-          logger.error(s"Failed to connect with peer=$peer with err=${err}"))
+    val chainApiF = startConfsF.flatMap(_ => chainApiFromDb())
-        isInitializedF.map { _ =>
-          logger.info(s"Our peer=$peer has been initialized")
-          logger.info(s"Our node has been full started. It took=${System
-            .currentTimeMillis() - start}ms")
-          this
-        }
-      }
+    val startNodeF = {
+      val isInitializedF = for {
+        _ <- peerMsgSenderF.map(_.connect())
+        _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized,
+                                            interval = 250.millis)
+      } yield ()
+      isInitializedF.failed.foreach(err =>
+        logger.error(s"Failed to connect with peer=$peer with err=$err"))
+      isInitializedF.map { _ =>
+        logger.info(s"Our peer=$peer has been initialized")
+        logger.info(s"Our node has been full started. It took=${System
+          .currentTimeMillis() - start}ms")
+        this
+      }
+    }
+    val bestHashF = chainApiF.flatMap(_.getBestBlockHash())
+    val bestHeightF = chainApiF.flatMap(_.getBestHashBlockHeight())
+    val filterHeaderCountF = chainApiF.flatMap(_.getFilterHeaderCount())
+    val filterCountF = chainApiF.flatMap(_.getFilterCount())
+    for {
+      _ <- startConfsF
+      node <- startNodeF
       _ = logger.trace("Fetching node starting point")
-      bestHash <- chainApi.getBestBlockHash()
-      bestHeight <- chainApi.getBestHashBlockHeight()
-      filterCount <- chainApi.getFilterCount()
-      filterHeaderCount <- chainApi.getFilterHeaderCount()
+      bestHash <- bestHashF
+      bestHeight <- bestHeightF
+      filterHeaderCount <- filterHeaderCountF
+      filterCount <- filterCountF
     } yield {
       logger.info(
         s"Started node, best block hash ${bestHash.hex} at height $bestHeight, with $filterHeaderCount filter headers and $filterCount filters")