Introduce 'FilterSyncMarker' to ChainApi, make it clearier what exact… (#2003)

* Introduce 'FilterSyncMarker' to ChainApi, make it clearier what exactly the (Int,DoubleSha256Digest) tuple is returned from ChainApi.nextBlockHeaderRange()

* Fix doc

* Add scaladoc to FilterSyncMarker

* Rebase onto master, fix conflicts to use FilterSyncMarker
This commit is contained in:
Chris Stewart 2020-09-11 13:48:40 -05:00 committed by GitHub
parent 4f02ee15db
commit 76b9577181
17 changed files with 85 additions and 77 deletions

View file

@ -6,8 +6,8 @@ import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.server.ValidationRejection
import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.bitcoins.core.Core
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.wallet.{AddressInfo, CoinSelectionAlgo}
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.wallet.db.{
AccountDb,
AddressDb,

View file

@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import org.bitcoins.commons.serializers.Picklers._
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
case class ChainRoutes(chain: ChainApi)(implicit system: ActorSystem)
extends ServerRoute {

View file

@ -14,7 +14,7 @@ import org.bitcoins.chain.models.{
CompactFilterHeaderDAO
}
import org.bitcoins.core.Core
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.config.{BitcoinNetworks, MainNet, RegTest, TestNet3}
import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil, NetworkUtil}
import org.bitcoins.db._

View file

@ -1,11 +1,8 @@
package org.bitcoins.chain.blockchain
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
BlockHeaderDbHelper,
ChainApi
}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, BlockHeaderDbHelper}
import org.bitcoins.core.gcs.{BlockFilter, FilterHeader}
import org.bitcoins.core.number.{Int32, UInt32}
import org.bitcoins.core.p2p.CompactFilterMessage
@ -317,9 +314,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
rangeOpt <-
chainHandler.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
assert(rangeOpt.get._1 == 0)
assert(rangeOpt.get._2 == genesisHeader.hash)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == genesisHeader.hash)
}
//let's process a block header, and then be able to fetch that header as the last stopHash
@ -340,9 +338,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
rangeOpt <-
chainApi.nextBlockHeaderBatchRange(DoubleSha256DigestBE.empty, 2)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
assert(rangeOpt.get._1 == 0)
assert(rangeOpt.get._2 == blockHeader.hash)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == blockHeader.hash)
}
}
@ -365,9 +364,9 @@ class ChainHandlerTest extends ChainDbUnitTest {
count <- chainHandler.getBlockCount()
} yield {
assert(blockHeaderBatchOpt.isDefined)
val Some((height, hash)) = blockHeaderBatchOpt
assert(newHeaderB.hash == hash)
assert(newHeaderB.height == height)
val marker = blockHeaderBatchOpt.get
assert(newHeaderB.hash == marker.stopBlockHash)
assert(newHeaderB.height == marker.startHeight)
}
//now let's build a new block header ontop of C and process it
@ -386,9 +385,9 @@ class ChainHandlerTest extends ChainDbUnitTest {
} yield {
assert(count == 2)
assert(blockHeaderBatchOpt.isDefined)
val Some((height, hash)) = blockHeaderBatchOpt
assert(headerC.height == height)
assert(headerD.hash == hash)
val marker = blockHeaderBatchOpt.get
assert(headerC.height == marker.startHeight)
assert(headerD.hash == marker.stopBlockHash)
}
}
@ -401,9 +400,10 @@ class ChainHandlerTest extends ChainDbUnitTest {
rangeOpt <-
chainHandler.nextFilterHeaderBatchRange(DoubleSha256DigestBE.empty, 1)
} yield {
val marker = rangeOpt.get
assert(rangeOpt.nonEmpty)
assert(rangeOpt.get._1 == 0)
assert(rangeOpt.get._2 == bestBlockHashBE.flip)
assert(marker.startHeight == 0)
assert(marker.stopBlockHash == bestBlockHashBE.flip)
}
}

View file

@ -2,7 +2,7 @@ package org.bitcoins.chain.blockchain.sync
import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaRpc
import org.bitcoins.testkit.chain.{ChainDbUnitTest, SyncUtil}

View file

@ -1,7 +1,7 @@
package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler

View file

