Optimize filter sync and fetching filter heights (#2568)

Authored by benthecarman on 2021-01-30 14:56:47 -06:00, committed by GitHub
parent 2055c5a3c7
commit 03ca6f617e
14 changed files with 84 additions and 68 deletions


@@ -169,7 +169,7 @@ class BitcoindRpcClient(val instance: BitcoindInstance)(implicit
         s"Bitcoind chainApi doesn't allow you fetch block header batch range"))

   override def nextFilterHeaderBatchRange(
-      stopHash: DoubleSha256DigestBE,
+      startHeight: Int,
       batchSize: Int): Future[Option[FilterSyncMarker]] =
     Future.failed(
       new UnsupportedOperationException(


@@ -426,8 +426,7 @@ class ChainHandlerTest extends ChainDbUnitTest {
     for {
       bestBlock <- chainHandler.getBestBlockHeader()
       bestBlockHashBE = bestBlock.hashBE
-      rangeOpt <-
-        chainHandler.nextFilterHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
+      rangeOpt <- chainHandler.nextFilterHeaderBatchRange(0, 1)
     } yield {
       val marker = rangeOpt.get
       assert(rangeOpt.nonEmpty)


@@ -317,30 +317,24 @@ class ChainHandler(
   /** @inheritdoc */
   override def nextFilterHeaderBatchRange(
-      prevStopHash: DoubleSha256DigestBE,
+      filterHeight: Int,
       batchSize: Int): Future[Option[FilterSyncMarker]] = {
-    val startHeightF = if (prevStopHash == DoubleSha256DigestBE.empty) {
-      Future.successful(0)
-    } else {
-      for {
-        prevStopHeaderOpt <- getFilterHeader(prevStopHash)
-        prevStopHeader = prevStopHeaderOpt.getOrElse(
-          throw UnknownBlockHash(s"Unknown block hash ${prevStopHash}"))
-      } yield prevStopHeader.height + 1
-    }
-
-    for {
-      startHeight <- startHeightF
-      filterHeaderCount <- getFilterHeaderCount()
-      stopHeight =
-        if (startHeight - 1 + batchSize > filterHeaderCount)
-          filterHeaderCount
-        else startHeight - 1 + batchSize
-      stopBlockOpt <- getFilterHeadersAtHeight(stopHeight).map(_.headOption)
-      stopBlock = stopBlockOpt.getOrElse(
-        throw UnknownBlockHeight(s"Unknown filter header height ${stopHeight}"))
-    } yield {
-      if (startHeight > stopHeight)
+    val startHeight = if (filterHeight <= 0) 0 else filterHeight + 1
+    val stopHeight = startHeight - 1 + batchSize
+
+    val stopBlockF =
+      getFilterHeadersAtHeight(stopHeight).map(_.headOption).flatMap {
+        case Some(stopBlock) =>
+          Future.successful(stopBlock)
+        case None =>
+          // This means the stop height is past the filter header height
+          getBestFilterHeader().map(
+            _.getOrElse(throw UnknownBlockHeight(
+              s"Unknown filter header height $stopHeight")))
+      }
+
+    stopBlockF.map { stopBlock =>
+      if (startHeight > stopBlock.height)
         None
       else
         Some(FilterSyncMarker(startHeight, stopBlock.blockHashBE.flip))
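For intuition, here is the new range arithmetic in isolation (a standalone sketch, not code from the commit): the parameter is the height of the best filter already stored, the batch starts one past it (or at 0 on a fresh sync), and the stop height sits one full batch further on. When the stop height overshoots the stored headers, the getBestFilterHeader() fallback above caps the range at the tip.

    // Standalone sketch of the bounds computed by nextFilterHeaderBatchRange:
    def batchBounds(filterHeight: Int, batchSize: Int): (Int, Int) = {
      val startHeight = if (filterHeight <= 0) 0 else filterHeight + 1
      val stopHeight = startHeight - 1 + batchSize
      (startHeight, stopHeight)
    }

    batchBounds(0, 1000)    // (0, 999)     fresh sync starts at the genesis filter
    batchBounds(999, 1000)  // (1000, 1999) next full batch
    batchBounds(1999, 1000) // (2000, 2999) capped at the best header if fewer exist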
@@ -525,13 +519,9 @@ class ChainHandler(
   /** @inheritdoc */
   override def getFilterHeaderCount(): Future[Int] = {
     logger.debug(s"Querying for filter header count")
-    filterHeaderDAO.getBestFilterHeader.map {
-      case Some(filterHeader) =>
-        val height = filterHeader.height
-        logger.debug(s"getFilterHeaderCount result: count=$height")
-        height
-      case None =>
-        0
+    filterHeaderDAO.getBestFilterHeaderHeight.map { height =>
+      logger.debug(s"getFilterHeaderCount result: count=$height")
+      height
     }
   }
@@ -639,12 +629,9 @@ class ChainHandler(
   /** @inheritdoc */
   override def getFilterCount(): Future[Int] = {
     logger.debug(s"Querying for filter count")
-    filterDAO.getBestFilter.map {
-      case Some(filter) =>
-        val height = filter.height
-        logger.debug(s"getFilterCount result: count=$height")
-        height
-      case None => 0
+    filterDAO.getBestFilterHeight.map { height =>
+      logger.debug(s"getFilterCount result: count=$height")
+      height
     }
   }
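Both count getters previously pulled the full best row out of the database, which for getFilterCount() includes the serialized filter bytes, only to read a single height field. The rewrite pushes that work into a height-only query; in sketch form (both method names appear in this diff):

    // Before: materialise the whole CompactFilterDb row, then read one Int
    filterDAO.getBestFilter.map(_.map(_.height).getOrElse(0))

    // After: the database returns only the height column, defaulting to 0
    filterDAO.getBestFilterHeight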


@@ -147,4 +147,23 @@ case class CompactFilterDAO()(implicit
   def getBestFilter: Future[Option[CompactFilterDb]] = {
     safeDatabase.run(bestFilterQuery).map(_.headOption)
   }
+
+  private val bestFilterHeightQuery = {
+    val join = table
+      .join(blockHeaderTable)
+      .on(_.blockHash === _.hash)
+
+    val maxQuery = join.map(_._2.chainWork).max
+
+    join
+      .filter(_._2.chainWork === maxQuery)
+      .take(1)
+      .map(_._1.height)
+      .result
+      .transactionally
+  }
+
+  def getBestFilterHeight: Future[Int] = {
+    safeDatabase.run(bestFilterHeightQuery).map(_.headOption.getOrElse(0))
+  }
 }
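The lifted query joins the filter table to the block-header table, pins the row whose header carries the maximum chainWork (the best chain tip), and projects only the height column. As a rough mental model, it compiles to SQL of the following shape; the table and column names here are assumptions for illustration, not the statement Slick actually emits:

    // Hypothetical SQL shape of bestFilterHeightQuery:
    //   SELECT f.height
    //   FROM compact_filters f
    //   JOIN block_headers h ON f.block_hash = h.hash
    //   WHERE h.chain_work = (
    //     SELECT MAX(h2.chain_work)
    //     FROM compact_filters f2
    //     JOIN block_headers h2 ON f2.block_hash = h2.hash)
    //   LIMIT 1

CompactFilterHeaderDAO in the next file adds the identical query over the filter-header table.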


@@ -145,6 +145,25 @@ case class CompactFilterHeaderDAO()(implicit
     safeDatabase.run(bestFilterHeaderQuery).map(_.headOption)
   }
+
+  private val bestFilterHeaderHeightQuery = {
+    val join = table
+      .join(blockHeaderTable)
+      .on(_.blockHash === _.hash)
+
+    val maxQuery = join.map(_._2.chainWork).max
+
+    join
+      .filter(_._2.chainWork === maxQuery)
+      .take(1)
+      .map(_._1.height)
+      .result
+      .transactionally
+  }
+
+  def getBestFilterHeaderHeight: Future[Int] = {
+    safeDatabase.run(bestFilterHeaderHeightQuery).map(_.headOption.getOrElse(0))
+  }
+
   /** This looks for best filter headers whose [[CompactFilterHeaderDb.blockHashBE]] are associated with the given
     * [[BlockHeaderDb.hashBE]] given as a parameter.
     */


@@ -39,10 +39,10 @@ trait ChainApi extends ChainQueryApi {
     */
   def processHeaders(headers: Vector[BlockHeader]): Future[ChainApi]

-  /** Gets a [[org.bitcoins.chain.models.BlockHeaderDb]] from the chain's database */
+  /** Gets a [[org.bitcoins.core.api.chain.db.BlockHeaderDb]] from the chain's database */
   def getHeader(hash: DoubleSha256DigestBE): Future[Option[BlockHeaderDb]]

-  /** Gets all [[org.bitcoins.chain.models.BlockHeaderDb]]s at a given height */
+  /** Gets all [[org.bitcoins.core.api.chain.db.BlockHeaderDb]]s at a given height */
   def getHeadersAtHeight(height: Int): Future[Vector[BlockHeaderDb]]

   /** Gets the number of blocks in the database */
@@ -83,7 +83,7 @@ trait ChainApi extends ChainQueryApi {
    * Generates a filter header range in form of (startHeight, stopHash) by the given stop hash.
    */
   def nextFilterHeaderBatchRange(
-      prevStopHash: DoubleSha256DigestBE,
+      startHeight: Int,
       batchSize: Int): Future[Option[FilterSyncMarker]]

   /**
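One naming subtlety in the new signature: the trait calls the parameter startHeight, but the ChainHandler implementation treats it as the height already synced and begins the batch one past it. Callers therefore pass the current best filter height, and a None result means that height is already at or past the best filter header. A minimal caller sketch with illustrative numbers:

    // The best stored filter is at height 999; with batchSize = 1000 the
    // handler computes the range 1000 through 1999.
    chainApi
      .nextFilterHeaderBatchRange(startHeight = 999, batchSize = 1000)
      .map {
        case Some(marker) => logger.info(s"Requesting compact filters from $marker")
        case None         => logger.info("Already synced past this height")
      }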


@@ -60,7 +60,7 @@ bitcoin-s {
     # to keep the sync time fast, however, for regtest it should be small
     # so it does not exceed the chain size.
-    filter-batch-size = 100
+    filter-batch-size = 1000
   }
   # this config key is read by Slick
   db {


@@ -168,7 +168,7 @@ bitcoin-s {
     # to keep the sync time fast, however, for regtest it should be small
     # so it does not exceed the chain size.
-    filter-batch-size = 100
+    filter-batch-size = 1000
   }
   hikari-logging = true
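Raising the default from 100 to 1000 cuts the number of getcfilters round trips on mainnet sync by a factor of ten. For context, 1000 is also the most filters a peer may serve for a single request under BIP157; the constants below come from that spec, not from this codebase:

    // BIP157 request limits (illustrative constants, not bitcoin-s code):
    val MaxGetCFiltersSize = 1000  // max filters returned per getcfilters
    val MaxGetCFHeadersSize = 2000 // max headers returned per getcfheaders

    // The new default saturates a single getcfilters request:
    assert(1000 <= MaxGetCFiltersSize)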


@@ -77,11 +77,11 @@ case class NeutrinoNode(
           prevStopHash = header.hashBE)

         // If we have started syncing filters
-        if (filterCount != filterHeaderCount)
+        if (filterCount != filterHeaderCount && filterCount != 0)
           peerMsgSender.sendNextGetCompactFilterCommand(
             chainApi = chainApi,
             filterBatchSize = chainConfig.filterBatchSize,
-            stopHash = header.hashBE)
+            startHeight = filterCount)
       }

     logger.info(
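The added filterCount != 0 guard stops the node from issuing a batch request before any filter is stored; on a completely fresh sync, sendFirstGetCompactFilterCommand in DataMessageHandler starts from height 0 instead. Condensed as a predicate (a sketch; the real code inlines this in the if above):

    // Sketch of the guard, not the actual NeutrinoNode body:
    def shouldRequestMoreFilters(filterCount: Int, filterHeaderCount: Int): Boolean =
      filterCount != filterHeaderCount && // filters still trail the filter headers
        filterCount != 0                  // and the very first batch was already started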


@@ -100,9 +100,10 @@ case class DataMessageHandler(
           Future.successful((filterHeaderHeight, filterHeight + 1))
         case (_, _) => // If either are None
           for {
-            filterHeaderCount <- chainApi.getFilterHeaderCount()
-            filterCount <- chainApi.getFilterCount()
-          } yield (filterHeaderCount, filterCount + 1)
+            filterHeaderHeight <- chainApi.getFilterHeaderCount()
+            filterHeight <- chainApi.getFilterCount()
+          } yield (filterHeaderHeight,
+                   if (filterHeight == 0) 0 else filterHeight + 1)
       }
       newSyncing =
         if (batchSizeFull) {
@@ -136,8 +137,7 @@
         if (batchSizeFull) {
           logger.info(
             s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
-          sendNextGetCompactFilterCommand(peerMsgSender,
-                                          filter.blockHash.flip)
+          sendNextGetCompactFilterCommand(peerMsgSender, newFilterHeight)
         } else FutureUtil.unit
     } yield {
       this.copy(
@@ -352,26 +352,17 @@ case class DataMessageHandler(
   private def sendNextGetCompactFilterCommand(
       peerMsgSender: PeerMessageSender,
-      stopHash: DoubleSha256DigestBE): Future[Boolean] =
+      startHeight: Int): Future[Boolean] =
     peerMsgSender.sendNextGetCompactFilterCommand(chainApi = chainApi,
                                                   filterBatchSize =
                                                     chainConfig.filterBatchSize,
-                                                  stopHash = stopHash)
+                                                  startHeight = startHeight)

   private def sendFirstGetCompactFilterCommand(
       peerMsgSender: PeerMessageSender): Future[Boolean] =
     for {
       filterCount <- chainApi.getFilterCount()
-      highestFilterOpt <-
-        chainApi
-          .getFiltersAtHeight(filterCount)
-          .map(_.headOption)
-      highestFilterBlockHash =
-        highestFilterOpt
-          .map(_.blockHashBE)
-          .getOrElse(DoubleSha256DigestBE.empty)
-      res <-
-        sendNextGetCompactFilterCommand(peerMsgSender, highestFilterBlockHash)
+      res <- sendNextGetCompactFilterCommand(peerMsgSender, filterCount)
     } yield res

   private def handleInventoryMsg(
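Driving sync by height removes two database lookups per batch that existed only to translate between heights and hashes: the getFiltersAtHeight call that recovered a block hash from the best filter height, and the getFilterHeader call inside the old nextFilterHeaderBatchRange that turned that hash back into a height. Condensed from the removed lines:

    // Old flow: height -> block hash -> height again
    //   chainApi.getFiltersAtHeight(filterCount).map(_.headOption) // query #1
    //   highestFilterOpt.map(_.blockHashBE)                        // hash out
    //   getFilterHeader(prevStopHash)                              // query #2, in ChainHandler
    // New flow: the height itself is the cursor, passed straight through
    //   sendNextGetCompactFilterCommand(peerMsgSender, filterCount)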


@@ -194,11 +194,10 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
   private[node] def sendNextGetCompactFilterCommand(
       chainApi: ChainApi,
       filterBatchSize: Int,
-      stopHash: DoubleSha256DigestBE)(implicit
-      ec: ExecutionContext): Future[Boolean] = {
+      startHeight: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
     for {
       filterSyncMarkerOpt <-
-        chainApi.nextFilterHeaderBatchRange(stopHash, filterBatchSize)
+        chainApi.nextFilterHeaderBatchRange(startHeight, filterBatchSize)
       res <- filterSyncMarkerOpt match {
         case Some(filterSyncMarker) =>
           logger.info(s"Requesting compact filters from $filterSyncMarker")


@@ -76,7 +76,9 @@ bitcoin-s {
     # to keep the sync time fast, however, for regtest it should be small
     # so it does not exceed the chain size.
-    filter-batch-size = 100
+    # Set a small filter batch size in testkit so we test fetching
+    # multiple filter batches
+    filter-batch-size = 10
   }
   # this config key is read by Slick
   db {


@@ -108,7 +108,7 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
     Future.successful(None)

   override def nextFilterHeaderBatchRange(
-      stopHash: DoubleSha256DigestBE,
+      startHeight: Int,
       batchSize: Int): Future[Option[FilterSyncMarker]] =
     Future.successful(None)


@@ -153,7 +153,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
       _ =
         if (toUpdate.nonEmpty)
           logger.info(s"${toUpdate.size} txos are now confirmed!")
-        else logger.info("No txos to be confirmed")
+        else logger.debug("No txos to be confirmed")
       updated <- spendingInfoDAO.upsertAllSpendingInfoDb(toUpdate.flatten)
     } yield updated
   }