Batch add filters to database (#1725)

* Process filter headers in batch

* Use correct batch

* Cache filter heights

* Convert to option
This commit is contained in:
Ben Carman 2020-08-04 07:30:42 -05:00 committed by GitHub
parent 231b692fdf
commit a022fbaed1

View file

@ -4,7 +4,6 @@ import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
@ -23,6 +22,8 @@ case class DataMessageHandler(
chainApi: ChainApi,
callbacks: NodeCallbacks,
currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty,
filterHeaderHeightOpt: Option[Int] = None,
filterHeightOpt: Option[Int] = None,
syncing: Boolean = false)(implicit
ec: ExecutionContext,
appConfig: NodeAppConfig,
@ -71,50 +72,72 @@ case class DataMessageHandler(
synced
}
}
newFilterHeaderHeight <- filterHeaderHeightOpt match {
case None =>
chainApi.getFilterHeaderCount()
case Some(filterHeaderHeight) =>
Future.successful(filterHeaderHeight + filterHeaders.size)
}
} yield {
this.copy(chainApi = newChainApi, syncing = newSyncing)
this.copy(chainApi = newChainApi,
syncing = newSyncing,
filterHeaderHeightOpt = Some(newFilterHeaderHeight))
}
case filter: CompactFilterMessage =>
logger.debug(s"Received ${filter.commandName}, $filter")
val batchSizeFull: Boolean =
currentFilterBatch.size == chainConfig.filterBatchSize - 1
for {
(newBatch, newSyncing) <-
(newFilterHeaderHeight, newFilterHeight) <-
(filterHeaderHeightOpt, filterHeightOpt) match {
case (Some(filterHeaderHeight), Some(filterHeight)) =>
Future.successful((filterHeaderHeight, filterHeight + 1))
case (_, _) => // If either are None
for {
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
} yield (filterHeaderCount, filterCount + 1)
}
newSyncing <-
if (batchSizeFull) {
logger.info(
s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
for {
_ <- sendNextGetCompactFilterCommand(peerMsgSender,
filter.blockHash.flip)
} yield (Vector.empty, syncing)
} yield syncing
} else {
for {
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
} yield {
val syncing = filterCount < filterHeaderCount - 1
if (!syncing) {
logger.info(s"We are synced")
}
(currentFilterBatch :+ filter, syncing)
val syncing = newFilterHeight < newFilterHeaderHeight
if (!syncing) {
logger.info(s"We are synced")
}
Future.successful(syncing)
}
newChainApi <- chainApi.processFilter(filter)
// If we are not syncing or our filter batch is full, process the filters
_ <-
filterBatch = currentFilterBatch :+ filter
(newBatch, newChainApi) <-
if (!newSyncing || batchSizeFull) {
val blockFilters = newBatch.map { filter =>
val blockFilters = filterBatch.map { filter =>
(filter.blockHash,
BlockFilter.fromBytes(filter.filterBytes, filter.blockHash))
}
callbacks.executeOnCompactFiltersReceivedCallbacks(logger,
blockFilters)
} else FutureUtil.unit
logger.debug(s"Processing ${filterBatch.size} filters")
for {
newChainApi <- chainApi.processFilters(filterBatch)
_ <- callbacks.executeOnCompactFiltersReceivedCallbacks(
logger,
blockFilters)
} yield (Vector.empty, newChainApi)
} else Future.successful((filterBatch, chainApi))
} yield {
this.copy(chainApi = newChainApi,
currentFilterBatch = newBatch,
syncing = newSyncing)
this.copy(
chainApi = newChainApi,
currentFilterBatch = newBatch,
syncing = newSyncing,
filterHeaderHeightOpt = Some(newFilterHeaderHeight),
filterHeightOpt = Some(newFilterHeight)
)
}
case notHandling @ (MemPoolMessage | _: GetHeadersMessage |
_: GetBlocksMessage | _: GetCompactFiltersMessage |