@ -4,6 +4,7 @@ import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.chain.db._
import org.bitcoins.core.gcs.FilterHeader
@ -12,11 +13,7 @@ import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.{
CryptoUtil,
DoubleSha256Digest,
DoubleSha256DigestBE
}
import org.bitcoins.crypto.{CryptoUtil, DoubleSha256DigestBE}
import scala.annotation.tailrec
import scala.concurrent._
@ -149,8 +146,7 @@ case class ChainHandler(
/** @inheritdoc */
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]] = {
batchSize: Int): Future[Option[FilterSyncMarker]] = {
for {
prevBlockHeaderOpt <- getHeader(prevStopHash)
headerOpt <- prevBlockHeaderOpt match {
@ -175,7 +171,7 @@ case class ChainHandler(
*/
private def findNextHeader(
prevBlockHeaderOpt: Option[BlockHeaderDb],
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]] = {
batchSize: Int): Future[Option[FilterSyncMarker]] = {
val chainsF = prevBlockHeaderOpt match {
case None =>
@ -217,7 +213,7 @@ case class ChainHandler(
private def getBestChainAtHeight(
startHeight: Int,
batchSize: Int,
blockchains: Vector[Blockchain]): Option[(Int, DoubleSha256Digest)] = {
blockchains: Vector[Blockchain]): Option[FilterSyncMarker] = {
//ok, we need to select the header that is contained in the chain
//with the most chain work
val targetHeight = startHeight + batchSize - 1
@ -227,11 +223,12 @@ case class ChainHandler(
val hashHeightOpt = mostWorkChainOpt.flatMap { mostWorkChain =>
val maxHeight = mostWorkChain.tip.height
if (targetHeight >= maxHeight) {
Some((startHeight, mostWorkChain.tip.hash))
val marker = FilterSyncMarker(startHeight, mostWorkChain.tip.hash)
Some(marker)
} else {
mostWorkChain
.find(_.height == targetHeight)
.map(h => (startHeight, h.hash))
.map(h => FilterSyncMarker(startHeight, h.hash))
}
}
hashHeightOpt
@ -240,7 +237,7 @@ case class ChainHandler(
/** @inheritdoc */
override def nextFilterHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]] = {
batchSize: Int): Future[Option[FilterSyncMarker]] = {
val startHeightF = if (prevStopHash == DoubleSha256DigestBE.empty) {
Future.successful(0)
} else {
@ -265,7 +262,7 @@ case class ChainHandler(
if (startHeight > stopHeight)
None
else
Some((startHeight, stopBlock.blockHashBE.flip))
Some(FilterSyncMarker(startHeight, stopBlock.blockHashBE.flip))
}
}

View file

@ -3,7 +3,8 @@ package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, ChainApi}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.BlockHeaderDb
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.DoubleSha256DigestBE

View file

@ -2,11 +2,8 @@ package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
ChainApi,
CompactFilterHeaderDb
}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, CompactFilterHeaderDb}
import org.bitcoins.core.gcs.{FilterHeader, GolombFilter}
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.blockchain.BlockHeader

View file

@ -1,11 +1,15 @@
package org.bitcoins.core.api.chain.db
package org.bitcoins.core.api.chain
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.gcs.FilterHeader
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.crypto.DoubleSha256DigestBE
import scala.concurrent.Future
@ -69,14 +73,14 @@ trait ChainApi extends ChainQueryApi {
*/
def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]]
batchSize: Int): Future[Option[FilterSyncMarker]]
/**
* Generates a filter header range in form of (startHeight, stopHash) by the given stop hash.
*/
def nextFilterHeaderBatchRange(
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]]
batchSize: Int): Future[Option[FilterSyncMarker]]
/**
* Adds a compact filter into the filter database.

View file

@ -0,0 +1,11 @@
package org.bitcoins.core.api.chain
import org.bitcoins.crypto.DoubleSha256Digest
/** This is a helper class for syncing block filters following the
* BIP157 protocol. This indicates the starting block height we are
* syncing filters at, and the last block hash we expect in the batch
* of filters sent back to us by our peer
* @see https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki#cfheaders
*/
case class FilterSyncMarker(startHeight: Int, stopBlockHash: DoubleSha256Digest)

View file

@ -45,7 +45,7 @@ on regtest.
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.chain.models._
import org.bitcoins.core.api._

View file

@ -2,7 +2,7 @@ package org.bitcoins.node.networking.peer
import akka.Done
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.p2p._
import org.bitcoins.crypto.DoubleSha256DigestBE
@ -327,10 +327,9 @@ case class DataMessageHandler(
prevStopHash = blockHash,
batchSize = chainConfig.filterHeaderBatchSize)
res <- hashHeightOpt match {
case Some((height, hash)) =>
case Some(filterSyncMarker) =>
peerMsgSender
.sendGetCompactFilterHeadersMessage(startHeight = height,
stopHash = hash)
.sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ => true)
case None =>
sys.error(

View file

@ -9,7 +9,7 @@ import org.bitcoins.chain.models.{
CompactFilterDAO,
CompactFilterHeaderDAO
}
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.p2p.{NetworkMessage, _}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer

View file

@ -5,7 +5,7 @@ import java.net.InetAddress
import akka.actor.ActorRef
import akka.io.Tcp
import akka.util.Timeout
import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.number.Int32
import org.bitcoins.core.p2p._
@ -161,21 +161,21 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
}
def sendGetCompactFiltersMessage(
startHeight: Int,
stopHash: DoubleSha256Digest): Future[Unit] = {
filterSyncMarker: FilterSyncMarker): Future[Unit] = {
val message =
GetCompactFiltersMessage(if (startHeight < 0) 0 else startHeight,
stopHash)
GetCompactFiltersMessage(if (filterSyncMarker.startHeight < 0) 0
else filterSyncMarker.startHeight,
filterSyncMarker.stopBlockHash)
logger.debug(s"Sending getcfilters=$message to peer ${client.peer}")
sendMsg(message)
}
def sendGetCompactFilterHeadersMessage(
startHeight: Int,
stopHash: DoubleSha256Digest): Future[Unit] = {
filterSyncMarker: FilterSyncMarker): Future[Unit] = {
val message =
GetCompactFilterHeadersMessage(if (startHeight < 0) 0 else startHeight,
stopHash)
GetCompactFilterHeadersMessage(if (filterSyncMarker.startHeight < 0) 0
else filterSyncMarker.startHeight,
filterSyncMarker.stopBlockHash)
logger.debug(s"Sending getcfheaders=$message to peer ${client.peer}")
sendMsg(message)
}
@ -193,14 +193,14 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
stopHash: DoubleSha256DigestBE)(implicit
ec: ExecutionContext): Future[Boolean] = {
for {
nextRangeOpt <-
filterSyncMarkerOpt <-
chainApi.nextFilterHeaderBatchRange(stopHash, filterBatchSize)
res <- nextRangeOpt match {
case Some((startHeight, stopHash)) =>
res <- filterSyncMarkerOpt match {
case Some(filterSyncMarker) =>
logger.info(
s"Requesting compact filters from=$startHeight to=${stopHash.flip}")
s"Requesting compact filters from=${filterSyncMarker.startHeight} to=${filterSyncMarker.stopBlockHash}")
sendGetCompactFiltersMessage(startHeight, stopHash)
sendGetCompactFiltersMessage(filterSyncMarker)
.map(_ => true)
case None =>
Future.successful(false)
@ -214,15 +214,14 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
prevStopHash: DoubleSha256DigestBE)(implicit
ec: ExecutionContext): Future[Boolean] = {
for {
nextRangeOpt <- chainApi.nextBlockHeaderBatchRange(
filterSyncMarkerOpt <- chainApi.nextBlockHeaderBatchRange(
prevStopHash = prevStopHash,
batchSize = filterHeaderBatchSize)
res <- nextRangeOpt match {
case Some((startHeight, newStopHash)) =>
res <- filterSyncMarkerOpt match {
case Some(filterSyncMarker) =>
logger.info(
s"Requesting next compact filter headers from=$startHeight to=${newStopHash.flip}")
sendGetCompactFilterHeadersMessage(startHeight = startHeight,
stopHash = newStopHash)
s"Requesting next compact filter headers from=${filterSyncMarker.startHeight} to=${filterSyncMarker.stopBlockHash.flip}")
sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ => true)
case None =>
Future.successful(false)

View file

@ -10,6 +10,7 @@ import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db._
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}

View file

@ -4,10 +4,9 @@ import java.net.InetSocketAddress
import akka.actor.{ActorSystem, Cancellable}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.chain.{ChainApi, ChainQueryApi, FilterSyncMarker}
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
ChainApi,
CompactFilterDb,
CompactFilterHeaderDb
}
@ -16,7 +15,7 @@ import org.bitcoins.core.gcs.FilterHeader
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.db.AppConfig
import org.bitcoins.node._
import org.bitcoins.node.config.NodeAppConfig
@ -106,12 +105,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
override def nextBlockHeaderBatchRange(
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]] =
batchSize: Int): Future[Option[FilterSyncMarker]] =
Future.successful(None)
override def nextFilterHeaderBatchRange(
stopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[(Int, DoubleSha256Digest)]] =
batchSize: Int): Future[Option[FilterSyncMarker]] =
Future.successful(None)
override def processFilters(